o
    
۾i                    @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	 d dl
mZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZmZ d dlZd dlZd d	lmZmZ d d
lmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+ d dl,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2m3Z3 d dl4m5Z5 d dl6m7Z7 d dl8m9Z9m:Z: d dl;m<Z<m=Z=m>Z>m?Z?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZFmGZGmHZHmIZImJZJmKZKmLZLmMZMmNZN d dlOmPZPmQZQmRZR d dlSmTZT d dlUmVZV d dlWmXZX d dlYmZZZ d d l[m\Z\m]Z] d d!l^m_Z_m`Z` d d"lambZb d d#lcmdZd d d$lemfZg e"ehZid%Zjd&Zked'ZlG d(d) d)ZmG d*d+ d+emZnG d,d- d-enZoG d.d/ d/ZpG d0d1 d1epeoZqG d2d3 d3epenZrdS )4    N)deque)Callable	Generator)Future)	ExitStackcontextmanager)isclass	signature)DEBUG)AnyTypeVarcast)ParallelConfig
VllmConfig)1stateless_destroy_torch_distributed_process_group)enable_envs_cache)init_logger)dump_engine_exception)LoRARequest)MULTIMODAL_REGISTRY)POOLING_TASKSSupportedTask)
instrumentmaybe_init_worker_tracer)(maybe_register_config_serialize_by_value)freeze_gc_heapmaybe_attach_gc_debug_callback)get_hash_fn_by_name)make_zmq_socket)decorate_logsset_process_title)	BlockHash"generate_scheduler_kv_cache_configget_kv_cache_configsget_request_block_hasherinit_none_hash)SchedulerInterface)SchedulerOutput)	EngineCoreOutputEngineCoreOutputsEngineCoreRequestEngineCoreRequestTypeFinishReasonReconfigureDistributedRequestReconfigureRankTypeUtilityOutputUtilityResult)EngineHandshakeMetadataEngineZmqAddressesget_device_indices)Executor)KVCacheConfig)SchedulerStats)ModelRunnerOutput)RequestRequestStatus)MsgpackDecoderMsgpackEncoder)StructuredOutputManager)compute_iteration_details)__version__g      @   _Rc                   @   s  e Zd ZdZ		d^dedee dededB def
d	d
Z	e
dddedeeeef fddZdeedf fddZd_dedefddZdee fddZd`ddZd`ddZedefd d!Zedefd"d#Zdeeeef ef fd$d%Zd&eddfd'd(Zdeeeef dB ef fd)d*Z d+d, Z!d-d. Z"dad0efd1d2Z#d3d4 Z$	dbd5ed6edefd7d8Z%d`d9d:Z&dcd<efd=d>Z'ddd?ee dB fd@dAZ(defdBdCZ)dDdE Z*dFe+defdGdHZ,dIedefdJdKZ-de.e fdLdMZ/dIedefdNdOZ0		dedPedQedB dRedB ddfdSdTZ1		U	dfdVeede2f B dWe3dB dXedYeee4f dB dee2 f
dZd[Z5de6deeef fd\d]Z7dS )g
EngineCorezInner loop of vLLM's Engine.NFvllm_configexecutor_class	log_statsexecutor_fail_callbackinclude_finished_setc                 C   sb  ddl m} |  || _|jjstdt| || _||| _	|d ur)| j	
| d| _| |\}}}	||j_||j_| jd||fd t|| _|j }
t|	jdkrc|jjrctd d|j_|jj|jj |jj }|
||	| j|| j|d	| _|jd u| _| jjd ur| j	 | jj t! | _"}|#|| _$| j% }|d ur| j	& }|ri }|D ]}|d ur|'| q|(| | j	j)| _*d | _+| j*d
krt,d| j* t-| j*d| _+|j.d uo|j.j/| _/|j0j1dk| _2d | _3|jj4s|d urt5|jj6}t7| t8||| _3| j+d u r| j9n| j:| _;|jj<| _<t=j>t?t@   | _Ad| _BtC  tD  tE  d S )Nr   )load_general_pluginsz2Initializing a V1 LLM engine (v%s) with config: %sinitialize_cacheargsz3Disabling chunked prefill for model without KVCacheF)rB   kv_cache_configstructured_output_managerrF   rD   
block_size   z#Batch queue is enabled with size %d)maxlenpooling)Fvllm.pluginsrG   rB   parallel_configdata_parallel_rank_localloggerinfoVLLM_VERSIONrD   model_executorregister_failure_callback!available_gpu_memory_for_kv_cache_initialize_kv_cachescache_confignum_gpu_blocksnum_cpu_blockscollective_rpcr<   rM   scheduler_configget_scheduler_clslenkv_cache_groupsenable_chunked_prefillwarningrN   decode_context_parallel_sizeprefill_context_parallel_size	schedulerspeculative_configuse_spec_decode	connectorinit_kv_output_aggregatorr   mm_registry!engine_receiver_cache_from_configmm_receiver_cacheget_kv_connector#get_kv_connector_handshake_metadataupdateset_xfer_handshake_metadatamax_concurrent_batchesbatch_queue_sizebatch_queuedebugr   ec_transfer_configis_ec_producermodel_configrunner_typeis_pooling_modelrequest_block_hasherenable_prefix_cachingr   prefix_caching_hash_algor%   r$   stepstep_with_batch_queuestep_fnasync_schedulingqueueQueueliststraborts_queue_scheduler_pausedr   r   r   )selfrB   rC   rD   rE   rF   rG   r]   r^   rL   	Schedulerscheduler_block_sizerm   kv_connectorxfer_handshake_metadatacontentworker_dictcaching_hash_fn r   G/home/ubuntu/.local/lib/python3.10/site-packages/vllm/v1/engine/core.py__init__R   s   	













