o
    
۾i:\                     @   s:  d dl mZ d dlZd dlmZ d dlmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZmZmZ dd	lmZmZ e rWd d
lmZ d dlmZ d dlmZ eeZG dd deZ G dd deZ!G dd deZ"G dd deZ#G dd de#Z$G dd de#Z%G dd deZ&G dd deZ'dS )    )AnyN)get_dp_groupget_ep_group)get_forward_context)init_logger)has_flashinfer_all2all)has_deep_ephas_morihas_pplx   )All2AllManagerBaseCache)Mapping)MnnvlConfig)MnnvlMoec                       s   e Zd ZdZ fddZdejdejdedejfdd	Z	
	ddejdejdede	ej dB de
ejejf f
ddZ	
	ddejdejdejdede	ej dB de
ejejejf fddZ	
ddejdedejfddZdd Z  ZS )NaiveAll2AllManagerz
    A naive implementation of all2all communication.
    It uses all-reduce under the hood, which is not
    efficient at all. The main purpose is for testing and
    debugging.
    c                       t  | d S Nsuper__init__self	cpu_group	__class__ a/home/ubuntu/.local/lib/python3.10/site-packages/vllm/distributed/device_communicators/all2all.pyr   #      zNaiveAll2AllManager.__init__xcu_tokens_across_sp_cpuis_sequence_parallelreturnc           
      C   s   t |jdks	J tj|d |df|j|jd}|r| jn| j}|r'| j	n| j
}|dkr0dn||d  }|| }|||d d f | t|D ]!}	|	dkrSdn||	d  }||	 }t |||d d f |	 qK|S )N   r   )devicedtyper   )lenshapetorchemptysizer%   r&   rankdp_rank
world_sizedp_world_sizecopy_ranger   	broadcast)
r   r   r    r!   bufferr,   r.   startendidxr   r   r   naive_multicast&   s    z#NaiveAll2AllManager.naive_multicastFNhidden_statesrouter_logitsextra_tensorsc                 C   sb   |d urt d|r| jjnd}t j}|d usJ ||}| |||}| |||}||fS Nz6extra_tensors is not supported for NaiveAll2AllManagerr   NotImplementedErrortp_groupr.   r   dp_metadatacu_tokens_across_spr7   )r   r8   r9   r!   r:   sp_sizer?   r    r   r   r   dispatch_router_logits>   s   
z*NaiveAll2AllManager.dispatch_router_logitstopk_weightstopk_idsc           	      C   sr   |d urt d|r| jjnd}t j}|d usJ ||}| |||}| |||}| |||}|||fS r;   r<   )	r   r8   rC   rD   r!   r:   rA   r?   r    r   r   r   dispatchW   s$   

zNaiveAll2AllManager.dispatchc           
      C   s   |r| j n| j}t j}|d usJ |r| jjnd}||}|dkr%dn||d  }|| }t |}	|	||d d f }|S )Nr   r   )	r,   r-   r   r?   r>   r.   r@   r   
all_reduce)
r   r8   r!   ep_rankr?   rA   r    r4   r5   all_hidden_statesr   r   r   combines   s   
zNaiveAll2AllManager.combinec                 C      d S r   r   r   r   r   r   destroy      zNaiveAll2AllManager.destroyFNF)__name__
__module____qualname____doc__r   r)   Tensorboolr7   listtuplerB   rE   rI   rL   __classcell__r   r   r   r   r      sb    



r   c                       s   e Zd ZdZ fddZ		ddejdejded	eej dB d
e	ejejf e	ejejeej f B f
ddZ
		ddejdejdejded	eej dB d
e	ejejejf e	ejejejeej f B fddZ	ddejded
ejfddZdd Z  ZS )AgRsAll2AllManagerzu
    An implementation of all2all communication based on
    all-gather (dispatch) and reduce-scatter (combine).
    c                    r   r   r   r   r   r   r   r      r   zAgRsAll2AllManager.__init__FNr8   r9   r!   r:   r"   c           
      C   s   t  j}|dus
J | }|dusJ |rt nt }||j |jd ks(J ||g}|dur5|| |j|d|d}	|durN|	d |	d |	dd fS |	d |	d fS )K
        Gather hidden_states and router_logits from all dp ranks.
        Nr   dimsizesr   r#   	r   r?   get_chunk_sizes_across_dp_rankr   r   rank_in_groupr(   extendall_gatherv)
