o
    
۾i=                  	   @   s>  d dl Z d dlZd dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d d	lmZ d d
lmZmZmZ d dl m!Z!m"Z"m#Z# d dl$m%Z% d dl&m'Z' erd dl(Z)d dl*m+Z+ d dl,m-Z-m.Z. ee/Z0edZ1G dd dee1 eZ2G dd dZ3d:de4de5de6de5fddZ7G dd dZ8		d;de8ded dB d d!ddfd"d#Z9d$e:e fd%d&Z;d'ej<d(ej<d)e6dej<fd*d+Z=ej>fd,eddfd-d.Z?da@d/e5defd0d1ZAd2ej<deBfd3d4ZCe
G d5d6 d6ZDd7e'deDfd8d9ZEdS )<    N)CallableSequence)AbstractContextManager)	dataclass)
connection)BaseProcess)TYPE_CHECKINGAnyGenericTypeVarUnionoverload)record_function)init_logger)UsageContextis_usage_stats_enabledusage_message)get_open_portget_open_zmq_ipc_pathget_tcp_uri)kill_process_tree)SchedulerOutput)DPCoordinatorCoreEngineActorManagerCoreEngineProcManagerTc                	   @   sD  e Zd Zdee ddfddZdd Zdd	 Zd
d Zdd Z	dd Z
dd Zd.dedededB defddZededefddZededee fddZdeeB deee B fddZededefddZededefd dZdeeB deee B fd!dZd"d# Zd$d% Zd&d' Zd(d) Zd*d+ Zdee fd,d-ZdS )/ConstantListxreturnNc                 C   s
   || _ d S N_x)selfr    r$   A/home/ubuntu/.local/lib/python3.10/site-packages/vllm/v1/utils.py__init__,      
zConstantList.__init__c                 C      t d)Nz Cannot append to a constant list	TypeErrorr#   itemr$   r$   r%   append/      zConstantList.appendc                 C   r(   )NzCannot extend a constant listr)   r+   r$   r$   r%   extend2   r.   zConstantList.extendc                 C   r(   )Nz"Cannot insert into a constant listr)   r+   r$   r$   r%   insert5   r.   zConstantList.insertc                 C   r(   )NzCannot pop from a constant listr)   r+   r$   r$   r%   pop8   r.   zConstantList.popc                 C   r(   )Nz"Cannot remove from a constant listr)   r+   r$   r$   r%   remove;   r.   zConstantList.removec                 C   r(   )NzCannot clear a constant listr)   r#   r$   r$   r%   clear>   r.   zConstantList.clearr   r,   startstopc                 C   s$   | j |||d ur|S t| j S r    )r"   indexlen)r#   r,   r5   r6   r$   r$   r%   r7   A   s   $zConstantList.indexc                 C      d S r    r$   r+   r$   r$   r%   __getitem__D      zConstantList.__getitem__sc                C   r9   r    r$   )r#   r<   r$   r$   r%   r:   G   r;   c                 C   s
   | j | S r    r!   r+   r$   r$   r%   r:   J   r'   valuec                 C   r9   r    r$   r#   r,   r=   r$   r$   r%   __setitem__M   r;   zConstantList.__setitem__c                C   r9   r    r$   )r#   r<   r=   r$   r$   r%   r?   P   r;   c                 C   r(   )Nz"Cannot set item in a constant listr)   r>   r$   r$   r%   r?   S   r.   c                 C   r(   )Nz'Cannot delete item from a constant listr)   r+   r$   r$   r%   __delitem__V   r.   zConstantList.__delitem__c                 C   
   t | jS r    )iterr"   r3   r$   r$   r%   __iter__Y   r'   zConstantList.__iter__c                 C   s
   || j v S r    r!   r+   r$   r$   r%   __contains__\   r'   zConstantList.__contains__c                 C   rA   r    )r8   r"   r3   r$   r$   r%   __len___   r'   zConstantList.__len__c                 C   s   d| j  dS )NzConstantList()r!   r3   r$   r$   r%   __repr__b   s   zConstantList.__repr__c                 C   s
   | j  S r    )r"   copyr3   r$   r$   r%   rH   e   r'   zConstantList.copy)r   N)__name__
