o
    `۷io                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
 d dlmZ er,d dlZdZdZe eZe add Zd	d
 Zdd Zdd Zddde
edgdf e
e f fddZdeddfddZdkddZdee
ed df  ddddfdd Zd!d"de
ed# df fd$d%Zd&ed# d'ddd"fd(d)ZeG d*d# d#Z dld,d-Z!dmd/d0Z"d'dde#fd1d2Z$d'dde#fd3d4Z%dnd6d7Z&dmd8d9Z'dmd:d;Z(dmd<d=Z)	.	>		#dod?d@Z*dpdBdCZ+dqdEdFZ,	.	G		#drdHdIZ-dsdKdLZ.	.	M		#dtdNdOZ/ddPd'e	d dQe0dRe0ddPf
dSdTZ1ddPdUe0dQe0dRe0ddPf
dVdWZ2ddPdQe0dRe0ddPfdXdYZ3ddPdZddQe0dRe0de
dPe0e0f f
d[d\Z4d]e0de0fd^d_Z5ddPd`e0dae0ddPfdbdcZ6dudedfZ7deddfdgdhZ8d'dde#fdidjZ9dS )v    N)	dataclass)TYPE_CHECKINGCallableListOptionalTuple)
is_in_test3RAY_DISABLE_CUSTOM_ARROW_JSON_OPTIONS_SERIALIZATION+RAY_DISABLE_CUSTOM_ARROW_DATA_SERIALIZATIONc                 C   s>   zdd l }W n
 ty   Y d S w t|  t|  t|  d S )Nr   )pyarrowModuleNotFoundError_register_arrow_data_serializer+_register_arrow_json_readoptions_serializer,_register_arrow_json_parseoptions_serializerserialization_contextpa r   V/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/_private/arrow_serialization.py%_register_custom_datasets_serializers   s   r   c                    D   t jtddkrd S dd lm  | j jdd  fddd d S )N01r   c                 S   s   | j | jfS N)use_threads
block_sizeoptsr   r   r   <lambda>:   s    z=_register_arrow_json_readoptions_serializer.<locals>.<lambda>c                    
    j |  S r   )ReadOptionsargspajsonr   r   r   ;      
 custom_serializercustom_deserializer)osenvirongetr	   pyarrow.jsonjson _register_cloudpickle_serializerr    r   r   r#   r   r   ,   s   

r   c                    r   )Nr   r   r   c                 S   s   | j | j| jfS r   )explicit_schemanewlines_in_valuesunexpected_field_behaviorr   r   r   r   r   M   s   z>_register_arrow_json_parseoptions_serializer.<locals>.<lambda>c                    r   r   )ParseOptionsr!   r#   r   r   r   R   r%   r&   )r)   r*   r+   r	   r,   r-   r.   r3   r/   r   r#   r   r   ?   s   

r   c                 C   s>   t jtddkrdS ddl}| |jt | |jt	 dS )ay  Custom reducer for Arrow data that works around a zero-copy slicing pickling
    bug by using the Arrow IPC format for the underlying serialization.

    Background:
        Arrow has both array-level slicing and buffer-level slicing; both are zero-copy,
        but the former has a serialization bug where the entire buffer is serialized
        instead of just the slice, while the latter's serialization works as expected
        and only serializes the slice of the buffer. I.e., array-level slicing doesn't
        propagate the slice down to the buffer when serializing the array.

        We work around this by registering a custom cloudpickle reducers for Arrow
        Tables that delegates serialization to the Arrow IPC format; thankfully, Arrow's
        IPC serialization has fixed this buffer truncation bug.

    See https://issues.apache.org/jira/browse/ARROW-10739.
    r   r   Nr   )
