import logging
from contextlib import nullcontext
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Literal, Optional, Union

import torch
import torch.distributed
from lightning_utilities.core.rank_zero import rank_zero_only as utils_rank_zero_only
from torch import Tensor
from torch.nn import Module
from torch.nn.parallel.distributed import DistributedDataParallel
from torch.optim.optimizer import Optimizer
from typing_extensions import override

import pytorch_lightning as pl
from lightning_fabric.plugins import CheckpointIO, ClusterEnvironment
from lightning_fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning_fabric.strategies import _StrategyRegistry
from lightning_fabric.utilities.distributed import (
    _distributed_is_initialized,
    _get_default_process_group_backend_for_device,
    _init_dist_connection,
    _sync_ddp_if_available,
)
from lightning_fabric.utilities.distributed import group as _group
from lightning_fabric.utilities.imports import _IS_WINDOWS
from lightning_fabric.utilities.optimizer import _optimizers_to_device
from lightning_fabric.utilities.seed import reset_seed
from lightning_fabric.utilities.types import ReduceOp
from pytorch_lightning.core.optimizer import LightningOptimizer
from pytorch_lightning.overrides.distributed import _register_ddp_comm_hook, _sync_module_states, prepare_for_backward
from pytorch_lightning.plugins.precision import Precision
from pytorch_lightning.strategies.launchers import _MultiProcessingLauncher, _SubprocessScriptLauncher
from pytorch_lightning.strategies.parallel import ParallelStrategy
from pytorch_lightning.strategies.strategy import TBroadcast, _ForwardRedirection
from pytorch_lightning.trainer.states import TrainerFn
from pytorch_lightning.utilities.exceptions import _augment_message
from pytorch_lightning.utilities.rank_zero import rank_zero_deprecation, rank_zero_info, rank_zero_only

if TYPE_CHECKING:
    from torch.distributed.algorithms.model_averaging.averagers import ModelAverager

log = logging.getLogger(__name__)

_DDP_FORK_ALIASES = (
    "ddp_fork",
    "ddp_fork_find_unused_parameters_false",
    "ddp_fork_find_unused_parameters_true",
    "ddp_notebook",
    "ddp_notebook_find_unused_parameters_false",
    "ddp_notebook_find_unused_parameters_true",
)


class DDPStrategy(ParallelStrategy):
    """Strategy for multi-process single-device training on one or multiple nodes."""

    def __init__(
        self,
        accelerator: Optional["pl.accelerators.Accelerator"] = None,
        parallel_devices: Optional[List[torch.device]] = None,
        cluster_environment: Optional[ClusterEnvironment] = None,
        checkpoint_io: Optional[CheckpointIO] = None,
        precision_plugin: Optional[Precision] = None,
        ddp_comm_state: Optional[object] = None,
        ddp_comm_hook: Optional[Callable] = None,
        ddp_comm_wrapper: Optional[Callable] = None,
        model_averaging_period: Optional[int] = None,
        process_group_backend: Optional[str] = None,
        timeout: Optional[timedelta] = default_pg_timeout,
        start_method: Literal["popen", "spawn", "fork", "forkserver"] = "popen",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=cluster_environment,
            checkpoint_io=checkpoint_io,
            precision_plugin=precision_plugin,
        )
        log.debug(f"{self.__class__.__name__}: initializing DDP strategy")
        self._forward_redirection = _DDPForwardRedirection()
        self._num_nodes = 1
        self._ddp_kwargs = kwargs
        self._ddp_comm_state = ddp_comm_state
        self._ddp_comm_hook = ddp_comm_hook
        self._ddp_comm_wrapper = ddp_comm_wrapper
        self._model_averaging_period = model_averaging_period
        self._model_averager: Optional["ModelAverager"] = None
        self._process_group_backend: Optional[str] = process_group_backend
        self._timeout: Optional[timedelta] = timeout
        self._start_method = start_method
zDDPStrategy.__init__c                 C   s   t dt| j ddd dS )z1Legacy property kept for backwards compatibility.`z3.is_distributed` is deprecated. Use is discouraged.   )
stacklevelT)r-   typerO   r\   r^   r^   r_   is_distributedk   s   zDDPStrategy.is_distributedc                 C   s   | j d usJ | j | j S N)r9   
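
    # A minimal usage sketch: the strategy is typically passed to the Trainer.
    # The accelerator/device/node counts here are assumptions for illustration:
    #
    #   trainer = pl.Trainer(
    #       accelerator="gpu",
    #       devices=4,
    #       num_nodes=2,
    #       strategy=DDPStrategy(process_group_backend="nccl", start_method="popen"),
    #   )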

    @property
    @override
    def root_device(self) -> torch.device:
        assert self.parallel_devices is not None
        return self.parallel_devices[self.local_rank]

    @property
    def num_nodes(self) -> int:
        return self._num_nodes

    @num_nodes.setter
    def num_nodes(self, num_nodes: int) -> None:
        self._num_nodes = num_nodes

    @property
    def num_processes(self) -> int:
        return len(self.parallel_devices) if self.parallel_devices is not None else 0

    @property
    @override
    def distributed_sampler_kwargs(self) -> Dict[str, Any]:
        return {"num_replicas": (self.num_nodes * self.num_processes), "rank": self.global_rank}

    @property
    def process_group_backend(self) -> Optional[str]:
        return self._process_group_backend

    @override
    def _configure_launcher(self) -> None:
        assert self.cluster_environment is not None
        if self._start_method == "popen":
            self._launcher = _SubprocessScriptLauncher(self.cluster_environment, self.num_processes, self.num_nodes)
        else:
            self._launcher = _MultiProcessingLauncher(self, start_method=self._start_method)

    @override
    def setup_environment(self) -> None:
        super().setup_environment()
        self.setup_distributed()

    @override
    def setup(self, trainer: "pl.Trainer") -> None:
        assert self.accelerator is not None
        self.accelerator.setup(trainer)

        trainer_fn = trainer.state.fn
        assert self.model is not None
        if trainer_fn == TrainerFn.FITTING and self._layer_sync:
            self.model = self._layer_sync.apply(self.model)

        self.precision_plugin.convert_module(self.model)
        self.model_to_device()

        if trainer_fn == TrainerFn.FITTING:
            # only wrap the module in DDP when fitting, as there are no gradients to reduce otherwise
            self.configure_ddp()
            # set up optimizers after the wrapped module has been moved to the device
            self.setup_optimizers(trainer)
        else:
            # manually synchronize the module's states since the DDP wrapper is not used
            _sync_module_states(self.model)

        self.setup_precision_plugin()

        if trainer_fn == TrainerFn.FITTING:
            _optimizers_to_device(self.optimizers, self.root_device)

            import torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook as post_localSGD

            if isinstance(self._ddp_comm_state, post_localSGD.PostLocalSGDState):
                self._enable_model_averaging()
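
    # A sketch of how the post-localSGD branch above can be reached; the state and
    # hook classes come from torch.distributed, and the hyperparameter values are
    # assumptions for illustration:
    #
    #   import torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook as post_localSGD
    #
    #   strategy = DDPStrategy(
    #       ddp_comm_state=post_localSGD.PostLocalSGDState(
    #           process_group=None, subgroup=None, start_localSGD_iter=8
    #       ),
    #       ddp_comm_hook=post_localSGD.post_localSGD_hook,
    #       model_averaging_period=4,
    #   )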
zDDPStrategy.setupr}   c                 C   s~   |   }td| d| j  |durtjtj nt }| t	d||d| jW  d   S 1 s8w   Y  dS )z^Wraps the model into a :class:`~torch.nn.parallel.distributed.DistributedDataParallel` module.z&setting up DDP model with device ids: z
, kwargs: N)module
device_idsr^   )
determine_ddp_device_idsrL   rM   rS   torchcudastreamStreamr   r   )r\   r}   r   ctxr^   r^   r_   _setup_model   s    $zDDPStrategy._setup_modelc                 C   sR   t | jj d t  |   |  | _| jd usJ t	| j| j| j
d d S )Nz: setting up distributed...)rB   )rL   rM   rN   rO   r   set_world_ranks_get_process_group_backendrY   r:   r   rZ   rd   r^   r^   r_   rw      s   
zDDPStrategy.setup_distributedc                 C   s   | j pt| jS rf   )rY   r   rh   rd   r^   r^   r_   r      s   z&DDPStrategy._get_process_group_backendc                 C   sJ   | j d ur| j | j| j | j  | j | j| j  | j t_	t
_	d S rf   )r:   set_global_rank	node_rankro   rg   set_world_sizerk   rr   r   rq   utils_rank_zero_onlyrd   r^   r^   r_   r      s   
zDDPStrategy.set_world_ranksc                 C   sP   t | jj d | jjdkr&t| jtsJ t	| j| j
| j| jd d S d S )Nz: registering ddp hooksr   )r}   r=   r>   r?   )rL   rM   rN   rO   rh   rc   r   r}   r   r!   rT   rU   rV   rd   r^   r^   r_   _register_ddp_hooks   s   
zDDPStrategy._register_ddp_hooksc                 C   s   t | jj d | jd u rtdddlm}m}m	} | j
D ]&}t|tr*|j}ts1t||nd}t|||fs<|rFtd|jj dq | jd usNJ tjjjjj| j| jjd| _d S )	Nz.: reinitializing optimizers with post localSGDz\Post-localSGD algorithm is used, but model averaging period is not provided to DDP strategy.r   )DistributedOptimizerPostLocalSGDOptimizerZeroRedundancyOptimizerFzKCurrently model averaging cannot work with a distributed optimizer of type .)periodwarmup_steps)rL   rM   rN   rO   rW   
ValueErrortorch.distributed.optimr   r   r   r   r   r    
_optimizerr   rT   r   r   r   model_averaging	averagersPeriodicModelAveragerstart_localSGD_iterrX   )r\   r   r   r   	optimizeris_distributed_optimizerr^   r^   r_   r      s*   



z#DDPStrategy._enable_model_averagingr   closurepl.LightningModulec                    sJ   t  j|||fi |}| jdu r|S dd |jD }| jt| |S )aI  Performs the actual optimizer step.


        Args:
            optimizer: the optimizer performing the step
            closure: closure calculating the loss value
            model: reference to the model, optionally defining optimizer step related hooks
            **kwargs: Any extra arguments to ``optimizer.step``

        """
        optimizer_output = super().optimizer_step(optimizer, closure, model, **kwargs)

        if self._model_averager is None:
            return optimizer_output

        params = [param for group in optimizer.param_groups for param in group["params"] if param.grad is not None]
        self._model_averager.average_parameters(iter(params))

        return optimizer_output

    def configure_ddp(self) -> None:
        log.debug(f"{self.__class__.__name__}: configuring DistributedDataParallel")
        assert isinstance(self.model, pl.LightningModule)
        self.model = self._setup_model(self.model)
        self._register_ddp_hooks()

    def determine_ddp_device_ids(self) -> Optional[List[int]]:
        if self.root_device.type == "cpu":
            return None
        return [self.root_device.index]

    @override
    def barrier(self, *args: Any, **kwargs: Any) -> None:
        if not _distributed_is_initialized():
            return

        if torch.distributed.get_backend() == "nccl":
            torch.distributed.barrier(device_ids=self.determine_ddp_device_ids())
        else:
            torch.distributed.barrier()

    @override
    def broadcast(self, obj: TBroadcast, src: int = 0) -> TBroadcast:
        if not _distributed_is_initialized():
            return obj

        obj = [obj]
        torch.distributed.broadcast_object_list(obj, src, group=_group.WORLD)
        return obj[0]

    @override
    def pre_backward(self, closure_loss: Tensor) -> None:
        """Run before precision plugin executes backward."""
        if not isinstance(self.model, DistributedDataParallel):
            return
        assert self.lightning_module is not None
        if not self.lightning_module.automatic_optimization:
            prepare_for_backward(self.model, closure_loss)

    @override
    def model_to_device(self) -> None:
        log.debug(f"{self.__class__.__name__}: moving model to device [{self.root_device}]...")
        assert self.model is not None
        self.model.to(self.root_device)

    @override
    def reduce(
        self, tensor: Tensor, group: Optional[Any] = None, reduce_op: Optional[Union[ReduceOp, str]] = "mean"
    ) -> Tensor:
        """Reduces a tensor from several distributed processes to one aggregated tensor.

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to gather results from. Defaults to all processes (world)
            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
                Can also be a string 'sum' to calculate the sum during reduction.

        Return:
            the reduced value; when the input was not a tensor, the output remains unchanged

        """
        if isinstance(tensor, Tensor):
            return _sync_ddp_if_available(tensor, group, reduce_op=reduce_op)
        return tensor
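
    # For example (illustrative): on a 2-process job where rank 0 holds tensor(2.)
    # and rank 1 holds tensor(4.), `self.reduce(t)` returns tensor(3.) on every
    # rank with the default 'mean', and tensor(6.) with reduce_op="sum".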
zDDPStrategy.reducestrategy_registryc              	   C   sf   d}|D ]\}}|j || d| d|d qd}|D ]\}}}|j || d| d| d||d qd S )	N))ddpr7   )	ddp_spawnrD   )r0   rE   )r3   rE   z"DDP strategy with `start_method` '')descriptionrC   )) ddp_find_unused_parameters_falseFr7   )ddp_find_unused_parameters_trueTr7   )&ddp_spawn_find_unused_parameters_falseFrD   )%ddp_spawn_find_unused_parameters_trueTrD   )r1   FrE   )r2   TrE   )r4   FrE   )r5   TrE   z.DDP strategy with `find_unused_parameters` as z and `start_method` ')r   find_unused_parametersrC   )register)clsr   entriesnamerC   fupr^   r^   r_   register_strategiesY  s$   

zDDPStrategy.register_strategies	exceptionc                 C   s   t |ddd d S )Nz>.*Expected to have finished reduction in the prior iteration.*ay  It looks like your LightningModule has parameters that were not used in producing the loss returned by training_step. If this is intentional, you must enable the detection of unused parameters in DDP, either by setting the string value `strategy='ddp_find_unused_parameters_true'` or by setting the flag in the strategy with `strategy=DDPStrategy(find_unused_parameters=True)`.)patternnew_messager+   )r\   r   r^   r^   r_   on_exception}  s
   
zDDPStrategy.on_exceptionc                    s   t | jj d | j}t| jtr,| jjs)| j	 
dr)td| jj d || _|d urO|jd urO|jjjtjkrO| jrO| jd usGJ | j| j| _t   d S )Nz: tearing down strategycan_set_static_graphzyYour model can run with static graph optimizations. For future training runs, we suggest you pass `Trainer(..., strategy=z%(static_graph=True))` to enable them.)rL   rM   rN   rO   r   r   r}   r   static_graph_get_ddp_logging_datagetr.   _trainerr{   r|   r*   r~   r   revertrJ   teardown)r\   	pl_moduler]   r^   r_   r     s$   
zDDPStrategy.teardown)rH   N)rx   ry   rH   Nrf   )r   )Nr   )>rO   


class _DDPForwardRedirection(_ForwardRedirection):
    @override
    def on_after_inner_forward(self, wrapper_module: Module, original_module: "pl.LightningModule") -> None:
        # in manual optimization, gradient reduction is triggered by `LightningModule.manual_backward`,
        # so the DDP reducer must not sync after the inner forward
        if isinstance(wrapper_module, DistributedDataParallel) and not original_module.automatic_optimization:
            wrapper_module.require_backward_grad_sync = False

    @override
    def on_after_outer_forward(self, wrapper_module: Module, original_module: "pl.LightningModule") -> None:
        if isinstance(wrapper_module, DistributedDataParallel) and not original_module.automatic_optimization:
            wrapper_module.require_backward_grad_sync = True
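

# A sketch of the manual-optimization flow that the redirection above supports;
# the LightningModule below is a hypothetical example:
#
#   class ManualModel(pl.LightningModule):
#       def __init__(self):
#           super().__init__()
#           self.automatic_optimization = False
#           self.layer = torch.nn.Linear(2, 2)
#
#       def training_step(self, batch, batch_idx):
#           opt = self.optimizers()
#           loss = self.layer(batch).sum()
#           self.manual_backward(loss)  # grad sync is managed by the strategy, not the DDP reducer
#           opt.step()
#           opt.zero_grad()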