o
    oiD                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZmZ d dlZd dlm  mZ d dlmZ d dlmZ d dlmZmZmZ d d	l m!Z!m"Z"m#Z# d d
l$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z-m.Z. ej/0 rd dl1m2Z2 nG dd dZ2erd dl3m4Z4 d dl5m6Z6 d dl7m8Z8 e9e:Z;dKdddee- de<de=fddZ>dLdedee dee fddZ?deded e<dee fd!d"Z@	dMdedee d#eee.eAf  defd$d%ZBdMdedee d#eee.eAf  defd&d'ZC	(dNd)eded* d+e=defd,d-ZD		dMd.d/d0eAd1ee< d ee< d2eddfd3d4ZEdOd5d6ZFd7ejGdeAfd8d9ZHG d:d; d;eZIG d<d= d=eZJdPd?e<de<fd@dAZKdPd?e<ddfdBdCZLde=fdDdEZMG dFdG dGZNd)ede"dH fdIdJZOdS )Q    N)nullcontext)	timedelta)Path)TYPE_CHECKINGAnyIterableIteratorListOptionalSizedUnion)package_available)Tensor)DatasetDistributedSamplerSampler)Self	TypeGuardoverride)_is_local_file_protocol)_num_cpus_available)_TORCH_GREATER_EQUAL_2_4)rank_zero_info)_PATHReduceOpgroupc                   @   s   e Zd ZdZdS )r   N)__name__
__module____qualname__WORLD r!   r!   Z/home/ubuntu/.local/lib/python3.10/site-packages/lightning/fabric/utilities/distributed.pyr      s    r   DTensor)ClusterEnvironment)Strategy   strategyr&   pathtimeoutreturnc                 C   sT  |dur
t |s
dS t|du rt n| }t| dr"| jdkr$dS | |}| j||kdds4dS | j| ddsDt	d| |
 rK|jn|}|d }|jdd	 |   | jre|  d}nt }d}|st | |k r| }|st | |k su|   | j|dd}tt |  W d   |S 1 sw   Y  |S )
a  Checks whether the filesystem under the given path is shared across all processes.

    This function should only be used in a context where distributed is initialized.

    Args:
        strategy: The strategy being used, either from Fabric (``fabric.strategy``) or from Trainer
            (``trainer.strategy``).
        path: The path to check. Defaults to the current working directory. The user must have permissions to write
            to this path or the parent folder, and the filesystem must be writable.
        timeout: If any of the processes can't list the file created by rank 0 within this many seconds, the
            filesystem is determined to be not shared.

    NT
world_size   allFzYUnable to determine if the path belongs to a shared filesystem. The path does not exist: z.lightning_shared_fs_check)
missing_ok)r   r   cwdresolvehasattrr,   	broadcastreduce_boolean_decisionexistsFileNotFoundErroris_fileparentunlinkbarrieris_global_zerotouchtimeperf_counter
contextlibsuppressOSError)r(   r)   r*   rank_zero_path
check_filefoundstart	all_foundr!   r!   r"   is_shared_filesystem+   s@   


rH   resultc                    s^  |du r	t jjj}|  } t j|}t jj|d | jdkr%t| ||S t j	| j
| jd  fddt|D }t jj| |d t |jddjtfdd	|D }|r_t| ||S g }    }t|D ]}|d ||  qmt| |fd
dt|D }t j|| t|D ]\}	}
dd |
D }||	 | ||	< q|S )ai  Function to gather all tensors from several DDP processes onto a list that is broadcasted to all processes.

    Works on tensors that have the same number of dimensions, but where each dimension may differ. In this case
    tensors are padded, gathered and then trimmed to secure equal workload for all processes.

    Args:
        result: The value to sync
        group: The process group to gather results from. Defaults to all processes (world)

    Return:
        gathered_result: List with size equal to the process group where
            gathered_result[i] corresponds to result tensor from process i

    Nr   r   devicec                       g | ]}t  qS r!   torch