zEngineCore.__init__zPrepare model	span_namereturnc                 C   s   t   }| j }tdd |D }|rCtjddkr8t| dd }|d us(J t	|d| _
| j
gt| }n| j }|d | _
ndgt| }t|t|ksTJ |jj}t|||}|jj}	|	|krn| jd|	fd	 t|}
|
j}d}| j| t   | }tjd
|dd |||
fS )Nc                 s   s    | ]}|V  qd S Nr   ).0kv_cache_specr   r   r   	<genexpr>   s    z3EngineCore._initialize_kv_caches.<locals>.<genexpr>VLLM_ELASTIC_EP_SCALE_UP_LAUNCH1dp_grouprH   r   update_max_model_lenrJ   zFinit engine (profile, create kv cache, warmup model) took %.2f secondslocal)scope)timerX   get_kv_cache_specsanyosenvirongetgetattrr   sync_kv_cache_memory_sizerZ   rb   determine_available_memoryrz   max_model_lenr#   r_   r"   
num_blocksinitialize_from_configrU   	info_once)r   rB   startkv_cache_specshas_kv_cacher   available_gpu_memorymax_model_len_beforekv_cache_configsmax_model_len_afterscheduler_kv_cache_configr]   r^   elapsedr   r   r   r[      sD   



z EngineCore._initialize_kv_caches.c                 C      | j jS r   )rX   supported_tasksr   r   r   r   get_supported_tasks     zEngineCore.get_supported_tasksr   requestrequest_wavec                 C   s   t |jtstdt|j |j }r.dd |  D }|j|vr.td|jd| |j	dur=| j
 s=td | j
| dS )zAdd request to the scheduler.

        `request_wave`: indicate which wave of requests this is expected to
        belong to in DP case
        z!request_id must be a string, got c                 S   s   g | ]}|t v r|qS r   )r   )r   taskr   r   r   
<listcomp>-  s    z*EngineCore.add_request.<locals>.<listcomp>zUnsupported task: z Supported tasks: NzXGot kv_transfer_params, but no KVConnector found. Disabling KVTransfer for this request.)
isinstance
request_idr   	TypeErrortypepooling_paramsr   r   
ValueErrorkv_transfer_paramsrh   rp   rU   re   add_request)r   r   r   r   supported_pooling_tasksr   r   r   r      s*   



zEngineCore.add_requestrequest_idsc                 C   s   | j |tj dS )z"Abort requests from the scheduler.N)rh   finish_requestsr9   FINISHED_ABORTED)r   r   r   r   r   abort_requestsA  s   zEngineCore.abort_requestsc                 C   
   d| _ dS )zPause the scheduler, keeping requests frozen in queue.

        Requests are kept frozen in queue and can be resumed later.
        TNr   r   r   r   r   pause_schedulerI     
zEngineCore.pause_schedulerc                 C   r   )ziResume the scheduler after a pause.

        Resumes processing of frozen requests in the queue.
        FNr   r   r   r   r   resume_schedulerP  r   zEngineCore.resume_schedulerscheduler_outputc              
   c   s@    zdV  W dS  t y } zt| j|| j  |d}~ww )z3Execute the model and log detailed info on failure.N)	Exceptionr   rB   rh   
make_stats)r   r   errr   r   r   log_error_detailW  s   zEngineCore.log_error_detailc                 c   s    | j jjsd V  d S t| dd| _t|}t }d V  t	d
dt| jdt|jdt|jdt|jdt|jd	tt | d
 ddg |  jd7  _d S )N_iteration_indexr    z
