o
    ٷi'N                     @   s   d dl mZ d dlmZ d dlZd dlZd dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZ d d	lmZ d d
lmZmZmZ d dlmZ d dlmZ d dlmZ eeZ G dd deZ!G dd deZ"dS )    )Callable)AnyN)init_logger)PoolingRequestOutput)RequestOutputKind)TokenizerLike)EngineCoreOutputEngineCoreRequestFinishReason)OutputProcessor)OutputProcessorOutputRequestOutputCollectorRequestState)ParentRequest)IterationStats)OmniRequestOutputc                       s   e Zd ZdZ fddZdedB dedB ddfdd	Zdd
dZ		dde	e
 dejdB dedB de
eB dB deeef dB dejdB deeB dB f fddZ	dde	e
 dedB de
eB dB dejdB def
 fddZ  ZS )OmniRequestStatezRequest state for omni models, tracking multimodal outputs.

    Extends the base RequestState with support for accumulating
    multimodal tensor outputs (e.g., images, audio, latents) that
    are produced incrementally during generation.
    c                    s"   t  j|i | d | _d | _d S N)super__init__mm_typemm_accumulated)selfargskwargs	__class__ U/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/engine/output_processor.pyr   !   s   
zOmniRequestState.__init__payloadNr   returnc                    s  |d u rd S z|r|pd  | _dd  t|trTi }| jpd}| D ].\}}|dkr/|}n
|dkr9|dkr9|}t|trL fdd| D ||< q$ |||< q$n| jpXd}| |i}| jd u rj|| _W d S | D ]\}}|| jvr}|| j|< qn| j| }t|tjrt|tjr||g| j|< qnt|tjrt|tr|	| qnt|trt|tr| D ]=\}	}
|	|vr|
||	< qt|
tjrt||	 tjr||	 |
g||	< qt|
tjrt||	 tr||	 	|
 q|
||	< qqn|| j|< qnW d S  t
y   td Y d S w )	N c                 S   s@   t | tjrz|  jddd W S  ty   |  Y S w | S )NcpuT)non_blocking)
isinstancetorchTensordetachto
contiguous	Exception)xr   r   r   _to_cpu3   s   z7OmniRequestState.add_multimodal_tensor.<locals>._to_cpuhiddenmodel_outputsc                    s   i | ]\}}t | |qS r   )str).0sksvr,   r   r   
<dictcomp>H   s    z:OmniRequestState.add_multimodal_tensor.<locals>.<dictcomp>z$Error accumulating multimodal tensor)lowerr   r$   dictitemsr   r%   r&   listappendr*   logger	exception)r   r   r   incoming
target_keykvkeyexistingr1   r2   r   r3   r   add_multimodal_tensor+   sZ   









z&OmniRequestState.add_multimodal_tensorc              
   C   s.  | j du rdS z| j  D ]w\}}t|trL|rLt|d tjrLz|dkr'W qtj|dd| j |< W q tyK   t	d| d |d | j |< Y qw t|t
r| D ].\}}t|tr|rt|d tjrztj|dd||< W qU ty   |d ||< Y qUw qUqW dS  ty   td Y dS w )	zKConsolidate accumulated tensor lists into single tensors via concatenation.Nr   audio)dimz#Error concatenating tensor for key z; keeping last tensorz&Error consolidating multimodal tensors)r   r7   r$   r8   r%   r&   catr*   r:   warningr6   r;   )r   r>   r?   r1   r2   r   r   r   _consolidate_multimodal_tensorso   s4   

z0OmniRequestState._consolidate_multimodal_tensorsnew_token_idspooling_outputfinish_reasonstop_reasonkv_transfer_paramsrouted_expertsc                    s  | j du r|durt ||||||S |du}| jtjk}|s$|r$dS |r*|   | jdkra| j dus6J |sK| jdksKt	| j j
| j | jksKdS | jtjkra| j j
| jd }t	| j j
| _| j}	| ||||}
| jdu ru|
g}n| j| j|
\}}|sdS | jj}	| |	|||S )a  Create a request output from generation results.

        Creates a RequestOutput or PoolingRequestOutput from the generated
        tokens and accumulated multimodal outputs. Attaches multimodal
        tensors to the completion output if available.

        Args:
            new_token_ids: List of newly generated token IDs
            pooling_output: Optional pooling output tensor
            finish_reason: Optional finish reason indicating why generation stopped
            stop_reason: Optional stop reason (token ID or stop string)
            kv_transfer_params: Optional KV cache transfer parameters

        Returns:
            OmniRequestOutput or PoolingRequestOutput if output should be
            emitted (based on finish status and output kind), None otherwise
        N   r   )detokenizerr   make_request_outputoutput_kindr   
