o
    i                     @   sT  d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZ d dl	m	Z	 d dl
mZ d dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z-m.Z. d dl/m0Z0m1Z1 d dl2m3Z3 d dl4m5Z5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZFmGZG d dlHmIZI d dlJmKZKmLZL d dlMmNZN d d lOmPZPmQZQ d d!lRmSZS d d"lTmUZU d d#lVmWZWmXZXmYZY d d$lZm[Z[ d d%l\m]Z] ee^Z_G d&d' d'e`ZaG d(d) d)eZbdS )*    N)AsyncGeneratorIterableMapping)copy)Any)TokensPrompt)
VllmConfig)WeightTransferInitRequestWeightTransferUpdateRequest)AsyncEngineArgs)EngineClient)
PromptTypeStreamingInput)init_logger)LoRARequest)MULTIMODAL_REGISTRYMultiModalRegistry)STREAM_FINISHEDPoolingRequestOutputRequestOutput)get_io_processor)PoolingParams)BaseRenderermerge_kwargs)
DictPrompt	TokPrompt)extract_prompt_components)RequestOutputKindSamplingParams)SupportedTask)TokenizerLike)init_tracer)(maybe_register_config_serialize_by_value)UsageContext)cancel_task_threadsafe)as_list)EngineCoreRequest	PauseMode)EngineCoreClient)EngineDeadErrorEngineGenerateError)InputProcessor)OutputProcessorRequestOutputCollector)ParentRequest)Executor)StatLoggerFactoryStatLoggerManager!load_stat_logger_plugin_factories)shutdown_prometheus)IterationStatsc                       s&   e Zd ZdZdef fddZ  ZS )InputStreamErrorzWrapper for errors from the input stream generator.

    This is used to propagate errors from the user's input generator
    without wrapping them in EngineGenerateError.
    causec                    s   || _ t t| d S N)r6   super__init__str)selfr6   	__class__ N/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm/v1/engine/async_llm.pyr9   B   s   zInputStreamError.__init__)__name__
