o
    
۾iE,                  
   @   sl  d Z ddlZddlZddlmZmZmZmZ ddlmZm	Z	m
Z
 ddlmZmZ ddlmZ ddlmZmZ ddlmZ dd	lmZ ed
ZedZG dd dZdefddZ	d&de
eef dedB de
ee	e f fddZdede
fddZdedefddZerdeedf fddZd eedf dee e!ef df fd!d"Z"d#eedf de#e fd$d%Z$dS )'ze
Contains helpers related to asynchronous code.

This is similar in concept to the `asyncio` module.
    N)FIRST_COMPLETEDAbstractEventLoopFutureTask)AsyncGenerator	AwaitableCallable)ExecutorThreadPoolExecutor)partial)TYPE_CHECKINGTypeVar)BatchEncoding)	ParamSpecPTc                
   @   s   e Zd ZdZ		d dededdfdd	Zdefd
dZde	e fddZ
defddZdejdedejeeeef ee	e ef B  fddZdejdefddZdejfddZdededefddZdd ZdS )!AsyncMicrobatchTokenizerzAsynchronous tokenizer with micro-batching.

    Pulls pending encode/decode requests from a queue and batches them
    up to reduce overhead. A single-thread ThreadPoolExecutor is used
    so the event loop stays responsive.
        Mb`?max_batch_sizebatch_wait_timeout_sreturnNc                 C   s8   || _ || _|| _t | _i | _g | _tdd| _	d S )N   )max_workers)
	tokenizerr   r   asyncioget_running_loop_loop_queues_batcher_tasksr
   	_executor)selfr   r   r    r"   J/home/ubuntu/.local/lib/python3.10/site-packages/vllm/utils/async_utils.py__init__    s   
z!AsyncMicrobatchTokenizer.__init__c                    sF   | j  }| d|}| | j |}||||fI d H  |I d H S )Nencoder   create_future
_queue_key
_get_queueput)r!   promptkwargsresult_futurekeyqueuer"   r"   r#   __call__5   s   

z!AsyncMicrobatchTokenizer.__call__c                    s   | |fi |I d H j S N)	input_ids)r!   r+   r,   r"   r"   r#   r%   <   s   zAsyncMicrobatchTokenizer.encodec                    sD   | j  }| d|}| | j |}|||fI d H  |I d H S )Ndecoder&   )r!   	token_idsr,   r-   r.   r/   r"   r"   r#   r3   ?   s   

zAsyncMicrobatchTokenizer.decodeloopr.   c                 C   s   | j |}|du rDt  | j |< }|d dkr&|d dk}| ||}n|d dks6J d|d  d| |}| j|| |S )	zkGet the request queue for the given operation key, creating a new
        queue and batcher task if needed.Nr   r%   r   otherr3   zUnknown operation type: .)	r   getr   Queue_batch_encode_loop_batch_decode_loopr   appendcreate_task)r!   r5   r.   r/   	can_batchcoror"   r"   r#   r)   G   s    
z#AsyncMicrobatchTokenizer._get_queuer/   r>   c              
      s  	 |  I dH \}}}|g}|g}|g}j j }	t|jk rc|	j  }
|
dkr0n3z!t|  |
I dH \}}}|| || |sP|| W n
 tj	y[   Y nw t|jk s$zf|rt|dkrt
j|fi |}jj|I dH }t|D ]\ }| s fdd| D }|t| qn'||ffdd	}jj|I dH }t||D ]\}}| s|| qW n  ty } z|D ]}| s|| qW Y d}~nd}~ww q)	z.Batch incoming encode requests for efficiency.TNr   r   c                    s   i | ]	\}}||  qS r"   r"   ).0kv)ir"   r#   
<dictcomp>{   s    z?AsyncMicrobatchTokenizer._batch_encode_loop.<locals>.<dictcomp>c                    s    fddt | |D S )Nc                    s"   g | ]\}} j |fi |qS r"   )r   )r@   pkwr!   r"   r#   
<listcomp>~   s    zQAsyncMicrobatchTokenizer._batch_encode_loop.<locals>.<lambda>.<locals>.<listcomp>)zip)promptsr,   rG   r"   r#   <lambda>~   s   
 z=AsyncMicrobatchTokenizer._batch_encode_loop.<locals>.<lambda>)r8   r   timer   lenr   r   wait_forr<   TimeoutErrorr   r   run_in_executorr    	enumeratedoneitems
set_resultr   rI   	Exceptionset_exception)r!   r/   r>   r+   r,   r-   rJ   kwargs_listresult_futuresdeadlinetimeoutbatch_encode_fnresultsfutdata	encode_fnreser"   )rC   r!   r#   r:   X   sj   






z+AsyncMicrobatchTokenizer._batch_encode_loopc              
      s8  	 |  I dH \}}|g}|g}| j | j }t|| jk rW|| j  }|dkr,n+zt|  |I dH \}}|| || W n
 tj	yO   Y nw t|| jk s z#| j
| j| jj|I dH }t||D ]\}	}
|	 sx|	|
 qkW n  ty } z|D ]}	|	 s|	| qW Y d}~nd}~ww q)z.Batch incoming decode requests for efficiency.TNr   )r8   r   rL   r   rM   r   r   rN   r<   rO   rP   r    r   batch_decoderI   rR   rT   rU   rV   )r!   r/   r4   r-   token_ids_listrX   rY   rZ   r\   r]   r`   ra   r"   r"   r#   r;      sL   



