o
    i                      @   sD  U d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	 ee
Zi aeed< daeed< g aed	 ed
< G dd dZdefddZdefddZdd ZeejZeejZeejZeejZeej Z!eej"Z#eej$Z%eej&Z'dd Z(dd Z)	d!dedej*j+dej*j+dee de j,de-dee fdd Z.dS )"    N)forward_context)ForwardContext)init_logger)current_stream_THREAD_ID_TO_CONTEXT   _NUM_UBATCHESzUBatchContext | None_CURRENT_CONTEXTSc                   @   s   e Zd ZdZ	d2dedejjdejjdede	j
de	jd	e	jd
ejdejdefddZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Zd1S )3UBatchContextzT
    Context manager for micro-batching synchronization using threading events.
    defaultidcomm_streamcompute_streamr   ready_barriercpu_wait_eventcpu_signal_eventgpu_comm_done_eventgpu_compute_done_eventschedulec                 C   sL   || _ || _|| _|| _|| _|| _|| _|| _|| _|	| _	|
| _
d | _d S N)r   r   r   r   r   r   r   r   r   r   r   	recv_hook)selfr   r   r   r   r   r   r   r   r   r    r   N/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm/v1/worker/ubatching.py__init__   s   
zUBatchContext.__init__c                 C   sN   | j tt < | t| j < | j  | j  | j  | 	  | 
| j | S r   )r   r   	threading	get_identr	   r   waitr   clear_restore_contextupdate_streamr   r   r   r   r   	__enter__3   s   



zUBatchContext.__enter__c                 C   s4   d t | j< tt = |   | j  | j	  dS )NF)
r	   r   r   r   r   maybe_run_recv_hookr   setr   r   )r   exc_typeexc_valexc_tbr   r   r   __exit__A   s   



zUBatchContext.__exit__c                 C   s   | j t _d S r   )r   _forward_contextr!   r   r   r   r   J      zUBatchContext._restore_contextc                 C   s(   || _ t  | j krtj| j  d S d S r   )r   torchcuda
set_stream)r   streamr   r   r   r    M   s   zUBatchContext.update_streamc                 C      | j | j d S r   )r   recordr   r!   r   r   r   _signal_comm_doneR      zUBatchContext._signal_comm_donec                 C   r/   r   )r   r0   r   r!   r   r   r   _signal_compute_doneU   r2   z"UBatchContext._signal_compute_donec                 C   r/   r   )r   
wait_eventr   r!   r   r   r   _wait_compute_doneX   r2   z UBatchContext._wait_compute_donec                 C   r/   r   )r   r4   r   r!   r   r   r   _wait_comm_done[   r2   zUBatchContext._wait_comm_donec                 C   sX   t j| j ksJ t | jksJ | j rJ | j  | j  | j  | 	  d S r   )
r   r)   r   r   is_setr   r$   r   r   r   r!   r   r   r   
_cpu_yield^   s   


zUBatchContext._cpu_yieldc                 C      |  | j d S r   )r    r   r!   r   r   r   switch_to_commk      zUBatchContext.switch_to_commc                 C   r9   r   )r    r   r!   r   r   r   switch_to_computen   r;   zUBatchContext.switch_to_computec                 C       |    | | j |   d S r   )r3   r    r   r5   r!   r   r   r   switch_to_comm_syncq      z!UBatchContext.switch_to_comm_syncc                 C   r=   r   )r1   r    r   r6   r!   r   r   r   switch_to_compute_syncv   r?   z$UBatchContext.switch_to_compute_syncc                 C   s    | j d ur|    d | _ d S d S r   )r   r!   r   r   r   r#   {   s   

z!UBatchContext.maybe_run_recv_hookc                 C   s    t  | _ |   | | j  d S r   )r   r8   r    r!   r   r   r   yield_   s   zUBatchContext.yield_c                 C   H   t  | jksJ |   |   | j | jksJ | | j |   d S r   )r   r   r3   r8   r    r   r5   r!   r   r   r   %yield_and_switch_from_compute_to_comm      z3UBatchContext.yield_and_switch_from_compute_to_commc                 C   rB   r   )r   r   r1   r8   r    r   r6   r!   r   r   r   %yield_and_switch_from_comm_to_compute   rD   z3UBatchContext.yield_and_switch_from_comm_to_computeNr   )__name__
__module____qualname____doc__intr+   r,   Streamr   r   BarrierEventstrr   r"   r(   r   r    r1   r3   r5   r6   r8   r:   r<   r>   r@   r#   rA   rC   rE   r   r   r   r   r
      sR    	

	r
   returnc                   C   s   t tdkS Nr   )lenr   r   r   r   r   dbo_enabled   r*   rS   c                   C   s   t tdkrdS tt  S rQ   )rR   r   r   r   r   r   r   r   dbo_current_ubatch_id   s   rT   c                    s    fdd}|S )Nc                     s>   t tdkrtt  }t| } |g| R i | d S d S rQ   )rR   r   r   r   r	   )argskwargsctx_idxctxfuncr   r   wrapper   s
   z*_register_ubatch_function.<locals>.wrapperr   )rZ   r[   r   rY   r   _register_ubatch_function   s   r\   c                 C   s6   t tdkrtt  }t|d t  }| |_d S d S )Nr      )rR   r   r   r   r	   r   r   )r   rW   next_ctxr   r   r   dbo_register_recv_hook   s
   
r_   c                 O   sd   t tdkr0tt  }t| }tj|j | |i |W  d    S 1 s)w   Y  d S d S rQ   )	rR   r   r   r   r	   r+   r,   r.   r   )rZ   rU   rV   rW   rX   r   r   r   dbo_get_previous_event   s   $r`   r   num_micro_batchesr   r   forward_contextsr   r   c                 C   s   | dksJ d| a tt| k rtd g| tt   	 dd t| D }dd t| D }dd t| D }g }	t| D ]#}
t|
||||
 |||
 ||
d |   ||
 ||
 |d
}|	| q>|	S )Nr]   z(num_micro_batches must be greater than 1c                 S      g | ]}t  qS r   )r   rN   .0_r   r   r   
<listcomp>       z(make_ubatch_contexts.<locals>.<listcomp>c                 S   rc   r   r+   rN   rd   r   r   r   rg      rh   c                 S   rc   r   ri   rd   r   r   r   rg      rh   )
r   r   r   r   r   r   r   r   r   r   )r   rR   r	   extendranger
   append)ra   r   r   rb   r   r   
cpu_eventsgpu_comm_done_eventsgpu_compute_done_eventsctxsirX   r   r   r   make_ubatch_contexts   s0   	rr   rF   )/r   r+   vllmr   vllm.forward_contextr   vllm.loggerr   vllm.utils.torch_utilsr   rG   loggerr   dict__annotations__r   rK   r	   listr
   boolrS   rT   r\   r#   dbo_maybe_run_recv_hookrA   	dbo_yieldrC   )dbo_yield_and_switch_from_compute_to_commrE   )dbo_yield_and_switch_from_comm_to_computer:   dbo_switch_to_commr<   dbo_switch_to_computer>   dbo_switch_to_comm_syncr@   dbo_switch_to_compute_syncr_   r`   r,   rL   rM   rO   rr   r   r   r   r   <module>   s^   
 





