o
    ڷi7                     @   s  d Z ddlZddlZddlZddlmZmZ ddlm	Z	m
Z
mZ ddlmZ ddlmZ ddlmZ dd	lmZ d
d  ZZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZedZedkZ e! Z"d'ddZ#d'd d!Z$d'd"d#Z%G d$d% d%Z&d%d&gZ'dS )(zzmq Socket class    N)SocketOption_OptType)ZMQError	_check_rc_check_version   )ffi)lib)Frame)_retry_sys_callc                 C   s   t d| S )Nzsize_t*)r   newlength r   M/home/ubuntu/vllm_env/lib/python3.10/site-packages/zmq/backend/cffi/socket.py<lambda>   s    r   c                   C      t dtt dfS Nz	uint64_t*uint64_tr   r   nspsizeofr   r   r   r   new_uint64_pointer      r   c                   C   r   Nzint64_t*int64_tr   r   r   r   r   new_int64_pointer   r   r   c                   C   r   Nzint*intr   r   r   r   r   new_int_pointer   r   r   c                 C   s&   t d| ddtt d|  fS )Nchar[d]charr   r   r   r   r   new_binary_data!   s   &r$   c                 C      t d| t dfS r   r   r   r   valr   r   r   value_uint64_pointer%      r)   c                 C   r%   r   r&   r'   r   r   r   value_int64_pointer)   r*   r+   c                 C   r%   r   r&   r'   r   r   r   value_int_pointer-   r*   r,   c                 C   s(   t d|d dd| t d| fS )Nr    r   r!   r"   r#   r&   )r(   r   r   r   r   value_binary_data1   s   (r-   ZMQ_FD_T   c                 C   sD   t | dtj}|tjkstr|tjkrt S |tjkrt|S t	 S N	_opt_type)
getattrr   r   int64ZMQ_FD_64BITfdr   bytesr$   r   )optionr   opt_typer   r   r   new_pointer_from_opt;   s   
r9   c                 C   sV   zt | } W n ty   tj}Y nw | j}|tjkr%t||d d  S t|d S )Nr   )r   
ValueErrorr   r   r1   r6   r   buffer)r7   opt_pointerr   r8   r   r   r   value_from_opt_pointerG   s   

r=   c                 C   sJ   t | dtj}|tjkstr|tjkrt|S |tjkr!t||S t	|S r0   )
r2   r   r   r3   r4   r5   r+   r6   r-   r,   )r7   valuer   r8   r   r   r   initialize_opt_pointerX   s   

r?   c                   @   s   e Zd ZdZdZdZdZdZdZdZ	dZ
dZd,ddZedd Zdd	 Zed
d Zd-ddZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zd.ddZd/d!d"Zddd#d$ed%ed&efd'd(Zd0d*d+ZdS )1SocketNFr   c                 C   sr   |d u rt j}|| _|| _d  | _| _|r td|| _d| _	nd| _	t
|j|| _| jtjkr4t d| _d S )Nvoid *TF)zmqCOPY_THRESHOLDcopy_thresholdcontext_draft_poller_draft_poller_ptrr   cast_zmq_socket_shadowC
zmq_socket_zmq_ctxNULLr   _closed)selfrE   socket_typeshadowrD   r   r   r   __init__m   s   
zSocket.__init__c                 C   s   t td| jS )z+The address of the underlying libzmq socketsize_t)r   r   rH   rI   rP   r   r   r   
underlying~   s   zSocket.underlyingc              
   C   st   | j rdS z	| tj W dS  ty9 } z|jtjkr&d| _ W Y d}~dS |jtjkr-n W Y d}~dS d}~ww )zthorough check of whether the socket has been closed,
        even if by another entity (e.g. ctx.destroy).

        Only used by the `closed` property.

        returns True if closed, False otherwise
        TNF)rO   getrB   TYPEr   errnoENOTSOCKETERM)rP   er   r   r   _check_closed_deep   s    	
zSocket._check_closed_deepc                 C   s   |   S N)r]   rU   r   r   r   closed   s   zSocket.closedc                 C   s   d}| j s4t| dr4| jd urt| j}d  | _| _| jd ur1|d ur+| tj	| t
| j}d| _ |dk r>t| d S d S )Nr   rI   T)rO   hasattrrG   rK   zmq_poller_destroyrF   rI   setrB   LINGER	zmq_closer   )rP   lingerrcr   r   r   close   s   

zSocket.closec                 C   s   t |tr|d}n|}t |tr|d}t| j|}|dk rgtrDt	 t
jkrD|ddd }d| dt d}tt	 |d	t	 t
jkra|ddd }d
| d}tt	 |d	t| d S d S )Nutf8r   z://r   z
ipc path "z" is longer than z+ characters (sizeof(sockaddr_un.sun_path)).)msgz(No such file or directory for ipc path "z".)
isinstancestrencoder6   decoderK   zmq_bindrI   IPC_PATH_MAX_LEN	zmq_errno	errno_modENAMETOOLONGsplitr   ENOENTr   )rP   address	address_brf   pathrj   r   r   r   bind   s$   


zSocket.bindc                 C   .   t |tr
|d}t| j|}t| d S Nrh   )rk   rl   rm   rK   
zmq_unbindrI   r   rP   rv   rf   r   r   r   unbind      