Iteration(z): z context requests, z context tokens, z generation requests, z, generation tokens, iteration elapsed time: i  z.2fz msrO   )rB   observability_config enable_logging_iteration_detailsr   r   r=   r   	monotonicrU   rV   joinr   num_ctx_requestsnum_ctx_tokensnum_generation_requestsnum_generation_tokensformat)r   r   iteration_detailsbeforer   r   r   log_iteration_detailsg  s6   
z EngineCore.log_iteration_detailsc              	   C   s   | j ri dfS | j si dfS | j }| jj|dd}| j|}| |+ | | |	 }|du r=| j
|}W d   n1 sGw   Y  W d   n1 sVw   Y  |   | j||}||jdkfS )zSchedule, execute, and make output.

        Returns tuple of outputs and a flag indicating whether the model
        was executed.
        FT	non_blockNr   )r   rh   has_requestsschedulerX   execute_modelget_grammar_bitmaskr   r   resultsample_tokens_process_aborts_queueupdate_from_outputtotal_num_scheduled_tokens)r   r   futuregrammar_outputmodel_outputengine_core_outputsr   r   r   r     s.   

 
zEngineCore.stepmodel_executedc                 C   sB   | j s| jr|r| j }|d ur| j| d S d S d S d S d S r   )r   rj   rX   take_draft_token_idsrh   update_draft_token_ids)r   r   draft_token_idsr   r   r   	post_step  s   
zEngineCore.post_stepc              	   C   s  | j ri dfS | j}|dusJ t|| jk sJ d}d}| j rv| j }| jj|dd}| j	s7|j
dk}| js<|sDttt |}n|jsV| j|}| jj|dd}n|}|su||||f |rut|| jk ru|d d  sudS n|szdS | \}}}| |- | | | }	|	du r|  td	W d   n1 sw   Y  W d   n1 sw   Y  |   | j||	}
|r| jr| j }|dusJ | j|| | j|}| jj|dd}||||f |
|fS )
a  Schedule and execute batches with the batch queue.
        Note that if nothing to output in this step, None is returned.

        The execution flow is as follows:
        1. Try to schedule a new batch if the batch queue is not full.
        If a new batch is scheduled, directly return an empty engine core
        output. In other words, fulfilling the batch queue has a higher priority
        than getting model outputs.
        2. If there is no new scheduled batch, meaning that the batch queue
        is full or no other requests can be scheduled, we block until the first
        batch in the job queue is finished.
        3. Update the scheduler from the output.
        FNTr   r   rH   NTNFzunexpected error)r   rv   rb   ru   rh   r   r   rX   r   ry   r   r|   r   r   r7    pending_structured_output_tokensr   r   
appendleftdonepopr   r   r   RuntimeErrorr   r   rj   r    update_draft_token_ids_in_output)r   rv   r   deferred_scheduler_outputr   exec_futurer   r   exec_model_futr   r   r   r   r   r   r     s   



 
z EngineCore.step_with_batch_queuec                 C   sX   | j  s*g }| j  s#| j  }|t|tr|fn| | j  r| | d S d S r   )r   empty
get_nowaitextendr   r   r   )r   r   idsr   r   r   r   *  s   



z EngineCore._process_aborts_queuec                 C   s2   | j   | jr| j  | jr| j  d S d S r   )rM   clear_backendrX   shutdownrh   r   r   r   r   r
  4  s   

zEngineCore.shutdownTis_startc                 C      | j | d S r   )rX   profile)r   r  r   r   r   r  ;     zEngineCore.profilec                 C   s6   | j  r
td | jd ur| j  | j  d S )NzcResetting the multi-modal cache when requests are in progress may lead to desynced internal caches.)rh   has_unfinished_requestsrU   re   ro   clear_cacherX   reset_mm_cacher   r   r   r   r  >  s   


zEngineCore.reset_mm_cachereset_running_requestsreset_connectorc                 C   s   | j ||S r   )rh   reset_prefix_cache)r   r  r  r   r   r   r  M  s   zEngineCore.reset_prefix_cachec                 C   s,   | j  r
td | j   | j  dS )a0  Reset the encoder cache to invalidate all cached encoder outputs.

        This should be called when model weights are updated to ensure
        stale vision embeddings computed with old weights are not reused.
        Clears both the scheduler's cache manager and the GPU model runner's cache.
        z_Resetting the encoder cache when requests are in progress may lead to desynced internal caches.N)rh   r  rU   re   reset_encoder_cacherX   r   r   r   r   r  T  s   
	
zEngineCore.reset_encoder_cacherO   levelc                 C   r  r   )rX   sleep)r   r  r   r   r   r  h  r  zEngineCore.sleeptagsc                 C   r  r   )rX   wake_up)r   r  r   r   r   r  k  r  zEngineCore.wake_upc                 C   r   r   )rX   is_sleepingr   r   r   r   r  n  r   zEngineCore.is_sleepingc                 C   s   | j   d S r   )rX   execute_dummy_batchr   r   r   r   r  q  s   zEngineCore.execute_dummy_batchlora_requestc                 C      | j |S r   )rX   add_lora)r   r  r   r   r   r  t     zEngineCore.add_loralora_idc                 C   r  r   )rX   remove_lorar   r   r   r   r   r!  w  r  zEngineCore.remove_lorac                 C   s
   | j  S r   )rX   