__module____qualname__listr   r&   r-   r/   r0   r1   r2   r4   intr7   r   r:   slicer?   r@   rC   rD   rE   rG   rH   r$   r$   r$   r%   r   +   s2     r   c                   @   sv   e Zd ZdZdddeejB dejdejde	de	d	d
fddZ
dded
B d	ejfddZdded
B d	ejfddZd
S )CpuGpuBufferz2Buffer to easily copy tensors between CPU and GPU.T)
with_numpysizedtypedevice
pin_memoryrP   r   Nc                G   sV   t j||d|d| _t j| j|d| _|  |r)|t jkr!td| j | _d S d S )Ncpu)rR   rS   rT   )rS   zkBfloat16 torch tensors cannot be directly cast to a numpy array, so call CpuGpuBuffer with with_numpy=False)	torchzerosrU   
zeros_likegpubfloat16
ValueErrornumpynp)r#   rR   rS   rT   rP   rQ   r$   r$   r%   r&   l   s   
zCpuGpuBuffer.__init__nc                 C   s<   |d u r| j j| jddS | j d | j| jd | ddS )NTnon_blocking)rY   copy_rU   r#   r^   r$   r$   r%   copy_to_gpu   s   "zCpuGpuBuffer.copy_to_gpuc                 C   s<   |du r| j j| jddS | j d| j| jd| ddS )zzNOTE: Because this method is non-blocking, explicit synchronization
        is needed to ensure the data is copied to CPU.NTr_   )rU   ra   rY   rb   r$   r$   r%   copy_to_cpu   s   "zCpuGpuBuffer.copy_to_cpur    )rI   rJ   rK   __doc__rM   rV   SymIntrR   rS   boolr&   Tensorrc   rd   r$   r$   r$   r%   rO   i   s$    
rO   
local_onlyhostportr   c                 C   s   | rt  S t||pt S )a  Assign a new ZMQ socket address.

    If local_only is True, participants are colocated and so a unique IPC
    address will be returned.

    Otherwise, the provided host and port will be used to construct a TCP
    address (port == 0 means assign an available port).)r   r   r   )ri   rj   rk   r$   r$   r%   get_engine_client_zmq_addr   s
   rl   c                   @   sV   e Zd ZdZ	ddedededejde	de
e d	e
e d
edB fddZdddZdS )APIServerProcessManagerzManages a group of API server processes.

    Handles creation, monitoring, and termination of API server worker
    processes. Also monitors extra processes to check if they are healthy.
    Ntarget_server_fnlisten_addresssockargsnum_serversinput_addressesoutput_addressesstats_update_addressc	                 C   s   || _ || _|| _td}	g | _tt|||D ]-\}
}}||||
d}|dur-||d< |	j|d|
 ||||fd}| j	| |
  qtdt| j t| t| j| _dS )a7  Initialize and start API server worker processes.

        Args:
            target_server_fn: Function to call for each API server process
            listen_address: Address to listen for client connections
            sock: Socket for client connections
            args: Command line arguments
            num_servers: Number of API server processes to start
            input_addresses: Input addresses for each API server
            output_addresses: Output addresses for each API server
            stats_update_address: Optional stats update address
        spawn)input_addressoutput_addressclient_countclient_indexNru   
ApiServer_)targetnamerq   zStarted %d API server processes)ro   rp   rq   multiprocessingget_context	processesziprangeProcessr-   r5   loggerinfor8   weakreffinalizeshutdown
_finalizer)r#   rn   ro   rp   rq   rr   rs   rt   ru   spawn_contextiin_addrout_addrclient_configprocr$   r$   r%   r&      s0   



z APIServerProcessManager.__init__r   c                 C   s   |    d S r    )r   r3   r$   r$   r%   close   s   zAPIServerProcessManager.closer    )r   N)rI   rJ   rK   re   r   strr	   argparse	NamespacerM   rL   r&   r   r$   r$   r$   r%   rm      s*    	
9rm   api_server_managerengine_manager)r   r   coordinatorzDPCoordinator | Nonec              
   C   s  ddl m}m} zzqtd dd | jD }|r |j||jj< g }t||r3|jD ]}|||j< q*n	t||r<|	 }|s@|ryt
j|dd}|D ]}	||	}|jdkretd|j d	|j d
|j qI|ruddl}
|
j|dd\}}|s@|s@W n" ty   td Y n ty } z	tdt|  d}~ww W td |   |r|  |r|  dS dS td |   |r|  |r|  w w )a  Wait for all processes to complete or detect if any fail.

    Raises an exception if any process exits with a non-zero status.

    Args:
        api_server_manager: The manager for API servers.
        engine_manager: The manager for engine processes.
            If CoreEngineProcManager, it manages local engines;
            if CoreEngineActorManager, it manages all engines.
        coordinator: The coordinator for data parallel.
    r   r   z'Waiting for API servers to complete ...c                 S   s   i | ]}|j |qS r$   )sentinel).0r   r$   r$   r%   
<dictcomp>   s    z2wait_for_completion_or_failure.<locals>.<dictcomp>   )timeoutzProcess z (PID: z) died with exit code Nz8Received KeyboardInterrupt, shutting down API servers...z0Exception occurred while running API servers: %sz#Terminating remaining processes ...)vllm.v1.engine.utilsr   r   r   r   r   r   r   
isinstanceget_run_refsr   waitr1   exitcodeRuntimeErrorr}   pidrayKeyboardInterrupt	Exception	exceptionr   r   )r   r   r   r   r   sentinel_to_procactor_run_refsr   ready_sentinelsr   r   _er$   r$   r%   wait_for_completion_or_failure   sj   








