o
    $iAo                     @   s8  d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZmZmZmZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlmZ d dl m!Z! e	r|d dl"Z"d dlZd d	l#m$Z$ d d
l%m&Z& d dl'm(Z( d dl)m*Z* edddZ+edddZ,edZ-edZ.ed Z/ee0ddf Z1ed Z2ed Z3e4e5Z6e!G dd deZ7e!G dd de8eZ9eddee8ej:f f Z;ee2ej:f Z<e0Z=G dd  d ee+e,f Z>eee+ge,f ee+gee, f e0d  f Z?eeee/ d!f  Z@ed! ZAg d"ZBd#ZCd$ee1 d%eDfd&d'ZEd(ed) d%ed) fd*d+ZFd,ee8 d%e8fd-d.ZGe!d/ed! d%ed0 fd1d2ZHe!G d3d4 d4ZIG d5d6 d6ZJe!eG d7d0 d0ZKd8d9 eeKD ZLe!eG d:d! d!eKZMe!d;d<eG d=d> d>eMZNe!G d?d@ d@ZOe!dAd<G dBdC dCZPdDeQej: d%ej:fdEdFZRdS )G    N)	dataclassfields)Enum)TYPE_CHECKINGAnyCallableDictIteratorListOptionalProtocolTupleTypeVarUnion)_check_pyarrow_version_truncated_repr)	ObjectRef)log_once)DeveloperAPI)BlockBuilderPandasBlockSchema)SortKey)AggregateFnTT)contravariantU)	covariantKeyTypeAggType)pyarrow.Tablepandas.DataFramer   pyarrow.lib.Schema)pyarrow.ChunkedArraypyarrow.Arraypandas.Series)r%   z
np.ndarrayr$   r#   c                   @   s   e Zd ZdZdZdS )	BlockTypearrowpandasN)__name__
__module____qualname__ARROWPANDAS r.   r.   K/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/block.pyr&   D   s    r&   c                   @   s   e Zd ZdZdZdZdS )BatchFormatpyarrowr(   numpyN)r)   r*   r+   r,   r-   NUMPYr.   r.   r.   r/   r0   J   s    r0   r    r!   c                   @   s*   e Zd Zdedeeee f fddZdS )_CallableClassProtocol_CallableClassProtocol__argreturnc                 C   s   d S Nr.   )selfr5   r.   r.   r/   __call__`   s   z_CallableClassProtocol.__call__N)r)   r*   r+   r   r   r   r	   r9   r.   r.   r.   r/   r4   _   s    "r4   BlockMetadata)r(   r1   r2   Nr2   schemar6   c                 C   s,   ddl m} | d u pt| |r| j S |  S )Nr   r   )ray.data._internal.pandas_blockr   
isinstancenames)r;   r   r.   r.   r/   _is_empty_schemaw   s   r?   schemasSchemac                 C   s   | D ]
}t |s|  S qdS )zReturn the first non-empty schema from an iterator of schemas.

    Args:
        schemas: Iterator of schemas to check.

    Returns:
        The first non-empty schema, or None if all schemas are empty.
    N)r?   )r@   r;   r.   r.   r/   _take_first_non_empty_schema   s
   	rB   given_batch_formatc                 C   s.   | dkrt } | tvrtd|  dt d| S )NdefaultzThe given batch format z isn't allowed (must be one of z).)DEFAULT_BATCH_FORMATVALID_BATCH_FORMATS
ValueError)rC   r.   r.   r/   _apply_batch_format   s   rH   metas
BlockStatsc                 C   s   dd | D S )Nc                 S   s   g | ]}|  qS r.   )to_stats).0mr.   r.   r/   
<listcomp>   s    zto_stats.<locals>.<listcomp>r.   )rI   r.   r.   r/   rK      s   rK   c                   @   s.   e Zd ZdZdd ZedddZdd	 Zd
S )BlockExecStatsa  Execution stats for this block.

    Attributes:
        wall_time_s: The wall-clock time it took to compute this block.
        cpu_time_s: The CPU time it took to compute this block.
        node_id: A unique id for the node that computed this block.
        max_uss_bytes: An estimate of the maximum amount of physical memory that the
            process was using while computing this block.
    c                 C   s>   d | _ d | _d | _d| _d | _tj  | _	d| _
d | _d S )Nr   )start_time_s
end_time_swall_time_s
udf_time_s
cpu_time_srayruntime_contextget_runtime_contextget_node_idnode_idmax_uss_bytestask_idxr8   r.   r.   r/   __init__   s   
zBlockExecStats.__init__r6   _BlockExecStatsBuilderc                   C      t  S r7   )r^   r.   r.   r.   r/   builder      zBlockExecStats.builderc                 C   s   t | j| j| j| jdS )N)rR   rT   rS   rY   )reprrR   rT   rS   rY   r\   r.   r.   r/   __repr__   s   zBlockExecStats.__repr__N)r6   r^   )r)   r*   r+   __doc__r]   staticmethodr`   rc   r.   r.   r.   r/   rO      s    