zeros_like.0_)
local_sizer!   r"   
<listcomp>       z'_gather_all_tensors.<locals>.<listcomp>)dimc                 3   s    | ]	}t | kV  qd S Nr.   )rQ   ls)max_sizer!   r"   	<genexpr>   s    z&_gather_all_tensors.<locals>.<genexpr>c                    rL   r!   rM   rP   )result_paddedr!   r"   rT      rU   c                 S   s   g | ]}t |qS r!   )slice)rQ   dim_sizer!   r!   r"   rT      s    )rN   distributedr   r    
contiguousget_world_sizer;   ndim_simple_gather_all_tensorstensorshaperK   range
all_gatherstackmaxvaluesr/   detachcpureversedappenditemFpad	enumerate)rI   r   r,   local_sizesall_sizes_equalpad_dimspad_byvalgathered_resultidx	item_sizeslice_paramr!   )rS   rY   r[   r"   _gather_all_tensorsf   s4   


r{   r,   c                    s*    fddt |D }tj| | |S )Nc                    rL   r!   rM   rP   rI   r!   r"   rT      rU   z._simple_gather_all_tensors.<locals>.<listcomp>)re   rN   r^   rf   )rI   r   r,   rw   r!   r|   r"   rb      s   rb   	reduce_opc                 C   s   t  r
t| ||dS | S )a  Function to reduce a tensor across worker processes during distributed training.

    Args:
        result: The value to sync and reduce (typically tensor or number)
        group: The process group to gather results from. Defaults to all processes (world)
        reduce_op: The reduction operation. Defaults to sum.
            Can also be a string of 'avg', 'mean' to calculate the mean during reduction.

    Return:
        reduced value

    )r   r}   )_distributed_is_initialized	_sync_ddp)rI   r   r}   r!   r!   r"   _sync_ddp_if_available   s   r   c                 C   s   d}|du rt jjjn|}t|tr6|dkrdn|}| dkr.t j|dkr.tj	}d}n
t
t| }n|}tdrRtjdd	krR|  d
v rRtd |  } t jj|d t jj| ||dd t j|}|sm| S t | sy| | | S | |S )a%  Reduces a tensor across several distributed processes.

    This operation is performed in-place, meaning the result will be placed back into the input tensor on all processes.

    Args:
        result: The value to sync and reduce (typically tensor or number)
        group: The process group to gather results from. Defaults to all processes (world)
        reduce_op: The reduction operation. Defaults to sum.
            Can also be a string of 'avg', 'mean' to calculate the mean during reduction.

    Return:
        The reduced value.

    FNmeanavgglooThabana_frameworksHCCL_DISTRIBUTED_BACKEND1)ztorch.LongTensorztorch.hpu.LongTensorz0Long tensor unsupported on HPU, casting to floatr   )opr   async_op)rN   r^   r   r    
