o
    -iN6                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZ d dlmZ d dlmZ d dlmZ eeZdadd	 ZG d
d dZdS )    N)ProcessGroupReduceOp)NCCLLibrarybuffer_typecudaStream_t
ncclComm_tncclDataTypeEnumncclRedOpTypeEnumncclUniqueId)StatelessProcessGroup)init_logger)current_streamFc                    sh   ddl m  ddlm} trd S dadtjdtjf fdd}dtjdtjfd	d
}|d||d d S )Nr   )nccl_symm_mem_context)direct_register_custom_opTinput_tensorreturnc                    sV     t | }t | }W d    n1 sw   Y  ||  ||}|S N)torch
empty_likecopy_
all_reduce)r   
symm_inputsymm_outputr   pynccl_comm i/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/vllm/distributed/device_communicators/pynccl.py#all_reduce_symmetric_with_copy_impl(   s   


zHregister_nccl_symmetric_ops.<locals>.all_reduce_symmetric_with_copy_implc                 S   s
   t | S r   )r   r   )r   r   r   r   #all_reduce_symmetric_with_copy_fake0   s   
zHregister_nccl_symmetric_ops.<locals>.all_reduce_symmetric_with_copy_fakeall_reduce_symmetric_with_copy)op_nameop_func	fake_impl)6vllm.distributed.device_communicators.pynccl_allocatorr   vllm.utils.torch_utilsr   _NCCL_SYMM_OPS_REGISTEREDr   Tensor)r   r   r   r   r   r   r   register_nccl_symmetric_ops   s   
r'   c                	   @   sf  e Zd Z	d-deeB deeB ejB dedB fddZ	de
jdfdejdejd	e
d
ejfddZ	d-dejdejfddZ	d-dejdejdee fddZe
jdfdejdejd	e
fddZe
jdfdejdejdee d	e
fddZd-dejdefddZd-dejdefddZd-dejdefdd Zd!d" Zd#d$ Zdejfd%d&Zd'ed(efd)d*Zd+d, ZdS ).PyNcclCommunicatorNgroupdevicelibrary_pathc                 C   s,  t |ts%t sJ t|tjjksJ dt|| _t	|| _
n|j| _|j
| _
|| _| j
dks8tjr@d| _d| _dS zt|| _W n tyW   d| _d| _Y dS w d| _d| _| j | _| jdkr{| j | _tjd| j dd	 nt | _t |tstt| jj}t|}tj ||d |d
 |! }t"|D ]
\}}|| jj|< qn	|j#| jdd| _t |t$rt%d| }n
t |t&rt%|}t |tj%sJ || _%tj'%|) | j(| j
| j| j| _)t* }	tj+d|d}
| ,|
 |	-  ~
W d   dS 1 sw   Y  dS )a  
        Args:
            group: the process group to work on. If None, it will use the
                default process group.
            device: the device to bind the PyNcclCommunicator to. If None,
                it will be bound to f"cuda:{local_rank}".
            library_path: the path to the NCCL library. If None, it will
                use the default library path.
        It is the caller's responsibility to make sure each communicator
        is bind to a unique device.
        z:PyNcclCommunicator should be attached to a non-NCCL group.   FTNr   zvLLM is using nccl==%slocal)scope)srcr)   )r/   zcuda:)r*   ).
isinstancer   distis_initializedget_backendBackendNCCLget_rankrankget_world_size
world_sizer)   envsVLLM_DISABLE_PYNCCL	availabledisabledr   nccl	ExceptionncclGetRawVersionnccl_versionncclGetUniqueId	unique_idlogger	info_oncencclGetVersionr
   r   
ByteTensorlistinternalget_process_group_ranks	broadcasttolist	enumeratebroadcast_objintr*   strcudancclCommInitRankcommr   zerosr   synchronize)selfr)   r*   r+   tensorranks	byte_listibytestreamdatar   r   r   __init__;   sn   







$zPyNcclCommunicator.__init__	in_tensor
out_tensoropr   c              
   C   s   | j rd S |j| jksJ d| j d|j |d u r t|}|d u r't }| jt| t| |	 t
