o
    ۷i͆                     @   sz  d dl Z d dlZd dlZd dlZd dlmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlmZmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z*m+Z+ d dl)m,Z- d dl.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 ee9Z:dddZ;G dd de&Z<dS )    N)AsyncGeneratorIterableSequence)Any)
VllmConfig)InputPreprocessor)init_logger)get_io_processor)SamplingParams)TokenizerLikeEngineDeadError)OmniModelConfig)DiffusionParallelConfig) compute_talker_prompt_ids_lengthtry_send_via_connector)try_close_ray)OmniInputProcessor)ClientRequestState)OmniBase)	OmniStage)SHUTDOWN_TASKOmniStageTaskType)maybe_load_from_ipc)get_final_stage_id_for_e2e)OmniPromptTypeOmniSamplingParams)LoRARequest)OrchestratorAggregator)OmniRequestOutputc           
      C   s
  | rm|D ]1}z| t W n ty' } ztd|  W Y d}~nd}~ww t|dd}t|r5|  q|D ]}t|dd}t|rG|  q8| D ]"}	z|	  W qJ tyl } ztd|  W Y d}~qJd}~ww t| |dury|	  |dur|
  dS dS )z8Weak reference cleanup function for AsyncOmni instances.z5Failed to send shutdown signal to stage input queue: NclosezFailed to stop stage worker: )
put_nowaitr   	Exceptionloggerwarninggetattrcallablestop_stage_workerr   cancelterm)