__module____qualname____doc__	Exceptionr9   __classcell__r>   r>   r<   r?   r5   ;   s    r5   c                   @   s  e Zd Zejeddddddddf
dedee de	d	ed
e
de	de	de	dee dB de	deeef dB dededdfddZedejdddddddf	dede	d	edee dB de	de	de	deeef dB dededd fddZedejdfdede	d	edee dB dd f
ddZdd Zdd  Zdeed!f fd"d#Z							dd$ed%eeB eB eB eedf B d&e e!B d'e"dB d(e#dB d)eee$f dB d*e%eef dB d+ed,edB d-edB de&fd.d/Z'd0ed%edB d1e(dB d2ed3e&f
d4d5Z)						dd$ed6eedf d7e e!B d'e"dB d(e#dB d)eee$f dB d*e%eef dB d+ed,edB de&fd8d9Z*e+d&e e!B fd:d;Z,ddddddd<d%eeB eB eB eedf B d7e d$ed-edB d(e#dB d)eee$f dB d*e%eef dB d+ed,edB dee-df fd=d>Z.d?d@ Z/	dd$ee0e B dAe	ddfdBdCZ1dDdddEdFe2dGe	dB dHe	ddfdIdJZ3ddKdLZ4de	fdMdNZ5				dd%eeB eB dOe!d$ed(e#dB d*e%eef dB d+ed)eee$f dB dee6df fdPdQZ7e8de9dB fdRdSZ:de9fdTdUZ;e8de<fdVdWZ=de	fdXdYZ>ddZd[Z?dd\d]Z@dd^d_ZAdd`daZBddbdcZC	ddde	dee	de	fdfdgZDddhdiZEddjeddfdkdlZFddmee dB ddfdndoZGde	fdpdqZHd(e#de	fdrdsZIdtede	fdudvZJdeKe fdwdxZLdtede	fdydzZM		{	dd|ed}e"dB d~ededB fddZNddefddZO	ddedefddZPe8de	fddZQe8de	fddZRe8de	fddZSe8deTfddZUd0eVddfddZWd0eXddfddZYdS )AsyncLLMFTN   r   vllm_configexecutor_class	log_statsusage_contextmm_registryuse_cached_outputslog_requestsstart_engine_loopstat_loggersaggregate_engine_loggingclient_addressesclient_countclient_indexreturnc                 C   s  t   |j| _|| _|j| _| jj}|durtd| || _t|	p"g }|t	  t
|}|p1|| _|s<|r<td t| j| _t| j| jj| _t| j| j| jjjd| _|dur`d| j_tj||| j|||d| _d| _| jrt|| jj||||
d| _| j  t ! | _"d| _#|| _$d| _%z
t &  | '  W n	 t(y   Y nw |j)j*d	kr|j)j+s|j)j,}td
| t-.  dt/0  d}t1j*j2t1j*j3j4g|j)j5t1j*j6|||j)j7dd| _*dS d| _*dS )a  
        Create an AsyncLLM.

        Args:
            vllm_config: global configuration.
            executor_class: an Executor impl, e.g. MultiprocExecutor.
            log_stats: Whether to log stats.
            usage_context: Usage context of the LLM.
            mm_registry: Multi-modal registry.
            use_cached_outputs: Whether to use cached outputs.
            log_requests: Whether to log requests.
            start_engine_loop: Whether to start the engine loop.
            stat_loggers: customized stat loggers for the engine.
                If not provided, default stat loggers will be used.
                PLEASE BE AWARE THAT STAT LOGGER IS NOT STABLE
                IN V1, AND ITS BASE CLASS INTERFACE MIGHT CHANGE.

        Returns:
            None
        Nzvllm.llm_enginezyAsyncLLM created with log_stats=False, but custom stat loggers were found; enabling logging without default stat loggers.)rJ   stream_intervalT)rH   rI   rJ   rR   rS   rT   )rH   engine_idxscustom_stat_loggersenable_default_loggersrS   rQ   FtorchzFTorch profiler enabled. AsyncLLM CPU traces will be collected under %s_z
.async_llm)worker_nameuse_gzip)
activities
with_stackon_trace_ready)8r"   model_configrH   observability_configotlp_traces_endpointr!   rN   listextendr2   boolrJ   loggerinfor+   input_processorr   io_processor_pluginio_processorr,   	tokenizerscheduler_configrV   output_processortracing_enabledr(   make_async_mp_clientengine_corelogger_managerr1   engine_ranks_managedlog_engine_initializedasyncio	Condition_pause_cond_paused_client_countoutput_handlerget_running_loop_run_output_handlerRuntimeErrorprofiler_configprofilerignore_frontendtorch_profiler_dirsocketgethostnameosgetpidrZ   profileProfilerActivityCPUtorch_profiler_with_stacktensorboard_trace_handlertorch_profiler_use_gzip)r;   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rS   rT   tracing_endpointrX   has_custom_loggersprofiler_dirr\   r>   r>   r?   r9   H   s   %





zAsyncLLM.__init__enable_log_requestsdisable_log_statsc                 C   s&   | |t ||||| ||||	|
dS )N)rH   rI   rO   rP   rN   rJ   rQ   rK   rR   rS   rT   )r/   	get_class)clsrH   rO   rK   rP   r   rQ   r   rR   rS   rT   r>   r>   r?   from_vllm_config   s   zAsyncLLM.from_vllm_configengine_argsc              	   C   s0   | |}t|}| |||j|j |||dS )z'Create an AsyncLLM from the EngineArgs.)rH   rI   rN   rJ   rO   rK   rP   )create_engine_configr/   r   r   r   )r   r   rO   rK   rP   rH   rI   r>   r>   r?   from_engine_args   s   

zAsyncLLM.from_engine_argsc                 C   s   |    d S r7   )shutdownr;   r>   r>   r?   __del__  s   zAsyncLLM.__del__c                 C   sZ   t   t| dd }r|  t| dd }r|  t| dd}|dur+t| dS dS )z2Shutdown, cleaning up the background proc and IPC.rq   Nri   rz   )r3   getattrr   closer$   )r;   rq   ri   handlerr>   r>   r?   r     s   zAsyncLLM.shutdown.c                    s$   t | ds| j I d H | _| jS )N_supported_tasks)hasattrrq   get_supported_tasks_asyncr   r   r>   r>   r?   get_supported_tasks  s   
zAsyncLLM.get_supported_tasks
request_idpromptparamsarrival_timelora_requesttokenization_kwargstrace_headersprioritydata_parallel_rankprompt_textc                    s,   j rt t|t} jjjr|s|jrtd|j	dur9t
|j}tjd| dtdd t|t|j	d}t|trN |||||||||		I dH S t|tr`|}||jkr_td n&|
durhtd	 jj|||||||||	  I dH d

}t j|\}
}} j|     j4 I dH   j fddI dH  W d  I dH  n1 I dH sw   Y  t |j!|j}|j"}|s|j#dkrو $||
dd|I dH  |S |}t|t%sJ t&|}t'|j#D ](}|(|\}}||j#d kr|nt)|}||_||_* $||
|||I dH  q|S )z Add new request to the AsyncLLM.z--kv-sharing-fast-prefill produces incorrect logprobs for prompt tokens, please disable it when the requests need prompt logprobsNz+The `truncate_prompt_tokens` parameter in `z_` is deprecated and will be removed in v0.16. Please pass it via `tokenization_kwargs` instead.   
stacklevel)truncate_prompt_tokenszAsyncLLM.add_request() was passed a request_id parameter that does not match the EngineCoreRequest.request_id attribute. The latter will be used, and the former will be ignored.z6should only provide prompt_text with EngineCoreRequest)r   r   r   r   r   r   supported_tasksc                      s    j  S r7   )rx   r>   r   r>   r?   <lambda>  s    z&AsyncLLM.add_request.<locals>.<lambda>rG   r   )+erroredr)   
isinstancer   rH   cache_configkv_sharing_fast_prefillprompt_logprobs
ValueErrorr   typer@   warningswarnDeprecationWarningr   dictr   _add_streaming_input_requestr&   r   rg   warning_onceri   process_inputsr   r   ra   assign_request_idr|   rw   wait_forr-   output_kindr   n_add_requestr   r.   rangeget_child_infor   sampling_params)r;   r   r   r   r   r   r   r   r   r   r   
is_poolingparams_typerequestr[   queueparent_paramsparent_requestidxchild_paramschild_requestr>   r   r?   add_request  s   








(
zAsyncLLM.add_requestr   
parent_reqindexr   c                    sD   | j ||||| | j|I d H  | jr td|j d S d S )NzAdded request %s.)rn   r   rq   add_request_asyncrN   rg   rh   r   )r;   r   r   r   r   r   r>   r>   r?   r     s   	zAsyncLLM._add_requestinput_streamr   c
                    s     t||||||	djs d_jjdtdgdd j   jt	j
 fdd}
  t|
 _S )	N)r   r   r   r   r   r   Tr   )prompt_token_ids)r   r   r   c                     s<  d} zzJ2 zD3 d H W }|j }|r| n}jjd|j|dd}|_|jd ur5tdtj	|j\}}}
