o
    8wi4                     @   s   d dl mZmZmZ d dlZd dlmZ d dlmZmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZmZ d dlmZ d dlmZ G dd deZdS )    )AnyOptionalUnionN)Tensor)DataParallelModule)override)Accelerator)CheckpointIO)	Precision)ParallelStrategy)_StrategyRegistry)
TBroadcastTReduce)apply_to_collection)ReduceOpc                       s  e Zd ZdZ				d4dee deeej  dee	 dee
 f fddZeed	ejfd
dZeed5ddZeded	efddZeded	dfddZed6dedeej d	efddZe	d7dedee deeeef  d	efddZededed	dfdd Zed8d"ed#ed	efd$d%Zed9d'ed(ed	efd)d*Z eded	e!eeee"f f f fd+d,Z#e	&d9ded-e!eeee"f f d.ed	df fd/d0Z$e%ed1e&d	dfd2d3Z'  Z(S ):DataParallelStrategyzImplements data-parallel training in a single process, i.e., the model gets replicated to each device and each
    gets a split of the data.Nacceleratorparallel_devicescheckpoint_io	precisionc                    s   t  j||d ||d d S )N)r   r   cluster_environmentr   r   )super__init__)selfr   r   r   r   	__class__ [/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/lightning_fabric/strategies/dp.pyr   #   s   
zDataParallelStrategy.__init__returnc                 C   s   | j d usJ | j d S )Nr   )r   r   r   r   r   root_device2   s   
z DataParallelStrategy.root_devicec                 C      d S Nr   r    r   r   r   distributed_sampler_kwargs8      z/DataParallelStrategy.distributed_sampler_kwargsmodulec                 C   s   t || jdS )zDWraps the given model into a :class:`~torch.nn.DataParallel` module.)r&   
device_ids)r   r   r   r&   r   r   r   setup_module=   s   z!DataParallelStrategy.setup_modulec                 C   s   | | j d S r#   )tor!   r(   r   r   r   module_to_deviceB   s   z%DataParallelStrategy.module_to_devicebatchdevicec                 C      |S r#   r   )r   r,   r-   r   r   r   batch_to_deviceF   r%   z$DataParallelStrategy.batch_to_devicemean
collectiongroup	reduce_opc                 C   s   dt dt fdd}t|t |S )Ntr   c                 S   s   | j }|   |S r#   )dtypefloatr0   r*   )r4   original_dtyper   r   r   r0   O   s   z-DataParallelStrategy.all_reduce.<locals>.mean)r   r   )r   r1   r2   r3   r0   r   r   r   
all_reduceK   s   zDataParallelStrategy.all_reduceargskwargsc                 O   r"   r#   r   )r   r9   r:   r   r   r   barrierU      zDataParallelStrategy.barrierr   objsrcc                 C   r.   r#   r   )r   r=   r>   r   r   r   	broadcastY   r<   zDataParallelStrategy.broadcastTdecisionallc                 C   r.   r#   r   )r   r@   rA   r   r   r   reduce_boolean_decision]   r<   z,DataParallelStrategy.reduce_boolean_decisionc                    s   t |tr|j}t |S r#   )
isinstancer   r&   r   get_module_state_dictr(   r   r   r   rD   a   s   
z*DataParallelStrategy.get_module_state_dict
state_dictstrictc                    s&   t |tr|j}t j|||d d S )N)r&   rE   rF   )rC   r   r&   r   load_module_state_dict)r   r&   rE   rF   r   r   r   rG   g   s   
z+DataParallelStrategy.load_module_state_dictstrategy_registryc                 C   s   |j d| | jd d S )Ndp)description)register__name__)clsrH   r   r   r   register_strategieso   s   z(DataParallelStrategy.register_strategies)NNNN)r   Nr#   )Nr0   )r   )T))rL   
__module____qualname____doc__r   r	   listtorchr-   r
   r   r   propertyr   r!   r$   r   r   r)   r+   r   r/   r   r   r   strr8   r;   r   intr?   boolrB   dictr   rD   rG   classmethodr   rN   __classcell__r   r   r   r   r      sx     	(r   )typingr   r   r   rS   r   torch.nnr   r   typing_extensionsr   lightning_fabric.acceleratorsr	   )lightning_fabric.plugins.io.checkpoint_ior
   "lightning_fabric.plugins.precisionr   $lightning_fabric.strategies.parallelr   $lightning_fabric.strategies.registryr   $lightning_fabric.strategies.strategyr   r   %lightning_fabric.utilities.apply_funcr   &lightning_fabric.utilities.distributedr   r   r   r   r   r   <module>   s   