o
    bip                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	 d dl
mZ er0d dlZd dlmZ dZdZe eZe add	 Zd
d Zdd Zdd ZdjddZdee	ed df  ddddfddZddde	ed df fdd Zd!ed d"dddfd#d$ZeG d%d dZdkd'd(Zdld*d+Z d"dde!fd,d-Z"d"dde!fd.d/Z#dmd1d2Z$dld3d4Z%dld5d6Z&dld7d8Z'	)	9		dnd:d;Z(dod=d>Z)dpd@dAZ*	)	B		dqdCdDZ+drdFdGZ,dsdIdJZ-	)	K		dtdLdMZ.dNdOd"ed dPe/dQe/ddOf
dRdSZ0dNdOdTe/dPe/dQe/ddOf
dUdVZ1dNdOdPe/dQe/ddOfdWdXZ2dNdOdYddPe/dQe/de	dOe/e/f f
dZd[Z3d\e/de/fd]d^Z4dNdOd_e/d`e/ddOfdadbZ5dudddeZ6dNe7ddfdfdgZ8d"dde!fdhdiZ9dS )v    N)	dataclass)TYPE_CHECKINGListOptionalTuple)
is_in_test)ArrowTensorArray3RAY_DISABLE_CUSTOM_ARROW_JSON_OPTIONS_SERIALIZATION+RAY_DISABLE_CUSTOM_ARROW_DATA_SERIALIZATIONc                 C   s>   zdd l }W n
 ty   Y d S w t|  t|  t|  d S )Nr   )pyarrowModuleNotFoundError_register_arrow_data_serializer+_register_arrow_json_readoptions_serializer,_register_arrow_json_parseoptions_serializerserialization_contextpa r   T/home/ubuntu/.local/lib/python3.10/site-packages/ray/_private/arrow_serialization.py%_register_custom_datasets_serializers   s   r   c                    D   t jtddkrd S dd lm  | j jdd  fddd d S )N01r   c                 S   s   | j | jfS N)use_threads
block_sizeoptsr   r   r   <lambda><   s    z=_register_arrow_json_readoptions_serializer.<locals>.<lambda>c                    
    j |  S r   )ReadOptionsargspajsonr   r   r   =      
 custom_serializercustom_deserializer)osenvirongetr	   pyarrow.jsonjson _register_cloudpickle_serializerr    r   r   r#   r   r   .   s   

r   c                    r   )Nr   r   r   c                 S   s   | j | j| jfS r   )explicit_schemanewlines_in_valuesunexpected_field_behaviorr   r   r   r   r   O   s   z>_register_arrow_json_parseoptions_serializer.<locals>.<lambda>c                    r   r   )ParseOptionsr!   r#   r   r   r   T   r%   r&   )r)   r*   r+   r	   r,   r-   r.   r3   r/   r   r#   r   r   A   s   

r   c                 C   s0   t jtddkrdS ddl}| |jt dS )ay  Custom reducer for Arrow data that works around a zero-copy slicing pickling
    bug by using the Arrow IPC format for the underlying serialization.

    Background:
        Arrow has both array-level slicing and buffer-level slicing; both are zero-copy,
        but the former has a serialization bug where the entire buffer is serialized
        instead of just the slice, while the latter's serialization works as expected
        and only serializes the slice of the buffer. I.e., array-level slicing doesn't
        propagate the slice down to the buffer when serializing the array.

        We work around this by registering a custom cloudpickle reducers for Arrow
        Tables that delegates serialization to the Arrow IPC format; thankfully, Arrow's
        IPC serialization has fixed this buffer truncation bug.

    See https://issues.apache.org/jira/browse/ARROW-10739.
    r   r   Nr   )r)   r*   r+   r
   r   _register_cloudpickle_reducerTable_arrow_table_reducer   r   r   r   r   Z   s   r   tpyarrow.Tablec                 C   s   g }| j D ]S}| | }zt|}W nA tyR } z5t|js$t r$|dt|jtvrBtjd| d|j ddd t	t|j t
| W  Y d}~  S d}~ww || qt|| jffS )a/  Custom reducer for Arrow Tables that works around a zero-copy slice pickling bug.
    Background:
        Arrow has both array-level slicing and buffer-level slicing; both are zero-copy,
        but the former has a serialization bug where the entire buffer is serialized
        instead of just the slice, while the latter's serialization works as expected
        and only serializes the slice of the buffer. I.e., array-level slicing doesn't
        propagate the slice down to the buffer when serializing the array.
        All that these copy methods do is, at serialization time, take the array-level
        slicing and translate them to buffer-level slicing, so only the buffer slice is
        sent over the wire instead of the entire buffer.
    See https://issues.apache.org/jira/browse/ARROW-10739.
    NzTFailed to complete optimized serialization of Arrow Table, serialization of column 'z
' of type z failed, so we're falling back to Arrow IPC serialization for the table. Note that this may result in slower serialization and more worker memory utilization. Serialization error:T)exc_info)column_names_arrow_chunked_array_reduce	Exception_is_dense_uniontyper   _serialization_fallback_setloggerwarningadd_arrow_table_ipc_reduceappend_reconstruct_tableschema)r7   reduced_columnscolumn_namecolumnreduced_columner   r   r   r6   s   s.   
r6   rG   pyarrow.Arraypyarrow.DataTyperF   zpyarrow.Schemareturnc                 C   s:   ddl }g }| D ]\}}|t|| q|jj||dS )zERestore a serialized Arrow Table, reconstructing each reduced column.r   NrF   )r   rD   _reconstruct_chunked_arrayr5   from_arrays)rG   rF   r   columnschunks_payloadtype_r   r   r   rE      s
   rE   cazpyarrow.ChunkedArrayPicklableArrayPayloadc                 C   s.   g }| j D ]}t|}|| q|| jfS )zCustom reducer for Arrow ChunkedArrays that works around a zero-copy slice
    pickling bug. This reducer does not return a reconstruction function, since it's
    expected to be reconstructed by the Arrow Table reconstructor.
    )chunksrV   
from_arrayrD   r>   )rU   chunk_payloadschunkchunk_payloadr   r   r   r;      s
   


r;   rW   rT   c                 C   s"   ddl }dd | D } || |S )z=Restore a serialized Arrow ChunkedArray from chunks and type.r   Nc                 S      g | ]}|  qS r   to_array).0rZ   r   r   r   
<listcomp>       z._reconstruct_chunked_array.<locals>.<listcomp>)r   chunked_array)rW   rT   r   r   r   r   rP      s   rP   c                   @   sb   e Zd ZU dZded< eed< ed ed< eed< eed< ed  ed	< edddZdddZ	dS )rV   zPicklable array payload, holding data buffers and array metadata.

    This is a helper container for pickling and reconstructing nested Arrow Arrays while
    ensuring that the buffers that underly zero-copy slice views are properly truncated.
    rM   r>   lengthpyarrow.Bufferbuffers
null_countoffsetchildrenarL   rN   c                 C   s   t |S )a  Create a picklable array payload from an Arrow Array.

        This will recursively accumulate data buffer and metadata payloads that are
        ready for pickling; namely, the data buffers underlying zero-copy slice views
        will be properly truncated.
        )_array_to_array_payload)selfri   r   r   r   rX      s   z PicklableArrayPayload.from_arrayc                 C   s   t | S )z7Reconstruct an Arrow Array from this picklable payload.)_array_payload_to_array)rk   r   r   r   r^      s   zPicklableArrayPayload.to_arrayNri   rL   rN   rV   )rN   rL   )
__name__
__module____qualname____doc____annotations__intr   classmethodrX   r^   r   r   r   r   rV      s   
 	payloadc                 C   s  ddl }ddlm} dd | jD }| }|j| jr3t|dks(J t||\}}|j	||S |j
| jrYt|dkrYt|dksLJ t||\}}}	|j	|||	S t| j|rwt|dkskJ t||d }
|j| j|
S |jj| j| j| j| j| j|d	S )
zHReconstruct an Arrow Array from a possibly nested PicklableArrayPayload.r   N get_arrow_extension_tensor_typesc                 S   r\   r   r]   )r_   child_payloadr   r   r   r`      ra   z+_array_payload_to_array.<locals>.<listcomp>         r>   rc   re   rf   rg   rh   )r   $ray.air.util.tensor_extensions.arrowrw   rh   typesis_dictionaryr>   lenDictionaryArrayrQ   is_mapMapArray
isinstanceExtensionArrayfrom_storageArrayfrom_buffersrc   re   rf   rg   )ru   r   rw   rh   tensor_extension_typesindices
dictionaryoffsetskeysitemsstorager   r   r   rl      s6   
rl   ri   c                 C   s.  ddl }ddlm} | }t| jrtd|j| jr!t| S t	| jr*t
| S t| jr3t| S |j| jsA|j| jrEt| S |j| jrPt| S |j| jr[t| S |j| jrft| S |j| jrqt| S |j| jr|t| S t| j|rt| S t| j|jrt| S td| j)zSerialize an Arrow Array to an PicklableArrayPayload for later pickling.

    This function's primary purpose is to dispatch to the handler for the input array
    type.
    r   Nrv   zKCustom slice view serialization of dense union arrays is not yet supported.zUnhandled Arrow array type:)r   r}   rw   r=   r>   NotImplementedErrorr~   is_null_null_array_to_array_payload_is_primitive!_primitive_array_to_array_payload
