o
    ٷi                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
mZ d dlmZmZ d dlmZmZmZ d dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lm Z  d d
l!m"Z" d dl#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2 d dl0m3Z4 d dl5m6Z6m7Z7m8Z8 d dl9m:Z: d dl;m<Z<m=Z=m>Z> d dl?m@Z@mAZA d dlBmCZC d dlDmEZE eeFZG				d&dejHdB dejIdB dejJdB fddZKdd ZLdeMfd d!ZNG d"d# d#ZOG d$d% d%eOZPdS )'    N)Callable	GeneratorSequence)ThreadPoolExecutoras_completed)AnyLiteraloverload)	OmegaConf)tqdm)SamplingParams)init_logger)make_zmq_socket)get_engine_client_zmq_addr)get_stage_connector_config"initialize_orchestrator_connectors)try_send_via_connector) resolve_omni_kv_config_for_stage)create_placement_groupget_ray_queue_classtry_close_ray	OmniStage)SHUTDOWN_TASKOmniStageTaskType)maybe_load_from_ipc)get_final_stage_id_for_e2einject_omni_kv_configload_and_resolve_stage_configs)ZmqQueue)OmniDiffusionSamplingParamsOmniPromptTypeOmniSamplingParams)OrchestratorAggregatorStageRequestStats)!download_weights_from_hf_specific)OmniRequestOutputhandshake_stopzmq_handshake_sockethandshake_threadc                 C   sB  | rm|D ]1}z| t W n ty' }	 ztd|	  W Y d}	~	nd}	~	ww t|dd}
t|
r5|
  q|D ]}t|dd}
t|
rG|
  q8| D ]"}z|  W qJ tyl }	 ztd|	  W Y d}	~	qJd}	~	ww t| |dury|	  |dur|j
dd | rtd |dur|d |dur|  dS dS )	z7Weak reference cleanup function for OmniBase instances.z5Failed to send shutdown signal to stage input queue: NclosezFailed to stop stage worker: g       @timeoutzCHandshake server thread did not terminate gracefully within timeoutr   )
put_nowaitr   	Exceptionloggerwarninggetattrcallablestop_stage_workerr   setjoinis_aliver*   term)
stage_liststage_in_queuesstage_out_queuesray_pgzmq_ctxr'   r(   r)   qeclose_fnstage rA   N/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/entrypoints/omni.py_weak_close_cleanup8   sH   

rC   c                 C   s   | S NrA   )model_idrA   rA   rB   _dummy_snapshot_downloadf   s   rF   returnc                 C   sv   t j| r| S t jddrddlm} || S zt| d dgdd W | S  tj	j
y:   td|  d	 Y | S w )
NVLLM_USE_MODELSCOPEFr   )snapshot_download*T)model_name_or_path	cache_dirallow_patternsrequire_allzRepository not found for 'z'.)ospathexistsenvironget modelscope.hub.snapshot_downloadrI   r%   huggingface_huberrorsRepositoryNotFoundErrorr/   r0   )rE   rI   rA   rA   rB   omni_snapshot_downloadj   s"   rX   c                	   @   s  e Zd ZdZdededdfddZdedB deeef dB fd	d
ZdedB dedB dedB fddZ	deeef deeef fddZ
dedeeef deeee f fddZdedeeef ddfddZdedefddZdeddfddZdededeeef ddfddZd8deddfd d!Zdedefd"d#Zd9d$ee dB ddfd%d&Zd9d$ee dB defd'd(Zd:d)d*Zd+edeeef fd,d-Zd:d.d/Zd:d0d1Zd8dedefd2d3Zedefd4d5Zedefd6d7Z dS );OmniBasea  Base class for serving Omni models.

    Args:
        model: Model name or path to load.
        **kwargs: Arbitrary keyword arguments.
            - stage_configs_path: Optional path to YAML file containing stage
              configurations. If None, configurations are loaded from the model.
            - log_stats: Whether to enable statistics logging
              be written to files with stage-specific suffixes.
            - stage_init_timeout: Per-stage init watchdog (seconds). Measured from
              when the previous stage finished (possibly a prior Omni run with GPU
              reuse/overlap) to when the current stage starts to initialize.
            - shm_threshold_bytes: Threshold in bytes for using shared memory
              for IPC. Objects larger than this threshold will use shared memory.
            - worker_backend: Backend for worker processes. Default is "multi_process".
            - ray_address: Address of Ray cluster for Ray backend, if using Ray backend.
            - batch_timeout: Timeout in seconds for batching requests within a stage
            - init_timeout: Timeout in seconds for waiting for all stages to initialize
            - Additional keyword arguments passed to stage engines.
    modelkwargsrG   Nc                 K   s   t |}||d< g | _g | _g | _t | _d | _d | _d | _d | _	d | _