zSocket.unbindc                 C   rz   r{   )rk   rl   rm   rK   zmq_connectrI   r   r}   r   r   r   connect   r   zSocket.connectc                 C   rz   r{   )rk   rl   rm   rK   zmq_disconnectrI   r   r}   r   r   r   
disconnect   r   zSocket.disconnectc                 C   s   d }t |trtdzt|}W n ty   tj}Y nw |j}t |tr6|tjkr2td| t	|}t
|||\}}ttj| j|td|| d S )Nzunicode not allowed, use bytesznot a bytes sockopt: void*)rk   rl   	TypeErrorr   r:   r   r   r1   r6   lenr?   r   rK   zmq_setsockoptrI   r   rH   )rP   r7   r>   r   r8   c_value_pointerc_sizetr   r   r   rb      s*   




z
Socket.setc           
   
   C   s  zt |}W n ty   tj}Y nw |j}|tjkr7| jd ur7t|\}}t	
| jtd| t|d S t|dd\}}ztt	j| j||| W n ty } zt|t jkr|jtjjkr| t jrtdd tjsqtdtjtj dd	 td
| _t	  | jd< | _| jtj krd  | _| _ t	!| j| jtj tj"tj#B }t$| t	
| jtd|}t$| t|d W  Y d }~S  d }~ww |d }t%|||}	|tj j&kr|tj'kr|	(dr|	d d }	|	S )Nr   r      r   )         z)draft socket FD support via zmq_poller_fdz'libzmq must be built with draft supportr   )
stacklevelzvoid*[1]rA       ri   ))r   r:   r   r   r1   rB   FDrF   r9   rK   zmq_poller_fdr   rH   r   zmq_getsockoptrI   r   rY   ErrnoEINVALrW   THREAD_SAFEr   	DRAFT_APIRuntimeErrorwarningswarnerrorDraftFDWarningr   rG   zmq_poller_newrN   zmq_poller_addPOLLINPOLLOUTr   r=   
ROUTING_IDr6   endswith)
rP   r7   r8   r   _c_sizet_pointerr\   rf   szvr   r   r   rW      sp   




#
z
Socket.getc                 C   s   t d}t|tst| }t d|}t|t|}t	| t
t||t| ttj|| j| t|}t	| dS )zSend a copy of a bufferable
zmq_msg_t*zchar[]N)r   r   rk   r6   
memoryviewtobytesrK   zmq_msg_init_sizer   r   memcpyzmq_msg_datar   zmq_msg_sendrI   zmq_msg_close)rP   bufflagszmq_msg	c_messagerf   rc2r   r   r   
_send_copy:  s   


zSocket._send_copyc                 C   s2   |  }|j}ttj|| j| |j}|  |S )z1Send a Frame on this socket in a non-copy manner.)	fast_copyr   r   rK   r   rI   trackerrg   )rP   framer   
frame_copyr   r   r   r   r   _send_frameI  s   zSocket._send_framec           	      C   s   t |tr	td|rt |ts| ||S d}t |tr)|r&|js&td|}n | jr?t|}|j	| jk r?| || t
jS t||| jd}d}| ||}|rU|  |S )Nz.Message must be in bytes, not a unicode objectFzNot a tracked message)trackrD   T)rk   rl   r   r
   r   r   r:   rD   r   nbytesrB   _FINISHED_TRACKERr   rg   )	rP   datar   copyr   close_framer   r   r   r   r   r   sendT  s(   


zSocket.sendTc           	      C   s   |rt d}t| n	tj|d}|j}zttj|| j	| W n t
y0   |r/t|  w |s5|S t t|t|}|d d  }t|}t| |S )Nr   )r   )r   r   rK   zmq_msg_initrB   r
   r   r   zmq_msg_recvrI   	Exceptionr   r;   r   zmq_msg_sizer   )	rP   r   r   r   r   r   _buffer_bytesrf   r   r   r   recvo  s&   


zSocket.recv)r   r   r   r   returnc               C   s   t |}|jstd|jrtd|dk rtd|d|j}|dkr(|}n||kr7td|d| dt|}tt	j
| j|||}t| |S )Nz%Can only recv_into contiguous buffersz Cannot recv_into readonly bufferr   znbytes=z must be non-negativez too big for memoryview of B)r   
contiguousBufferErrorreadonlyr:   r   r   from_bufferr   rK   zmq_recvrI   r   )rP   r;   r   r   view
view_bytesc_bufrf   r   r   r   	recv_into  s    
zSocket.recv_intori   c                 C   sD   |dk rt j}|du rtj}t|tr|d}t| j	|| dS )a   s.monitor(addr, flags)

        Start publishing socket events on inproc.
        See libzmq docs for zmq_monitor for details.

        Note: requires libzmq >= 3.2

        Parameters
        ----------
        addr : str
            The inproc url used for monitoring. Passing None as
            the addr will cause an existing socket monitor to be
            deregistered.
        events : int [default: zmq.EVENT_ALL]
            The zmq event bitmask for which events will be sent to the monitor.
        r   Nrh   )
rB   	EVENT_ALLr   rN   rk   rl   rm   rK   zmq_socket_monitorrI   )rP   addreventsr   r   r   monitor  s   

zSocket.monitor)NNr   Nr^   )r   FF)r   TF)ri   )__name__
__module____qualname__rE   rQ   rI   rO   _refrJ   rF   rG   rD   rS   propertyrV   r]   r_   rg   ry   r~   r   r   rb   rW   r   r   r   r   r   r   r   r   r   r   r   r@   b   s:    



G

r@   rp   )r   )(__doc__rY   rr   r   rB   zmq.constantsr   r   	zmq.errorr   r   r   _cffir   r	   rK   messager
   utilsr   r   new_sizet_pointerr   r   r   r$   r)   r+   r,   r-   r   _fd_sizer4   get_ipc_path_max_lenrp   r9   r=   r?   r@   __all__r   r   r   r   <module>   s:    




  S