FINAL_ONLYrH   stream_intervalsent_tokens_offsetlenoutput_token_idsDELTAexternal_req_id_new_completion_output
parent_reqget_outputs
request_id_new_request_output)r   rI   rJ   rK   rL   rM   rN   finished
final_onlyrY   outputoutputsr   r   r   rQ      sD   	


z$OmniRequestState.make_request_output	token_idsc           	         s   t  ||||}z8| jd ur?t|dst|di  t|d}t|tr5| j D ]\}}|||< q)W |S t|d| j W |S W |S  t	yP   t
d Y |S w )Nmultimodal_outputzError in _new_completion_output)r   rZ   r   hasattrsetattrgetattrr$   r6   r7   r*   r:   r;   )	r   rc   rK   rL   rN   base_outputmm_outr>   r?   r   r   r   rZ      s&   




z'OmniRequestState._new_completion_output)r    NNNr   )__name__
__module____qualname____doc__r   r   r/   rB   rH   r8   intr%   r&   r
   r6   npndarrayr   r   rQ   rZ   __classcell__r   r   r   r   r      sD    

D$

U
r   c                       sh  e Zd ZdZ		d/dedB dedededB f fdd	Zd
ede	e
gdf ddfddZ			d0dededB dedB dededB ddfddZ		d1dee
 dedB dedB def fddZde
ddfddZde
ddfdd Zde
ddfd!d"Zde
ddfd#d$Zde
ddfd%d&Zde
ddfd'd(Zde
ddfd)d*Zde
d+eed,f dejdB fd-d.Z  Z S )2MultimodalOutputProcessora  Handles multimodal output processing by normalizing EngineCoreOutput
    before delegating to the base vLLM OutputProcessor.

    Strategy:
    - Route by EngineCoreOutput.output_type when present
      ("image", "text+image", "latents", "text").
    - Fallback to pooling/text heuristics when output_type is absent.
    - Mutate EngineCoreOutput in-place to ensure vLLM's base processor can
      produce the correct RequestOutput/PoolingRequestOutput.
    - Allow custom per-modality handlers via register_handler().
    rO   N	tokenizer	log_statsrT   engine_core_output_typec                    s(   t  j|||d i | _i | _|| _dS )a  Initialize the multimodal output processor.

        Args:
            tokenizer: Tokenizer for detokenizing text outputs
            log_stats: Whether to log statistics
            stream_interval: Stream interval for output generation
            engine_core_output_type: Optional output type specification
                (e.g., "image", "audio", "latents"). Used to route outputs
                to appropriate processors. If None, output type is inferred.
        )rt   ru   rT   N)r   r   output_handlers_reqid_to_mm_typerv   )r   rt   ru   rT   rv   r   r   r   r     s   
z"MultimodalOutputProcessor.__init__modalityhandlerr    c                 C   s   || j | < dS )a  Register a custom handler for a specific modality.

        Allows custom processing logic for specific output modalities.
        The handler is called before default processing for outputs
        matching the specified modality.

        Args:
            modality: Modality name (e.g., "image", "audio", "latents")
            handler: Callable that takes an EngineCoreOutput and processes it
        N)rw   r5   )r   ry   rz   r   r   r   register_handler  s   z*MultimodalOutputProcessor.register_handlerr   requestpromptr[   request_indexqueuec              
   C   s   |j }| j|}|dur| ||| dS tj| j|||||| j| jd}| j	
 r0| j	  || j|< |r=|| j|j < | j|j | dS )a  Add a new request to be processed.

        Creates an OmniRequestState for the request and registers it
        for output processing.

        Args:
            request: Engine core request to add
            prompt: Optional prompt string for the request
            parent_req: Optional parent request for parallel sampling
            request_index: Index of the request in the batch
            queue: Optional queue for collecting outputs

        Raises:
            ValueError: If the request ID is already registered
        N)rt   r|   r}   r[   r~   r   ru   rT   )r]   request_statesget_update_streaming_request_stater   from_new_requestrt   ru   rT   _requests_drainedis_setclearparent_requestsexternal_req_idsrY   r9   )r   r|   r}   r[   r~   r   r]   	req_stater   r   r   add_request&  s*   