d | _d | _d | _d | _i | _t | _d | _td|  | || d S )NrZ   zInitializing stages for model: )rX   r8   _stage_in_queues_stage_out_queuesr4   _stages_ready_ray_pg
_queue_cls_ctx_zmq_ctx_zmq_master_address_zmq_master_port_zmq_handshake_socket_handshake_thread_handshake_stop_handshake_endpoints_handshake_seen_single_stage_idr/   info_initialize_stagesselfrZ   r[   rA   rA   rB   __init__   s(   zOmniBase.__init__cache_backendc              
   C   s4   |dkrdddddddd dd		S |d
krddiS d S )N	cache_dit   r      gQ?   Fdynamic)	Fn_compute_blocksBn_compute_blocksmax_warmup_stepsresidual_diff_thresholdmax_continuous_cached_stepsenable_taylorseertaylorseer_orderscm_steps_mask_policyscm_steps_policy	tea_cacherel_l1_threshg?rA   )rn   rp   rA   rA   rB   _get_default_cache_config   s   z"OmniBase._get_default_cache_configcache_configc                 C   sZ   t |trzt|}W n tjy   td d }Y nw |d u r+|dvr+| |}|S )Nz*Invalid cache_config JSON, using defaults.)N none)
isinstancestrjsonloadsJSONDecodeErrorr/   r0   r   )rn   rp   r   rA   rA   rB   _normalize_cache_config   s   


z OmniBase._normalize_cache_configc              	   C   s   d|v rt |d |d< |dd}| ||dd}d}d|v r6|d j}td|D ]	}|d	| 7 }q,d
dd|ddti |||ddddg}d|d
 d d< |S )z-Create default diffusion stage configuration.dtyperp   r   r   N0parallel_configrr   ,r   	diffusionT)processdevicesmax_batch_size)rp   r   image)stage_id
stage_typeruntimeengine_argsfinal_outputfinal_output_typer   model_stage)r   rS   r   
world_sizeranger
   create)rn   r[   rp   r   r   num_devicesidefault_stage_cfgrA   rA   rB   #_create_default_diffusion_stage_cfg   s:   
z,OmniBase._create_default_diffusion_stage_cfgc           
         sZ    dd}  dd_t||  fddd\}}|D ]}zpt|dddkr+W qt|d	r5|jdu r;ti |_  d
durTt|jd
rN|jjdu rT d
 |j_  d}|du rb  d}|durvt|jdrr|jj	du rv||j_	  d}|durt|jdr|jj
du r||j_
W q ty }	 ztd|	 W Y d}	~	qd}	~	ww ||fS )zJResolve stage configs and inject defaults shared by orchestrator/headless.stage_configs_pathNtts_max_instructions_lengthc                      s
     S rD   )r   rA   r[   rn   rA   rB   <lambda>  s   
 z1OmniBase._resolve_stage_configs.<locals>.<lambda>)default_stage_cfg_factoryr   r   r   	lora_path
lora_scalestatic_lora_scalequantization_configz*Failed to inject LoRA config for stage: %s)rS   r   r   r1   hasattrr   r
   r   r   r   r   r.   r/   r0   )
