o
    
۾i{                     @   sd  d dl Z d dlmZmZ d dlmZ d dlmZ d dlm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZmZmZmZmZ d dlmZ d d	lmZ d d
lmZmZmZmZ d dlm Z  d dl!m"Z"m#Z#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/ ej0d ddZ1G dd dZ2eG dd dZ3eG dd dZ4G dd dZ5G dd dZ6dS )    N)defaultdictdeque)Iterable)	dataclass)Anycast)LoRARequest)STREAM_FINISHEDCompletionOutputPoolingOutputPoolingRequestOutputRequestOutput)RequestOutputKind)TokenizerLike)SpanAttributesSpanKindextract_trace_contextinstrument_manual)&length_from_prompt_token_ids_or_embeds)EngineCoreOutputEngineCoreRequestFinishReason)IncrementalDetokenizer)LogprobsProcessor)ParentRequest)IterationStatsLoRARequestStatesRequestStateStatsSchedulerStatscpu)devicec                   @   st   e Zd ZdZdedefddZdeeB e	B ddfd	d
Z
deeB fddZdeeB dB fddZdd Zdd ZdS )RequestOutputCollectorz
    Collects streamed RequestOutputs per individual request,
    for hand-off to the consuming asyncio generate task.

    When streaming deltas, RequestOutputs are merged if the
    producer gets ahead of the consumer.
    output_kind
request_idc                 C   s,   |t jk| _|| _d | _t | _d | _d S N)	r   DELTA	aggregater#   outputasyncioEventready_input_stream_task)selfr"   r#    r-   S/home/ubuntu/.local/lib/python3.10/site-packages/vllm/v1/engine/output_processor.py__init__6   s
   

zRequestOutputCollector.__init__r'   returnNc                 C   s|   | j du s
t|tr|| _ | j  dS t| j tr*t|tr*| j j|| jd dS t| j tr:t|tr<|| _ dS dS dS )zNon-blocking put operation.N)r&   )	r'   
isinstance	Exceptionr*   setr   addr&   r   r,   r'   r-   r-   r.   put>   s   
zRequestOutputCollector.putc                    sP   | j  }du r| j I dH  | j  }du sd| _ | j  t|tr&||S )z"Get operation blocks on put event.N)r'   r*   waitclearr1   r2   r5   r-   r-   r.   getN   s   

zRequestOutputCollector.getc                 C   s0   | j }|durd| _ | j  t|tr||S )zNon-blocking get operation.N)r'   r*   r8   r1   r2   r5   r-   r-   r.   
get_nowaitX   s   

z!RequestOutputCollector.get_nowaitc                 C   s   | j d ur
| j   d | _ d S r$   )r+   cancelr,   r-   r-   r.   closeb   s   


zRequestOutputCollector.closec                 C   s,   | j  }d ur| |j d | _ d S d S r$   )r+   get_loopcall_soon_threadsafer;   )r,   taskr-   r-   r.   __del__g   s   
zRequestOutputCollector.__del__)__name__
__module____qualname____doc__r   strr/   r   r   r2   r6   r9   r:   r=   rA   r-   r-   r-   r.   r!   -   s    

r!   c                   @   s*   e Zd ZU eeeB  ed< ee ed< dS )OutputProcessorOutputrequest_outputsreqs_to_abortN)rB   rC   rD   listr   r   __annotations__rF   r-   r-   r-   r.   rG   m   s   
 rG   c                   @   sB   e Zd ZU dZedB ed< ee dB ed< eed< dZ	e
ed< dS )StreamingUpdatezStreaming input update data for output processor.

    Contains the incremental prompt data to be applied to a request state
    when the current sub-request completes.
    Npromptprompt_token_idsarrival_timeFfinal)rB   rC   rD   rE   rF   rK   rJ   intfloatrP   boolr-   r-   r-   r.   rL   s   s   
 rL   c                )   @   s  e Zd Z				d2dedededB dededB ded	edB d
