o
    $iKl                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	 d dl
mZ er*d dlZdZdZe eZe add Zd	d
 Zdd Zdd ZdfddZdee	ed df  ddddfddZddde	ed df fddZd ed d!dddfd"d#ZeG d$d dZdgd&d'Zdhd)d*Zd!ddefd+d,Z d!ddefd-d.Z!did0d1Z"dhd2d3Z#dhd4d5Z$dhd6d7Z%	(	8		djd9d:Z&dkd<d=Z'dld?d@Z(	(	A		dmdBdCZ)dndEdFZ*	(	G		dodHdIZ+dJdKd!ed dLe,dMe,ddKf
dNdOZ-dJdKdPe,dLe,dMe,ddKf
dQdRZ.dJdKdLe,dMe,ddKfdSdTZ/dJdKdUddLe,dMe,de	dKe,e,f f
dVdWZ0dXe,de,fdYdZZ1dJdKd[e,d\e,ddKfd]d^Z2dpd`daZ3dJe4ddfdbdcZ5d!ddefdddeZ6dS )q    N)	dataclass)TYPE_CHECKINGListOptionalTuple)
is_in_test3RAY_DISABLE_CUSTOM_ARROW_JSON_OPTIONS_SERIALIZATION+RAY_DISABLE_CUSTOM_ARROW_DATA_SERIALIZATIONc                 C   s>   zdd l }W n
 ty   Y d S w t|  t|  t|  d S )Nr   )pyarrowModuleNotFoundError_register_arrow_data_serializer+_register_arrow_json_readoptions_serializer,_register_arrow_json_parseoptions_serializerserialization_contextpa r   ]/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/_private/arrow_serialization.py%_register_custom_datasets_serializers   s   r   c                    D   t jtddkrd S dd lm  | j jdd  fddd d S )N01r   c                 S   s   | j | jfS N)use_threads
block_sizeoptsr   r   r   <lambda>:   s    z=_register_arrow_json_readoptions_serializer.<locals>.<lambda>c                    
    j |  S r   )ReadOptionsargspajsonr   r   r   ;      
 custom_serializercustom_deserializer)osenvirongetr   pyarrow.jsonjson _register_cloudpickle_serializerr   r   r   r"   r   r   ,   s   

r   c                    r   )Nr   r   r   c                 S   s   | j | j| jfS r   )explicit_schemanewlines_in_valuesunexpected_field_behaviorr   r   r   r   r   M   s   z>_register_arrow_json_parseoptions_serializer.<locals>.<lambda>c                    r   r   )ParseOptionsr    r"   r   r   r   R   r$   r%   )r(   r)   r*   r   r+   r,   r-   r2   r.   r   r"   r   r   ?   s   

r   c                 C   s0   t jtddkrdS ddl}| |jt dS )ay  Custom reducer for Arrow data that works around a zero-copy slicing pickling
    bug by using the Arrow IPC format for the underlying serialization.

    Background:
        Arrow has both array-level slicing and buffer-level slicing; both are zero-copy,
        but the former has a serialization bug where the entire buffer is serialized
        instead of just the slice, while the latter's serialization works as expected
        and only serializes the slice of the buffer. I.e., array-level slicing doesn't
        propagate the slice down to the buffer when serializing the array.

        We work around this by registering a custom cloudpickle reducers for Arrow
        Tables that delegates serialization to the Arrow IPC format; thankfully, Arrow's
        IPC serialization has fixed this buffer truncation bug.

    See https://issues.apache.org/jira/browse/ARROW-10739.
    r   r   Nr   )r(   r)   r*   r	   r
   _register_cloudpickle_reducerTable_arrow_table_reducer   r   r   r   r   X   s   r   tpyarrow.Tablec                 C   s   g }| j D ]S}| | }zt|}W nA tyR } z5t|js$t r$|dt|jtvrBtjd| d|j ddd t	t|j t
| W  Y d}~  S d}~ww || qt|| jffS )a/  Custom reducer for Arrow Tables that works around a zero-copy slice pickling bug.
    Background:
        Arrow has both array-level slicing and buffer-level slicing; both are zero-copy,
        but the former has a serialization bug where the entire buffer is serialized
        instead of just the slice, while the latter's serialization works as expected
        and only serializes the slice of the buffer. I.e., array-level slicing doesn't
        propagate the slice down to the buffer when serializing the array.
        All that these copy methods do is, at serialization time, take the array-level
        slicing and translate them to buffer-level slicing, so only the buffer slice is
        sent over the wire instead of the entire buffer.
    See https://issues.apache.org/jira/browse/ARROW-10739.
    NzTFailed to complete optimized serialization of Arrow Table, serialization of column 'z
' of type z failed, so we're falling back to Arrow IPC serialization for the table. Note that this may result in slower serialization and more worker memory utilization. Serialization error:T)exc_info)column_names_arrow_chunked_array_reduce	Exception_is_dense_uniontyper   _serialization_fallback_setloggerwarningadd_arrow_table_ipc_reduceappend_reconstruct_tableschema)r6   reduced_columnscolumn_namecolumnreduced_columner   r   r   r5   q   s.   
r5   rF   pyarrow.Arraypyarrow.DataTyperE   zpyarrow.Schemareturnc                 C   s:   ddl }g }| D ]\}}|t|| q|jj||dS )zERestore a serialized Arrow Table, reconstructing each reduced column.r   NrE   )r
   rC   _reconstruct_chunked_arrayr4   from_arrays)rF   rE   r   columnschunks_payloadtype_r   r   r   rD      s
   rD   cazpyarrow.ChunkedArrayPicklableArrayPayloadc                 C   s.   g }| j D ]}t|}|| q|| jfS )zCustom reducer for Arrow ChunkedArrays that works around a zero-copy slice
    pickling bug. This reducer does not return a reconstruction function, since it's
    expected to be reconstructed by the Arrow Table reconstructor.
    )chunksrU   
from_arrayrC   r=   )rT   chunk_payloadschunkchunk_payloadr   r   r   r:      s
   


r:   rV   rS   c                 C   s"   ddl }dd | D } || |S )z=Restore a serialized Arrow ChunkedArray from chunks and type.r   Nc                 S      g | ]}|  qS r   to_array).0rY   r   r   r   
<listcomp>       z._reconstruct_chunked_array.<locals>.<listcomp>)r
   chunked_array)rV   rS   r   r   r   r   rO      s   rO   c                   @   sb   e Zd ZU dZded< eed< ed ed< eed< eed< ed  ed	< edddZdddZ	dS )rU   zPicklable array payload, holding data buffers and array metadata.

    This is a helper container for pickling and reconstructing nested Arrow Arrays while
    ensuring that the buffers that underly zero-copy slice views are properly truncated.
    rL   r=   lengthpyarrow.Bufferbuffers
null_countoffsetchildrenarK   rM   c                 C   s   t |S )a  Create a picklable array payload from an Arrow Array.

        This will recursively accumulate data buffer and metadata payloads that are
        ready for pickling; namely, the data buffers underlying zero-copy slice views
        will be properly truncated.
        )_array_to_array_payload)selfrh   r   r   r   rW      s   z PicklableArrayPayload.from_arrayc                 C   s   t | S )z7Reconstruct an Arrow Array from this picklable payload.)_array_payload_to_array)rj   r   r   r   r]      s   zPicklableArrayPayload.to_arrayNrh   rK   rM   rU   )rM   rK   )
__name__
__module____qualname____doc____annotations__intr   classmethodrW   r]   r   r   r   r   rU      s   
 	payloadc           	      C   s   ddl }dd | jD }|j| jr*t|dksJ t||\}}|j||S |j| jrPt|dkrPt|dksCJ t||\}}}|j	|||S t
| j|jrmt|dkscJ t||d }| j|S |jj| j| j| j| j| j|dS )	zHReconstruct an Arrow Array from a possibly nested PicklableArrayPayload.r   Nc                 S   r[   r   r\   )r^   child_payloadr   r   r   r_      r`   z+_array_payload_to_array.<locals>.<listcomp>         r=   rb   rd   re   rf   rg   )r
   rg   typesis_dictionaryr=   lenDictionaryArrayrP   is_mapMapArray
