o
    i?                     @   s  d dl Z d dlmZmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZmZ d dlm Z m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z' d dl(m)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZDmEZE d d lFmGZGmHZH d d!lImJZJ d d"lKmLZL d d#lMmNZN eeOZPe
d$ed%ZQG d&d' d'ZRdS )(    N)CallableMapping)copy)Any)TypeVar)ParallelConfig
VllmConfig)1stateless_destroy_torch_distributed_process_group)get_dp_group)
EngineArgs)
PromptType)init_logger)LoRARequest)MULTIMODAL_REGISTRYMultiModalRegistry)PoolingRequestOutputRequestOutput)get_io_processor)PoolingParams)BaseRenderer)
DictPrompt	TokPrompt)extract_prompt_components)SamplingParams)SupportedTask)TokenizerLike)init_tracer)UsageContext)EngineCoreRequest)EngineCoreClient)InputProcessor)OutputProcessor)ParentRequest)Executor)StatLoggerFactoryStatLoggerManager)Metricget_metrics_snapshot)IterationStats)record_function_or_nullcontext)
WorkerBase_R)defaultc                   @   sD  e Zd ZdZdejdeddfdedee	 de
de
ded	ee dB d
ede
de
ddfddZeejddfdeded	ee dB de
dd f
ddZeejddfdeded	ee dB de
dd f
ddZdefddZde
fddZde
de
fddZedd Zdeed f fd!d"Zdkd#ee d$e
ddfd%d&Z					'	dld(ed)eeB eB e B d*e!e"B d+e#dB d,e$dB d-e%ee&f dB d.e'eef dB d/ed0edB ddfd1d2Z(dee)e*B  fd3d4Z+d5d6 Z,d7d8 Z-d9d: Z.	dmd;e
d<e
de
fd=d>Z/dnd?d@Z0dodBefdCdDZ1dpdEee dB fdFdGZ2de
fdHdIZ3dee4 fdJdKZ5e6de7dB fdLdMZ8de7fdNdOZ9e6de:fdPdQZ;dndRdSZ<dndTdUZ=d,e$de
fdVdWZ>dXede
fdYdZZ?de@e fd[d\ZAdXede
fd]d^ZB		_	dqd`eeCeDgeEf B dae#dB dbedce%ee&f dB deeE f
dddeZFdfeCeGjHgeEf deeE fdgdhZIdidj ZJdS )r	LLMEnginez-Legacy LLMEngine for backwards compatibility.FNvllm_configexecutor_class	log_statsaggregate_engine_loggingusage_contextstat_loggersmm_registryuse_cached_outputsmultiprocess_modereturnc
                 C   s6  || _ |j| _|j| _|j| _|| _|j}
|
j}|
jdko |dk| _|	s2|
jdkr2| js2|
	 | _
nd | _
d| _t| j | _t| j | jj| _t| j| j| j jjd| _| jj}|d uretd| d| j_tj|	d||| jd| _d | _| jrt||||d| _| j  |	s| jjj| _| jrt  j!| _
| "  d S )	N   external_launcherF)r0   stream_intervalzvllm.llm_engineT)r6   asyncio_moder.   r/   r0   )r.   custom_stat_loggersenable_default_loggersr1   )#r.   observability_configmodel_configcache_configr0   parallel_configdistributed_executor_backenddata_parallel_sizeexternal_launcher_dpstateless_init_dp_groupdp_groupshould_execute_dummy_batchr    input_processorr   io_processor_pluginio_processorr!   	tokenizerscheduler_configr:   output_processorotlp_traces_endpointr   tracing_enabledr   make_clientengine_corelogger_managerr%   log_engine_initializedmodel_executorr
   	cpu_groupreset_mm_cache)selfr.   r/   r0   r1   r2   r3   r4   r5   r6   rA   executor_backendendpoint rZ   O/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm/v1/engine/llm_engine.py__init__4   sj   




zLLMEngine.__init__disable_log_statsc                 C   s   | |t || ||tjdS )Nr.   r/   r0   r2   r3   r6   )r#   	get_classenvsVLLM_ENABLE_V1_MULTIPROCESSING)clsr.   r2   r3   r]   rZ   rZ   r[   from_vllm_config   s   zLLMEngine.from_vllm_configengine_argsenable_multiprocessingc                 C   s@   | |}t|}tjrtd d}| |||j |||dS )z0Creates an LLM engine from the engine arguments.z'Enabling multiprocessing for LLMEngine.Tr^   )create_engine_configr#   r_   r`   ra   loggerdebugr]   )rb   rd   r2   r3   re   r.   r/   rZ   rZ   r[   from_engine_args   s   


zLLMEngine.from_engine_argsc                 C   
   | j  S N)rM   get_num_unfinished_requestsrW   rZ   rZ   r[   rl         
z%LLMEngine.get_num_unfinished_requestsc                 C   s,   | j  }| jd u r|p| j S | |S rk   )rM   has_unfinished_requestsrF   rQ   dp_engines_runninghas_unfinished_requests_dp)rW   has_unfinishedrZ   rZ   r[   ro      s   


z!LLMEngine.has_unfinished_requestsrr   c                 C   s    t | j|}|s|rd| _|S NT)r   has_unfinished_dprF   rG   )rW   rr   aggregated_has_unfinishedrZ   rZ   r[   rq      s   z$LLMEngine.has_unfinished_requests_dpc                 C   s   |S rk   rZ   )rb   outputsoutput_typerZ   rZ   r[   validate_outputs   s   zLLMEngine.validate_outputs.c                 C   s   t | ds| j | _| jS )N_supported_tasks)hasattrrQ   get_supported_tasksry   rm   rZ   rZ   r[   r{      s   
zLLMEngine.get_supported_tasksrequest_idsinternalc                 C   s   | j ||}| j| dS )z3Remove request_ids from EngineCore and Detokenizer.N)rM   abort_requestsrQ   )rW   r|   r}   rZ   rZ   r[   abort_request   s   zLLMEngine.abort_requestr   
request_idpromptparamsarrival_timelora_requesttokenization_kwargstrace_headerspriorityprompt_textc
                 C   s:  t |tstdt| t |tr |}