ee dB de	j
dB dedB dedB dedB dededB dedededB dedB dedB def(ddZdeddfddZededB ded	edB dedB dededB dededd fdd Z		d3d!ee d"e	j
dB d#edB d$eeB dB d%eeef dB d&ejdB deeB dB fd'd(Z	d4ded)ee ee  B d*ed%eeef dB deeB f
d+d,Z!	d4d-ee d#edB d$eeB dB d&ejdB def
d.d/Z"d"e	j
de fd0d1Z#dS )5RequestStateNFr#   external_req_id
parent_reqrequest_indexlora_requestr"   rM   rN   prompt_embedslogprobs_processordetokenizermax_tokens_paramrO   queue	log_statsstream_intervaltop_pntemperaturestream_inputc                 C   s   || _ || _|| _|| _|| _|d ur|jnd | _|| _|| _|| _|	| _	t
| j| j	| _|
| _|| _|| _|| _|| _|| _d| _|| _d| _|rOt|dnd | _|| _d| _|| _|rct | _d S d | _d S )NTr   )rO   )r#   rU   rV   rW   rX   	lora_namer"   rM   rN   rY   r   
prompt_lenrZ   r[   r\   r`   ra   rb   is_prefillingr]   num_cached_tokensr   statsr_   sent_tokens_offsetstreaming_inputr   input_chunk_queue)r,   r#   rU   rV   rW   rX   r"   rM   rN   rY   rZ   r[   r\   rO   r]   r^   r_   r`   ra   rb   rc   r-   r-   r.   r/      s<   zRequestState.__init__updater0   c                 C   s   |j  | _|jr| jr| j|j n|j| _| jr"| j|jpd n|jp&g | _| jd us/J t| j| _| jd ur?|j| j_d| _	d S )Nr-   T)
rP   rj   rM   rN   extendlenre   rh   rO   rf   )r,   rl   r-   r-   r.   apply_streaming_update   s   



z#RequestState.apply_streaming_update	tokenizerrequestc	                 C   s  |j  }	r(|	js
d }|	j}
tj||d}tj||d}|	j}|	j}|	j}|	j	}nd }d }d }d }d }d }|j
d us;J |j
j}
|jd usFJ | di d|jd|jd|d|d|jd|
d|d	|jd
|jd|d|d|d|d|d|d|jd|d|d|d|jS )N)rp   rq   r#   rU   rV   rW   rX   r"   rM   rN   rY   rZ   r[   r\   r`   ra   rb   rO   r]   r^   r_   rc   r-   )sampling_params
detokenizer"   r   from_new_requestr   
max_tokensr`   ra   rb   pooling_paramsrU   r#   rX   rN   rY   rO   	resumable)clsrp   rq   rM   rV   rW   r]   r^   r_   rr   r"   rZ   r[   r\   r`   ra   rb   r-   r-   r.   rt      s   
	
zRequestState.from_new_requestnew_token_idspooling_outputfinish_reasonstop_reasonkv_transfer_paramsrouted_expertsc                 C   s  |d u}| j tjk}|s|rd S | jdkrG| jd usJ |s1| jdks1t| jj| j | jks1d S | j tjkrG| jj| jd  }t| jj| _| j	}	|d urY| 
|	| |g|S | ||||}
| jd u rj|
g}n| j| j|
\}}|sxd S | jj	}	| 
|	|||S )N   r   )r"   r   
FINAL_ONLYr_   r[   ri   rn   output_token_idsr%   rU   _new_request_output_new_pooling_output_new_completion_outputrV   get_outputsr#   )r,   ry   rz   r{   r|   r}   r~   finished
final_onlyrU   r'   outputsr-   r-   r.   make_request_output  sJ   	



z RequestState.make_request_outputr   r   c                 C   s   |d }t |tr#t|dksJ | jd usJ t||| j| j|dS | jd us*J | jtj	kr6| j
 }n| jj}| j}|d u rN| jd urNdgt| j }t|| j| j||ttt |||| j| jd
S )Nr   r   )r#   r   rg   rN   r   )
r#   rX   rM   rN   prompt_logprobsr   r   r}   rg   metrics)r1   r   rn   rN   r   rg   rZ   r"   r   r%   pop_prompt_logprobsr   rY   r   rX   rM   r   rJ   r
   rh   )r,   rU   r   r   r}   first_outputr   rN   r-   r-   r.   r   M  s<   
z RequestState._new_request_output	token_idsc           	   
   C   s   | j d usJ | jd usJ |d u}| jtjk}| j ||}|s%| j j}| jj}|r6|r6|t| d  }t	| j
||||| jj|rFt|nd |rM|dS d dS )N)indextextr   r~   logprobscumulative_logprobr{   r|   )r[   rZ   r"   r   r%   get_next_output_textr   r   rn   r
   rW   r   rF   )	r,   r   r{   r|   r~   r   deltar   r   r-   r-   r.   r   y  s,   z#RequestState._new_completion_outputc                 C   s
   t |dS )N)data)r   )r,   rz   r-   r-   r.   r        
z RequestState._new_pooling_output)NNNFNNr$   )$rB   rC   rD   rF   r   rQ   r   r   rJ   torchTensorr   r   rR   r!   rS   r/   rL   ro   classmethodr   r   rt   r   dictr   npndarrayr   r   r   r
   r   r   r   r   r-   r-   r-   r.   rT      s    
	

:	
C


E
1

!rT   c                   @   s|  e Zd ZdZ	d4dedB dedefddZd	d
 ZdefddZ	d5ddZ
