o
    ٷi:/                     @   s   d Z ddlZddlZddlmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ ddlmZmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ eeZG dd dZdS )z
Async entrypoint for vLLM-Omni diffusion model inference.

Provides an asynchronous interface for running diffusion models,
enabling concurrent request handling and streaming generation.
    N)AsyncGeneratorIterable)ThreadPoolExecutor)Any)init_logger)get_hf_file_to_dict)OmniDiffusionConfigTransformerConfig)DiffusionEngine)OmniDiffusionRequest)OmniDiffusionSamplingParamsOmniPromptType)LoRARequest)OmniRequestOutputc                   @   sN  e Zd ZdZ	d,dededB defddZ		d-ded	e	d
edB de
dB def
ddZ	d,ded
edB dedeedf fddZd.ddZd.ddZd.ddZd
eee B ddfddZedefddZedefddZdedefddZde
defd d!Zdee fd"d#Zd$edefd%d&Zd,d'edB ddfd(d)Zdefd*d+Z dS )/AsyncOmniDiffusiona  Async entry point for vLLM-Omni diffusion model inference.

    This class provides an asynchronous interface for running diffusion models,
    enabling concurrent request handling. It wraps the DiffusionEngine and
    provides async methods for image generation.

    Args:
        model: Model name or path to load
        od_config: Optional OmniDiffusionConfig. If not provided, it will be
            created from kwargs
        **kwargs: Additional keyword arguments passed to OmniDiffusionConfig

    Example:
        >>> async_diffusion = AsyncOmniDiffusion(model="Qwen/Qwen-Image")
        >>> result = await async_diffusion.generate(
        ...     prompt="A beautiful sunset over the ocean",
        ...     request_id="req-1",
        ... )
        >>> print(result.images)
    Nmodel	od_configkwargsc                 K   s  || _ |d}|d}|d u rtjdd|i|}nt|tr;|d u r*|d}|d u r3|d}tjdi |}|| _|d urJ| jjd| |d urV| jjd| z(t	d|j }|d ury|dd |_
|  t	d|j }t||_ntdW nc ttttfy   t	d|j }|d u rtd	|j  |d
}	|dpg }
|	dksd|
v rd|_
t |_|  n$|	dkr|j
d u rd|_
t |_|  n|
rt|
dkr|
d |_
n Y nw t|| _tdd| _d| _td| d S )Nstage_idengine_input_sourcer   zmodel_index.json_class_nameztransformer/config.jsonzmodel_index.json not foundzconfig.jsonz9Could not find config.json or model_index.json for model 
model_typearchitecturesbagelBagelForConditionalGenerationBagelPipelinenextstepNextStep11Pipeline   r   )max_workersFz-AsyncOmniDiffusion initialized with model: %s )r   getr   from_kwargs
isinstancedictr   omni_kv_config
setdefaultr   model_class_nameupdate_multimodal_supportr	   	from_dicttf_model_configFileNotFoundErrorAttributeErrorOSError
ValueErrorlenr
   make_engineenginer   	_executor_closedloggerinfo)selfr   r   r   r   r   config_dicttf_config_dictcfgr   r   r    r    ^/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/entrypoints/async_omni_diffusion.py__init__4   sb   








zAsyncOmniDiffusion.__init__promptsampling_params
request_idlora_requestreturnc           	   
      s   |du rdt  jdd  }|jrd|_|dur||_t|g||gd}td| t	
 }z|| j| jj|I dH }|d }W n ty_ } ztd|| td	| |d}~ww |jsf||_|S )
a  Generate images asynchronously from a text prompt.

        Args:
            prompt: Text prompt describing the desired image
            sampling_params: Sampling parameters
            request_id: Optional unique identifier for tracking the request

        Returns:
            OmniRequestOutput containing generated images

        Raises:
            RuntimeError: If generation fails
        Nzdiff-   T)promptsr=   request_idsz"Starting generation for request %sr   z$Generation failed for request %s: %szDiffusion generation failed: )uuiduuid4hexguidance_scaleguidance_scale_providedr?   r   r4   debugasyncioget_event_looprun_in_executorr2   r1   step	ExceptionerrorRuntimeErrorr>   )	r6   r<   r=   r>   r?   requestloopresulter    r    r:   generate   s:   
