o
    biD                     @   s  d dl Z d dlZd dlmZmZmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZ d d	lm Z  d d
l!m"Z" d dl#m$Z$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0m1Z1 zd dl2Z2W n e3y   dZ2Y nw erd dl4Z4d dl5m6Z6 edZ7e 8e9Z:edZ;ede<e0d Z=de1defddZ>de1defddZ?G dd de"Z@G dd de%ZAddde<de
e< fd d!ZBG d"d# d#e$ZCG d$d% d%e*ZDdS )&    N)TYPE_CHECKINGAnyCallableDictIteratorListMappingOptionalTupleTypeVarUnionparse)get_pyarrow_version)env_integer)TENSOR_COLUMN_NAME)convert_to_pyarrow_arraypyarrow_table_from_pydict)transform_polarstransform_pyarrow)shuffle)TableRow)TableBlockAccessorTableBlockBuilder)BlockBlockAccessorBlockColumnBlockColumnAccessorBlockExecStatsBlockMetadataWithSchema	BlockTypeU)DEFAULT_TARGET_MAX_BLOCK_SIZEDataContext)SortKeyTz13.0.0#RAY_DATA_ARROW_MAX_CHUNK_SIZE_BYTES    contextreturnc                 C      | j s| jr	tjS tjS N)
use_polarsuse_polars_sortr   sortr   r(    r0   R/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/arrow_block.pyget_sort_transformH      r2   c                 C   r*   r+   )r,   r-   r   concat_and_sortr   r/   r0   r0   r1   get_concat_and_sort_transformO   r3   r5   c                   @   sZ   e Zd ZdZdeeee f defddZde	fddZ
dd	 Zdeeef fd
dZdS )ArrowRowzA
    Row of a tabular Dataset backed by a Arrow Table block.
    keyr)   c                    sj   ddl m} | dtt dtf fdd}t|t}|r!|gn|}||}|d u r-d S |r3|d S |S )Nr   ) get_arrow_extension_tensor_typeskeysr)   c                    s    j j}t|| d jrt fdd| D S  j | }t|dkr(d S dd |jD }z
tdd |D W S  t	yE   | Y S w )Nr   c                    s   g | ]
}t j j|d qS ))col_name)ArrowBlockAccessor_build_tensor_row_row).0r7   selfr0   r1   
<listcomp>e   s    z:ArrowRow.__getitem__.<locals>.get_item.<locals>.<listcomp>c                 S   s   g | ]}|d  qS r   r0   r>   colr0   r0   r1   rA   o       c                 S   s   g | ]}|  qS r0   as_py)r>   itemr0   r0   r1   rA   r   rE   )
r=   schema
isinstancefieldtypetupleselectlencolumnsAttributeError)r9   rI   tableitemsr@   tensor_arrow_extension_typesr0   r1   get_item`   s    
z&ArrowRow.__getitem__.<locals>.get_item)ray.data.extensionsr8   r   strr   rJ   )r@   r7   r8   rV   is_single_itemr9   rS   r0   rT   r1   __getitem__[   s   
zArrowRow.__getitem__c                 c   s    | j jD ]}|V  qd S r+   )r=   column_names)r@   kr0   r0   r1   __iter__   s   zArrowRow.__iter__c                 C      | j jS r+   )r=   num_columnsr?   r0   r0   r1   __len__      zArrowRow.__len__c                 C   s   t |  S r+   )dictrS   r?   r0   r0   r1   	as_pydict      zArrowRow.as_pydictN)__name__
__module____qualname____doc__r   rX   r   r   rZ   r   r]   r`   r   rc   r0   r0   r0   r1   r6   V   s    *r6   c                       s   e Zd Z fddZedeeee f de	fddZ
edee	 de	fdd	Zedefd
dZedddZdefddZ  ZS )ArrowBlockBuilderc                    s&   t d u rtdt t jtf d S Nz+Run `pip install pyarrow` for Arrow support)pyarrowImportErrorsuper__init__Tablebytesr?   	__class__r0   r1   rn      s   zArrowBlockBuilder.__init__rP   r)   c                 C   s   t dd |  D S )Nc                 S   s   i | ]
\}}|t ||qS r0   )r   )r>   column_namecolumn_valuesr0   r0   r1   
<dictcomp>   s    
z8ArrowBlockBuilder._table_from_pydict.<locals>.<dictcomp>)r   rS   )rP   r0   r0   r1   _table_from_pydict   s
   z$ArrowBlockBuilder._table_from_pydicttablesc                 C   s   t j| ddS )NTpromote_types)r   concat)rw   r0   r0   r1   _concat_tables   s   z ArrowBlockBuilder._concat_tablesc                   C   s   dS )NFr0   r0   r0   r0   r1   _concat_would_copy   s   z$ArrowBlockBuilder._concat_would_copypyarrow.Tablec                   C   s   t i S r+   )r   r0   r0   r0   r1   _empty_table      zArrowBlockBuilder._empty_tablec                 C      t jS r+   r    ARROWr?   r0   r0   r1   
