import logging
from contextlib import nullcontext
from datetime import timedelta
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional, Union

import torch
import torch.distributed
from lightning_utilities.core.rank_zero import rank_zero_only as utils_rank_zero_only
from torch import Tensor
from torch.nn import Module
from torch.nn.parallel.distributed import DistributedDataParallel
from torch.optim.optimizer import Optimizer
from typing_extensions import override

import pytorch_lightning as pl
from lightning_fabric.plugins import CheckpointIO, ClusterEnvironment
from lightning_fabric.plugins.collectives.torch_collective import default_pg_timeout
from lightning_fabric.strategies import _StrategyRegistry
from lightning_fabric.utilities.distributed import (
    _distributed_is_initialized,
    _get_default_process_group_backend_for_device,
    _init_dist_connection,
    _sync_ddp_if_available,
)
from lightning_fabric.utilities.distributed import group as _group
from lightning_fabric.utilities.imports import _IS_WINDOWS
from lightning_fabric.utilities.optimizer import _optimizers_to_device
from lightning_fabric.utilities.seed import reset_seed
from lightning_fabric.utilities.types import ReduceOp
from pytorch_lightning.core.optimizer import LightningOptimizer
from pytorch_lightning.overrides.distributed import _register_ddp_comm_hook, _sync_module_states, prepare_for_backward
from pytorch_lightning.plugins.precision import Precision
from pytorch_lightning.strategies.launchers import _MultiProcessingLauncher, _SubprocessScriptLauncher
from pytorch_lightning.strategies.parallel import ParallelStrategy
from pytorch_lightning.strategies.strategy import TBroadcast, _ForwardRedirection
from pytorch_lightning.trainer.states import TrainerFn
from pytorch_lightning.utilities.exceptions import _augment_message
from pytorch_lightning.utilities.rank_zero import rank_zero_deprecation, rank_zero_info, rank_zero_only

if TYPE_CHECKING:
    from torch.distributed.algorithms.model_averaging.averagers import ModelAverager

log = logging.getLogger(__name__)

_DDP_FORK_ALIASES = (
    "ddp_fork",
    "ddp_fork_find_unused_parameters_false",
    "ddp_fork_find_unused_parameters_true",
    "ddp_notebook",
    "ddp_notebook_find_unused_parameters_false",
    "ddp_notebook_find_unused_parameters_true",
)


class DDPStrategy(ParallelStrategy):
    """Strategy for multi-process single-device training on one or multiple nodes."""

    def __init__(
        self,
        accelerator: Optional["pl.accelerators.Accelerator"] = None,
        parallel_devices: Optional[list[torch.device]] = None,
        cluster_environment: Optional[ClusterEnvironment] = None,
        checkpoint_io: Optional[CheckpointIO] = None,
        precision_plugin: Optional[Precision] = None,
        ddp_comm_state: Optional[object] = None,
        ddp_comm_hook: Optional[Callable] = None,
        ddp_comm_wrapper: Optional[Callable] = None,
        model_averaging_period: Optional[int] = None,
        process_group_backend: Optional[str] = None,
        timeout: Optional[timedelta] = default_pg_timeout,
        start_method: Literal["popen", "spawn", "fork", "forkserver"] = "popen",
        **kwargs: Any,
    ) -> None:
        super().__init__(
            accelerator=accelerator,
            parallel_devices=parallel_devices,
            cluster_environment=cluster_environment,
            checkpoint_io=checkpoint_io,
            precision_plugin=precision_plugin,
        )
        log.debug(f"{self.__class__.__name__}: initializing DDP strategy")
        self._forward_redirection = _DDPForwardRedirection()
        self._num_nodes = 1
        self._ddp_kwargs = kwargs
        self._ddp_comm_state = ddp_comm_state
        self._ddp_comm_hook = ddp_comm_hook
        self._ddp_comm_wrapper = ddp_comm_wrapper
        self._model_averaging_period = model_averaging_period
        self._model_averager: Optional["ModelAverager"] = None
        self._process_group_backend = process_group_backend
        self._timeout = timeout
        self._start_method = start_method
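    # Usage sketch (illustrative, not part of the original source): any extra keyword
    # arguments given to the strategy are forwarded verbatim to
    # ``torch.nn.parallel.DistributedDataParallel``::
    #
    #     from pytorch_lightning import Trainer
    #     from pytorch_lightning.strategies import DDPStrategy
    #
    #     trainer = Trainer(
    #         accelerator="gpu",
    #         devices=4,
    #         strategy=DDPStrategy(find_unused_parameters=True, static_graph=True),
    #     )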
zDDPStrategy.__init__c                 C   s   t dt| j ddd dS )z1Legacy property kept for backwards compatibility.`z3.is_distributed` is deprecated. Use is discouraged.   )
stacklevelT)r+   typerM   rZ   r\   r\   r]   is_distributedk   s   zDDPStrategy.is_distributedc                 C   s   | j d usJ | j | j S N)r7   
local_rankrb   r\   r\   r]   root_devices   s   zDDPStrategy.root_devicec                 C      | j S rd   rP   rb   r\   r\   r]   	num_nodesy      zDDPStrategy.num_nodesri   c                 C   s
   || _ d S rd   rh   )rZ   ri   r\   r\   r]   ri   }   s   
c                 C   s   | j d ur
t| j S dS Nr   )r7   lenrb   r\   r\   r]   num_processes   s   zDDPStrategy.num_processesc                 C   s   | j | j | jdS )N)num_replicasrank)ri   rm   global_rankrb   r\   r\   r]   distributed_sampler_kwargs   s   z&DDPStrategy.distributed_sampler_kwargsc                 C   rg   rd   )rW   rb   r\   r\   r]   r?      rj   z!DDPStrategy.process_group_backendc                 C   sD   | j d usJ | jdkrt| j | j| j| _d S t| | jd| _d S )Nr5   )rA   )r8   rY   r$   rm   ri   	_launcherr#   rb   r\   r\   r]   _configure_launcher   s   
zDDPStrategy._configure_launcherc                    s   t    |   d S rd   )rH   setup_environmentsetup_distributedrb   r[   r\   r]   rt      s   
zDDPStrategy.setup_environmenttrainer
    @override
    def setup(self, trainer: "pl.Trainer") -> None:
        assert self.accelerator is not None
        self.accelerator.setup(trainer)

        trainer_fn = trainer.state.fn
        assert self.model is not None
        if trainer_fn == TrainerFn.FITTING and self._layer_sync:
            self.model = self._layer_sync.apply(self.model)

        self.precision_plugin.convert_module(self.model)
        self.model_to_device()

        if trainer_fn == TrainerFn.FITTING:
            # do not wrap with DDP when not fitting, as there are no gradients to reduce
            self.configure_ddp()
            # set up optimizers after the module has been wrapped and moved to the device
            self.setup_optimizers(trainer)
        else:
            # manually synchronize the module's states since the DDP wrapper is not used here
            _sync_module_states(self.model)

        self.setup_precision_plugin()

        if trainer_fn == TrainerFn.FITTING:
            _optimizers_to_device(self.optimizers, self.root_device)

            import torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook as post_localSGD

            if isinstance(self._ddp_comm_state, post_localSGD.PostLocalSGDState):
                self._enable_model_averaging()
zDDPStrategy.setupr{   c                 C   s~   |   }td| d| j  |durtjtj nt }| t	d||d| jW  d   S 1 s8w   Y  dS )z^Wraps the model into a :class:`~torch.nn.parallel.distributed.DistributedDataParallel` module.z&setting up DDP model with device ids: z
, kwargs: N)module
device_idsr\   )
determine_ddp_device_idsrJ   rK   rQ   torchcudastreamStreamr   r   )rZ   r{   r   ctxr\   r\   r]   _setup_model   s    $zDDPStrategy._setup_modelc                 C   sR   t | jj d t  |   |  | _| jd usJ t	| j| j| j
d d S )Nz: setting up distributed...)r@   )rJ   rK   rL   rM   r   set_world_ranks_get_process_group_backendrW   r8   r   rX   rb   r\   r\   r]   ru      s   
zDDPStrategy.setup_distributedc                 C   s   | j pt| jS rd   )rW   r   rf   rb   r\   r\   r]   r      s   z&DDPStrategy._get_process_group_backendc                 C   sJ   | j d ur| j | j| j | j  | j | j| j  | j t_	t
_	d S rd   )r8   set_global_rank	node_rankrm   re   set_world_sizeri   rp   r
   ro   utils_rank_zero_onlyrb   r\   r\   r]   r      s   
zDDPStrategy.set_world_ranksc                 C   sP   t | jj d | jjdkr&t| jtsJ t	| j| j
| j| jd d S d S )Nz: registering ddp hooksr   )r{   r;   r<   r=   )rJ   rK   rL   rM   rf   ra   r   r{   r   r   rR   rS   rT   rb   r\   r\   r]   _register_ddp_hooks   s   
zDDPStrategy._register_ddp_hooksc                 C   s   t | jj d | jd u rtdddlm}m}m	} | j
D ]&}t|tr*|j}ts1t||nd}t|||fs<|rFtd|jj dq | jd usNJ tjjjjj| j| jjd| _d S )	Nz.: reinitializing optimizers with post localSGDz\Post-localSGD algorithm is used, but model averaging period is not provided to DDP strategy.r   )DistributedOptimizerPostLocalSGDOptimizerZeroRedundancyOptimizerFzKCurrently model averaging cannot work with a distributed optimizer of type .)periodwarmup_steps)rJ   rK   rL   rM   rU   
ValueErrortorch.distributed.optimr   r   r   r   r   r   
_optimizerr   rR   r   r   r   model_averaging	averagersPeriodicModelAveragerstart_localSGD_iterrV   )rZ   r   r   r   	optimizeris_distributed_optimizerr\   r\   r]   r      s*   



    @override
    def optimizer_step(
        self,
        optimizer: Optimizer,
        closure: Callable[[], Any],
        model: Optional[Union["pl.LightningModule", Module]] = None,
        **kwargs: Any,
    ) -> Any:
        """Performs the actual optimizer step.

        Args:
            optimizer: the optimizer performing the step
            closure: closure calculating the loss value
            model: reference to the model, optionally defining optimizer step related hooks
            **kwargs: Any extra arguments to ``optimizer.step``

        """
        optimizer_output = super().optimizer_step(optimizer, closure, model, **kwargs)

        if self._model_averager is None:
            return optimizer_output

        params = [param for group in optimizer.param_groups for param in group["params"] if param.grad is not None]
        self._model_averager.average_parameters(iter(params))

        return optimizer_output
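    # Hedged sketch of a post-localSGD setup (values are illustrative): the comm state
    # makes `setup` call `_enable_model_averaging`, and `optimizer_step` then averages
    # parameters every `model_averaging_period` steps::
    #
    #     import torch.distributed.algorithms.ddp_comm_hooks.post_localSGD_hook as post_localSGD
    #
    #     strategy = DDPStrategy(
    #         ddp_comm_state=post_localSGD.PostLocalSGDState(
    #             process_group=None,
    #             subgroup=None,
    #             start_localSGD_iter=8,
    #         ),
    #         ddp_comm_hook=post_localSGD.post_localSGD_hook,
    #         model_averaging_period=4,
    #     )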
zDDPStrategy.optimizer_stepc                 C   s@   t | jj d t| jtjsJ | | j| _| 	  d S )Nz%: configuring DistributedDataParallel)
rJ   rK   rL   rM   r   r{   plLightningModuler   r   rb   r\   r\   r]   r     s   zDDPStrategy.configure_ddpc                 C   s   | j jdkrd S | j jgS )Ncpu)rf   ra   indexrb   r\   r\   r]   r     s   
z$DDPStrategy.determine_ddp_device_idsargsc                 O   s<   t  sd S tj dkrtjj|  d d S tj  d S )Nnccl)r   )r   r   r   get_backendbarrierr   )rZ   r   rE   r\   r\   r]   r   #  s
   zDDPStrategy.barrierr   objsrcc                 C   s,   t  s|S |g}tjj||tjd |d S )Nr   r   )r   r   r   broadcast_object_list_groupWORLD)rZ   r   r   r\   r\   r]   	broadcast-  s
   zDDPStrategy.broadcastclosure_lossc                 C   s:   t | jtsdS | jdusJ | jjst| j| dS dS )z.Run before precision plugin executes backward.N)r   r{   r   lightning_moduleautomatic_optimizationr!   )rZ   r   r\   r\   r]   pre_backward6  s   zDDPStrategy.pre_backwardc                 C   s<   t | jj d| j d | jd usJ | j| j d S )Nz: moving model to device [z]...)rJ   rK   rL   rM   rf   r{   torb   r\   r\   r]   r   ?  s   zDDPStrategy.model_to_devicemeantensorr   	reduce_opc                 C   s   t |trt|||dS |S )a  Reduces a tensor from several distributed processes to one aggregated tensor.

        Args:
            tensor: the tensor to sync and reduce
            group: the process group to gather results from. Defaults to all processes (world)
            reduce_op: the reduction operation. Defaults to 'mean'/'avg'.
                Can also be a string 'sum' to calculate the sum during reduction.

        Return:
            reduced value, except when the input was not a tensor the output remains is unchanged

        )r   )r   r   r   )rZ   r   r   r   r\   r\   r]   reduceE  s   
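    # Illustrative only: inside a ``LightningModule`` hook, a per-rank scalar can be
    # averaged across processes with ``self.trainer.strategy.reduce(value, reduce_op="mean")``;
    # non-tensor inputs pass through unchanged.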
zDDPStrategy.reducestrategy_registryc              	   C   sf   d}|D ]\}}|j || d| d|d qd}|D ]\}}}|j || d| d| d||d qd S )	N))ddpr5   )	ddp_spawnrB   )r.   rC   )r1   rC   z"DDP strategy with `start_method` '')descriptionrA   )) ddp_find_unused_parameters_falseFr5   )ddp_find_unused_parameters_trueTr5   )&ddp_spawn_find_unused_parameters_falseFrB   )%ddp_spawn_find_unused_parameters_trueTrB   )r/   FrC   )r0   TrC   )r2   FrC   )r3   TrC   z.DDP strategy with `find_unused_parameters` as z and `start_method` ')r   find_unused_parametersrA   )register)clsr   entriesnamerA   fupr\   r\   r]   register_strategiesY  s$   

zDDPStrategy.register_strategies	exceptionc                 C   s   t |ddd d S )Nz>.*Expected to have finished reduction in the prior iteration.*ay  It looks like your LightningModule has parameters that were not used in producing the loss returned by training_step. If this is intentional, you must enable the detection of unused parameters in DDP, either by setting the string value `strategy='ddp_find_unused_parameters_true'` or by setting the flag in the strategy with `strategy=DDPStrategy(find_unused_parameters=True)`.)patternnew_messager)   )rZ   r   r\   r\   r]   on_exception}  s
   
zDDPStrategy.on_exceptionc                    s   t | jj d | j}t| jtr,| jjs)| j	 
dr)td| jj d || _|d urO|jd urO|jjjtjkrO| jrO| jd usGJ | j| j| _t   d S )Nz: tearing down strategycan_set_static_graphzyYour model can run with static graph optimizations. For future training runs, we suggest you pass `Trainer(..., strategy=z%(static_graph=True))` to enable them.)rJ   rK   rL   rM   r   r   r{   r   static_graph_get_ddp_logging_datagetr,   _trainerry   rz   r(   r|   r}   revertrH   teardown)rZ   	pl_moduler[   r\   r]   r     s$   
zDDPStrategy.teardown)rF   N)rv   rw   rF   Nrd   )r   )Nr   )>rM   
__module____qualname____doc__r   r   listr   devicer   r   r"   objectr   intstrr   r   r   rI   propertyboolrc   r   rf   ri   setterrm   dictrq   r?   rs   rt   rx   r   r   r   ru   r   r   r   r   r   r	   r   r   r   r   r&   r   r   r   r   r   r   classmethodr   r   BaseExceptionr   r   __classcell__r\   r\   r[   r]   r4   D   s    	
$
	




	"r4   c                   @   s@   e Zd ZededdddfddZededdddfdd	ZdS )
rN   wrapper_moduleoriginal_moduler   rF   Nc                 C   "   t |tr|jsd|_d S d S d S )NFr   r   r   require_backward_grad_syncrZ   r   r   r\   r\   r]   on_after_inner_forward  s   
z-_DDPForwardRedirection.on_after_inner_forwardc                 C   r   )NTr   r  r\   r\   r]   on_after_outer_forward  s   
z-_DDPForwardRedirection.on_after_outer_forward)rM   r   r   r   r   r  r  r\   r\   r\   r]   rN     s
    rN   )Qlogging
contextlibr   datetimer   typingr   r   r   r   r   r	   r   torch.distributed"lightning_utilities.core.rank_zeror
   r   r   torch.nnr   torch.nn.parallel.distributedr   torch.optim.optimizerr   typing_extensionsr   pytorch_lightningr   lightning_fabric.pluginsr   r   5lightning_fabric.plugins.collectives.torch_collectiver   lightning_fabric.strategiesr   &lightning_fabric.utilities.distributedr   r   r   r   r   r   "lightning_fabric.utilities.importsr   $lightning_fabric.utilities.optimizerr   lightning_fabric.utilities.seedr    lightning_fabric.utilities.typesr    pytorch_lightning.core.optimizerr   'pytorch_lightning.overrides.distributedr   r    r!   #pytorch_lightning.plugins.precisionr"   &pytorch_lightning.strategies.launchersr#   r$   %pytorch_lightning.strategies.parallelr%   %pytorch_lightning.strategies.strategyr&   r'    pytorch_lightning.trainer.statesr(   &pytorch_lightning.utilities.exceptionsr*   %pytorch_lightning.utilities.rank_zeror+   r,   6torch.distributed.algorithms.model_averaging.averagersr-   	getLoggerrM   rJ   _DDP_FORK_ALIASESr4   rN   r\   r\   r\   r]   <module>   sN    

  d
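# Usage note (illustrative): the aliases registered in `register_strategies` are what make
# the string shorthands work, e.g.::
#
#     from pytorch_lightning import Trainer
#
#     Trainer(strategy="ddp")                              # subprocess ("popen") launcher
#     Trainer(strategy="ddp_spawn")                        # multiprocessing "spawn"
#     Trainer(strategy="ddp_notebook")                     # "fork", usable inside notebooks
#     Trainer(strategy="ddp_find_unused_parameters_true")  # forwards the DDP flag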