rO   c                   @   s"   e Zd ZdZdd Zd	ddZdS )
r^   zHelper class for building block stats.

    When this class is created, we record the start time. When build() is
    called, the time delta is saved as part of the stats.
    c                 C   s   t  | _t  | _d S r7   )timeperf_counter_start_timeprocess_time
_start_cpur\   r.   r.   r/   r]      s   
z_BlockExecStatsBuilder.__init__r6   rO   c                 C   s@   t  }t  }t }| j|_||_|| j |_|| j |_	|S r7   )
rf   rg   ri   rO   rh   rP   rQ   rR   rj   rT   )r8   end_timeend_cpustatsr.   r.   r/   build   s   z_BlockExecStatsBuilder.buildN)r6   rO   )r)   r*   r+   rd   r]   rn   r.   r.   r.   r/   r^      s    r^   c                   @   s>   e Zd ZU dZee ed< ee ed< ee ed< dd ZdS )rJ   z#Statistics about the block producednum_rows
size_bytes
exec_statsc                 C   s"   | j d urt| j tsJ d S d S r7   )rp   r=   intr\   r.   r.   r/   __post_init__   s   
zBlockStats.__post_init__N)	r)   r*   r+   rd   r   rr   __annotations__rO   rs   r.   r.   r.   r/   rJ      s   
 c                 C   s   h | ]}|j qS r.   )name)rL   fr.   r.   r/   	<setcomp>   s    rw   c                       s:   e Zd ZU dZeee  ed< dd Z fddZ	  Z
S )r:   zMetadata about the block.input_filesc                    s   t di  fddtD S )Nc                    s   i | ]}|  |qS r.   )__getattribute__)rL   keyr\   r.   r/   
<dictcomp>  s    z*BlockMetadata.to_stats.<locals>.<dictcomp>r.   )rJ   _BLOCK_STATS_FIELD_NAMESr\   r.   r\   r/   rK      s   zBlockMetadata.to_statsc                    s"   t    | jd u rg | _d S d S r7   )superrs   rx   r\   	__class__r.   r/   rs     s   


zBlockMetadata.__post_init__)r)   r*   r+   rd   r   r
   strrt   rK   rs   __classcell__r.   r.   r~   r/   r:      s
   
 alpha)	stabilityc                       sn   e Zd ZU dZee ed< ddeded f fddZ	dde	ded	 d