list_lorasr   r   r   r   r#  z  s   
zEngineCore.list_lorasc                 C   r  r   )rX   pin_lorar"  r   r   r   r$  }  r  zEngineCore.pin_lorapathpatternmax_sizec                 C   s   | j j|||d d S )N)r%  r&  r'  )rX   save_sharded_state)r   r%  r&  r'  r   r   r   r(    s   
zEngineCore.save_sharded_stater   methodtimeoutrK   kwargsc                 C   s   | j ||||S r   )rX   r_   )r   r)  r*  rK   r+  r   r   r   r_     s   zEngineCore.collective_rpcc                 C   sJ   | j dur|jr| j |j|_t|| j}|jr | j| ||j	fS )zPreprocess the request.

        This function could be directly used in input processing thread to allow
        request initialization running in parallel with Model forward
        N)
ro   mm_featuresget_and_update_featuresr8   from_engine_core_requestr}   use_structured_outputrM   grammar_initcurrent_wave)r   r   reqr   r   r   preprocess_add_request  s   	
z!EngineCore.preprocess_add_requestr   r   )r   N)T)FF)rO   r   )NN)Nr   N)8__name__
__module____qualname____doc__r   r   r4   boolr   r   r   tupleintr5   r[   r   r   r8   r   r   r   r   r   r   r   r'   r   r   dictr)   r   r   r   r   r
  r  r  r  r  r  r  r  r  r   r  r!  setr#  r$  r(  r@   floatr   r_   r*   r3  r   r   r   r   rA   O   s    
 ;!

#

x




	rA   c                       s  e Zd ZdZdZedd	d>ddded	ed
ede	e
 dededB def fddZed
eded	edededB deeddf fddZe	d>dejd
eded	ededededB deeddf fddZe	d>dejd	edededB def
ddZedddded efd!d"Zdefd#d$Zd%d& Zd'd( Zdefd)d*Zd+ed,eddfd-d.Z ed/d0 Z!d1d2 Z"d3e#e d4edB ded5e$j%fd6d7Z&d8e#e d9edB defd:d;Z'd,e(ddfd<d=Z)  Z*S )?EngineCoreProcz9ZMQ-wrapper for running EngineCore in background process.s   ENGINE_CORE_DEADzEngineCoreProc initr   Nr   engine_indexrB   local_clienthandshake_addressrC   rD   client_handshake_addressrA  c                   s  t jtttf    _t jtttf tB    _	 fdd}| _
 j
jddd}	d _ ||	|||}
t|
j _|
jd u _|
j _td j j  joV|jj }| _ | t ||||| t }tj j|
j |
j!|	|fdd	}|"  tj j#|
j|
j j
fdd	 _$ j$"  |j%d
ds|& st'd|
j!d usJ t(d |j%d
drW d    d S W d    d S 1 sw   Y  d S )Nc                      s    j tjdfS )N    )input_queue
put_nowaitr+   EXECUTOR_FAILEDr   r   r   r   <lambda>  s    z)EngineCoreProc.__init__.<locals>.<lambda>   little)length	byteorderFz1Has DP Coordinator: %s, stats publish address: %sT)targetrK   daemon
   r*  z'Input socket thread died during startupz0Waiting for READY message from DP Coordinator...))r   r   r:  r+   r   rF  r;  r)   bytesoutput_queuerA  to_bytesengines_running_perform_handshakesrb   outputsclient_countcoordinator_outputhas_coordinatorfrontend_stats_publish_addressrU   rw   rS   data_parallel_external_lbpublish_dp_lb_stats_init_data_parallelsuperr   	threadingEventThreadprocess_input_socketsinputscoordinator_inputr   process_output_socketsoutput_threadwaitis_aliver   rV   )r   rB   rB  rC  rC   rD   rD  rA  rE   identity	addressesinternal_dp_balancingready_eventinput_thread	__class__r   r   r     s   


	