stage_liststage_in_queuesstage_out_queuesray_pgoutput_handlerzmq_ctxqeclose_fnstage r4   V/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/entrypoints/async_omni.py_weak_close_cleanup_async)   s<   r6   c                       sX  e Zd ZdZdedeeef ddf fddZdeeef deeef fdd	Zd
e	de
deeef ddf fddZdhde
ddf fddZdd Z	didddededee dB dee dB deedf f
ddZdededee dedede
deedf fdd Zdededede
dee dedeedf fd!d"Zdeeef d
e	de
dedeeeedB f f
d#d$Zdjd%d&Zedefd'd(Zedefd)d*Z edefd+d,Z!edefd-d.Z"edefd/d0Z#ede$fd1d2Z%dee&e B ddfd3d4Z'de(fd5d6Z)de*fd7d8Z+de,fd9d:Z-de.fd;d<Z/defd=d>Z0ed?d@ Z1djdAdBZ2djdCdDZ3djdEdFZ4dkdHedefdIdJZ5dldLe
ddfdMdNZ6didOee dB ddfdPdQZ7defdRdSZ8dTe9defdUdVZ:dWdX Z;didYee
 dB ddf fdZd[Z<didYee
 dB ddf fd\d]Z=dGd^d_d`edaeddfdbdcZ>djdddeZ?defdfdgZ@  ZAS )m	AsyncOmnia  Asynchronous unified entry point supporting multi-stage pipelines for LLM and Diffusion models.

    Similar to the Omni class, but provides an asynchronous interface supporting
    asynchronous LLM and Diffusion models.

    Args:
        model: Model name or path to load.
        **kwargs: Arbitrary keyword arguments.
            - stage_configs_path: Optional path to YAML file containing stage
              configurations. If None, configurations are loaded from the model.
            - log_stats: Whether to enable statistics logging
              be written to files with stage-specific suffixes.
            - stage_init_timeout: Per-stage init watchdog (seconds). Measured from
              when the previous stage finished (possibly a prior Omni run with GPU
              reuse/overlap) to when the current stage starts to initialize.
            - shm_threshold_bytes: Threshold in bytes for using shared memory
              for IPC. Objects larger than this threshold will use shared memory.
            - worker_backend: Backend for worker processes. Default is "multi_process".
            - ray_address: Address of Ray cluster for Ray backend, if using Ray backend.
            - batch_timeout: Timeout in seconds for batching requests within a stage
            - init_timeout: Timeout in seconds for waiting for all stages to initialize
            - Additional keyword arguments passed to stage engines.

    Example:
        >>> async_llm = AsyncOmni(model="Qwen/Qwen2.5-Omni-7B")
        >>> async for output in async_llm.generate(
        ...     prompt="Hello",
        ...     request_id="req-1",
        ...     sampling_params_list=[SamplingParams(), SamplingParams()]
        ... ):
        ...     print(output)
    modelkwargsreturnNc              
      sZ   t  | _d| _i | _d | _t j|fi | t	| t
| j| j| j| j| j| j| _d S NF)asyncio	Condition_pause_cond_pausedrequest_statesr.   super__init__weakreffinalizer6   r*   _stage_in_queues_stage_out_queues_ray_pg_zmq_ctx_weak_finalizer)selfr8   r9   	__class__r4   r5   rB   g   s   

zAsyncOmni.__init__c                 C   s  | dd}| || dd}d}d|v r/|d }|d j}td|D ]	}|d| 7 }q$no| d	p5d}| d
p<d}	| d}
| dpHd}| dpOd}| dd}| dd}| dd}|
du rj||	 }
|
| | }|r|dkr|dkr|| }n|}td|D ]	}|d| 7 }qtdd||
||	||||d
}ddd|dd|| dd| dd||| dd| dd| dd| dd| dd| d dd!dd"d#g}d|d d$ d%< |S )&z-Create default diffusion stage configuration.cache_backendnonecache_configN0parallel_config   ,ulysses_degreering_degreesequence_parallel_sizetensor_parallel_sizecfg_parallel_sizeuse_hsdpFhsdp_shard_sizehsdp_replicate_sizer   )
pipeline_parallel_sizedata_parallel_sizerW   rV   rT   rU   rX   rY   rZ   r\   	diffusionT)processdevicesmax_batch_sizevae_use_slicingvae_use_tilingenable_cache_dit_summaryenable_cpu_offloadenable_layerwise_offloadenforce_eagerdiffusion_load_formatdefaultcustom_pipeline_args)rQ   rc   rd   rM   rO   re   rf   rg   rh   ri   rk   image)stage_id
stage_typeruntimeengine_argsfinal_outputfinal_output_typerp   model_stage)get_normalize_cache_config
world_sizeranger   )rJ   r9   rM   rO   ra   rQ   num_devicesirT   rU   rV   rW   rX   rY   rZ   r\   other_parallel_sizedefault_stage_cfgr4   r4   r5   #_create_default_diffusion_stage_cfg~   sz   










z-AsyncOmni._create_default_diffusion_stage_cfgr3   rm   resultc                    sh   | d}|d ur|| | d}|d ur|| | d}|d ur*|| t ||| d S )Nvllm_config	tokenizeris_tracing_enabled)rt   set_vllm_configset_tokenizerset_is_tracing_enabledrA   _process_stage_ready)rJ   r3   rm   r}   r~   r   r   rK   r4   r5   r      s   





zAsyncOmni._process_stage_readyx   timeoutc                    s  t  | | jD ]V}|jdur_|jdur_z'|j}t|d| _|j| _| jj}t	||| _
td| j d|j  W  n# ty^ } ztd| j d|j d|  W Y d}~q	d}~ww q	t| drj| jdu rtd| j d d| _d| _
d| _dS dS )	z(Wait for all stages to report readiness.N)r~   [zI] Initialized input_processor, io_processor, and model_config from stage-z-] Failed to initialize processors from stage-: input_processorzg] No LLM stage found, processors will not be available. This may cause issues with OpenAIServingModels.)rA   _wait_for_stages_readyr*   r~   r   r   r   model_configio_processor_pluginr	   io_processorr#   info_namerm   r"   r$   hasattr)rJ   r   r3   r~   r   r1   rK   r4   r5   r      s@   


z AsyncOmni._wait_for_stages_readyc                 C   s   t | dr|   dS dS )zShutdown, cleaning up the background proc and IPC.

        Alias for close() method. Cleans up all stage processes
        and inter-process communication resources.
        rI   N)r   rI   rJ   r4   r4   r5   shutdown   s   
zAsyncOmni.shutdown)output_modalitiesprompt
request_idsampling_params_listr   c                  s   j 4 I dH   j  fddI dH  W d  I dH  n1 I dH s&w   Y  td j d z$   |du rB j}t|t jkrZt	dt j dt| t j}i }t

 }t| j j}t| j||d}	t|}
|	|
_|
 j|< |d	 }|||d
} jd	 | |	jd	 pt

 |	jd	< t

 ||< td j d| d|   jrdd t|D }||
_ ||||
|	|2 z	3 dH W }|V  q6 n ||
|	|||2 z	3 dH W }|V  q6 td j d| d|  zTz|	||||| td j d |	  W n" ty< } ztd j d| d|  W Y d}~nd}~ww W  j|d W dS W  j|d W dS  j|d w  t j!t"fyt    #|I dH  td|  w )a  Generate outputs for the given prompt asynchronously.

        Coordinates multi-stage pipeline through YAML configuration.
        Each stage will use AsyncOmniLLM or AsyncOmniDiffusion based on stage_type.
        Processes the prompt through all stages in the pipeline and yields
        outputs as they become available. Each stage uses its corresponding
        sampling parameters from the sampling_params_list.

        Args:
            prompt: Prompt to process. Can be a text string, token IDs,
                or multimodal prompt.
            request_id: Unique identifier for this request
            sampling_params_list: List of SamplingParams, one for each stage.
                Must have the same length as the number of stages.
                If None, uses default sampling params for each stage.
            output_modalities: Optional list of output modalities.

        Yields:
            OmniRequestOutput objects as they are produced by each stage.
            Each output contains the stage_id, final_output_type, and
            the request_output from that stage.

        Raises:
            ValueError: If sampling_params_list has incorrect length.
        Nc                      s    j  S N)r?   r4   r   r4   r5   <lambda>'  s    z$AsyncOmni.generate.<locals>.<lambda>r   z] generate() calledz	Expected z sampling params, got )
num_stages	log_statswall_start_tsfinal_stage_id_for_e2er   r   engine_inputssampling_paramsz#] Entering scheduling loop: stages=z, final_stage=c                 S   s   i | ]}|t  qS r4   )r<   Queue.0rm   r4   r4   r5   
<dictcomp>Y  s    z&AsyncOmni.generate.<locals>.<dictcomp>
] Request z finalized at stage-z] All requests completedz( Failed to finalized/build/log summary: z'[AsyncOrchestrator] Request %s aborted.)$r>   wait_forr#   debugr   _run_output_handlerdefault_sampling_params_listlenr*   
ValueErrortimer   r   r   r   r   metricsr@   submitstage_first_tsr   async_chunkrw   stage_queues_process_async_results_process_sequential_resultson_finalize_requestrt   build_and_log_summaryr"   	exceptionpopr<   CancelledErrorGeneratorExitabort)rJ   r   r   r   r   r   _req_start_ts_wall_start_tsr   r   	req_statesp0taskr   outputr1   r4   r   r5   generate  s   "(





*$zAsyncOmni.generater   r   r   c              	   C  sV  dd t |d D }d}t| st| jd |d  D ]\}	}
||	 r(qz	|j|	  }W n tjyC   t	dI d H  Y qw | 
||
|	|\}}}|r|	dkrd}|j}t|}tdt|}dg| |d< d  |d	< |d
< t dt| jD ]}|||| d}| j| | t |j|< q}|||	< |r|V  qt| rd S d S )Nc                 S   s   i | ]}|d qS Fr4   r   r4   r4   r5   r     s    z4AsyncOmni._process_async_results.<locals>.<dictcomp>rR   TMbP?r   Fprompt_token_idsmulti_modal_datamm_processor_kwargsr   )rw   allvalues	enumerater*   r   
get_nowaitr<   
QueueEmptysleep_process_single_resultr   copydeepcopymaxr   r   r   r   r   )rJ   r   r   r   r   r   r   all_stages_finishedsubmit_flagrm   r3   r}   engine_outputsfinishedoutput_to_yieldr   engine_inputnext_prompt_lenry   r   r4   r4   r5   r     sL   	

z AsyncOmni._process_async_resultsc                 C  s  t | jd |d  D ]\}}d}	|	s5|j I d H }
||jks#J | |
|||\}}	}|r3|V  |	rt|ts=|g}|| |d }||kr| j| }|	|| |
| j|}W d    n1 sgw   Y  || }t|t|f}| j|}d}|rt|||||||| j| j|d	}|sd| j d| d| d}t| t|td| j d| d|  qtd| j d	| d
 qd S )NrR   F)		connectorrm   next_stage_idreq_idnext_inputsr   original_promptnext_stage_queue_submit_fnr   r   z] Failed to send request z
 to stage-zZ via connector. Configure a connector for this edge or inspect connector logs for details.z] Forwarded request r   z fully completed)r   r*   queuert   rm   r   
isinstancelistset_engine_outputsstage_postprocess_timerprocess_engine_inputsstr
connectorsr   r   r   r#   errorRuntimeErrorr   )rJ   r   r   r   r   r   r   rm   r3   r   r}   r   r   r   
next_stager   sp_nextconnector_keyr   sent_via_connector	error_msgr4   r4   r5   r     s`   	





 z%AsyncOmni._process_sequential_resultsc           
      C   sL  | d}d|v r!td| j d| d| d|d   t|t|ddd	}t|tr1|d
 }|j}d}t	|ddrsg }	|j
