o
    پiR                     @   s  d dl Z d dlmZ d dlmZmZmZmZ d dlZd dl	Z	d dl
Z	d dl
mZ d dlmZmZ d dlmZ d dlmZ e eZedZG d	d
 d
Zdd Zdeeee	j f dededee dedefddZdeeee	j f dededee dedefddZdd Z			d3dee	j dee	j dee dee dededed ee d!ed"efd#d$ZG d%d& d&Z d'ee fd(d)Z!d*eeeee f  ded ed+efd,d-Z"d.efd/d0Z#d1d2 Z$dS )4    N)defaultdict)DictListOptionalTuple)P2POp)ExpertLocationMetadata#get_global_expert_location_metadata)get_global_server_args)get_bool_env_var(SGLANG_EXPERT_LOCATION_UPDATER_LOG_INPUTc                
   @   sD   e Zd Zdd Zdeeeej f de	dee dedef
dd	Z
d
S )ExpertLocationUpdaterc                 C   s
   d| _ d S )NT)_first_execution)self r   [/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/eplb/expert_location_updater.py__init__%   s   
zExpertLocationUpdater.__init__routed_experts_weights_of_layernew_expert_location_metadataupdate_layer_idsnnodesrankc                 C   sP   | j rd| _ t   t }|d usJ t||||||d |j||d d S )NFr   old_expert_location_metadatar   r   r   r   )r   )r   torchget_device_moduleempty_cacher	   _update_expert_weightsupdate)r   r   r   r   r   r   r   r   r   r   r   (   s"   
zExpertLocationUpdater.updateN)__name__
__module____qualname__r   r   intr   r   Tensorr   r   r   r   r   r   r   $   s    r   c                  K   s$   t drtdi | S tdi | S )N%SGLANG_EXPERT_LOCATION_UPDATER_CANARYr   )r   "_update_expert_weights_with_canary_update_expert_weights_raw)kwargsr   r   r   r   E   s   r   r   r   r   r   r   r   c                    s   |j  dtdtf fdd}dd |  D } |D ]}||| jt jdd}| | | qt	| ||||d	 |D ]0}|||}	| | d
 
 }
t|	|
ksnJ d|	d|
d|d|j d|j dq>d S )Nmetalayer_idc                    s   | j |   d  f S N   )physical_to_logical_map_cpu)r(   r)   num_local_physical_expertsr   r   r   _get_canary_valueW   s
   z=_update_expert_weights_with_canary.<locals>._get_canary_valuec                 S   s    i | ]\}}|d d |D qS )c                 S   s   g | ]}|qS r   r   .0xr   r   r   
<listcomp>^   s    zA_update_expert_weights_with_canary.<locals>.<dictcomp>.<listcomp>r   )r1   kvr   r   r   
<dictcomp>]   s    z6_update_expert_weights_with_canary.<locals>.<dictcomp>T)devicenon_blockingr   zexpect_value=z actual_value=z
 layer_id=zC old_expert_location_metadata.physical_to_logical_map_cpu.tolist()=zC new_expert_location_metadata.physical_to_logical_map_cpu.tolist()= )r.   r   r"   itemsclonetor
   r7   appendr&   cpur   allr,   tolist)r   r   r   r   r   r   r/   r)   canary_tensorexpect_valueactual_valuer   r-   r   r%   M   s>   
	
r%   c                 C   sp   t d}t| |d  }tj }|j}	|| }
|D ]}t| | ||j|  |j|  |	|
|||d	 qd S )N*SGLANG_EXPERT_LOCATION_UPDATER_LOG_METRICSr   )	routed_experts_weightstemp_buffersold_physical_to_logical_mapnew_physical_to_logical_mapr.   num_gpu_per_noder   
world_sizelog_metrics)	r   create_temp_buffersr   distributedget_world_sizer.   "update_expert_weights_single_layerr,   rA   )r   r   r   r   r   r   rL   rG   rK   r.   rJ   r)   r   r   r   r&   |   s4   