D"zEngineCoreProc.__init__rj  r   c              	   c   s    t  }|o
|du }| }| |||||||j}	|du r4|	}
|
V  W d   n1 s.w   Y  n=|s8J | |||dd|}|	%}
|}|j|
_|j|
_|
V  W d   n1 s]w   Y  W d   n1 slw   Y  |  dS )a  
        Perform startup handshakes.

        For DP=1 or offline mode, this is with the colocated front-end process.

        For DP>1 with internal load-balancing this is with the shared front-end
        process which may reside on a different node.

        For DP>1 with external or hybrid load-balancing, two handshakes are
        performed:
            - With the rank 0 front-end process which retrieves the
              DP Coordinator ZMQ addresses and DP process group address.
            - With the colocated front-end process which retrieves the
              client input/output socket addresses.
        with the exception of the rank 0 and colocated engines themselves which
        don't require the second handshake.

        Here, "front-end" process can mean the process containing the engine
        core client (which is the API server process in the case the API
        server is not scaled out), OR the launcher process running the
        run_multi_api_server() function in serve.py.
        NTF)zmqContext_perform_handshakerS   rd  rW  __post_init__)r   rC  rj  rB  rB   rD  	input_ctxis_localheadless	handshakerk  local_handshakeclient_addressesr   r   r   rV    s<   	 z"EngineCoreProc._perform_handshakesctxrw  parallel_config_to_updatec                 c   s    t ||tj|ddd9}| ||||}	|	V  |jj}
