o
    پi:                     @   s   d dl Z d dlmZ d dlmZmZ d dlZd dlmZ	 d dlm
Z
mZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZ e eZG dd	 d	ZdS )
    N)contextmanager)OptionalUnion)ProcessGroupReduceOp)NCCLLibrarybuffer_typecudaStream_t
ncclComm_tncclDataTypeEnumncclRedOpTypeEnumncclUniqueId)StatelessProcessGroup)get_current_device_stream_fastc                   @   s  e Zd Z		d3deeef deeeej	f de
e defddZd	e
ejj fd
dZejdfdejdefddZdejdfdejde
ej dede
ej fddZ		d4dejdejde
ee  fddZ		d4dejdejde
ee  fddZejddfdejdejdede
ee  fddZd5dejdefdd Zd5dejd!efd"d#Zd5dejd!efd$d%Zd&ed'efd(d)Zd*d+ Zd,d- Zd.d/ Ze 	d4d0e
e d	e
ejj fd1d2Z!dS )6PyNcclCommunicatorNFgroupdevicelibrary_pathuse_current_streamc                 C   sJ  t |ts%t sJ t|tjjksJ dt|| _t	|| _
n|j| _|j
| _
|| _| j
dkr@d| _d| _d| _dS zt|| _W n tyZ   d| _d| _d| _Y dS w d| _d| _|| _| j | _| jdkrxtd| j  | jdkr| j | _nt | _t |tstt| jj}t|}tj ||d |d |! }t"|D ]
\}}	|	| jj|< qn	|j#| jdd	| _t |t$rt%d
| }n
t |t&rt%|}t |tj%sJ || _%tj'%|, | j(| j
| j| j| _)tj'* | _tj+d|d}
| ,|
 | j-  ~
W d   n	1 sw   Y  d| _dS )a  
        Args:
            group: the process group to work on. If None, it will use the
                default process group.
            device: the device to bind the PyNcclCommunicator to. If None,
                it will be bind to f"cuda:{local_rank}".
            library_path: the path to the NCCL library. If None, it will
                use the default library path.
        It is the caller's responsibility to make sure each communicator
        is bind to a unique device.
        z:PyNcclCommunicator should be attached to a non-NCCL group.   FTNr   zsglang is using nccl==%s)srcr   )r   zcuda:)r   ).
isinstancer   distis_initializedget_backendBackendNCCLget_rankrankget_world_size
world_sizer   	availabledisabledstreamr   nccl	Exceptionr   ncclGetRawVersionnccl_versionloggerinfoncclGetVersionncclGetUniqueId	unique_idr   torch
ByteTensorlistinternalget_process_group_ranks	broadcasttolist	enumeratebroadcast_objintr   strcudancclCommInitRankcommStreamzeros
all_reducesynchronize)selfr   r   r   r   tensorranks	byte_listibytedata rF   f/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/distributed/device_communicators/pynccl.py__init__   st   











zPyNcclCommunicator.__init__r#   c                 C   s   |dur|S | j rt S | jS )aV  Return the stream to use for NCCL calls.

        Behavior mirrors the previous inline logic:
        - if an explicit stream is provided, return it
        - if stream is None and self.use_current_stream is True, return
          torch.cuda.current_stream()
        - otherwise return the communicator's default stream (self.stream)
        N)r   r   r#   )r?   r#   rF   rF   rG   _resolve_stream   s
   	z"PyNcclCommunicator._resolve_streamr@   opc              
   C   s~   | j rd S |j| jksJ d| j d|j | |}| jt| t| | t	|j
t	|| jt|j d S N-this nccl communicator is created to work on , but the input tensor is on )r"   r   rI   r$   ncclAllReducer   data_ptrnumelr   
from_torchdtyper   r:   r	   cuda_stream)r?   r@   rJ   r#   rF   rF   rG   r=      s"   




zPyNcclCommunicator.all_reduce	in_tensor
out_tensorreturnc              
   C   s   | j rd S |j| jksJ d| j d|j |d u r t|}| |}| jt| t| |	 t
|jt|| jt|j |S rK   )r"   r   r-   
empty_likerI   r$   rN   r   rO   rP   r   rQ   rR   r   r:   r	   rS   )r?   rT   rU   rJ   r#   rF   rF   rG   outplace_all_reduce   s(   





	z&PyNcclCommunicator.outplace_all_reduceoutput_tensorinput_tensorsizesc           	      C   s   | j rd S |j| jksJ d| j d|j | |}|d urad}| j  t|D ].\}}||||  }| jt| t| |	 t
