o
    bi$                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZ d dlZd dlm	Z
 d dlm  mZ d dlmZ d dlmZmZmZmZ eeZG dd dZG d	d
 d
eZG dd deZG dd deZG dd deZdS )    N)deque)ListTupleaio)get_or_create_event_loop)gcs_pb2gcs_service_pb2gcs_service_pb2_grpc
pubsub_pb2c                   @   sX   e Zd ZddefddZedd Zdd Zd	d
 Zdd Z	e
dejddfddZdS )_SubscriberBaseN	worker_idc                 C   s8   || _ ttdd tdD | _d| _d| _d| _d S )Nc                 s   s    | ]}t d V  qdS )   N)randomgetrandbits).0_ r   K/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/gcs_pubsub.py	<genexpr>   s    z+_SubscriberBase.__init__.<locals>.<genexpr>   r       )
_worker_idbytes	bytearrayrange_subscriber_id_last_batch_size_max_processed_sequence_id_publisher_id)selfr   r   r   r   __init__   s
   
z_SubscriberBase.__init__c                 C   s   | j S N)r   r    r   r   r   last_batch_size"   s   z_SubscriberBase.last_batch_sizec                 C   s(   t j|i d}tj| j| j|gd}|S )N)channel_typesubscribe_messagesubscriber_id	sender_idcommands)r   Commandr	    GcsSubscriberCommandBatchRequestr   r   )r    channelcmdreqr   r   r   _subscribe_request&   s
   z"_SubscriberBase._subscribe_requestc                 C   s   t j| j| j| jdS )N)r(   max_processed_sequence_idpublisher_id)r	   GcsSubscriberPollRequestr   r   r   r#   r   r   r   _poll_request-   s
   z_SubscriberBase._poll_requestc                 C   s8   t j| j| jg d}|D ]}|jtj|i d q|S )Nr'   )r%   unsubscribe_message)r	   r,   r   r   r*   appendr   r+   )r    channelsr/   r-   r   r   r   _unsubscribe_request4   s   
z$_SubscriberBase._unsubscribe_requestereturnc                 C   s,   |   tjjkr
dS |   tjjkrdS dS )NTF)codegrpc
StatusCodeDEADLINE_EXCEEDEDUNAVAILABLE)r9   r   r   r   _should_terminate_polling>   s
   z)_SubscriberBase._should_terminate_pollingr"   )__name__
__module____qualname__r   r!   propertyr$   r0   r4   r8   staticmethodr<   RpcErrorr@   r   r   r   r   r      s    

r   c                       sb   e Zd ZdZ			ddededejf fddZdd	d
Z	dddZ
ddddZdddZ  ZS )_AioSubscribera#  Async io subscriber to GCS.

    Usage example common to Aio subscribers:
        subscriber = GcsAioXxxSubscriber(address="...")
        await subscriber.subscribe()
        while running:
            ...... = await subscriber.poll()
            ......
        await subscriber.close()
    Nr   addressr-   c                    sh   t  | |r|d u sJ dtj|dd}n|d us J dt|| _|| _t | _	t
 | _d S )Nz,address and channel cannot both be specifiedTr   z,One of address and channel must be specified)superr!   	gcs_utilscreate_gcs_channelr
   InternalPubSubGcsServiceStub_stub_channelr   _queueasyncioEvent_close)r    pubsub_channel_typer   rH   r-   	__class__r   r   r!   V   s   z_AioSubscriber.__init__r:   c                    s6   | j  rdS | | j}| jj|ddI dH  dS )zRegisters a subscription for the subscriber's channel type.

        Before the registration, published messages in the channel will not be
        saved for the subscriber.
        N   timeout)rR   is_setr0   rN   rM   GcsSubscriberCommandBatchr    r/   r   r   r   	subscriben   s
   
z_AioSubscriber.subscribec                    s   | j j||dI d H S )NrW   )rM   GcsSubscriberPoll)r    r/   rX   r   r   r   
_poll_cally   s   z_AioSubscriber._poll_callc           
   
      sz  t | jdkr|  }t | j||d}t | j }tj||g|tj	dI d H \}}|
 }| s<|  ||vsD||v rFd S zQt | j| _| j| jkru| jdkrltd| j d| j d | j| _d| _| jD ]}|j| jkrtd|  qz|j| _| j| qzW n tjy }	 z| |	rW Y d }	~	d S  d }	~	ww t | jdksd S d S )	Nr   rW   )rX   return_when zreplied publisher_id zdifferent from z/, this should only happens during gcs failover.zIgnoring out of order message )lenrO   r4   r   create_taskr^   rR   waitrP   FIRST_COMPLETEDpopdonecancelresultpub_messagesr   r2   r   loggerdebugr   sequence_idwarningr6   r<   rF   r@   )