r&   c                 C   s   dd | D S )Nc                 S   s   g | ]}t |qS r   )r   
empty_liker1   tensorr   r   r   r3      s    z'create_temp_buffers.<locals>.<listcomp>r   )sample_tensorsr   r   r   rM      s   rM   FrF   rG   rH   rI   r.   rJ   rK   debugrL   c
                    s  t fddD sJ dddd D tts!J tts(J trRtddd D d	d
d D ddddddd 
rVg nd tt  d  f
f
dd}
	fdddtf 
fdd	dtdtdtffddfdd 
fd d!dtffd"d# d$d% fd&d'd(td)td*tj	ffd+d,d)td*tffd-d.|
  S )/Nc                 3   s    | ]
}|j d   kV  qdS )r   NshaperR   )r.   r   r   	<genexpr>   s
    
z5update_expert_weights_single_layer.<locals>.<genexpr>znum_local_physical_experts=z+ [x.shape for x in routed_experts_weights]=c                 S      g | ]}|j qS r   rV   r0   r   r   r   r3          z6update_expert_weights_single_layer.<locals>.<listcomp>zMupdate_expert_weights_single_layer [x.shape for x in routed_experts_weights]=c                 S   rY   r   rV   r0   r   r   r   r3      rZ   z! [x.shape for x in temp_buffers]=c                 S   rY   r   rV   r0   r   r   r   r3      rZ   z old_physical_to_logical_map=z new_physical_to_logical_map=z num_local_physical_experts=z num_gpu_per_node=z rank=z world_size=r:   r+   c                     sj   g } g }||   |  |  | rt | 	d r3d|  d| d S d S )N)rK   rJ   self_node_idzp2p_op_infos=zbuffer2weight_copy_infos=)_log_p2p_op_metricsr>   )p2p_op_infosbuffer2weight_copy_infos)
_create_isend_ops_execute_buffer2weight_copies_execute_p2p_ops_handle_recvrU   rL   rJ   output_logsr[   rK   r   r   _entrypoint   s"   
z7update_expert_weights_single_layer.<locals>._entrypointc                    s   t  D ]} || | qd S N)range)r^   r]   dst_expert_location)#_handle_recv_of_dst_expert_locationlocal_expert_location_ranger   r   rb      s
   z8update_expert_weights_single_layer.<locals>._handle_recvrg   c           
         s|  |  }|  |krr	 d| d d S t D ]3}| |krNtD ]}|| || q'| | | f rK	 d| d|  d S qt
 | D ]}| |kru| || f rr	 d| d|  d S qV |d\}}}
|v r|j
d}	|||	|| d r	 d| d|	 d S |j
d}	|||	|| d r	 d| d	|	 d S )
Nz7handle_recv_of_dst_expert_location dst_expert_location=z case=unchangedz# case=same-gpu src_expert_location=z% case=free-rider src_expert_location=logical_expert_id)element_value)src_rankrk   rg   z  case=same-node chosen_src_rank=z! case=cross-node chosen_src_rank=)r>   rf   copy_chunk_value_from_element_value)
rg   r^   r]   rk   src_expert_locationisame_node_mappingcross_node_mappingneed_comm_self_node_dst_rankschosen_src_rank)_compute_comm_info'_create_p2p_recv_and_buffer2weight_copy_get_tensorrU   ri   rI   r.   num_tensorsrH   rc   r   rF   rG   r   r   rh      s   