||d dI d H  q6 W n& tjtfy\   d} Y n tyt } zt| W Y d }~nd }~ww W d _| s
 d d dI d H  d S d S d _| s
 d d dI d H  w w )NFT)r   r   r   	resumablez0prompt_embeds not supported for streaming inputsr   r>   )r   )_validate_streaming_input_sampling_paramsri   r   r   external_req_idprompt_embedsr   r   ra   r   ru   CancelledErrorGeneratorExitrD   putr5   _input_stream_task)	cancelledinput_chunkspreqr   r[   error	final_reqr   inputsinternal_req_idr   r   r   r;   r>   r?   handle_inputs  sR   

z<AsyncLLM._add_streaming_input_request.<locals>.handle_inputsr>   )r   r   
skip_clonecloneri   r   r   r   r   r-   r   r|   ru   create_taskr   )r;   r   r   r   r   r   r   r   r   r   r   r>   r   r?   r     s6   
	
(z%AsyncLLM._add_streaming_input_requestc                 C   s2   t | tr| jdks| jtjks| jrtdd S )NrG   zrInput streaming not currently supported for pooling models, n > 1, request_kind = FINAL_ONLY or with stop strings.)r   r   r   r   r   
FINAL_ONLYstopr   )r   r>   r>   r?   r     s   
z2AsyncLLM._validate_streaming_input_sampling_params)r   r   r   r   r   r   c                C  s,  d}
zz5| j ||||||||	|d	I dH }
d}|s9|
 p%|
 I dH }t|ts-J |j}|tur7|V  |rW n tjt	fy]   |