| j}d|||
|d}|jjdkr4|j	 |d< |
tj| W d    d S 1 sHw   Y  d S )Ni  F)rj  lingerbindREADY)statusr   rw  r]   dp_stats_addressrO   parallel_config_hash)r   rq  DEALERstartup_handshaker\   r]   r[  rS   data_parallel_sizecompute_hashsendmsgspecmsgpackencode)r   r{  rC  rj  rB  rw  rB   r|  handshake_socketrk  r]   r  	ready_msgr   r   r   rs  L  s6   "z!EngineCoreProc._perform_handshaker  rS   c                 C   s   |  tjd||d td | jtd ds"tdt d| 	 }tjj
|td}td	| |d urH|j D ]
\}}t||| q=|jS )
NHELLO)r  r   rw  z(Waiting for init message from front-end.i`  rQ  z7Did not receive response from front-end process within z minutesr   zReceived init message: %s)r  r  r  r  rU   rw   pollHANDSHAKE_TIMEOUT_MINSr   recvdecoder1   rS   itemssetattrrk  )r  rB  rw  rS   
init_bytesinit_messagekeyvaluer   r   r   r  {  s0   
z EngineCoreProc.startup_handshake)dp_ranklocal_dp_rankr  r  c           
   
      s  d t    fdd}ttj| ttj| d}zz|d }|j}|jdkp,| dk}|rE||_tdd	d
|  d tdd|   ntdd	dd td t	  |rm|j
durm|j
j d| |j
_td|j
j | |_|r|jjr| |_t|i |}nd|_d|_d|_t|d| i|}|dusJ |  W n. ty   td   ty }	 z|du rtd |	td |  |	d}	~	ww W |dur|  dS dS |dur|  w w )z2Launch EngineCore busy loop in background process.Fc                    s    sd t  d S r   )
SystemExit)signumframeshutdown_requestedr   r   signal_handler  s   z6EngineCoreProc.run_engine_core.<locals>.signal_handlerNrB   rO   r   vllm.engine_coreengine_coreEngineCore_DPinstrumenting_module_nameprocess_kindprocess_namerA   DP_dpz*Setting kv_transfer_config.engine_id to %srA  EngineCore exiting.zEngineCore failed to start.%EngineCore encountered a fatal error.)r   signalSIGTERMSIGINTrS   r  rT   r   r    r   kv_transfer_config	engine_idrU   rw   data_parallel_indexrz   is_moedata_parallel_rankDPEngineCoreProcdata_parallel_size_localr?  run_busy_loopr  r   	exception_send_engine_deadr
  )
r  r  rK   r+  r  r  rB   rS   data_paralleler   r  r   run_engine_core  sx   



zEngineCoreProc.run_engine_corec                 C   s   d S r   r   )r   rB   r   r   r   r^    s   z"EngineCoreProc._init_data_parallelc                 C   s   	 |    |   q)z!Core busy loop of the EngineCore.)_process_input_queue_process_engine_stepr   r   r   r   r    s   zEngineCoreProc.run_busy_loopc                 C   s   d}| j sS| j sS| jsS| jsS| j r;| jj | jj	
  W d   n1 s*w   Y  ttr;td d}| j }| j|  | j sS| j sS| jsS| jr|rZtd | j sp| j }| j|  | j r_dS dS )z0Exits when an engine step needs to be performed.FNzEngineCore waiting for work.TzEngineCore loop active.)rU  rh   r   rv   r   rF  r  r   mutexr   clearrU   isEnabledForr
   rw   r   _handle_client_requestr  )r   waitedr2  r   r   r   r    s@   









z#EngineCoreProc._process_input_queuec                 C   sT   |   \}}|r| ndD ]}| j| q| | |s(| j r(td |S )z5Called only when there are unfinished local requests.r   gMbP?)	r   r  rS  rG  r   rh   r  r   r  )r   rW  r   outputr   r   r   r     s   

z#EngineCoreProc._process_engine_steprequest_typer   c              
   C   s  |t jkr|\}}| || dS |t jkr| | dS |t jkrq|\}}}}t|}	zt| |}
|
| |
| }t	||	_
W n# tyc } ztd| d| dt| |	_W Y d}~nd}~ww | j|t|	df dS |t jkrztdtd| dS )zDispatch request from client.zInvocation of %s method failedzCall to z method failed: N)utility_outputzExecutor failed.z/Unrecognized input request type encountered: %s)r+   ADDr   ABORTr   UTILITYr/   r   _convert_msgspec_argsr0   r   BaseExceptionrU   r  r   failure_messagerS  rG  r)   rH  r   error)r   r  r   r2  r   
client_idxcall_idmethod_namerK   r  r)  r   r  r   r   r   r  4  s4   




z%EngineCoreProc._handle_client_requestc                 C   sB   |s|S t | j }t|t|ksJ tdd t||D S )ztIf a provided arg type doesn't match corresponding target method
        arg type, try converting to msgspec object.c                 s   sL    | ]!\}}t |jr t|jtjr t||js tj||jd n|V  qdS )r  N)r   
annotation
issubclassr  Structr   convert)r   vpr   r   r   r   \  s    

z7EngineCoreProc._convert_msgspec_args.<locals>.<genexpr>)r	   
parametersvaluesrb   r:  zip)r)  rK   	arg_typesr   r   r   r  T  s   z$EngineCoreProc._convert_msgspec_argsc                 C   s8   | j tj | jjdd | j rtd dS dS )z/Send EngineDead status to the EngineCoreClient.g      @rQ  zNvLLM shutdown signal from EngineCore failed to send. Please report this issue.N)	rS  rG  r?  ENGINE_CORE_DEADrg  r   ri  rU   fatalr   r   r   r   r  e  s   
z EngineCoreProc._send_engine_deadinput_addressescoord_input_addressrm  c                    s  t t}t  }t t   fdd|D }|du r"d}nt |tjdd}|d t	 }	|D ]}
|
d |	
|
tj q:|dur\| dksUJ |	
|tj |  ~	 |	 D ]K\}
}|
jdd
^}}tt|j}|tjkr||}z| |}W n ty   | | Y qfw ||}|tjkr| j| | j||f qfqb1 sw   Y  W d   dS 1 sw   Y  dS )zInput socket IO thread.c                    s&   g | ]} t |tjd dqS )Frj  r~  )enter_contextr   rq  r  )r   input_addressr{  rj  stackr   r   r     s    z8EngineCoreProc.process_input_sockets.<locals>.<listcomp>NFr     rE  s   READYT)copy)r:   r*   r   rq  rr  r  r   XSUBr  PollerregisterPOLLINr  r=  r  recv_multipartr+   rR  bufferr  r  r3  r   _handle_request_preproc_errorr  r   rG  rF  )r   r  r  rj  rm  add_request_decodergeneric_decoderinput_socketscoord_socketpollerinput_socket_
type_framedata_framesr  r2  r   r   r  r   rc  s  sZ   








4z$EngineCoreProc.process_input_socketsoutput_pathscoord_output_pathc                    s  t  }g }tttjttf   }t Չt   fdd|D }|dur4	t
 |tjdddnd}t|d }		 | j }
|
tjkrR|D ]}||
 qInwt|
trYJ |
\}}||_|d	krs|dusjJ ||| q<|r|d	 d
 jr|| d  |r|d	 d
 js||r| nt }|||}|| j|ddd}|jst|dkr|nd}||||f nt||	k r|| q=W d   n1 sw   Y  W d   dS W d   dS 1 sw   Y  dS )zOutput socket IO thread.c              
      s$   g | ]} t |tjd dqS )  )r}  )r  r   rq  PUSH)r   output_pathr{  r  r   r   r     s    z9EngineCoreProc.process_output_sockets.<locals>.<listcomp>NFr  )r~  r}  rO   TrH   r   rJ  )r  track)r;   r   r:  rq  MessageTrackerr   	bytearrayr   rr  r  r   r  rb   rS  r   r?  r  r  r   rR  rA  send_multipartr  r   appendr   encode_intor   )r   r  r  rA  encoderreuse_bufferspendingsocketsr  max_reuse_bufsr  socketclient_indexrW  r  bufferstrackerrefr   r  r   rf    sZ   		


Pz%EngineCoreProc.process_output_socketsc                 C   sD   t d|j | j|jt| j|jht|jg t	j
dgdf dS )zLog and return a request-scoped error response for exceptions raised
        from the add request preprocessing in the input socket processing thread.
        z*Unexpected error pre-processing request %s)r   new_token_idsfinish_reason)rA  finished_requestsrW  N)rU   r  r   rS  rG  r  r)   rA  r(   r,   ERROR)r   r   r   r   r   r  
  s"   z,EngineCoreProc._handle_request_preproc_errorr   )+r5  r6  r7  r8  r  r   r   r9  r   r   r4   r;  r   r   rR  r   r2   rV  rq  rr  r   rs  staticmethodSocketr  r  r^  r  r  r  r+   r   r  r  r  r   r`  ra  rc  rf  r*   r  __classcell__r   r   ro  r   r?    s    		_:		.%U

 

Q
Fr?  c                       s   e Zd ZdZ	d"dedededee dededB f fd	d
Z	defddZ
 fddZd#dedef fddZdededdf fddZdd Zdd ZdedefddZdeddfd d!Z  ZS )$r  zXZMQ-wrapper for running EngineCore in background process
    in a data parallel context.NrB   rB  rC  rC   rD   rD  c              	      sH   |j jsJ dd| _d| _d| _|jj}t j|||||||d d S )Nz3DPEngineCoreProc should only be used for MoE modelsr   r   r   r@  )	rz   r  step_counterr1  last_countsrS   r  r_  r   )r   rB   rB  rC  rC   rD   rD  r  ro  r   r   r   '  s    
	