block_type      zArrowBlockBuilder.block_typer)   r}   )re   rf   rg   rn   staticmethodr   rX   r   r   r   rv   r{   boolr|   r~   r    r   __classcell__r0   r0   rq   r1   ri      s     ri   rR   r}   max_chunk_size_bytesc                 C   s0   | j dkrdS t| j | j }tdt|| S )aP  
    Calculate the max chunk size in rows for Arrow to Batches conversion in
    ArrowBlockAccessor.iter_rows().
    Args:
        table: The pyarrow table to calculate the max chunk size for.
        max_chunk_size_bytes: The max chunk size in bytes.
    Returns:
        The max chunk size in rows, or None if the table is empty.
    r   N   )nbytesintnum_rowsmax)rR   r   avg_row_sizer0   r0   r1   _get_max_chunk_size   s   
r   c                	       sB  e Zd ZeZdP fddZdee fddZded	e	de
fd
dZededd fddZeefdededejfddZdQdedededdfddZdee ddfddZdRddZdSd d!Z	"dTd#eeeee f  deejeeejf f fd$d%ZdUd&d'Zdefd(d)Zdefd*d+Z d,e!dd-fd.d/Z"ede#fd0d1Z$edUd2d3Z%d4eee d5d6f ddfd7d8Z&d#ee ddfd9d:Z'd;eeef ddfd<d=Z(d>ed?d@ddfdAdBZ)d?d@de
fdCdDZ*dEee+ d?d@ded- fdFdGZ,edHee
 d?d@de-e
e.f fdIdJZ/de0fdKdLZ1dMede2ee3ejf  fdNdOZ4  Z5S )Vr;   rR   r}   c                    s    t d u rtdt | d S rj   )rk   rl   rm   rn   )r@   rR   rq   r0   r1   rn      s   zArrowBlockAccessor.__init__r)   c                 C   r^   r+   )_tabler[   r?   r0   r0   r1   r[      ra   zArrowBlockAccessor.column_namesnamevaluec                 C   sj   || j jvsJ dd lm} t|tjr|j}nt|g}tj	t
| j |d}|||}| j ||S )Nr   )rL   )r   r[   pyarrow.computecomputerJ   rk   ScalarrL   
infer_typenullsrO   	fill_nullappend_column)r@   r   r   pcrL   arrayr0   r0   r1   fill_column   s   zArrowBlockAccessor.fill_columndatac                 C   s   t j|}| | S r+   )rk   ipcopen_streamread_all)clsr   readerr0   r0   r1   
from_bytes   s   zArrowBlockAccessor.from_bytesrowr:   c                 C   s   ddl m} | | d }t }|d u s||dkr6t|tjs!J |d u s+||dkr0| }n|j|}t|t	j
sBJ t||S )Nr   r   z8.0.0z9.0.0)packaging.versionr   r   rJ   rk   ExtensionScalarrG   rL   _extension_scalar_to_ndarraynpndarray)r   r:   parse_versionelementpyarrow_versionr0   r0   r1   r<      s   
z$ArrowBlockAccessor._build_tensor_rowFstartendcopyc                 C   s&   | j ||| }|rt||}|S r+   )r   slicer   combine_chunks)r@   r   r   r   viewr0   r0   r1   r      s   zArrowBlockAccessor.slicerandom_seedc                 C   s   t | j|S r+   )r   r   )r@   r   r0   r0   r1   random_shuffle  rd   z!ArrowBlockAccessor.random_shufflepyarrow.lib.Schemac                 C   r^   r+   )r   rI   r?   r0   r0   r1   rI     ra   zArrowBlockAccessor.schemapandas.DataFramec                 C   s0   ddl m} | j }t }|jr||}|S )Nr   ) _cast_tensor_columns_to_ndarrays)"ray.air.util.data_batch_conversionr   r   	to_pandasr#   get_currentenable_tensor_extension_casting)r@   r   dfctxr0   r0   r1   r     s   
zArrowBlockAccessor.to_pandasNrP   c           	      C   s   |d u r| j j}d}nt|trd}n|g}d}t| j j}|D ]}||vr0td| d| q g }|D ]}| j | }t|}|tj	|dd q5|rZt
|dksVJ |d S tt||S )NFTzCannot find column z, available columns: zero_copy_onlyr   r   )r   r[   rJ   listset
ValueErrorr   combine_chunked_arrayappendto_numpyrO   rb   zip)	r@   rP   should_be_single_ndarraycolumn_names_setcolumncolumn_values_ndarraysr:   rD   combined_arrayr0   r0   r1   r     s6   


