o
    ڷi_                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZmZmZmZmZ ddlZdd	lmZmZmZ G d
d deZG dd dZG dd deejZG dd dZedddZG dd deeje ZdS )z%Future-returning APIs for coroutines.    )annotationsN)Future)deque)partial)chain)Any	AwaitableCallable
NamedTupleTypeVarcast)EVENTSPOLLINPOLLOUTc                   @  s>   e Zd ZU ded< ded< ded< ded< d	ed
< d	ed< dS )_FutureEventr   futurestrkindtupleargsdictkwargsr   msgtimerN)__name__
__module____qualname____annotations__ r   r   A/home/ubuntu/vllm_env/lib/python3.10/site-packages/zmq/_future.pyr      s   
 r   c                   @  sF   e Zd ZU dZdZded< ded< ddd	Zdd
dZddddZdS )_AsynczMixin for common async logicNr   _current_loopztype[Future]_Futurereturnc                 C  sL   | j du r|  | _ | | j  | j S |  }|| j ur$|| _ | | |S )zGet event loop

        Notice if event loop has changed,
        and register init_io_state on activation of a new event loop
        N)r!   _default_loop_init_io_state)selfcurrent_loopr   r   r   	_get_loop1   s   



z_Async._get_loopc                 C  s   t d)Nz!Must be implemented in a subclassNotImplementedError)r&   r   r   r   r$   B   s   z_Async._default_loopNonec                 C     d S Nr   r&   loopr   r   r   r%   E   s   z_Async._init_io_state)r#   r   r-   )r#   r+   )	r   r   r   __doc__r!   r   r(   r$   r%   r   r   r   r   r    +   s   
 

r    c                      sZ   e Zd ZU dZded< ded< ded< ded< dddZdddZdd fddZ  ZS )_AsyncPollerz:Poller that returns a Future on poll, instead of blocking.ztype[_AsyncSocket]_socket_classint_READ_WRITEz	list[Any]raw_socketsr/   r   socketevtfr	   r#   r+   c                 C     t  )z"Schedule callback for a raw socketr)   )r&   r/   r7   r8   r9   r   r   r   _watch_raw_socketQ      z_AsyncPoller._watch_raw_socketsocketsc                 G  r:   )z$Unschedule callback for a raw socketr)   )r&   r/   r=   r   r   r   _unwatch_raw_socketsU   r<   z!_AsyncPoller._unwatch_raw_sockets Awaitable[list[tuple[Any, int]]]c              
     s     |dkr0zt d}W n ty( } z | W Y d}~ S d}~ww  |  S    g fdd}fdd g fdd}jD ]W\}}t	|t
jrt	|jsqj|}| |t
j@ r}|jd	d
 |t
j@ r|jd	d
 qV| d}|t
j@ r|jO }|t
j@ r|jO }||| qV fdd}	|	 rÈ| |dur|dkrfdd}
d| |
fdd} | fdd} |  S )z Return a Future for a poll eventr   Nc                          s d  d S d S r-   done
set_result)r   watcherr   r   wake_rawm      z#_AsyncPoller.poll.<locals>.wake_rawc                   s   j  gR  S r-   )r>   r9   )r/   r6   r&   r   r   <lambda>r   s    z#_AsyncPoller.poll.<locals>.<lambda>c                   s    D ]}|   qd S r-   )_clear_io_state)r9   s)wrapped_socketsr   r   _clear_wrapper_iow   s   
z,_AsyncPoller.poll.<locals>._clear_wrapper_iopollr   c              
     s     rd S  rz  W d S  ty   Y d S w  r)  d S z
ttd}W n t	yK } z| W Y d }~d S d }~ww 
| d S Nr   )rC   	cancelledcancelRuntimeError	exceptionset_exceptionsuperr1   rO   	ExceptionrD   )r9   resulte)	__class__r   r&   rF   r   r   on_poll_ready   s&   
z(_AsyncPoller.poll.<locals>.on_poll_readyc                     rA   r-   rB   r   rE   r   r   trigger_timeout   rH   z*_AsyncPoller.poll.<locals>.trigger_timeoutMbP?c                   s$   t dr  d S   d S )NrS   )hasattrrS   remove_timeoutrI   )r/   timeout_handler   r   cancel_timeout   s   
z)_AsyncPoller.poll.<locals>.cancel_timeoutc                   s      s
   d S d S r-   )rC   rS   rI   rE   r   r   cancel_watcher   s   z)_AsyncPoller.poll.<locals>.cancel_watcher)r"   rW   rO   rX   rV   rD   r(   add_done_callbackr=   
