o
    
۾i4                     @   s\  d dl Z d dlmZmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZmZmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlm Z m!Z! d dl"m#Z# d dl$m%Z%m&Z& d dl'm(Z(m)Z) d dl*m+Z+ erd dl,m-Z- ee.Z/edZ0eg df Z1G dd deZ2d dl3m4Z5 d dl3m6Z7 e7Z6e5Z4dS )    N)ABCabstractmethod)Callable)Future)cached_property)TYPE_CHECKINGLiteralTypeVaroverload)
VllmConfig)KVOutputAggregator)KVConnectorHandshakeMetadata)init_logger)LoRARequest)SupportedTask)
instrument)resolve_obj_by_qualname)GrammarOutputSchedulerOutput)ReconfigureDistributedRequest)KVCacheConfigKVCacheSpec)DraftTokenIdsModelRunnerOutput)
WorkerBase)KVConnectorBase_Rc                   @   s  e Zd ZU dZdZeed< dZeed< ede	de
d  fddZed	d
de	ddfddZedbddZdee ddfddZdefddZdee fddZdeeeef  fddZe				dcdeeegef B dedB de dedB de!d dee fd d!Z"e				"dddeeegef B dedB de dedB de!d" de#ee  fd#d!Z"e	dcdefd$d!Z"deeee$f  fd%d&Z%e	ded'e&de!d de'dB fd(d)Z(e	"dfd'e&de!d" de#e'dB  fd*d)Z(	ded'e&dede'dB e#e'dB  B fd+d)Z(e	ded,e)dB de!d de'fd-d.Z*e	"dfd,e)dB de!d" de#e' fd/d.Z*	ded,e)dB dede'e#e' B fd0d.Z*dbd1d2Z+de,dB fd3d4Z-e.defd5d6Z/dfd7efd8d9Z0		dgd:ed;edB d<edB ddfd=d>Z1edbd?d@Z2dbdAdBZ3dhdEdFZ4e5de e6dGf fdHdIZ7dJe8defdKdLZ9dMedefdNdOZ:dMedefdPdQZ;de<e fdRdSZ=dbdTdUZ>dbdVdWZ?didYefdZd[Z@djd\ee dB fd]d^ZAd_eBddfd`daZCdS )kExecutorzAbstract base class for vLLM executors."

    An executor is responsible for executing the model on one device,
    or it can be a distributed executor that can execute the model on multiple devices.
    Fuses_raysupports_ppvllm_configreturnc                 C   s   | j }|j}t|trt|tstd| d|}|S |dkr*ddlm} |}|S |dkr8ddl	m
} |}|S |dkrFdd	lm} |}|S |d
krNt}|S t|trft|}t|tsdtd| d|S td| )NzAdistributed_executor_backend must be a subclass of Executor. Got .rayr   )RayDistributedExecutormp)MultiprocExecutoruniUniProcExecutorexternal_launcherz&Unknown distributed executor backend: )parallel_configdistributed_executor_backend
isinstancetype
issubclassr   	TypeErrorvllm.v1.executor.ray_executorr$   #vllm.v1.executor.multiproc_executorr&   !vllm.v1.executor.uniproc_executorr)   ExecutorWithExternalLauncherstrr   
ValueError)r    r+   r,   executor_classr$   r&   r)    r8   M/home/ubuntu/.local/lib/python3.10/site-packages/vllm/v1/executor/abstract.py	get_class.   sL   



zExecutor.get_classzExecutor init)	span_nameNc                 C   sn   || _ |j| _|j| _|j| _|j| _|j| _|j| _|j| _|j| _|j	| _	| 
  d| _t | _d | _d S )NF)r    model_configcache_configlora_configload_configr+   scheduler_configdevice_configspeculative_configobservability_config_init_executoris_sleepingsetsleeping_tagskv_output_aggregator)selfr    r8   r8   r9   __init__X   s   