dkrZt|trO|jrO|j}	nt|drZ|jrZ|j}	|j
dkrjt||j
||	|d}n	t||j
||d}t|j| pzdt |j|< |j||j|||||j
|d td| j d| d| d |||fS )a  
        Process a single result dictionary from a stage.
        Returns:
            engine_outputs: The decoded outputs.
            finished: Whether the stage processing is finished for this request.
            output_to_yield: An OmniRequestOutput to yield, or None.
        r   r   r   z] Stage z error on request r   r   engine_outputs_shm)obj_keyshm_keyr   Nrq   Frl   images)rm   rr   request_outputr   r   )rm   rr   r   r   g        )r}   rn   rm   r   r   r   rr   r   z] Stage-z completed request z; forwarding or finalizing)rt   r#   r   r   r   _loadr   r   r   r%   rr   r   r   r   r   stage_last_tsr   process_stage_metricsrn   r   )
rJ   r}   r3   rm   r   r   r   r   r   r   r4   r4   r5   r     s^   
 



z AsyncOmni._process_single_resultc                    s<   j d urd S jj  fdd}t| _ d S )Nc            	   
      s  z{	 d} t D ]_\}}| }|d u rq	d} |ddkr(tdI d H  q	|d} |}|d u rFtdj d| d	|  q	t|d