zAsyncOmniDiffusion.generatec                 K  s(   | j d||d|I dH }|V  dS )a(  Generate images with streaming progress updates.

        Currently, diffusion models don't support true streaming, so this
        yields a single result after generation completes. Future implementations
        may support step-by-step progress updates.

        Args:
            prompt: Text prompt describing the desired image
            request_id: Optional unique identifier for tracking the request
            **kwargs: Additional generation parameters

        Yields:
            OmniRequestOutput with generation progress/results
        )r<   r>   Nr    )rU   )r6   r<   r>   r   rS   r    r    r:   generate_stream   s   
z"AsyncOmniDiffusion.generate_streamc              
   C   s   | j rdS d| _ z| j  W n ty' } ztd| W Y d}~nd}~ww z	| jjdd W n tyI } ztd| W Y d}~nd}~ww td dS )z{Close the engine and release resources.

        Should be called when done using the AsyncOmniDiffusion instance.
        NTz"Error closing diffusion engine: %sF)waitz Error shutting down executor: %szAsyncOmniDiffusion closed)	r3   r1   closerN   r4   warningr2   shutdownr5   )r6   rT   r    r    r:   rX      s    zAsyncOmniDiffusion.closec                 C   s   |    dS )zAlias for close() method.N)rX   r6   r    r    r:   rZ      s   zAsyncOmniDiffusion.shutdownc                 C   s$   z|    W dS  ty   Y dS w )z Best-effort cleanup on deletion.N)rX   rN   r[   r    r    r:   __del__   s
   zAsyncOmniDiffusion.__del__c                    s   | j | dS )zAbort a request.N)r1   abort)r6   r>   r    r    r:   r]      s   zAsyncOmniDiffusion.abortc                 C   s   | j  S )zCheck if the engine is running.r3   r[   r    r    r:   
is_running   s   zAsyncOmniDiffusion.is_runningc                 C   s   | j S )zCheck if the engine is stopped.r^   r[   r    r    r:   
is_stopped   s   zAsyncOmniDiffusion.is_stopped
adapter_idc              	      sD   t  }|| j| jjdd|fi dI dH }t|tr t|S |S )zRemove a LoRAremove_loraN	rJ   rK   rL   r2   r1   collective_rpcr#   listall)r6   ra   rR   resultsr    r    r:   rb      s   
	zAsyncOmniDiffusion.remove_lorac              	      F   t  }|| j| jjdddd|idI dH }t|tr!t|S |S )zAdd a LoRA adapteradd_loraNr    r?   rc   )r6   r?   rR   rg   r    r    r:   ri   
     
	zAsyncOmniDiffusion.add_lorac              	      sd   t  }|| j| jjdddi dI dH }t|ts|pg S t }|D ]	}|	|p+g  q$t
|S )z%List all registered LoRA adapter IDs.
list_lorasNr    )rJ   rK   rL   r2   r1   rd   r#   re   setupdatesorted)r6   rR   rg   mergedpartr    r    r:   rk     s"   


zAsyncOmniDiffusion.list_loraslora_idc              	      rh   )z&Prevent an adapter from being evicted.pin_loraNr    ra   rc   )r6   rq   rR   rg   r    r    r:   rr   ,  rj   zAsyncOmniDiffusion.pin_loratrace_filenamec                    s(   t  }|| j| jj|I dH  dS )zStart profiling for the diffusion model.

        Args:
            trace_filename: Optional base filename for trace files.
                           If None, a timestamp-based name will be generated.
        N)rJ   rK   rL   r2   r1   start_profile)r6   rs   rR   r    r    r:   rt   :  s   z AsyncOmniDiffusion.start_profilec                    s"   t  }|| j| jjI dH S )zStop profiling and return profiling results.

        Returns:
            Dictionary containing paths to trace and table files.
        N)rJ   rK   rL   r2   r1   stop_profile)r6   rR   r    r    r:   ru   H  s   
zAsyncOmniDiffusion.stop_profile)N)NN)r@   N)!__name__
__module____qualname____doc__strr   r   r;   r   r   r   r   rU   r   rV   rX   rZ   r\   r   r]   propertyboolr_   r`   intrb   ri   re   rk   rr   rt   r$   ru   r    r    r    r:   r      s^    
O
;




r   )ry   rJ   rD   collections.abcr   r   concurrent.futuresr   typingr   vllm.loggerr   vllm.transformers_utils.configr   vllm_omni.diffusion.datar   r	   $vllm_omni.diffusion.diffusion_enginer
   vllm_omni.diffusion.requestr   vllm_omni.inputs.datar   r   vllm_omni.lora.requestr   vllm_omni.outputsr   rv   r4   r   r    r    r    r:   <module>   s    