o
    ٷi*                     @   s  d dl mZ d dlmZ d dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z" d dl#m$Z$m%Z%m&Z&m'Z' ee(Z)G dd deZ*dS )    )Callable)AnyN)ValidationError)tqdm)CompilationConfigStructuredOutputsConfigis_init_field)LLM)init_logger)PoolingRequestOutputRequestOutput)get_io_processor)UsageContext)Counter)	LLMEngine)"initialize_orchestrator_connectors)OmniEngineArgs)OmniInputProcessor)MultimodalOutputProcessor)filter_dataclass_kwargsload_stage_configs_from_modelload_stage_configs_from_yamlresolve_model_config_pathc                   @   s   e Zd ZdZ									d ded	edB d
edeeeef B e	B dB deeef dB deeef e
B dB dededededefddZd!ddZd!ddZdddeedef B deeeB  fddZdS )"OmniLLMaD  Main entry point for vLLM-Omni inference.

    This class extends the base vLLM LLM class with omni-specific
    processors for handling multimodal inputs and outputs. It provides
    configuration loading for multi-stage pipelines, while stage management
    is handled by the Omni class.

    Args:
        model: Model name or path to load
        stage_configs_path: Optional path to YAML file containing stage
            configurations. If None, configurations are loaded from the model.
        log_stats: Whether to enable statistics logging
        compilation_config: Optional compilation configuration. Can be an
            integer (compilation level), dict, or CompilationConfig instance.
        hf_overrides: Optional HuggingFace model configuration overrides
        structured_outputs_config: Optional structured outputs configuration.
            Can be a dict or StructuredOutputsConfig instance.
        init_sleep_seconds: Number of seconds to sleep between starting
            each stage process during initialization (used by Omni class)
        shm_threshold_bytes: Threshold in bytes for using shared memory
            for IPC. Objects larger than this threshold will use shared memory.
        batch_timeout: Timeout in seconds for batching requests within a stage
        init_timeout: Timeout in seconds for waiting for all stages to initialize
        **kwargs: Additional keyword arguments passed to the base LLM class
            and engine

    Example:
        >>> llm = OmniLLM(model="Qwen/Qwen2.5-Omni-7B")
        >>> # Stage management is handled by Omni class
    NF      
   ,  modelstage_configs_path	log_statscompilation_confighf_overridesstructured_outputs_configinit_sleep_secondsshm_threshold_bytesbatch_timeoutinit_timeoutkwargsc              
   K   s  | dd| _| dd| _|	| _t|| _|du r%t|| _t|| _	n|| _t
|| _	t| j| j|d\| _| _d|vrBd|d< d|v rV|d }t|trVt||d< d	|v rt|d	 trd
dlm} |d	 }z|di ||d	< W n ty } ztd|| td| |d}~ww |dd}|durt|trt|d}nt|trtdi dd | D }n|}nt }|durt|trtdi dd | D }n|}nt }td||||dtt|}t j!|t"j#d| _$t%| j$j&| j$j|j'd| j$_(t)| j$j*d| j$_+t| j$| _,t- | _.d| _/| j$0 }t1d| || _2| j$j3j4}t5| j$j*|| _6| j$j3| _3| j$j+| _+dS )z9LLM constructor with omni-specific configuration loading.worker_backendmulti_processray_addressN)r)   r%   disable_log_statsT
worker_clskv_transfer_configr   )KVTransferConfigz[Failed to convert 'kv_transfer_config' dict to KVTransferConfig object. Dict: %s. Error: %sz'Invalid 'kv_transfer_config' provided: omni_kv_config)levelc                 S       i | ]\}}t t|r||qS  )r   r   .0kvr3   r3   R/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/entrypoints/omni_llm.py
<dictcomp>        z$OmniLLM.__init__.<locals>.<dictcomp>c                 S   r2   r3   )r   r   r4   r3   r3   r8   r9      r:   )r   r!   r#   r0   )engine_argsusage_context)	tokenizerr    engine_core_output_type)vllm_configzSupported_tasks: %sr3   )7getr)   r+   r&   boolr    r   config_pathr   stage_configsr   r   omni_transfer_config
connectors
isinstancetypecloudpickledumpsdictvllm.config.kv_transferr/   r   loggererror
ValueErrorpopintr   itemsr   r   r   r   from_engine_argsr   	LLM_CLASS
llm_enginer   r=   engine_output_typeoutput_processorr   r?   input_processorengine_classr   request_counterdefault_sampling_paramsget_supported_tasksinfosupported_tasksmodel_configio_processor_pluginr   io_processor)selfr   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r-   r/   raw_config_dicter0   compilation_config_instancestructured_outputs_instancer;   r]   r_   r3   r3   r8   __init__B   s   




	


	



zOmniLLM.__init__returnc                 C   s:   t | dr| jdurt | jdr| j  dS dS dS dS )zClose resources.

        Note: Stage management is now handled by Omni class.
        This method closes the LLM engine but not stages.
        rT   Nshutdown)hasattrrT   rh   )ra   r3   r3   r8   close   s
   zOmniLLM.closec              
   C   sF   z|    W d S  ty" } ztjd|dd W Y d }~d S d }~ww )Nz)[Orchestrator] __del__ close() raised: %sT)exc_info)rj   	ExceptionrL   debug)ra   rc   r3   r3   r8   __del__   s   zOmniLLM.__del__T)use_tqdmro   .c             	   C   sL  |r | j  }t|r|nt}||dddddddddd}g }d}d}| j  r| j  }|D ]`}	|	jr||	 |rt|	t	rt
|	j}
|	jd usOJ |t
|	j|
 7 }||jd	  }|td
d |	jD 7 }||jd	  }d|dd|dd|_||
 n|d |j|kr|  q2| j  s+|r|  t|dd dS )NzProcessed promptsTzest. speed input: r   z.2fz toks/s, output: z toks/s)totaldescdynamic_ncolspostfixelapsedc                 s   s    | ]}t |jV  qd S )N)len	token_ids)r5   stpr3   r3   r8   	<genexpr>   s    z&OmniLLM._run_engine.<locals>.<genexpr>   c                 S   s   t | jdd S )N-r   )rP   
request_idsplit)xr3   r3   r8   <lambda>   s    z%OmniLLM._run_engine.<locals>.<lambda>)key)rT   get_num_unfinished_requestscallabler   has_unfinished_requestsstepfinishedappendrF   r   ru   outputsprompt_token_idsformat_dictsumrs   updatenrefreshrj   sorted)ra   ro   num_requests	tqdm_funcpbarr   total_in_tokstotal_out_toksstep_outputsoutputr   in_spdout_spdr3   r3   r8   _run_engine   sF   








zOmniLLM._run_engine)	NFNNNr   r   r   r   )rg   N)__name__
__module____qualname____doc__strrA   rP   rJ   r   r   r   rf   rj   rn   r   r   listr   r   r   r3   r3   r3   r8   r   "   sJ    "	


u
0r   )+collections.abcr   typingr   rH   pydanticr   r   vllm.configr   r   r   vllm.entrypoints.llmr	   vllm.loggerr
   vllm.outputsr   r   vllm.plugins.io_processorsr   vllm.usage.usage_libr   vllm.utils.counterr   vllm.v1.engine.llm_enginer   %vllm_omni.distributed.omni_connectorsr   vllm_omni.engine.arg_utilsr    vllm_omni.engine.input_processorr   !vllm_omni.engine.output_processorr   vllm_omni.entrypoints.utilsr   r   r   r   r   rL   r   r3   r3   r3   r8   <module>   s(    