defddZdee dedee fddZ			d6dededB dedB dededB ddfddZdedededB ddfd d!Z		d7d"ee d#edB d$edB defd%d&Zdeddfd'd(Zd)edB fd*d+Zd,eded$edB ddfd-d.Zded,ed#edB d$edB fd/d0Z ded1e!dB d$edB fd2d3Z"dS )8OutputProcessorz.Process EngineCoreOutputs into RequestOutputs.r   rp   Nr^   r_   c                 C   sP   || _ || _|| _i | _i | _tt| _t|| _	d| _
t | _| j  d S )NF)r^   rp   r_   request_statesparent_requestsr   rJ   external_req_idsr   lora_statestracing_enabledr(   r)   _requests_drainedr3   )r,   rp   r^   r_   r-   r-   r.   r/     s   


zOutputProcessor.__init__c                 C   s
   t | jS r$   rn   r   r<   r-   r-   r.   get_num_unfinished_requests  r   z+OutputProcessor.get_num_unfinished_requestsr0   c                 C   s   t | jdkS )Nr   r   r<   r-   r-   r.   has_unfinished_requests  s   z'OutputProcessor.has_unfinished_requestsc                    s    | j sd S | j I d H  d S r$   )r   r   r7   r<   r-   r-   r.   wait_for_requests_to_drain  s   z*OutputProcessor.wait_for_requests_to_drainec                 C   s2   | j  D ]\}}|jdusJ |j| qdS )z(Propagate error to all generate() tasks.N)r   itemsr]   r6   )r,   r   _stater-   r-   r.   propagate_error  s   zOutputProcessor.propagate_errorrequest_idsinternalc                 C   sH  g }|D ]3}|r)| | | j| }r(|j}| j| }|| |s(| j|= q| j|g  }r7|| qg }|D ]]}| j|d}|durv| j	||j
 | | |jduru|jg |jdu retndtjddd }	ru|j|	 q<| j| }
r|
jrt|
j}| j|dd}|| | j|d q<| js| j  |S )ah  Abort a list of requests.

        The request_ids may be either external request IDs (those passed to
        InputProcessor.process_inputs()) or internal request IDs (those randomly
        generated when creating the EngineCoreRequest).

        If an external request ID is provided, and that external request ID
        was used for multiple requests, all requests associated with that external
        request ID are aborted.

        In the case of parallel sampling, a request ID may be used to identify
        a parent request, in which case the associated child requests are aborted
        also.
        N)ry   rz   r{   r|   r}   T)r   )appendr   r9   rU   r   removepoprm   r   request_finishedrd   r]   r   r[   EMPTY_CPU_TENSORr   ABORTr6   r   child_requestsrJ   abort_requestsr   r3   )r,   r   r   internal_req_idsr#   	req_staterU   internal_idsrequest_ids_to_abortrequest_outputparent
child_reqsr-   r-   r.   r     sV   









zOutputProcessor.abort_requestsr   rq   rM   rV   rW   r]   c              
   C   s   |j }| j|}|d ur| ||| d S tj| j|||||| j| jd}| j	
 r0| j	  || j|< |r=|| j|j < | j|j | d S )N)rp   rq   rM   rV   rW   r]   r^   r_   )r#   r   r9   _update_streaming_request_staterT   rt   rp   r^   r_   r   is_setr8   r   r   rU   r   )r,   rq   rM   rV   rW   r]   r#   r   r-   r-   r.   add_request  s*   