r)   r*   r+   r
   r   _register_cloudpickle_reducerTable_arrow_table_reduceSchema_arrow_schema_reducer   r   r   r   r   X   s
   r   schemazpyarrow.Schemareturnbytesc                 C   s   t |   ffS )a;  Custom reducer for Arrow Schema that uses IPC serialization for performance.

    Arrow's native IPC serialization for schemas is significantly faster than
    cloudpickle (10-20x for serialization, 2-3x for deserialization), making
    this optimization particularly valuable for workloads with large schemas.
    )_restore_schema_from_ipc	serialize
to_pybytesr9   r   r   r   r8   r   s   
r8   bufc                 C   s   ddl }|j|| S )z7Restore an Arrow Schema serialized to Arrow IPC format.r   N)r   ipcread_schemaBufferReader)r@   r   r   r   r   r<      s   r<   tpyarrow.Tablec                 C   s   g }| j D ]S}| | }zt|}W nA tyR } z5t|js$t r$|dt|jtvrBtjd| d|j ddd t	t|j t
| W  Y d}~  S d}~ww || qt|| jffS )a/  Custom reducer for Arrow Tables that works around a zero-copy slice pickling bug.
    Background:
        Arrow has both array-level slicing and buffer-level slicing; both are zero-copy,
        but the former has a serialization bug where the entire buffer is serialized
        instead of just the slice, while the latter's serialization works as expected
        and only serializes the slice of the buffer. I.e., array-level slicing doesn't
        propagate the slice down to the buffer when serializing the array.
        All that these copy methods do is, at serialization time, take the array-level
        slicing and translate them to buffer-level slicing, so only the buffer slice is
        sent over the wire instead of the entire buffer.
    See https://issues.apache.org/jira/browse/ARROW-10739.
    NzTFailed to complete optimized serialization of Arrow Table, serialization of column 'z
' of type z failed, so we're falling back to Arrow IPC serialization for the table. Note that this may result in slower serialization and more worker memory utilization. Serialization error:T)exc_info)column_names_arrow_chunked_array_reduce	Exception_is_dense_uniontyper   _serialization_fallback_setloggerwarningadd_arrow_table_ipc_reduceappend_reconstruct_tabler9   )rD   reduced_columnscolumn_namecolumnreduced_columner   r   r   r6      s.   
r6   rS   pyarrow.Arraypyarrow.DataTypec                 C   s:   ddl }g }| D ]\}}|t|| q|jj||dS )zERestore a serialized Arrow Table, reconstructing each reduced column.r   Nr?   )r   rQ   _reconstruct_chunked_arrayr5   from_arrays)rS   r9   r   columnschunks_payloadtype_r   r   r   rR      s
   rR   cazpyarrow.ChunkedArrayPicklableArrayPayloadc                 C   s.   g }| j D ]}t|}|| q|| jfS )zCustom reducer for Arrow ChunkedArrays that works around a zero-copy slice
    pickling bug. This reducer does not return a reconstruction function, since it's
    expected to be reconstructed by the Arrow Table reconstructor.
    )chunksr`   
from_arrayrQ   rK   )r_   chunk_payloadschunkchunk_payloadr   r   r   rH      s
   


rH   ra   r^   c                 C   s"   ddl }dd | D } || |S )z=Restore a serialized Arrow ChunkedArray from chunks and type.r   Nc                 S      g | ]}|  qS r   to_array).0rd   r   r   r   
<listcomp>       z._reconstruct_chunked_array.<locals>.<listcomp>)r   chunked_array)ra   r^   r   r   r   r   rZ      s   rZ   c                   @   sb   e Zd ZU dZded< eed< ed ed< eed< eed< ed  ed	< edddZdddZ	dS )r`   zPicklable array payload, holding data buffers and array metadata.

    This is a helper container for pickling and reconstructing nested Arrow Arrays while
    ensuring that the buffers that underly zero-copy slice views are properly truncated.
    rY   rK   lengthpyarrow.Bufferbuffers