zArrowBlockAccessor.to_numpyc                 C      | j S r+   )r   r?   r0   r0   r1   to_arrow7  r   zArrowBlockAccessor.to_arrowc                 C   s   | j jdkr
| j jS dS Nr   )r   r_   r   r?   r0   r0   r1   r   :  s   zArrowBlockAccessor.num_rowsc                 C   r^   r+   )r   r   r?   r0   r0   r1   
size_bytes?  ra   zArrowBlockAccessor.size_bytesaccr   c                 C   st   |   }|  }|jD ],}||}||jv r1d}|}||jv r/d||}|d7 }||jv s |}|||}q|S )Nr   z{}_{})r   r[   r   formatr   )r@   r   rsr:   rD   inew_namer0   r0   r1   _zipB  s   




zArrowBlockAccessor._zipc                   C   s   t  S r+   )ri   r0   r0   r0   r1   builderR  s   zArrowBlockAccessor.builderc                   C   s   t  S r+   )ri   r~   r0   r0   r0   r1   r~   V  r   zArrowBlockAccessor._empty_tableindicespyarrow.Arraypyarrow.ChunkedArrayc                 C   s   t | j|S )zSelect rows from the underlying table.

        This method is an alternative to pyarrow.Table.take(), which breaks for
        extension arrays.
        )r   
take_tabler   )r@   r   r0   r0   r1   takeZ  s   	zArrowBlockAccessor.takec                 C   s.   t dd |D std| d| j|S )Nc                 s   s    | ]}t |tV  qd S r+   )rJ   rX   rC   r0   r0   r1   	<genexpr>f  s    z,ArrowBlockAccessor.select.<locals>.<genexpr>zYColumns must be a list of column name strings when aggregating on Arrow blocks, but got: .)allr   r   rN   )r@   rP   r0   r0   r1   rN   e  s   zArrowBlockAccessor.selectcolumns_renamec                 C   s   | j |S r+   )r   rename_columns)r@   r   r0   r0   r1   r   m  rd   z!ArrowBlockAccessor.rename_columns	n_samplessort_keyr$   c                 C   s0   t t| jj|}| j| }t||S r+   )	randomsampleranger   r   rN   get_columnsr   r   )r@   r   r   r   rR   r0   r0   r1   _samplep  s   zArrowBlockAccessor._samplec                 C   sL   |  sJ d|   d| jjdkr|  S t }t|}|| j|S )Nz'Sorting columns couldn't be empty (got )r   )r   r   r   r~   r#   r   r2   )r@   r   r(   r.   r0   r0   r1   r.   u  s   zArrowBlockAccessor.sort
boundariesc                    sV     |}|jdkr fddtt|d D S t|dkr"|gS t|||S )Nr   c                    s   g | ]}   qS r0   )r~   )r>   _r?   r0   r1   rA     rE   z9ArrowBlockAccessor.sort_and_partition.<locals>.<listcomp>r   )r.   r   r   rO   r   	for_block_find_partitions_sorted)r@   r   r   rR   r0   r?   r1   sort_and_partition  s   


z%ArrowBlockAccessor.sort_and_partitionblocksc                 C   sj   t  }dd | D } t| dkrt }nt| tj} t	t
 }|| |dd}|tj|| dfS )Nc                 S   s   g | ]	}|j d kr|qS rB   )r   )r>   br0   r0   r1   rA     s    z:ArrowBlockAccessor.merge_sorted_blocks.<locals>.<listcomp>r   Trx   )stats)r   r   rO   r;   r~   r   normalize_block_typesr    r   r5   r#   r   r   