rn   rZ   r[   r   config_pathstage_configscfgr   r   r>   rA   r   rB   _resolve_stage_configs   sF   




zOmniBase._resolve_stage_configsc                    s`  | dd| dd}| dd}| dd}| d	d
}| dd}| dd}| dd
_| dd
_jd
u rEd_td | dd
_||\__t	j||d\_
_t|_|_|_|_j_dtttf dtttf ffdd tttjtdt pdd% fddtjD }	g }
t|	D ]	}|
|   qW d
   n1 sw   Y  |
j!dd d d d |
D _"d!d j"D _#d"d j"D _$td#j% d$tj" d% jd&krt& _'nt()d'_*fd(d_'td)t_+td)t|_,-| j.|d* d
S )+z!Initialize stage list management.stage_init_timeout   shm_threshold_bytesi   init_timeouti,  worker_backendmulti_processray_addressNbatch_timeout
   	log_statsFr   omni_master_addressz	127.0.0.1zDNo omni_master_address provided, defaulting to localhost (127.0.0.1)omni_master_port)r   r   idx_cfgrG   c                    s   | \}}|t | dfS )Nr   r   )r   idxr   r   rA   rB   _build_stageN  s   z1OmniBase._initialize_stages.<locals>._build_stagerr   )max_workersc                    s    g | ]\}}  ||fqS rA   )submit).0r   r   )r   executorrA   rB   
<listcomp>S  s     z/OmniBase._initialize_stages.<locals>.<listcomp>c                 S   s   | d S )Nr   rA   )xrA   rA   rB   r   W  s    z-OmniBase._initialize_stages.<locals>.<lambda>)keyc                 S   s   g | ]\}}|qS rA   rA   )r   _strA   rA   rB   r   X  s    c                 S      g | ]}|j qS rA   )default_sampling_paramsr   r   rA   rA   rB   r   Y      c                 S   r   rA   )r   r   rA   rA   rB   r   Z  r   [z	] Loaded z stagesrayspawnc                      s    j jddS )Nr   )maxsize)ra   QueuerA   rn   rA   rB   r   a  s    r   r+   )/rS   rj   rc   r/   rk   rd   r   r   r   r   omni_transfer_config
connectorsboolr   r   r   r   _is_async_chunk_enableasync_chunktupleintr   r   r   minlenmaxrO   	cpu_count	enumerater   appendresultsortr8   default_sampling_params_listoutput_modalities_namer   r`   mpget_contextra   _stage_init_timeout_shm_threshold_bytes_start_stages_wait_for_stages_ready)rn   rZ   r[   r   r   r   r   r   r   futuresresultsfutrA   )r   r   rn   r   rB   rl   ,  sV   


&& 

zOmniBase._initialize_stages
stage_argsc                 C   s    t |d dd}tt |ddS )zget async chunk flagr   r   Nr   F)r1   r   )rn   r   r   rA   rA   rB   r   i  s   zOmniBase._is_async_chunk_enablec                 C   s@  | j dkrtt| j| jdd| _nI| jdu rt | _t| j	}i | _
| jdu }t|D ])}t|| jd}t|| jd}||f| j
|< td| j d| d| d	|  q-|   tt | jD ]\}}| j dkrt|  }	|  }
n| j
| \}}t| jtj|d
}	t| jtj|d
}
| j|	 | j|
 ||	|
 t| j|}zt| j|\}}}|rt |||| W n t!y } ztd|| W Y d}~nd}~ww | jdur|t"| jkrt#d| j d| d qb|j$|| j%| j&| j dkr| j'nd| j(|| j | j| jdurdndd	 td| j d| d qbdS )zStart all stage processes.r   PACK)number_of_stagesaddressstrategyN)
