o
    .i                      @   sT  U d dl Z d dlmZ d dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ e	eZi aeed< daeed	< g aeed
  ed< G dd
 d
ZdefddZdefddZdd ZeejZeejZeejZeej Z!eej"Z#eej$Z%eej&Z'eej(Z)dd Z*dd Z+	d!dedej,j-dej,j-dee de j.de/dee fdd Z0dS )"    N)Optional)forward_context)ForwardContext)init_logger)current_stream_THREAD_ID_TO_CONTEXT   _NUM_UBATCHESUBatchContext_CURRENT_CONTEXTSc                   @   s   e Zd ZdZ	d2dedejjdejjdede	j
de	jd	e	jd
ejdejdefddZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Zd1S )3r
   zT
    Context manager for micro-batching synchronization using threading events.
    defaultidcomm_streamcompute_streamr   ready_barriercpu_wait_eventcpu_signal_eventgpu_comm_done_eventgpu_compute_done_eventschedulec                 C   sL   || _ || _|| _|| _|| _|| _|| _|| _|| _|	| _	|
| _
d | _d S N)r   r   r   r   r   r   r   r   r   r   r   	recv_hook)selfr   r   r   r   r   r   r   r   r   r    r   U/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/vllm/v1/worker/ubatching.py__init__   s   
zUBatchContext.__init__c                 C   sN   | j tt < | t| j < | j  | j  | j  | 	  | 
| j | S r   )r   r   	threading	get_identr   r   waitr   clear_restore_contextupdate_streamr   r   r   r   r   	__enter__4   s   



zUBatchContext.__enter__c                 C   s4   d t | j< tt = |   | j  | j	  dS )NF)
r   r   r   r   r   maybe_run_recv_hookr   setr   r   )r   exc_typeexc_valexc_tbr   r   r   __exit__B   s   



zUBatchContext.__exit__c                 C   s   | j t _d S r   )r   _forward_contextr"   r   r   r   r    K      zUBatchContext._restore_contextc                 C   s(   || _ t  | j krtj| j  d S d S r   )r   torchcuda
set_stream)r   streamr   r   r   r!   N   s   zUBatchContext.update_streamc                 C      | j | j d S r   )r   recordr   r"   r   r   r   _signal_comm_doneS      zUBatchContext._signal_comm_donec                 C   r0   r   )r   r1   r   r"   r   r   r   _signal_compute_doneV   r3   z"UBatchContext._signal_compute_donec                 C   r0   r   )r   
wait_eventr   r"   r   r   r   _wait_compute_doneY   r3   z UBatchContext._wait_compute_donec                 C   r0   r   )r   r5   r   r"   r   r   r   _wait_comm_done\   r3   zUBatchContext._wait_comm_donec                 C   sX   t j| j ksJ t | jksJ | j rJ | j  | j  | j  | 	  d S r   )
r   r*   r   r   is_setr   r%   r   r   r    r"   r   r   r   
_cpu_yield_   s   


zUBatchContext._cpu_yieldc                 C      |  | j d S r   )r!   r   r"   r   r   r   switch_to_comml      zUBatchContext.switch_to_commc                 C   r:   r   )r!   r   r"   r   r   r   switch_to_computeo   r<   zUBatchContext.switch_to_computec                 C       |    | | j |   d S r   )r4   r!   r   r6   r"   r   r   r   switch_to_comm_syncr      z!UBatchContext.switch_to_comm_syncc                 C   r>   r   )r2   r!   r   r7   r"   r   r   r   switch_to_compute_syncw   r@   z$UBatchContext.switch_to_compute_syncc                 C   s    | j d ur|    d | _ d S d S r   )r   r"   r   r   r   r$   |   s   

z!UBatchContext.maybe_run_recv_hookc                 C   s    t  | _ |   | | j  d S r   )r   r9   r!   r"   r   r   r   yield_   s   zUBatchContext.yield_c                 C   H   t  | jksJ |   |   | j | jksJ | | j |   d S r   )r   r   r4   r9   r!   r   r6   r"   r   r   r   %yield_and_switch_from_compute_to_comm      z3UBatchContext.yield_and_switch_from_compute_to_commc                 C   rC   r   )r   r   r2   r9   r!   r   r7   r"   r   r   r   %yield_and_switch_from_comm_to_compute   rE   z3UBatchContext.yield_and_switch_from_comm_to_computeNr   )__name__
__module____qualname____doc__intr,   r-   Streamr   r   BarrierEventstrr   r#   r)   r    r!   r2   r4   r6   r7   r9   r;   r=   r?   rA   r$   rB   rD   rF   r   r   r   r   r
      sR    	

	returnc                   C   s   t tdkS Nr   )lenr   r   r   r   r   dbo_enabled   r+   rT   c                   C   s   t tdkrdS tt  S rR   )rS   r   r   r   r   r   r   r   dbo_current_ubatch_id   s   rU   c                    s    fdd}|S )Nc                     s>   t tdkrtt  }t| } |g| R i | d S d S rR   )rS   r   r   r   r   )argskwargsctx_idxctxfuncr   r   wrapper   s
   z*_register_ubatch_function.<locals>.wrapperr   )r[   r\   r   rZ   r   _register_ubatch_function   s   r]   c                 C   s6   t tdkrtt  }t|d t  }| |_d S d S )Nr      )rS   r   r   r   r   r	   r   )r   rX   next_ctxr   r   r   dbo_register_recv_hook   s
   
r`   c                 O   sd   t tdkr0tt  }t| }tj|j | |i |W  d    S 1 s)w   Y  d S d S rR   )	rS   r   r   r   r   r,   r-   r/   r   )r[   rV   rW   rX   rY   r   r   r   dbo_get_previous_event   s   $ra   r   num_micro_batchesr   r   forward_contextsr   r   c                 C   s   | dksJ d| a tt| k rtd g| tt   	 dd t| D }dd t| D }dd t| D }g }	t| D ]#}
t|
||||
 |||
 ||
d |   ||
 ||
 |d
}|	| q>|	S )Nr^   z(num_micro_batches must be greater than 1c                 S      g | ]}t  qS r   )r   rO   .0_r   r   r   
<listcomp>       z(make_ubatch_contexts.<locals>.<listcomp>c                 S   rd   r   r,   rO   re   r   r   r   rh      ri   c                 S   rd   r   rj   re   r   r   r   rh      ri   )
r   r   r   r   r   r   r   r   r   r   )r	   rS   r   extendranger
   append)rb   r   r   rc   r   r   
cpu_eventsgpu_comm_done_eventsgpu_compute_done_eventsctxsirY   r   r   r   make_ubatch_contexts   s0   	rs   rG   )1r   typingr   r,   vllmr   vllm.forward_contextr   vllm.loggerr   vllm.utils.torch_utilsr   rH   loggerr   dict__annotations__r	   rL   r   listr
   boolrT   rU   r]   r$   dbo_maybe_run_recv_hookrB   	dbo_yieldrD   )dbo_yield_and_switch_from_compute_to_commrF   )dbo_yield_and_switch_from_comm_to_computer;   dbo_switch_to_commr=   dbo_switch_to_computer?   dbo_switch_to_comm_syncrA   dbo_switch_to_compute_syncr`   ra   r-   rM   rN   rP   rs   r   r   r   r   <module>   s`   
 





