o
    .i3                     @   sP  d dl Z d dlmZmZ d dlmZ d dlmZ d dlm	Z	 d dl
mZmZmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dl m!Z! d dl"m#Z#m$Z$ d dl%m&Z&m'Z' d dl(m)Z) erd dl*m+Z+ ee,Z-edZ.eg df Z/G dd deZ0d dl1m2Z3 d dl1m4Z5 e5Z4e3Z2dS )    N)ABCabstractmethod)Callable)Future)cached_property)TYPE_CHECKINGLiteralTypeVaroverload)
VllmConfig)KVOutputAggregator)KVConnectorHandshakeMetadata)init_logger)LoRARequest)SupportedTask)resolve_obj_by_qualname)GrammarOutputSchedulerOutput)ReconfigureDistributedRequest)KVCacheConfigKVCacheSpec)DraftTokenIdsModelRunnerOutput)
WorkerBase)KVConnectorBase_Rc                   @   s  e Zd ZU dZdZeed< dZeed< ede	de
d  fddZde	dd	fd
dZed^ddZdee dd	fddZdefddZdee fddZdeeeef  fddZe						d_deeegef B ded	B deded	B de d dee fddZ!e						 d`deeegef B ded	B deded	B de d  de"ee  fd!dZ!e	d_defd"dZ!deeee#f  fd#d$Z$e	dad%e%de d de&d	B fd&d'Z'e	 dbd%e%de d  de"e&d	B  fd(d'Z'	dad%e%dede&d	B e"e&d	B  B fd)d'Z'e	dad*e(d	B de d de&fd+d,Z)e	 dbd*e(d	B de d  de"e& fd-d,Z)	dad*e(d	B dede&e"e& B fd.d,Z)d^d/d0Z*de+d	B fd1d2Z,e-defd3d4Z.dbd5efd6d7Z/				dcd8ed9ed	B d:ed	B dd	fd;d<Z0ed^d=d>Z1d^d?d@Z2dddCdDZ3e4dee5dEf fdFdGZ6dHe7defdIdJZ8dKedefdLdMZ9dKedefdNdOZ:de;e fdPdQZ<d^dRdSZ=dedUefdVdWZ>dfdXee d	B fdYdZZ?d[e@dd	fd\d]ZAd	S )gExecutorzAbstract base class for vLLM executors."

    An executor is responsible for executing the model on one device,
    or it can be a distributed executor that can execute the model on multiple devices.
    Fuses_raysupports_ppvllm_configreturnc                 C   s   | j }|j}t|trt|tstd| d|}|S |dkr*ddlm} |}|S |dkr8ddl	m
} |}|S |dkrFdd	lm} |}|S |d
krNt}|S t|trft|}t|tsdtd| d|S td| )NzAdistributed_executor_backend must be a subclass of Executor. Got .rayr   )RayDistributedExecutormp)MultiprocExecutoruniUniProcExecutorexternal_launcherz&Unknown distributed executor backend: )parallel_configdistributed_executor_backend
isinstancetype
issubclassr   	TypeErrorvllm.v1.executor.ray_executorr#   #vllm.v1.executor.multiproc_executorr%   !vllm.v1.executor.uniproc_executorr(   ExecutorWithExternalLauncherstrr   
ValueError)r   r*   r+   executor_classr#   r%   r(    r7   V/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/vllm/v1/executor/abstract.py	get_class-   sL   



zExecutor.get_classNc                 C   sn   || _ |j| _|j| _|j| _|j| _|j| _|j| _|j| _|j| _|j	| _	| 
  d| _t | _d | _d S )NF)r   model_configcache_configlora_configload_configr*   scheduler_configdevice_configspeculative_configobservability_config_init_executoris_sleepingsetsleeping_tagskv_output_aggregator)selfr   r7   r7   r8   __init__W   s   
zExecutor.__init__c                 C      t NNotImplementedErrorrG   r7   r7   r8   rB   j      zExecutor._init_executorkv_cache_configsc                 C   s   | j d|fd |  d dS )zp
        Initialize the KV caches and begin the model execution loop of the
        underlying workers.
        initialize_from_configargscompile_or_warm_up_modelNcollective_rpc)rG   rO   r7   r7   r8   rP   n   s   zExecutor.initialize_from_configcallbackc                 C      dS )zk
        Register a function to be called if the executor enters a permanent
        failed state.
        Nr7   )rG   rV   r7   r7   r8   register_failure_callbackv   s   z"Executor.register_failure_callbackc                 C   
   |  dS )Ndetermine_available_memoryrT   rM   r7   r7   r8   rZ   }      
z#Executor.determine_available_memoryc                 C   rY   )Nget_kv_cache_specrT   rM   r7   r7   r8   get_kv_cache_specs   r[   zExecutor.get_kv_cache_specsr7   methodtimeoutrR   kwargs	non_blockc                 C   rW   )a9  
        Execute an RPC call on all workers.

        Args:
            method: Name of the worker method to execute, or a callable that
                is serialized and sent to all workers to execute.

                If the method is a callable, it should accept an additional
                `self` argument, in addition to the arguments passed in `args`
                and `kwargs`. The `self` argument will be the worker object.
            timeout: Maximum time in seconds to wait for execution. Raises a
                [`TimeoutError`][] on timeout. `None` means wait indefinitely.
            args: Positional arguments to pass to the worker method.
            kwargs: Keyword arguments to pass to the worker method.
            non_block: If `True`, returns a list of Futures instead of waiting
                for the results.

        Returns:
            A list containing the results from each worker.

        Note:
            It is recommended to use this API to only pass control messages,
            and set up data-plane communication to pass data.
        Nr7   rG   r^   r_   rR   r`   ra   r7   r7   r8   rU      s   !zExecutor.collective_rpcTc                 C      d S rJ   r7   rb   r7   r7   r8   rU      s   	c                 C   rI   rJ   rK   rb   r7   r7   r8   rU         c                 C   rY   )N#get_kv_connector_handshake_metadatarT   rM   r7   r7   r8   re      s   