isinstance_zmqSocketr2   from_socketappendr   _add_recv_eventr   _add_send_eventr4   r5   r;   
call_later)r&   timeoutrY   rZ   rG   rN   r7   maskr8   r\   r]   rb   rc   r[   )r   r/   r6   r&   ra   rF   rM   r   rO   Y   sb   












z_AsyncPoller.poll)
r/   r   r7   r   r8   r3   r9   r	   r#   r+   )r/   r   r=   r   r#   r+   )r?   )r#   r@   )	r   r   r   r0   r   r;   r>   rO   __classcell__r   r   ro   r   r1   I   s   
 

r1   c                   @  s   e Zd Zedd ZdS )_NoTimerc                   C  r,   r-   r   r   r   r   r   rS      s   z_NoTimer.cancelN)r   r   r   staticmethodrS   r   r   r   r   rq      s    rq   T_AsyncSocket)boundc                      s|  e Zd ZU dZdZdZded< eZdZ					d]d^ fd
dZ
ed_d`ddZd_da fddZejjje_ fddZejjje_	dbdcd"d#Z	dbddd%d&Zddd'ded*d+Z	dbdfd.d/Z			dbdgd2d3Zd4d5 Zdejfdhd6d7Zd8d9 Zd:d; Zed_d<d=Zdddd>didGdHZdjdIdJZdKdL Z dMdN Z!dkdOdPZ"d_dQdRZ#dSdT Z$dUdV Z%dWdX Z&d_dYdZZ'd[d\ Z(  Z)S )lrt   Nr   _zmq.Socket_shadow_sockr?   _from_socket_zmq.Socket | Noner#   r+   c                   s   t |tjrd |}}|d urt j|jd || _nt j||fi | tj| j| _|d ur?tj	| j
j dtdd t | _t | _d| _| jj| _d S )N)shadowz^(io_loop) argument is deprecated in pyzmq 22.2. The currently active loop will always be used.   )
stacklevelr   )re   rf   rg   rW   __init__
underlyingrw   rz   warningswarnr[   r   DeprecationWarningr   _recv_futures_send_futures_stateFD_fd)r&   contextsocket_typeio_looprx   r   ro   r   r   r}      s"   
z_AsyncSocket.__init__clstype[T]r7   r   r   rs   c                 C  s   | ||dS )z.Create an async socket from an existing Socket)rx   r   r   )r   r7   r   r   r   r   rh      s   z_AsyncSocket.from_socketlinger
int | Nonec              	     sz   | j s4| jd ur4tt| jpg | jpg }|D ]}|j s/z|j  W q t	y.   Y qw q| 
  t j|d d S )N)r   )closedr   listr   r   r   r   rC   rS   rT   rK   rW   close)r&   r   
event_listeventro   r   r   r      s   
z_AsyncSocket.closec                   s"   t  |}|tkr| | |S r-   )rW   getr   _schedule_remaining_events)r&   keyrY   ro   r   r   r     s   
z_AsyncSocket.getTFflagsr3   copybooltrack)Awaitable[list[bytes] | list[_zmq.Frame]]c                 C     | j dt|||ddS )zvReceive a complete multipart zmq message.

        Returns a Future whose result will be a multipart message.
        recv_multipartr   r   r   r   rj   r   r&   r   r   r   r   r   r   r     s   z_AsyncSocket.recv_multipartAwaitable[bytes | _zmq.Frame]c                 C  r   )zReceive a single zmq frame.

        Returns a Future, whose result will be the received frame.

        Recommend using recv_multipart instead.
        recvr   r   r   r   r   r   r   r     s   	z_AsyncSocket.recvnbytesr   r   Awaitable[int]c               C  s   | j d|ft||ddS )zReceive a single zmq frame into a pre-allocated buffer.

        Returns a Future, whose result will be the number of bytes received.
        	recv_intor   )r   r   r   )r&   bufr   r   r   r   r   r   &  s   z_AsyncSocket.recv_into	msg_parts%Awaitable[_zmq.MessageTracker | None]c                 K  s(   ||d< ||d< ||d< | j d||dS )zqSend a complete multipart zmq message.

        Returns a Future that resolves when sending is complete.
        r   r   r   send_multipartr   r   )rk   )r&   r   r   r   r   r   r   r   r   r   1  s   z_AsyncSocket.send_multipartdatar   c                 K  s<   ||d< ||d< ||d< | t|||d | jd||dS )zSend a single zmq frame.

        Returns a Future that resolves when sending is complete.

        Recommend using send_multipart instead.
        r   r   r   r   sendr   )updater   rk   )r&   r   r   r   r   r   r   r   r   r   =  s
   z_AsyncSocket.sendc                   s>   |     fdd}|  fdd} |  S )zDeserialize with Futuresc              
     s      r s du rtd  d dt dS  r*   dS  }z|}W n tyL } z | W Y d}~dS d}~ww  	| dS )z+Chain result through serialization to recvdNzFuture z completed while awaiting z. A message has been dropped!)
rC   rR   rU   r   r   RuntimeWarningrV   rY   rX   rD   )_r   loadedrZ   r9   loadrecvdr   r   _chainU  s"   z)_AsyncSocket._deserialize.<locals>._chainc                   s$     rdS   r  dS dS )z"Chain cancellation from f to recvdN)rC   rR   rS   )r   )r9   r   r   r   _chain_cancelp  s
   z0_AsyncSocket._deserialize.<locals>._chain_cancel)r"   rd   )r&   r   r   r   r   r   r   r   _deserializeQ  s   

z_AsyncSocket._deserializec                   s   j r	ttj }|| tt||	   fdd}
 r0| n| fdd} |  S )zSpoll the socket for events

        returns a Future for the poll results.
        c                   st      rd S  rz   W d S  ty   Y d S w |  r)   d S t } |	d d S rQ   )
rC   rR   rS   rT   rU   rV   r   rY   rD   r   )r9   evtsr   poll_futurer&   r   r   unwrap_result  s   
z(_AsyncSocket.poll.<locals>.unwrap_resultc                   s0      sz   W dS  ty   Y dS w dS )z4Cancel underlying poll if request has been cancelledN)rC   rS   rT   rP   )r   r   r   cancel_poll  s   z&_AsyncSocket.poll.<locals>.cancel_poll)r   rf   ZMQErrorENOTSUP_poller_classregisterr   r   rO   r"   rC   rd   )r&   rm   r   pr   r   r   r   r   rO   {  s   


	z_AsyncSocket.pollc                   s    fdd}|  ||S )z'Add a timeout for a send or recv Futurec                     s      rd S  t  d S r-   )rC   rV   rf   Againr   rP   r   r   future_timeout  s   z1_AsyncSocket._add_timeout.<locals>.future_timeout)_call_later)r&   r   rm   r   r   rP   r   _add_timeout  s   z_AsyncSocket._add_timeoutc                 C  s   |   ||S )zSchedule a function to be called later

        Override for different IOLoop implementations

        Tornado and asyncio happen to both have ioloop.call_later
        with the same signature.
        )r(   rl   )r&   delaycallbackr   r   r   r     s   z_AsyncSocket._call_laterc                 C  s.   |sdS z| | W dS  ty   Y dS w )zMake sure that futures are removed from the event list when they resolve

        Avoids delaying cleanup until the next send/recv event,
        which may never come.
        N)remove
ValueError)r   r   r   r   r   r   _remove_finished_future  s   z$_AsyncSocket._remove_finished_future)r   r   r   r   r   r   tuple | Nonedict[str, Any] | Noner   Future | Noner   c             
   C  s:  |p|   }|du rd}|du ri }|drO|ddtj@ rOt| j|}z	||i |}W n tyG } z|| W Y d}~|S d}~ww |	| |S t
}	ttdrf| jj}
|
dkrf| ||
d }	t||||d|	d}| j| | jtt@ r|   | jr|| jv r|t| j| j|d	 | t |S )
z4Add a recv event, returning the corresponding FutureNr   r   r   r   RCVTIMEOr^   r   r   r   r   r   r   )r"   
startswithr   rf   DONTWAITgetattrrw   rX   rV   rD   rq   r_   rcvtimeor   r   r   ri   r   r   _handle_recvrd   r   r   _add_io_state)r&   r   r   r   r   r9   r   rrZ   r   
timeout_ms_future_eventr   r   r   rj     sJ   	


z_AsyncSocket._add_recv_eventc              
   C  sh  |p|   }|dv rw| jsw|dd}| }|tjB |d< t| j|}d}	z
||fi |}
W n6 tjyP } z|tj@ rD|	| nd}	W Y d}~n d}~w t
yf } z|	| W Y d}~n
d}~ww ||
 |	rw| jru|   |S t}ttdr| jtj}|dkr| ||d }t||d	|||d
}| j| |t| j| j|d | t |S )z4Add a send event, returning the corresponding Future)r   r   r   r   TFNSNDTIMEOr^   r   r   r   )r"   r   r   r   rf   r   r   rw   r   rV   rX   rD   r   r   rq   r_   r   r   r   ri   rd   r   r   r   r   )r&   r   r   r   r   r9   r   nowait_kwargsr   finish_earlyr   rZ   r   r   r   r   r   r   rk     sR   



z_AsyncSocket._add_send_eventc           
   
   C  s,  | j tt@ s
dS d}| jr%| j \}}}}}}| r!d}nn| js| js-| t |du r3dS |  |dkrB|	d dS |dkrK| j j
}n|dkrT| j j}n|dkr]| j j}ntd||d  tjO  < z	||i |}W n ty }	 z||	 W Y d}	~	dS d}	~	ww |	| dS )zHandle recv eventsNrO   r   r   r   zUnhandled recv event type: r   )rw   r   r   r   r   popleftrC   _drop_io_staterS   rD   r   r   r   r   rf   r   rX   rV   )
r&   r9   r   r   r   r   r   r   rY   rZ   r   r   r   r   J  s@   




z_AsyncSocket._handle_recvc           
   
   C  s  | j tt@ s
d S d }| jr%| j \}}}}}}| r!d }nn| js| js-| t |d u r3d S |  |dkrB|	d  d S |dkrK| j j
}n|dkrT| j j}ntd||d  tjO  < z
||fi |}W n ty }	 z||	 W Y d }	~	d S d }	~	ww |	| d S )NrO   r   r   zUnhandled send event type: r   )rw   r   r   r   r   r   rC   r   rS   rD   r   r   r   rf   r   rX   rV   )
r&   r9   r   r   r   r   r   r   rY   rZ   r   r   r   _handle_sendu  s<   



z_AsyncSocket._handle_sendc                 C  sH   | j jrdS | j t}|tj@ r|   |tj@ r|   | 	  dS )z(Dispatch IO events to _handle_recv, etc.N)
rw   r   r   r   rf   r   r   r   r   r   )r&   fdevents
zmq_eventsr   r   r   _handle_events  s   

z_AsyncSocket._handle_eventsc                 C  sB   | j dkrdS |du r| jt}|| j @ r| d| j dS dS )zkSchedule a call to handle_events next loop iteration

        If there are still events to handle.
        r   N)r   rw   r   r   r   r   )r&   r   r   r   r   r     s   

z'_AsyncSocket._schedule_remaining_eventsc                 C  s*   | j |kr| j |B  }| _ | | j  dS )zAdd io_state to poller.Nr   _update_handlerr&   stater   r   r   r     s   
z_AsyncSocket._add_io_statec                 C  s(   | j |@ r| j | @ | _ | | j  dS )z&Stop poller from watching an io_state.Nr   r   r   r   r   r     s   
z_AsyncSocket._drop_io_statec                 C  s   |r|    |   dS )zOUpdate IOLoop handler with state.

        zmq FD is always read-only.
        N)r(   r   r   r   r   r   r     s   z_AsyncSocket._update_handlerc                 C  s6   |du r|   }|| j| j| j | d| j dS )z#initialize the ioloop event handlerNr   )r(   add_handlerrw   r   r4   r   r.   r   r   r   r%     s   z_AsyncSocket._init_io_statec                 C  s2   | j }| j jr
| j}| jdur| j| dS dS )zNunregister the ioloop event handler

        called once during close
        N)rw   r   r   r!   remove_handler)r&   r   r   r   r   rK     s   
z_AsyncSocket._clear_io_state)Nr?   NN)rx   ry   r#   r+   r-   )r   r   r7   rv   r   r   r#   rs   )r   r   r#   r+   )r   TF)r   r3   r   r   r   r   r#   r   )r   r3   r   r   r   r   r#   r   )r   r3   r   r3   r#   r   )r   r   r   r3   r   r   r#   r   )r   r   r   r3   r   r   r   r   r   r   r#   r   )r#   r   )
r   r   r   r   r   r   r   r   r#   r   )NNN)r   r   )*r   r   r   r   r   r   r   r1   r   r   r}   classmethodrh   r   rf   rg   r0   r   r   r   r   r   r   r   r   rO   r   r   rr   r   rj   rk   r   r   r   r   r   r   r   r%   rK   rp   r   r   ro   r   rt      sd   
 *2

7=+
)

) r0   
__future__r   r   asyncior   collectionsr   	functoolsr   	itertoolsr   typingr   r   r	   r
   r   r   zmqrf   r   r   r   r   r    Pollerr1   rq   rs   rg   rt   r   r   r   r   <module>   s      	w