o
    bic                     @   s  d dl Z d dlZd dlZd dlmZmZ d dlmZ d dlm	Z	m
Z
mZmZmZmZmZmZmZmZmZ d dlZd dlZd dlZd dlmZ d dlmZmZ d dlmZ d dl m!Z! d d	l"m#Z# e	rd dl$Z$d dlZd d
l%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, edddZ-edddZ.edZ/edZ0ed Z1ee2ddf Z3ed Z4e5e6Z7e#G dd deZ8eddee9ej:f f Z;ee4ej:f Z<e2Z=G dd dee-e.f Z>eee-ge.f ee-gee. f df Z?eeee1 df  Z@ed ZAg d ZBd!ZCd"ee9 d#e9fd$d%ZDe#d&ed d#ed' fd(d)ZEe#G d*d+ d+ZFG d,d- d-ZGe#eG d.d' d'ZHd/d0 eeHD ZIe#eG d1d deHZJe#d2d3eG d4d5 d5eJZKe#G d6d7 d7ZLe#d8d3G d9d: d:ZMd;eNej: d#ej:fd<d=ZOdS )>    N)	dataclassfields)Enum)TYPE_CHECKINGAnyCallableDictIteratorListOptionalProtocolTupleTypeVarUnion)ArrowConversionError)_check_pyarrow_version_truncated_repr)	ObjectRef)log_once)DeveloperAPI)BlockBuilder)PandasBlockSchema)SortKey)AggregateFnTT)contravariantU)	covariantKeyTypeAggType)pyarrow.Tablepandas.DataFramer   pyarrow.lib.Schema)zpyarrow.ChunkedArraypyarrow.Arrayzpandas.Seriesc                   @   s   e Zd ZdZdZdS )	BlockTypearrowpandasN)__name__
__module____qualname__ARROWPANDAS r,   r,   B/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/block.pyr$   @   s    r$   r    r!   c                   @   s*   e Zd Zdedeeee f fddZdS )_CallableClassProtocol_CallableClassProtocol__argreturnc                 C   s   d S Nr,   )selfr/   r,   r,   r-   __call__T   s   z_CallableClassProtocol.__call__N)r'   r(   r)   r   r   r   r	   r3   r,   r,   r,   r-   r.   S   s    "r.   BlockMetadata)r&   pyarrownumpyNr6   given_batch_formatr0   c                 C   s.   | dkrt } | tvrtd|  dt d| S )NdefaultzThe given batch format z isn't allowed (must be one of z).)DEFAULT_BATCH_FORMATVALID_BATCH_FORMATS
ValueError)r7   r,   r,   r-   _apply_batch_formatk   s   r<   metas
BlockStatsc                 C   s   dd | D S )Nc                 S   s   g | ]}|  qS r,   )to_stats).0mr,   r,   r-   
<listcomp>x   s    zto_stats.<locals>.<listcomp>r,   )r=   r,   r,   r-   r?   v   s   r?   c                   @   s.   e Zd ZdZdd ZedddZdd	 Zd
S )BlockExecStatsa  Execution stats for this block.

    Attributes:
        wall_time_s: The wall-clock time it took to compute this block.
        cpu_time_s: The CPU time it took to compute this block.
        node_id: A unique id for the node that computed this block.
        max_uss_bytes: An estimate of the maximum amount of physical memory that the
            process was using while computing this block.
    c                 C   s>   d | _ d | _d | _d| _d | _tj  | _	d| _
d | _d S )Nr   )start_time_s
end_time_swall_time_s
udf_time_s
cpu_time_srayruntime_contextget_runtime_contextget_node_idnode_idmax_uss_bytestask_idxr2   r,   r,   r-   __init__   s   
zBlockExecStats.__init__r0   _BlockExecStatsBuilderc                   C      t  S r1   )rR   r,   r,   r,   r-   builder      zBlockExecStats.builderc                 C   s   t | j| j| j| jdS )N)rF   rH   rG   rM   )reprrF   rH   rG   rM   rP   r,   r,   r-   __repr__   s   zBlockExecStats.__repr__N)r0   rR   )r'   r(   r)   __doc__rQ   staticmethodrT   rW   r,   r,   r,   r-   rC   {   s    

