o
    biq                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZmZmZm	Z	m
Z
 er&d dlZd dlZd dlZd dlmZ d dlZd dlmZ d dlmZ d dlmZmZmZmZmZmZmZm Z m!Z! d dl"m#Z#m$Z$ d dlm%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z> d dl?m@Z@ d d	lAmBZBmCZC eDeEZFeGd
dZHG dd deIZJdedeKfddZLdd ZMdd ZNG dd dZOdS )    N)TYPE_CHECKINGAnyListOptionalTupleUnion)ray_constants)TensorTransportEnum)	DynamicObjectRefGeneratorMessagePackSerializedObjectMessagePackSerializerPickle5SerializedObjectPickle5WriterRawSerializedObjectSerializedRayObjectsplit_bufferunpack_pickle5_buffers)	ErrorTypeRayErrorInfo)ActorDiedErrorActorPlacementGroupRemovedActorUnavailableErrorActorUnschedulableErrorLocalRayletDiedErrorNodeDiedErrorObjectFetchTimedOutErrorObjectFreedErrorObjectLostErrorObjectReconstructionFailedError-ObjectReconstructionFailedLineageEvictedError2ObjectReconstructionFailedMaxAttemptsExceededErrorObjectRefStreamEndOfStreamErrorOutOfDiskErrorOutOfMemoryErrorOwnerDiedErrorPlasmaObjectNotAvailableRayErrorRaySystemErrorRayTaskErrorReferenceCountingAssertionErrorRuntimeEnvSetupErrorTaskCancelledErrorTaskPlacementGroupRemovedTaskUnschedulableErrorWorkerCrashedError)CompiledDAGRef)inspect_serializabilityserialization_addons.RAY_allow_out_of_band_object_ref_serializationTc                   @   s   e Zd ZdS )DeserializationErrorN)__name__
__module____qualname__ r7   r7   N/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/serialization.pyr3   B   s    r3   obj	error_msgc              
   C   st   zt | W S  ttjjfy9 } z#t }t| |d | d|	  }t
|tr/t||tj|d}~ww )zdWrap cloudpickle.dumps to provide better error message
    when the object is not serializable.
    )
print_filez:
N)pickledumps	TypeErrorray
exceptions(OufOfBandObjectRefSerializationExceptionioStringIOr0   getvalue
isinstance)r9   r:   esiomsgr7   r7   r8   pickle_dumpsF   s   

