o
    ٷi1                     @   s  d dl Z d dlmZ d dlmZmZ d dlZd dlZd dl	m
Z
 d dlmZmZ d dlmZ d dlmZ d dlmZmZ d d	lmZmZ zd d
lmZ W n eyd   d dlmZ edefddZY nw d dlmZ d dl m!Z! d dl"m#Z# d dl$m%Z%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2m3Z3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9 ee:Z;G dd de0Z<dS )    N)Mapping)Anycast)
VllmConfig)ProcessorInputs
PromptType)split_enc_dec_inputs)init_logger)MULTIMODAL_REGISTRYMultiModalRegistry)MultiModalFeatureSpecMultiModalUUIDDict)set_request_id)contextmanager_request_idc                 c   s    d V  d S )N )r   r   r   T/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/engine/input_processor.pyr      s   
r   )argsort_mm_positions)current_platform)PoolingParams)
DictPrompt	TokPrompt)SamplingParams)SupportedTask)&length_from_prompt_token_ids_or_embeds)set_default_torch_num_threads)InputProcessor)AdditionalInformationEntryAdditionalInformationPayloadOmniEngineCoreRequestPromptEmbedsPayload)OmniInputPreprocessor)LoRARequestc                       s   e Zd ZdZedejdefddZe	fde
def fdd	Z	
	
	
	
		
	
	ddedeeB eB deeB ded
B ded
B deeef d
B deeef d
B deded
B deedf d
B dedefddZededejfddZ  Z S )OmniInputProcessora  Processor for omni models, handling multimodal inputs and embeddings.

    Extends the base vLLM Processor with support for processing prompt
    embeddings and additional information payloads, enabling direct transfer
    of pre-computed embeddings between pipeline stages.

    Args:
        vllm_config: Global vLLM configuration
        mm_registry: Multi-modal registry for processing multimodal inputs
    dtypereturnc                 C   s   i t jdt jdt jdt jdt jdt jdt jdt jdt j	dt j
dt jdt jdt jdt jdt jd	t jd
}|| t| ddS )zConvert torch dtype to string representation.

        Args:
            dtype: PyTorch dtype to convert

        Returns:
            String representation of the dtype (e.g., "float32", "int64")
        float32float16bfloat16float64int64int32int16int8uint8boolztorch. )torchr&   floatr'   halfr(   r)   doubler*   longr+   intr,   shortr-   r.   r/   getstrreplace)r$   mappingr   r   r   _dtype_to_name:   sD   
	
z!OmniInputProcessor._dtype_to_namevllm_configmm_registryc                    s*   t  || t| j|j|| jd| _d S )N)mm_processor_cache)super__init__r!   model_configobservability_configr?   input_preprocessor)selfr=   r>   	__class__r   r   rA   X   s   zOmniInputProcessor.__init__Nr   F
request_idpromptparamsarrival_timelora_requesttokenization_kwargstrace_headersprioritydata_parallel_ranksupported_tasks.	resumablec           +      C   s  |  | | ||
 | jj}|j}|j}|jr|n|}|	dur6d|	  kr*|k s6n td|	 d| d|du r>t }| j	j
rT| j	j
jdkrT| jjsT| ||}n| | t|trittdB |d}nd}t|$ t  | jj|||d}W d   n1 sw   Y  W d   n1 sw   Y  tj|||d | j }t|\}}| || |d	 d
krd}|d }n	|d }|d}d}d}t|tr| }|j du rt!||}| j	j"| |_ |#| j$| | j%dur|&| j% n| }d}|d	 dkr@|d }|d }|d }t'|}g }|D ]#\}} || |  }!|(t)|| |  || *|!||| |  |!d qt|t+rK| ,|}d}"|d}#t|#t-r[|#}"nV|#duri }$|#. D ]E\}%}&t|&t/j0r|&1 2d3 }'| 4|'j5}(|'6 7 })t8|)dd t9|'j:D |(d}*nt|&t9rt8|&d}*ntd|*|$|%< qft-|$d}"t;|||||||||d||	|||"|dS )aj  Process input prompt into an engine core request.

        Converts a prompt (text, tokens, or multimodal) into an
        OmniEngineCoreRequest that can be processed by the engine.
        Handles prompt embeddings and additional information payloads
        for direct transfer between stages.

        Args:
            request_id: Unique identifier for this request
            prompt: Input prompt (text, token IDs, embeddings, or multimodal)
            params: Sampling or pooling parameters for generation
            arrival_time: Optional arrival timestamp (defaults to current time)
            lora_request: Optional LoRA adapter request
            tokenization_kwargs: Optional additional tokenization arguments
            trace_headers: Optional tracing headers for observability
            priority: Request priority (higher values processed first)
            data_parallel_rank: Optional data parallel rank for distributed
                inference

        Returns:
            Tuple of (prompt_string, OmniEngineCoreRequest) where:
                - prompt_string: The original prompt as a string, or None if
                  using embeddings
                - OmniEngineCoreRequest: Processed request ready for the engine

        Raises:
            ValueError: If data_parallel_rank is out of range or prompt_embeds
                has incorrect shape
        Nr   zdata_parallel_rank z is out of range [0, z).multi_modal_uuids)rM   mm_uuids)rI   rJ   processed_inputstypeembedsprompt_embedsprompt_token_ids