r   r8   r9   r!   r:   r?   r]   
dist_grouptensors_to_gathergathered_tensorsr   r   r   rB      s"   
z)AgRsAll2AllManager.dispatch_router_logitsrC   rD   c                 C   s   t  j}|dus
J | }|dusJ |rt nt }||j |jd ks(J |||g}	|dur6|	| |j|	d|d}
|
d }|
d }|
d }|du rS|||fS ||||
dd fS )rZ   Nr   r[   r   r#      r^   )r   r8   rC   rD   r!   r:   r?   r]   rc   rd   re   r   r   r   rE      s(   


zAgRsAll2AllManager.dispatchc                 C   sL   t  j}|dus
J | }|dusJ |rt nt }|j|d|d}|S )zC
        Reduce-scatter hidden_states across all dp ranks.
        Nr   r[   )r   r?   r_   r   r   reduce_scatterv)r   r8   r!   r?   r]   rc   r   r   r   rI      s   zAgRsAll2AllManager.combinec                 C   rJ   r   r   rK   r   r   r   rL      rM   zAgRsAll2AllManager.destroyrN   rO   )rP   rQ   rR   rS   r   r)   rT   rU   rV   rW   rB   rE   rI   rL   rX   r   r   r   r   rY      sX    
'
)
rY   c                          e Zd ZdZ fddZdd Z		ddejd	ejd
ede	ej dB de
ejejf f
ddZ		ddejdejdejd
ede	ej dB de
ejejejf e
ejejeje	ej f B fddZ	ddejd
edejfddZdd Z  ZS )PPLXAll2AllManagerz6
    All2All communication based on PPLX kernels.
    c                    s   t  sJ dt | | jrKddlm}m}m} t	d| j
| j | j
dkr+| n| }tj|t| jd | jd t	d| ||| j
| j t | _d S )Nzpplx_kernels not found. Please follow https://github.com/vllm-project/vllm/blob/main/tools/ep_kernels/README.md to install pplx_kernels.r   )nvshmem_alloc_empty_unique_idnvshmem_get_unique_idnvshmem_initz;Initialize NVSHMEM for pplx_kernels: rank=%d, world size=%d)srcgroupzPPLX NVSHMEM UID = %s)r
   r   r   	internodepplx_kernels.nvshmemrj   rk   rl   loggerdebugr,   r.   distr2   get_process_group_ranksr   r   handle_cache)r   r   rj   rk   rl   uidr   r   r   r      s.   
zPPLXAll2AllManager.__init__c                 C   s*   dd l }| j|| jr|jjS |jjS Nr   )pplx_kernelsru   get_or_createro   AllToAll	intranode)r   kwargspplxr   r   r   
get_handle  s   zPPLXAll2AllManager.get_handleFNr8   r9   r!   r:   r"   c                 C      t r   r=   r   r8   r9   r!   r:   r   r   r   rB        z)PPLXAll2AllManager.dispatch_router_logitsrC   rD   c                 C   r   r   r   r   r8   rC   rD   r!   r:   r   r   r   rE   (     zPPLXAll2AllManager.dispatchc                 C   r   r   r   r   r8   r!   r   r   r   rI   5     zPPLXAll2AllManager.combinec                 C   sp   | j j | j j D ]\}}|  qW d    n1 sw   Y  | jr6ddlm} t	d |  d S d S )Nr   )nvshmem_finalizezPPLX NVSHMEM finalize)
ru   _lock_cacheitemsrL   ro   rp   r   rq   rr   )r   _handler   r   r   r   rL   :  s   



zPPLXAll2AllManager.destroyrN   rO   rP   rQ   rR   rS   r   r~   r)   rT   rU   rV   rW   rB   rE   rI   rL   rX   r   r   r   r   ri      sV    $


ri   c                       rh   )DeepEPAll2AllManagerBaseH
    All2All communication based on DeepEP High-Throughput kernels.
    c                    s,   t  sJ dt | t | _d| _d S )NzDeepEP kernels not found. Please follow https://github.com/vllm-project/vllm/blob/main/tools/ep_kernels/README.md to install DeepEP kernels.   )r   r   r   r   ru   num_smsr   r   r   r   r   M  s   
z!DeepEPAll2AllManagerBase.__init__c                 C   r   r   r   r   r|   r   r   r   r~   Y  rM   z#DeepEPAll2AllManagerBase.get_handleFNr8   r9   r!   r:   r"   c                 C   r   r   r   r   r   r   r   rB   \  r   z/DeepEPAll2AllManagerBase.dispatch_router_logitsrC   rD   c                 C   r   r   r   r   r   r   r   rE   e  r   z!DeepEPAll2AllManagerBase.dispatchc                 C   r   r   r   r   r   r   r   rI   r  r   z DeepEPAll2AllManagerBase.combinec                 C   rJ   r   r   rK   r   r   r   rL   w  rM   z DeepEPAll2AllManagerBase.destroyrN   rO   r   r   r   r   r   r   H  sV    


