o
    ٷi;                     @   s   d dl Z d dlZd dlmZ d dlmZ d dlZd dlm	Z	 d dl
mZ d dlmZ d dlmZmZmZ d dlmZ d d	lmZmZ d d
lmZ e	eZdedefddZdedefddZdedefddZG dd dZ dS )    N)Iterable)Any)init_logger)OmniDiffusionConfig)DiffusionExecutor)DiffusionModelRegistryget_diffusion_post_process_funcget_diffusion_pre_process_func)OmniDiffusionRequest)OmniDiffusionSamplingParamsOmniTextPrompt)OmniRequestOutputmodel_class_namereturnc                 C   &   t | }|d u rdS tt|ddS )NFsupport_image_inputr   _try_load_model_clsboolgetattrr   	model_cls r   X/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/diffusion/diffusion_engine.pysupports_image_input      
r   c                 C   s   t | }t|ddS )Ncolor_formatRGB)r   r   r   r   r   r   r   image_color_format!   s   
r   c                 C   r   )NFsupport_audio_outputr   r   r   r   r   supports_audio_output&   r   r    c                   @   s   e Zd ZdZdefddZdedee fddZ	e
d	edd fd
dZdefddZd#dedB ddfddZdefddZdd Z				d$dededB dededB dedB defddZd%ddZd eee B ddfd!d"ZdS )&DiffusionEnginez4The diffusion engine for vLLM-Omni diffusion models.	od_configc              
   C   sr   || _ t|| _t|| _t|}||| _z|   W dS  t	y8 } zt
d|  |   |d}~ww )zxInitialize the diffusion engine.

        Args:
            config: The configuration for the diffusion engine.
        zDummy run failed: N)r"   r   post_process_funcr	   pre_process_funcr   	get_classexecutor
_dummy_run	Exceptionloggererrorclose)selfr"   executor_classer   r   r   __init__0   s   



zDiffusionEngine.__init__requestr   c                    s  | j d urt }|    t | }td|dd |  }|jr,t|j td |jd u rGtd  fddt	 j
D S t }| jd urV| |jn|j}t | }td|dd t|tsw|d uru|gng }t jjt jj|d	 d
}| j d ur|d	 |d< t j
dkrֈ j
d }	 jr jd nd}
t| jjrt|dkr|d n|}tj|
g |	||jd|iddgS tj|
||	||jdgS g }d}t	 j
D ]`\}}	|t jk r j| nd}
 jj}|t|k r||||  ng }||7 }t| jjr1t|dkr|d n|}|tj|
g |	||jd|idd q|tj|
||	||jd q|S )NzPre-processing completed in z.4fz secondsz"Generation completed successfully.z1Output is None, returning empty OmniRequestOutputc              	      s<   g | ]\}}t j|t jk r j| nd g |i ddqS ) N
request_idimagespromptmetricslatents)r   from_diffusionlenrequest_ids).0ir5   r0   r   r   
<listcomp>T   s    z(DiffusionEngine.step.<locals>.<listcomp>zPost-processing completed in i  )	image_num
resolutionpostprocess_time_mspreprocessing_time_ms   r   r1   audio)r3   r4   r5   r6   r7   multimodal_outputfinal_output_typer2   )r$   timer)   infoadd_req_and_wait_for_responser*   r(   outputwarning	enumeratepromptsr#   
isinstancelistintsampling_paramsnum_outputs_per_promptr@   r9   r:   r    r"   r   r   r8   trajectory_latentsappend)r,   r0   preprocess_start_timepreprocess_timerJ   postprocess_start_timeoutputspostprocess_timer6   r5   r3   audio_payloadresults
output_idxr<   num_outputsrequest_outputsr   r=   r   stepE   s   











"
zDiffusionEngine.stepconfigc                 C   s   t | S )zFactory method to create a DiffusionEngine instance.

        Args:
            config: The configuration for the diffusion engine.

        Returns:
            An instance of DiffusionEngine.
        )r!   )r`   r   r   r   make_engine   s   
zDiffusionEngine.make_enginec                 C   s   | j |S N)r&   add_req)r,   r0   r   r   r   rI      s   z-DiffusionEngine.add_req_and_wait_for_responseNtrace_filenamec              
   C   s  |du rdt t  d}tjdd}tj|}tj|}z	tj|dd W n t	yB } zt
d| d	|   d}~ww tj||}| d
}t
d|  t
d|  z| jd|fd W dS  ty } zt
jddd td| |d}~ww )a  
        Start torch profiling on all diffusion workers.

        Creates a directory (if needed) and sets up a base filename template
        for per-rank profiler traces (typically saved as <template>_rank<N>.json).

        Args:
            trace_filename: Optional base filename (without extension or rank suffix).
                            If None, generates one using current timestamp.
        Nstage_0_diffusion__rankVLLM_TORCH_PROFILER_DIRz