||
jkrtd n |	d u s&J | jj	||||||||| 
 d	}
t| j|\}	}}| j|
 |
j}t |trQ|jnd}|dkrh| j|
|	d d | j|
 d S t|
}t|D ]*}||\}}||d kr|
nt|
}||_||_| j||	|| | j| qpd S )Nz!request_id must be a string, got zAsyncLLM.add_request() was passed a request_id parameter that does not match the EngineCoreRequest.request_id attribute. The latter will be used, and the former will be ignored.)supported_tasksr8   r   )
isinstancestr	TypeErrortyper   r   rg   warning_oncerH   process_inputsr{   r   r?   assign_request_idr   r   nrM   add_requestrQ   r"   rangeget_child_infor   sampling_params)rW   r   r   r   r   r   r   r   r   r   request_r   
parent_reqidxchild_paramschild_requestrZ   rZ   r[   r      sP   


zLLMEngine.add_requestc                 C   sj  | j rd| _ | j  g S td | j }W d    n1 s!w   Y  td" | jr1t nd }| jj|j	|j
|d}| j|j W d    n1 sOw   Y  td | j|j W d    n1 sjw   Y  td7 | jd ur|jd ur| jj|j|| j d |   W d    |jS W d    |jS W d    |jS 1 sw   Y  |jS )NFzllm_engine step: get_outputz llm_engine step: process_outputs)engine_core_timestampiteration_statszllm_engine step: abort_requestszllm_engine step: record_stats)scheduler_statsr   mm_cache_stats)rG   rQ   execute_dummy_batchr)   
get_outputr0   r(   rM   process_outputsrv   	timestampupdate_scheduler_statsr   r~   reqs_to_abortrR   recordrH   stat_mm_cachedo_log_stats_with_intervalrequest_outputs)rW   rv   r   processed_outputsrZ   rZ   r[   step  sJ   







	
	
		zLLMEngine.stepc                 C      | j d d S rs   rQ   profilerm   rZ   rZ   r[   start_profileB     zLLMEngine.start_profilec                 C   r   )NFr   rm   rZ   rZ   r[   stop_profileE  r   zLLMEngine.stop_profilec                 C   s   | j   | j  d S rk   )rH   clear_mm_cacherQ   rV   rm   rZ   rZ   r[   rV   H  s   
zLLMEngine.reset_mm_cachereset_running_requestsreset_connectorc                 C   s   | j ||S rk   )rQ   reset_prefix_cache)rW   r   r   rZ   rZ   r[   r   L  s   zLLMEngine.reset_prefix_cachec                 C   s   | j   dS )zReset the encoder cache to invalidate all cached encoder outputs.

        This should be called when model weights are updated to ensure
        stale vision embeddings computed with old weights are not reused.
        N)rQ   reset_encoder_cacherm   rZ   rZ   r[   r   S  s   zLLMEngine.reset_encoder_cacher8   levelc                 C   s,   | j | | jd ur| jd| d S d S )Nr8   )rQ   sleeprR   record_sleep_state)rW   r   rZ   rZ   r[   r   [     
zLLMEngine.sleeptagsc                 C   s,   | j | | jd ur| jdd d S d S )Nr   )rQ   wake_uprR   r   )rW   r   rZ   rZ   r[   r   a  r   zLLMEngine.wake_upc                 C   rj   rk   )rQ   is_sleepingrm   rZ   rZ   r[   r   g  rn   zLLMEngine.is_sleepingc                 C   s   | j sJ dt S )NzStat logging disabled)r0   r'   rm   rZ   rZ   r[   get_metricsj  s   zLLMEngine.get_metricsc                 C      | j jS rk   )rH   rK   rm   rZ   rZ   r[   rK   n     zLLMEngine.tokenizerc                 C   rj   rk   )rH   get_tokenizerrm   rZ   rZ   r[   r   r  rn   zLLMEngine.get_tokenizerc                 C   r   rk   )rH   rendererrm   rZ   rZ   r[   r   u  r   zLLMEngine.rendererc                 C   s   | j r
| j   dS dS )z Log stats if logging is enabled.N)rR   logrm   rZ   rZ   r[   do_log_statsy  s   zLLMEngine.do_log_statsc                 C   s>   t   }t| ds|| _|| j tjkr|   || _dS dS )z,Log stats when the time interval has passed._last_log_timeN)timerz   r   r`   VLLM_LOG_STATS_INTERVALr   )rW   nowrZ   rZ   r[   r   ~  s   

z$LLMEngine.do_log_stats_with_intervalc                 C      | j |S )z<Load a new LoRA adapter into the engine for future requests.)rQ   add_lora)rW   r   rZ   rZ   r[   r        zLLMEngine.add_loralora_idc                 C   r   )z&Remove an already loaded LoRA adapter.)rQ   remove_lorarW   r   rZ   rZ   r[   r     r   zLLMEngine.remove_lorac                 C   rj   )zList all registered adapters.)rQ   
list_lorasrm   rZ   rZ   r[   r     s   
zLLMEngine.list_lorasc                 C   r   )z&Prevent an adapter from being evicted.)rQ   pin_lorar   rZ   rZ   r[   r     r   zLLMEngine.pin_lorarZ   methodtimeoutargskwargsc                 C   s   | j ||||S rk   )rQ   collective_rpc)rW   r   r   r   r   rZ   rZ   r[   r     s   zLLMEngine.collective_rpcfuncc                 C   s   | j d|fdS )Napply_model)r   )r   )rW   r   rZ   rZ   r[   r     r   zLLMEngine.apply_modelc                 C   s.   t | dd }|d ur| jst| d S d S d S )NrF   )getattrrD   r	   )rW   rF   rZ   rZ   r[   __del__  s   zLLMEngine.__del__)F)NNNNr   N)FF)r7   N)r8   rk   )NrZ   N)K__name__
__module____qualname____doc__r   ENGINE_CONTEXTr   r   r   r#   boollistr$   r   r\   classmethodrc   r   ri   intrl   ro   rq   rx   tupler   r{   r   r   r   r   r   r   r   r   floatr   dictr   r   r   r   r   r   r   r   rV   r   r   r   r   r   r&   r   propertyr   rK   r   r   r   r   r   r   r   setr   r   r   r*   r+   r   nnModuler   r   rZ   rZ   rZ   r[   r-   1   s   
	

V


	

E$



	
"	r-   )Sr   collections.abcr   r   r   typingr   torch.nnr   typing_extensionsr   	vllm.envsr`   vllm.configr   r   vllm.distributedr	   vllm.distributed.parallel_stater
   vllm.engine.arg_utilsr   vllm.inputsr   vllm.loggerr   vllm.lora.requestr   vllm.multimodalr   r   vllm.outputsr   r   vllm.plugins.io_processorsr   vllm.pooling_paramsr   vllm.renderersr   vllm.renderers.inputsr   r    vllm.renderers.inputs.preprocessr   vllm.sampling_paramsr   
vllm.tasksr   vllm.tokenizersr   vllm.tracingr   vllm.usage.usage_libr   vllm.v1.enginer   vllm.v1.engine.core_clientr   vllm.v1.engine.input_processorr    vllm.v1.engine.output_processorr!    vllm.v1.engine.parallel_samplingr"   vllm.v1.executorr#   vllm.v1.metrics.loggersr$   r%   vllm.v1.metrics.readerr&   r'   vllm.v1.metrics.statsr(   vllm.v1.utilsr)   vllm.v1.worker.worker_baser*   r   rg   r+   r-   rZ   rZ   rZ   r[   <module>   sP   