d fddZ
ed
efddZ  ZS )BlockMetadataWithSchemaNr;   metadatarA   c                    s&   t  j|j|j|j|jd || _d S )N)rx   rp   ro   rq   )r}   r]   rx   rp   ro   rq   r;   )r8   r   r;   r~   r.   r/   r]     s   
z BlockMetadataWithSchema.__init__blockrm   rO   r6   c                 C   s*   t | }|j|d}| }t||dS )N)rq   )r   r;   )BlockAccessor	for_blockget_metadatar;   r   )r   rm   accessormetar;   r.   r.   r/   
from_block  s   
z"BlockMetadataWithSchema.from_blockc                 C   s   t | j| j| j| jdS )N)ro   rp   rq   rx   )r:   ro   rp   rq   rx   r\   r.   r.   r/   r   !  s   z BlockMetadataWithSchema.metadatar7   )r)   r*   r+   r;   r   rA   rt   r:   r]   Blockr   propertyr   r   r.   r.   r~   r/   r     s   
 

r   c                   @   s  e Zd ZdZdefddZdedee fddZ	dxd	ed
edede
fddZdee de
fddZdee de
fddZdeee  de
fddZdeeef de
fddZdedede
fddZdee de
fddZdyd!d"Z	#dzdeeeee f  deejeeejf f fd$d%Zd{d'd(Zde
fd)d*Zde
fd+d,Zd-ee defd.d/Z defd0d1Z!dee"d2f fd3d4Z#	#	#d|d5eee  d6ee$ de%fd7d8Z&d}d;d<Z'e(d~d>d?Z)e*	#dzd@edAee+ de
fdBdCZ,e*d@eee-f de
fdDdEZ.e*d@eee-f de
fdFdGZ/e(dHe
ddIfdJdKZ0dLedMdNdd:fdOdPZ1dxdQedRedee2 fdSdTZ3dQedRedee2 fdUdVZ4dQedRedee2 fdWdXZ5dQedRedee2 fdYdZZ6dQedRedee2 fd[d\Z7	#dzdQedRed]ee2 dee2 fd^d_Z8dd`daZ9dbee dMdNded: fdcddZ:dedNdfe;dg de
fdhdiZ<e(djed: dMdNde;e
e=f fdkdlZ>e(	mddjee
 dMdNdfe;dg dnede;e
e=f f
dodpZ?dbee;e-  dMdNdee
 fdqdrZ@de+fdsdtZAduee dejfdvdwZBd#S )r   a  Provides accessor methods for a specific block.

    Ideally, we wouldn't need a separate accessor classes for blocks. However,
    this is needed if we want to support storing ``pyarrow.Table`` directly
    as a top-level Ray object, without a wrapping class (issue #17186).
    r6   c                 C      t )z2Return the number of rows contained in this block.NotImplementedErrorr\   r.   r.   r/   ro   4     zBlockAccessor.num_rowspublic_row_formatc                 C   r   )zIterate over the rows of this block.

        Args:
            public_row_format: Whether to cast rows into the public Dict row
                format (this incurs extra copy conversions).
        r   )r8   r   r.   r.   r/   	iter_rows8     zBlockAccessor.iter_rowsFstartendcopyc                 C   r   )a(  Return a slice of this block.

        Args:
            start: The starting index of the slice (inclusive).
            end: The ending index of the slice (exclusive).
            copy: Whether to perform a data copy for the slice.

        Returns:
            The sliced block result.
        r   )r8   r   r   r   r.   r.   r/   sliceA  s   zBlockAccessor.sliceindicesc                 C   r   )zReturn a new block containing the provided row indices.

        Args:
            indices: The row indices to return.

        Returns:
            A new block containing the provided row indices.
        r   )r8   r   r.   r.   r/   takeN  s   	zBlockAccessor.takecolumnsc                 C   r   )z<Return a new block with the list of provided columns droppedr   r8   r   r.   r.   r/   dropY  r   zBlockAccessor.dropc                 C   r   )z3Return a new block containing the provided columns.r   r   r.   r.   r/   select]  r   zBlockAccessor.selectcolumns_renamec                 C   r   )z0Return the block reflecting the renamed columns.r   )r8   r   r.   r.   r/   rename_columnsa  r   zBlockAccessor.rename_columnscolumn_namecolumn_datac                 C      t  )aq  
        Upserts a column into the block. If the column already exists, it will be replaced.

        Args:
            column_name: The name of the column to upsert.
            column_data: The data to upsert into the column. (Arrow Array/ChunkedArray for Arrow blocks, Series or array-like for Pandas blocks)

        Returns:
            The updated block.
        r   )r8   r   r   r.   r.   r/   upsert_columne  s   zBlockAccessor.upsert_columnrandom_seedc                 C   r   )zRandomly shuffle this block.r   )r8   r   r.   r.   r/   random_shuffler  r   zBlockAccessor.random_shuffler!   c                 C   r   )z+Convert this block into a Pandas dataframe.r   r\   r.   r.   r/   	to_pandasv  r   zBlockAccessor.to_pandasNc                 C   r   )zConvert this block (or columns of block) into a NumPy ndarray.

        Args:
            columns: Name of columns to convert, or None if converting all columns.
        r   r   r.   r.   r/   to_numpyz     zBlockAccessor.to_numpyr    c                 C   r   )z'Convert this block into an Arrow table.r   r\   r.   r.   r/   to_arrow  r   zBlockAccessor.to_arrowc                 C   r   )z/Return the base block that this accessor wraps.r   r\   r.   r.   r/   to_block  r   zBlockAccessor.to_blockc                 C   s   |   S )z1Return the default data format for this accessor.)r   r\   r.   r.   r/   