_is_binary_binary_array_to_array_payloadis_listis_large_list_list_array_to_array_payloadis_fixed_size_list'_fixed_size_list_array_to_array_payload	is_struct_struct_array_to_array_payloadis_union_union_array_to_array_payloadr   "_dictionary_array_to_array_payloadr   _map_array_to_array_payloadr   _tensor_array_to_array_payloadExtensionType!_extension_array_to_array_payload
ValueError)ri   r   rw   r   r   r   r   rj     s<   


rj   c                 C   sP   ddl }|j| p'|j| p'|j| p'|j| p'|j| p'|j| S )zcWhether the provided Array type is primitive (boolean, numeric, temporal or
    fixed-size binary).r   N)r   r~   
is_integeris_floating
is_decimal
is_booleanis_temporalis_fixed_size_binaryrT   r   r   r   r   r   J  s   




r   c                 C   s8   ddl }|j| p|j| p|j| p|j| S )z@Whether the provided Array type is a variable-sized binary type.r   N)r   r~   	is_stringis_large_string	is_binaryis_large_binaryr   r   r   r   r   Y  s   


r   pyarrow.NullArrayc                 C   s   t | jt| dg| jdg dS )z.Serialize null array to PicklableArrayPayload.Nr   r|   )rV   r>   r   rf   ri   r   r   r   r   e  s   r   c                 C   s   t | js
J | j|  }t|dksJ t||d }| jdkr-t|| jt| }nd}|d }|durDt|d | j| jt| }t| jt| ||g| jdg dS )zZSerialize primitive (numeric, temporal, boolean) arrays to
    PicklableArrayPayload.
    ry   r   Nrz   r|   )	r   r>   re   r   rf    _copy_bitpacked_buffer_if_neededrg   _copy_buffer_if_neededrV   )ri   re   