isinstanceBaseExtensionType
wrap_arrayArrayfrom_buffersrb   rd   re   rf   )	rt   r   rg   indices
dictionaryoffsetskeysitemsstorager   r   r   rk      s,   
rk   rh   c                 C   s  ddl }t| jrtd|j| jrt| S t| jr!t| S t	| jr*t
| S |j| js8|j| jr<t| S |j| jrGt| S |j| jrRt| S |j| jr]t| S |j| jrht| S |j| jrst| S t| j|jr~t| S td| j)zSerialize an Arrow Array to an PicklableArrayPayload for later pickling.

    This function's primary purpose is to dispatch to the handler for the input array
    type.
    r   NzKCustom slice view serialization of dense union arrays is not yet supported.zUnhandled Arrow array type:)r
   r<   r=   NotImplementedErrorrz   is_null_null_array_to_array_payload_is_primitive!_primitive_array_to_array_payload
_is_binary_binary_array_to_array_payloadis_listis_large_list_list_array_to_array_payloadis_fixed_size_list'_fixed_size_list_array_to_array_payload	is_struct_struct_array_to_array_payloadis_union_union_array_to_array_payloadr{   "_dictionary_array_to_array_payloadr~   _map_array_to_array_payloadr   r   !_extension_array_to_array_payload
ValueError)rh   r   r   r   r   ri     s4   