to_default  s   zBlockAccessor.to_defaultbatch_formatc                 C   sl   |du r|   S |dks|dkr|  S |dkr|  S |dkr$|  S |dkr,|  S tdt d| )	zConvert this block into the provided batch format.

        Args:
            batch_format: The batch format to convert this block to.

        Returns:
            This block formatted as the provided batch format.
        NrD   nativer(   r1   r2   z The batch format must be one of z, got: )r   r   r   r   r   rG   rF   )r8   r   r.   r.   r/   to_batch_format  s   	zBlockAccessor.to_batch_formatc                 C   r   )z3Return the approximate size in bytes of this block.r   r\   r.   r.   r/   rp     r   zBlockAccessor.size_bytesr"   c                 C   r   )z7Return the Python type or pyarrow schema of this block.r   r\   r.   r.   r/   r;     r   zBlockAccessor.schemarx   rq   c                 C   s   t |  |  ||dS )z)Create a metadata object from this block.)ro   rp   rx   rq   )r:   ro   rp   )r8   rx   rq   r.   r.   r/   r     s   zBlockAccessor.get_metadataotherr   c                 C   r   )z<Zip this block with another block of the same type and size.r   )r8   r   r.   r.   r/   zip  r   zBlockAccessor.zipr   c                   C   r   )z%Create a builder for this block type.r   r.   r.   r.   r/   r`     s   zBlockAccessor.builderbatch
block_typec              
   C   s   t |tjrtdt| dt |tjjrc|du s |tj	krWddl
m} z| |W S  |yV } ztdrAtd| d |du rP| |W  Y d}~S |d}~ww |tjks^J | |S |S )	z-Create a block from user-facing data formats.Error validating z: Standalone numpy arrays are not allowed in Ray 2.5. Return a dict of field -> array, e.g., `{'data': array}` instead of `array`.Nr   )ArrowConversionError!_fallback_to_pandas_block_warningz)Failed to convert batch to Arrow due to: z; falling back to Pandas block)r=   npndarrayrG   r   collectionsabcMappingr&   r,   $ray.air.util.tensor_extensions.arrowr   batch_to_arrow_blockr   loggerwarningbatch_to_pandas_blockr-   )clsr   r   r   er.   r.   r/   batch_to_block  s,   

