o
    cir&                     @   sb   d dl Z d dlmZmZmZmZmZmZmZ d dl	m
Z
 er&d dlZd dlZdaG dd dZdS )    N)TYPE_CHECKINGAnyDictListSetTupleUnion)DeviceTc                	   @   sN  e Zd Zdd ZdeddfddZded	ed
eddfddZ	dede
fddZdedefddZdeddfddZde
ddfddZede
fddZded deed ee f fddZdddeeeddef f fdd Zdddeedef def fd!d"Zd#eeddef ef d$efd%d&Zd'edef d(dd)ed$efd*d+ZdS ),_SerializationContextc                 C   s$   d| _ g | _t | _i | _i | _d S )NF)_use_external_transport_out_of_band_tensorsset!_deserialized_tensor_placeholdersintra_process_channel_bufferschannel_id_to_num_readersself r   b/home/ubuntu/.local/lib/python3.10/site-packages/ray/experimental/channel/serialization_context.py__init__   s
   
z_SerializationContext.__init__devicereturnNc                 C   
   || _ d S N)_target_device)r   r   r   r   r   set_target_device%      
z'_SerializationContext.set_target_device
channel_idvaluenum_readersc                 C   s\   |dksJ d|| j vsJ d| d|| jvs"J d| d|| j |< || j|< d S )Nr   z#num_readers must be greater than 0.Channel z already exists in the buffer.z1 already exists in the channel_id_to_num_readers.)r   r   )r   r   r   r   r   r   r   set_data(   s   


z_SerializationContext.set_datac                 C   s
   || j v S r   )r   r   r   r   r   r   has_data4   r   z_SerializationContext.has_datac                 C   sv   || j v sJ d| d|| jv sJ d| d| j|  d8  < | j| dkr6| j| | j |S | j | S )Nr    z does not exist in the buffer.z1 does not exist in the channel_id_to_num_readers.   r   )r   r   popr"   r   r   r   get_data7   s   


z_SerializationContext.get_datac                 C   s    | j |d  | j|d  d S r   )r   r%   r   r"   r   r   r   
reset_dataF   s   z _SerializationContext.reset_datause_external_transportc                 C   r   r   r   )r   r(   r   r   r   set_use_external_transportJ   r   z0_SerializationContext.set_use_external_transportc                 C   s   | j S r   r)   r   r   r   r   r(   M   s   z,_SerializationContext.use_external_transporttensorsztorch.Tensorc                 C   s"   | j }| j}|| _ t | _||fS )z
        Return and reset the out-of-band tensors and all tensor placeholders
        that were deserialized since the last call to reset.
        )r   r   r   )r   r+   prev_tensors deserialized_tensor_placeholdersr   r   r   reset_out_of_band_tensorsQ   s
   z/_SerializationContext.reset_out_of_band_tensorstensorz
np.ndarrayztorch.dtypec                 C   sJ   ddl m} | }| jr |j|jkr | j| t| jd S | 	|S )Nr   )ChannelContextr$   )
ray.experimental.channelr0   get_currentr   r   torch_devicer   appendlenserialize_to_numpy_or_scalar)r   r/   r0   ctxr   r   r   serialize_tensor^   s   
z&_SerializationContext.serialize_tensorc                 C   sV   ddl }|jj}|dkr|d}| dkr#||j |j|fS |	 |j|fS )zd
        Serialize a tensor to a numpy array,
        or a scalar when the tensor is 0-dim.
        r   Ncpu)
torchr   typetodimviewuint8numpydtypeitem)r   r/   r:   tensor_device_typer   r   r   r6   o   s   
z2_SerializationContext.serialize_to_numpy_or_scalarvaltarget_devicec                 C   sh   t |tr'|}| j| |t| jk sJ | j| }|tjkr%|d}|S |\}}}| 	||||S )Nr9   )

isinstanceintr   addr5   r   r	   CPUr<    deserialize_from_numpy_or_scalar)r   rD   rE   placeholderr/   np_arrayrA   rC   r   r   r   deserialize_tensor   s   




z(_SerializationContext.deserialize_tensorrL   rA   rC   c                    s   dd l dd l|tjkr|n|tjtjfv rdn|jdkrZ fdd}trTt	  tj
dtdd ||}W d    d	a|S 1 sKw   Y  d	a|S ||}|S t|jshj| d
S j|d S )Nr   cudar9   c                    s8   t | jsj|  dS |  }|jdS )Nr   rA   r   )rF   ndarrayr/   
from_numpyr>   r<   )rL   
cpu_tensorrA   nptarget_device_typer:   r   r   convert_numpy_to_tensor   s   zW_SerializationContext.deserialize_from_numpy_or_scalar.<locals>.convert_numpy_to_tensorignorez%The given NumPy array is not writable)categorymessageFrO   rP   )r:   r@   r	   DEFAULTGPUCUDAr   _TORCH_WARNING_FILTER_ACTIVATEwarningscatch_warningsfilterwarningsUserWarningrF   rQ   r/   r>   )r   rL   rA   rC   rE   rW   
gpu_tensorr   rT   r   rJ      s8   



		z6_SerializationContext.deserialize_from_numpy_or_scalar)__name__
__module____qualname__r   r	   r   strr   rG   r!   boolr#   r&   r'   r*   propertyr(   r   r   r   r.   r   r8   r6   rM   rJ   r   r   r   r   r
      sN    




r
   )r_   typingr   r   r   r   r   r   r   ray.experimental.util.typesr	   r@   rU   r:   r^   r
   r   r   r   r   <module>   s    $