zOupdate_expert_weights_single_layer.<locals>._handle_recv_of_dst_expert_locationrk   rm   c                   s8   | | fddtD f |    f d S )Nc                    s&   g | ]}t tjj |d qS )oprS   peer)r   r   rN   irecv)r1   rq   )rx   rg   rm   rG   r   r   r3   K  s    
zgupdate_expert_weights_single_layer.<locals>._create_p2p_recv_and_buffer2weight_copy.<locals>.<listcomp>)r>   rf   )r^   r]   rk   rm   rg   )rx   ry   rG   )rg   rm   r   rw   @  s   zSupdate_expert_weights_single_layer.<locals>._create_p2p_recv_and_buffer2weight_copyc                    s@   t  }t D ]}| }||v rq||  |||  qd S re   )setrf   add)r]   handled_logical_expert_idsrp   rk   )&_create_isend_ops_of_logical_expert_idri   rH   r   r   r_   W  s   
z=update_expert_weights_single_layer.<locals>._create_isend_opsc           	   
      s|   | d\}}}|j d}|j d}|| }r+d| d d|d| ||  fdd|D f d S )	Nrj   )chunk_valuez8create_isend_ops_of_logical_expert_id logical_expert_id=z src_expert_location=z same_node_dst_ranks=z cross_node_dst_ranks=c              	      s4   g | ]}t D ]}ttjj ||d qqS rz   )rf   r   r   rN   isend)r1   dst_rankrq   )rx   ry   rF   rp   r   r   r3   {  s    	zfupdate_expert_weights_single_layer.<locals>._create_isend_ops_of_logical_expert_id.<locals>.<listcomp>)element_values_from_chunk_valuer>   )	rk   rp   r]   rr   rs   rt   same_node_dst_rankscross_node_dst_ranksall_dst_ranks)rv   rx   rU   ry   rc   r   rF   )rp   r   r   d  s(   zRupdate_expert_weights_single_layer.<locals>._create_isend_ops_of_logical_expert_idc                    s   t fddtD fddD  fddD }t fddtD }t|dkrBfdd|D ng } fdd|D }t||d	}t|d	}|||fS )
Nc                    s    g | ]}|  kr| qS r   r   r0   )rk   r.   rH   r   r   r3     s
    zRupdate_expert_weights_single_layer.<locals>._compute_comm_info.<locals>.<listcomp>c                    s   g | ]}|  qS r   r   r0   )rJ   r   r   r3     s    c                       g | ]
}|  kr|qS r   r   r0   rJ   r[   r   r   r3     s    c                    s,   g | ]}| kr|  vr| qS r   r   r0   )all_src_ranksrk   rI   r.   r   r   r3     s    r   c                    r   r   r   r0   r   r   r   r3     s    c                    s   g | ]
}|  vr|qS r   r   r0   )all_src_nodesrJ   r   r   r3     s
    chunk_valueselement_values)_deduplicate_orderedrf   len_ChunkUtils)rk   self_node_src_ranksneed_comm_dst_ranksrt   need_comm_cross_node_dst_ranksrr   rs   )rI   rJ   r.   num_physical_expertsrH   r[   )r   r   rk   r   rv     s<   

z>update_expert_weights_single_layer.<locals>._compute_comm_infoc                 S   sP   t | dd d}dd |D }t|dkrd S tj|}|D ]}|  qd S )Nc                 S   s   | d S )Nr   r   )infor   r   r   <lambda>  s    zNupdate_expert_weights_single_layer.<locals>._execute_p2p_ops.<locals>.<lambda>)keyc                 S      g | ]\}}|D ]}|qqS r   r   r1   _opsr{   r   r   r   r3         zPupdate_expert_weights_single_layer.<locals>._execute_p2p_ops.<locals>.<listcomp>r   )sortedr   r   rN   batch_isend_irecvwait)r]   sorted_infosp2p_opsreqsreqr   r   r   ra     s   
z<update_expert_weights_single_layer.<locals>._execute_p2p_opsc                    s:   | D ]\}}t D ]} || || q
qd S re   )rf   rn   )r^   temp_buffers_expert_location&routed_experts_weights_expert_locationrq   )rx   ry   rF   rG   r   r   r`     s   zIupdate_expert_weights_single_layer.<locals>._execute_buffer2weight_copiestensor_indexexpert_locationreturnc                    s   | |  | S re   r   )tensorsr   r   )_get_local_expert_locationr   r   rx     s   z7update_expert_weights_single_layer.<locals>._get_tensorc                    s,    d |   kr d k sJ  J |  S )Nr   r+   r   )r   )ri   r.   r   r   r     s   zFupdate_expert_weights_single_layer.<locals>._get_local_expert_location)
r@   
isinstancelist
_LOG_INPUTloggerr   r   r"   r   r#   )rF   rG   rH   rI   r.   rJ   r   rK   rU   rL   rd   r   )rv   r_   r   rw   r`   ra   r   rx   rb   rh   rU   ri   rL   rI   rJ   r.   r   ry   rH   rc   r   rF   r[   rG   rK   r   rP      sn   

&S%,

rP   c                	   @   sp   e Zd ZdedefddZdd Zdefdd	Zed
edededefddZ	ed
ededede
fddZdS )r   r   r   c                C   s   || _ || _d S re   r   )r   r   r   r   r   r   r     s   
z_ChunkUtils.__init__c                 C   s.   | j t| jt| j| j|d}| j| S )N)num_elements
num_chunkselement_index)_chunk_index_from_element_indexr   r   r   index)r   rl   chunk_indexr   r   r   ro     s   