zBlockAccessor.batch_to_blockc                 C      ddl m} ||S )z4Create an Arrow block from user-facing data formats.r   )ArrowBlockBuilder)ray.data._internal.arrow_blockr   _table_from_pydict)r   r   r   r.   r.   r/   r        
z"BlockAccessor.batch_to_arrow_blockc                 C   r   )z4Create a Pandas block from user-facing data formats.r   )PandasBlockBuilder)r<   r   r   )r   r   r   r.   r.   r/   r     r   z#BlockAccessor.batch_to_pandas_blockr   zBlockAccessor[T]c                 C   s   t   ddl}ddl}t| |j|jfrddlm} || S t| |jr.ddl	m
} || S t| tr>ddlm} || S t| trMtdt|  dtd| t| )z,Create a block accessor for the given block.r   N)ArrowBlockAccessor)PandasBlockAccessorr   z: Standalone Python objects are not allowed in Ray 2.5. To use Python objects in a dataset, wrap them in a dict of numpy arrays, e.g., return `{'item': batch}` instead of just `batch`.zNot a block type: {} ({}))r   r(   r1   r=   TableRecordBatchr   r   	DataFramer<   r   bytes
from_byteslistrG   r   	TypeErrorformattype)r   r(   r1   r   r   r.   r.   r/   r     s"   


zBlockAccessor.for_block	n_samplessort_keyr   c                 C   r   )z0Return a random sample of items from this block.r   )r8   r   r   r.   r.   r/   sample  r   zBlockAccessor.sampleonignore_nullsc                 C   r   )z=Returns a count of the distinct values in the provided columnr   r8   r   r   r.   r.   r/   count  r   zBlockAccessor.countc                 C   r   )z2Returns a sum of the values in the provided columnr   r   r.   r.   r/   sum!  r   zBlockAccessor.sumc                 C   r   )z2Returns a min of the values in the provided columnr   r   r.   r.   r/   min%  r   zBlockAccessor.minc                 C   r   )z2Returns a max of the values in the provided columnr   r   r.   r.   r/   max)  r   zBlockAccessor.maxc                 C   r   )z3Returns a mean of the values in the provided columnr   r   r.   r.   r/   mean-  r   zBlockAccessor.meanr   c                 C   r   )zBReturns a sum of diffs (from mean) squared for the provided columnr   )r8   r   r   r   r.   r.   r/   sum_of_squared_diffs_from_mean1  r   z,BlockAccessor.sum_of_squared_diffs_from_meanc                 C   r   )z9Returns new block sorted according to provided `sort_key`r   )r8   r   r.   r.   r/   sort:  r   zBlockAccessor.sort
boundariesc                 C   r   )z1Return a list of sorted partitions of this block.r   r8   r   r   r.   r.   r/   sort_and_partition>  s   z BlockAccessor.sort_and_partitionrz   aggsr   c                 C   r   )z3Combine rows with the same key into an accumulator.r   )r8   rz   r   r.   r.   r/   
_aggregateD  r   zBlockAccessor._aggregateblocksc                 C   r   )z9Return a sorted block by merging a list of sorted blocks.r   )r   r   r.   r.   r/   merge_sorted_blocksH  s   z!BlockAccessor.merge_sorted_blocksTfinalizec                 C   r   )z/Aggregate partially combined and sorted blocks.r   )r   r   r   r   r.   r.   r/   _combine_aggregated_blocksO  r   z(BlockAccessor._combine_aggregated_blocksc                 C   r   )zNOTE: PLEASE READ CAREFULLY

        Returns dataset partitioned using list of boundaries

        This method requires that
            - Block being sorted (according to `sort_key`)
            - Boundaries is a sorted list of tuples
        r   r   r.   r.   r/   _find_partitions_sortedY  s   z%BlockAccessor._find_partitions_sortedc                 C   r   )z$Return the block type of this block.r   r\   r.   r.   r/   r   h  r   zBlockAccessor.block_typekeysc                 C   sL   |   dkrtjg tjdS |std|   gS | |}tt| S )a  
        NOTE: THIS METHOD ASSUMES THAT PROVIDED BLOCK IS ALREADY SORTED

        Compute boundaries of the groups within a block based on provided
        key (a column or a list of columns)

        NOTE: In each column, NaNs/None are considered to be the same group.

        Args:
            block: sorted block for which grouping of rows will be determined
                    based on provided key
            keys: list of columns determining the key for every row based on
                    which the block will be grouped

        Returns:
            A list of starting indices of each group and an end index of the last
            group, i.e., there are ``num_groups + 1`` entries and the first and last
            entries are 0 and ``len(array)`` respectively.
        r   )dtype)ro   r   arrayint32r   "_get_group_boundaries_sorted_numpyr   values)r8   r   projected_blockr.   r.   r/   _get_group_boundaries_sortedl  s   