null_countoffsetchildrenarX   r:   c                 C   s   t |S )a  Create a picklable array payload from an Arrow Array.

        This will recursively accumulate data buffer and metadata payloads that are
        ready for pickling; namely, the data buffers underlying zero-copy slice views
        will be properly truncated.
        )_array_to_array_payload)selfrs   r   r   r   rb      s   z PicklableArrayPayload.from_arrayc                 C   s   t | S )z7Reconstruct an Arrow Array from this picklable payload.)_array_payload_to_array)ru   r   r   r   rh      s   zPicklableArrayPayload.to_arrayNrs   rX   r:   r`   )r:   rX   )
__name__
__module____qualname____doc____annotations__intr   classmethodrb   rh   r   r   r   r   r`      s   
 	payloadc           	      C   s   ddl }dd | jD }|j| jr*t|dksJ t||\}}|j||S |j| jrPt|dkrPt|dksCJ t||\}}}|j	|||S t
| j|jrmt|dkscJ t||d }| j|S |jj| j| j| j| j| j|dS )	zHReconstruct an Arrow Array from a possibly nested PicklableArrayPayload.r   Nc                 S   rf   r   rg   )ri   child_payloadr   r   r   rj     rk   z+_array_payload_to_array.<locals>.<listcomp>         rK   rm   ro   rp   rq   rr   )r   rr   typesis_dictionaryrK   lenDictionaryArrayr[   is_mapMapArray
isinstanceBaseExtensionType
wrap_arrayArrayfrom_buffersrm   ro   rp   rq   )	r   r   rr   indices
dictionaryoffsetskeysitemsstorager   r   r   rv     s,   
rv   rs   c                 C   s  ddl }t| jrtd|j| jrt| S t| jr!t| S t	| jr*t
| S |j| js8|j| jr<t| S |j| jrGt| S |j| jrRt| S |j| jr]t| S |j| jrht| S |j| jrst| S t| j|jr~t| S td| j)zSerialize an Arrow Array to an PicklableArrayPayload for later pickling.

    This function's primary purpose is to dispatch to the handler for the input array
    type.
    r   NzKCustom slice view serialization of dense union arrays is not yet supported.zUnhandled Arrow array type:)r   rJ   rK   NotImplementedErrorr   is_null_null_array_to_array_payload_is_primitive!_primitive_array_to_array_payload
_is_binary_binary_array_to_array_payloadis_listis_large_list_list_array_to_array_payloadis_fixed_size_list'_fixed_size_list_array_to_array_payload	is_struct_struct_array_to_array_payloadis_union_union_array_to_array_payloadr   "_dictionary_array_to_array_payloadr   _map_array_to_array_payloadr   r   !_extension_array_to_array_payload
ValueError)rs   r   r   r   r   rt   $  s4   


rt   c                 C   sP   ddl }|j| p'|j| p'|j| p'|j| p'|j| p'|j| S )zcWhether the provided Array type is primitive (boolean, numeric, temporal or
    fixed-size binary).r   N)r   r   
is_integeris_floating
is_decimal
is_booleanis_temporalis_fixed_size_binaryr^   r   r   r   r   r   M  s   




r   c                 C   s8   ddl }|j| p|j| p|j| p|j| S )z@Whether the provided Array type is a variable-sized binary type.r   N)r   r   	is_stringis_large_string	is_binaryis_large_binaryr   r   r   r   r   \  s   


r   pyarrow.NullArrayc                 C   s   t | jt| dg| jdg dS )z.Serialize null array to PicklableArrayPayload.Nr   r   )r`   rK   r   rp   rs   r   r   r   r   h  s   r   c                 C   s   t | js
J | j|  }t|dksJ t||d }| jdkr-t|| jt| }nd}|d }|durDt|d | j| jt| }t| jt| ||g| jdg dS )zZSerialize primitive (numeric, temporal, boolean) arrays to
    PicklableArrayPayload.
    r   r   Nr   r   )	r   rK   ro   r   rp    _copy_bitpacked_buffer_if_neededrq   _copy_buffer_if_neededr`   )rs   ro   