ri   c                 C   sP   ddl }|j| p'|j| p'|j| p'|j| p'|j| p'|j| S )zcWhether the provided Array type is primitive (boolean, numeric, temporal or
    fixed-size binary).r   N)r
   rz   
is_integeris_floating
is_decimal
is_booleanis_temporalis_fixed_size_binaryrS   r   r   r   r   r   8  s   




r   c                 C   s8   ddl }|j| p|j| p|j| p|j| S )z@Whether the provided Array type is a variable-sized binary type.r   N)r
   rz   	is_stringis_large_string	is_binaryis_large_binaryr   r   r   r   r   G  s   


r   pyarrow.NullArrayc                 C   s   t | jt| dg| jdg dS )z.Serialize null array to PicklableArrayPayload.Nr   ry   )rU   r=   r|   re   rh   r   r   r   r   S  s   r   c                 C   s   t | js
J | j|  }t|dksJ t||d }| jdkr-t|| jt| }nd}|d }|durDt|d | j| jt| }t| jt| ||g| jdg dS )zZSerialize primitive (numeric, temporal, boolean) arrays to
    PicklableArrayPayload.
    rv   r   Nrw   ry   )	r   r=   rd   r|   re    _copy_bitpacked_buffer_if_neededrf   _copy_buffer_if_neededrU   )rh   rd   