multimodal	mm_kwargsmm_placeholders	mm_hashes)datamodality
identifiermm_positionmm_hashadditional_informationcpuc                 S   s   g | ]}t |qS r   )r6   ).0xr   r   r   
<listcomp>  s    z5OmniInputProcessor.process_inputs.<locals>.<listcomp>)tensor_datatensor_shapetensor_dtype)	list_dataz4additional_information values must be Tensor or list)entries
cache_salt)rH   rY   mm_featuressampling_paramspooling_paramseos_token_idrK   rL   rm   rO   rP   rN   rX   rc   rR   )<_validate_lora_validate_paramsr=   parallel_configdata_parallel_sizedata_parallel_size_locallocal_engines_only
ValueErrortimerB   multimodal_configmm_processor_cache_gbcache_configenable_prefix_caching_maybe_build_mm_uuids_validate_mm_uuids
isinstancedictr   r   r8   r   r   rD   
preprocessr   validate_requestget_eos_token_idr   _validate_model_inputsr   clone
max_tokensr   max_model_lenupdate_from_generation_configgeneration_config_fields	tokenizerupdate_from_tokenizerr   appendr   _get_mm_identifierr    _decode_prompt_embedsr   itemsr1   Tensordetachto
contiguousr<   r$   numpytobytesr   listshaper   )+rE   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   rt   dp_sizedp_local_size	num_ranksrT   rU   rq   encoder_inputsdecoder_inputsrY   rX   ro   rp   seq_lenrn   decoder_mm_inputsdecoder_mm_positionsdecoder_mm_hashessorted_mm_idxsr_   idxbase_mm_hashadditional_information_payloadraw_inforl   keyvaluev_cpu	dtype_str
data_bytesentryr   r   r   process_inputse   s   
+

 













z!OmniInputProcessor.process_inputspayloadc                 C   s2   t t| j}tj| j|d}|| j}t|S )N)r$   )	getattrnpr$   
frombufferr^   reshaper   r1   
from_numpy)r   r$   arrr   r   r   r   (  s   
z(OmniInputProcessor._decode_prompt_embeds)NNNNr   NNF)!__name__
__module____qualname____doc__staticmethodr1   r$   r9   r<   r
   r   r   rA   r   r   r   r   r   r2   r"   r   r   r   r6   tupler   r/   r   r   r    r   r   __classcell__r   r   rF   r   r#   .   s\     
	

 Dr#   )=ry   collections.abcr   typingr   r   r   r   r1   vllm.configr   vllm.inputsr   r   vllm.inputs.parser   vllm.loggerr	   vllm.multimodalr
   r   vllm.multimodal.inputsr   r   vllm.multimodal.processingr   ImportError
contextlibr   r9   vllm.multimodal.utilsr   vllm.platformsr   vllm.pooling_paramsr   vllm.renderers.inputsr   r   vllm.sampling_paramsr   
vllm.tasksr   
vllm.utilsr   vllm.utils.torch_utilsr   vllm.v1.engine.input_processorr   vllm_omni.enginer   r   r   r    vllm_omni.inputs.preprocessr!   vllm_omni.lora.requestr"   r   loggerr#   r   r   r   r   <module>   s@    