o
    ۷iG                     @  sT   d Z ddlmZ ddlZddlmZ ddlZddlmZ G dd dZ	dddZ
dS )z'ZMQ-based queue utilities for Omni IPC.    )annotationsN)Any)make_zmq_socketc                   @  sd   e Zd ZdZdddddd#ddZd$ddZd$ddZd%d&ddZd'ddZd(dd Z	d)d!d"Z
dS )*ZmqQueuez#Queue-like wrapper on a ZMQ socket.N)bindconnectrecv_timeout_mssend_timeout_msctxzmq.Contextsocket_typeintr   
str | Noner   r   
int | Noner	   returnNonec          	      C  s   |d ur|n|}|d u rt d|d u}t||||dd| _t | _| j| jtj || _|| _	|d ur:|| j_
|d urB|| j_|| _d S )Nz(Either bind or connect must be specifiedi  )r   linger)
ValueErrorr   _socketzmqPoller_pollerregisterPOLLIN_default_recv_timeout_default_send_timeoutrcvtimeosndtimeoendpoint)	selfr
   r   r   r   r   r	   path	bind_mode r"   U/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/entrypoints/zmq_utils.py__init__   s   

zZmqQueue.__init__objr   c              
   C  s8   z	| j | W dS  tjy } zt |d}~ww )z:Send an object to the queue. Blocks until sent or timeout.N)r   
send_pyobjr   AgainqueueFullr   r%   er"   r"   r#   put4      
zZmqQueue.putc              
   C  s>   z| j j|tjd W dS  tjy } zt |d}~ww )z-Send an object to the queue without blocking.flagsN)r   r&   r   NOBLOCKr'   r(   r)   r*   r"   r"   r#   
put_nowait;   s   
zZmqQueue.put_nowaittimeoutfloat | Nonec                 C  sN   |du r	| j  S t| jt|d }|| j tjkr#| j  S t	
 )zBReceive an object from the queue with optional timeout in seconds.Ni  )r   
recv_pyobjdictr   pollr   getr   r   r(   Empty)r   r2   eventsr"   r"   r#   r7   B   s   

zZmqQueue.getc              
   C  s8   z	| j jtjdW S  tjy } zt |d}~ww )z2Receive an object from the queue without blocking.r.   N)r   r4   r   r0   r'   r(   r8   )r   r+   r"   r"   r#   
get_nowaitM   r-   zZmqQueue.get_nowaitboolc                 C  s"   t | jd}|| jtjkS )z-Check if the queue is empty without blocking.r   )r5   r   r6   r7   r   r   r   )r   r9   r"   r"   r#   emptyT   s   zZmqQueue.emptyc                 C  s   | j d d S )Nr   )r   close)r   r"   r"   r#   r=   Y   s   zZmqQueue.close)r
   r   r   r   r   r   r   r   r   r   r	   r   r   r   )r%   r   r   r   )N)r2   r3   r   r   )r   r   )r   r;   )r   r   )__name__
__module____qualname____doc__r$   r,   r1   r7   r:   r<   r=   r"   r"   r"   r#   r      s    
"


r   r
   r   r   strr   r   r   c                 C  s   t | ||dS )z:Create a ZmqQueue from an endpoint string and socket type.)r   )r   )r
   r   r   r"   r"   r#   create_zmq_queue]   s   rC   )r
   r   r   rB   r   r   r   r   )rA   
__future__r   r(   typingr   r   vllm.utils.network_utilsr   r   rC   r"   r"   r"   r#   <module>   s   N