local_onlyhostr   z ] Allocated endpoints for stage-z: in=z, out=)bindz?[Omni] Failed to inject omni connector config into stage-%s: %sz#] Skipping initialization of stage-z& worker due to single_stage_id settingTF)is_asyncr   ctxr   connectors_configr   ray_placement_groupignore_runtime_config] Stage-z process started))r   r   r   r8   r   r_   rb   zmqContextr   rh   rj   r   r   rc   r/   debugr   start_handshake_serverr   r   r`   r   PUSHPULLr\   r   r]   attach_queuesr   r   r   r   r.   r   rk   init_stage_workerr   r   ra   r   )rn   rZ   total_stagesr   sidin_endpointout_endpointr   r@   in_qout_qstage_connectors_configomni_conn_cfg	omni_fromomni_tor>   rA   rA   rB   r   n  sx   








zOmniBase._start_stagesr@   r   r   c                 C   s*   | j | td| j d| d d S )Nr   r   z reported ready)r^   addr/   rk   r   )rn   r@   r   r   rA   rA   rB   _process_stage_ready  s   zOmniBase._process_stage_readyx   r,   c              
   C   s  | j dur| jdkr| |}t| j}t tdt| }t	d| j
 d| d| d t| j|k rxt |k rxd}t| jD ] \}}|| jv rMqC|  }rcd	}|d
dkrc| ||| qC|sktd t| j|k rxt |k s<t| j|krt	d| j
 d dS ttt|t| j }td| j
 dt| j d| d|  d| dddddg}	ddd t|	D }
td| j
 d|
  dS )z?Wait for all stages to report readiness with optimized polling.Nr   r   r   z] Waiting for z  stages to initialize (timeout: s)FTtypestage_ready皙?z%] All stages initialized successfullyz] Initialization timeout: /z stages ready. Missing stages: zVIgnore this warning if the model weight download / load from disk time is longer than zs.zDVerify GPU/device assignment in config (runtime.devices) is correct.zICheck GPU/host memory availability; reduce model or batch size if needed.zHCheck model weights path and network reachability (if loading remotely).zLIncrease initialization wait time (stage_init_timeout or call-site timeout).
c                 s   s(    | ]\}}d |d  d| V  qdS )z  rr   z) NrA   )r   r   msgrA   rA   rB   	<genexpr>     & z2OmniBase._wait_for_stages_ready.<locals>.<genexpr>z7] Stage initialization timeout. Troubleshooting Steps:
)rj   r   _wait_for_handshakesr   r8   timer   r   r/   rk   r   r^   r   try_collectrS   r  sleepsortedr4   r   r0   r5   )rn   r,   
num_stagesdeadline
progressedr   r@   r   	not_readysuggestionsformatted_suggestionsrA   rA   rB   r     sF   

 


zOmniBase._wait_for_stages_readyc                 C   s^   | j | }|jdkrdS t|jdd}|du rdS t|dd}|du r%dS t|dd}|duS )z2Check if profiler config is set for a given stage.r   Tr   NFprofiler_configprofiler)r8   r   r1   stage_config)rn   r   r@   r   r%  r&  rA   rA   rB   _is_profiler_enabled  s   

zOmniBase._is_profiler_enabledstagesc                 C   s   |du rt tt| j}|D ]H}|t| jk rW| |s&td| j| qz| j| dt	j
i td| j| W q tyV } ztd| j|| W Y d}~qd}~ww qdS )az  Start profiling for specified stages.

        Sends start_profile command to stage workers. Profiling must be enabled
        via VLLM_TORCH_PROFILER_DIR environment variable.

        Args:
            stages: List of stage IDs to start profiling. If None, starts
                profiling for all stages that have profiling enabled.

        Example:
            >>> # Profile all stages
            >>> omni.start_profile()
            >>> outputs = omni.generate(prompts, sampling_params)
            >>> omni.stop_profile()

            >>> # Profile only stage 0 and 2
            >>> omni.start_profile(stages=[0, 2])
        NzA[%s] Skipping start_profile for stage-%s: profiler config not setr  z#[%s] Sent start_profile to stage-%sz1[%s] Failed to send start_profile to stage-%s: %s)listr   r   r8   r(  r/   rk   r   r   r   PROFILER_STARTr.   r0   )rn   r)  r   r>   rA   rA   rB   start_profile  s2   