r   c                       sL   e Zd ZdZ fddZdeeef fddZdd Zd	e	fd
dZ
  ZS )DeepEPHTAll2AllManagerr   c                    r   r   r   r   r   r   r   r     r   zDeepEPHTAll2AllManager.__init__r"   c                 C   sp   t jd d }d }d }| jrt jst jd d }| jd }nd}d}|d us(J |d us.J t| j||d|dS )N   r#   r   r   F)rn   num_nvl_bytesnum_rdma_byteslow_latency_modenum_qps_per_rank)envsVLLM_DEEPEP_BUFFER_SIZE_MBro   ,VLLM_DEEPEP_HIGH_THROUGHPUT_FORCE_INTRA_NODEr   dictr   )r   r   r   r   r   r   r   _make_all2all_kwargs  s"   z+DeepEPHTAll2AllManager._make_all2all_kwargsc                 C   sD   t |dks
J ddd l}|  }td| | j||j}|S )Nr   zfDeepEPHTAll2AllManager expects no arguments. All the required args are computed in the Manager itself.DeepEP all2all args %s)r'   deep_epr   rq   rr   ru   ry   Bufferr   r|   r   buffer_kwargsr   r   r   r   r~     s   z!DeepEPHTAll2AllManager.get_handler   c                 C   s(   dd l }|| jkr| j}|j| d S rw   )r   r   r   set_num_sms)r   r   r   r   r   r   r     s   
z"DeepEPHTAll2AllManager.set_num_sms)rP   rQ   rR   rS   r   r   r   r   r~   intr   rX   r   r   r   r   r   {  s    r   c                       sd   e Zd ZdZ fddZdededededed	eeef fd
dZdd Z	d	edB fddZ
  ZS )DeepEPLLAll2AllManagerzD
    All2All communication based on DeepEP Low-Latency kernels.
    c                    r   r   r   r   r   r   r   r     r   zDeepEPLLAll2AllManager.__init__max_num_tokens_per_dp_ranktoken_hidden_sizenum_ep_ranksnum_global_expertsnum_local_expertsr"   c           
   	   C   sT   ddl }tjd d }|}|jj||||d}	|	dusJ t| j||	d|dtjdS )a  
        max_num_tokens_per_dp_rank : the maximum number of tokens a DP rank
          can dispatch all the ranks must hold the same value.
        token_hidden_size: the hidden dimension of each token.
        num_ep_ranks: the number of EP group ranks.
        num_global_experts: Number of experts in the model.
        num_local_experts: Number of experts in an EP rank.
        r   Nr   ) num_max_dispatch_tokens_per_rankhidden	num_ranksnum_expertsT)rn   r   r   r   r   !allow_nvlink_for_low_latency_modeallow_mnnvl)r   r   r   r   get_low_latency_rdma_size_hintr   r   !VLLM_DEEPEP_LOW_LATENCY_USE_MNNVL)
r   r   r   r   r   r   r   r   r   r   r   r   r   r     s&   z+DeepEPLLAll2AllManager._make_all2all_kwargsc                 C   s8   ddl }| jdi |}td| | j||j}|S )zd
        The kwargs for DeepEPLLAll2AllManager is dictated by
        _make_all2all_kwargs.
        r   Nr   r   )r   r   rq   rr   ru   ry   r   r   r   r   r   r~     s   z!DeepEPLLAll2AllManager.get_handleNc                 C   s   dS rw   r   rK   r   r   r   max_sms_used  rM   z#DeepEPLLAll2AllManager.max_sms_used)rP   rQ   rR   rS   r   r   r   r   r   r~   r   rX   r   r   r   r   r     s$    

'r   c                       s`   e Zd ZU dZeed< eed<  fddZdededefddZd	d
 Zdd Z	dd Z
  ZS )FlashInferAllToAllManagerz<
    All2All communication based on flashinfer kernels.
    r,   r.   c                    s<   t  sJ dt | td| j| j d| _d | _d S )NzDflashinfer all2all module not found. Please install/check flashinferz8Initialize for flashinfer All2All rank=%d, world size=%dF)	r   r   r   rq   rr   r,   r.   initializedalltoall_infor   r   r   r   r      s   