r   procsc                 C   s   | D ]
}|  r|  qt d }| D ]}|t  }|dkr# n
|  r,|| q| D ]}|  r@|j }d ur@t| q/d S )Nr   r   )is_alive	terminatetime	monotonicjoinr   r   )r   r   deadline	remainingr   r$   r$   r%   r   .  s"   
r   from_tensor	to_tensorlengthc                 C   s   |d| j | d| ddS )z
    Copy the first length elements of a tensor into another tensor in a
    non-blocking manner.

    Used to copy pinned CPU tensor data to pre-allocated GPU tensors.

    Returns the sliced target tensor.
    NTr_   )ra   )r   r   r   r$   r$   r%   
copy_sliceB  s   r   usage_contextc                 C   s   t  sdS ddlm} | j}d}| jdur| jj}tj|| j|i dt	| jj
d| jjd| jjd| jjd| jjd	t	| jjd
t| jd| jjd| jjd|jd|jd|jd|jd|jd|jd|d dS )z#Report usage statistics if enabled.Nr   )get_architecture_class_namerR   
block_sizegpu_memory_utilizationkv_cache_memory_bytesquantizationkv_cache_dtypeenable_loraenable_prefix_cachingenforce_eagerdisable_custom_all_reducetensor_parallel_sizedata_parallel_sizepipeline_parallel_sizeenable_expert_parallelall2all_backendkv_connector)	extra_kvs)r    vllm.model_executor.model_loaderr   parallel_configkv_transfer_configr   r   report_usagemodel_configr   rR   cache_configr   r   r   r   cache_dtyperg   lora_configr   r   r   r   r   r   r   r   )vllm_configr   r   r   r   r$   r$   r%   report_usage_statsP  sX   



r   r}   c                 C   sB   t d urt | S tj}tjrt}n
tjrdd l}|j}|a || S )Nr   )	_PROFILER_FUNC
contextlibnullcontextenvs VLLM_CUSTOM_SCOPES_FOR_PROFILINGr   VLLM_NVTX_SCOPES_FOR_PROFILINGnvtxannotate)r}   funcr   r$   r$   r%   record_function_or_nullcontext  s   r   tensorc                 C   s   |    tj jS )zGet the raw data of a tensor as a uint8 memoryview, useful for
    serializing and hashing.

    Args:
        tensor: The input tensor.

    Returns:
        A memoryview of the tensor data as uint8.
    )flatten
contiguousviewrV   uint8r\   data)r   r$   r$   r%   tensor_data  s   
r   c                   @   s<   e Zd ZU eed< eed< eed< eed< defddZdS )	IterationDetailsnum_ctx_requestsnum_ctx_tokensnum_generation_requestsnum_generation_tokensr   c              	   C   s&   d| j  d| j d| j d| j d	S )Nz"IterationDetails(num_ctx_requests=z!,                 num_ctx_tokens=z+,                  num_generation_requests=z),                  num_generation_tokens=rF   )r   r   r   r   r3   r$   r$   r%   rG     s   
zIterationDetails.__repr__N)rI   rJ   rK   rM   __annotations__r   rG   r$   r$   r$   r%   r     s   
 r   scheduler_outputc                 C   sx   d}d}d}d}dd | j D }| j D ]\}}| j|s#||v r,|d7 }||7 }q|d7 }||7 }qt||||S )a  
    Compute the number of context/generation requests and tokens
    for the current iteration's scheduler output. A requests is regarded
    as a context request if its output tokens are still 0, an extended chunk
    of chunked prefill falls into this category.

    Args:
        scheduler_output: The scheduler output for the current iteration.

    Returns:
        An IterationDetails object containing the number of
        context/generation requests and tokens.
    r   c                 S   s   h | ]}|j qS r$   )req_id)r   new_reqr$   r$   r%   	<setcomp>  s    z,compute_iteration_details.<locals>.<setcomp>   )scheduled_new_reqsnum_scheduled_tokensitemsscheduled_cached_reqsis_context_phaser   )r   num_context_requestsnum_context_tokensr   r   new_req_idsr   
num_tokensr$   r$   r%   compute_iteration_details  s$   

r   )r   )NN)Fr   r   r~   r   r   collections.abcr   r   r   dataclassesr   r   multiprocessing.processr   typingr   r	   r
   r   r   r   rV   torch.autograd.profilerr   	vllm.envsr   vllm.loggerr   vllm.usage.usage_libr   r   r   vllm.utils.network_utilsr   r   r   vllm.utils.system_utilsr   vllm.v1.core.sched.outputr   r\   r]   vllm.v1.engine.coordinatorr   r   r   r   rI   r   r   r   rO   rg   r   rM   rl   rm   r   rL   r   rh   r   ENGINE_CONTEXTr   r   r   
memoryviewr   r   r   r$   r$   r$   r%   <module>   s    	>&G
K

/