o
    ۷i                     @   sX   d dl mZmZ d dlmZ d dlmZ d dlmZm	Z	 d dl
mZ G dd deZdS )	    )ABCabstractmethod)Any)resolve_obj_by_qualname)DiffusionOutputOmniDiffusionConfig)OmniDiffusionRequestc                   @   s   e Zd ZU dZdZeed< edede	d  fddZ
defdd	ZedddZededefddZe	
		
	
ddeded
B deded
B ded
B defddZedddZedddZd
S )DiffusionExecutorz,Abstract base class for Diffusion executors.Fuses_multiproc	od_configreturnc              
   C   s   | j }t|trt|tstd| d|}|S |dkr!td|dkr/ddlm} |}|S |dkr7td	t|t	rkzt
|}W n ttfy[ } ztd
| d| |d }~ww t|tsitd| d|S td| )NzJdistributed_executor_backend must be a subclass of DiffusionExecutor. Got .rayz!ray backend is not yet supported.mpr   )MultiprocDiffusionExecutorexternal_launcherz/external_launcher backend is not yet supported.z!Failed to load executor backend 'z,'. Ensure it is a valid python path. Error: z&Unknown distributed executor backend: )distributed_executor_backend
isinstancetype
issubclassr	   	TypeErrorNotImplementedError/vllm_omni.diffusion.executor.multiproc_executorr   strr   ImportError
ValueError)r   r   executor_classr   e r   [/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/executor/abstract.py	get_class   sJ   




zDiffusionExecutor.get_classc                 C   s   || _ |   d S )N)r   _init_executor)selfr   r   r   r   __init__4   s   zDiffusionExecutor.__init__Nc                 C      dS )z:Initialize the executor (e.g., launch workers, setup IPC).Nr   r"   r   r   r   r!   8      z DiffusionExecutor._init_executorrequestsc                 C   r$   )z$Add requests to the execution queue.Nr   )r"   r'   r   r   r   add_req=   r&   zDiffusionExecutor.add_reqr   methodtimeoutargskwargsunique_reply_rankc                 C   r$   )zExecute a method on workers.Nr   )r"   r)   r*   r+   r,   r-   r   r   r   collective_rpcB   s   
z DiffusionExecutor.collective_rpcc                 C   r$   )z.Check if the executor and workers are healthy.Nr   r%   r   r   r   check_healthN   r&   zDiffusionExecutor.check_healthc                 C   r$   )z,Shutdown the executor and release resources.Nr   r%   r   r   r   shutdownS   r&   zDiffusionExecutor.shutdown)r   N)Nr   NN)__name__
__module____qualname____doc__r
   bool__annotations__staticmethodr   r   r    r#   r   r!   r   r   r(   r   floattupledictintr   r.   r/   r0   r   r   r   r   r	   
   s@   
 $r	   N)abcr   r   typingr   vllm.utils.import_utilsr   vllm_omni.diffusion.datar   r   vllm_omni.diffusion.requestr   r	   r   r   r   r   <module>   s    