o
    i                     @   s   d Z ddlZddlZddlmZ ddlZddlmZ ddlm	Z	 ddl
mZ ddlmZ er4dd	lmZ eeZ		
ddddeeef dB dedejfddZ	
	ddddedejjdedeeef dB ddfddZdS )z<
The async worker that transfers experts in the background.
    N)TYPE_CHECKING)ProcessGroup)get_ep_group)init_logger   )transfer_layer)	EplbStateFstater   rank_mapping
is_profilereturnc                    sR   t  j j jsJ d fdd}tj|dd}|  |S )Nr   c               
      s    d usJ t j  t jj d} t }t| z8z|t| d W n t	yF } zt
dt| W Y d }~nd }~ww W |  d S W |  d S |  w )N)device)r	   ep_groupcuda_streamr   r
   zasync loop error (Rank %d): %s)torchcuda
set_deviceStreamasyncionew_event_loopset_event_looprun_until_completetransfer_run_periodically	Exceptionlogger	exceptionstrclose)r   loopexcdevice_indexr   r   rankr
   r	    X/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm/distributed/eplb/async_worker.pythread_target#   s.   
	z)start_async_worker.<locals>.thread_targetT)targetdaemon)r   N)r   device_groupr"   cuda_device_indexis_async	threadingThreadstart)r	   r
   r   r%   threadr#   r    r$   start_async_worker   s   
r/   r   r   c                    s^  	 t | jjI d H  td | jsJ | j D ]}|j	j
}|jr|j|k r|js|jr|jd urt |jjI d H  zO|j|krKW |j  q|jd urY||j d |_t|j|j|j	j|j|||j||d	I d H \|_|_|_tjjdd}|| ||_d|_W |j  n|j  w |jsnt  dI d H  |jr|j|k s)q| j!  q)NTz&async worker woke up for EPLB transfer)	old_global_expert_indicesnew_global_expert_indicesexpert_weightsexpert_weights_bufferr   r   layerr   r
   F)blockingr   gMbP?)"r   	to_threadrearrange_eventwaitr   infor*   model_statesvaluesmodelnum_moe_layers
rebalancedlayer_to_transferep_buffer_readynew_physical_to_logical_mapbuffer_lockacquirereleasebuffer_consumed_event
wait_eventr   physical_to_logical_mapr2   expert_bufferis_unchangedis_received_locallyrecv_metadatar   r   Eventrecord_eventbuffer_ready_eventsleepclear)r	   r   r   r   r
   model_statecurrent_num_layerseventr#   r#   r$   r   =   sh   









,r   )NF)FN)__doc__r   r+   typingr   r   torch.distributedr   vllm.distributed.parallel_stater   vllm.loggerr   rebalance_executer   
eplb_stater   __name__r   dictintboolr,   r/   r   r   r   r#   r#   r#   r$   <module>   sL   
(