"""Distributed helpers."""
import functools
import logging
import pickle

import torch
import torch.distributed as dist

_LOCAL_PROCESS_GROUP = None


def all_gather(tensors):
    """
    All gathers the provided tensors from all processes across machines.
    Args:
        tensors (list): tensors to perform all gather across all processes in
        all machines.
    """
    gather_list = []
    output_tensor = []
    world_size = dist.get_world_size()
    for tensor in tensors:
        tensor_placeholder = [torch.ones_like(tensor)
                              for _ in range(world_size)]
        dist.all_gather(tensor_placeholder, tensor, async_op=False)
        gather_list.append(tensor_placeholder)
    for gathered_tensor in gather_list:
        output_tensor.append(torch.cat(gathered_tensor, dim=0))
    return output_tensor


def all_reduce(tensors, average=True):
    """
    All reduce the provided tensors from all processes across machines.
    Args:
        tensors (list): tensors to perform all reduce across all processes in
        all machines.
        average (bool): scales the reduced tensor by the number of overall
        processes across all machines.
    """
    for tensor in tensors:
        dist.all_reduce(tensor, async_op=False)
    if average:
        world_size = dist.get_world_size()
        for tensor in tensors:
            tensor.mul_(1.0 / world_size)
    return tensors


def init_process_group(local_rank, local_world_size, shard_id, num_shards,
                       init_method, dist_backend='nccl'):
    """
    Initializes the default process group.
    Args:
        local_rank (int): the rank on the current local machine.
        local_world_size (int): the world size (number of processes running) on
        the current local machine.
        shard_id (int): the shard index (machine rank) of the current machine.
        num_shards (int): number of shards for distributed training.
        init_method (string): supporting three different methods for
            initializing process groups:
            "file": use shared file system to initialize the groups across
            different processes.
            "tcp": use tcp address to initialize the groups across different
            processes.
        dist_backend (string): backend to use for distributed training. Options
            include gloo, mpi and nccl; the details can be found here:
            https://pytorch.org/docs/stable/distributed.html
    """
    # Set the GPU to use.
    torch.cuda.set_device(local_rank)
    # Initialize the process group.
    proc_rank = local_rank + shard_id * local_world_size
    world_size = local_world_size * num_shards
    dist.init_process_group(backend=dist_backend, init_method=init_method,
                            world_size=world_size, rank=proc_rank)


def is_master_proc(num_gpus=8):
    """
    Determines if the current process is the master process.
    """
    if torch.distributed.is_initialized():
        return dist.get_rank() % num_gpus == 0
    return True


def get_world_size():
    """
    Get the size of the world.
    """
    if not dist.is_available():
        return 1
    if not dist.is_initialized():
        return 1
    return dist.get_world_size()


def get_rank():
    """
    Get the rank of the current process.
    """
    if not dist.is_available():
        return 0
    if not dist.is_initialized():
        return 0
    return dist.get_rank()


def synchronize():
    """
    Helper function to synchronize (barrier) among all processes when
    using distributed training.
    """
    if not dist.is_available():
        return
    if not dist.is_initialized():
        return
    world_size = dist.get_world_size()
    if world_size == 1:
        return
    dist.barrier()


@functools.lru_cache()
def _get_global_gloo_group():
    """
    Return a process group based on gloo backend, containing all the ranks.
    The result is cached.
    Returns:
        (group): pytorch dist group.
    """
    if dist.get_backend() == 'nccl':
        return dist.new_group(backend='gloo')
    return dist.group.WORLD