z,Executor.get_kv_connector_handshake_metadatascheduler_outputc                 C   rc   rJ   r7   rG   rf   ra   r7   r7   r8   execute_model   rd   zExecutor.execute_modelc                 C   rc   rJ   r7   rg   r7   r7   r8   rh      rd   c                 C      | j d|f|d}|d S )Nrh   rR   ra   r   rT   )rG   rf   ra   outputr7   r7   r8   rh         grammar_outputc                 C   rc   rJ   r7   rG   rm   ra   r7   r7   r8   sample_tokens   rd   zExecutor.sample_tokensc                 C   rc   rJ   r7   rn   r7   r7   r8   ro      rd   c                 C   ri   )Nro   rj   r   rT   )rG   rm   ra   rk   r7   r7   r8   ro      rl   c                 C   s   |  d d S )Nexecute_dummy_batchrT   rM   r7   r7   r8   rp      s   zExecutor.execute_dummy_batchc                 C      |  d}|d S )Ntake_draft_token_idsr   rT   rG   rk   r7   r7   r8   rr      s   
zExecutor.take_draft_token_idsc                 C   rW   )N   r7   rM   r7   r7   r8   max_concurrent_batches   rN   zExecutor.max_concurrent_batchesis_startc                 C   s   | j d|fd d S )NprofilerQ   rT   )rG   rv   r7   r7   r8   rw      s   zExecutor.profilepathpatternmax_sizec                 C   s   | j dt|||dd d S )Nsave_sharded_state)rx   ry   rz   r`   )rU   dict)rG   rx   ry   rz   r7   r7   r8   r{      s   
zExecutor.save_sharded_statec                 C   rI   )zPChecks if the executor is healthy. If not, it should raise an
        exception.rK   rM   r7   r7   r8   check_health   rd   zExecutor.check_healthc                 C      |  d dS )zShutdown the executor.shutdownNrT   rM   r7   r7   r8   r        zExecutor.shutdown	connectorr   c                 C   s   t || jj| _dS )zInit KVOutputAggregatorN)r   from_connectorr*   
world_sizerF   )rG   r   r7   r7   r8   init_kv_output_aggregator  s   
z"Executor.init_kv_output_aggregator.c                 C   rq   )Nget_supported_tasksr   rT   rs   r7   r7   r8   supported_tasks  s   
zExecutor.supported_taskslora_requestc                 C   s&   |j dks	J dt| jd|fdS )Nr   lora_id must be greater than 0.add_lorarQ   )lora_int_idallrU   )rG   r   r7   r7   r8   r     s   zExecutor.add_loralora_idc                 C   $   |dksJ dt | jd|fdS )Nr   r   remove_lorarQ   r   rU   rG   r   r7   r7   r8   r        zExecutor.remove_lorac                 C   r   )Nr   r   pin_lorarQ   r   r   r7   r7   r8   r     r   zExecutor.pin_lorac                 C   s0   |  d}|D ]}||d ksJ dq|d S )N
list_lorasr   z'All workers should have the same LORAs.rT   )rG   setssr7   r7   r8   r     s   
zExecutor.list_lorasc                 C   r   )z+Reset the multi-modal cache in each worker.reset_mm_cacheNrT   rM   r7   r7   r8   r   %  r   zExecutor.reset_mm_cachert   levelc                 C   s\   | j r
td d S t }| jdt|dd t }ddh| _d| _ td||  d S )	NzExecutor is already sleeping.sleep)r   r|   weightskv_cacheTz$It took %.6f seconds to fall asleep.)	rC   loggerwarningtimeperf_counterrU   r}   rE   info)rG   r   time_before_sleeptime_after_sleepr7   r7   r8   r   )  s   

zExecutor.sleeptagsc                 C   s   | j s
td d S |r!|D ]}|| jvr td|| j  d S qt }| jdt|dd t }td|| |d ur?|n| j |rQ|D ]}| j	| qGn| j
  | js^d| _ d S d S )NzExecutor is not sleeping.z!Tag %s is not in sleeping tags %swake_up)r   r|   z(It took %.6f seconds to wake up tags %s.F)rC   r   r   rE   r   r   rU   r}   r   removeclear)rG   r   tagtime_before_wakeuptime_after_wakeupr7   r7   r8   r   6  s6   



zExecutor.wake_upreconfig_requestc                 C   rI   rJ   rK   )rG   r   r7   r7   r8   reinitialize_distributedQ  s   z!Executor.reinitialize_distributed)r    N)Nr7   NF)Nr7   NT)F)T)NN)r   r   r    N)rt   rJ   )B__name__
__module____qualname____doc__r   bool__annotations__r   staticmethodr   r-   r9   rH   r   rB   listr   rP   FailureCallbackrX   intrZ   r}   r4   r   r]   r
   r   r   r   floattupler   rU   r   r   re   r   r   rh   r   ro   rp   r   rr   propertyru   rw   r{   r~   r   r   r   r   r   r   r   r   r   rD   r   r   r   r   r   r   r7   r7   r7   r8   r   #   s&  
 )
"











r   )r3   r'   )6r   abcr   r   collections.abcr   concurrent.futuresr   	functoolsr   typingr   r   r	   r
   vllm.configr   /vllm.distributed.kv_transfer.kv_connector.utilsr   1vllm.distributed.kv_transfer.kv_connector.v1.baser   vllm.loggerr   vllm.lora.requestr   
vllm.tasksr   vllm.utils.import_utilsr   vllm.v1.core.sched.outputr   r   vllm.v1.enginer   vllm.v1.kv_cache_interfacer   r   vllm.v1.outputsr   r   vllm.v1.worker.worker_baser   .vllm.distributed.kv_transfer.kv_connector.baser   r   r   r   r   r   r2   r3   _ExecutorWithExternalLauncherr(   _UniProcExecutorr7   r7   r7   r8   <module>   s<     6