o
    ॵiy+                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZ d dlZd dlZd dlmZ d dlmZ d dlmZ defddZdedefd	d
Zdd Zd9dededdfddZdeddfddZdeddfddZd:dede
e ddfddZd:deeef fddZ dd Z!dd Z"dd Z#d d! Z$d"d# Z%d:d$d%Z&d:d&d'Z'd(d) Z(d*d+ Z)d,d- Z*e + d.d/ Z,d0d1 Z-d2d3 Z.d:d4d5Z/d6ej0j1defd7d8Z2dS );    N)CallableListOptionalTuple)version)distributedreturnc                  C   s2   t  t jt j} | d |  d }|   |S )N) r      )socketAF_INETSOCK_STREAMbindgetsocknameclose)sockport r   P/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/utils/torch_utils.py_find_free_port   s
   
r   r   c                    sh   t t  d }|d t  t jt jt fdd|D W  d    S 1 s-w   Y  d S )N	localhostc                 3   s"    | ]} | fd kV  qdS )r   N)
connect_ex).0ipr   sr   r   	<genexpr>"   s     z _is_free_port.<locals>.<genexpr>)r   gethostbyname_exgethostnameappendr   r   all)r   ipsr   r   r   _is_free_port   s
   
$r#   c                 K   s`   t | dr| jdi |} | S ttjtdkr%tj| fi |} | S tdtj d | S )Ncompilez	2.0.0.devzDCompiling model needs torch version > 2.0.0, your torch version is: z , origin model will be returned.r   )hasattrr$   r   parsetorch__version__print)modelcompile_optionsr   r   r   compile_model%   s   
r,   nccllauncherbackendc                 K   s|   t jddd u rt d | dkrt|fi | d S | dkr)t|fi | d S | dkr7t|fi | d S td|  )NT)
allow_nonespawnpytorchmpislurmzInvalid launcher type: )mpget_start_methodset_start_method_init_dist_pytorch_init_dist_mpi_init_dist_slurm
ValueError)r.   r/   kwargsr   r   r   	init_dist3   s   
r=   c                 K   s2   t tjd }tj| tjdd| i| d S )N
LOCAL_RANKr/   r   )intosenvironr'   cuda
set_devicedistinit_process_groupr/   r<   
local_rankr   r   r   r8   @   s   r8   c                 K   sx   t tjd }tj| dtjvrdtjd< dtjvr tdtjd tjd< tjd tjd	< tjdd
| i| d S )NOMPI_COMM_WORLD_LOCAL_RANKMASTER_PORT29500MASTER_ADDRz/The environment variable MASTER_ADDR is not setOMPI_COMM_WORLD_SIZE
WORLD_SIZEOMPI_COMM_WORLD_RANKRANKr/   r   )	r?   r@   rA   r'   rB   rC   KeyErrorrD   rE   rF   r   r   r   r9   G   s   


r9   c                 C   s   t tjd }t tjd }tjd }tj }tj||  td| d}|dur5t	|tjd< ndtjv r;nt
drEd	tjd< nt	t tjd< d
tjvrW|tjd
< t	|tjd< t	|| tjd< t	|tjd< tj| d dS )a  Initialize slurm distributed training environment.

    If argument ``port`` is not specified, then the master port will be system
    environment variable ``MASTER_PORT``. If ``MASTER_PORT`` is not in system
    environment variable, then a default port ``29500`` will be used.

    Args:
        backend (str): Backend of torch.distributed.
        port (int, optional): Master port. Defaults to None.
    SLURM_PROCIDSLURM_NTASKSSLURM_NODELISTzscontrol show hostname z | head -n1NrI   i<s  rJ   rK   rM   r>   rO   r/   )r?   r@   rA   r'   rB   device_countrC   
subprocess	getoutputstrr#   r   rD   rE   )r/   r   proc_idntasks	node_listnum_gpusaddrr   r   r   r:   T   s*   