def _serialize_to_tensor(data, group):
    """
    Serialize arbitrary picklable data to a ByteTensor. Note that only the
    `gloo` and `nccl` backends are supported.
    Args:
        data (data): data to be serialized.
        group (group): pytorch dist group.
    Returns:
        tensor (ByteTensor): tensor that holds the serialized data.
    """
    backend = dist.get_backend(group)
    assert backend in ['gloo', 'nccl']
    device = torch.device('cpu' if backend == 'gloo' else 'cuda')

    buffer = pickle.dumps(data)
    if len(buffer) > 1024**3:
        logger = logging.getLogger(__name__)
        logger.warning(
            'Rank {} trying to all-gather {:.2f} GB of data on device '
            '{}'.format(get_rank(), len(buffer) / (1024**3), device))
    storage = torch.ByteStorage.from_buffer(buffer)
    tensor = torch.ByteTensor(storage).to(device=device)
    return tensor


def _pad_to_largest_tensor(tensor, group):
    """
    Pad the tensors from different GPUs to the largest size among them.
    Args:
        tensor (tensor): tensor to pad.
        group (group): pytorch dist group.
    Returns:
        list[int]: size of the tensor, on each rank
        Tensor: padded tensor that has the max size
    """
    world_size = dist.get_world_size(group=group)
    assert world_size >= 1, \
        'comm.gather/all_gather must be called from ranks within the given group!'
    local_size = torch.tensor([tensor.numel()], dtype=torch.int64,
                              device=tensor.device)
    size_list = [torch.zeros([1], dtype=torch.int64, device=tensor.device)
                 for _ in range(world_size)]
    dist.all_gather(size_list, local_size, group=group)
    size_list = [int(size.item()) for size in size_list]

    max_size = max(size_list)

    # Pad the tensor because torch all_gather does not support gathering
    # tensors of different shapes.
    if local_size != max_size:
        padding = torch.zeros((max_size - local_size, ), dtype=torch.uint8,
                              device=tensor.device)
        tensor = torch.cat((tensor, padding), dim=0)
    return size_list, tensor


def all_gather_unaligned(data, group=None):
    """
    Run all_gather on arbitrary picklable data (not necessarily tensors).

    Args:
        data: any picklable object
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.

    Returns:
        list[data]: list of data gathered from each rank
    """
    if get_world_size() == 1:
        return [data]
    if group is None:
        group = _get_global_gloo_group()
    if dist.get_world_size(group) == 1:
        return [data]

    tensor = _serialize_to_tensor(data, group)

    size_list, tensor = _pad_to_largest_tensor(tensor, group)
    max_size = max(size_list)

    # Receive the padded serialized tensors from all ranks.
    tensor_list = [torch.empty((max_size, ), dtype=torch.uint8,
                               device=tensor.device) for _ in size_list]
    dist.all_gather(tensor_list, tensor, group=group)

    data_list = []
    for size, tensor in zip(size_list, tensor_list):
        buffer = tensor.cpu().numpy().tobytes()[:size]
        data_list.append(pickle.loads(buffer))

    return data_list


def init_distributed_training(cfg):
    """
    Initialize variables needed for distributed training.
    """
    if cfg.NUM_GPUS == 1:
        return
    num_gpus_per_machine = cfg.NUM_GPUS
    num_machines = dist.get_world_size() // num_gpus_per_machine
    for i in range(num_machines):
        ranks_on_i = list(
            range(i * num_gpus_per_machine, (i + 1) * num_gpus_per_machine))
        pg = dist.new_group(ranks_on_i)
        if i == cfg.SHARD_ID:
            global _LOCAL_PROCESS_GROUP
            _LOCAL_PROCESS_GROUP = pg


def get_local_size() -> int:
    """
    Returns:
        The size of the per-machine process group,
        i.e. the number of processes per machine.
    """
    if not dist.is_available():
        return 1
    if not dist.is_initialized():
        return 1
    return dist.get_world_size(group=_LOCAL_PROCESS_GROUP)


def get_local_rank() -> int:
    """
    Returns:
        The rank of the current process within the local (per-machine) process group.
    """
    if not dist.is_available():
        return 0
    if not dist.is_initialized():
        return 0
    assert _LOCAL_PROCESS_GROUP is not None
    return dist.get_rank(group=_LOCAL_PROCESS_GROUP)