o
    Tix                  
   @   sP  d Z ddlZddlZddlmZmZ ddlT ddlT ddlmZ ddl	m
Z
 dd	lmZ dd
lmZmZ ddlmZ ddlmZ ddlmZ dae Zi Ze ZdadadadaddlT G dd dZ dd Z!						d~ddZ"dd Z#dd Z$dd Z%dddZ&dd Z'de(fdd Z)d!d" Z*e#dd#d#d$e fd%d$Z+e#dd&d'Z,e#dd#d#d(e fd)d(Z-d*d+ Z.e/j0dd#d#e fd,d-Z1e#e/j0dd#d#d.e fd/d.Z2e#dd#d#d0e fd1d0Z3d2d3 Z4dd#e fd4d5Z5e#dddd#d#d6e fd7d6Z6e#dd8d9Z7e#ddd#d:e fd;d:Z8e#dddd#d<e fd=d<Z9e#ddd#d>e fd?d>Z:e#dddd#d@e fdAd@Z;e#dddd#d#dBe fdCdBZ<e#dddd#d#dDe fdEdDZ=e#dd#dd#dFe fdGdFZ>e#ddd#d#dHe fdIdHZ?ddJdKZ@e#e/j0dd#d#dLe fdMdLZAe#e/j0dd#d#dNe fdOdNZBdPdQ ZCdRdS ZDddTdUZEe#e/j0dd#d#dVe fdWdVZFe#e/j0dd#d#dVe fdXdYZGe#e/j0dd#d#dVe fdZd[ZHd\d] ZIddeJfd^d_ZKdd`daZLdbdc ZMddddeZNddfdgZOdhdi ZPdjeQfdkdlZRddmedmeddddndnf
dodpZSedmfdqdrZTdsdt ZUdudv ZVdwdx ZWddzd{ZXdd|d}ZYdS )a  
    DeepSpeed Communication Package: deepspeed.comm
    deepspeed.comm
        -- import and use deepspeed.ops.comm
        -- use torch.distributed directly if both this package and torch.distributed use the same NCCL version
        -- use custom collectives
            -- can either use torch.dist or ds.ops.comm?
        Note: the old 1-bit compressed allreduce variants that resided in deepspeed.runtime.comm will be moved here as well.
    deepspeed.comm API
        -- must be kept fully compatible (same signatures) as torch.dist API to ensure backward/cross-framework compatibility.
        -- e.g. if a client code used
            from deepspeed import comm as dist
            instead of
            import torch.distributed as dist
            The code should work without breaking any of the public torch.distributed functionality
    Future:
        -- deepspeed groups API should be brought into ds.comm
    N   )TORCH_DISTRIBUTED_DEFAULT_PORTdefault_pg_timeout   )*)get_accelerator)
