o
    
۾iM6                     @   s
  d dl mZ d dlmZmZmZ d dlZd dlmZ d dl	m
Z
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ ered dlmZmZ d dlm Z m!Z! ne"Ze"Ze"Z e"Z!ee#Z$edZ%G dd dZ&G dd dZ'dS )    )Callable)TYPE_CHECKINGAnyTypeVarN)
VllmConfigset_current_vllm_config)init_logger)LoRARequest)MULTIMODAL_REGISTRY)
instrument)resolve_obj_by_qualname)update_environment_variables)KVCacheSpec)
run_method)GrammarOutputSchedulerOutput)AsyncModelRunnerOutputModelRunnerOutput_Rc                   @   sr  e Zd ZdZ	d9dedededededd	fd
dZde	ee
f fddZd:ddZd:ddZd:ddZdededd	fddZd:ddZdejfddZdeejgef defddZdefdd Zd:d!d"Zd#edeeB d	B fd$d%Zd&edeeB fd'd(Zdefd)d*Zd+e defd,d-Z!d.edefd/d0Z"d.edefd1d2Z#de$e fd3d4Z%e&defd5d6Z'd:d7d8Z(d	S );
WorkerBasezWorker interface that allows vLLM to cleanly separate implementations for
    different hardware. Also abstracts control plane communication, e.g., to
    communicate request metadata to other workers.
    Fvllm_config
local_rankrankdistributed_init_methodis_driver_workerreturnNc                 C   s   || _ |j| _|j| _|j| _|j| _|j| _|j| _|j| _|j| _|j	| _	|j
| _
|j| _ddlm} || _|| j_|| _|| _|| _|| _d| _d| _dS )a  
        Initialize common worker components.

        Args:
            vllm_config: Complete vLLM configuration
            local_rank: Local device index
            rank: Global rank in distributed setup
            distributed_init_method: Distributed initialization method
            is_driver_worker: Whether this worker handles driver
                responsibilities
        r   )current_platformN)r   model_configcache_configlora_configload_configparallel_configscheduler_configdevice_configspeculative_configobservability_configkv_transfer_configcompilation_configvllm.platformsr   r   r   r   r   devicemodel_runner)selfr   r   r   r   r   r    r,   N/home/ubuntu/.local/lib/python3.10/site-packages/vllm/v1/worker/worker_base.py__init__(   s*   
zWorkerBase.__init__c                 C      t )z/Get specifications for KV cache implementation.NotImplementedErrorr+   r,   r,   r-   get_kv_cache_specV      zWorkerBase.get_kv_cache_specc                 C   r/   )z7Prepare model for execution through compilation/warmup.r0   r2   r,   r,   r-   compile_or_warm_up_modelZ   r4   z#WorkerBase.compile_or_warm_up_modelc                 C      dS )z9Basic health check (override for device-specific checks).Nr,   r2   r,   r,   r-   check_health^   r4   zWorkerBase.check_healthc                 C   r/   )zjInitialize device state, such as loading the model or other on-device
        memory allocations.
        r0   r2   r,   r,   r-   init_deviceb      zWorkerBase.init_devicenum_gpu_blocksnum_cpu_blocksc                 C   r/   )z6Initialize the KV cache with the given size in blocks.r0   )r+   r:   r;   r,   r,   r-   initialize_cacheh   r4   zWorkerBase.initialize_cachec                 C   s$   t | jdd }t|r|  d S d S )Nreset_mm_cache)getattrr*   callable)r+   reset_fnr,   r,   r-   r=   l   s   
