o
    -i-m                     @   sD  d Z ddlmZmZ ddlmZ ddlZddlZddl	m
Z
mZmZmZmZ ddlmZ eeZeG dd dZeejejef Zd	ejd
edejdejdeeeee f eeee f f f
ddZd
edejdejdeej deej dejjdB dedefddZdeej deej dejdejdedejdeddfddZ				d-dejdejdeeej  deej ded e d!edejjdB d"eeef dB defd#d$Z!		d.dejdejdeeej  ded e d"eeef dB ddfd%d&Z"dejd"eeef d'edejfd(d)Z#dejd"eeef dejfd*d+Z$g d,Z%dS )/zh
The actual execution of the rearrangement.

This involves the exchange of expert weights between GPUs.
    )IterableSequence)	dataclassN)P2POpProcessGroup
all_gatherbatch_isend_irecvget_global_rank)init_loggerc                   @   s>   e Zd ZU dZejed< 	 eed< 	 ejed< 	 ejed< dS )RecvMetadataz<Metadata describing remote receives during EPLB rebalancing.recv_primary_mask
recv_countrecv_expert_idsrecv_dst_rowsN)__name__
__module____qualname____doc__npndarray__annotations__int r   r   d/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/vllm/distributed/eplb/rebalance_execute.pyr      s   
 


r   
expert_idsnum_local_expertsold_indicesnew_indicesreturnc                    st  i }i }| j dkr||fS t| }t|}tj|tjd}t||}	t||}
t|	r||	 }||	 }|| }t||f}|| }|| }t	dgt
t|dkd d t|gg}tt|d D ]/}|| ||d  }}t|| }||| }tj|dd\}}|t| }| ||< qjt|
r||
 }||
 }|| }t||f}|| }|| }t	dgt
t|dkd d t|gg}tt|d D ]>}|| ||d  }}t|| }||| }tj|dd\}}|t| }t||g   fdd|D }|||< q|D ]}t|}||vr+g ||< ||vr4g ||< q||fS )a  
    Get the ranks of the experts that need to be exchanged.

    Args:
        expert_ids: 1D array of expert indices to query.
        num_local_experts: The number of local experts.
        old_indices: The old indices of the experts.
        new_indices: The new indices of the experts.

    Returns:
        A tuple of two dictionaries mapping expert_id to:
        - ranks_to_send: The ranks that have this expert and need to send.
        - ranks_to_recv: The ranks that need to receive this expert.
    r   dtype   Treturn_indexc                    s   g | ]
}| vrt |qS r   )r   ).0rsend_ranks_setr   r   
<listcomp>   s    z3get_ep_ranks_with_experts_batch.<locals>.<listcomp>)sizer   uniquelenarangeint32isinanylexsortconcatenatewherediffranger   sorttolistsetget)r   r   r   r   ranks_to_send_mapranks_to_recv_mapunique_expertsnum_positionsposition_indicesold_relevant_masknew_relevant_maskold_relevant_positionsold_relevant_expertsold_relevant_ranks
sort_ordersorted_expertssorted_ranksexpert_boundariesistartendexpertexpert_ranks_
unique_idxunique_ranksnew_relevant_positionsnew_relevant_expertsnew_relevant_ranksrecv_ranks_actualr   r&   r   get_ep_ranks_with_experts_batch-   sn   


((



rS   expert_weightsexpert_weights_bufferscuda_streamep_groupc           >         s2  |j |j ksJ  }tj| ftjd}tj| fdtjd}	tj| fdtjd}
tj| fdtjd}tj| fdtjd}||  }tj| tjd}|| }|| }|| }||k}|dk}tj	||dd}t
|t||}d}|dk}t|rtj|| dd\}}|| }|| }t|j d }||	d|< ||
d|< d}t| |}t|r|| }|| }tj|dd\} }!||! }"t| j d }| |d|< |"|d|< d||"< t| |}#t|# r0|dkr0t|#d  }$tt|	d| |
d| }%|$D ]( |  }&|%|&d}'|'dkr.t||D ]\}(})|)  j|(|' dd	 qqg }* }+fd
dt|+D },|dkr|	d| }-|
d| }.tj|-dd}/|-|/ }-|.|/ }.t|-| ||\}0}1t|- |. D ]^\}&|0|& }2|1|& }3|2r|3sqst|3t|2 }4|2|}5|5|4 }6|6|4 }7|3|6|7 }8t|2|4 }9|9|5 }:|:t|3k r|8|3|:  |8D ] |,  |*fdd|D 7 }*qqs|dkrO|d| }-|d| };tj|-dd}/|-|/ }-|;|/ };t|-| ||\}0}1t|- |; D ]J\}& |0|& }2|1|& }3|2r|3sqt|3t|2 }4|3|}:t|2|4 }9|:|9k r7|2|:|4  n|2|:|9  |, |* fdd|D 7 }*q|*r}|dur}tj| t |*}<|<D ]}=|=!  qdW d   n	1 sww   Y  n|*rt |*}<|<D ]}=|=!  q||t"||||dfS )a  
    Rearranges expert weights during EPLB rebalancing.

    Args:
        num_local_experts: Number of local experts.
        old_indices: (num_experts_total,) ndarray of current (old)
            global-to-local expert assignments.
        new_indices: (num_experts_total,) ndarray of desired (new)
            global-to-local assignments after rebalance.
        expert_weights: Original expert weights for the layer.
        expert_weights_buffers: Intermediate buffers (one per tensor).
        cuda_stream: CUDA stream for async copies (can be None for sync mode).
        ep_group: Distributed process group for expert parallel comms.

    Returns:
        is_unchanged (np.ndarray): (num_local_experts,), True where an expert row
            is unchanged after rebalance.
        is_received_locally (np.ndarray): (num_local_experts,), True where a row
            can be updated from local data.
        RecvMetadata: Metadata needed for completing remote weight transfers.
    r   F)assume_uniquer   Tr"   Nnon_blockingc                    s   i | ]}|t  |qS r   )r	   )r$   rank)rW   r   r   
