import deepspeed
from deepspeed import utils

from .utils import *
from .backend import *
from .comm import *
from ..runtime import compiler
from deepspeed.utils.torch import required_torch_version
import os
import torch

# Global flags that let callers turn individual collectives into no-ops
# (useful for communication-off debugging and profiling runs).
DS_COMM_ALL_GATHER_OFF = False
DS_COMM_REDUCE_SCATTER_OFF = False
DS_COMM_BROADCAST_OFF = False
DS_COMM_ALL_REDUCE_OFF = False
DS_COMM_REDUCE_OFF = False


def disable_compiler_collective(func):
    if required_torch_version(min_version=2.3):
        return func
    return compiler.disable(func)


def build_shm_op():
    builder = get_accelerator().create_op_builder("ShareMemCommBuilder")
    if builder is None or not deepspeed.ops.__compatible_ops__[builder.NAME]:
        return None
    shm_cpp_module = builder.load()
    print(f'DeepSpeed {builder.absolute_name()} built successfully')
    return shm_cpp_module


def has_coalescing_manager():
    has_c10d = hasattr(torch.distributed, 'distributed_c10d')
    return has_c10d and hasattr(torch.distributed.distributed_c10d, '_coalescing_manager')


def has_all_reduce_coalesced():
    return hasattr(torch.distributed, "all_reduce_coalesced") and required_torch_version(min_version=1.13)


def get_coalescing_manager(group, device, reqs, async_op):
    # The signature of torch's private _coalescing_manager changed across releases.
    if required_torch_version(min_version=2.0, max_version=2.0):
        return torch.distributed.distributed_c10d._coalescing_manager(group, device=device, reqs=reqs)
    elif required_torch_version(min_version=2.1):
        return torch.distributed.distributed_c10d._coalescing_manager(group, device=device, async_ops=async_op)
    else:
        return torch.distributed.distributed_c10d._coalescing_manager(group, reqs)


# Utilities to turn individual collectives off.
def all_gather_comm_off(flag=False):
    global DS_COMM_ALL_GATHER_OFF
    DS_COMM_ALL_GATHER_OFF = flag


def reduce_scatter_comm_off(flag=False):
    global DS_COMM_REDUCE_SCATTER_OFF
    DS_COMM_REDUCE_SCATTER_OFF = flag


def broadcast_comm_off(flag=False):
    global DS_COMM_BROADCAST_OFF
    DS_COMM_BROADCAST_OFF = flag


def all_reduce_comm_off(flag=False):
    global DS_COMM_ALL_REDUCE_OFF
    DS_COMM_ALL_REDUCE_OFF = flag


def reduce_comm_off(flag=False):
    global DS_COMM_REDUCE_OFF
    DS_COMM_REDUCE_OFF = flag


def backward_comm_off(flag=False):
    # all_gather and reduce_scatter are the collectives used in the backward pass
    all_gather_comm_off(flag)
    reduce_scatter_comm_off(flag)


class Noop:
    """Returned in place of a communication handle when a collective is switched off."""

    def wait(self):
        return None


class TorchBackend(Backend):
    """
        A light-weight wrapper class for torch.distributed API.
        Only a subset of functions are wrapped. Once the init_process_group
        is initialized, standard torch.distributed.* can be used directly
        so no need to wrap all the functions. We can keep adding wrappers as
        needed.
    """

    def __init__(self, backend, timeout, init_method, rank=-1, world_size=-1, name='torch'):
        super(TorchBackend, self).__init__()
        self.shm_comm_op = build_shm_op()
        self.has_all_reduce_coalesced = has_all_reduce_coalesced()
        self.has_coalescing_manager = has_coalescing_manager()
        self.all_gather_function = self.get_all_gather_function()
        self.reduce_scatter_function = self.get_reduce_scatter_function()
        self.initialized = True
        self.name = name
        self.single_gpu_mode = True
        self.init_process_group(backend, timeout, init_method, rank, world_size)
        if self.shm_comm_op is not None:
            self.shm_comm_op.initialize(self.get_world_size(), self.get_rank())

    @classmethod
    @disable_compiler_collective
    def get_all_gather_function(cls):
        if hasattr(torch.distributed, "all_gather_into_tensor"):
            return torch.distributed.all_gather_into_tensor
        elif hasattr(torch.distributed, "_all_gather_base"):
            return torch.distributed._all_gather_base
        return None

    @classmethod
    @disable_compiler_collective
    def get_reduce_scatter_function(cls):
        if hasattr(torch.distributed, "reduce_scatter_tensor"):
            return torch.distributed.reduce_scatter_tensor
        elif hasattr(torch.distributed, "_reduce_scatter_base"):
            return torch.distributed._reduce_scatter_base
        return None

    def has_all_gather_into_tensor(self):
        return self.all_gather_function is not None

    def has_reduce_scatter_tensor(self):
        return self.reduce_scatter_function is not None

    def init_process_group(self, backend, timeout, init_method, rank, world_size):
        if not torch.distributed.is_initialized():
            torch.distributed.init_process_group(backend,
                                                 timeout=timeout,
                                                 init_method=init_method,
                                                 rank=rank,
                                                 world_size=world_size)
        self.using_mpi = torch.distributed.get_backend() == 'mpi'

    @disable_compiler_collective
    def all_reduce(self, tensor, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False):
        op = self._reduce_op(op)
        return torch.distributed.all_reduce(tensor=tensor, op=op, group=group, async_op=async_op)

    def inference_all_reduce(self, tensor, op, group=None):
        # Prefer the custom DeepSpeed shared-memory op when it has been registered.
        if not hasattr(torch.ops, 'deepspeed') or not hasattr(torch.ops.deepspeed, 'inference_all_reduce_'):
            op = self._reduce_op(op)
            return torch.distributed.all_reduce(tensor=tensor, op=op, group=group, async_op=False)
        else:
            return torch.ops.deepspeed.inference_all_reduce_(tensor)

    @disable_compiler_collective
    def all_reduce_coalesced(self, tensors, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False):
        """ proxy func to torch.distributed.all_reduce_coalesced,
        which is included in PyTorch 1.13 and above
        """
        if not self.has_all_reduce_coalesced:
            raise RuntimeError(f"Current torch version does not have all_reduce_coalesced "
                               f"api (torch.__version__: {torch.__version__})")
        op = self._reduce_op(op)
        return torch.distributed.all_reduce_coalesced(tensors=tensors, op=op, group=group, async_op=async_op)

    @disable_compiler_collective
    def reduce(self, tensor, dst, op=ReduceOp.SUM, group=None, async_op=False):
        if DS_COMM_REDUCE_OFF:
            if int(os.getenv('RANK', '0')) == 0:
                utils.logger.warning("REDUCE is OFF")
            return Noop()
        return torch.distributed.reduce(tensor=tensor,
                                        dst=dst,
                                        op=self._reduce_op(op),
                                        group=group,
                                        async_op=async_op)

    @disable_compiler_collective
    def reduce_scatter(self, output, input_list, op=ReduceOp.SUM, group=None, async_op=False):
        if DS_COMM_REDUCE_SCATTER_OFF:
            if int(os.getenv('RANK', '0')) == 0:
                utils.logger.warning("REDUCE SCATTER is OFF")
            return Noop()
        return torch.distributed.reduce_scatter(output=output,
                                                input_list=input_list,
                                                op=self._reduce_op(op),
                                                group=group,
                                                async_op=async_op)

    @disable_compiler_collective
    def broadcast(self, tensor, src, group=None, async_op=False):
        if DS_COMM_BROADCAST_OFF:
            if int(os.getenv('RANK', '0')) == 0:
                utils.logger.warning("BROADCAST is OFF")
            return Noop()
        return torch.distributed.broadcast(tensor=tensor, src=src, group=group, async_op=async_op)

    @disable_compiler_collective
    def broadcast_object_list(self, object_list, src, group=None, device=None):
        return torch.distributed.broadcast_object_list(object_list=object_list, src=src, group=group, device=device)

    @disable_compiler_collective
    def all_gather(self, tensor_list, tensor, group=None, async_op=False):
        if DS_COMM_ALL_GATHER_OFF:
            if int(os.getenv('RANK', '0')) == 0:
                utils.logger.warning("All Gather is OFF")
            return Noop()
        return torch.distributed.all_gather(tensor_list=tensor_list, tensor=tensor, group=group, async_op=async_op)

    @disable_compiler_collective
    def all_gather_into_tensor(self, output_tensor, input_tensor, group=None, async_op=False):
        if self.has_all_gather_into_tensor():
            return self.all_gather_function(output_tensor=output_tensor,
                                            input_tensor=input_tensor,
                                            group=group,
                                            async_op=async_op)

    @disable_compiler_collective
    def all_gather_base(self, output_tensor, input_tensor, group=None, async_op=False):
        if DS_COMM_ALL_GATHER_OFF:
            if int(os.getenv('RANK', '0')) == 0:
                utils.logger.warning("All Gather is OFF")
            return Noop()
        if self.has_all_gather_into_tensor():
            return torch.distributed.distributed_c10d._all_gather_base(output_tensor=output_tensor,
                                                                       input_tensor=input_tensor,
                                                                       group=group,
                                                                       async_op=async_op)
        utils.logger.warning("unable to find torch.distributed._all_gather_base. will fall back to "
                             "torch.distributed.all_gather which will result in suboptimal performance. "
                             "please consider upgrading your pytorch installation.")

    @disable_compiler_collective
    def all_gather_coalesced(self, output_tensors, input_tensors, group=None, async_op=False):
        assert len(output_tensors) == len(input_tensors)
        if hasattr(torch.distributed.distributed_c10d, '_all_gather_base_coalesced'):
            # customized PyTorch build that exposes a fused coalesced all-gather
            return torch.distributed.distributed_c10d._all_gather_base_coalesced(output_tensors,
                                                                                 input_tensors,
                                                                                 group=group,
                                                                                 async_op=async_op)
        elif has_coalescing_manager():
            reqs = []
            with get_coalescing_manager(group, input_tensors[0].device, reqs, async_op):
                for output, input in zip(output_tensors, input_tensors):
                    handle = torch.distributed.distributed_c10d.all_gather_into_tensor(output,
                                                                                       input,
                                                                                       group=group,
                                                                                       async_op=True)
                    reqs.append(handle)
            if async_op:
                return reqs[-1]
            else:
                reqs[-1].wait()

    @disable_compiler_collective
    def reduce_scatter_tensor(self, output_tensor, input_tensor, op=ReduceOp.SUM, group=None, async_op=False):
        if self.has_reduce_scatter_tensor():
            return self.reduce_scatter_function(output_tensor,
                                                input_tensor,
                                                op=self._reduce_op(op),
                                                group=group,
                                                async_op=async_op)
        utils.logger.warning("unable to find torch.distributed.reduce_scatter_tensor. will fall back to "
                             "torch.distributed.reduce_scatter which will result in suboptimal performance. "
                             "please consider upgrading your pytorch installation.")

    @disable_compiler_collective
    def all_to_all_single(self, output, input, output_split_sizes=None, input_split_sizes=None, group=None,
                          async_op=False):
        return torch.distributed.all_to_all_single(output=output,
                                                   input=input,
                                                   output_split_sizes=output_split_sizes,
                                                   input_split_sizes=input_split_sizes,
                                                   group=group,
                                                   async_op=async_op)

    @disable_compiler_collective
    def all_to_all(self, output_tensor_list, input_tensor_list, group=None, async_op=False):
        return torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=group, async_op=async_op)

    @disable_compiler_collective
    def send(self, tensor, dst, group=None, tag=0):
        return torch.distributed.send(tensor=tensor, dst=dst, group=group, tag=tag)

    @disable_compiler_collective
    def recv(self, tensor, src=None, group=None, tag=0):
        return torch.distributed.recv(tensor=tensor, src=src, group=group, tag=tag)

    @disable_compiler_collective
    def isend(self, tensor, dst, group=None, tag=0):
        return torch.distributed.isend(tensor=tensor, dst=dst, group=group, tag=tag)

    @disable_compiler_collective
    def irecv(self, tensor, src=None, group=None, tag=0):
        return torch.distributed.irecv(tensor=tensor, src=src, group=group, tag=tag)

    @disable_compiler_collective
    def gather(self, tensor, gather_list=None, dst=0, group=None, async_op=False):
        return torch.distributed.gather(tensor=tensor,
                                        gather_list=gather_list,
                                        dst=dst,
                                        group=group,
                                        async_op=async_op)

    @disable_compiler_collective
    def scatter(self, tensor, scatter_list=None, src=0, group=None, async_op=False):
        return torch.distributed.scatter(tensor=tensor,
                                         scatter_list=scatter_list,
                                         src=src,
                                         group=group,
                                         async_op=async_op)

    @disable_compiler_collective
    def barrier(self, group=torch.distributed.GroupMember.WORLD, async_op=False, device_ids=None):
        if group is None:
            group = torch.distributed.GroupMember.WORLD
        return torch.distributed.barrier(group=group, async_op=async_op, device_ids=device_ids)

    @disable_compiler_collective
    def monitored_barrier(self, group=torch.distributed.GroupMember.WORLD, timeout=None, wait_all_ranks=False):
        if group is None:
            group = torch.distributed.GroupMember.WORLD
        return torch.distributed.monitored_barrier(group=group, timeout=timeout, wait_all_ranks=wait_all_ranks)

    def get_rank(self, group=None):
        return torch.distributed.get_rank(group=group)

    def get_world_size(self, group=None):
        return torch.distributed.get_world_size(group=group)

    def is_initialized(self):
        return torch.distributed.is_initialized()

    def get_backend(self, group=None):
        return torch.distributed.get_backend(group=group)

    def new_group(self, ranks):
        return torch.distributed.new_group(ranks)

    def get_global_rank(self, group, group_rank):
        if hasattr(torch.distributed.distributed_c10d, "get_global_rank"):
            from torch.distributed.distributed_c10d import get_global_rank as _get_global_rank
        else:
            from torch.distributed.distributed_c10d import _get_global_rank
        return _get_global_rank(group, group_rank)

    def get_world_group(self):
        return torch.distributed.group.WORLD

    def destroy_process_group(self, group=None):
        return torch.distributed.destroy_process_group(group=group)

    def _reduce_op(self, op):
        """
            Helper function. If the op provided is not a torch.dist.ReduceOp, convert it and return
        """
        if not isinstance(op, torch.distributed.ReduceOp):
            if op == ReduceOp.SUM:
                op = torch.distributed.ReduceOp.SUM
            elif op == ReduceOp.PRODUCT:
                op = torch.distributed.ReduceOp.PRODUCT
            elif op == ReduceOp.AVG:
                op = torch.distributed.ReduceOp.AVG
            elif op == ReduceOp.MIN:
                op = torch.distributed.ReduceOp.MIN
            elif op == ReduceOp.MAX:
                op = torch.distributed.ReduceOp.MAX
            elif op == ReduceOp.BAND:
                op = torch.distributed.ReduceOp.BAND
            elif op == ReduceOp.BOR:
                op = torch.distributed.ReduceOp.BOR
            elif op == ReduceOp.BXOR:
                op = torch.distributed.ReduceOp.BXOR
        return op

    def init_device_mesh(self, mesh_shape, mesh_dim_names):
        if not required_torch_version(min_version=2.2):
            raise RuntimeError(f"Current torch version does not have device mesh "
                               f"api (torch.__version__: {torch.__version__})")
        if not required_torch_version(max_version=2.4):
            return torch.distributed.device_mesh.init_device_mesh(get_accelerator().device_name(),
                                                                  mesh_shape,
                                                                  mesh_dim_names=mesh_dim_names)
        return torch.distributed.device_mesh.init_device_mesh(get_accelerator().current_device_name(),
                                                              mesh_shape,
                                                              mesh_dim_names=mesh_dim_names)

    def enable_symm_mem_for_group(self, group_name):
        if not required_torch_version(min_version=2.5):
            raise RuntimeError(f"Torch version must be 2.5 or higher to use symmetric memory. "
                               f"Current version: {torch.__version__}")
        from torch.distributed._symmetric_memory import enable_symm_mem_for_group
        return enable_symm_mem_for_group(group_name)
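

# Illustrative usage sketch, not part of the upstream module: as the
# TorchBackend docstring describes, DeepSpeed's public `deepspeed.comm` API
# constructs a TorchBackend internally and forwards collectives to it. This
# sketch assumes a distributed launcher (e.g. `torchrun --nproc_per_node=2`)
# has set RANK/WORLD_SIZE and that the accelerator's communication backend
# (e.g. nccl) is available.
if __name__ == "__main__":
    import deepspeed.comm as dist
    from deepspeed.accelerator import get_accelerator

    # init_distributed() builds a TorchBackend, which calls init_process_group()
    dist.init_distributed(dist_backend=get_accelerator().communication_backend_name())
    device = get_accelerator().device_name(dist.get_rank() % get_accelerator().device_count())
    t = torch.ones(4, device=device)
    # Routes through TorchBackend.all_reduce -> torch.distributed.all_reduce
    dist.all_reduce(t)
    if dist.get_rank() == 0:
        print(f"world_size={dist.get_world_size()}, all_reduce result: {t}")
    dist.destroy_process_group()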