# lightning_fabric/strategies/ddp.py
from contextlib import nullcontext
from datetime import timedelta
from typing import Any, ContextManager, Dict, List, Literal, Optional, Union

import torch
import torch.distributed
from lightning_utilities.core.rank_zero import rank_zero_only as utils_rank_zero_only
from torch import Tensor
from torch.nn import Module
from torch.nn.parallel.distributed import DistributedDataParallel
from typing_extensions import override

from lightning_fabric.accelerators.accelerator import Accelerator
from lightning_fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning_fabric.plugins.environments.cluster_environment import ClusterEnvironment
from lightning_fabric.plugins.io.checkpoint_io import CheckpointIO
from lightning_fabric.plugins.precision import Precision
from lightning_fabric.strategies.launchers.multiprocessing import _MultiProcessingLauncher
from lightning_fabric.strategies.launchers.subprocess_script import _SubprocessScriptLauncher
from lightning_fabric.strategies.parallel import ParallelStrategy
from lightning_fabric.strategies.registry import _StrategyRegistry
from lightning_fabric.strategies.strategy import TBroadcast, _BackwardSyncControl
from lightning_fabric.utilities.distributed import (
    ReduceOp,
    _distributed_is_initialized,
    _get_default_process_group_backend_for_device,
    _init_dist_connection,
    _sync_ddp_if_available,
)
from lightning_fabric.utilities.distributed import group as _group
from lightning_fabric.utilities.rank_zero import rank_zero_only

_DDP_FORK_ALIASES = (
    "ddp_fork",
    "ddp_notebook",
)


