o
    X۷i                     @  s   d dl mZ d dlZd dlmZ d dlZd dlZd dlZd dlZd dl	m
Z
 d dl	mZ dZdZdaejd	d
 ZG dd dejZG dd dZG dd dZdS )    )annotationsN)sizeof)
_klv_utils)_store_actionsz	127.0.0.1i4  Fc                   C  s   da d S )NT)
_exit_mode r   r   N/home/ubuntu/vllm_env/lib/python3.10/site-packages/cupyx/distributed/_store.py_exit   s   r	   c                      s4   e Zd Z fddZ fddZ fddZ  ZS )ExceptionAwareProcessc                   s,   t  j|i | d | _t \| _| _d S N)super__init__
_exceptionmultiprocessingPipe	_parent_p_child_p)selfargskwargs	__class__r   r   r      s   zExceptionAwareProcess.__init__c              
     sP   zt    | jd  W d S  ty' } z| j| W Y d }~d S d }~ww r   )r   runr   send	Exception)r   er   r   r   r       s   
zExceptionAwareProcess.runc                   s2   t    | j r| j }|d ur|d S d S r   )r   joinr   pollrecv)r   	exceptionr   r   r   r   '   s   


zExceptionAwareProcess.join)__name__
__module____qualname__r   r   r   __classcell__r   r   r   r   r
      s    r
   c                   @  sJ   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zee	fddZ
dd ZdS )TCPStorec                 C  s4   i | _ d | _|| _tdd| _t | _d | _	d S )Nb   )
storage_process_world_sizer   Value_run	threadingLock_lock_current_barrier)r   
world_sizer   r   r   r   2   s   

zTCPStore.__init__c                 C  s   t s|   d S d S r   )r   stopr   r   r   r   __del__;   s   zTCPStore.__del__c                 C  s
   || _ d S r   )r(   )r   processr   r   r   _set_process?   s   
zTCPStore._set_processc                 C  s   |S | ttj}t|dkrEtj|}|jdkr tdt|j	d |j }t
|j|| }|d urM||  W d    d S W d    d S W d    d S 1 sXw   Y  d S )Nr      zInvalid length for message)r   r   r   action_tlenfrom_buffer_copylength
ValueError	bytearrayvaluer   execute_actionactionsendallklv)r   c_socketaction_bytesaction_mr=   rr   r   r   _process_requestB   s   
	"zTCPStore._process_requestc              	   C  s   t  t jt jV}|t jt jd |||f |  |d | j	j
dkrVz| \}}W n	 ty9   Y q"w tj| j|fdd}|  | j	j
dks(W d    d S W d    d S 1 saw   Y  d S )Nr&         ?T)targetr   daemon)socketAF_INETSOCK_STREAM
setsockopt
SOL_SOCKETSO_REUSEADDRbindlisten
settimeoutr+   r=   acceptTimeoutErrorr,   ThreadrF   start)r   hostportsrB   addrtr   r   r   _server_loopO   s(   
"zTCPStore._server_loopc                 C  s$   t | j||fd}|  || _d S )N)rH   r   )r
   r\   rV   r(   )r   rW   rX   pr   r   r   r   b   s
   

zTCPStore.runc                 C  sd   t rd S | jd ur.| j  d| j_W d    n1 sw   Y  | j r0| j  d S d S d S )Nr   )r   r(   r+   get_lockr=   is_aliver   r2   r   r   r   r1   i   s   


zTCPStore.stopN)r    r!   r"   r   r3   r5   rF   r\   _DEFAULT_HOST_DEFAULT_PORTr   r1   r   r   r   r   r$   /   s    	r$   c                   @  sB   e Zd ZdZdZeefddZdd Zdd Z	d	d
 Z
dd ZdS )TCPStoreProxy2   rG   c                 C  s   || _ || _d S r   )rW   rX   )r   rW   rX   r   r   r   r   x   s   
zTCPStoreProxy.__init__c              	   C  s   t tjD ]t}zcttjtjQ}|| j| jf |	|
  |ttj}t|dkrZtj|}t|jd |j }|jdkrS||W  d    W   S t|dW d    n1 sdw   Y  W q tyy   ttj Y qw td)Nr   zutf-8zTCPStore is not available)rangerb   MAX_NUM_RETRIESrJ   rK   rL   connectrW   rX   r@   rA   r   r   r   result_action_tr8   r9   r<   r=   r:   statusdecode_resultRuntimeErrordecodeConnectionRefusedErrortimesleepDELAY_FOR_RETRY)r   r?   irY   result_bytesresultr=   r   r   r   
_send_recv|   s0   
zTCPStoreProxy._send_recvc                 C  s   |  t|S r   )rs   r   Get)r   keyr   r   r   __getitem__   s   zTCPStoreProxy.__getitem__c                 C  s   |  t|| d S r   )rs   r   Set)r   ru   r=   r   r   r   __setitem__   s   zTCPStoreProxy.__setitem__c                 C  s   |  t  d S r   )rs   r   Barrierr2   r   r   r   barrier   s   zTCPStoreProxy.barrierN)r    r!   r"   re   ro   r`   ra   r   rs   rv   rx   rz   r   r   r   r   rb   s   s    rb   )
__future__r   atexitctypesr   r   r,   rJ   rm   cupyx.distributedr   r   r`   ra   r   registerr	   Processr
   r$   rb   r   r   r   r   <module>   s"    
D