zExecutor.__init__c                 C      t NNotImplementedErrorrI   r8   r8   r9   rD   l      zExecutor._init_executorkv_cache_configsc                 C   s   | j d|fd |  d dS )zp
        Initialize the KV caches and begin the model execution loop of the
        underlying workers.
        initialize_from_configargscompile_or_warm_up_modelNcollective_rpc)rI   rQ   r8   r8   r9   rR   p   s   zExecutor.initialize_from_configcallbackc                 C      dS )zk
        Register a function to be called if the executor enters a permanent
        failed state.
        Nr8   )rI   rX   r8   r8   r9   register_failure_callbackx   s   z"Executor.register_failure_callbackc                 C   
   |  dS )Ndetermine_available_memoryrV   rO   r8   r8   r9   r\         
z#Executor.determine_available_memoryc                 C   r[   )Nget_kv_cache_specrV   rO   r8   r8   r9   get_kv_cache_specs   r]   zExecutor.get_kv_cache_specsr8   methodtimeoutrT   kwargs	non_blockc                 C   rY   )a9  
        Execute an RPC call on all workers.

        Args:
            method: Name of the worker method to execute, or a callable that
                is serialized and sent to all workers to execute.

                If the method is a callable, it should accept an additional
                `self` argument, in addition to the arguments passed in `args`
                and `kwargs`. The `self` argument will be the worker object.
            timeout: Maximum time in seconds to wait for execution. Raises a
                [`TimeoutError`][] on timeout. `None` means wait indefinitely.
            args: Positional arguments to pass to the worker method.
            kwargs: Keyword arguments to pass to the worker method.
            non_block: If `True`, returns a list of Futures instead of waiting
                for the results.

        Returns:
            A list containing the results from each worker.

        Note:
            It is recommended to use this API to only pass control messages,
            and set up data-plane communication to pass data.
        Nr8   rI   r`   ra   rT   rb   rc   r8   r8   r9   rW      s   !zExecutor.collective_rpcTc                 C      d S rL   r8   rd   r8   r8   r9   rW      s   	c                 C   rK   rL   rM   rd   r8   r8   r9   rW         c                 C   r[   )N#get_kv_connector_handshake_metadatarV   rO   r8   r8   r9   rg      s   
z,Executor.get_kv_connector_handshake_metadatascheduler_outputc                 C   re   rL   r8   rI   rh   rc   r8   r8   r9   execute_model   rf   zExecutor.execute_modelc                 C   re   rL   r8   ri   r8   r8   r9   rj      rf   c                 C      | j d|f|d}|d S )Nrj   rT   rc   r   rV   )rI   rh   rc   outputr8   r8   r9   rj         grammar_outputc                 C   re   rL   r8   rI   ro   rc   r8   r8   r9   sample_tokens   rf   zExecutor.sample_tokensc                 C   re   rL   r8   rp   r8   r8   r9   rq      rf   c                 C   rk   )Nrq   rl   r   rV   )rI   ro   rc   rm   r8   r8   r9   rq      rn   c                 C   s   |  d d S )Nexecute_dummy_batchrV   rO   r8   r8   r9   rr      s   zExecutor.execute_dummy_batchc                 C      |  d}|d S )Ntake_draft_token_idsr   rV   rI   rm   r8   r8   r9   rt      s   
zExecutor.take_draft_token_idsc                 C   rY   )N   r8   rO   r8   r8   r9   max_concurrent_batches   rP   zExecutor.max_concurrent_batchesis_startc                 C   s   | j d|fd d S )NprofilerS   rV   )rI   rx   r8   r8   r9   ry      s   zExecutor.profilepathpatternmax_sizec                 C   s   | j dt|||dd d S )Nsave_sharded_state)rz   r{   r|   rb   )rW   dict)rI   rz   r{   r|   r8   r8   r9   r}      s   
zExecutor.save_sharded_statec                 C   rK   )zPChecks if the executor is healthy. If not, it should raise an
        exception.rM   rO   r8   r8   r9   check_health   rf   zExecutor.check_healthc                 C      |  d dS )zShutdown the executor.shutdownNrV   rO   r8   r8   r9   r        zExecutor.shutdown	connectorr   c                 C   s   t || jj| _dS )zInit KVOutputAggregatorN)r   from_connectorr+   
world_sizerH   )rI   r   r8   r8   r9   init_kv_output_aggregator	  s   
z"Executor.init_kv_output_aggregator.c                 C   rs   )Nget_supported_tasksr   rV   ru   r8   r8   r9   supported_tasks  s   
zExecutor.supported_taskslora_requestc                 C   s&   |j dks	J dt| jd|fdS )Nr   lora_id must be greater than 0.add_lorarS   )lora_int_idallrW   )rI   r   r8   r8   r9   r     s   zExecutor.add_loralora_idc                 C   $   |dksJ dt | jd|fdS )Nr   r   remove_lorarS   r   rW   rI   r   r8   r8   r9   r        zExecutor.remove_lorac                 C   r   )Nr   r   pin_lorarS   r   r   r8   r8   r9   r     r   zExecutor.pin_lorac                 C   s0   |  d}|D ]}||d ksJ dq|d S )N
list_lorasr   z'All workers should have the same LORAs.rV   )rI   setssr8   r8   r9   r   !  s   
zExecutor.list_lorasc                 C   r   )z+Reset the multi-modal cache in each worker.reset_mm_cacheNrV   rO   r8   r8   r9   r   '  r   zExecutor.reset_mm_cachec                 C   r   )zGReset the encoder cache in each worker to clear cached encoder outputs.reset_encoder_cacheNrV   rO   r8   r8   r9   r   +  r   zExecutor.reset_encoder_cacherv   levelc                 C   s\   | j r
td d S t }| jdt|dd t }ddh| _d| _ td||  d S )	NzExecutor is already sleeping.sleep)r   r~   weightskv_cacheTz$It took %.6f seconds to fall asleep.)	rE   loggerwarningtimeperf_counterrW   r   rG   info)rI   r   time_before_sleeptime_after_sleepr8   r8   r9   r   /  s   

zExecutor.sleeptagsc                 C   s   | j s
td d S |r!|D ]}|| jvr td|| j  d S qt }| jdt|dd t }td|| |d ur?|n| j |rQ|D ]}| j	| qGn| j
  | js^d| _ d S d S )NzExecutor is not sleeping.z!Tag %s is not in sleeping tags %swake_up)r   r~   z(It took %.6f seconds to wake up tags %s.F)rE   r   r   rG   r   r   rW   r   r   removeclear)rI   r   tagtime_before_wakeuptime_after_wakeupr8   r8   r9   r   <  s6   



zExecutor.wake_upreconfig_requestc                 C   rK   rL   rM   )rI   r   r8   r8   r9   reinitialize_distributedW  s   z!Executor.reinitialize_distributed)r!   N)Nr8   NF)Nr8   NT)F)T)NN)r   r   r!   N)rv   rL   )D__name__
__module____qualname____doc__r   bool__annotations__r   staticmethodr   r.   r:   r   rJ   r   rD   listr   rR   FailureCallbackrZ   intr\   r   r5   r   r_   r
   r   r   r   floattupler   rW   r   r   rg   r   r   rj   r   rq   rr   r   rt   propertyrw   ry   r}   r   r   r   r   r   r   r   r   r   r   rF   r   r   r   r   r   r   r   r8   r8   r8   r9   r   $   s*  
 )"












r   )r4   r(   )8r   abcr   r   collections.abcr   concurrent.futuresr   	functoolsr   typingr   r   r	   r
   vllm.configr   /vllm.distributed.kv_transfer.kv_connector.utilsr   1vllm.distributed.kv_transfer.kv_connector.v1.baser   vllm.loggerr   vllm.lora.requestr   
vllm.tasksr   vllm.tracingr   vllm.utils.import_utilsr   vllm.v1.core.sched.outputr   r   vllm.v1.enginer   vllm.v1.kv_cache_interfacer   r   vllm.v1.outputsr   r   vllm.v1.worker.worker_baser   .vllm.distributed.kv_transfer.kv_connector.baser   r   r   r   r   r   r3   r4   _ExecutorWithExternalLauncherr)   _UniProcExecutorr8   r8   r8   r9   <module>   s>     ;