o
    `۷i@                     @   s8   d dl Z d dlmZmZ d dlmZ G dd deZdS )    N)AnyOptional)ChannelInterfacec                   @   s   e Zd ZdZ		ddedee dee fddZdd	d
Z	dddZ
dd ZdefddZddedee fddZddee defddZdddZdS )CachedChannela  
    CachedChannel wraps an inner channel and caches the data read from it until
    `num_reads` reads have completed. If inner channel is None, the data
    is written to serialization context and retrieved from there. This is useful
    when passing data within the same actor and a shared memory channel can be
    avoided.

    Args:
        num_reads: The number of reads from this channel that must happen before
            writing again. Readers must be methods of the same actor.
        inner_channel: The inner channel to cache data from. If None, the data is
            read from the serialization context.
        _channel_id: The unique ID for the channel. If None, a new ID is generated.
    N	num_readsinner_channel_channel_idc                 C   sB   |dksJ d|| _ || _|| _| jd u rtt | _d S d S )Nr   z!num_reads must be greater than 0.)
_num_reads_inner_channelr   struuiduuid4)selfr   r   r    r   ]/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/experimental/channel/cached_channel.py__init__   s   
zCachedChannel.__init__returnc                 C      | j d ur| j   d S d S N)r
   ensure_registered_as_writerr   r   r   r   r   &      
z)CachedChannel.ensure_registered_as_writerc                 C   r   r   )r
   ensure_registered_as_readerr   r   r   r   r   *   r   z)CachedChannel.ensure_registered_as_readerc                 C   s   t | j| j| jffS r   )r   r	   r
   r   r   r   r   r   
__reduce__.   s
   zCachedChannel.__reduce__c                 C   s   d| j  d| j d| j dS )NzCachedChannel(channel_id=z, num_reads=z), inner_channel=))r   r	   r
   r   r   r   r   __str__5   s   
zCachedChannel.__str__valuetimeoutc                 C   sP   |    ddlm} | jd ur| j|| d S | j}|| j|| j	 d S Nr   ChannelContext)
r   ray.experimental.channelr    r
   writeget_currentserialization_contextset_datar   r	   )r   r   r   r    ctxr   r   r   r"   <   s   

zCachedChannel.writec                 C   sr   |    ddlm} | j}|| jr|| jS | jd us$J d| j	|}|
| j|| j || jS )Nr   r   zGCannot read from the serialization context while inner channel is None.)r   r!   r    r#   r$   has_datar   get_datar
   readr%   r	   )r   r   r    r&   r   r   r   r   r)   M   s   
zCachedChannel.readc                 C   s:   ddl m} | jd ur| j  | j}|| j d S r   )r!   r    r
   closer#   r$   
reset_datar   )r   r    r&   r   r   r   r*   i   s
   


zCachedChannel.close)NN)r   Nr   )__name__
__module____qualname____doc__intr   r   r   r   r   r   r   r   r   floatr"   r)   r*   r   r   r   r   r      s$    


r   )r   typingr   r   ray.experimental.channel.commonr   r   r   r   r   r   <module>   s    