from contextlib import AbstractContextManager, nullcontext
from datetime import timedelta
from typing import Any, Literal, Optional, Union

import torch
import torch.distributed
from lightning_utilities.core.rank_zero import rank_zero_only
from torch import Tensor
from torch.nn import Module
from torch.nn.parallel.distributed import DistributedDataParallel
from typing_extensions import override

from lightning_fabric.accelerators.accelerator import Accelerator
from lightning_fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning_fabric.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_fabric.plugins.io.checkpoint_io import CheckpointIO
from lightning_fabric.plugins.precision import Precision
from lightning_fabric.strategies.launchers.multiprocessing import _MultiProcessingLauncher
from lightning_fabric.strategies.launchers.subprocess_script import _SubprocessScriptLauncher
from lightning_fabric.strategies.parallel import ParallelStrategy
from lightning_fabric.strategies.registry import _StrategyRegistry
from lightning_fabric.strategies.strategy import TBroadcast, _BackwardSyncControl
from lightning_fabric.utilities.distributed import (
    ReduceOp,
    _distributed_is_initialized,
    _get_default_process_group_backend_for_device,
    _init_dist_connection,
    _sync_ddp_if_available,
)
from lightning_fabric.utilities.distributed import group as _group
from lightning_fabric.utilities.rank_zero import rank_zero_only as utils_rank_zero_only

_DDP_FORK_ALIASES = ("ddp_fork", "ddp_notebook")


class DDPStrategy(ParallelStrategy):
    """Strategy for multi-process single-device training on one or multiple nodes."""

    def __init__(
        self,
        accelerator: Optional[Accelerator] = None,
        parallel_devices: Optional[list[torch.device]] = None,
        cluster_environment: Optional[ClusterEnvironment] = None,
        checkpoint_io: Optional[CheckpointIO] = None,
        precision: Optional[Precision] = None,
        process_group_backend: Optional[str] = None,
        timeout: Optional[timedelta] = default_pg_timeout,
        start_method: Literal["popen", "spawn", "fork", "forkserver"] = "popen",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=cluster_environment,
            checkpoint_io=checkpoint_io,
            precision=precision,
        )
        self._num_nodes = 1
        self._process_group_backend: Optional[str] = process_group_backend
        self._timeout: Optional[timedelta] = timeout
        self._start_method = start_method
        self._backward_sync_control = _DDPBackwardSyncControl()
        self._ddp_kwargs = kwargs

    @property
    @override
    def root_device(self) -> torch.device:
        assert self.parallel_devices is not None
        return self.parallel_devices[self.local_rank]

    @property
    def num_nodes(self) -> int:
        return self._num_nodes

    @num_nodes.setter
    def num_nodes(self, num_nodes: int) -> None:
        self._num_nodes = num_nodes

    @property
    def num_processes(self) -> int:
        return len(self.parallel_devices) if self.parallel_devices is not None else 0

    @property
    @override
    def distributed_sampler_kwargs(self) -> dict[str, Any]:
        return {"num_replicas": (self.num_nodes * self.num_processes), "rank": self.global_rank}

    @property
    def process_group_backend(self) -> Optional[str]:
        return self._process_group_backend

    @override
    def _configure_launcher(self) -> None:
        assert self.cluster_environment is not None
        if self._start_method == "popen":
            self._launcher = _SubprocessScriptLauncher(self.cluster_environment, self.num_processes, self.num_nodes)
        else:
            self._launcher = _MultiProcessingLauncher(self, start_method=self._start_method)

    @override
    def setup_environment(self) -> None:
        super().setup_environment()
        self._setup_distributed()

    @override
    def setup_module(self, module: Module) -> DistributedDataParallel:
        """Wraps the model into a :class:`~torch.nn.parallel.distributed.DistributedDataParallel` module."""
        device_ids = self._determine_ddp_device_ids()
        # Construct DDP inside a fresh CUDA stream when running on CUDA devices
        ctx = torch.cuda.stream(torch.cuda.Stream()) if device_ids is not None else nullcontext()
        with ctx:
            return DistributedDataParallel(module=module, device_ids=device_ids, **self._ddp_kwargs)

    @override
    def module_to_device(self, module: Module) -> None:
        module.to(self.root_device)

    @override
    def all_reduce(
        self, tensor: Tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"
    ) -> Tensor:
        """Reduces a tensor from several distributed processes to one aggregated tensor.

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to gather results from. Defaults to all processes (world)
            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
                Can also be a string 'sum' to calculate the sum during reduction.

        Return:
            The reduced value, except when the input was not a tensor, in which case the output
            remains unchanged.

        """
        if isinstance(tensor, Tensor):
            return _sync_ddp_if_available(tensor, group, reduce_op=reduce_op)
        return tensor

    @override
    def barrier(self, *args: Any, **kwargs: Any) -> None:
        if not _distributed_is_initialized():
            return
        if torch.distributed.get_backend() == "nccl":
            torch.distributed.barrier(device_ids=self._determine_ddp_device_ids())
        else:
            torch.distributed.barrier()

    @override
    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
        if not _distributed_is_initialized():
            return obj
        obj = [obj]
        torch.distributed.broadcast_object_list(obj, src, group=_group.WORLD)
        return obj[0]

    @override
    def get_module_state_dict(self, module: Module) -> dict[str, Union[Any, Tensor]]:
        if isinstance(module, DistributedDataParallel):
            module = module.module
        return super().get_module_state_dict(module)

    @override
    def load_module_state_dict(
        self, module: Module, state_dict: dict[str, Union[Any, Tensor]], strict: bool = True
    ) -> None:
        if isinstance(module, DistributedDataParallel):
            module = module.module
        super().load_module_state_dict(module=module, state_dict=state_dict, strict=strict)

    @classmethod
    @override
    def register_strategies(cls, strategy_registry: _StrategyRegistry) -> None:
        entries = (
            ("ddp", "popen"),
            ("ddp_spawn", "spawn"),
            ("ddp_fork", "fork"),
            ("ddp_notebook", "fork"),
        )
        for name, start_method in entries:
            strategy_registry.register(
                name,
                cls,
                description=f"DDP strategy with `start_method='{start_method}'`",
                start_method=start_method,
            )
        strategy_registry.register(
            "ddp_find_unused_parameters_true",
            cls,
            description="Alias for `find_unused_parameters_true` and `start_method='popen'`",
            find_unused_parameters=True,
            start_method="popen",
        )

    def _setup_distributed(self) -> None:
        self._set_world_ranks()
        self._process_group_backend = self._get_process_group_backend()
        assert self.cluster_environment is not None
        _init_dist_connection(self.cluster_environment, self._process_group_backend, timeout=self._timeout)

    def _get_process_group_backend(self) -> str:
        return self._process_group_backend or _get_default_process_group_backend_for_device(self.root_device)

    def _set_world_ranks(self) -> None:
        if self.cluster_environment is not None:
            self.cluster_environment.set_global_rank(self.node_rank * self.num_processes + self.local_rank)
            self.cluster_environment.set_world_size(self.num_nodes * self.num_processes)
        # Mirror the global rank into both `rank_zero_only` utilities so that
        # rank-zero-guarded logging stays consistent across the two packages.
        rank_zero_only.rank = utils_rank_zero_only.rank = self.global_rank

    def _determine_ddp_device_ids(self) -> Optional[list[int]]:
        if self.root_device.type == "cpu":
            return None
        return [self.root_device.index]
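

# Illustrative sketch (not part of the original module): `register_strategies`
# above is what makes string aliases such as `strategy="ddp"` or
# `strategy="ddp_spawn"` resolve to this class. Assuming the registry behaves
# like the `_StrategyRegistry` imported here, the mapping works roughly as:
#
#     registry = _StrategyRegistry()
#     DDPStrategy.register_strategies(registry)
#     sorted(registry.available_strategies())
#     # ['ddp', 'ddp_find_unused_parameters_true', 'ddp_fork',
#     #  'ddp_notebook', 'ddp_spawn']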
r"   c                   @   s&   e Zd ZedededefddZdS )r8   rR   enabledr0   c                 C   s:   |st  S t|tstd| jj d|jj d| S )z{Blocks gradient synchronization inside the :class:`~torch.nn.parallel.distributed.DistributedDataParallel`
        wrapper.zABlocking backward sync is only possible if the module passed to `zA.no_backward_sync` is wrapped in `DistributedDataParallel`. Got: .)r   ra   r   	TypeErrorr=   r   no_sync)r;   rR   r   r>   r>   r?   no_backward_sync   s   
z(_DDPBackwardSyncControl.no_backward_syncN)r   r   r   r   r   r   r   r   r>   r>   r>   r?   r8      s    r8   )7
contextlibr   r   datetimer   typingr   r   r   r   rU   torch.distributed"lightning_utilities.core.rank_zeror	   r   r
   torch.nnr   torch.nn.parallel.distributedr   typing_extensionsr   )lightning_fabric.accelerators.acceleratorr   5lightning_fabric.plugins.collectives.torch_collectiver   9lightning_fabric.plugins.environments.cluster_environmentr   )lightning_fabric.plugins.io.checkpoint_ior   "lightning_fabric.plugins.precisionr   5lightning_fabric.strategies.launchers.multiprocessingr   7lightning_fabric.strategies.launchers.subprocess_scriptr   $lightning_fabric.strategies.parallelr   $lightning_fabric.strategies.registryr   $lightning_fabric.strategies.strategyr   r   &lightning_fabric.utilities.distributedr   r   r   r   r   r   rk   $lightning_fabric.utilities.rank_zero_DDP_FORK_ALIASESr"   r8   r>   r>   r>   r?   <module>   s6    5
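

# ---------------------------------------------------------------------------
# Illustrative usage sketch (not part of the original module). It assumes a
# matching `lightning_fabric` installation where `Fabric` is importable from
# the package root; the toy model, data, and accumulation factor are
# placeholders chosen for the example.
if __name__ == "__main__":
    from lightning_fabric import Fabric

    def _demo(fabric: Fabric) -> None:
        model = torch.nn.Linear(4, 4)
        optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
        model, optimizer = fabric.setup(model, optimizer)

        for step in range(4):
            batch = torch.randn(2, 4, device=fabric.device)
            # On intermediate accumulation steps, `no_backward_sync` (backed by
            # `_DDPBackwardSyncControl` above) wraps the backward pass in DDP's
            # `no_sync()`, so the gradient all-reduce fires only on the final
            # micro-batch of each accumulation window.
            accumulating = (step + 1) % 2 != 0
            with fabric.no_backward_sync(model, enabled=accumulating):
                fabric.backward(model(batch).sum())
            if not accumulating:
                optimizer.step()
                optimizer.zero_grad()

    # "spawn" keeps the whole example runnable from this single file under the
    # `__main__` guard; "popen" (the default) relaunches the script per rank.
    fabric = Fabric(accelerator="cpu", devices=2, strategy=DDPStrategy(start_method="spawn"))
    fabric.launch(_demo)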