isinstancestrlowerget_backendr   SUMgetattrupperr   osenvirongettyper   floatr;   
all_reducer`   is_floating_pointcopy_div_)rI   r   r}   divide_by_world_sizer   r,   r!   r!   r"   r      s2   


r   Frc   ztorch.distributed.ProcessGroup
sync_gradsc                 C   sd   t  s| S ddlm} |  } |rt nt  || |}W d   n1 s(w   Y  t|S )an  Function to gather a tensor from several distributed processes.

    Args:
        tensor: Tensor of shape (batch, ...)
        group: The process group to gather results from. Defaults to all processes (world)
        sync_grads: Flag that allows users to synchronize gradients for all_gather op

    Return:
        A tensor of shape (world_size, batch, ...)

    r   )rf   N)r~   torch.distributed.nn.functionalrf   r_   r   rN   no_gradrg   )rc   r   r   rf   gathered_tensorsr!   r!   r"   _all_gather_ddp_if_available   s   
r   cluster_environmentr%   torch_distributed_backendglobal_rankkwargsc              	   K   s   t j s	tdt j rtd dS |dur|n|  }|dur%|n|  }| j	t
jd< t| jt
jd< td| d|d  d	|  t jj|f||d
| |dkr]tt td d| d| dd d dS )a  Utility function to initialize distributed connection by setting env variables and initializing the distributed
    process group.

    Args:
        cluster_environment: ``ClusterEnvironment`` instance
        torch_distributed_backend: Backend to use (includes `nccl` and `gloo`)
        global_rank: Rank of the current process
        world_size: Number of processes in the group
        kwargs: Kwargs for ``init_process_group``

    Raises:
        RuntimeError:
            If ``torch.distributed`` is not available

    zOtorch.distributed is not available. Cannot initialize distributed process groupz7torch.distributed is already initialized. Exiting earlyNMASTER_ADDRMASTER_PORTz'Initializing distributed: GLOBAL_RANK: z
, MEMBER: r-   /)rankr,   ncclzd----------------------------------------------------------------------------------------------------z
distributed_backend=z5
All distributed processes registered. Starting with z processes

)rN   r^   is_availableRuntimeErroris_initializedlogdebugr   r,   main_addressr   r   r   	main_portinfoinit_process_groupatexitregister_destroy_dist_connectionr   )r   r   r   r,   r   r!   r!   r"   _init_dist_connection	  s,   


 
r   c                   C   s4   t  t jt j t rtj  t  t jt j d S rW   )signalSIGINTSIG_IGNr~   rN   r^   destroy_process_groupSIG_DFLr!   r!   r!   r"   r   8  s   
r   rK   c                 C   s   | j dkrdS dS )Ncudar   r   )r   rJ   r!   r!   r"   -_get_default_process_group_backend_for_device@  s   r   c                   @   sX   e Zd ZdZdeeef ddfddZede	de
fdd	Zde	fd
dZdddZdS )_DatasetSamplerWrapperz6Dataset to create indexes from `Sampler` or `Iterable`samplerr+   Nc                 C   s:   t |ts	tdt|tdkrtd|| _d | _d S )Na  You seem to have configured a sampler in your DataLoader which does not provide `__len__` method. The sampler was about to be replaced by `DistributedSamplerWrapper` since `use_distributed_sampler` is True and you are using distributed training. Either provide `__len__` method in your sampler, remove it from DataLoader or set `use_distributed_sampler=False` if you want to handle distributed sampling yourself.infa  You seem to have configured a sampler in your DataLoader which does not provide finite `__len__` method. The sampler was about to be replaced by `DistributedSamplerWrapper` since `use_distributed_sampler` is True and you are using distributed training. Either provide `__len__` method in your sampler which returns a finite number, remove it from DataLoader or set `use_distributed_sampler=False` if you want to handle distributed sampling yourself.)r   r   	TypeErrorlenr   _sampler_sampler_list)selfr   r!   r!   r"   __init__G  s   

z_DatasetSamplerWrapper.__init__indexc                 C   s    | j d u rt| j| _ | j | S rW   )r   listr   )r   r   r!   r!   r"   __getitem__^  s   

z"_DatasetSamplerWrapper.__getitem__c                 C   s
   t | jS rW   )r   r   r   r!   r!   r"   __len__d  s   
z_DatasetSamplerWrapper.__len__c                 C   s   t | j| _dS )z4Reset the sampler list in order to get new sampling.N)r   r   r   r   r!   r!   r"   resetg  s   z_DatasetSamplerWrapper.resetr+   N)r   r   r   __doc__r   r   r   r   r   intr   r   r   r   r!   r!   r!   r"   r   D  s    r   c                       sP   e Zd ZdZdeeef dededdf fddZe	de
f fd	d
Z  ZS )DistributedSamplerWrappera  Wrapper over ``Sampler`` for distributed training.

    Allows you to use any sampler in distributed mode. It will be automatically used by Lightning in distributed mode if
    sampler replacement is enabled.

    Note:
        The purpose of this wrapper is to take care of sharding the sampler indices. It is up to the underlying
        sampler to handle randomness and shuffling. The ``shuffle`` and ``seed`` arguments on this wrapper won't
        have any effect.

    r   argsr   r+   Nc                    s"   t  jt|g|R i | d S rW   )superr   r   )r   r   r   r   	__class__r!   r"   r   y  s   "z"DistributedSamplerWrapper.__init__c                    s"    j    fddt  D S )Nc                 3   s    | ]} j | V  qd S rW   )dataset)rQ   r   r   r!   r"   rZ     s    z5DistributedSamplerWrapper.__iter__.<locals>.<genexpr>)r   r   r   __iter__r   r   r   r"   r   |  s   
z"DistributedSamplerWrapper.__iter__)r   r   r   r   r   r   r   r   r   r   r   r   __classcell__r!   r!   r   r"   r   l  s
    &r   r-   num_processesc                 C   s(   | dk rt d|  dtdt |  S )Nr-   z$`num_processes` should be >= 1, got .)
ValueErrorrh   r   )r   r!   r!   r"   _suggested_max_num_threads  s   r   c                 C   s2   dt jvrt| }t| t|t jd< d S d S )NOMP_NUM_THREADS)r   r   r   rN   set_num_threadsr   )r   num_threadsr!   r!   r"   _set_num_threads_if_needed  s
   

r   c                   C   s   t j o	t j S rW   )rN   r^   r   r   r!   r!   r!   r"   r~     s   r~   c                   @   sL   e Zd ZdZdddZdddZdefdd	Zd
edededdfddZ	dS )_InfiniteBarrieraP  A barrier with an infinite timeout.

    Creates a new process group with the GLOO backend with a very high timeout that makes the barrier effectively wait
    forever. This is useful in cases where you want to execute a long-running operation on a subset of ranks that should
    not be subject to the regular collective timeout.

    r+   Nc                 C   s   d | _ dd | _d S )Nc                   S   s   d S rW   r!   r!   r!   r!   r"   <lambda>  s    z+_InfiniteBarrier.__init__.<locals>.<lambda>)r   r;   r   r!   r!   r"   r     s   z_InfiniteBarrier.__init__c                 C   s   |    d S rW   )r;   r   r!   r!   r"   __call__  s   z_InfiniteBarrier.__call__c                 C   s,   t  rtjjdtddd| _| jj| _| S )Nr   i'  )days)backendr*   )r~   rN   r^   	new_groupr   r   monitored_barrierr;   r   r!   r!   r"   	__enter__  s   
z_InfiniteBarrier.__enter__exc_type	exc_value	tracebackc                 C   s(   |    | jd urtj| j d S d S rW   )r;   r   rN   r^   r   )r   r   r   r   r!   r!   r"   __exit__  s   
z_InfiniteBarrier.__exit__r   )
r   r   r   r   r   r   r   r   r   r   r!   r!   r!   r"   r     s    

r   r$   c                 C   s   t rddlm} t| |S dS )Nr   r#   F)r   torch.distributed._tensorr$   r   )rc   r$   r!   r!   r"   _is_dtensor  s   
r   )Nr'   rW   )NN)NFr   )r-   )Pr   r@   loggingr   r   r>   r   datetimer   pathlibr   typingr   r   r   r   r	   r
   r   r   rN   torch.nn.functionalnn
functionalro    lightning_utilities.core.importsr   r   torch.utils.datar   r   r   typing_extensionsr   r   r   #lightning.fabric.utilities.cloud_ior   lightning.fabric.utilities.datar   "lightning.fabric.utilities.importsr   $lightning.fabric.utilities.rank_zeror    lightning.fabric.utilities.typesr   r   r^   r   torch.distributedr   r   r$   lightning.fabric.pluginsr%   lightning.fabric.strategiesr&   	getLoggerr   r   r   boolrH   r{   rb   r   r   r   r   r   r   rK   r   r   r   r   r   r~   r   r   r!   r!   r!   r"   <module>   s    (

  ;6
,;


/(