rC   c                   @   s"   e Zd ZdZdd Zd	ddZdS )
rR   zHelper class for building block stats.

    When this class is created, we record the start time. When build() is
    called, the time delta is saved as part of the stats.
    c                 C   s   t  | _t  | _d S r1   )timeperf_counter_start_timeprocess_time
_start_cpurP   r,   r,   r-   rQ      s   
z_BlockExecStatsBuilder.__init__r0   rC   c                 C   s@   t  }t  }t }| j|_||_|| j |_|| j |_	|S r1   )
rZ   r[   r]   rC   r\   rD   rE   rF   r^   rH   )r2   end_timeend_cpustatsr,   r,   r-   build   s   z_BlockExecStatsBuilder.buildN)r0   rC   )r'   r(   r)   rX   rQ   rb   r,   r,   r,   r-   rR      s    rR   c                   @   s>   e Zd ZU dZee ed< ee ed< ee ed< dd ZdS )r>   z#Statistics about the block producednum_rows
size_bytes
exec_statsc                 C   s"   | j d urt| j tsJ d S d S r1   )rd   
isinstanceintrP   r,   r,   r-   __post_init__   s   
zBlockStats.__post_init__N)	r'   r(   r)   rX   r   rg   __annotations__rC   rh   r,   r,   r,   r-   r>      s   
 c                 C   s   h | ]}|j qS r,   )name)r@   fr,   r,   r-   	<setcomp>   s    rl   c                       s:   e Zd ZU dZeee  ed< dd Z fddZ	  Z
S )r4   zMetadata about the block.input_filesc                    s   t di  fddtD S )Nc                    s   i | ]}|  |qS r,   )__getattribute__)r@   keyrP   r,   r-   
<dictcomp>   s    z*BlockMetadata.to_stats.<locals>.<dictcomp>r,   )r>   _BLOCK_STATS_FIELD_NAMESrP   r,   rP   r-   r?      s   zBlockMetadata.to_statsc                    s"   t    | jd u rg | _d S d S r1   )superrh   rm   rP   	__class__r,   r-   rh      s   


zBlockMetadata.__post_init__)r'   r(   r)   rX   r   r
   strri   r?   rh   __classcell__r,   r,   rs   r-   r4      s
   
 alpha)	stabilityc                       sn   e Zd ZU dZee ed< ddeded f fddZ	dde	ded	 d
d fddZ
ed
efddZ  ZS )BlockMetadataWithSchemaNschemametadataSchemac                    s&   t  j|j|j|j|jd || _d S )N)rm   rd   rc   re   )rr   rQ   rm   rd   rc   re   rz   )r2   r{   rz   rs   r,   r-   rQ      s   
z BlockMetadataWithSchema.__init__blockra   rC   r0   c                 C   s*   t | }|j|d}| }t||dS )N)re   )r{   rz   )BlockAccessor	for_blockget_metadatarz   ry   )r}   ra   accessormetarz   r,   r,   r-   
from_block   s   
z"BlockMetadataWithSchema.from_blockc                 C   s   t | j| j| j| jdS )N)rc   rd   re   rm   )r4   rc   rd   re   rm   rP   r,   r,   r-   r{      s   z BlockMetadataWithSchema.metadatar1   )r'   r(   r)   rz   r   r|   ri   r4   rQ   Blockr   propertyr{   rv   r,   r,   rs   r-   ry      s   
 