./profilesT)exist_okz$Failed to create profiler directory z: z*.jsonu!   Starting diffusion profiling → zProfiler output directory: start_profile)methodargsz$Failed to start profiling on workersexc_infozCould not start profiler: )rP   rG   osenvirongetpath
expanduserabspathmakedirsOSErrorr)   r*   joinrH   debugcollective_rpcr(   RuntimeError)r,   rd   	trace_direxcfull_templateexpected_patternr.   r   r   r   ri      s.   
zDiffusionEngine.start_profilec           
      C   s  t d z	| jddd}W n ty#   t jddd g g d Y S w g g d}d	}|s4t d
 |S t|D ]S\}}t|tsPt d| dt	| d q8|
d}|r}t d| d|  |d | |d7 }|ds}t d| d|  |
d}|r|d | q8t|}|d	krd|d dd }	t|d dkr|	dt|d d  d7 }	t d| d| d|	  n'|d rt dd|d dd  t|d dkrd nd!  nt d" |d rt d#t|d  d$ |S )%a  
        Stop profiling on all workers and collect the final trace/table paths.

        The worker (torch_profiler.py) now handles trace export, compression to .gz,
        and deletion of the original .json file. This method only collects and
        reports the paths returned by the workers.

        Returns:
            dict with keys:
            - "traces": list of final trace file paths (usually .json.gz)
            - "tables": list of table strings (one per rank)
        z6Stopping diffusion profiling and collecting results...stop_profileiX  )rj   timeoutz#Failed to stop profiling on workersTrl   )tracestablesr   z+No profiling results returned from any rankzRank z: invalid result format (got )tracez[Rank z] Final trace: r   rC   )z.json.gzz.jsonz : unusual trace path extension: tabler   z, N   z ... (+z more)zProfiling stopped. Collected z trace file(s) from z rank(s). Final trace paths: zMProfiling stopped but no traces were successfully collected. Reported paths: z ...r1   uB   Profiling stopped — no trace files were collected from any rank.z
Collected z profiling table(s))r)   rH   rx   r(   r*   rK   rL   rN   dicttyperp   rT   endswithr9   rv   rw   )
r,   r[   output_filessuccessful_tracesrankres
trace_pathr   	num_ranksfinal_paths_strr   r   r   r~      sf   







zDiffusionEngine.stop_profilec           	   	   C   s   d}d}d}t | jjrt| jj}tj|||f}nd}dd|id}t|gt|||dddd	}t	
d
 | jdurB| |n|}| | dS )z!A dummy run to warm up the model.rC   i   Nz	dummy runimage)r5   multi_modal_datag        )heightwidthnum_inference_stepsguidance_scalerR   )rM   rQ   zdummy run to warm up the model)r   r"   r   r   PILImagenewr
   r   r)   rH   r$   rI   )	r,   r   r   r   r   dummy_imager5   reqr0   r   r   r   r'   <  s*   
zDiffusionEngine._dummy_runr   rj   r   rk   kwargsunique_reply_rankc                 C   s(   t |ts	J d| jj|||||dS )a  Call a method on worker processes and get results immediately.

        Args:
            method: The method name (str) to execute on workers
            timeout: Optional timeout in seconds
            args: Positional arguments for the method
            kwargs: Keyword arguments for the method
            unique_reply_rank: If set, only get reply from this rank

        Returns:
            Single result if unique_reply_rank is provided, otherwise list of results
        z.Only string method names are supported for now)rj   r   rk   r   r   )rN   strr&   rx   )r,   rj   r   rk   r   r   r   r   r   rx   Y  s   zDiffusionEngine.collective_rpcc                 C   s   t | dr| j  d S d S )Nr&   )hasattrr&   shutdown)r,   r   r   r   r+   v  s   
zDiffusionEngine.closer3   c                 C   s   t d d S )Nz,DiffusionEngine abort is not implemented yet)r)   rK   )r,   r3   r   r   r   abortz  s   
zDiffusionEngine.abortrb   )Nr   NN)r   N)__name__
__module____qualname____doc__r   r/   r
   rO   r   r_   staticmethodra   rI   r   ri   r   r~   r'   floattuplerP   r   rx   r+   r   r   r   r   r   r   r!   -   s:    q*M 

r!   )!rn   rG   collections.abcr   typingr   	PIL.Imager   vllm.loggerr   vllm_omni.diffusion.datar   %vllm_omni.diffusion.executor.abstractr   vllm_omni.diffusion.registryr   r   r	   vllm_omni.diffusion.requestr
   vllm_omni.inputs.datar   r   vllm_omni.outputsr   r   r)   r   r   r   r   r    r!   r   r   r   r   <module>   s"   