class DDPStrategy(ParallelStrategy):
    """Strategy for multi-process single-device training on one or multiple nodes."""

    def __init__(
        self,
        accelerator: Optional[Accelerator] = None,
        parallel_devices: Optional[List[torch.device]] = None,
        cluster_environment: Optional[ClusterEnvironment] = None,
        checkpoint_io: Optional[CheckpointIO] = None,
        precision: Optional[Precision] = None,
        process_group_backend: Optional[str] = None,
        timeout: Optional[timedelta] = default_pg_timeout,
        start_method: Literal["popen", "spawn", "fork", "forkserver"] = "popen",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=cluster_environment,
            checkpoint_io=checkpoint_io,
            precision=precision,
        )
        self._num_nodes = 1
        self._process_group_backend: Optional[str] = process_group_backend
        self._timeout: Optional[timedelta] = timeout
        self._start_method = start_method
        self._backward_sync_control = _DDPBackwardSyncControl()
        self._ddp_kwargs = kwargs  # forwarded verbatim to `DistributedDataParallel`

    @property
    @override
    def root_device(self) -> torch.device:
        assert self.parallel_devices is not None
        return self.parallel_devices[self.local_rank]

    @property
    def num_nodes(self) -> int:
        return self._num_nodes

    @num_nodes.setter
    def num_nodes(self, num_nodes: int) -> None:
        self._num_nodes = num_nodes

    @property
    def num_processes(self) -> int:
        return len(self.parallel_devices) if self.parallel_devices is not None else 0

    @property
    @override
    def distributed_sampler_kwargs(self) -> Dict[str, Any]:
        return {"num_replicas": (self.num_nodes * self.num_processes), "rank": self.global_rank}

    @property
    def process_group_backend(self) -> Optional[str]:
        return self._process_group_backend

    @override
    def _configure_launcher(self) -> None:
        assert self.cluster_environment is not None
        if self._start_method == "popen":
            self._launcher = _SubprocessScriptLauncher(self.cluster_environment, self.num_processes, self.num_nodes)
        else:
            self._launcher = _MultiProcessingLauncher(self, start_method=self._start_method)

    @override
    def setup_environment(self) -> None:
        super().setup_environment()
        self._setup_distributed()

    @override
    def setup_module(self, module: Module) -> DistributedDataParallel:
        """Wraps the model into a :class:`~torch.nn.parallel.distributed.DistributedDataParallel` module."""
        device_ids = self._determine_ddp_device_ids()
        # DDP initializes its buckets on the current CUDA stream, so run the wrapping
        # inside a fresh side stream when a CUDA device is used
        ctx = torch.cuda.stream(torch.cuda.Stream()) if device_ids is not None else nullcontext()
        with ctx:
            return DistributedDataParallel(module=module, device_ids=device_ids, **self._ddp_kwargs)

    @override
    def module_to_device(self, module: Module) -> None:
        module.to(self.root_device)

    @override
    def all_reduce(
        self, tensor: Tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"
    ) -> Tensor:
        """Reduces a tensor from several distributed processes to one aggregated tensor.

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to gather results from. Defaults to all processes (world)
            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
                Can also be a string 'sum' to calculate the sum during reduction.

        Return:
            reduced value, except when the input was not a tensor the output remains unchanged

        """
        if isinstance(tensor, Tensor):
            return _sync_ddp_if_available(tensor, group, reduce_op=reduce_op)
        return tensor

    @override
    def barrier(self, *args: Any, **kwargs: Any) -> None:
        if not _distributed_is_initialized():
            return
        if torch.distributed.get_backend() == "nccl":
            torch.distributed.barrier(device_ids=self._determine_ddp_device_ids())
        else:
            torch.distributed.barrier()

    @override
    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
        if not _distributed_is_initialized():
            return obj
        obj = [obj]
        torch.distributed.broadcast_object_list(obj, src, group=_group.WORLD)
        return obj[0]

    @override
    def get_module_state_dict(self, module: Module) -> Dict[str, Union[Any, Tensor]]:
        if isinstance(module, DistributedDataParallel):
            module = module.module
        return super().get_module_state_dict(module)

    @override
    def load_module_state_dict(
        self, module: Module, state_dict: Dict[str, Union[Any, Tensor]], strict: bool = True
    ) -> None:
        if isinstance(module, DistributedDataParallel):
            module = module.module
        super().load_module_state_dict(module=module, state_dict=state_dict, strict=strict)

    @classmethod
    @override
    def register_strategies(cls, strategy_registry: _StrategyRegistry) -> None:
        entries = (
            ("ddp", "popen"),
            ("ddp_spawn", "spawn"),
            ("ddp_fork", "fork"),
            ("ddp_notebook", "fork"),
        )
        for name, start_method in entries:
            strategy_registry.register(
                name,
                cls,
                description=f"DDP strategy with `start_method={start_method!r}`",
                start_method=start_method,
            )

    def _setup_distributed(self) -> None:
        self._set_world_ranks()
        self._process_group_backend = self._get_process_group_backend()
        assert self.cluster_environment is not None
        _init_dist_connection(self.cluster_environment, self._process_group_backend, timeout=self._timeout)

    def _get_process_group_backend(self) -> str:
        return self._process_group_backend or _get_default_process_group_backend_for_device(self.root_device)

    def _set_world_ranks(self) -> None:
        if self.cluster_environment is not None:
            self.cluster_environment.set_global_rank(self.node_rank * self.num_processes + self.local_rank)
            self.cluster_environment.set_world_size(self.num_nodes * self.num_processes)
        # keep both rank-zero helpers in sync with the rank assigned by the cluster environment
        rank_zero_only.rank = utils_rank_zero_only.rank = self.global_rank

    def _determine_ddp_device_ids(self) -> Optional[List[int]]:
        return None if self.root_device.type == "cpu" else [self.root_device.index]


class _DDPBackwardSyncControl(_BackwardSyncControl):
    @override
    def no_backward_sync(self, module: Module, enabled: bool) -> ContextManager:
        """Blocks gradient synchronization inside the :class:`~torch.nn.parallel.distributed.DistributedDataParallel`
        wrapper."""
        if not enabled:
            return nullcontext()
        if not isinstance(module, DistributedDataParallel):
            raise TypeError(
                "Blocking backward sync is only possible if the module passed to"
                f" `{self.__class__.__name__}.no_backward_sync` is wrapped in `DistributedDataParallel`."
                f" Got: {module.__class__.__name__}."
            )
        return module.no_sync()