ry   c                   @   s  e Zd ZdZdefddZdedee fddZ	drd	ed
edede
fddZdee de
fddZdeee  de
fddZdeeef de
fddZdee de
fddZdsddZ	dtdeeeee f  deejeeejf f fddZdud!d"Zde
fd#d$Zde
fd%d&Zd'ee defd(d)Zdefd*d+Zdeed,f fd-d.Z 		dvd/eee  d0ee! de"fd1d2Z#dwd5d6Z$e%dxd8d9Z&e'	dtd:ed;ee( de
fd<d=Z)e'd:eee*f de
fd>d?Z+e'd:eee*f de
fd@dAZ,e%dBe
ddCfdDdEZ-dFedGdHdd4fdIdJZ.drdKedLedee/ fdMdNZ0dKedLedee/ fdOdPZ1dKedLedee/ fdQdRZ2dKedLedee/ fdSdTZ3dKedLedee/ fdUdVZ4	dtdKedLedWee/ dee/ fdXdYZ5dydZd[Z6d\ee dGdHded4 fd]d^Z7d_dHd`e8da de
fdbdcZ9e%dded4 dGdHde8e
e:f fdedfZ;e%	gdzddee
 dGdHd`e8da dhede8e
e:f f
didjZ<d\ee8e*  dGdHdee
 fdkdlZ=de(fdmdnZ>doee dejfdpdqZ?dS ){r~   a  Provides accessor methods for a specific block.

    Ideally, we wouldn't need a separate accessor classes for blocks. However,
    this is needed if we want to support storing ``pyarrow.Table`` directly
    as a top-level Ray object, without a wrapping class (issue #17186).
    r0   c                 C      t )z2Return the number of rows contained in this block.NotImplementedErrorrP   r,   r,   r-   rc        zBlockAccessor.num_rowspublic_row_formatc                 C   r   )zIterate over the rows of this block.

        Args:
            public_row_format: Whether to cast rows into the public Dict row
                format (this incurs extra copy conversions).
        r   )r2   r   r,   r,   r-   	iter_rows     zBlockAccessor.iter_rowsFstartendcopyc                 C   r   )a(  Return a slice of this block.

        Args:
            start: The starting index of the slice (inclusive).
            end: The ending index of the slice (exclusive).
            copy: Whether to perform a data copy for the slice.

        Returns:
            The sliced block result.
        r   )r2   r   r   r   r,   r,   r-   slice  s   zBlockAccessor.sliceindicesc                 C   r   )zReturn a new block containing the provided row indices.

        Args:
            indices: The row indices to return.

        Returns:
            A new block containing the provided row indices.
        r   )r2   r   r,   r,   r-   take)  s   	zBlockAccessor.takecolumnsc                 C   r   )z3Return a new block containing the provided columns.r   r2   r   r,   r,   r-   select4  r   zBlockAccessor.selectcolumns_renamec                 C   r   )z0Return the block reflecting the renamed columns.r   )r2   r   r,   r,   r-   rename_columns8  r   zBlockAccessor.rename_columnsrandom_seedc                 C   r   )zRandomly shuffle this block.r   )r2   r   r,   r,   r-   random_shuffle<  r   zBlockAccessor.random_shuffler!   c                 C   r   )z+Convert this block into a Pandas dataframe.r   rP   r,   r,   r-   	to_pandas@  r   zBlockAccessor.to_pandasNc                 C   r   )zConvert this block (or columns of block) into a NumPy ndarray.

        Args:
            columns: Name of columns to convert, or None if converting all columns.
        r   r   r,   r,   r-   to_numpyD     zBlockAccessor.to_numpyr    c                 C   r   )z'Convert this block into an Arrow table.r   rP   r,   r,   r-   to_arrowN  r   zBlockAccessor.to_arrowc                 C   r   )z/Return the base block that this accessor wraps.r   rP   r,   r,   r-   to_blockR  r   zBlockAccessor.to_blockc                 C   s   |   S )z1Return the default data format for this accessor.)r   rP   r,   r,   r-   