|jt|| jt|j |S N-this nccl communicator is created to work on , but the input tensor is on )r=   r*   r   r   r   r>   ncclAllReducer   data_ptrnumelr   
from_torchdtyper	   rS   r   cuda_stream)rV   r_   r`   ra   r\   r   r   r   r      s*   




	zPyNcclCommunicator.all_reduceoutput_tensorr   c              	   C   sz   | j rd S |j| jksJ d| j d|j |d u rt }| jt| t| | t	|j
| jt|j d S rb   )r=   r*   r   r>   ncclAllGatherr   rf   rg   r   rh   ri   rS   r   rj   )rV   rk   r   r\   r   r   r   
all_gather   s"   



zPyNcclCommunicator.all_gathersizesc           	      C   s   | j rd S |j| jksJ d| j d|j |d u rt }|jd t|ks)J d}| j  t|D ].\}}||||  }| jt	|
 t	|
 | t|j|| jt|j ||7 }q4| j  d S )Nrc   rd   r   )r=   r*   r   shapesumr>   ncclGroupStartrM   ncclBroadcastr   rf   rg   r   rh   ri   rS   r   rj   ncclGroupEnd)	rV   rk   r   rn   r\   split_offsetroot
split_size	dst_slicer   r   r   all_gatherv   s2   





	zPyNcclCommunicator.all_gathervc              
   C   s   | j rd S |j| jksJ d| j d|j |d u rt }| jt| t| | t	|j
t	|| jt|j d S rb   )r=   r*   r   r>   ncclReduceScatterr   rf   rg   r   rh   ri   r	   rS   r   rj   )rV   rk   r   ra   r\   r   r   r   reduce_scatter   s$   



z!PyNcclCommunicator.reduce_scatterc           
      C   s   | j rd S |j| jksJ d| j d|j |d u rt }d}| j  t|D ]4\}}|||| df }	| jt|	 t| |		 t
|jt||| jt|j ||7 }q)| j  d S )Nrc   rd   r   .)r=   r*   r   r>   rq   rM   
ncclReducer   rf   rg   r   rh   ri   r	   rS   r   rj   rs   )
rV   rk   r   rn   ra   r\   rt   ru   rv   chunkr   r   r   reduce_scatterv  s2   






z"PyNcclCommunicator.reduce_scattervrW   dstc              	   C   r   | j rd S |j| jksJ d| j d|j |d u rt }| jt| | t	|j
|| jt|j d S rb   )r=   r*   r   r>   ncclSendr   rf   rg   r   rh   ri   rS   r   rj   )rV   rW   r~   r\   r   r   r   send2  "   


zPyNcclCommunicator.sendr/   c              	   C   r   rb   )r=   r*   r   r>   ncclRecvr   rf   rg   r   rh   ri   rS   r   rj   )rV   rW   r/   r\   r   r   r   recvD  r   zPyNcclCommunicator.recvc              
   C   s   | j rd S |j| jksJ d| j d|j |d u rt }|| jkr0t| }t| }n	t }t| }| j||| t	
|j|| jt|j d S rb   )r=   r*   r   r7   r   rf   r>   rr   rg   r   rh   ri   rS   r   rj   )rV   rW   r/   r\   sendbuffrecvbuffr   r   r   rK   V  s.   


zPyNcclCommunicator.broadcastc                 C      | j   d S r   )r>   rq   rV   r   r   r   group_startp     zPyNcclCommunicator.group_startc                 C   r   r   )r>   rs   r   r   r   r   	group_ends  r   zPyNcclCommunicator.group_endc                 C   s(   | j | jt| | |  dS Nr,   )r>   ncclCommWindowRegisterrS   r   rf   rg   element_size)rV   rW   r   r   r   register_comm_windowv  s   
z'PyNcclCommunicator.register_comm_windowptrsizec                 C   s   | j | jt||dS r   )r>   r   rS   r   )rV   r   r   r   r   r   register_comm_window_raw~  s   z+PyNcclCommunicator.register_comm_window_rawc                 C   s   | j | j|S r   )r>   ncclCommWindowDeregisterrS   )rV   windowr   r   r   deregister_comm_window  s   z)PyNcclCommunicator.deregister_comm_windowr   )__name__
__module____qualname__r   r   rO   rP   r   r*   r^   r   SUMr&   r   rm   rH   rx   rz   r}   r   r   rK   r   r   r   r   r   r   r   r   r   r(   :   s~    
^
"

'
!
%r(   )r   torch.distributeddistributedr1   r   r   	vllm.envsr:   4vllm.distributed.device_communicators.pynccl_wrapperr   r   r   r   r   r	   r
   vllm.distributed.utilsr   vllm.loggerr   r$   r   r   rD   r%   r'   r(   r   r   r   r   <module>   s   $	