zOutputProcessor.add_requestr   c                 C   s   |j s*|jdu r| | |jdur|jt dS |jr%d|jd _dS d|_dS t||j	|j
d}|jdu rC|| t |_dS |j| dS )z<Queue a streaming update instead of immediately applying it.NTF)rM   rN   rO   )rw   rk   _finish_requestr]   r6   r	   rP   rj   rL   rN   rO   ro   r   r   )r,   r   rq   rM   rl   r-   r-   r.   r   %  s(   




z/OutputProcessor._update_streaming_request_stateengine_core_outputsengine_core_timestampiteration_statsc              	   C   s  g }g }|D ]}|j }| j|}|du rq| |||| |j}	|j}
|j}|j}|j}|j	}|j
|_
d|_|
du r`|jdusBJ |jdusIJ |j|	|tjk}|rZtj}|}|j| ||	|
|||| }r|jrrd|_|jdur~|j| n|| |dur|jr|jr|j }|| qd|_q| | |js|| | ||| | jr| ||| qt||dS )a  
        Process the EngineCoreOutputs:
        1) Compute stats for logging
        2) Detokenize
        3) Create and handle RequestOutput objects:
            * If there is a queue (for usage with AsyncLLM),
              put the RequestOutput objects into the queue for
              handling by the per-request generate() tasks.

            * If there is no queue (for usage with LLMEngine),
              return a list of RequestOutput objects.

        NOTE FOR DEVELOPERS

        vLLM V1 minimizes the number of python loops over the full
        batch to ensure system overheads are minimized. This is the
        only function that should loop over EngineCoreOutputs.

        If you need to touch every element of the batch, do it from
        within the loop below.
        NF)rH   rI   ) r#   r   r9   _update_stats_from_outputry   rz   r{   r|   r}   r~   rg   rf   r[   rZ   rl   r   STOPupdate_from_outputr   rj   r   r]   r6   r   rk   popleftro   r   _update_stats_from_finishedr   
do_tracingrG   )r,   r   r   r   rH   rI   engine_core_outputreq_idr   ry   rz   r{   r|   r}   r~   stop_stringr   rl   r-   r-   r.   process_outputsF  sx   





zOutputProcessor.process_outputsc                 C   sn   |j }| j| | j|j }|| |s| j|j= |j}|r+|js+| j|j d  | js5| j	
  d S d S r$   )r#   r   r   r   rU   r   rV   r   r   r   r3   )r,   r   r   r   rV   r-   r-   r.   r     s   


zOutputProcessor._finish_requestscheduler_statsc                 C   s   | j | d S r$   )r   update_scheduler_stats)r,   r   r-   r-   r.   r     s   z&OutputProcessor.update_scheduler_statsr   c                 C   s"  |j d usJ |d usJ |j }t|jd }t|j}t|j|j}|j|j }|j	|j
 }	|j|j	 }
|j|j }|j|j	 }tj|jtj|tj|	tj|tj|jtj|
tj|tj|tj|ji	}|jrj|j|tj< |jrs|j|tj< |jr||j|tj< |j r|j |tj!< t"d|||t#j$d d S )Ng    eAllm_request)	span_name
start_time
attributescontextkind)%rh   rQ   rO   r   trace_headersr   rN   rY   iteration_timestampscheduled_ts	queued_tsfirst_token_tslast_token_tsr   "GEN_AI_LATENCY_TIME_TO_FIRST_TOKENfirst_token_latencyGEN_AI_LATENCY_E2EGEN_AI_LATENCY_TIME_IN_QUEUEGEN_AI_USAGE_PROMPT_TOKENSGEN_AI_USAGE_COMPLETION_TOKENSnum_generation_tokens$GEN_AI_LATENCY_TIME_IN_MODEL_PREFILL#GEN_AI_LATENCY_TIME_IN_MODEL_DECODE&GEN_AI_LATENCY_TIME_IN_MODEL_INFERENCEGEN_AI_REQUEST_IDrU   r`   GEN_AI_REQUEST_TOP_Pr\   GEN_AI_REQUEST_MAX_TOKENSrb   GEN_AI_REQUEST_TEMPERATUREra   GEN_AI_REQUEST_Nr   r   SERVER)r,   r   r   r   r   arrival_time_nstrace_contextprompt_lengthe2e_timequeued_timeprefill_timedecode_timeinference_timer   r-   r-   r.   r     sT   

zOutputProcessor.do_tracingc              	   C   sJ   |d u rd S |d usJ |j d usJ ||||j|j|j | j|j d S r$   )rh   r   rf   re   r   rd   )r,   r   r   r   r   r-   r-   r.   r     s   z)OutputProcessor._update_stats_from_outputr{   c                 C   sl   |d u rd S |d usJ |j d usJ |j||j|j|j |jd | j|j|j t	
|j||j j d S )N)r{   num_prompt_tokensr\   	req_statsrg   )rh   update_from_finished_requestre   r\   rg   r   r   r#   rd   r   observe_finished_requestrV   r   )r,   r   r{   r   r-   r-   r.   r     s   z+OutputProcessor._update_stats_from_finished)r   )r0   N)Nr   Nr   )#rB   rC   rD   rE   r   rS   rQ   r/   r   r   r   r2   r   r   rF   rJ   r   r   r   r!   r   rT   r   r   rR   r   rG   r   r   r   r   r   r   r   r   r-   r-   r-   r.   r     s    

D
!
$
p
>
r   )7r(   collectionsr   r   collections.abcr   dataclassesr   typingr   r   numpyr   r   vllm.lora.requestr   vllm.outputsr	   r
   r   r   r   vllm.sampling_paramsr   vllm.tokenizersr   vllm.tracingr   r   r   r   
vllm.utilsr   vllm.v1.enginer   r   r   vllm.v1.engine.detokenizerr   vllm.v1.engine.logprobsr    vllm.v1.engine.parallel_samplingr   vllm.v1.metrics.statsr   r   r   r   emptyr   r!   rG   rL   rT   r   r-   r-   r-   r.   <module>   s8   @  