zWorkerBase.reset_mm_cachec                 C   r/   Nr0   r2   r,   r,   r-   	get_modelq      zWorkerBase.get_modelfnc                 C   s   ||   S )z1Apply a function on the model inside this worker.)rB   )r+   rD   r,   r,   r-   apply_modelt   s   zWorkerBase.apply_modelc                 C   s   ddl m} ||  S )z;Return a transformers-style hierarchical view of the model.r   )format_model_inspection)vllm.model_inspectionrF   rB   )r+   rF   r,   r,   r-   get_model_inspectionx   s   zWorkerBase.get_model_inspectionc                 C   r/   )zLoad model onto target device.r0   r2   r,   r,   r-   
load_model~   r4   zWorkerBase.load_modelscheduler_outputc                 C   r/   )zIf this method returns None, sample_tokens should be called immediately after
        to obtain the ModelRunnerOutput.

        Note that this design may be changed in future if/when structured outputs
        parallelism is re-architected.
        r0   r+   rJ   r,   r,   r-   execute_model   s   	zWorkerBase.execute_modelgrammar_outputc                 C   r/   )zFShould be called immediately after execute_model iff it returned None.r0   )r+   rM   r,   r,   r-   sample_tokens   r9   zWorkerBase.sample_tokensc                 C   r/   )zaReturn the size of a single cache block, in bytes. Used in
        speculative decoding.
        r0   r2   r,   r,   r-   get_cache_block_size_bytes   r9   z%WorkerBase.get_cache_block_size_byteslora_requestc                 C   r/   rA   r0   )r+   rP   r,   r,   r-   add_lora   rC   zWorkerBase.add_loralora_idc                 C   r/   rA   r0   r+   rR   r,   r,   r-   remove_lora   rC   zWorkerBase.remove_lorac                 C   r/   rA   r0   rS   r,   r,   r-   pin_lora   rC   zWorkerBase.pin_lorac                 C   r/   rA   r0   r2   r,   r,   r-   
list_loras   rC   zWorkerBase.list_lorasc                 C   s
   | j  S )z-Get vocabulary size from model configuration.)r   get_vocab_sizer2   r,   r,   r-   
vocab_size   s   
zWorkerBase.vocab_sizec                 C   r6   )z&Clean up resources held by the worker.Nr,   r2   r,   r,   r-   shutdown   r4   zWorkerBase.shutdown)Fr   N))__name__
__module____qualname____doc__r   intstrboolr.   dictr   r3   r5   r7   r8   r<   r=   nnModulerB   r   r   rE   rH   rI   r   r   r   rL   r   rN   rO   r	   rQ   rT   rU   setrV   propertyrX   rY   r,   r,   r,   r-   r   "   sX    
.







r   c                   @   s  e Zd ZdZ		d(dededB ddfddZd)d	d
Zdeeef ddfddZde	ee
e
f  ddfddZeddde	ee
ef  ddfddZde	e ddfddZdd Zde
eB fddZde
fdd Zd!eddfd"d#Zd!edeeB dB fd$d%Zd)d&d'ZdS )*WorkerWrapperBaseao  
    This class represents one process in an executor/engine. It is responsible
    for lazily initializing the worker and handling the worker's lifecycle.
    We first instantiate the WorkerWrapper, which remembers the worker module
    and class name. Then, when we call `update_environment_variables`, and the
    real initialization happens in `init_worker`.
    r   Nrpc_rankglobal_rankr   c                 C   s&   || _ |du r
| j n|| _|  |  dS )a#  
        Initialize the worker wrapper with the given vllm_config and rpc_rank.
        Note: rpc_rank is the rank of the worker in the executor. In most cases,
        it is also the rank of the worker in the distributed group. However,
        when multiple executors work together, they can be different.
        e.g. in the case of SPMD-style offline inference with TP=2,
        users can launch 2 engines/executors, each with only 1 worker.
        All workers have rpc_rank=0, but they have different ranks in the TP
        group.
        N)rh   ri   )r+   rh   ri   r,   r,   r-   r.      s   zWorkerWrapperBase.__init__c                 C   s   | j d ur| j   d S d S rA   )workerrY   r2   r,   r,   r-   rY      s   
zWorkerWrapperBase.shutdownrank_mappingc                 C   s   | j |v r|| j  | _ dS dS )z
        Adjust the rpc_rank based on the given mapping.
        It is only used during the initialization of the executor,
        to adjust the rpc_rank of workers after we create all workers.
        N)rh   )r+   rk   r,   r,   r-   adjust_rank   s   