z%MultimodalOutputProcessor.add_requestengine_core_outputsengine_core_timestampiteration_statsc                    s   | j   |D ]D}| jpd }|r|| j |j< | | | j|j}|d u s-t|t	s.q|j
d urK|jd urK||j
t|d| jpDd  d |_
qt j|||dS )Nr!   output_type)r   r   )rx   r   rv   r5   r]   _route_and_normalizer   r   r$   r   rJ   rP   rB   rg   r   process_outputs)r   r   r   r   ecor   r   r   r   r   r   T  s*   

z)MultimodalOutputProcessor.process_outputsr   c                 C   s   t |d| jpd }|| jv r)z	| j| | W n ty(   td| Y nw |dkr4| | d S |dv r?| | d S |dv rJ| 	| d S |dv rU| 
| d S |dkr`| | d S |jd url| | d S | | d S )	Nr   r!   z%Error in custom output handler for %simage)z
text+imagez
text,imagez
image+text)latentslatent)rC   speechtext)rg   rv   r5   rw   r*   r:   r;   _process_image_output_process_text_image_output_process_latents_output_process_audio_output_process_text_outputrJ   _process_pooling_output)r   r   r   r   r   r   r   r  s(   

z.MultimodalOutputProcessor._route_and_normalizec                 C   2   |j du r| j|dd}|dur||_ dS dS dS )z>Ensure image tensors are surfaced via pooling_output for vLLM.N)r   imagespixel_valuespixelskeysrJ    _extract_from_multimodal_outputsr   r   tensorr   r   r   r        

z/MultimodalOutputProcessor._process_image_outputc                 C   r   )zkAllow text+image outputs. Text path stays as new_token_ids;
        image/latents route via pooling_output.N)r   r   r   r   r   r   zr   r   r   r   r   r   r     s   

z4MultimodalOutputProcessor._process_text_image_outputc                 C   r   )z6Ensure latent tensors are surfaced via pooling_output.N)r   r   r   	posteriorr   r   r   r   r   r   r     r   z1MultimodalOutputProcessor._process_latents_outputc                 C   r   )z5Ensure audio tensors are surfaced via pooling_output.N)rC   audioswavwaveform	audio_pcmpcmr   r   r   r   r   r   r     s   

z/MultimodalOutputProcessor._process_audio_outputc                 C   s   dS )u=   No-op; base processor will detokenize new_token_ids → text.Nr   r   r   r   r   r   r     s   z.MultimodalOutputProcessor._process_text_outputc                 C   sJ   |j du rdS t|j tjs#z
t|j |_ W dS  ty"   Y dS w dS )z*Optional sanity checks for pooling tensor.N)rJ   r$   r%   r&   	as_tensorr*   r   r   r   r   r     s   
z1MultimodalOutputProcessor._process_pooling_outputr   .c                 C   sh   t |dd }t|tsd S |D ]}||}t|tjr |  S q| D ]}t|tjr1|  S q%d S )Nmultimodal_outputs)rg   r$   r6   r   r%   r&   values)r   r   r   mmr>   r?   r   r   r   r     s   

z:MultimodalOutputProcessor._extract_from_multimodal_outputs)rO   N)Nr   Nrj   )!rk   rl   rm   rn   r   boolro   r/   r   r   r   r{   r	   r   r   r   r8   floatr   r   r   r   r   r   r   r   r   r   tupler%   r&   r   rr   r   r   r   r   rs      sb     
1	,rs   )#collections.abcr   typingr   numpyrp   r%   vllm.loggerr   vllm.outputsr   vllm.sampling_paramsr   vllm.tokenizersr   vllm.v1.enginer   r	   r
   vllm.v1.engine.output_processorr   VLLMOutputProcessorr   r   r    vllm.v1.engine.parallel_samplingr   vllm.v1.metrics.statsr   vllm_omni.outputsr   rk   r:   r   rs   r   r   r   r   <module>   s$     ^