|j|| jt|j ||7 }q+| j  d S | jt| t| |	 t
|j| jt|j d S )NrL   rM   r   )r"   r   rI   r$   ncclGroupStartr4   ncclBroadcastr   rO   rP   r   rQ   rR   r:   r	   rS   ncclGroupEndncclAllGather)	r?   rY   rZ   r#   r[   split_offsetroot
split_size	dst_slicerF   rF   rG   
all_gather   s@   






	


zPyNcclCommunicator.all_gatherc              	   C   sl   |j | j ksJ d| j  d|j  | |}| jt| t| | t|j	| j
t|j dS )z
        Currently, it is mainly used in context parallelism,
        primarily leveraging pynccl to implement non-blocking allgather communication.
        rL   rM   N)r   rI   r$   r_   r   rO   rP   r   rQ   rR   r:   r	   rS   )r?   rY   rZ   r#   r[   rF   rF   rG   cp_all_gather_into_tensor   s   




z,PyNcclCommunicator.cp_all_gather_into_tensorc           
      C   s  | j rd S |j| jksJ d| j d|j | |}|d urgd}| j  t|D ]4\}}|||| df }	| jt|	 t| |		 t
|jt||| jt|j ||7 }q+| j  d S | jt| t| |	 t
|jt|| jt|j d S )NrL   rM   r   .)r"   r   rI   r$   r\   r4   
ncclReducer   rO   rP   r   rQ   rR   r   r:   r	   rS   r^   ncclReduceScatter)
r?   rY   rZ   rJ   r#   r[   r`   ra   rb   chunkrF   rF   rG   reduce_scatter  sD   










z!PyNcclCommunicator.reduce_scatterdstc              	   C   n   | j rd S |j| jksJ d| j d|j | |}| jt| | t	|j
|| jt|j d S rK   )r"   r   rI   r$   ncclSendr   rO   rP   r   rQ   rR   r:   r	   rS   )r?   r@   rj   r#   rF   rF   rG   send=      



zPyNcclCommunicator.sendr   c              	   C   rk   rK   )r"   r   rI   r$   ncclRecvr   rO   rP   r   rQ   rR   r:   r	   rS   )r?   r@   r   r#   rF   rF   rG   recvN  rn   zPyNcclCommunicator.recvc              
   C   s   | j rd S |j| jksJ d| j d|j | |}|| jkr.t| }t| }n	t }t| }| j||| t	
|j|| jt|j d S rK   )r"   r   rI   r   r   rO   r$   r]   rP   r   rQ   rR   r:   r	   rS   )r?   r@   r   r#   sendbuffrecvbuffrF   rF   rG   r2   _  s,   



zPyNcclCommunicator.broadcastptrsizec                 C   s   | j | jt||dS )Nr   )r$   ncclCommWindowRegisterr:   r   )r?   rs   rt   rF   rF   rG   register_comm_window_rawy  s   z+PyNcclCommunicator.register_comm_window_rawc                 C   s   | j | j|S N)r$   ncclCommWindowDeregisterr:   )r?   windowrF   rF   rG   deregister_comm_window|  s   z)PyNcclCommunicator.deregister_comm_windowc                 C      | j   d S rw   )r$   r\   r?   rF   rF   rG   group_start     zPyNcclCommunicator.group_startc                 C   r{   rw   )r$   r^   r|   rF   rF   rG   	group_end  r~   zPyNcclCommunicator.group_endenablec                 c   sN    |du r| j }|du r| j}| j}| j}|| _| | _dV  || _|| _dS )zL
        A context manager to change the state of the communicator.
        N)r!   r#   r"   )r?   r   r#   old_disable
old_streamrF   rF   rG   change_state  s   
zPyNcclCommunicator.change_state)NF)NNrw   )"__name__
__module____qualname__r   r   r   r6   r7   r-   r   r   boolrH   r8   r;   rI   r   SUMTensorr=   rX   r/   rd   re   ri   rm   rp   r2   rv   rz   r}   r   r   r   rF   rF   rF   rG   r      s    

d

!

1

 

0
r   )logging
contextlibr   typingr   r   r-   torch.distributeddistributedr   r   r   :sglang.srt.distributed.device_communicators.pynccl_wrapperr   r   r	   r
   r   r   r   sglang.srt.distributed.utilsr   sglang.srt.utils.commonr   	getLoggerr   r(   r   rF   rF   rF   rG   <module>   s   $	