durS| j
|
jddI dH  | jr\td|   tym   | jrltd|   ty } z| jr~td||  d}~w ty } z|
dur| j
|
jddI dH  | jrtd	|| |j|d}~w ty } zF|
dur| j
|
jddI dH  | jrz|jj d
| }W n ty } z|jj d|jj }W Y d}~nd}~ww td|| t |d}~ww W |
dur	|
  dS dS |
dur|
  w w )aj  
        Main function called by the API server to kick off a request
            * 1) Making an AsyncStream corresponding to the Request.
            * 2) Processing the Input.
            * 3) Adding the Request to the Detokenizer.
            * 4) Adding the Request to the EngineCore (separate process).

        A separate output_handler loop runs in a background AsyncIO task,
        pulling outputs from EngineCore and putting them into the
        per-request AsyncStream.

        The caller of generate() iterates the returned AsyncGenerator,
        returning the RequestOutput back to the caller.
        N)r   r   r   r   r   r   FTinternalRequest %s aborted. Request %s failed (engine dead).z$Request %s failed (bad request): %s.z$Request %s failed (input error): %s.z: z-: error during printing an exception of classzRequest %s failed due to %s.)r   
get_nowaitgetr   r   finishedr   ru   r   r   abortr   rN   rg   rh   r)   r   r5   r6   rD   r=   r@   r*   r   )r;   r   r   r   r   r   r   r   r   r   qr   outese2r>   r>   r?   generate  s    

zAsyncLLM.generatec                    sZ   | j durdS | j| j| j| j| jtj  fdd}t	| | _ dS )zBBackground loop: pulls from EngineCore and pushes to AsyncStreams.Nc            	   
      s  zg	   I d H } t| j}r|rt nd }| j}td| D ]1}|  }||| }|| j|}|jr:J ||k rFt	dI d H  |j
rR|j
I d H  q!| j rhj| j| j| d q ty } ztd | W Y d }~d S d }~ww )NTr   )
engine_idxscheduler_statsiteration_statsmm_cache_statszAsyncLLM output_handler failed.)get_output_asynclenoutputsr4   r   process_outputs	timestamprequest_outputsru   sleepreqs_to_abortabort_requests_asyncupdate_scheduler_statsr   recordengine_indexstat_mm_cacherD   rg   	exceptionpropagate_error)	r  num_outputsr   engine_core_outputsstartendoutputs_sliceprocessed_outputsr   
chunk_sizerq   ri   rJ   rr   rn   r>   r?   rz     sH   