bitmap_bufdata_bufr   r   r   r   `  s$   
r   c                 C   s   t | js
J | j|  }t|dksJ t|| jdkr+t|d | jt| }nd}|d }t|| j| jt| \}}}|d }t|d||}t	| jt| |||g| jdg dS )zZSerialize binary (variable-sized binary, string) arrays to
    PicklableArrayPayload.
    rx   r   Nrw   rv   ry   )
r   r=   rd   r|   re   r   rf   _copy_offsets_buffer_if_neededr   rU   )rh   rd   r   
offset_bufdata_offsetdata_lengthr   r   r   r   r     s(   

r   c                 C   s   |   }t|dksJ t|| jdkr!t|d | jt| }nd}|d }t|| j| jt| \}}}| j||}t	| jt| ||g| jdt
|gdS )zCSerialize list (regular and large) arrays to PicklableArrayPayload.rw   r   Nry   )rd   r|   re   r   rf   r   r=   valuesslicerU   ri   )rh   rd   r   r   child_offsetchild_lengthchildr   r   r   r     s$   

r   pyarrow.FixedSizeListArrayc                 C   s   |   }t|dksJ t|| jdkr!t|d | jt| }nd}| jj| j }| jjt|  }| j||}t	| jt| |g| jdt
|gdS )z:Serialize fixed size list arrays to PicklableArrayPayload.rw   r   Nry   )rd   r|   re   r   rf   r=   	list_sizer   r   rU   ri   )rh   rd   r   r   r   r   r   r   r   r     s    
r   pyarrow.StructArrayc                    s~      }t|dksJ t| jdkr!t|d  jt }nd} fddt jjD }t jt |g jd|dS )z1Serialize struct arrays to PicklableArrayPayload.rw   r   Nc                       g | ]	}t  |qS r   ri   fieldr^   ir   r   r   r_         z2_struct_array_to_array_payload.<locals>.<listcomp>ry   )	rd   r|   re   r   rf   ranger=   
num_fieldsrU   )rh   rd   r   rg   r   r   r   r     s   
r   pyarrow.UnionArrayc                    s   ddl }t jrJ   }t|dksJ t||d }|du s'J ||d }t||  jt } fddt jj	D }t
 jt ||g jd|dS )z0Serialize union arrays to PicklableArrayPayload.r   Nrw   c                    r   r   r   r   r   r   r   r_     r   z1_union_array_to_array_payload.<locals>.<listcomp>ry   )r
   r<   r=   rd   r|   r   int8rf   r   r   rU   re   )rh   r   rd   r   type_code_bufrg   r   r   r   r     s"   r   pyarrow.DictionaryArrayc                 C   s4   t | j}t | j}t| jt| g | jd||gdS )z5Serialize dictionary arrays to PicklableArrayPayload.r   ry   )ri   r   r   rU   r=   r|   re   )rh   indices_payloaddictionary_payloadr   r   r   r     s   

r   pyarrow.MapArrayc                 C   s4  ddl }|  }t|dksJ t|| jdkr%t|d | jt| }nd}|g}|d }t|| j| jt| \}}}t| |j	j
rS|| t| j||g}n9|  }t|dkscJ t||j| t| d ||g}	| j||}
| j||}t|	t|
t|g}t| jt| || jd|dS )z.Serialize map arrays to PicklableArrayPayload.r   Nrw   rv   ry   )r
   rd   r|   re   r   rf   r   r=   r   lib	ListArrayrC   ri   r   r   r   r   int32r   r   rU   )rh   r   rd   r   new_buffersr   r   r   rg   r   r   r   r   r   r   r   /  sB   


r   pyarrow.ExtensionArrayc                 C   s(   t | j}t| jt| g | jd|gdS )Nr   ry   )ri   r   rU   r=   r|   re   )rh   storage_payloadr   r   r   r   g  s   
r   bufrc   rf   rb   c                 C   sT   ddl }|dur|j|rt| ||} | S |dur|jd nd}t| |||} | S )Copy buffer, if needed.r   N   rw   )r
   rz   r   r   	bit_width_copy_normal_buffer_if_needed)r   rS   rf   rb   r   type_bytewidthr   r   r   r   u  s   r   
byte_widthc                 C   s2   || }|| }|dks|| j k r| ||} | S )r   r   )sizer   )r   r   rf   rb   byte_offsetbyte_lengthr   r   r   r     s
   r   c                 C   sV   |d }|d }t || d }|dks|| jk r)| ||} |dkr)t| ||} | S )z)Copy bit-packed binary buffer, if needed.r   r   )_bytes_for_bitsr   r   _align_bit_offset)r   rf   rb   
bit_offsetr   r   r   r   r   r     s   r   arr_typec           
      C   s   ddl }ddlm} |j|s"|j|s"|j|s"|j|r'| }n|	 }t
| |||d } |j||d d| g}|d  }|d  | }	|||}|j|ra|j|dd}| d } | ||	fS )zvCopy the provided offsets buffer, returning the copied buffer and the
    offset + length of the underlying data.
    r   Nrw   F)safe)r
   pyarrow.computecomputerz   r   r   r   is_large_unicodeint64r   r   r   r   as_pysubtractis_int32castrd   )
r   r   rf   rb   r   pacoffset_typer   r   r   r   r   r   r     s*   	





r   nc                 C   s   | d d@ S )zpRound up n to the nearest multiple of 8.
    This is used to get the byte-padded number of bits for n bits.
       ir   )r   r   r   r   r     s   r   r   r   c                 C   s>   ddl }|  }t|tj}||L }||tj}||S )z}Align the bit offset into the buffer with the front of the buffer by shifting
    the buffer and eliminating the offset.
    r   N)r
   
to_pybytesrr   
from_bytessys	byteorderto_bytes	py_buffer)r   r   r   r   bytes_bytes_as_intr   r   r   r     s   
r   tablec                 C   sd   ddl m} ddlm} | }||| jd}||  W d   n1 s&w   Y  t| ffS )a4  Custom reducer for Arrow Table that works around a zero-copy slicing pickling
    bug by using the Arrow IPC format for the underlying serialization.

    This is currently used as a fallback for unsupported types (or unknown bugs) for
    the manual buffer truncation workaround, e.g. for dense unions.
    r   )RecordBatchStreamWriter)BufferOutputStreamrN   N)pyarrow.ipcr   pyarrow.libr   rE   write_table_restore_table_from_ipcgetvalue)r   r   r   output_streamwrr   r   r   rB     s   rB   c                 C   s@   ddl m} || }| W  d   S 1 sw   Y  dS )z6Restore an Arrow Table serialized to Arrow IPC format.r   )RecordBatchStreamReaderN)r   r   read_all)r   r   readerr   r   r   r     s   
$r   c                 C   s   ddl }|j| o| jdkS )z1Whether the provided Arrow type is a dense union.r   Ndense)r
   rz   r   moder   r   r   r   r<     s   r<   )r6   r7   )rt   rU   rM   rK   rl   )rh   r   rM   rU   )rh   r   rM   rU   )rh   r   rM   rU   )rh   r   rM   rU   )rh   r   rM   rU   )rh   r   rM   rU   )rh   r   rM   rU   )r   r7   )7loggingr(   r   dataclassesr   typingr   r   r   r   ray._private.utilsr   r
   r   r	   	getLoggerrm   r?   setr>   r   r   r   r   r5   rD   r:   rO   rU   rk   ri   boolr   r   r   r   r   r   r   r   r   r   r   r   rr   r   r   r   r   r   r   rB   bytesr   r<   r   r   r   r   <module>   s   

.



#
")



 "


!

8




%