zOmniBase.start_profilec              
   C   s&  |du rt tt| j}g g d}|D ]}|t| jk r| |s+td| j| q| j| }t|drtd| j| |	 }t
|tr|dpO|d}|dpY|d	}td
| j d| d|   |r~td
| j d| dt|  |rtd
| j d| dt|  |rt
|tr|d | nt
|t r|d | |rt
|tr|d	 | qt
|t r|d	 | qtd
| j d| d qtd
| j d| dt|  qtd| j| |dtji qtd
| j dt|d  dt|d	  d |S )z}
        Synchronously stop profiling for specified stages and collect
        the file paths for traces and tables.
        N)tracestablesz@[%s] Skipping stop_profile for stage-%s: profiler config not setstop_profilez5[%s] Requesting profile data collection from stage-%stracer-  tabler.  r   r   z returned: z traces type: z tables type: z returned no table dataz returned non-dict data: zO[%s] Stage-%s does not support synchronous stop_profile. Falling back to async.r  z] Collected z trace(s) and z	 table(s))r*  r   r   r8   r(  r/   rk   r   r   r/  r   dictrS   r   keysr  r   r   extendr0   r   r   PROFILER_STOP)rn   r)  all_resultsr   r@   
stage_datar-  r.  rA   rA   rB   r/  -  s`   




"""



$(zOmniBase.stop_profilec                 C   s   t | dr|   dS dS )z1Close all stage processes and clean up resources._weak_finalizerN)r   r8  r   rA   rA   rB   r*   v  s   
zOmniBase.closer  c              
   C   s   t |tr|ddkrdddS z	t|d}W n ttfy6 } zdd| dW  Y d}~S d}~ww | j|}|du rIdd	| dS | j| |\}}t	
d
| j| d||dS )zProcess incoming handshake message and generate response.

        Args:
            msg: Decoded message from client

        Returns:
            Response dictionary with ok status and either endpoints or error
        r  	handshakeFzinvalid handshake payload)okerrorr   zinvalid stage_id: Nzunknown stage_id: z%[%s] Handshake received from stage-%sT)r:  r  r  )r   r2  rS   r   	TypeError
ValueErrorrh   ri   r  r/   rk   r   )rn   r  r   r>   	endpointsr  r  rA   rA   rB   _process_handshake_message{  s,   	
z#OmniBase._process_handshake_messagec              	      s   t  }| jt j zG j sK|d}t fdd|D }|s%qt	j
 j }t	j
 |} j|  j rW | j dS W | j dS | j w )zAMain loop for handshake server - polls for messages and responds.i  c                 3   s(    | ]\}}| j ko|tjkV  qd S rD   )re   r   POLLIN)r   sockeventr   rA   rB   r    r  z6OmniBase._run_handshake_server_loop.<locals>.<genexpr>N)r   Pollerregisterre   r@  rg   is_setpollanymsgspecmsgpackdecoderecvencoder?  send
unregister)rn   pollereventshas_messager  responserA   r   rB   _run_handshake_server_loop  s   



z#OmniBase._run_handshake_server_loopc                 C   s   | j dus
| jdu rdS | jr| jdu rdS td| jt| jd}t | _t	| j|t
jddd| _tj| jddd| _ | j   dS )	a  Start the ZMQ handshake server.

        The handshake server allows distributed stages to discover their
        queue endpoints by querying the orchestrator with their stage_id.
        Skips starting if the server is already running or ZMQ is not initialized.
        NF)r   r   portTi  )r   lingerzzmq-handshake-server)targetdaemonname)rf   rb   rc   rd   r   r   	threadingEventrg   r   r   REPre   ThreadrS  start)rn   endpointrA   rA   rB   r     s   
zOmniBase.start_handshake_serverc              
   C   s   t | j}tt|t| jh }|s|S t tdt| }t	d| j
 d| d| d || jsMt |k rMtd || jsMt |k s<tdt|t  }|| js|t|| j }td| j
 dt | j dt | d	|  |S )
zWait for handshakes from all expected stages.

        Args:
            timeout: Timeout in seconds for waiting for handshakes. Default is 120s.

        Returns:
            Remaining timeout in seconds after waiting for handshakes.
        r   r   z&] Waiting for handshakes from stages: z (timeout: r  g      ?z] Handshake timeout: r  z- stages completed handshake. Missing stages: )r   r   r4   r   r   rj   r  r   r/   rk   r   issubsetri   r  r  r0   )rn   r,   r  expectedr   remaining_timeoutmissingrA   rA   rB   r    s$   
	 

 zOmniBase._wait_for_handshakesc                 C      dS )NrY   rA   r   rA   rA   rB   r        zOmniBase._namec                 C   rc  )NFrA   r   rA   rA   rB   r     rd  zOmniBase.is_async)r  rD   )rG   N)!__name__
__module____qualname____doc__r   r   ro   r2  r   r   r   r   r*  r   rl   r   r   r   r   r   r  r   r(  r,  r/  r*   r?  rS  r   r  propertyr   r   rA   rA   rA   rB   rY      s0    """*&0="P1*
I
%
'rY   c                       s  e Zd ZdZdededdf fddZe	ddee	e B d	e
e	e
 B dB d
ed deeddf fddZe	ddddee	e B d	e
e	e
 B dB d
ed dee fddZ	dddddee	e B d	e
e	e
 B dB d
edeedef B deeddf ee B f
ddZdee	e B d	e	e
 deeddf fddZ	ddee	e B d	e	e
 deedef B deeddf fddZedefddZ  ZS )Omnia  Unified entrypoint for both LLM and Diffusion models for better usability.

    Args:
        model: Model name or path to load.
        **kwargs: Arbitrary keyword arguments.
            - stage_configs_path: Optional path to YAML file containing stage
              configurations. If None, configurations are loaded from the model.
            - log_stats: Whether to enable statistics logging
              be written to files with stage-specific suffixes.
            - stage_init_timeout: Per-stage init watchdog (seconds). Measured from
              when the previous stage finished (possibly a prior Omni run with GPU
              reuse/overlap) to when the current stage starts to initialize.
            - shm_threshold_bytes: Threshold in bytes for using shared memory
              for IPC. Objects larger than this threshold will use shared memory.
            - worker_backend: Backend for worker processes. Default is "multi_process".
            - ray_address: Address of Ray cluster for Ray backend, if using Ray backend.
            - batch_timeout: Timeout in seconds for batching requests within a stage
            - init_timeout: Timeout in seconds for waiting for all stages to initialize
            - Additional keyword arguments passed to stage engines.

    Example:
        >>> omni = Omni(model="Qwen/Qwen2.5-Omni-7B")
        >>> outputs = omni.generate(prompts="Hello, world!", sampling_params_list=[SamplingParams()])
        >>> print(outputs)
    rZ   r[   rG   Nc                    sF   t  j|fi | t| t| j| j| j| j| j	| j
| j| j
| _d S rD   )superro   weakreffinalizerC   r8   r\   r]   r_   rb   rg   re   rf   r8  rm   	__class__rA   rB   ro     s   
zOmni.__init__promptssampling_params_listpy_generatorTc                C      d S rD   rA   rn   rp  rq  rr  rA   rA   rB   generate+     zOmni.generateF)rr  c                C   rs  rD   rA   rt  rA   rA   rB   ru  4  rv  )rr  use_tqdmrw  .c          
   
   C   s   |du r| j }n"t|ts*g }| j D ]}|j}||jkr"|| q|| q|}z|r4| ||W S t| |||}|W S  tyW }	 zt	
d|	 |   |	d}	~	ww )a*  Generate outputs for the given prompts.

        Orchestrates the multi-stage pipeline based on YAML configuration.
        Each stage will use OmniLLM or OmniDiffusion based on stage_type.

        Args:
            prompts: Input prompt(s) for generation.
            sampling_params_list: Optional list of per-stage parameters.
            py_generator: Whether the returned result(s) are wrapped in a generator instead of a list.
            use_tqdm: Whether to use tqdm progress bar

        Returns:
            List of OmniRequestOutput objects, one for each input prompt.
            Each output contains the stage_id, final_output_type, and
            the request_output from the final stage.

        Raises:
            ValueError: If sampling_params_list is None or has incorrect length.
        N+[Orchestrator] Failed to run generation: %s)r   r   r   ro  r   _run_generation_with_generatorr*  _run_generationr.   r/   	exceptionr*   )
rn   rp  rq  rr  rw  per_stage_paramsdefault_stage_spdefault_sp_typeoutputsr>   rA   rA   rB   ru  =  s*   


c              
   c   s`    |  ||}z#z|E dH  W n ty# } ztd| |d}~ww W |   dS |   w )zIRun generation through all stages in the pipeline and return a generator.Nrx  )rz  r.   r/   r{  r*   )rn   rp  rq  genr>   rA   rA   rB   ry  s  s   z#Omni._run_generation_with_generatorc           3      c   sh   t d| j d |du rtdt|t| jkr+tdt| j dt| tt| j|D ]"\}\}}|jdkr@t	nt
}t||sUtd| d	| d
|j q3t|ts`t|tsd|g}nt|}t| j}	dd tt|D }
dd t|
|D }i }t }i }| D ]\}}t|tr|dd}nd}t|| j| j}|||< qt|	| j||}| }|rt|r|nt}||dd}t d| j dt| d |jd pt |jd< | D ])\}}|d }|||d}| jd | t ||< t d| j d| d qd}|r3t|r|nt}|t|dddddddddd}t|gdg|	d   }d}t|}t d| j d | d!|	  ||k rd"}t| jD ]\}}| }|du rpq`d}|d#}d$|v rt d| j d%| d&| d'|d$   q`|d(d)krtd* q`t |d+d,d-} t!|j"| pd.t |j"|< zl|d/}!|!dur#|j#| |  |!j$7  < |%|j||  |&|||!|j' |r#|j(d0 pd1}"t)|j*}#|#|" }$| j| }%|%d2kr d3nd4}&|j+dkr|j,|j+ }'nd}'d5| d6|& d7|$dd8|'d9d:	|_-W n% t.yI }( zt /d| j d;| d<| d'|(  W Y d}(~(nd}(~(ww t d| j d=| d>| d? |0|  t1|d@d"rt d| j dA| dB|  z||| kr|2||||| W n% t.y }( zt /d| j dC| dD| d'|(  W Y d}(~(nd}(~(ww t3||j'| dE})z-t4| dFr| j5nt| tr| rt4| d dFr| d j5nd"}*|*r|6|)|| W n% t.y	 }( zt /d| j dG| dD| d'|(  W Y d}(~(nd}(~(ww |)V  |d }+|+|| kr| j|+ },z#|7|| |,8| j|| g}-W d   n	1 s:w   Y  W n& t.yf }( zt /d| j dH| dD|+ d'|(  W Y d}(~(q`d}(~(ww ||+ }.t|t|+f}/| j9|/}0d"}1|0rt:|0||+||-|.|| | j|+ j|dI	}1|1st;d| j dJ| dK|+ dLt d| j dM| dK|+  ||+  d7  < q`|d7 }|r| j||  }2|2d2krd3ndN|_<|=d t d| j dA| dO| dP| dQ	 q`|stdR ||k sYt d| j dS |r|>  z|?  W dS  t.y3 }( zt /d| j dT|(  W Y d}(~(dS d}(~(ww )Uz2Run generation through all stages in the pipeline.r   z] generate() calledNz9sampling_params_list is required for pipelined generationz	Expected z sampling params, got r   z'Expected sampling parameters with type z
 in stage z, got c                 S   s   g | ]}| d t   qS )r   )uuiduuid4)r   r   rA   rA   rB   r     s    z(Omni._run_generation.<locals>.<listcomp>c                 S   s   i | ]\}}||qS rA   rA   )r   ridprA   rA   rB   