to_defaultV  s   zBlockAccessor.to_defaultbatch_formatc                 C   sl   |du r|   S |dks|dkr|  S |dkr|  S |dkr$|  S |dkr,|  S tdt d| )	zConvert this block into the provided batch format.

        Args:
            batch_format: The batch format to convert this block to.

        Returns:
            This block formatted as the provided batch format.
        Nr8   nativer&   r5   r6   z The batch format must be one of z, got: )r   r   r   r   r   r;   r:   )r2   r   r,   r,   r-   to_batch_formatZ  s   	zBlockAccessor.to_batch_formatc                 C   r   )z3Return the approximate size in bytes of this block.r   rP   r,   r,   r-   rd   s  r   zBlockAccessor.size_bytesr"   c                 C   r   )z7Return the Python type or pyarrow schema of this block.r   rP   r,   r,   r-   rz   w  r   zBlockAccessor.schemarm   re   c                 C   s   t |  |  ||dS )z)Create a metadata object from this block.)rc   rd   rm   re   )r4   rc   rd   )r2   rm   re   r,   r,   r-   r   {  s   zBlockAccessor.get_metadataotherr   c                 C   r   )z<Zip this block with another block of the same type and size.r   )r2   r   r,   r,   r-   zip  r   zBlockAccessor.zipr   c                   C   r   )z%Create a builder for this block type.r   r,   r,   r,   r-   rT     s   zBlockAccessor.builderbatch
block_typec              
   C   s   t |tjrtdt| dt |tjjr]|du s |tj	krQz| 
|W S  tyP } ztdr;td| d |du rJ| |W  Y d}~S |d}~ww |tjksXJ | |S |S )z-Create a block from user-facing data formats.Error validating z: Standalone numpy arrays are not allowed in Ray 2.5. Return a dict of field -> array, e.g., `{'data': array}` instead of `array`.N!_fallback_to_pandas_block_warningz)Failed to convert batch to Arrow due to: z; falling back to Pandas block)rf   npndarrayr;   r   collectionsabcMappingr$   r*   batch_to_arrow_blockr   r   loggerwarningbatch_to_pandas_blockr+   )clsr   r   er,   r,   r-   batch_to_block  s*   

zBlockAccessor.batch_to_blockc                 C      ddl m} ||S )z4Create an Arrow block from user-facing data formats.r   )ArrowBlockBuilder)ray.data._internal.arrow_blockr   _table_from_pydict)r   r   r   r,   r,   r-   r        
z"BlockAccessor.batch_to_arrow_blockc                 C   r   )z4Create a Pandas block from user-facing data formats.r   )PandasBlockBuilder)ray.data._internal.pandas_blockr   r   )r   r   r   r,   r,   r-   r     r   z#BlockAccessor.batch_to_pandas_blockr}   zBlockAccessor[T]c                 C   s   t   ddl}ddl}t| |jrddlm} || S t| |jr+ddlm	} || S t| t
r;ddlm} || S t| trJtdt|  dtd| t| )z,Create a block accessor for the given block.r   N)ArrowBlockAccessor)PandasBlockAccessorr   z: Standalone Python objects are not allowed in Ray 2.5. To use Python objects in a dataset, wrap them in a dict of numpy arrays, e.g., return `{'item': batch}` instead of just `batch`.zNot a block type: {} ({}))r   r&   r5   rf   Tabler   r   	DataFramer   r   bytes
from_byteslistr;   r   	TypeErrorformattype)r}   r&   r5   r   r   r,   r,   r-   r     s"   