zDPEngineCoreProc.__init__c                 C   sl   |j j}|j j}|j j}|dksJ |d usJ d|  kr(|  kr(|k s+J  J || _|j  | _d S )NrO   r   )rS   r  r  rT   r  stateless_init_dp_groupr   )r   rB   r  dp_sizer  r   r   r   r^  F  s   &z$DPEngineCoreProc._init_data_parallelc                    s*   t    t| dd  }rt| d S d S )Nr   )r_  r
  r   r   )r   r   ro  r   r   r
  S  s   
zDPEngineCoreProc.shutdownr   r   r   c                    sR   | j r || jkr || jkr|| _n| js | jdt| jdf t || d S )NrH   )
start_wave)rZ  r1  rU  rS  rG  r)   r_  r   )r   r   r   ro  r   r   r   X  s   
zDPEngineCoreProc.add_requestr  r   c                    sf   |t jkr*|\}}|| jkr$|| jkr&|| _| js(td| d| _d S d S d S d S t || d S )Nz*EngineCore starting idle loop for wave %d.T)	r+   START_DP_WAVErA  r1  rU  rU   rw   r_  r  )r   r  r   new_waveexclude_eng_indexro  r   r   r  e  s   



z'DPEngineCoreProc._handle_client_requestc                 C   sV   | j sd S | j }|| jkr)|| _t|| j| jd}| jdt	|df d S d S )N)r  r1  rH   )scheduler_stats)
r]  rh   get_request_countsr  r6   r  r1  rS  rG  r)   )r   countsstatsr   r   r   _maybe_publish_request_countst  s   


z.DPEngineCoreProc._maybe_publish_request_countsc                 C   s   	 |    |  }|   | j }|s|s| jsq |   | || _| jsS| jdks/| j	sIt
d| j | j	r;dnd}| j|t| jdf |  jd7  _d| _q)z8Core busy loop of the EngineCore for data parallel case.Tr   z&Wave %d finished, pausing engine loop.rH   )wave_completerO   )r  r  r  rh   r  rU  r  _has_global_unfinished_reqsr  rZ  rU   rw   r1  rS  rG  r)   r  )r   executedlocal_unfinished_reqsr  r   r   r   r    s4   


zDPEngineCoreProc.run_busy_looplocal_unfinishedc                 C   s.   |  j d7  _ | j d dkrdS t| j|S )NrO       r   T)r  r   has_unfinished_dpr   )r   r$  r   r   r   r!    s   z,DPEngineCoreProc._has_global_unfinished_reqsreconfig_requestc                 C   s   t | j |   | jj}|j}|j|_|jdkr|j|_|j	t
jks%J |j|_|j|_|jdkr;|j| _| | _|j|_| j| |j|kr_| jdksQJ t| j| j | jd |jt
jkrr|   td| j d S td| j d S )NrH   r   compile_or_warm_up_modelzDPEngineCoreProc %s shutdownz4Distributed environment reinitialized for DP rank %s)r   r   r
  rB   rS   r  new_data_parallel_sizenew_data_parallel_rankr  new_data_parallel_rank_localr.   KEEP_CURRENT_RANKnew_data_parallel_master_ipdata_parallel_master_ipnew_data_parallel_master_portdata_parallel_master_portr  r  rX   reinitialize_distributedrZ   r   r   r_   SHUTDOWN_CURRENT_RANKrU   rV   )r   r'  rS   old_dp_sizer   r   r   r2    sD   




z)DPEngineCoreProc.reinitialize_distributedr   r4  )r5  r6  r7  r8  r   r9  r   r   r4   r   r^  r
  r8   r;  r   r+   r   r  r  r  r!  r-   r2  r  r   r   ro  r   r  #  sD    