<dictcomp>  s    z(Omni._run_generation.<locals>.<dictcomp>
modalitieszAdding requests)descz
] Seeding z requests into stage-0r   )
request_idengine_inputssampling_paramsz] Enqueued request z to stage-0zProcessed promptsTzest. speed input: z.2fz unit/s, output: z unit/s)totalr  dynamic_ncolspostfixrr   z+] Entering scheduling loop: total_requests=z	, stages=Fr  r;  z] Stage z error on request z: r  r  r  engine_outputsengine_outputs_shm)obj_keyshm_keyg        metricselapsedgư>r   imgtokzest. speed stage- z/s: z, avg e2e_lat: z.1fmsz&] Failed to process metrics for stage z, req r   z completed request z; forwarding or finalizingr   z
] Request z finalized at stage-z*] Finalize request handling error for req z
 at stage )r   r   request_outputfinishedz)] Failed to record audio metrics for req z&] Process engine inputs error for req )		connectorr   next_stage_idreq_idnext_inputsr  original_promptnext_stage_queue_submit_fnr  z] Failed to send request z
 to stage-zZ via connector. Configure a connector for this edge or inspect connector logs for details.z] Forwarded request reqz fully completed (r  )g{Gzt?z] All requests completedz] Failed to build/log summary: )@r/   r   r   r=  r   r8   r   zipr   r    r   r   ro  r   r   r*  r   r  itemsr2  rS   r   r   r#   r   r2   r   stage_first_tsr   r  r;  r  _loadr   stage_last_tsaccumulated_gen_time_msstage_gen_time_msaccumulate_diffusion_metricson_stage_metricsr   format_dictsumstage_total_tokens	e2e_counte2e_total_msr  r.   r{  set_engine_outputsr1   on_finalize_requestr&   r   r  record_audio_generated_framesstage_postprocess_timerprocess_engine_inputsr   r   RuntimeErrorunitupdater*   build_and_log_summary)3rn   rp  rq  rw  r   r@   spExpectedSPTyperequest_promptsr  request_idsrequest_id_to_prompt_req_start_ts_wall_start_tsfinal_stage_id_to_promptr  promptprompt_modalitiesfinal_stage_id_for_e2er  it	tqdm_funcr  sp0taskpbarremaining_by_stagecompleted_requeststotal_requestsmade_progressr   r   r  _mr  	total_outout_spdmodalityr  avg_latr>   output_to_yieldr  r  