zWorkerWrapperBase.adjust_rank	envs_listc                 C   s   || j  }t| d S rA   )rh   r   )r+   rm   envsr,   r,   r-   r      s   
z.WorkerWrapperBase.update_environment_variableszWorker init)	span_name
all_kwargsc              	   C   s  || j  }|d}|dusJ d|| _|  ddlm} |  |j}t|jt	r1t
|j}ntd|jrt
|j}g }||jvrt|D ]'}	|	drPqHt||	rcJ d| d	|	 d
| dtt||	ro||	 qH|j|f |_td||| |dd}
|
du rd}|jj}|r|jdkrt|t| d| _nt||
| _t| j |di || _W d   dS 1 sw   Y  dS )z
        Here we inject some common logic before initializing the worker.
        Arguments are passed to the worker class constructor.
        r   Nz0vllm_config is required to initialize the workerr   )load_general_pluginszpassing worker_cls is no longer supported. Please pass keep the class in a separate module and pass the qualified name of the class as a string.__zWorker class z already has an attribute z2, which conflicts with the worker extension class .z8Injected %s into %s for extended collective_rpc calls %sshared_worker_lockzoMissing `shared_worker_lock` argument from executor. This argument is needed for mm_processor_cache_type='shm'.shmr,   ) rh   getr   %enable_trace_function_call_for_threadvllm.pluginsrq   r!   
isinstance
worker_clsr`   r   
ValueErrorworker_extension_cls	__bases__dir
startswithhasattrr?   r>   appendloggerinfopopr   multimodal_configmm_processor_cache_typewarning_oncemm_receiver_cacher
   !worker_receiver_cache_from_configr   rj   )r+   rp   kwargsr   rq   r!   worker_classr|   extended_callsattrrt   msg	mm_configr,   r,   r-   init_worker   s|   






"zWorkerWrapperBase.init_workerkv_cache_configsc                 C   sT   || j  }| jd usJ t| j | j| W d    d S 1 s#w   Y  d S rA   )ri   r   r   rj   initialize_from_config)r+   r   kv_cache_configr,   r,   r-   r   8  s
   
"z(WorkerWrapperBase.initialize_from_configc                 C   sH   | j d usJ t| j  | j  W d    d S 1 sw   Y  d S rA   )r   r   rj   r8   r2   r,   r,   r-   r8   >  s   "zWorkerWrapperBase.init_devicemethodc              
   O   sD   zt | |||W S  ty! } zd|d}t| |d }~ww )NzError executing method z5. This might cause deadlock in distributed execution.)r   	Exceptionr   	exception)r+   r   argsr   er   r,   r,   r-   execute_methodD  s   

z WorkerWrapperBase.execute_methodr   c                 C   s   t | j|S rA   )r>   rj   )r+   r   r,   r,   r-   __getattr__W  s   zWorkerWrapperBase.__getattr__rJ   c                 C   s0   | j }|d u r	d S |jD ]	}||j|_qd S rA   )r   scheduled_new_reqsget_and_update_featuresmm_features)r+   rJ   mm_cachereq_datar,   r,   r-   _apply_mm_cacheZ  s   
z!WorkerWrapperBase._apply_mm_cachec                 C   s   |  | | j|S rA   )r   rj   rL   rK   r,   r,   r-   rL   d  s   
zWorkerWrapperBase.execute_modelc                 C   s$   | j }|d ur|  | j  d S rA   )r   clear_cacherj   r=   )r+   r   r,   r,   r-   r=   k  s   z WorkerWrapperBase.reset_mm_cache)r   NrZ   )r[   r\   r]   r^   r_   r.   rY   rb   rl   listr`   r   r   r   r   r   r8   bytesr   r   r   r   r   r   rL   r=   r,   r,   r,   r-   rg      s>    


	
 U


rg   )(collections.abcr   typingr   r   r   torchtorch.nnrc   vllm.configr   r   vllm.loggerr   vllm.lora.requestr	   vllm.multimodalr
   vllm.tracingr   vllm.utils.import_utilsr   vllm.utils.system_utilsr   vllm.v1.kv_cache_interfacer   vllm.v1.serial_utilsr   vllm.v1.core.sched.outputr   r   vllm.v1.outputsr   r   objectr[   r   r   r   rg   r,   r,   r,   r-   <module>   s2    