o
    ۷i                     @   s^   d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	m
Z
 eeZG dd dZdS )    N)MessageQueue)init_logger)DiffusionOutputOmniDiffusionConfig)OmniDiffusionRequestc                   @   sD   e Zd ZdefddZdd Zdd Zded	efd
dZ	dd Z
dS )	Scheduler	od_configc                 C   sl   t | dd }|d ur|jstd |   |j| _|| _t	 | _
t| j| jtt| jd| _d | _d S )Nmqz<SyncSchedulerClient is already initialized. Re-initializing.)n_readern_local_readerlocal_reader_ranks)getattrclosedloggerwarningclosenum_gpusnum_workersr   	threadingLock_lockr   listranger	   	result_mq)selfr   existing_mq r   S/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/scheduler.py
initialize   s   


zScheduler.initializec                 C   s   t j|dd| _td d S )Nr   )rankz-SyncScheduler initialized result MessageQueue)r   create_from_handler   r   info)r   handler   r   r   initialize_result_queue%   s   z!Scheduler.initialize_result_queuec                 C   s
   | j  S )N)r	   export_handler   r   r   r   get_broadcast_handle+   s   
zScheduler.get_broadcast_handlerequestreturnc              	   C   s   | j L z8dd|fi ddd}| j| | jdu rtd| j }t|tr3|dd	kr3td
|W W  d   S  t	j
jyN   t
d tdw 1 sRw   Y  dS )z<Sends a request to the scheduler and waits for the response.rpcgenerater   T)typemethodargskwargsoutput_rankexec_all_ranksNzResult queue not initializedstatuserrorzworker errorz,Timeout waiting for response from scheduler.z"Scheduler did not respond in time.)r   r	   enqueuer   RuntimeErrordequeue
isinstancedictgetzmqr2   Againr   TimeoutError)r   r'   rpc_requestoutputr   r   r   add_req.   s,   



zScheduler.add_reqc                 C   s   d| _ d| _dS )z-Closes the socket and terminates the context.N)r	   r   r%   r   r   r   r   L   s   
zScheduler.closeN)__name__
__module____qualname__r   r   r#   r&   r   r   r>   r   r   r   r   r   r      s    r   )r   r9   3vllm.distributed.device_communicators.shm_broadcastr   vllm.loggerr   vllm_omni.diffusion.datar   r   vllm_omni.diffusion.requestr   r?   r   r   r   r   r   r   <module>   s   