from_blockbuild)r   r   r   retr4   r0   r0   r1   merge_sorted_blocks  s   
z&ArrowBlockAccessor.merge_sorted_blocksc                 C   r   r+   r   r?   r0   r0   r1   r     r   zArrowBlockAccessor.block_typepublic_row_formatc                 c   sl    | j }|r%t| dst| j t| _|j| jdD ]	}| E d H  qd S t|  D ]}| 	|V  q+d S )N_max_chunk_size)max_chunksize)
r   hasattrr   ARROW_MAX_CHUNK_SIZE_BYTESr   
to_batches	to_pylistr   r   _get_row)r@   r   rR   batchr   r0   r0   r1   	iter_rows  s   
zArrowBlockAccessor.iter_rows)rR   r}   F)r)   r   )r)   r   r+   r   )6re   rf   rg   r6   ROW_TYPErn   r   rX   r[   r   r   r   classmethodrp   r   r   r   r   r   r<   r   r   r   r	   r   rI   r   r   r   r   r   r   r   r   r   ri   r   r~   r   rN   r   r   r.   r%   r   r
   r   r   r    r   r   r   r  r   r0   r0   rq   r1   r;      s    




'


r;   c                
       sn  e Zd Zded f fddZdddeded	ee fd
dZdddeded	ee fddZ	dddeded	ee fddZ
dddeded	ee fddZdddeded	ee fddZ	d(dedee ded	ee fddZdddededed	ee fddZd	efddZd	efddZd	ee fdd Zd)d"ed	ejfd#d$Zd	eee d%f fd&d'Z  ZS )*ArrowBlockColumnAccessorrD   )r   r   c                    s   t  | d S r+   )rm   rn   )r@   rD   rq   r0   r1   rn     s   z!ArrowBlockColumnAccessor.__init__TrF   ignore_nullsrG   r)   c                C   s4   dd l m} |j| j|rdndd}|r| S |S )Nr   
only_validr   )mode)r   r   count_columnrG   r@   r  rG   pacresr0   r0   r1   r	    s   zArrowBlockColumnAccessor.countc                C   ,   dd l m} |j| j|d}|r| S |S Nr   
skip_nulls)r   r   sumr
  rG   r  r0   r0   r1   r       zArrowBlockColumnAccessor.sumc                C   r  r  )r   r   minr
  rG   r  r0   r0   r1   r    r  zArrowBlockColumnAccessor.minc                C   r  r  )r   r   r   r
  rG   r  r0   r0   r1   r     r  zArrowBlockColumnAccessor.maxc                C   r  r  )r   r   meanr
  rG   r  r0   r0   r1   r    r  zArrowBlockColumnAccessor.meanNr  c                 C   s\   dd l m} |d u r| j|d}|d u rd S |j||| j|d|d}|r,| S |S )Nr   )r     r  )r   r   r  r  powersubtractr
  rG   )r@   r  r  rG   r  r  r0   r0   r1   sum_of_squared_diffs_from_mean  s   z7ArrowBlockColumnAccessor.sum_of_squared_diffs_from_meanqc                C   s6   dd l m} |j| j||d}|d }|r| S |S )Nr   )r  r  )r   r   quantiler
  rG   )r@   r  r  rG   r  r   r  r0   r0   r1   r    s   z!ArrowBlockColumnAccessor.quantilec                 C      dd l m} || jS r   )r   r   uniquer
  r@   r  r0   r0   r1   r       zArrowBlockColumnAccessor.uniquec                 C   r  r   )r   r   list_flattenr
  r  r0   r0   r1   flatten  r  z ArrowBlockColumnAccessor.flattenc                 C   s
   | j  S r+   )r
  r   r?   r0   r0   r1   r     s   
z"ArrowBlockColumnAccessor.to_pylistFr   c                 C   s"   t  tk r
| j S | jj|dS )Nr   )r   ,_MIN_PYARROW_VERSION_TO_NUMPY_ZERO_COPY_ONLYr
  r   )r@   r   r0   r0   r1   r      s   

z!ArrowBlockColumnAccessor.to_numpyr   c                 C   r   r+   )r
  r?   r0   r0   r1   _as_arrow_compatible  r   z-ArrowBlockColumnAccessor._as_arrow_compatible)NTr  )re   rf   rg   r   rn   r   r	   r!   r	  r  r  r   r  r  floatr  r   r  r!  r   r   r   r   r   r   r#  r   r0   r0   rq   r1   r    s@         


"r  )Eloggingr   typingr   r   r   r   r   r   r   r	   r
   r   r   numpyr   r   r   r   ray._private.arrow_utilsr   ray._private.ray_constantsr   ray.air.constantsr   $ray.air.util.tensor_extensions.arrowr   r   ray.data._internal.arrow_opsr   r   .ray.data._internal.arrow_ops.transform_pyarrowr   ray.data._internal.rowr   ray.data._internal.table_blockr   r   ray.data.blockr   r   r   r   r   r   r    r!   ray.data.contextr"   r#   rk   rl   pandas2ray.data._internal.planner.exchange.sort_task_specr$   r%   	getLoggerre   loggerr"  r   r   r2   r5   r6   ri   r   r;   r  r0   r0   r0   r1   <module>   sX    4(


:
 u