r\||j	v r\|j	| 
|I d H  q	|j
|I d H  ||_q	| rttdI d H  ntdI d H  q ty } zEtd   D ]1}|jt|d}t|d
r|j	 D ]
}|
|I d H  qn	|j
|I d H  |jt|d}qd _W Y d }~d S d }~ww )NTFtypestage_readyg?r   r   zY] Request may have been aborted;                                 dropping output for req z
 at stage-r   r   r   z AsyncOmni output_handler failed.)r   r   )r   try_collectrt   r<   r   r#   r   r   r   r   putr   rm   r"   r   r   r   r   r.   )	idlerm   r3   r}   r   r   r1   r   r   r@   rJ   r*   r4   r5   r.   E  sX   




z5AsyncOmni._run_output_handler.<locals>.output_handler)r.   r*   r@   r<   create_task)rJ   r.   r4   r   r5   r   >  s   
-zAsyncOmni._run_output_handlerc                 C   s   t | jdkS )Nr   )r   rE   r   r4   r4   r5   
is_runningt  s   zAsyncOmni.is_runningc                 C   s   | j S r   )erroredr   r4   r4   r5   
is_stoppedy     zAsyncOmni.is_stoppedc                 C   s   | j  S r   )r   r   r4   r4   r5   r   }  s   zAsyncOmni.erroredc                 C      dS )NAsyncOrchestratorr4   r   r4   r4   r5   r        zAsyncOmni._namec                 C   r   )NTr4   r   r4   r4   r5   is_async  r  zAsyncOmni.is_asyncc                 C   s   t  S r   r   r   r4   r4   r5   
dead_error  r   zAsyncOmni.dead_errorc                    s(   t j|d}| jD ]}|| q
d S )N)r   r   )r   ABORTr*   r   )rJ   r   
abort_taskr3   r4   r4   r5   r     s
   
zAsyncOmni.abortc                    s,   | j D ]}|jr|jd ur|j  S qd S r   )r*   is_comprehensionr~   rJ   r3   r4   r4   r5   get_vllm_config  s   


zAsyncOmni.get_vllm_configc                    s.   | j D ]}|jr|jd ur|jj  S qd S r   )r*   r  r~   r   r  r4   r4   r5   get_model_config  s   

zAsyncOmni.get_model_configc                       d S r   r4   r   r4   r4   r5   get_input_preprocessor     z AsyncOmni.get_input_preprocessorc                    s"   | j D ]
}|jr|j  S qd S r   )r*   r  r   r  r4   r4   r5   get_tokenizer     