-
z4AsyncLLM._run_output_handler.<locals>.output_handler)
rz   rq   rn   rJ   rr   ri   envsVLLM_V1_OUTPUT_PROC_CHUNK_SIZEru   r   )r;   rz   r>   r  r?   r|     s   
3zAsyncLLM._run_output_handlerr   c                    sZ   t |tr	|fnt|}| j||}| j|I dH  | jr+t	dd
| dS dS )z2Abort RequestId in OutputProcessor and EngineCore.NzAborted request(s) %s.,)r   r:   r%   rn   abort_requestsrq   r  rN   rg   rh   join)r;   r   r   request_idsall_request_idsr>   r>   r?   r     s   zAsyncLLM.abortr   )modewait_for_inflight_requestsclear_cacher  r  r  c             	      s&  |rt jdtdd d}|dkr| j I dH  n^| jdkr#td| j4 I dH C | jscd	| _|d
krJt	| j
j }|rI| j|d	dI dH  n|dkr\| j
 r[| j
 I dH  ntd| W d  I dH  n1 I dH ssw   Y  |r|  I dH  |  I dH  |  I dH  dS dS )aQ  
        Pause generation to allow model weight updates.

        New generation/encoding requests are blocked until resume.

        Args:
            mode: How to handle in-flight requests:
                - ``"abort"``: Abort all in-flight requests immediately
                  (default).
                - ``"wait"``: Wait for in-flight requests to complete.
                - ``"keep"``: Freeze requests in queue; they resume on
                  :meth:`resume_generation`.
            wait_for_inflight_requests: DEPRECATED: use mode argument.
                Whether to wait for in-flight requests to complete before pausing.
            clear_cache: Whether to clear KV cache and prefix cache after
                draining. Set to ``False`` to preserve cache for faster resume.
                Default is ``True`` (clear caches).

        z~The `wait_for_inflight_requests` parameter in `AsyncLLM.pause_generation()` is deprecated. Please use `mode` argument instead.r   r   waitkeepNrG   zUpause_generation is not supported with --api-server-count > 1 when mode is not 'keep'Tr   r   zInvalid mode: )r   r   r   rq   pause_scheduler_asyncry   NotImplementedErrorrw   rx   rd   rn   request_stateskeysr   has_unfinished_requestswait_for_requests_to_drainr   reset_prefix_cachereset_mm_cachereset_encoder_cache)r;   r  r  r  r  r>   r>   r?   pause_generation  sB   

(zAsyncLLM.pause_generationc              	      sb   | j 4 I dH  | j I dH  d| _| j   W d  I dH  dS 1 I dH s*w   Y  dS )z1Resume generation after :meth:`pause_generation`.NF)rw   rq   resume_scheduler_asyncrx   
notify_allr   r>   r>   r?   resume_generation  s   .zAsyncLLM.resume_generationc              	      sF   | j 4 I dH  | jW  d  I dH  S 1 I dH sw   Y  dS )z.Return whether the engine is currently paused.N)rw   rx   r   r>   r>   r?   	is_paused#  s   0zAsyncLLM.is_pausedpooling_paramsc              
   C  sf  d}zz/| j |||||||dI dH }d}	|	s2| p"| I dH }