z*BlockAccessor._get_group_boundaries_sortedF)r6   r!   r7   )r6   r    )NN)r   r   r6   r   )r6   r   )r   r   r6   r   )T)Cr)   r*   r+   rd   rr   ro   boolr	   r   r   r   r   r
   r   r   r   r   r   r   r   BlockColumnr   r   r   r   r   r   r   r   r   r   	DataBatchr   rp   r   r;   rO   r:   r   r   re   r`   classmethodr&   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r.   r.   r.   r/   r   +  s    	






%

	


	

r   betac                
   @   s  e Zd ZdZdefddZdddeded	ee fd
dZ	dddeded	ee fddZ
dddeded	ee fddZdddeded	ee fddZdddeded	ee fddZdddededed	ee fddZd	efddZd	eeef fddZd	efddZd	efddZd	efdd Zd	efd!d"Zd#dd$ded%ee ded	ee fd&d'Zd	ee fd(d)Zd3d+ed	ejfd,d-Zd	eee d.f fd/d0Z e!ded	d fd1d2Z"d#S )4BlockColumnAccessorzbProvides vendor-neutral interface to apply common operations
    to block's (Pandas/Arrow) columnscolc                 C   s
   || _ d S r7   )_column)r8   r   r.   r.   r/   r]     s   
zBlockColumnAccessor.__init__T)as_pyr   r   r6   c                C   r   )z4Returns a count of the distinct values in the columnr   r8   r   r   r.   r.   r/   r     ra   zBlockColumnAccessor.countc                C   r_   )z)Returns a sum of the values in the columnr   r   r.   r.   r/   r     ra   zBlockColumnAccessor.sumc                C   r   )z)Returns a min of the values in the columnr   r   r.   r.   r/   r     ra   zBlockColumnAccessor.minc                C   r   )z)Returns a max of the values in the columnr   r   r.   r.   r/   r     ra   zBlockColumnAccessor.maxc                C   r   )z*Returns a mean of the values in the columnr   r   r.   r.   r/   r     ra   zBlockColumnAccessor.meanqc                C   r   )z.Returns requested quantile of the given columnr   )r8   r   r   r   r.   r.   r/   quantile  s   zBlockColumnAccessor.quantilec                 C   r   )zBReturns new column holding only distinct values of the current oner   r\   r.   r.   r/   unique  ra   zBlockColumnAccessor.uniquec                 C   r   r7   r   r\   r.   r.   r/   value_counts     z BlockColumnAccessor.value_countsc                 C   r   )a  
        Computes a 64-bit hash value for each row in the column.

        Provides a unified hashing method across supported backends.
        Handles complex types like lists or nested structures by producing a single hash per row.
        These hashes are useful for downstream operations such as deduplication, grouping, or partitioning.

        Internally, Polars is used to compute row-level hashes even when the original column
        is backed by Pandas or PyArrow.

        Returns:
            A column of 64-bit integer hashes, returned in the same format as the
            underlying backend (e.g., Pandas Series or PyArrow Array).
        r   r\   r.   r.   r/   hash  s   zBlockColumnAccessor.hashc                 C   r   )z;Flattens nested lists merging them into top-level containerr   r\   r.   r.   r/   flatten  s   zBlockColumnAccessor.flattenc                 C   r   r7   r   r\   r.   r.   r/   dropna  r  zBlockColumnAccessor.dropnac                 C   r   )z
        Checks whether the column is composed of list-like elements.

        Returns:
            True if the column is made up of list-like values; False otherwise.
        r   r\   r.   r.   r/   is_composed_of_lists  s   z(BlockColumnAccessor.is_composed_of_listsN)r   r   r   c                C   r   )z9Returns a sum of diffs (from mean) squared for the columnr   )r8   r   r   r   r.   r.   r/   r     s   z2BlockColumnAccessor.sum_of_squared_diffs_from_meanc                 C   r   )z8Converts block column to a list of Python native objectsr   r\   r.   r.   r/   	to_pylist  ra   zBlockColumnAccessor.to_pylistFzero_copy_onlyc                 C   r   )z#Converts underlying column to Numpyr   )r8   r
  r.   r.   r/   r     ra   zBlockColumnAccessor.to_numpyr$   c                 C   r   )zAConverts block column into a representation compatible with Arrowr   r\   r.   r.   r/   _as_arrow_compatible  ra   z(BlockColumnAccessor._as_arrow_compatiblec                 C   sn   t   ddl}t| tjst| tjrddlm} || S t| |jr-ddl	m
} || S tdt|  d)z-Create a column accessor for the given columnr   N)ArrowBlockColumnAccessor)PandasBlockColumnAccessorzEExpected either a pandas.Series or pyarrow.Array (ChunkedArray) (got ))r   r(   r=   paArrayChunkedArrayr   r  Seriesr<   r  r   r   )r   pdr  r  r.   r.   r/   
for_column  s   zBlockColumnAccessor.for_columnr   )#r)   r*   r+   rd   r   r]   r   r   r   r   r   r   r   r   floatr  r  r   r   r
   r  r  r  r  r  r   r   r	  r   r   r   r   r  re   r  r.   r.   r.   r/   r     sP         


r   r   c                 C   s@  g }g }g }| D ]0}t |jt jrt |d r|| qt |jt js3|d d u r3|| q|| qg }t|dkrR|t dd |D jdd t|dkri|t dd |D jdd t|dkr|t dd |D jdd t 	dgt 
|jdd d d t| d ggt}|S )	Nr   c                 S   s$   g | ]}|d d |dd kqS    Nr  r.   rL   arrr.   r.   r/   rN     s   $ z6_get_group_boundaries_sorted_numpy.<locals>.<listcomp>)axisc              	   S   sH   g | ] }|d d |dd kt |d d t |dd B @ qS r  )r   isfiniter  r.   r.   r/   rN   "  s    "c              	   S   sN   g | ]#}|d d |dd kt |d d dt |dd d@  @ qS r  )r   equalr  r.   r.   r/   rN   /  s    (r  )r   
issubdtyper   numberisnanappendlenvstackanyhstackcolumn_stacknonzeroastyperr   )r   general_arraysnum_arrays_with_nancat_arrays_with_noner  diffsr   r.   r.   r/   r     sV   	r   )Sr   loggingrf   dataclassesr   r   enumr   typingr   r   r   r   r	   r
   r   r   r   r   r   r2   r   r1   r  rU   ray.data._internal.utilr   r   	ray.typesr   ray.utilr   ray.util.annotationsr   r(    ray.data._internal.block_builderr   r<   r   2ray.data._internal.planner.exchange.sort_task_specr   ray.data.aggregater   r   r   r   r   r   r   rA   r   BatchColumn	getLoggerr)   r   r&   r   r0   r   r   DataBatchColumnCallableClassr4   UserDefinedFunctionBlockPartitionBlockPartitionMetadatarF   rE   r   r?   rB   rH   rK   rO   r^   rJ   r|   r:   r   r   r   r   r   r.   r.   r.   r/   <module>   s    4
	
$  dv