bitmap_bufdata_bufr   r   r   r   r  s$   
r   c                 C   s   t | js
J | j|  }t|dksJ t|| jdkr+t|d | jt| }nd}|d }t|| j| jt| \}}}|d }t|d||}t	| jt| |||g| jdg dS )zZSerialize binary (variable-sized binary, string) arrays to
    PicklableArrayPayload.
    r{   r   Nrz   ry   r|   )
r   r>   re   r   rf   r   rg   _copy_offsets_buffer_if_neededr   rV   )ri   re   r   
offset_bufdata_offsetdata_lengthr   r   r   r   r     s(   

r   c                 C   s   |   }t|dksJ t|| jdkr!t|d | jt| }nd}|d }t|| j| jt| \}}}| j||}t	| jt| ||g| jdt
|gdS )zCSerialize list (regular and large) arrays to PicklableArrayPayload.rz   r   Nr|   )re   r   rf   r   rg   r   r>   valuesslicerV   rj   )ri   re   r   r   child_offsetchild_lengthchildr   r   r   r     s$   

r   pyarrow.FixedSizeListArrayc                 C   s   |   }t|dksJ t|| jdkr!t|d | jt| }nd}| jj| j }| jjt|  }| j||}t	| jt| |g| jdt
|gdS )z:Serialize fixed size list arrays to PicklableArrayPayload.rz   r   Nr|   )re   r   rf   r   rg   r>   	list_sizer   r   rV   rj   )ri   re   r   r   r   r   r   r   r   r     s    
r   pyarrow.StructArrayc                    s~      }t|dksJ t| jdkr!t|d  jt }nd} fddt jjD }t jt |g jd|dS )z1Serialize struct arrays to PicklableArrayPayload.rz   r   Nc                       g | ]	}t  |qS r   rj   fieldr_   ir   r   r   r`         z2_struct_array_to_array_payload.<locals>.<listcomp>r|   )	re   r   rf   r   rg   ranger>   
num_fieldsrV   )ri   re   r   rh   r   r   r   r     s   
r   pyarrow.UnionArrayc                    s   ddl }t jrJ   }t|dksJ t||d }|du s'J ||d }t||  jt } fddt jj	D }t
 jt ||g jd|dS )z0Serialize union arrays to PicklableArrayPayload.r   Nrz   c                    r   r   r   r   r   r   r   r`   $  r   z1_union_array_to_array_payload.<locals>.<listcomp>r|   )r   r=   r>   re   r   r   int8rg   r   r   rV   rf   )ri   r   re   r   type_code_bufrh   r   r   r   r     s"   r   pyarrow.DictionaryArrayc                 C   s4   t | j}t | j}t| jt| g | jd||gdS )z5Serialize dictionary arrays to PicklableArrayPayload.r   r|   )rj   r   r   rV   r>   r   rf   )ri   indices_payloaddictionary_payloadr   r   r   r   /  s   