zAsyncOmni.get_tokenizerc                    s"   | j D ]
}|jr|j  S qdS r;   )r*   r  r   r  r4   r4   r5   r     r  zAsyncOmni.is_tracing_enabledc                 C   s   | j jS )zReturn the renderer from input_processor if available.

        OMNI: Required by upstream OpenAIServingModels.__init__ which
        accesses engine_client.renderer.
        )r   rendererr   r4   r4   r5   r    s   zAsyncOmni.rendererc                    r
  r   r4   r   r4   r4   r5   do_log_stats  r  zAsyncOmni.do_log_statsc                    r
  r   r4   r   r4   r4   r5   check_health  r  zAsyncOmni.check_healthc                    r
  r   r4   r   r4   r4   r5   reset_mm_cache  r  zAsyncOmni.reset_mm_cacheFreset_running_requestsc                    r
  r   r4   )rJ   r  r4   r4   r5   reset_prefix_cache  r  zAsyncOmni.reset_prefix_cacherR   levelc                    r
  r   r4   )rJ   r  r4   r4   r5   r     r  zAsyncOmni.sleeptagsc                    r
  r   r4   )rJ   r  r4   r4   r5   wake_up  r  zAsyncOmni.wake_upc                       dS )z$Check whether the engine is sleepingFr4   r   r4   r4   r5   is_sleeping     zAsyncOmni.is_sleepinglora_requestc                    r  )z<Load a new LoRA adapter into the engine for future requests.Fr4   )rJ   r  r4   r4   r5   add_lora  r  zAsyncOmni.add_lorac                    s
   t d)z4Generate outputs for a request from a pooling model.z)encode() is not implemented for AsyncOmni)NotImplementedError)rJ   argsr9   r4   r4   r5   encode  s   zAsyncOmni.encodestagesc                       t  | dS )a  Start profiling for specified stages.

        Async wrapper around the base implementation for API consistency.

        Args:
            stages: List of stage IDs to start profiling. If None, starts
                profiling for all stages that have profiling enabled.

        Example:
            >>> await async_omni.start_profile()
            >>> async for output in async_omni.generate(...):
            ...     pass
            >>> await async_omni.stop_profile()
        N)rA   start_profilerJ   r   rK   r4   r5   r"       zAsyncOmni.start_profilec                    r!  )a  Stop profiling for specified stages.

        Async wrapper around the base implementation for API consistency.

        Args:
            stages: List of stage IDs to stop profiling. If None, stops
                profiling for all stages.

        Example:
            >>> await async_omni.start_profile()
            >>> async for output in async_omni.generate(...):
            ...     pass
            >>> await async_omni.stop_profile()
        N)rA   stop_profiler#  rK   r4   r5   r%    r$  zAsyncOmni.stop_profileT)wait_for_inflight_requestsclear_cacher&  r'  c             	      s   | j 4 I dH  | jr	 W d  I dH  dS d| _W d  I dH  n1 I dH s+w   Y  |rB|  I dH  |  I dH  dS dS )a4  
        Pause generation to allow model weight updates.

        New generation/encoding requests are blocked until resume.

        Args:
            wait_for_inflight_requests: When ``True`` waits for in-flight
                requests to finish before pausing. When ``False`` (default),
                immediately aborts any in-flight requests.
            clear_cache: Whether to clear KV cache and prefix cache after
                draining. Set to ``False`` to preserve cache for faster resume.
                Default is ``True`` (clear caches).
        NT)r>   r?   r  r  )rJ   r&  r'  r4   r4   r5   pause_generation  s   (zAsyncOmni.pause_generationc              	      sR   | j 4 I dH  d| _| j   W d  I dH  dS 1 I dH s"w   Y  dS )z1Resume generation after :meth:`pause_generation`.NF)r>   r?   
notify_allr   r4   r4   r5   resume_generation"  s
   .zAsyncOmni.resume_generationc              	      sF   | j 4 I dH  | jW  d  I dH  S 1 I dH sw   Y  dS )z.Return whether the engine is currently paused.N)r>   r?   r   r4   r4   r5   	is_paused)  s   0zAsyncOmni.is_paused)r   r   )r:   Nr   )rR   )B__name__
__module____qualname____doc__r   dictr   rB   r|   r   intr   r   r   r   r   r   r   r   r   r   r
   r   r   r   r   tupleboolr   r   propertyr   r   r   r   r  BaseExceptionr  r   r   r   r  r   r	  r   r  r   r  r   r  r  r  r  r  r   r  r  r   r  r  r"  r%  r(  r*  r+  __classcell__r4   r4   rK   r5   r7   E   s    "!"&K%



 

.

C


I6



  

#r7   r   )=r<   r   r   rC   collections.abcr   r   r   typingr   vllm.configr   vllm.inputs.preprocessr   vllm.loggerr   vllm.plugins.io_processorsr	   vllm.sampling_paramsr
   vllm.tokenizersr   vllm.v1.engine.exceptionsr   vllm_omni.configr   vllm_omni.diffusion.datar   -vllm_omni.distributed.omni_connectors.adapterr   r   %vllm_omni.distributed.ray_utils.utilsr    vllm_omni.engine.input_processorr   *vllm_omni.entrypoints.client_request_stater   vllm_omni.entrypoints.omnir    vllm_omni.entrypoints.omni_stager   !vllm_omni.entrypoints.stage_utilsr   r   r   r   vllm_omni.entrypoints.utilsr   vllm_omni.inputs.datar   r   vllm_omni.lora.requestr   vllm_omni.metricsr   vllm_omni.outputsr   r,  r#   r6   r7   r4   r4   r4   r5   <module>   s>   