next_stager  sp_nextconnector_keyr  sent_via_connector	final_modrA   rA   rB   rz    s  








 
 



 






 
 $&zOmni._run_generationc                 C   rc  )NOrchestratorrA   r   rA   rA   rB   r     rd  z
Omni._namerD   )T)re  rf  rg  rh  r   r   ro   r	   r!   r   r"   r   r   r&   ru  r*  r   r   r   ry  rz  ri  r   __classcell__rA   rA   rn  rB   rj    s~    



6



  rj  )NNNN)Qr   multiprocessingr   rO   rY  r  r  rl  collections.abcr   r   r   concurrent.futuresr   r   typingr   r   r	   rU   msgspec.msgpackrH  r   	omegaconfr
   	tqdm.autor   vllmr   vllm.loggerr   vllm.utils.network_utilsr   vllm.v1.utilsr   %vllm_omni.distributed.omni_connectorsr   r   -vllm_omni.distributed.omni_connectors.adapterr   :vllm_omni.distributed.omni_connectors.utils.initializationr   %vllm_omni.distributed.ray_utils.utilsr   r   r    vllm_omni.entrypoints.omni_stager   !vllm_omni.entrypoints.stage_utilsr   r   r   r  vllm_omni.entrypoints.utilsr   r   r   vllm_omni.entrypoints.zmq_utilsr   vllm_omni.inputs.datar    r!   r"   vllm_omni.metricsr#   r$   2vllm_omni.model_executor.model_loader.weight_utilsr%   vllm_omni.outputsr&   re  r/   rZ  Socketr\  rC   rF   r   rX   rY   rj  rA   rA   rA   rB   <module>   sh   
.    }