z"FlashInferAllToAllManager.__init__gpus_per_nodec                 C   s   | j rdS |   td|| t||||d| _ddlm} t|t	 j
ddd}t| j|| _t| j|| _|| _|| _|| _d| _ td	|| dS )
zInitialize workspaceNz"making map: rank=%d, world size=%d)tp_sizer   )CustomCommunicatori    )comm_backendfabric_page_sizeallocation_granularityTz3FlashInfer All2All initialized for rank %s, size %s)r   cleanuprq   rr   r   mapping2vllm.distributed.device_communicators.mnnvl_compatr   r   r   r   r   get_moe_workspacesworkspace_tensorget_moe_prepare_workspaceprepare_workspace_tensorr.   r,   r   info)r   r.   r,   r   r   	dp_configr   r   r   
initialize  s6   
z$FlashInferAllToAllManager.initializec                 C   s<   t  sdS | jdkrdS | js| j| j| jtjjd | jS )zEnsure workspace is initializedFr   )r.   r,   r   )r   r.   r   r   r,   r)   cudadevice_countrK   r   r   r   %ensure_alltoall_workspace_initialized8  s   
z?FlashInferAllToAllManager.ensure_alltoall_workspace_initializedc                 C   s   | S r   r   r   r   r   r   r~   H  rM   z$FlashInferAllToAllManager.get_handlec              
   C   s   | j rX| jdurZ| jdur\z=z| `| `W n ty, } ztd| W Y d}~nd}~ww W d| _d| _d| _d| _ dS W d| _d| _d| _d| _ dS d| _d| _d| _d| _ w dS dS dS )zClean up workspaceNz*Failed to cleanup FlashInfer workspace: %sF)r   r   r   	Exceptionrq   warningr   )r   er   r   r   r   K  s8   



z!FlashInferAllToAllManager.cleanup)rP   rQ   rR   rS   r   __annotations__r   r   r   r~   r   rX   r   r   r   r   r     s   
 
+r   c                       sb   e Zd Z fddZdededejdejdeded	ed
ededefddZdd Zdd Z	  Z
S )MoriAll2AllManagerc                    sJ   t  sJ ddd l}t | t | _tjj	d| |j
d d S )NzoMoRI kernels not found. Please follow https://github.com/ROCm/mori/blob/main/README.md to install MoRI kernels.r   mori)r	   r   r   r   r   ru   r)   _C_distributed_c10d_register_process_groupshmemshmem_torch_process_group_init)r   r   r   r   r   r   r   _  s   zMoriAll2AllManager.__init__r,   r   input_dtypequant_dtyper   	scale_dimscale_type_sizer   r   num_experts_per_tokenc                 C   s   dd l }ddlm}m} | s| sJ d| js%|jjj}d}d}d}n|jjj}| r4d}d}d}n| r>d}d}d}nt	dt
|||||||j||	|
||||td|d	S )
Nr   )	on_gfx942	on_gfx950z2mori currently only support arch gfx942 and gfx950   P          @   )r,   r.   	data_type
hidden_dimr   r   max_token_type_sizemax_num_inp_token_per_ranknum_experts_per_rankr   warp_num_per_block	block_numkernel_typerdma_block_numgpu_per_node)r   vllm.platforms.rocmr   r   ro   opsEpDispatchCombineKernelType	IntraNodeInterNodeV1r=   r   itemsizemin)r   r,   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   l  sN   

z'MoriAll2AllManager._make_all2all_kwargsc                 K   s*   dd l }|jjdi |}|j|}|S )Nr   r   )r   r   EpDispatchCombineConfigEpDispatchCombineOp)r   r|   r   mori_configr   r   r   r   _make_handle  s   zMoriAll2AllManager._make_handlec                 C   s8   dd l }| jdi |}td| | j|| j}|S )Nr   zMoRI all2all args %sr   )r   r   rq   rr   ru   ry   r   )r   r|   r   mori_kwargsr   r   r   r   r~     s   zMoriAll2AllManager.get_handle)rP   rQ   rR   r   r   r)   r&   r   r   r~   rX   r   r   r   r   r   ^  s2    	

=r   )(typingr   r)   torch.distributeddistributedrs   	vllm.envsr   vllm.distributedr   r   vllm.forward_contextr   vllm.loggerr   vllm.utils.flashinferr   vllm.utils.import_utilsr   r	   r
   base_device_communicatorr   r   flashinfer.commr   flashinfer.comm.mnnvlr   flashinfer.comm.trtllm_alltoallr   rP   rq   r   rY   ri   r   r   r   r   r   r   r   r   r   <module>   s.   mfZ39Bh