r    rX   r/   pollcloserf   others
other_taskmsgr9   r   r   r   _poll}   sP   

z_AioSubscriber._pollc                    sb   | j  rdS | j   | j| jgd}z| jj|ddI dH  W n	 ty+   Y nw d| _dS )z2Closes the subscriber and its active subscription.N)r7      rW   )rR   rY   setr8   rN   rM   rZ   	Exceptionr[   r   r   r   ro      s   


z_AioSubscriber.closeNNN)r:   Nr"   )rA   rB   rC   __doc__r   straiogrpcChannelr!   r\   r^   rs   ro   __classcell__r   r   rT   r   rG   J   s     

'rG   c                       sX   e Zd Z			ddededejf fddZddeeef fdd	Z	e
d
d Z  ZS )GcsAioResourceUsageSubscriberNr   rH   r-   c                       t  tj||| d S r"   )rI   r!   r   RAY_NODE_RESOURCE_USAGE_CHANNELr    r   rH   r-   rT   r   r   r!      s   
z&GcsAioResourceUsageSubscriber.__init__r:   c                    s    | j |dI dH  | | jS )zPolls for new resource usage message.

        Returns:
            A tuple of string reporter ID and resource usage json string.
        rW   N)rs   _pop_resource_usagerO   )r    rX   r   r   r   rn      s   z"GcsAioResourceUsageSubscriber.pollc                 C   s*   t | dkrdS |  }|j |jjfS )Nr   )NN)ra   popleftkey_iddecodenode_resource_usage_messagejson)queuerr   r   r   r   r      s   z1GcsAioResourceUsageSubscriber._pop_resource_usagerw   r"   )rA   rB   rC   r   ry   r<   r{   r!   r   rn   rE   r   r|   r   r   rT   r   r}      s    
	r}   c                       sl   e Zd Z			ddededejf fddZedd Z		dd	e
eeejf  fd
dZedd Z  ZS )GcsAioActorSubscriberNr   rH   r-   c                    r~   r"   )rI   r!   r   GCS_ACTOR_CHANNELr   rT   r   r   r!         zGcsAioActorSubscriber.__init__c                 C   s
   t | jS r"   )ra   rO   r#   r   r   r   
queue_size   s   
z GcsAioActorSubscriber.queue_sizer:   c                    $   | j |dI dH  | j| j|dS )z}Polls for new actor message.

        Returns:
            A list of tuples of binary actor ID and actor table data.
        rW   N
batch_size)rs   _pop_actorsrO   r    r   rX   r   r   r   rn         zGcsAioActorSubscriber.pollc                 C   f   t | dkrg S d}g }t | dkr1||k r1|  }||j|jf |d7 }t | dkr1||k s|S Nr      )ra   r   r6   r   actor_messager   r   poppedmsgsrr   r   r   r   r         z!GcsAioActorSubscriber._pop_actorsrw   r"   )rA   rB   rC   r   ry   r<   r{   r!   rD   r   r   r   r   ActorTableDatarn   rE   r   r|   r   r   rT   r   r      s&    

r   c                       s`   e Zd Z			ddededejf fddZ	ddee	ee
jf  fdd	Zed
d Z  ZS )GcsAioNodeInfoSubscriberNr   rH   r-   c                    r~   r"   )rI   r!   r   GCS_NODE_INFO_CHANNELr   rT   r   r   r!      r   z!GcsAioNodeInfoSubscriber.__init__r:   c                    r   )zsPolls for new node info message.

        Returns:
            A list of tuples of (node_id, GcsNodeInfo).
        rW   Nr   )rs   _pop_node_infosrO   r   r   r   r   rn      r   zGcsAioNodeInfoSubscriber.pollc                 C   r   r   )ra   r   r6   r   node_info_messager   r   r   r   r     r   z(GcsAioNodeInfoSubscriber._pop_node_infosrw   r"   )rA   rB   rC   r   ry   r<   r{   r!   r   r   r   GcsNodeInforn   rE   r   r|   r   r   rT   r   r      s"    	
r   )rP   loggingr   collectionsr   typingr   r   r<   r   rz   ray._private.gcs_utils_privaterJ   ray._common.utilsr   ray.core.generatedr   r	   r
   r   	getLoggerrA   rj   r   rG   r}   r   r   r   r   r   r   <module>   s     
4i%