<dictcomp>   s    z"move_to_buffer.<locals>.<dictcomp>stablekindc                    s    g | ]}t tjj|  qS r   )r   torchdistributedisendr$   w)
dst_globalsrcr   r   r(          z"move_to_buffer.<locals>.<listcomp>c                    s    g | ]}t tjj|  qS r   )r   ra   rb   irecv)r$   b)dst
src_globalr   r   r(   E  rh   )r   r   r   r   )#shaper\   r   zerosbool_fullint64r-   r,   r.   
logical_orlogical_andr/   r*   r   boolnonzeror6   dictzipr8   copy_r)   r4   argsortrS   r+   indexappendra   cudastreamr   waitr   )>r   r   r   rT   rU   rV   rW   ep_rankr   send_expert_idssend_src_rowsr   r   base
local_rowslocal_globalold_local_expert_idsnew_local_expert_idsis_unchanged	new_validcan_recv_localis_received_locally
send_count	valid_olduniq_experts	first_idxfiltered_rowssrc_rowsr   need_recv_maskdesired_expertsdesired_dstsuniq_recv_expertsuniq_indicesdst_rowseligible_local_buffer_maskdest_indicesexpert_to_src_maprJ   	src_localre   rj   p2p_opsep_sizerank_to_globalexpertssrcsordersend_maprecv_mapranks_to_sendranks_to_recvnum_dst_per_sender
sender_pos
recv_beginrecv_end
recv_ranksremainder_start
recver_posdstsreqsreqr   )rk   rf   rW   rg   rl   r   move_to_buffer   s  









r   r   r   recv_metadatar   c           !   	   C   s  |j }|j}|j}	|j}
|jd }t||}t| |}t|	 rGt
|d  }|D ]}t| |D ]\}}|| j|| dd q6q/|dkrMdS || }||tj|tjd  }tt| | t| |dk}t|	 sxdS t
|d }|| }|	d| }|
d| }tj|dd}|| }|| }t||}t||jd k |t||jd d	  |k}t|	 sdS || }|||  }t| | D ]\}} | D ]}|| j||  dd qqdS )
a  
    Copies expert weights from communication buffers back to the target weight tensors
    after EPLB rebalancing.

    Args:
        expert_weights: List of the actual MoE layer weights used in the execution.
        expert_weights_buffers: Intermediate buffers containing the experts weights
            after the transfer is completed.
        is_unchanged: (num_local_experts,), True where an expert row is unchanged.
        is_received_locally: (num_local_experts,), True where a row is updated locally.
        recv_metadata: RecvMetadata containing remote receive metadata.
        new_indices: (num_experts_total,) mapping from local rows to desired
            (possibly global) expert id, after rebalance.
        ep_rank: Rank of the process in the expert parallel group.
    r   TrZ   Nr   rX   r^   r_   r!   )r   r   r   r   rm   r   rr   rs   rt   r/   ru   r6   rw   rx   r,   r-   ry   searchsortedminimum)!rT   rU   r   r   r   r   r   r   r   r   r   r   	copy_maskdest_mask_npr   rk   re   rj   r   local_expertsduplicate_maskdup_dst_rowsdup_expertsprim_experts	prim_dstsr   prim_experts_sortedprim_dsts_sortedposvalidmatched_dst_rowsmatched_src_rowsrg   r   r   r   move_from_buffere  s\   