r:   c                 C   s`   t  r(ddlm} | du r| rddlm} | } t| }t| }||fS d}d}||fS )zGet dist info of a specified group

    Args:
        group: The parallel group, default None, for the global group

    Returns:
        A tuple of the current rank and world_size of the group
    r   )is_megatron_initializedN)mpur
   )	is_distmodelscope.utils.megatron_utilsr^   megatron_utilr_   get_data_parallel_grouprD   get_rankget_world_size)groupr^   r_   rank
world_sizer   r   r   get_dist_info{   s   	

ri   c                   C   s   t tjddS )Nr>   r   )r?   r@   rA   getr   r   r   r   get_local_rank   s   rk   c                   C       t  sdS t  sdS t  S )Nr   )rD   is_availableis_initializedrd   r   r   r   r   rd      
   rd   c                   C   rl   )Nr
   )rD   rm   rn   re   r   r   r   r   re      ro   re   c                  C   s8   t  sdS t  sdS t  } | dkrdS t   dS )zj
    Helper function to synchronize (barrier)
    among all processes when using distributed training
    Nr
   )rD   rm   rn   re   barrier)rh   r   r   r   synchronize   s   rq   c                   C   s   t  ot  S N)rD   rm   rn   r   r   r   r   r`      s   r`   c                 C   s   t  r
t| dkS dS )Nr   T)r`   rD   rd   rf   r   r   r   	is_master   s   rt   c                    s   dt dt f fdd}|S )Nfuncr   c                    s   t   fdd}|S )Nc                     s   t r | i |S d S rr   )rt   )argsr<   )ru   rf   r   r   wrapper   s   z.master_only.<locals>.decorate.<locals>.wrapper)	functoolswraps)ru   rw   rs   )ru   r   decorate   s   zmaster_only.<locals>.decorate)r   )rf   rz   r   rs   r   master_only   s   	r{   c                  C   s6   t  st S d} t rt } t  t| d} | S )zRMake sure each rank has the same temporary directory on the distributed mode.
    Nr   )r`   tempfilemkdtemprt   rD   rp   	broadcast)tmpdirr   r   r   make_tmp_dir   s   
r   c                 C   s   t  }tjdgdd}||kr&tjtt| tjdd}tj|jdd}t 	  t 
|| ||krAtj| fdtjdd}t 	  t 
|| t|   S )z
    Broadcasts the inputs to all ranks.

    Arguments:
        inputs : Any objects that can be serialized by pickle.
        src (int): Source rank.
    Returns:
        Each rank returns the same value as src.
    r   rB   devicedtyper   )rD   rd   r'   tensor	bytearraypickledumpsuint8shaperp   r~   fullitemloadscpunumpytobytes)inputssrcrg   shape_tensorinputs_tensorr   r   r   r~      s$   
r~   c                 C   sN   | d ur | dkr t |  tj |  t|  tj|  d S td|  )Nr   z0Random seed should be positive, current seed is )randomseednpr'   manual_seedrB   manual_seed_allr;   )r   r   r   r   set_random_seed   s   

r   c                   C   s    t  dkrt jddS t jjS )zj
    Return a process group based on gloo backend, containing all the ranks
    The result is cached.
    r-   gloorT   )rD   get_backend	new_grouprf   WORLDr   r   r   r   _get_global_gloo_group  s   r   c                 C   s   t |}|dv sJ t|dkrdnd}t| }t|dkr0td	t
 t|d | tj|}t|j|d}|S )N)r   r-   r   r   rB   i   @z;Rank {} trying to all-gather {:.2f} GB of data on device {}r   )rD   r   r'   r   r   r   lenloggerwarningformatrd   ByteStoragefrom_buffer
ByteTensorto)datarf   r/   r   bufferstorager   r   r   r   _serialize_to_tensor  s   

r   c                    s   t j|d}|dksJ dtj  gtj jd} fddt|D }t j|||d dd |D }t	|}||krStj
|| ftj jd}tj |fdd	 | fS )
zz
    Returns:
        list[int]: size of the tensor, on each rank
        Tensor: padded tensor that has the max size
    rs   r
   zBcomm.gather/all_gather must be called from ranks within the group!r   c                    s"   g | ]}t jd gt j jdqS )r
   r   )r'   zerosint64r   r   _r   r   r   
<listcomp>/      z*_pad_to_largest_tensor.<locals>.<listcomp>c                 S   s   g | ]}t | qS r   )r?   r   )r   sizer   r   r   r   4  s    r   )dim)rD   re   r'   r   numelr   r   range
all_gathermaxr   r   cat)r   rf   rh   
local_size	size_listmax_sizepaddingr   r   r   _pad_to_largest_tensor"  s*   

r   c                    s   t  dkr| gS |du rt }t |dkr| gS t| |t|\}t|  fdd|D }tj||d g }t||D ]\} 	 
 d| }|t| qB|S )a;  
    Run all_gather on arbitrary picklable data (not necessarily tensors).
    Args:
        data: any picklable object
        group: a torch process group. By default, will use a group which
            contains all ranks on gloo backend.
    Returns:
        list[data]: list of data gathered from each rank
    r
   Nc                    s"   g | ]}t j ft jjd qS )r   )r'   emptyr   r   r   r   r   r   r   r   Y  r   zall_gather.<locals>.<listcomp>rs   )re   r   rD   r   r   r   r   zipr   r   r   r    r   r   )r   rf   r   tensor_list	data_listr   r   r   r   r   r   B  s$   


r   r*   c                 C   s(   t dd |  D dh }t|dkS )Nc                 s   s    | ]}t |jV  qd S rr   )rX   r   )r   pr   r   r   r   h  s    z$is_on_same_device.<locals>.<genexpr>r   r
   )set
parametersr   )r*   
device_setr   r   r   is_on_same_deviceg  s   r   )r-   rr   )3rx   r@   r   r   r   rV   r|   typingr   r   r   r   r   r   r'   torch.multiprocessingmultiprocessingr5   	packagingr   r   rD   rX   r   r?   boolr#   r,   r=   r8   r9   r:   ri   rk   rd   re   rq   r`   rt   r{   r   r~   r   	lru_cacher   r   r   r   nnModuler   r   r   r   r   <module>   sJ   '

!

 %