CCLBackend)CommsLogger)timerget_caller_func)TorchBackend)utils)	timedeltac                   @   s   e Zd Zg fddZdS )ProcessGroupc                 C   s   || _ || _t|| _d S N)rankscomm_idlensize)selfr   r    r   G/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/comm/comm.py__init__=   s   zProcessGroup.__init__N)__name__
__module____qualname__r   r   r   r   r   r   ;   s    r   c                 C   s   | j r
t|  d S d S r   )comms_logger_enabledcomms_logger	configure)configr   r   r   _configure_using_config_fileC   s   r    c                 C   s`   | d ur	t | j |d ur|t_|d ur|t_|d ur|t_|d ur%|t_|d ur.|t_d S d S r   )r    comms_configr   enabledprof_allprof_opsverbosedebug)deepspeed_configr"   r#   r$   r%   r&   r   r   r   r   H   s   	

r   c                    s    fdd}|S )Nc                     s  t jr9d|v r|d st jsd|v r9|d t jv r9t }|| t g| R i |}t|t j}t	|
  zO | i |W t jrt   tjrPt  d|v rX|d sft jsfd|v r|d t jv rt|t j} j}t	|  t	|jdd}t |||| S S S S t jrt   tjrt  d|v r|d st jsd|v r|d t jv rt|t j} j}t	|  t	|jdd}t |||| w w w w )Nproflog_nameF)reset)r   r"   r#   r$   get_default_argsupdateget_msg_size_from_argsget_debug_log_namer&   timersstartr   synchronizecdb	using_mpibarrierr   stopelapsedappend)argskwargs	func_argsmsg_sizer)   raw_nametime_elapsedfuncr   r   log_wrapperg   sR   


ztimed_op.<locals>.log_wrapperr   )r?   r@   r   r>   r   timed_ope   s   rA   c                 C   s   t tdd}t tdd}| tkrtjd d S | tkr(tjd d S | tkr4tjd d S | t	krLt
||||datjd|  d	 d S | tkrXtjd
 d S tjd|  d	 d S )NRANKz-1
WORLD_SIZEz-NCCL backend in DeepSpeed not yet implementedz,MPI backend in DeepSpeed not yet implementedz-Gloo backend in DeepSpeed not yet implemented)rank
world_sizetimeoutinit_methodzInitialize z backendz-HCCL backend in DeepSpeed not yet implementedzDeepSpeed does not support )intosgetenvNCCL_BACKENDr   loggerr&   MPI_BACKENDGLOO_BACKENDCCL_BACKENDr   ccl_backendinfoHCCL_BACKEND)
ds_backendrF   rG   rD   r   r   r   r   init_deepspeed_backend   s   rT   c                   C   s   t d u rdS t  S NF)r2   is_initializedr   r   r   r   rV      s   rV   c                 C   s   t j| dS )Ngroup)r2   destroy_process_grouprW   r   r   r   rY      s   rY   c                 C   s"   t d urt  sJ dt | S NJDeepSpeed backend not set, please initialize it using init_process_group())r2   rV   	new_group)r   r   r   r   r\      s   
r\   returnc                   C   s   dS )NTr   r   r   r   r   is_available   s   r^   c                  C   s   t   } | tkrtd urt rtad S d S d S | tkr-td ur)t r+tad S d S d S | tkrAt	d ur=t	 r?t	ad S d S d S | t
krQtd urSt rUtad S d S d S d S r   )r   communication_backend_namerK   nccl_backendrV   r2   rM   mpi_backendrO   rP   rR   hccl_backend)backend_namer   r   r   set_backend   s$   
rd   F	broadcastc                 C      t j| |||dS )N)tensorsrcrX   async_op)r2   re   )rg   rh   rX   ri   r(   r)   r&   r   r   r   re         c                 C   rf   )Nobject_listrh   rX   device)r2   broadcast_object_listrk   r   r   r   rn      rj   rn   
all_gatherc                 C   rf   )N)tensor_listrg   rX   ri   )r2   ro   )rp   rg   rX   ri   r(   r)   r&   r   r   r   ro         	c                   C       t d urt  sJ dt  S rZ   )r2   rV   has_reduce_scatter_tensorr   r   r   r   rs         rs   c              	   C   sx   t d urt  sJ dt  rt| ||||||dS t dkr&tjd tt	
|t |}t| ||||||dS )Nr[   )oprX   ri   r(   r&   r   zunable to find torch.distributed.reduce_scatter_tensor. will fall back to torch.distributed.reduce_scatter which will result in suboptimal performance. please consider upgrading your pytorch installation.)r2   rV   rs   reduce_scatter_tensorget_rankr   rL   warning_oncelisttorchchunkget_world_sizereduce_scatter)output_tensorrg   ru   rX   ri   r(   r&   input_tensor_lstr   r   r   reduce_scatter_fn   s.   
r   rv   c                 C      t j| ||||dS )N)r~   input_tensorru   rX   ri   )r2   rv   )r~   rg   ru   rX   ri   r(   r)   r&   r   r   r   rv     s   
all_gather_into_tensorc                 C   rf   )N)r~   r   rX   ri   )r2   r   )r~   rg   rX   ri   r(   r)   r&   r   r   r   r   .  rq   c                   C   rr   rZ   )r2   rV   has_all_gather_into_tensorr   r   r   r   r   :  rt   r   c                 C   sp   t d urt  sJ dt  rt| ||||dS t dkr$tjd tt	
| t |}t|||||dS )Nr[   )rX   ri   r&   r   zunable to find torch.distributed.all_gather_into_tensor. will fall back to torch.distributed.all_gather which will result in suboptimal performance. please consider upgrading your pytorch installation.)r2   rV   r   r   rw   r   rL   rx   ry   rz   r{   r|   ro   )r~   r   rX   ri   r&   output_tensorsr   r   r   allgather_fnA  s   
r   all_to_all_singlec	           	      C   s   t j| |||||dS )N)outputinputoutput_split_sizesinput_split_sizesrX   ri   )r2   r   )	r   rg   r   r   rX   ri   r(   r)   r&   r   r   r   r   P  s   c                 C   rf   NrX   ri   )r2   
all_to_all)output_tensor_listinput_tensor_listrX   ri   r   r   r   r   c  rj   r   sendc                 C   rf   N)rg   dstrX   tagr2   r   rg   r   rX   r   r(   r)   r&   r   r   r   r   i  rj   recvc                 C   rf   N)rg   rh   rX   r   r2   r   rg   rh   rX   r   r(   r)   r&   r   r   r   r   o  rj   isendc                 C   rf   r   r   r   r   r   r   r   u  rj   irecvc                 C   rf   r   r   r   r   r   r   r   {  rj   gatherc                 C   r   )N)rg   gather_listr   rX   ri   )r2   r   )rg   r   r   rX   ri   r(   r)   r&   r   r   r   r        
scatterc                 C   r   )N)rg   scatter_listrh   rX   ri   )r2   r   )rg   r   rh   rX   ri   r(   r)   r&   r   r   r   r     r   r4   c                 C   s   t j| |dS r   )r2   r4   )rX   ri   
device_idsr(   r)   r&   r   r   r   r4     s   monitored_barrierc                 C   s   t j| ||dS )N)rX   rF   wait_all_ranks)r2   r   )rX   rF   r   r(   r)   r&   r   r   r   r     s   c                 C   sB   t dd t dkrtjd| d ntjd| d t dd d S )Nlog_summary_barrier)r)   r   T)	print_logshow_stragglerF)r4   r2   rw   r   log_all)r   r   r   r   log_summary  s
   
r   reducec                 C   r   )N)rg   r   ru   rX   ri   )r2   r   )rg   r   ru   rX   ri   r(   r)   r&   r   r   r   r     r   r}   c                 C   r   )N)r   
input_listru   rX   ri   )r2   r}   )r   r   ru   rX   ri   r(   r)   r&   r   r   r   r}     r   c                   C   s0   t durt  sJ dt jdusJ dt jS ) Nr[   z+has_all_reduce_coalesced is not yet defined)r2   rV   has_all_reduce_coalescedr   r   r   r   r     s
   r   c                   C   s0   t d urt  sJ dt jd usJ dt jS )Nr[   z)has_coalescing_manager is not yet defined)r2   rV   has_coalescing_managerr   r   r   r   r     s
   r   c                 C   s*   t d urt  sJ dt j| |||dS )Nr[   r   )r2   rV   all_gather_coalesced)r   input_tensorsrX   ri   r   r   r   r     s   r   
all_reducec                 C      t | |||S r   )r2   r   rg   ru   rX   ri   r(   r)   r&   r   r   r   r     s   c                 C   s   t | ||S r   )r2   inference_all_reducer   r   r   r   r     s   	r   c                 C   r   r   )r2   all_reduce_coalesced)tensorsru   rX   ri   r(   r)   r&   r   r   r   r     s   	r   c                   C   rr   rZ   )r2   rV   get_world_groupr   r   r   r   r     rt   r   c                 C   "   t durt  sJ dt | S )a1  
    Returns the number of processes in the current process group
    Args:
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used.
    Returns:
        The world size of the process group
        -1, if not part of the group
    Nr[   )r2   rV   r|   rW   r   r   r   r|     s   
r|   c                 C   r   )a  
    Returns the rank of the current process in the provided ``group`` or the
    default group if none was provided.
    Rank is a unique identifier assigned to each process within a distributed
    process group. They are always consecutive integers ranging from 0 to
    ``world_size``.
    Args:
        group (ProcessGroup, optional): The process group to work on. If None,
            the default process group will be used.
    Returns:
        The rank of the process group
        -1, if not part of the group
    Nr[   )r2   rV   rw   rW   r   r   r   rw   )  s   
rw   c                   C   s   t durt  sJ dt S )z
        Helper function to get local rank after a backend has been set and initialized
        Args:
            None
        Returns:
            local rank (= GPU device ID)
    Nr[   )r2   rV   get_local_rank_from_launcherr   r   r   r   get_local_rank=  s   	r   c                 C   s$   t d urt  sJ dt | |S rZ   )r2   rV   get_global_rank)rX   
group_rankr   r   r   r   K  s   r   c              	   C   sX   t d urt  sJ dd}g }z	 |t | | |d7 }q ttfy+   Y |S w )Nr[   r   Tr   )r2   rV   r7   r   RuntimeError
ValueError)rX   rD   group_ranksr   r   r   get_all_ranks_from_groupR  s   r   c                 C   sz   t d urt  sJ dd }tt dr+tjdt j d|  d|  t | |}|S t dkr;tj	dt j d |S )	Nr[   init_device_meshz&Initializing mesh device with backend z                 with shape z and dim names r   Backend z, does not support mesh device initialization)
r2   rV   hasattrr   rL   rQ   namer   rw   rx   )
mesh_shapemesh_dim_namesmesh_devicer   r   r   initialize_mesh_devicea  s   

r   
group_namec                 C   sB   t d urt  sJ dtt drt |  d S tdt j d)Nr[   enable_symm_mem_for_groupr   z1 does not support symmetric memory initialization)r2   rV   r   r   r   r   )r   r   r   r   r   p  s   
r   Tc
                 C   s  t |d |du rtdu pt  }tdu r+tt  || t  tj	dt  tdu r<t
j r<t| ||adS |du rPtdurJt du sNJ ddS g d}
|rttdd	 |
s|rgtj	d
 t rst sst|d nt r|t|d nt||d tdurt rttdddkrtj	d dS dS t|tsJ | du rt  } ttdddkrtj	d|  t| ||||	adS )u   Initialize dist backend, potentially performing MPI discovery if needed

    Arguments:
        dist_backend: Optional (str). torch distributed backend, e.g., nccl, mpi, gloo, hccl
        auto_mpi_discovery Optional (bool). if distributed environment variables are not set, attempt to discover them from MPI
        distributed_port: Optional (int). torch distributed backend port
        verbose: Optional (bool). verbose logging
        timeout: Optional (timedelta). Timeout for operations executed against the process group. The default value of 30 minutes can be overridden by the environment variable `DEEPSPEED_TIMEOUT`.
        init_method: Optional (string). Torch distributed, URL specifying how to initialize the process group. Default is “env://” if no init_method or store is specified.
        config: Optional (dict). DeepSpeed configuration for setting up comms options (e.g. Comms profiling)
        rank: Optional (int). The current manually specified rank. Some init_method like “tcp://” need the rank and world_size as well (see: https://pytorch.org/docs/stable/distributed.html#tcp-initialization)
        world_size: Optional (int). Desired world_size for the TCP or Shared file-system initialization.
    )r'   Nzcdb=FTzDistributed backend is not initialized. Please set dist_init_required to True or initialize before calling deepspeed.initialize())rB   rC   MASTER_ADDRMASTER_PORT
LOCAL_RANKc                 S   s
   | t jv S r   rI   environ)vr   r   r   <lambda>  s   
 z"init_distributed.<locals>.<lambda>zRNot using the DeepSpeed or dist launchers, attempting to detect MPI environment...r%   )distributed_portr%   rB   0r   z'Distributed backend already initializedz6Initializing TorchBackend in DeepSpeed with backend {})r   r2   rV   rT   r   r_   rd   r   rL   rQ   rz   distributedr   allmapin_amlin_dlts$patch_aml_env_for_torch_nccl_backend	in_aws_sm'patch_aws_sm_env_for_torch_nccl_backendmpi_discoveryrH   rI   rJ   
isinstancer   format)dist_backendauto_mpi_discoveryr   r%   rF   rG   dist_init_requiredr   rD   rE   required_envr   r   r   init_distributed|  sB   

r   c              
      s  ddl m} ddl}|j}| }| }d}|dkrJddl}z|d}	||	}
|
	d d }W n |j
yI   ddl}|| }Y nw |j|dd}|  | }t fdd|d| D }t|tjd	< t|tjd
< t|tjd< |tjd< t| tjd< |rtjdtjd	 tjd tjd
 tjd tjd  tdurt rt |ksJ d|t t |ksJ d|t dS dS dS )zM
    Discovery MPI environment via mpi4py and map to relevant dist state
    r   )MPINzhostname -Izutf-8)rootc                    s   g | ]}| kqS r   r   ).0i	proc_namer   r   
<listcomp>  s    z!mpi_discovery.<locals>.<listcomp>rB   rC   r   r   r   zfDiscovered MPI settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}z(MPI rank {} does not match torch rank {}z4MPI world size {} does not match torch world size {})mpi4pyr   
subprocess
COMM_WORLDGet_rankGet_sizeshlexsplitcheck_outputdecodeCalledProcessErrorsocketgethostbynamegethostnamebcastGet_processor_name	allgathersumstrrI   r   r   rL   rQ   r   r2   rV   rw   r|   )r   r%   r   r   commrD   rE   master_addrr   hostname_cmdresultr   	all_procs
local_rankr   r   r   r     sN   



" r   c                   C   
   dt jv S )NAZUREML_EXPERIMENT_IDr   r   r   r   r   r        
r   c                   C   r  )NSM_TRAINING_ENVr   r   r   r   r   r     r  r   c                   C   r  )NDLTS_JOB_IDr   r   r   r   r   r     r  r     c              
   C   s  t jd t jd< t jd t jd< tt jd tt jd k}|s<t jd d}|d t jd	< d
t jvr;t| t jd
< nt jd t jd	< tt jd
< |rWtjd	t jd  t
t jd< t jd t jd< |rtjd	t jd t jd t jd t jd	 t jd
  dS dS )zHelper routine to get and set environment variables.
    This is adapted from Azure ML's documentation available from:
    https://azure.github.io/azureml-web/docs/cheatsheet/distributed-training/#environment-variables-from-openmpi
    OMPI_COMM_WORLD_RANKrB   OMPI_COMM_WORLD_SIZErC   OMPI_COMM_WORLD_LOCAL_SIZEAZ_BATCH_MASTER_NODE:r   r   r   AZ_BATCHAI_MPI_MASTER_NODEz&NCCL_SOCKET_IFNAME original value = {}NCCL_SOCKET_IFNAMEOMPI_COMM_WORLD_LOCAL_RANKr   zjDiscovered AzureML settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}N)rI   r   rH   r   r   DEFAULT_AML_MASTER_PORTr   rL   rQ   r   DEFAULT_AML_NCCL_SOCKET_IFNAME)master_portr%   single_nodemaster_node_paramsr   r   r   r     s.   


"r   c              
   C   st   t jd t jd< t jd t jd< t jd t jd< | r8tjdt jd t jd t jd t jd t jd	  d
S d
S )zjHelper routine to get and set environment variables when running inside an AWS SageMaker environment.
    r  rB   r  r   r  rC   zpDiscovered AWS SageMaker settings of world_rank={}, local_rank={}, world_size={}, master_addr={}, master_port={}r   r   N)rI   r   r   rL   rQ   r   r   r   r   r   r     s   "r   )NNNNNNr   )NNrU   )F)Nr   )r
  T)T)Z__doc__rz   rI   	constantsr   r   	reduce_opdeepspeed.acceleratorr   deepspeed.comm.cclr   deepspeed.utils.comms_loggingr	   deepspeed.utilsr
   r   deepspeed.comm.torchr   	deepspeedr   datetimer   r2   SynchronizedWallClockTimerr/   timer_summaryr   r`   ra   rP   rb   deepspeed.comm.utilsr   r    r   rA   rT   rV   rY   r\   boolr^   rd   re   rn   ro   rs   ReduceOpSUMr   rv   r   r   r   r   r   r   r   r   r   r   r   r4   r   r   r   r}   r   r   r   r   r   r   r   rH   r|   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   sz  
*

	
!


	




E-
 