zBlockAccessor.for_block	n_samplessort_keyr   c                 C   r   )z0Return a random sample of items from this block.r   )r2   r   r   r,   r,   r-   sample  r   zBlockAccessor.sampleonignore_nullsc                 C   r   )z=Returns a count of the distinct values in the provided columnr   r2   r   r   r,   r,   r-   count  r   zBlockAccessor.countc                 C   r   )z2Returns a sum of the values in the provided columnr   r   r,   r,   r-   sum  r   zBlockAccessor.sumc                 C   r   )z2Returns a min of the values in the provided columnr   r   r,   r,   r-   min  r   zBlockAccessor.minc                 C   r   )z2Returns a max of the values in the provided columnr   r   r,   r,   r-   max  r   zBlockAccessor.maxc                 C   r   )z3Returns a mean of the values in the provided columnr   r   r,   r,   r-   mean  r   zBlockAccessor.meanr   c                 C   r   )zBReturns a sum of diffs (from mean) squared for the provided columnr   )r2   r   r   r   r,   r,   r-   sum_of_squared_diffs_from_mean  r   z,BlockAccessor.sum_of_squared_diffs_from_meanc                 C   r   )z9Returns new block sorted according to provided `sort_key`r   )r2   r   r,   r,   r-   sort  r   zBlockAccessor.sort
boundariesc                 C   r   )z1Return a list of sorted partitions of this block.r   r2   r   r   r,   r,   r-   sort_and_partition  s   z BlockAccessor.sort_and_partitionro   aggsr   c                 C   r   )z3Combine rows with the same key into an accumulator.r   )r2   ro   r   r,   r,   r-   
_aggregate  r   zBlockAccessor._aggregateblocksc                 C   r   )z9Return a sorted block by merging a list of sorted blocks.r   )r   r   r,   r,   r-   merge_sorted_blocks  s   z!BlockAccessor.merge_sorted_blocksTfinalizec                 C   r   )z/Aggregate partially combined and sorted blocks.r   )r   r   r   r   r,   r,   r-   _combine_aggregated_blocks  r   z(BlockAccessor._combine_aggregated_blocksc                 C   r   )zNOTE: PLEASE READ CAREFULLY

        Returns dataset partitioned using list of boundaries

        This method requires that
            - Block being sorted (according to `sort_key`)
            - Boundaries is a sorted list of tuples
        r   r   r,   r,   r-   _find_partitions_sorted!  s   z%BlockAccessor._find_partitions_sortedc                 C   r   )z$Return the block type of this block.r   rP   r,   r,   r-   r   0  r   zBlockAccessor.block_typekeysc                 C   sL   |   dkrtjg tjdS |std|   gS | |}tt| S )a  
        NOTE: THIS METHOD ASSUMES THAT PROVIDED BLOCK IS ALREADY SORTED

        Compute boundaries of the groups within a block based on provided
        key (a column or a list of columns)

        NOTE: In each column, NaNs/None are considered to be the same group.

        Args:
            block: sorted block for which grouping of rows will be determined
                    based on provided key
            keys: list of columns determining the key for every row based on
                    which the block will be grouped

        Returns:
            A list of starting indices of each group and an end index of the last
            group, i.e., there are ``num_groups + 1`` entries and the first and last
            entries are 0 and ``len(array)`` respectively.
        r   )dtype)rc   r   arrayint32r   "_get_group_boundaries_sorted_numpyr   values)r2   r   projected_blockr,   r,   r-   _get_group_boundaries_sorted4  s   
z*BlockAccessor._get_group_boundaries_sortedF)r0   r!   r1   )r0   r    )NN)r   r   r0   r   )r0   r   )r   r   r0   r   )T)@r'   r(   r)   rX   rg   rc   boolr	   r   r   r   r   r
   r   r   ru   r   r   r   r   r   r   r   r   r   r   r   r   	DataBatchr   rd   r   rz   rC   r4   r   r   rY   rT   classmethodr$   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   ry   r   r   r   r   r   r,   r,   r,   r-   r~     s    	






#

	


	

r~   betac                
   @   s  e Zd ZdZdefddZdddeded	ee fd