r   pyarrow.MapArrayc                 C   s4  ddl }|  }t|dksJ t|| jdkr%t|d | jt| }nd}|g}|d }t|| j| jt| \}}}t| |j	j
rS|| t| j||g}n9|  }t|dkscJ t||j| t| d ||g}	| j||}
| j||}t|	t|
t|g}t| jt| || jd|dS )z.Serialize map arrays to PicklableArrayPayload.r   Nrz   ry   r|   )r   re   r   rf   r   rg   r   r>   r   lib	ListArrayrD   rj   r   r   r   r   int32r   r   rV   )ri   r   re   r   new_buffersr   r   r   rh   r   r   r   r   r   r   r   A  sB   


r   r   c                 C   s(   t | j}t| jt| g | jd|gdS )z1Serialize tensor arrays to PicklableArrayPayload.r   r|   )rj   r   rV   r>   r   rf   )ri   storage_payloadr   r   r   r   y  s   
r   pyarrow.ExtensionArrayc                 C   s(   t | j}| j|_t| |_| j|_|S r   )rj   r   r>   r   rc   rf   )ri   ru   r   r   r   r     s
   

r   bufrd   rg   rc   c                 C   sT   ddl }|dur|j|rt| ||} | S |dur|jd nd}t| |||} | S )Copy buffer, if needed.r   N   rz   )r   r~   r   r   	bit_width_copy_normal_buffer_if_needed)r   rT   rg   rc   r   type_bytewidthr   r   r   r     s   r   
byte_widthc                 C   s2   || }|| }|dks|| j k r| ||} | S )r   r   )sizer   )r   r   rg   rc   byte_offsetbyte_lengthr   r   r   r     s
   r   c                 C   sV   |d }|d }t || d }|dks|| jk r)| ||} |dkr)t| ||} | S )z)Copy bit-packed binary buffer, if needed.r   r   )_bytes_for_bitsr   r   _align_bit_offset)r   rg   rc   
bit_offsetr   r   r   r   r   r     s   r   arr_typec           
      C   s   ddl }ddlm} |j|s"|j|s"|j|s"|j|r'| }n|	 }t
| |||d } |j||d d| g}|d  }|d  | }	|||}|j|ra|j|dd}| d } | ||	fS )zvCopy the provided offsets buffer, returning the copied buffer and the
    offset + length of the underlying data.
    r   Nrz   F)safe)r   pyarrow.computecomputer~   r   r   r   is_large_unicodeint64r   r   r   r   as_pysubtractis_int32castre   )
r   r   rg   rc   r   pacoffset_typer   r   r   r   r   r   r     s*   	





r   nc                 C   s   | d d@ S )zpRound up n to the nearest multiple of 8.
    This is used to get the byte-padded number of bits for n bits.
       ir   )r   r   r   r   r     s   r   r   r   c                 C   s>   ddl }|  }t|tj}||L }||tj}||S )z}Align the bit offset into the buffer with the front of the buffer by shifting
    the buffer and eliminating the offset.
    r   N)r   
to_pybytesrs   
from_bytessys	byteorderto_bytes	py_buffer)r   r   r   r   bytes_bytes_as_intr   r   r   r     s   
r   tablec                 C   sd   ddl m} ddlm} | }||| jd}||  W d   n1 s&w   Y  t| ffS )a4  Custom reducer for Arrow Table that works around a zero-copy slicing pickling
    bug by using the Arrow IPC format for the underlying serialization.

    This is currently used as a fallback for unsupported types (or unknown bugs) for
    the manual buffer truncation workaround, e.g. for dense unions.
    r   )RecordBatchStreamWriter)BufferOutputStreamrO   N)pyarrow.ipcr   pyarrow.libr   rF   write_table_restore_table_from_ipcgetvalue)r   r   r   output_streamwrr   r   r   rC     s   rC   c                 C   s@   ddl m} || }| W  d   S 1 sw   Y  dS )z6Restore an Arrow Table serialized to Arrow IPC format.r   )RecordBatchStreamReaderN)r   r  read_all)r   r  readerr   r   r   r    s   
$r  c                 C   s   ddl }|j| o| jdkS )z1Whether the provided Arrow type is a dense union.r   Ndense)r   r~   r   moder   r   r   r   r=      s   r=   )r7   r8   )ru   rV   rN   rL   rm   )ri   r   rN   rV   )ri   r   rN   rV   )ri   r   rN   rV   )ri   r   rN   rV   )ri   r   rN   rV   )ri   r   rN   rV   )ri   r   rN   rV   )ri   r   rN   rV   )r   r8   ):loggingr)   r   dataclassesr   typingr   r   r   r   ray._private.utilsr   r   ray.data.extensionsr   r	   r
   	getLoggerrn   r@   setr?   r   r   r   r   r6   rE   r;   rP   rV   rl   rj   boolr   r   r   r   r   r   r   r   r   r   r   r   r   rs   r   r   r   r   r   r   rC   bytesr  r=   r   r   r   r   <module>   s   

.



#
,/



 "


!


8





%