rI   c                 C   s`   t | ||}|r.t jjj}|  | }| }|d u r#t j }|j	
| ||| |S N)r?   	ObjectRef_privateworkerglobal_workercheck_connectedget_serialization_contextget_outer_object_refnilcore_worker#deserialize_and_register_object_refbinary)rU   	call_siteowner_addressobject_statusobj_refrM   contextouter_idr7   r7   r8   _object_ref_deserializerV   s   	

r\   c                 C   s(   t jjj }| }t jj| ||S rJ   )	r?   rL   rM   rN   rP   rQ   actorActorHandle_deserialization_helper)serialized_objweak_refrZ   r[   r7   r7   r8   _actor_handle_deserializers   s
   rb   c                	   @   sN  e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Zdddddedee fddZ	d:dededee defd d!Zdejfdee dee fd"d#Zd$d% Zd&d' Zdee fd(d)Zd*ee fd+d,Zd-d. Zd/d0 Zd1ed2edefd3d4Z d1ede!e"ef fd5d6Z#d1ede$eed7 f fd8d9Z%dS );SerializationContextzInitialize the serialization library.

    This defines a custom serializer for object refs and also tells ray to
    serialize several exception classes that we define for error handling.
    c                    s|   | _ t  _ fdd} tjj| dd } t|  fdd} tj	| dd } t
| t  d S )	Nc                    s:   t jjj  |  \}}}|s j|dd t||ffS )NT)allow_out_of_band_serialization)r?   rL   rM   rN   rO   _serialization_helperadd_contained_object_refrb   )r9   
serializedactor_handle_idra   selfr7   r8   actor_handle_reducer   s   z;SerializationContext.__init__.<locals>.actor_handle_reducerc                 S   s   t d)Nz1Serialization of CompiledDAGRef is not supported.)r>   r9   r7   r7   r8   compiled_dag_ref_reducer      z?SerializationContext.__init__.<locals>.compiled_dag_ref_reducerc                    sP   t jjj}|   j| t|  d |j	| \} }}t
|  |  ||ffS )N)rd   rV   )r?   rL   rM   rN   rO   rf   *ALLOW_OUT_OF_BAND_OBJECT_REF_SERIALIZATIONrV   rS   serialize_object_refr\   rU   )r9   rM   rW   rX   ri   r7   r8   object_ref_reducer   s    

z9SerializationContext.__init__.<locals>.object_ref_reducerc                 S   s   t | jffS rJ   )r
   _refsrl   r7   r7   r8   object_ref_generator_reducer      zCSerializationContext.__init__.<locals>.object_ref_generator_reducer)rM   	threadinglocal_thread_local_register_cloudpickle_reducerr?   r]   r^   r/   rK   r
   r1   apply)rj   rM   rk   rm   rq   rs   r7   ri   r8   __init__   s   
zSerializationContext.__init__c                 C   s   |t jj|< d S rJ   r<   CloudPicklerdispatch)rj   clsreducerr7   r7   r8   rx      s   z2SerializationContext._register_cloudpickle_reducerc                 C   s   t jj|d  d S rJ   )r<   r|   r}   pop)rj   r~   r7   r7   r8   _unregister_cloudpickle_reducer   s   z4SerializationContext._unregister_cloudpickle_reducerc                    s    fdd}|t jj|< d S )Nc                    s    | ffS rJ   r7   rl   custom_deserializercustom_serializerr7   r8   _CloudPicklerReducer      zSSerializationContext._register_cloudpickle_serializer.<locals>._CloudPicklerReducerr{   )rj   r~   r   r   r   r7   r   r8    _register_cloudpickle_serializer   s   z5SerializationContext._register_cloudpickle_serializerc                 C   s   t | jddS )Nin_bandFgetattrrw   ri   r7   r7   r8   is_in_band_serialization   r   z-SerializationContext.is_in_band_serializationc                 C      d| j _d S )NTrw   r   ri   r7   r7   r8   set_in_band_serialization   rt   z.SerializationContext.set_in_band_serializationc                 C   r   )NFr   ri   r7   r7   r8   set_out_of_band_serialization   rt   z2SerializationContext.set_out_of_band_serializationc                 C   s   t | jdg }|r|d S d S )Nobject_ref_stackr   )rj   stackr7   r7   r8   rQ      s   z)SerializationContext.get_outer_object_refc                 C   s2   t | jdst | j_t S | jj}t | j_|S )Nobject_refs)hasattrrw   setr   )rj   r   r7   r7   r8   #get_and_clear_contained_object_refs   s   

z8SerializationContext.get_and_clear_contained_object_refsN)rV   
object_refzray.ObjectRefrd   rV   c                C   sj   |   rt| jdst | j_| jj| d S |s*tjd|	  d|p&d tj
jjj| d S )Nr   z-It is not allowed to serialize ray.ObjectRef a&  . If you want to allow serialization, set `RAY_allow_out_of_band_object_ref_serialization=1.` If you set the env var, the object is pinned forever in the lifetime of the worker process and can cause Ray object leaks. See the callsite and trace to find where the serialization occurs.
Callsite: z-Disabled. Set RAY_record_ref_creation_sites=1)r   r   rw   r   r   addr?   r@   rA   hexrL   rM   rN   rS   add_object_ref_reference)rj   r   rd   rV   r7   r7   r8   rf      s   
z-SerializationContext.add_contained_object_refdatatensor_transport	object_idreturnc              	   C   s   ddl m} | j}|tjk}|rAtjjj	j
}|j|s0||s+J d| d|| |j|}|| |j| z2zt|\}	}
t|
dkrWtj|	|
d}nt|	}W n tjjyi   t w W |rr|g  |S |r||g  w w )a  

        Args:
            data: The data to deserialize.
            tensor_transport: The tensor transport to use. If not equal to OBJECT_STORE,
                it means that any tensors in the object are sent out-of-band
                instead of through the object store. In this case, we need to
                retrieve the tensors from the in-actor object store. Then, we
                deserialize `data` with the retrieved tensors in the
                serialization context.
            object_id: The object ID to use as the key for the in-actor object store
                to retrieve tensors.

        Returns:
            Any: The deserialized object.
        r   ChannelContextzobj_id=z not found in GPU object store. This error is unexpected. Please report this issue on GitHub: https://github.com/ray-project/ray/issues/new/choose)buffers)ray.experimental.channelr   get_currentserialization_contextr	   OBJECT_STOREr?   rL   rM   rN   gpu_object_managergpu_object_storehas_gpu_objectis_managed_gpu_objectfetch_gpu_objectget_gpu_objectreset_out_of_band_tensorsremove_gpu_objectr   lenr<   loadsPicklingErrorr3   )rj   r   r   r   r   ctxenable_gpu_objectsr   tensorsr   r   r9   r7   r7   r8   _deserialize_pickle5_data  s>   






z.SerializationContext._deserialize_pickle5_datac           	         sb   t |\}}|d tjkr| ||| ng  z fdd}t||}W |S  ty0   t w )Nr   c                    s    |  S rJ   r7   )indexpython_objectsr7   r8   _python_deserializerM  rn   zLSerializationContext._deserialize_msgpack_data.<locals>._python_deserializer)r   r   OBJECT_METADATA_TYPE_PYTHONr   r   r   	Exceptionr3   )	rj   r   metadata_fieldsr   r   msgpack_datapickle5_datar   r9   r7   r   r8   _deserialize_msgpack_data9  s   	z.SerializationContext._deserialize_msgpack_datac                 C   s0   |sJ |  ||}|sJ t }|| |S rJ   )r   r   ParseFromString)rj   r   r   pb_bytesray_error_infor7   r7   r8   _deserialize_error_infoU  s   
z,SerializationContext._deserialize_error_infoc                 C   s\   |st  S | ||}|dsJ |jdrt|jjS |jds'J t |jjdS )Nactor_died_errorcreation_task_failure_contextactor_died_error_context)cause)r   r   HasFieldr   r&   from_ray_exceptionr   r   )rj   r   r   r   r7   r7   r8   _deserialize_actor_died_error^  s   z2SerializationContext._deserialize_actor_died_errorc                 C   s  |d u rt j}|r;|d}|d tjtjfv r#| ||| |S |d tjkr4|d u r0dS |	 S |d tj
krU| ||}|d d |dd  dk}}t||S zt|d }	W n tyn   td| d| w |	tdkr| ||}t|S |	td	krt S |	td
kr| ||S |	tdkrt S |	tdkrzd}
|r| ||}|j}
t|
dW S  tjjjy   | ||}t| Y S w |	tdkrt| | | S |	tdkrt| | | S |	tdkrt | | | S |	tdkr!| ||}t!|jS |	tdkr4| ||}t"|jS |	tdkrHt#| | | S |	tdkr\t$| | | S |	tdkrpt%| | | S |	tdkrt&| | | S |	tdkrt'| | | S |	tdkrt(| | | S |	tdkr| ||}d}|)dr|j*j}t+|dS |	tdkrt, S |	tdkrt- S |	tdkr| ||}t.|jS |	tdkr| ||}t/|jS |	td krt0 S |	td!kr3| ||}|)d"r+|j1j2}nd }t3|j|S t4d#t5|	 S |rBt6d$t7S )%N   ,r       r      1zCan't deserialize object: z, metadata: TASK_EXECUTION_EXCEPTIONWORKER_DIED
ACTOR_DIEDLOCAL_RAYLET_DIEDTASK_CANCELLED )error_messageOBJECT_LOSTOBJECT_FETCH_TIMED_OUTOUT_OF_DISK_ERROROUT_OF_MEMORY	NODE_DIEDOBJECT_DELETEDOBJECT_FREED
OWNER_DIEDOBJECT_UNRECONSTRUCTABLE.OBJECT_UNRECONSTRUCTABLE_MAX_ATTEMPTS_EXCEEDED(OBJECT_UNRECONSTRUCTABLE_LINEAGE_EVICTEDRUNTIME_ENV_SETUP_FAILEDruntime_env_setup_failed_errorTASK_PLACEMENT_GROUP_REMOVEDACTOR_PLACEMENT_GROUP_REMOVEDTASK_UNSCHEDULABLE_ERRORACTOR_UNSCHEDULABLE_ERROREND_OF_STREAMING_GENERATORACTOR_UNAVAILABLEactor_unavailable_errorzUnrecognized error type z+non-null object should always have metadata)8r	   r   splitr   #OBJECT_METADATA_TYPE_CROSS_LANGUAGEr   r   r   OBJECT_METADATA_TYPE_RAW
to_pybytes!OBJECT_METADATA_TYPE_ACTOR_HANDLErb   intr   r   Valuer&   
from_bytesr.   r   r   r   r   r+   googleprotobufmessageDecodeErrorr   rW   rV   r   r"   r#   r   r)   r   r$   r   r    r   r   r   r*   r,   r   r-   r   r!   r   actor_idr   r'   str
ValueErrorr%   )rj   r   metadatar   r   r   r9   rg   ra   
error_typer   
error_infor:   r   r7   r7   r8   _deserialize_objectm  s   








z(SerializationContext._deserialize_objectserialized_ray_objectsc           
      C   s   t |t |ks
J t| jdsg | j_g }t||D ]U\}\}}}z<z| jj| | ||||}W n tyR }	 zt	|	 t
|	t }W Y d }	~	nd }	~	ww W | jjr^| jj  n| jjrj| jj  w w || q|S )Nr   )r   r   rw   r   zipappendr   r   logger	exceptionr'   	traceback
format_excr   )
rj   r   r   resultsr   r   r   tensor_transport_valuer9   rF   r7   r7   r8   deserialize_objects  s4   
z(SerializationContext.deserialize_objectsc              
   C   st   t  }z(z|   tj|d|jd}W n ty% } z|   |d }~ww W |   n|   w t||||  S )N   )protocolbuffer_callback)	r   r   r<   r=   r  r   r   r   r   )rj   r   valuewriterinbandrF   r7   r7   r8   _serialize_to_pickle5  s    
z*SerializationContext._serialize_to_pickle5c           
         s   g }t |tr,t|jjtrttd	d}|
 }n3ttd	d}|
 }n$t |tjjrM| \}}}|sA|| tj}||rIdnd }ntj}g   fdd}t||} rjtj}| | }	nd }	t||||	S )Nr   asciir   r      0c                    s   t  } |  |S rJ   )r   r   )or   r   r7   r8   _python_serializerT  s   
zFSerializationContext._serialize_to_msgpack.<locals>._python_serializer)rE   r(   
issubclassr   	__class__r+   r   r   r   encodeto_bytesr?   r]   r^   re   r   r   r   r   r   r=   r   r  r   )
rj   r  contained_object_refsr   rg   rh   ra   r  r   pickle5_serialized_objectr7   r   r8   _serialize_to_msgpack0  s8   



z*SerializationContext._serialize_to_msgpackr  obj_idc                 C   sN   |dusJ d|  |\}}|r%|d}tjjj}|j}|j|| |S )a  Retrieve GPU data from `value` and store it in the GPU object store. Then, return the serialized value.

        Args:
            value: The value to serialize.
            obj_id: The object ID of the value. `obj_id` is required, and the GPU data (e.g. tensors) in `value`
                will be stored in the GPU object store with the key `obj_id`.

        Returns:
            Serialized value.
        Nzd`obj_id` is required, and it is the key to retrieve corresponding tensors from the GPU object store.r  )	_serialize_and_retrieve_tensorsdecoder?   rL   rM   rN   r   r   add_gpu_object)rj   r  r  serialized_valr   rM   r   r7   r7   r8   serialize_and_store_gpu_objectsg  s   


z4SerializationContext.serialize_and_store_gpu_objectsc                 C   s   t |tr	t|S | |S )zSerialize an object.

        Args:
            value: The value to serialize.

        Returns:
            Serialized value.
        )rE   bytesr   r  )rj   r  r7   r7   r8   	serialize  s   

zSerializationContext.serializeztorch.Tensorc              	   C   sb   ddl m} | j}|j}|d z| |}W || n|| w |g \}}||fS )z
        Serialize `value` and return the serialized value and any tensors retrieved from `value`.
        This is only used for GPU objects.
        r   r   T)r   r   r   r   use_external_transportset_use_external_transportr  r   )rj   r  r   r   prev_use_external_transportr  r   _r7   r7   r8   r    s   

z4SerializationContext._serialize_and_retrieve_tensorsrJ   )&r4   r5   r6   __doc__rz   rx   r   r   r   r   r   rQ   r   boolr   r   rf   r   r	   r   r   r   r   r   r   r   r   r  r  r  r  r   r  r   r   r  r   r  r7   r7   r7   r8   rc   }   s    7	
'
:
	
 
7


rc   )PrB   loggingru   r   typingr   r   r   r   r   r   torchgoogle.protobuf.messager   ray._private.utilsr?   ray.cloudpicklecloudpickler<   ray.exceptionsray._privater   ray._private.custom_typesr	   ray._rayletr
   r   r   r   r   r   r   r   r   ray.core.generated.common_pb2r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   !ray.experimental.compiled_dag_refr/   ray.utilr0   r1   	getLoggerr4   r   env_boolro   r   r3   r   rI   r\   rb   rc   r7   r7   r7   r8   <module>   s6     ,p