bitmap_bufdata_bufr   r   r   r   u  s$   
r   c                 C   s   t | js
J | j|  }t|dksJ t|| jdkr+t|d | jt| }nd}|d }t|| j| jt| \}}}|d }t|d||}t	| jt| |||g| jdg dS )zZSerialize binary (variable-sized binary, string) arrays to
    PicklableArrayPayload.
    r   r   Nr   r   r   )
r   rK   ro   r   rp   r   rq   _copy_offsets_buffer_if_neededr   r`   )rs   ro   r   
offset_bufdata_offsetdata_lengthr   r   r   r   r     s(   

r   c                 C   s   |   }t|dksJ t|| jdkr!t|d | jt| }nd}|d }t|| j| jt| \}}}| j||}t	| jt| ||g| jdt
|gdS )zCSerialize list (regular and large) arrays to PicklableArrayPayload.r   r   Nr   )ro   r   rp   r   rq   r   rK   valuesslicer`   rt   )rs   ro   r   r   child_offsetchild_lengthchildr   r   r   r     s$   

r   pyarrow.FixedSizeListArrayc                 C   s   |   }t|dksJ t|| jdkr!t|d | jt| }nd}| jj| j }| jjt|  }| j||}t	| jt| |g| jdt
|gdS )z:Serialize fixed size list arrays to PicklableArrayPayload.r   r   Nr   )ro   r   rp   r   rq   rK   	list_sizer   r   r`   rt   )rs   ro   r   r   r   r   r   r   r   r     s    
r   pyarrow.StructArrayc                    s~      }t|dksJ t| jdkr!t|d  jt }nd} fddt jjD }t jt |g jd|dS )z1Serialize struct arrays to PicklableArrayPayload.r   r   Nc                       g | ]	}t  |qS r   rt   fieldri   ir   r   r   rj         z2_struct_array_to_array_payload.<locals>.<listcomp>r   )	ro   r   rp   r   rq   rangerK   
num_fieldsr`   )rs   ro   r   rr   r   r   r   r     s   
r   pyarrow.UnionArrayc                    s   ddl }t jrJ   }t|dksJ t||d }|du s'J ||d }t||  jt } fddt jj	D }t
 jt ||g jd|dS )z0Serialize union arrays to PicklableArrayPayload.r   Nr   c                    r   r   r   r   r   r   r   rj   '  r   z1_union_array_to_array_payload.<locals>.<listcomp>r   )r   rJ   rK   ro   r   r   int8rq   r   r   r`   rp   )rs   r   ro   r   type_code_bufrr   r   r   r   r     s"   r   pyarrow.DictionaryArrayc                 C   s4   t | j}t | j}t| jt| g | jd||gdS )z5Serialize dictionary arrays to PicklableArrayPayload.r   r   )rt   r   r   r`   rK   r   rp   )rs   indices_payloaddictionary_payloadr   r   r   r   2  s   

r   pyarrow.MapArrayc                 C   s4  ddl }|  }t|dksJ t|| jdkr%t|d | jt| }nd}|g}|d }t|| j| jt| \}}}t| |j	j
rS|| t| j||g}n9|  }t|dkscJ t||j| t| d ||g}	| j||}
| j||}t|	t|
t|g}t| jt| || jd|dS )z.Serialize map arrays to PicklableArrayPayload.r   Nr   r   r   )r   ro   r   rp   r   rq   r   rK   r   lib	ListArrayrQ   rt   r   r   r   r   int32r   r   r`   )rs   r   ro   r   new_buffersr   r   r   rr   r   r   r   r   r   r   r   D  sB   


r   pyarrow.ExtensionArrayc                 C   s(   t | j}t| jt| g | jd|gdS )Nr   r   )rt   r   r`   rK   r   rp   )rs   storage_payloadr   r   r   r   |  s   
r   rn   rq   rm   c                 C   sT   ddl }|dur|j|rt| ||} | S |dur|jd nd}t| |||} | S )Copy buffer, if needed.r   N   r   )r   r   r   r   	bit_width_copy_normal_buffer_if_needed)r@   r^   rq   rm   r   type_bytewidthr   r   r   r     s   r   
byte_widthc                 C   s2   || }|| }|dks|| j k r| ||} | S )r   r   )sizer   )r@   r   rq   rm   byte_offsetbyte_lengthr   r   r   r     s
   r   c                 C   sV   |d }|d }t || d }|dks|| jk r)| ||} |dkr)t| ||} | S )z)Copy bit-packed binary buffer, if needed.r   r   )_bytes_for_bitsr   r   _align_bit_offset)r@   rq   rm   