z*_ChunkUtils.chunk_value_from_element_valuer   c                 C   s@   t | jdkr	g S | jt | jt | j| j|d}| j| S )Nr   )r   r   r   )r   r   _element_slice_from_chunk_indexr   r   )r   r   element_slicer   r   r   r     s   

z+_ChunkUtils.element_values_from_chunk_valuer   r   r   c                 C   s>   t | |\}}||d  }||k r||d  S ||| |  S r*   )divmod)r   r   r   short_chunk_sizenum_long_chunksnum_elements_for_long_chunksr   r   r   r     s   
z+_ChunkUtils._chunk_index_from_element_indexr   c                 C   s>   t | |\}}|| t|| }|| t||k  }t||S re   )r   minr"   slice)r   r   r   r   r   startendr   r   r   r     s   
z+_ChunkUtils._element_slice_from_chunk_indexN)r   r    r!   r   r   ro   r   staticmethodr"   r   r   r   r   r   r   r   r     s0    
r   arrc                 C   s4   g }| D ]}t |dks||d kr|| q|S )Nr   r9   )r   r>   )r   outputitemr   r   r   r     s   
r   r]   r[   c                 C   s   d}dd | D }t |t D ]Y\}}dg| }|D ]}	||	j  |	jj7  < qtj|tjd}tj	|d|dd}
|
| }t
|
| }|| d	|  d
| d|
  d
| d|  d
| d|  d
7 }qtd|  d S )N c                 S   r   r   r   r   r   r   r   r3     r   z'_log_p2p_op_metrics.<locals>.<listcomp>r   )dtypez)(num_nodes num_gpu_per_node) -> num_nodessum)rJ   	reductionz_nbytes_of_gpu=r:   z_nbytes_of_node=z_nbytes_curr_node=z_nbytes_cross_node=z[ExpertLocationUpdater] )	_group_by_get_direction_from_opr;   r|   rS   nbytesr   int64einopsreducer   rA   r   r   r   )r]   rJ   rK   r[   textall_ops	directionr   nbytes_of_gpur{   nbytes_of_nodenbytes_curr_nodenbytes_cross_noder   r   r   r\     s>   
r\   r{   c                 C   s(   | j tjjkr	dS | j tjjkrdS t)Nr   r}   )r{   r   rN   r   r}   NotImplementedError)r{   r   r   r   r   3  s
   r   c                 C   s,   t t}| D ]}||| | qt|S re   )r   r   r>   dict)r;   keyfuncansr   r   r   r   r   ;  s   r   )NFF)%loggingcollectionsr   typingr   r   r   r   r   r   torch.distributedr   sglang.srt.eplb.expert_locationr   r	   sglang.srt.server_argsr
   sglang.srt.utilsr   	getLoggerr   r   r   r   r   r"   r#   r%   r&   rM   boolrP   r   r   r\   r   r   r   r   r   r   <module>   s   
!
/
$	

  7/
#