/r  c                   @   s   e Zd ZdZ		ddedededefddZdedefd	d
Zdedede	fddZ
ede	dededede	dB f
ddZdd Zdd ZdS )EngineCoreActorMixinzE
    Ray actor for running EngineCore in a data parallel context
    r   rB   rk  r  r  c                 C   s:   t ddd| d || _||j_||j_| || d S )Nr  r  DPEngineCoreActor_DPr  )r   rk  rS   r  rT   _set_visible_devices)r   rB   rk  r  r  r   r   r   r     s   zEngineCoreActorMixin.__init__c                 C   s0   ddl m} | rd S |j}| ||| d S )Nr   )current_platform)vllm.platformsr8  is_xpudevice_control_env_var_set_cuda_visible_devices)r   rB   r  r8  r;  r   r   r   r7    s   z)EngineCoreActorMixin._set_visible_devicesr;  c                 C   st   |j j}zt|||}|tj|< W d S  ty9 } ztd| d||  d|d |  dt| d	|d }~ww )NzError setting z: local range: [z, rO   z) base value: "")rS   
world_sizer3   r   r   
IndexErrorr   getenv)r   rB   r  r;  r>  r  r  r   r   r   r<  "  s(   
z.EngineCoreActorMixin._set_cuda_visible_devicesrC  rj  rB  rD  Nc                 c   s    | j V  dS )z
        For Ray, we don't need to actually perform handshake.
        All addresses information is known before the actor creation.
        Therefore, we simply yield these addresses.
        N)rk  )r   rC  rj  rB  rB   rD  r   r   r   rV  4  s   z(EngineCoreActorMixin._perform_handshakesc                 C   s   dS )a   
        Wait until the engine core is initialized.

        This is just an empty method. When ray.get() on this method
        (or any other method of the actor) returns, it is guaranteed
        that actor creation (i.e., __init__) is complete.
        Nr   r   r   r   r   wait_for_initD  s   z"EngineCoreActorMixin.wait_for_initc                 C   sZ   z'z|    W n ty   td   ty    td  w W |   dS |   w )z0
        Run the engine core busy loop.
        r  r  N)r  r  rU   rw   r   r  r
  r   r   r   r   runN  s   

zEngineCoreActorMixin.runr  )r5  r6  r7  r8  r   r2   r;  r   r7  r   r<  r   rR  r9  rV  rA  rB  r   r   r   r   r5    sF    
%

r5  c                   @   @   e Zd ZdZ		ddedededee dede	d	e	fd
dZ
dS )DPMoEEngineCoreActorz'Used for MoE model data parallel cases.r   rB   rB  rk  rC   rD   r  r  c                 C   s2   ||j _t| |||| t| ||d|| d S )Nr   )rS   r  r5  r   r  r   rB   rB  rk  rC   rD   r  r  r   r   r   r   a  s   

zDPMoEEngineCoreActor.__init__Nr  r5  r6  r7  r8  r   r9  r2   r   r4   r;  r   r   r   r   r   rD  ^  &    	rD  c                   @   rC  )EngineCoreActorz%Used for non-MoE and/or non-DP cases.r   rB   rB  rk  rC   rD   r  r  c              	   C   sF   d|j _d|j _d|j _t| |||| tj| ||d|||d d S )NrO   r   r   r@  )rS   r  r  r  r5  r   r?  rE  r   r   r   r   x  s   


zEngineCoreActor.__init__Nr  rF  r   r   r   r   rH  u  rG  rH  )sr   r   r  r`  r   collectionsr   collections.abcr   r   concurrent.futuresr   
contextlibr   r   inspectr   r	   loggingr
   typingr   r   r   r  rq  vllm.configr   r   vllm.distributedr   	vllm.envsr   vllm.loggerr   vllm.logging_utils.dump_inputr   vllm.lora.requestr   vllm.multimodalr   
vllm.tasksr   r   vllm.tracingr   r   vllm.transformers_utils.configr   vllm.utils.gc_utilsr   r   vllm.utils.hashingr   vllm.utils.network_utilsr   vllm.utils.system_utilsr   r    vllm.v1.core.kv_cache_utilsr!   r"   r#   r$   r%   vllm.v1.core.sched.interfacer&   vllm.v1.core.sched.outputr'   vllm.v1.enginer(   r)   r*   r+   r,   r-   r.   r/   r0   vllm.v1.engine.utilsr1   r2   r3   vllm.v1.executorr4   vllm.v1.kv_cache_interfacer5   vllm.v1.metrics.statsr6   vllm.v1.outputsr7   vllm.v1.requestr8   r9   vllm.v1.serial_utilsr:   r;   vllm.v1.structured_outputr<   vllm.v1.utilsr=   vllm.versionr>   rW   r5  rU   POLLING_TIMEOUT_Sr  r@   rA   r?  r  r5  rD  rH  r   r   r   r   <module>   sz   ,    a    { Kq