bit_offsetr   r   r   r   r   r     s   r   arr_typec           
      C   s   ddl }ddlm} |j|s"|j|s"|j|s"|j|r'| }n|	 }t
| |||d } |j||d d| g}|d  }|d  | }	|||}|j|ra|j|dd}| d } | ||	fS )zvCopy the provided offsets buffer, returning the copied buffer and the
    offset + length of the underlying data.
    r   Nr   F)safe)r   pyarrow.computecomputer   r   r   r   is_large_unicodeint64r   r   r   r   as_pysubtractis_int32castro   )
r@   r   rq   rm   r   pacoffset_typer   r   r   r   r   r   r     s*   	





r   nc                 C   s   | d d@ S )zpRound up n to the nearest multiple of 8.
    This is used to get the byte-padded number of bits for n bits.
       ir   )r   r   r   r   r     s   r   r   r   c                 C   s>   ddl }|  }t|tj}||L }||tj}||S )z}Align the bit offset into the buffer with the front of the buffer by shifting
    the buffer and eliminating the offset.
    r   N)r   r>   r}   
from_bytessys	byteorderto_bytes	py_buffer)r@   r   r   r   bytes_bytes_as_intr   r   r   r     s   
r   tablec                 C   sd   ddl m} ddlm} | }||| jd}||  W d   n1 s&w   Y  t| ffS )a4  Custom reducer for Arrow Table that works around a zero-copy slicing pickling
    bug by using the Arrow IPC format for the underlying serialization.

    This is currently used as a fallback for unsupported types (or unknown bugs) for
    the manual buffer truncation workaround, e.g. for dense unions.
    r   )RecordBatchStreamWriter)BufferOutputStreamr?   N)pyarrow.ipcr   pyarrow.libr   r9   write_table_restore_table_from_ipcgetvalue)r   r   r   output_streamwrr   r   r   rP     s   rP   c                 C   s@   ddl m} || }| W  d   S 1 sw   Y  dS )z6Restore an Arrow Table serialized to Arrow IPC format.r   )RecordBatchStreamReaderN)r  r  read_all)r@   r  readerr   r   r   r    s   
$r  c                 C   s   ddl }|j| o| jdkS )z1Whether the provided Arrow type is a dense union.r   Ndense)r   r   r   moder   r   r   r   rJ     s   rJ   )rD   rE   )r   r`   r:   rX   rw   )rs   r   r:   r`   )rs   r   r:   r`   )rs   r   r:   r`   )rs   r   r:   r`   )rs   r   r:   r`   )rs   r   r:   r`   )rs   r   r:   r`   )r   rE   ):loggingr)   r   dataclassesr   typingr   r   r   r   r   ray._private.utilsr   r   r	   r
   	getLoggerrx   rM   setrL   r   r   r   r   r;   r8   r<   r6   rR   rH   rZ   r`   rv   rt   boolr   r   r   r   r   r   r   r   r   r   r   r   r}   r   r   r   r   r   r   rP   r  rJ   r   r   r   r   <module>   s   


.



#
")



 "


!

8




%

