o
    پiD                     @   s`   d Z ddlZddlZddlmZ ddlmZ ddlmZm	Z	m
Z
mZ eeZG dd dZdS )z
Asynchronous dynamic batch tokenizer for SGLang.

This module provides an async tokenizer with dynamic batching capabilities
to reduce tokenization overhead when multiple requests arrive concurrently.
    N)ThreadPoolExecutor)partial)AnyDictListOptionalc                   @   s   e Zd ZdZ		ddededdfdd	Zd
d Zdede	fddZ
dede	fddZdd Zdee dee deej ddfddZdd ZdS )AsyncDynamicbatchTokenizera  Asynchronous tokenizer with dynamic batching for single string prompts.

    Dynamically batches pending encode requests from a queue to reduce overhead.
    Only handles single string prompts - regular batch processing of multiple
    strings per request should be handled at a higher level.
    A single-thread ThreadPoolExecutor is used so the event loop stays responsive.

    Note: Uses lazy initialization for asyncio components because this class
    is instantiated in TokenizerManager.__init__() before the event loop starts.
        Mb`?max_batch_sizebatch_wait_timeout_sreturnNc                 C   s4   || _ || _|| _d | _d | _tdd| _d| _d S )N   )max_workersF)	tokenizerr   r   _queue_batcher_taskr   	_executor_initialized)selfr   r   r    r   e/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/managers/async_dynamic_batch_tokenizer.py__init__   s   
z#AsyncDynamicbatchTokenizer.__init__c                 C   s.   | j st | _t|  | _d| _ dS dS )z7Lazy initialization of event loop dependent components.TN)r   asyncioQueuer   create_task_dynamic_batch_loopr   r   r   r   r   _ensure_initialized/   s
   

z.AsyncDynamicbatchTokenizer._ensure_initializedpromptc                    s   | j |fi |I dH S zEncode a single prompt.N)encode)r   r   kwargsr   r   r   __call__6   s   z#AsyncDynamicbatchTokenizer.__call__c                    s8   |    t  }| j|||fI dH  |I dH S r    )r   r   get_running_loopcreate_futurer   put)r   r   r"   result_futurer   r   r   r!   :   s
   
z!AsyncDynamicbatchTokenizer.encodec              
      s@  	 z| j  I dH \}}}|g}|g}|g}| j  rnRt  }t|| jk rot  | }|| jkr8n7| j| }	z t	| j  |	I dH \}}}|
| |
| |
| W n
 tjyg   Y nw t|| jk s*tdt|  | |||I dH  W n ty }
 ztd|
  W Y d}
~
nd}
~
ww q)z:Dynamically batch incoming encode requests for efficiency.TNz=AsyncDynamicbatchTokenizer: Processing dynamic batch of size zError in dynamic batch loop: )r   getemptyr   r$   timelenr   r   wait_forappendTimeoutErrorloggerdebug_process_dynamic_batch	Exceptionerror)r   r   r"   r'   promptskwargs_listresult_futures
start_timeelapsedremaining_timeer   r   r   r   A   sF   





z.AsyncDynamicbatchTokenizer._dynamic_batch_loopr4   r5   r6   c              
      sz  t tdd D dk}|rd nd}z}|rUt |dkrUtj|fi |}t j|I dH }t|D ]\ }|	 sQ fdd|
 D }	||	 q9W dS t |dkrh|shtdt | d	 |ffd
d	}t j|I dH }t||D ]\}}
|	 s||
 qW dS  ty } ztd|  |D ]}|	 s|| qW Y d}~dS d}~ww )zEProcess a dynamic batch of encode requests for single string prompts.c                 s   s     | ]}t t| V  qd S )N)strsorteditems).0kwr   r   r   	<genexpr>{   s    zDAsyncDynamicbatchTokenizer._process_dynamic_batch.<locals>.<genexpr>r   r   Nc                    s   i | ]	\}}||  qS r   r   )r>   kv)ir   r   
<dictcomp>   s    zEAsyncDynamicbatchTokenizer._process_dynamic_batch.<locals>.<dictcomp>zCAsyncDynamicbatchTokenizer: Dynamic batching disabled for batch of z requests due to differing kwargs. This reduces performance benefits. Consider using consistent tokenization parameters across requests.c                    s   fddt |  D S )Nc                    s"   g | ]\}} j |fi |qS r   )r   )r>   pr?   r   r   r   
<listcomp>   s    zWAsyncDynamicbatchTokenizer._process_dynamic_batch.<locals>.<lambda>.<locals>.<listcomp>)zip)r4   r"   )r5   r   r   r   <lambda>   s   
 zCAsyncDynamicbatchTokenizer._process_dynamic_batch.<locals>.<lambda>z#Error in dynamic batch processing: )r+   setr   r   r   r$   run_in_executorr   	enumeratedoner=   
set_resultr/   warningrG   r2   r3   set_exception)r   r4   r5   r6   	can_batchr"   	encode_fnresultsfutdataresr:   r   )rC   r5   r   r   r1   s   sJ   




z1AsyncDynamicbatchTokenizer._process_dynamic_batchc                 C   sD   t | dr| jr| j s| j  t | dr | jjdd dS dS )zClean up background tasks.r   r   F)waitN)hasattrr   rL   cancelr   shutdownr   r   r   r   __del__   s   


z"AsyncDynamicbatchTokenizer.__del__)r	   r
   )__name__
__module____qualname____doc__intfloatr   r   r;   r   r#   r!   r   r   r   r   Futurer1   rZ   r   r   r   r   r      s2    
2
1r   )r^   r   loggingconcurrent.futuresr   	functoolsr   typingr   r   r   r   	getLoggerr[   r/   r   r   r   r   r   <module>   s    