t|
ts*J |
j}	|
V  |	rW nh tjyT   |durJ| j|j	ddI dH  | j
rStd|   tyd   | j
rctd|   tyt   | j
rstd|   ty } z|dur| j|j	ddI dH  | j
rtd	| t |d}~ww W |dur|  dS dS |dur|  w w )
a2  
        Main function called by the API server to kick off a request
            * 1) Making an AsyncStream corresponding to the Request.
            * 2) Processing the Input.
            * 3) Adding the Request to the EngineCore (separate process).

        A separate output_handler loop runs in a background AsyncIO task,
        pulling outputs from EngineCore and putting them into the
        per-request AsyncStream.

        The caller of generate() iterates the returned AsyncGenerator,
        returning the RequestOutput back to the caller.
        N)r   r   r   r   FTr   r   r   z Request %s failed (bad request).zRequest %s failed.)r   r   r   r   r   r   ru   r   r   r   rN   rg   rh   r)   r   rD   r*   r   )r;   r   r0  r   r   r   r   r   r   r   r   r   r>   r>   r?   encode)  s`   
zAsyncLLM.encodec                 C      | j jS r7   )ri   rl   r   r>   r>   r?   rl   z     zAsyncLLM.tokenizerc                 C   s
   | j  S r7   )ri   get_tokenizerr   r>   r>   r?   r4  ~  s   
zAsyncLLM.get_tokenizerc                 C   r2  r7   )ri   rendererr   r>   r>   r?   r5    r3  zAsyncLLM.rendererc                    s   | j jd uS r7   )rb   rc   r   r>   r>   r?   is_tracing_enabled  s   zAsyncLLM.is_tracing_enabledc                    s   | j r| j   d S d S r7   )rr   logr   r>   r>   r?   do_log_stats  s   zAsyncLLM.do_log_statsc                    s   t d | jr| jd S )NzCalled check_health.)rg   debugr   
dead_errorr   r>   r>   r?   check_health  s
   
zAsyncLLM.check_healthc                    B   | j dg}| jd ur|t| jj tj| I d H  d S )NT)rq   profile_asyncr   appendru   	to_threadr  gatherr;   corosr>   r>   r?   start_profile  
   
zAsyncLLM.start_profilec                    r<  )NF)rq   r=  r   r>  ru   r?  r   r@  rA  r>   r>   r?   stop_profile  rD  zAsyncLLM.stop_profilec                    s    | j   | j I d H  d S r7   )ri   clear_mm_cacherq   reset_mm_cache_asyncr   r>   r>   r?   r)    s   
zAsyncLLM.reset_mm_cachereset_running_requestsreset_connectorc                    s   | j ||I d H S r7   )rq   reset_prefix_cache_async)r;   rH  rI  r>   r>   r?   r(    s   
zAsyncLLM.reset_prefix_cachec                    s   | j  I d H  d S r7   )rq   reset_encoder_cache_asyncr   r>   r>   r?   r*    s   zAsyncLLM.reset_encoder_cachelevelc                    sB   |   I d H  | j|I d H  | jd ur| jd| d S d S )NrG   )r(  rq   sleep_asyncrr   record_sleep_state)r;   rL  r>   r>   r?   r    s   
zAsyncLLM.sleeptagsc                    s4   | j |I d H  | jd ur| jdd d S d S )Nr   )rq   wake_up_asyncrr   rN  )r;   rO  r>   r>   r?   wake_up  s
   
zAsyncLLM.wake_upc                    s   | j  I d H S r7   )rq   is_sleeping_asyncr   r>   r>   r?   is_sleeping  s   zAsyncLLM.is_sleepingc                       | j |I dH S )z<Load a new LoRA adapter into the engine for future requests.N)rq   add_lora_async)r;   r   r>   r>   r?   add_lora     zAsyncLLM.add_loralora_idc                    rT  )z&Remove an already loaded LoRA adapter.N)rq   remove_lora_asyncr;   rX  r>   r>   r?   remove_lora  rW  zAsyncLLM.remove_lorac                    s   | j  I dH S )zList all registered adapters.N)rq   list_loras_asyncr   r>   r>   r?   
list_loras  s   zAsyncLLM.list_lorasc                    rT  )z&Prevent an adapter from being evicted.N)rq   pin_lora_asyncrZ  r>   r>   r?   pin_lora  rW  zAsyncLLM.pin_lorar>   methodtimeoutargskwargsc                    s   | j ||||I dH S )zB
        Perform a collective RPC call to the given path.
        N)rq   collective_rpc_async)r;   r`  ra  rb  rc  r>   r>   r?   collective_rpc  s   

zAsyncLLM.collective_rpc,  drain_timeoutc                    sl   t   }t   | |k r.| j std dS td tdI dH  t   | |k std| d)z$Wait for all requests to be drained.z,Engines are idle, requests have been drainedNz;Engines are still running, waiting for requests to drain...rG   zTimeout reached after z' seconds waiting for requests to drain.)timerq   dp_engines_runningrg   rh   ru   r  TimeoutError)r;   rg  
start_timer>   r>   r?   r'    s   



z#AsyncLLM.wait_for_requests_to_drainnew_data_parallel_sizec                    s   | j jj}||krtd| dS td| | |I dH  td| | j|I dH  || j j_||krJ| jrLt	| j t
t|dd| _dS dS dS )a  
        Scale up or down the data parallel size by adding or removing
        engine cores.
        Args:
            new_data_parallel_size: The new number of data parallel workers
            drain_timeout:
                Maximum time to wait for requests to drain (seconds)
        z0Data parallel size is already %s, skipping scaleNz@Waiting for requests to drain before scaling up to %s engines...z?Requests have been drained, proceeding with scale to %s engines)rH   rW   rX   )rH   parallel_configdata_parallel_sizerg   rh   r'  rq   scale_elastic_eprJ   r1   rd   r   rr   )r;   rl  rg  old_data_parallel_sizer>   r>   r?   ro    s4   