z+AsyncMicrobatchTokenizer._batch_decode_loopopr,   c                 C   st   |dkrdS | dd}| dd}| d}|sd|dd	fS t| jd
d	}|d	u s2|d	ur8||kr8d|ddfS dS )a  
        Return a normalized key describing operation + kwargs.

        - `add_special_tokens`: {True/False}
        - `truncation`: {True/False}
          - If `truncation` is False (`max_length` is None),
            returns a key for a can_batch queue.
          - If `truncation` is True and `max_length` is None or equals
            `tokenizer.model_max_length`, returns a key for a can_batch queue.
          - Otherwise, returns a key for a cannot_batch queue.

        Examples:
          - Decode: ("decode",)
          - Encode typical:
            ("encode", add_special_tokens, bool_truncation, max_length_label)
          - Fallback: ("encode", "other")
        r3   )r3   add_special_tokensT
truncationF
max_lengthr%   Nmodel_max_length	model_max)r%   r6   )r8   getattrr   )r!   rd   r,   re   rf   rg   ri   r"   r"   r#   r(      s   
z#AsyncMicrobatchTokenizer._queue_keyc                    sN   t | dd   r!t | dd  }r#| s% fdd}|| d S d S d S d S )Nr   r   c                     s    D ]} |    qd S r1   )canceltasktasksr"   r#   cancel_tasks   s   
z6AsyncMicrobatchTokenizer.__del__.<locals>.cancel_tasks)rj   	is_closedcall_soon_threadsafe)r!   r5   rp   r"   rn   r#   __del__   s   z AsyncMicrobatchTokenizer.__del__)r   r   )__name__
__module____qualname____doc__intfloatr$   r   r0   listr%   strr3   r   r   tupler9   dictr   r)   boolr:   r;   r(   rs   r"   r"   r"   r#   r      s2    

"
5"#r   rm   c                 C   s(   | r|   st|  | j d S d S d S r1   )rR   run_in_loopget_looprk   rl   r"   r"   r#   cancel_task_threadsafe   s   r   funcexecutorr   c                    s(   dt jdt jdtt f fdd}|S )z
    Take a blocking function, and run it on in an executor thread.

    This function prevents the blocking function from blocking the
    asyncio event loop.
    The code in this function needs to be thread safe.
    argsr,   r   c                     s,   t  }tg| R i |}|j |dS )Nr   r   )r   get_event_loopr   rP   )r   r,   r5   p_funcr   r"   r#   _async_wrapper   s   z"make_async.<locals>._async_wrapper)r   r   r,   r   r   )r   r   r   r"   r   r#   
make_async   s   $r   r5   functionc                 G   s6   t | r
||  d S |  s| j|g|R   d S d S r1   )in_looprq   rr   )r5   r   r   r"   r"   r#   r      s
   r   
event_loopc                 C   s$   zt  | kW S  ty   Y dS w )NF)r   r   RuntimeError)r   r"   r"   r#   r      s
   r   itc                 C   s   |   S r1   )	__anext__)r   r"   r"   r#   anext
  s   r   	iteratorsc            
        s  t | dkr| d 2 z3 dH W }d|fV  q6 dS t   fddt| D }zl|rhtj| tdI dH \}}|D ])}||}z|I dH }|\}}|| t	|< ||fV  W q< t
ye   Y q<w |s,W | D ]&\}	\}}tt |	  | I dH  W d   n1 sw   Y  qmdS | D ]&\}	\}}tt |	  | I dH  W d   n1 sw   Y  qw )zMerge multiple asynchronous iterators into a single iterator.

    This method handle the case where some iterators finish before others.
    When it yields, it yields a tuple (i, item) where i is the index of the
    iterator that yields the item.
    r   r   Nc                    s$   i | ]\}}  t|||fqS r"   )r=   r   )r@   rC   r   r5   r"   r#   rD     s   $ z)merge_async_iterators.<locals>.<dictcomp>)return_when)rM   r   r   rQ   waitkeysr   popr=   r   StopAsyncIterationrS   
contextlibsuppressBaseExceptionrk   aclose)
r   itemawaitsrR   _dpairrC   r   fr"   r   r#   merge_async_iterators  sH   	

r   iteratorc                    s(   g }| 2 z3 dH W }| | q6 |S )z6Collect all items from an async generator into a list.N)r<   )r   rS   r   r"   r"   r#   collect_from_async_generator4  s   r   r1   )%rw   r   r   r   r   r   r   collections.abcr   r   r   concurrent.futuresr	   r
   	functoolsr   typingr   r   $transformers.tokenization_utils_baser   typing_extensionsr   r   r   r   r   r   r   r~   r   r   r|   rx   r   rz   r   r"   r"   r"   r#   <module>   sB    I



"&