dZ	dddeded	ee fddZ
dddeded	ee fddZdddeded	ee fddZdddeded	ee fddZdddededed	ee fddZd	efddZd	efddZddddedee ded	ee fddZd	ee fd d!Zd+d#ed	ejfd$d%Zd	eee d&f fd'd(Zeded	d fd)d*ZdS ),BlockColumnAccessorzbProvides vendor-neutral interface to apply common operations
    to block's (Pandas/Arrow) columnscolc                 C   s
   || _ d S r1   )_column)r2   r   r,   r,   r-   rQ   [  s   
zBlockColumnAccessor.__init__T)as_pyr   r   r0   c                C      t  )z4Returns a count of the distinct values in the columnr   r2   r   r   r,   r,   r-   r   ^  rU   zBlockColumnAccessor.countc                C   rS   )z)Returns a sum of the values in the columnr   r   r,   r,   r-   r   b  rU   zBlockColumnAccessor.sumc                C   r   )z)Returns a min of the values in the columnr   r   r,   r,   r-   r   f  rU   zBlockColumnAccessor.minc                C   r   )z)Returns a max of the values in the columnr   r   r,   r,   r-   r   j  rU   zBlockColumnAccessor.maxc                C   r   )z*Returns a mean of the values in the columnr   r   r,   r,   r-   r   n  rU   zBlockColumnAccessor.meanqc                C   r   )z.Returns requested quantile of the given columnr   )r2   r   r   r   r,   r,   r-   quantiler  s   zBlockColumnAccessor.quantilec                 C   r   )zBReturns new column holding only distinct values of the current oner   rP   r,   r,   r-   uniquex  rU   zBlockColumnAccessor.uniquec                 C   r   )z;Flattens nested lists merging them into top-level containerr   rP   r,   r,   r-   flatten|  s   zBlockColumnAccessor.flattenN)r   r   r   c                C   r   )z9Returns a sum of diffs (from mean) squared for the columnr   )r2   r   r   r   r,   r,   r-   r     s   z2BlockColumnAccessor.sum_of_squared_diffs_from_meanc                 C   r   )z8Converts block column to a list of Python native objectsr   rP   r,   r,   r-   	to_pylist  rU   zBlockColumnAccessor.to_pylistFzero_copy_onlyc                 C   r   )z#Converts underlying column to Numpyr   )r2   r   r,   r,   r-   r     rU   zBlockColumnAccessor.to_numpyr#   c                 C   r   )zAConverts block column into a representation compatible with Arrowr   rP   r,   r,   r-   _as_arrow_compatible  rU   z(BlockColumnAccessor._as_arrow_compatiblec                 C   sn   t   ddl}t| tjst| tjrddlm} || S t| |jr-ddl	m
} || S tdt|  d)z-Create a column accessor for the given columnr   N)ArrowBlockColumnAccessor)PandasBlockColumnAccessorzEExpected either a pandas.Series or pyarrow.Array (ChunkedArray) (got ))r   r&   rf   paArrayChunkedArrayr   r   Seriesr   r   r   r   )r   pdr   r   r,   r,   r-   
for_column  s   zBlockColumnAccessor.for_columnr   )r'   r(   r)   rX   BlockColumnrQ   r   r   r   r   r   r   r   r   floatr   r   r   r   r
   r   r   r   r   r   r   r   rY   r   r,   r,   r,   r-   r   V  sH         
	

r   r   c                 C   s@  g }g }g }| D ]0}t |jt jrt |d r|| qt |jt js3|d d u r3|| q|| qg }t|dkrR|t dd |D jdd t|dkri|t dd |D jdd t|dkr|t dd |D jdd t 	dgt 
|jdd d d t| d ggt}|S )	Nr   c                 S   s$   g | ]}|d d |dd kqS    Nr  r,   r@   arrr,   r,   r-   rB     s   $ z6_get_group_boundaries_sorted_numpy.<locals>.<listcomp>)axisc              	   S   sH   g | ] }|d d |dd kt |d d t |dd B @ qS r  )r   isfiniter  r,   r,   r-   rB     s    "c              	   S   sN   g | ]#}|d d |dd kt |d d dt |dd d@  @ qS r  )r   equalr  r,   r,   r-   rB     s    (r  )r   
issubdtyper   numberisnanappendlenvstackanyhstackcolumn_stacknonzeroastyperg   )r   general_arraysnum_arrays_with_nancat_arrays_with_noner  diffsr   r,   r,   r-   r     sV   	r   )Pr   loggingrZ   dataclassesr   r   enumr   typingr   r   r   r   r	   r
   r   r   r   r   r   r6   r   r5   r   rI   $ray.air.util.tensor_extensions.arrowr   ray.data._internal.utilr   r   	ray.typesr   ray.utilr   ray.util.annotationsr   r&    ray.data._internal.block_builderr   r   r   2ray.data._internal.planner.exchange.sort_task_specr   ray.data.aggregater   r   r   r   r   r   r   r|   r   	getLoggerr'   r   r$   ru   r   r   DataBatchColumnCallableClassr.   UserDefinedFunctionBlockPartitionBlockPartitionMetadatar:   r9   r<   r?   rC   rR   r>   rq   r4   ry   r~   r   r   r   r,   r,   r,   r-   <module>   s    4
$  QV