zAsyncLLM.scale_elastic_epc                 C   s   | j d u p
| j   S r7   )rz   doner   r>   r>   r?   
is_running  s   zAsyncLLM.is_runningc                 C   s   | j S r7   )r   r   r>   r>   r?   
is_stopped     zAsyncLLM.is_stoppedc                 C   s   | j jjp| j S r7   )rq   	resourcesengine_deadrr  r   r>   r>   r?   r     s   zAsyncLLM.erroredc                 C   s   t  S r7   )r)   r   r>   r>   r?   r:     rt  zAsyncLLM.dead_errorc                    sN   ddl m} t||r|j}n	tdt| | jdd|idI dH  dS )z
        Initialize weight transfer for RL training.

        Args:
            request: Weight transfer initialization request with backend-specific info
        r   )r	   z(Expected WeightTransferInitRequest, got init_weight_transfer_engine	init_inforc  N)%vllm.distributed.weight_transfer.baser	   r   rx  	TypeErrorr   re  )r;   r   r	   init_info_dictr>   r>   r?   rw  $  s   	
z$AsyncLLM.init_weight_transfer_enginec                    sB   t |tr
|j}n	tdt| | jdd|idI dH  dS )z
        Batched weight update for RL training.

        Args:
            request: Weight update request with backend-specific update info
        z*Expected WeightTransferUpdateRequest, got update_weightsupdate_infory  N)r   r
   r~  r{  r   re  )r;   r   update_info_dictr>   r>   r?   r}  :  s   
zAsyncLLM.update_weights)NNNNr   NN)NNNNr   N)F)rU   N)NNr   N)FF)rG   r7   )Nr>   N)rf  )Zr@   rA   rB   r#   ENGINE_CONTEXTr   r   r   r/   rf   r   rd   r0   r   r:   intr9   classmethodr   r   r   r   r   tupler   r   r&   r   r   r   r   r   r   r   floatr   r   r   r-   r   r.   r   r   staticmethodr   r   r   r|   r   r   r'   r+  r.  r/  r   r1  propertyr    rl   r4  r   r5  r6  r8  r;  rC  rE  r)  r(  r*  r  rQ  rS  rV  r[  setr]  r_  re  r'  ro  rr  rs  r   BaseExceptionr:  r	   rw  r
   r}  r>   r>   r>   r?   rF   G   sp   	


 
	


	

 

	

U



nE



B

	
Q








*
rF   )cru   r   r   rh  r   collections.abcr   r   r   r   typingr   rZ   	vllm.envsr  vllmr   vllm.configr   rz  r	   r
   vllm.engine.arg_utilsr   vllm.engine.protocolr   vllm.inputsr   r   vllm.loggerr   vllm.lora.requestr   vllm.multimodalr   r   vllm.outputsr   r   r   vllm.plugins.io_processorsr   vllm.pooling_paramsr   vllm.renderersr   r   vllm.renderers.inputsr   r    vllm.renderers.inputs.preprocessr   vllm.sampling_paramsr   r   
vllm.tasksr   vllm.tokenizersr    vllm.tracingr!   vllm.transformers_utils.configr"   vllm.usage.usage_libr#   vllm.utils.async_utilsr$   vllm.utils.collection_utilsr%   vllm.v1.enginer&   r'   vllm.v1.engine.core_clientr(   vllm.v1.engine.exceptionsr)   r*   vllm.v1.engine.input_processorr+   vllm.v1.engine.output_processorr,   r-    vllm.v1.engine.parallel_samplingr.   vllm.v1.executorr/   vllm.v1.metrics.loggersr0   r1   r2   vllm.v1.metrics.prometheusr3   vllm.v1.metrics.statsr4   r@   rg   rD   r5   rF   r>   r>   r>   r?   <module>   s\   