r   Fold_global_expert_indicesnew_global_expert_indicesexpert_weights_buffer
is_profilelayerrank_mappingc	              	      s   |  }	|durt||  krt||}nt| ||  } | jd |jd ks+J | j\}
}t||
ks8J tt|d jd }|j|
|fksLJ ||	| ksTJ |   }|  }t	||| || || |||d\}}}|||fS )a  
    Rearranges the expert weights in place according to the new expert indices.

    The value of the indices arguments are logical indices of the experts,
    while keys are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        ep_group: The device process group for expert parallelism.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.

    Returns:
        is_unchanged (np.ndarray): (1, num_local_experts), True where expert
            is left unchanged.
        is_received_locally (np.ndarray): (1, num_local_experts), True where expert
            can be received locally.
        RecvMetadata: Metadata needed for completing remote weight transfers.
    Nr!   r   r   r   r   rT   rU   rV   rW   )
r)   r+   )_map_new_expert_indices_with_rank_mapping)_map_old_expert_indices_with_rank_mappingrm   nextitercpunumpyr   )r   r   rT   r   rW   r   r   rV   r   r   num_moe_layersnum_physical_expertsnum_local_physical_expertsold_global_expert_indices_npnew_global_expert_indices_npr   r   r   r   r   r   transfer_layer  s>   $

	r   c              
      s  |durt || krt||}nt| || } | jd |jd ks&J | j\}}t ||ks3J tt|d jd }|j||fksGJ | }	||	| ksSJ t|d }
dd |
D }|rt|d |D ]\}  fddt	|	D }t
j  t|||d qidS t
j  |   }|  }t	|D ]'}t||| || || |d|d\}}}t|| |||||| | d	 qdS )
a  
    Rearranges the expert weights in place according to the new expert indices.

    The value of the indices arguments are logical indices of the experts,
    while keys are physical.

    Args:
        old_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        new_global_expert_indices: Shape (num_moe_layers, num_physical_experts).
        expert_weights: A sequence of shape (num_moe_layers)(weight_count)
            of tensors of shape (num_local_physical_experts, hidden_size_i).
            For example, a linear layer may have up and down projection,
            so weight_count = 2. Each weight's hidden size can be different.
        ep_group: The device process group for expert parallelism.
        is_profile (bool): If `True`, do not perform any actual weight copy.
            This is used during profile run, where we only perform dummy
            communications to reserve enough memory for the buffers.
        rank_mapping: A dictionary mapping old rank to new rank.
    Nr!   r   c                 S   s   g | ]}t |qS r   )ra   
empty_likerd   r   r   r   r(   7  s    
z4rearrange_expert_weights_inplace.<locals>.<listcomp>c                    s   g | ]} qS r   r   )r$   rL   bufferr   r   r(   =  s    )groupr   )rT   rU   r   r   r   r   r   )r+   r)   r   r   rm   r   r   listrw   r4   ra   rb   barrierr   r|   synchronizer   r   r   r   r\   )r   r   rT   rW   r   r   r   r   r   r   first_layer_weightsweights_bufferweightdummy_recv_bufferold_global_expert_indices_cpunew_global_expert_indices_cpu	layer_idxr   r   r   r   r   r    rearrange_expert_weights_inplace  sn   



r   new_ep_sizec                 C   s   | j \}}|sJ dt|}|| }|| }tj||fd| j| jd}t|D ]9}	||	}
|
dura|
dkra|
|k ra|	| }|	d | }|
| }|
d | }| dd||f |dd||f< q(|S )a  
    Map the old global expert indices to the new global expert indices.

    Args:
        old_global_expert_indices:
            Shape (num_layers, old_ep_size * num_local_physical_experts).
        rank_mapping: Mapping from old rank to new rank.
        new_ep_size: New expert parallelism size.

    Returns:
        Mapped expert indices with shape
        (num_layers, new_ep_size * num_local_physical_experts).
    Rank mapping is requiredrX   
fill_valuer    deviceNr   r!   )rm   r+   ra   rp   r    r   r4   r8   )r   r   r   
num_layersold_num_physical_expertsold_ep_sizer   new_num_physical_expertsmapped_expert_indicesold_ranknew_rankold_start_idxold_end_idxnew_start_idxnew_end_idxr   r   r   r   c  s,   

r   c                 C   s   | j \}}|sJ dt|}tdd | D }|| }|| }tj||fd| j| jd}t|D ]4}	||	 }
|
dkrg|
|k rg|	| }|	d | }|
| }|
d | }| d d ||f |d d ||f< q3|S )Nr   c                 s   s    | ]}|d kV  qdS )rX   Nr   )r$   r   r   r   r   	<genexpr>  s    z<_map_new_expert_indices_with_rank_mapping.<locals>.<genexpr>rX   r   r   r!   )	rm   r+   sumvaluesra   rp   r    r   r4   )r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r     s.   
r   )r   r   r   )Fr   NN)FN)&r   collections.abcr   r   dataclassesr   r   r   ra   torch.distributedr   r   r   r   r	   vllm.loggerr
   r   loggerr   tupler   MoveToBufferResultr   rv   r   rS   Tensorr|   Streamr   r   rt   r   r   r   r   __all__r   r   r   r   <module>   s   "
k

 N
U
	

O
e

5

#