o
    8wi                     @   s   d dl mZmZ d dlmZ d dlmZ d dlmZm	Z	 d dl
Z
d dl
mZ d dlmZ d dlZd dlmZmZ d d	lmZmZ d d
lmZ d dlmZ d dlmZ G dd deeZdS )    )ABCabstractmethod)	Generator)contextmanager)AnyOptionalN)Tensor)override)CheckpointIOClusterEnvironment)ReduceOp_all_gather_ddp_if_available)	LayerSync)	Precision)Strategyc                       s  e Zd ZdZ					d,ded deeej  dee dee	 dee
 f
 fd	d
ZeeedejfddZedefddZedefddZedefddZedefddZeedefddZedeeej  fddZejdeeej  ddfddZedeeef fddZed-dedee d edefd!d"Zed.d$ed%edefd&d'Ze de!fd(d)Z"ed/ fd*d+Z#  Z$S )0ParallelStrategyz:Strategy for training with multiple processes in parallel.Nacceleratorzpl.accelerators.Acceleratorparallel_devicescluster_environmentcheckpoint_ioprecision_pluginc                    s(   t  j|||d || _|| _d | _d S )N)r   r   r   )super__init__r   r   _layer_sync)selfr   r   r   r   r   	__class__ b/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/pytorch_lightning/strategies/parallel.pyr   "   s   
zParallelStrategy.__init__returnc                 C   s   dS )zReturn the root device.Nr   r   r   r   r   root_device/   s    zParallelStrategy.root_devicec                 C      | j d ur
| j  S dS Nr   )r   global_rankr    r   r   r   r$   5      zParallelStrategy.global_rankc                 C   r"   r#   )r   
local_rankr    r   r   r   r&   9   r%   zParallelStrategy.local_rankc                 C   r"   r#   )r   	node_rankr    r   r   r   r'   =   r%   zParallelStrategy.node_rankc                 C   r"   )N   )r   
world_sizer    r   r   r   r)   A   r%   zParallelStrategy.world_sizec                 C   s
   | j dkS r#   )r$   r    r   r   r   is_global_zeroE   s   
zParallelStrategy.is_global_zeroc                 C   s   | j S N_parallel_devicesr    r   r   r   r   J   s   z!ParallelStrategy.parallel_devicesc                 C   s
   || _ d S r+   r,   )r   r   r   r   r   r   N   s   
c                 C   s    | j d ur
t| j nd| jdS )Nr   )num_replicasrank)r   lenr$   r    r   r   r   distributed_sampler_kwargsR   s   z+ParallelStrategy.distributed_sampler_kwargsFtensorgroup
sync_gradsc                 C   s   t |||dS )z&Perform a all_gather on all processes.)r3   r4   )r   )r   r2   r3   r4   r   r   r   
all_gatherY   s   zParallelStrategy.all_gatherTdecisionallc                 C   sF   t jt|| jd}| j|tjd}|rt|| jk}|S t|}|S )a  Reduces a boolean decision over distributed processes. By default is analogous to ``all`` from the standard
        library, returning ``True`` only if all input decisions evaluate to ``True``. If ``all`` is set to ``False``,
        it behaves like ``any`` instead.

        Args:
            decision: A single input decision.
            all: Whether to logically emulate ``all`` or ``any``. Defaults to True.

        Returns:
            bool: The reduced boolean decision.

        )device)	reduce_op)	torchr2   intr!   reducer   SUMboolr)   )r   r6   r7   r   r   r   reduce_boolean_decision^   s   z(ParallelStrategy.reduce_boolean_decisionc                 c   sT    t | jtjjjr%| j  dV  W d   dS 1 sw   Y  dS dV  dS )zBlocks ddp sync gradients behaviour on backwards pass.

        This is useful for skipping sync when accumulating gradients, reducing communication overhead
        Returns: context manager with sync behaviour off

        N)
isinstancemodelpl	utilitiestypesDistributedDataParallelno_syncr    r   r   r   block_backward_synct   s   "
z$ParallelStrategy.block_backward_syncc                    s&   | j d usJ | j   t   d S r+   )r   teardownr   r    r   r   r   rH      s   
zParallelStrategy.teardown)NNNNN)NF)T)r   N)%__name__
__module____qualname____doc__r   listr:   r8   r   r
   r   r   propertyr   r	   r!   r;   r$   r&   r'   r)   r>   r*   r   setterdictstrr   r1   r   r5   r?   r   r   rG   rH   __classcell__r   r   r   r   r      s^    "r   )abcr   r   collections.abcr   
contextlibr   typingr   r   r:   r   typing_extensionsr	   pytorch_lightningrB   lightning_fabric.pluginsr
   r   &lightning_fabric.utilities.distributedr   r   pytorch_lightning.pluginsr   #pytorch_lightning.plugins.precisionr   %pytorch_lightning.strategies.strategyr   r   r   r   r   r   <module>   s   