o
    `۷i*Q                     @   s  d dl Z d dlZd dlmZmZmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZmZmZ d dlmZm Z  d d	l!m"Z"m#Z# d d
l$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0m1Z1 d dl2m3Z3 zd dl4Z4W n e5y   dZ4Y nw erd dl6Z6d dl7m8Z8 edZ9e :e;Z<edZ=dZ>ede?e0d Z@de1defddZAde1defddZBG dd de	ZCG dd de ZDddd e?de
e? fd!d"ZEG d#d$ d$eZFG d%d& d&e*ZGdS )'    N)TYPE_CHECKINGAnyCallableDictIteratorListMappingOptionalTupleTypeVarUnion)parse)env_integer)transform_polarstransform_pyarrow)shuffle)row_reprrow_repr_prettyrow_str)TableBlockAccessorTableBlockBuilder)convert_to_pyarrow_arraypyarrow_table_from_pydict)get_pyarrow_version)BlockBlockAccessorBlockColumnBlockColumnAccessorBlockExecStatsBlockMetadataWithSchema	BlockTypeU)DEFAULT_TARGET_MAX_BLOCK_SIZEDataContext)Expr)SortKeyTz13.0.0
__bsp_stub#RAY_DATA_ARROW_MAX_CHUNK_SIZE_BYTES    contextreturnc                 C      | j s| jr	tjS tjS N)
use_polarsuse_polars_sortr   sortr   r*    r2   T/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/_internal/arrow_block.pyget_sort_transformI      r4   c                 C   r,   r-   )r.   r/   r   concat_and_sortr   r1   r2   r2   r3   get_concat_and_sort_transformP   r5   r7   c                   @   s   e Zd ZdZdefddZdeeee f defddZ	de
fd	d
Zdd Zdeeef fddZdd Zdd Zdd ZdS )ArrowRowzA
    Row of a tabular Dataset backed by a Arrow Table block.
    rowc                 C   s
   || _ d S r-   )_row)selfr9   r2   r2   r3   __init__\      
zArrowRow.__init__keyr+   c                    sj   ddl m} | dtt dtf fdd}t|t}|r!|gn|}||}|d u r-d S |r3|d S |S )Nr   ) get_arrow_extension_tensor_typeskeysr+   c                    s    j j}t|| d jrt fdd| D S  j | }t|dkr(d S dd |jD }z
tdd |D W S  t	yE   | Y S w )Nr   c                    s   g | ]}t j j|d dqS )r   )col_namerow_idx)ArrowBlockAccessor_build_tensor_rowr:   ).0r>   r;   r2   r3   
<listcomp>i   s    z:ArrowRow.__getitem__.<locals>.get_item.<locals>.<listcomp>c                 S   s   g | ]}|d  qS r   r2   rE   colr2   r2   r3   rG   u       c                 S   s   g | ]}|  qS r2   as_py)rE   itemr2   r2   r3   rG   x   rK   )
r:   schema
isinstancefieldtypetupleselectlencolumnsAttributeError)r@   rO   tableitemsr;   tensor_arrow_extension_typesr2   r3   get_itemd   s    
	z&ArrowRow.__getitem__.<locals>.get_item)ray.data.extensionsr?   r   strr   rP   )r;   r>   r?   r\   is_single_itemr@   rY   r2   rZ   r3   __getitem___   s   
zArrowRow.__getitem__c                 c   s    | j jD ]}|V  qd S r-   )r:   column_names)r;   kr2   r2   r3   __iter__   s   zArrowRow.__iter__c                 C      | j jS r-   )r:   num_columnsrF   r2   r2   r3   __len__      zArrowRow.__len__c                 C   s   t |  S r-   )dictrY   rF   r2   r2   r3   	as_pydict      zArrowRow.as_pydictc                 C      t | S r-   )r   rF   r2   r2   r3   __str__   rg   zArrowRow.__str__c                 C   rk   r-   )r   rF   r2   r2   r3   __repr__   rg   zArrowRow.__repr__c                 C   s   t | ||S r-   )r   )r;   pcycler2   r2   r3   _repr_pretty_   rj   zArrowRow._repr_pretty_N)__name__
__module____qualname____doc__r   r<   r   r^   r   r`   r   rc   rf   r   ri   rl   rm   rp   r2   r2   r2   r3   r8   W   s    ,r8   c                       s   e Zd Z fddZedeeee f de	fddZ
edee	 de	fdd	Zedefd
dZedddZdefddZ  ZS )ArrowBlockBuilderc                    s&   t d u rtdt t jtf d S Nz+Run `pip install pyarrow` for Arrow support)pyarrowImportErrorsuperr<   TablebytesrF   	__class__r2   r3   r<      s   zArrowBlockBuilder.__init__rV   r+   c                 C   s   t dd |  D S )Nc                 S   s   i | ]
\}}|t ||qS r2   )r   )rE   column_namecolumn_valuesr2   r2   r3   
<dictcomp>   s    
z8ArrowBlockBuilder._table_from_pydict.<locals>.<dictcomp>)r   rY   )rV   r2   r2   r3   _table_from_pydict   s
   z$ArrowBlockBuilder._table_from_pydicttablesc                 C   s"   t | dkrtj| ddS | d S )N   Tpromote_typesr   )rU   r   concat)r   r2   r2   r3   _combine_tables   s   z!ArrowBlockBuilder._combine_tablesc                   C   s   dS )NFr2   r2   r2   r2   r3   _concat_would_copy   s   z$ArrowBlockBuilder._concat_would_copypyarrow.Tablec                   C   s   t i S r-   )r   r2   r2   r2   r3   _empty_table      zArrowBlockBuilder._empty_tablec                 C      t jS r-   r    ARROWrF   r2   r2   r3   
block_type      zArrowBlockBuilder.block_typer+   r   )rq   rr   rs   r<   staticmethodr   r^   r   r   r   r   r   boolr   r   r    r   __classcell__r2   r2   r|   r3   ru      s     ru   rX   r   max_chunk_size_bytesc                 C   s,   | j dkrdS | j | j }tdt|| S )aP  
    Calculate the max chunk size in rows for Arrow to Batches conversion in
    ArrowBlockAccessor.iter_rows().
    Args:
        table: The pyarrow table to calculate the max chunk size for.
        max_chunk_size_bytes: The max chunk size in bytes.
    Returns:
        The max chunk size in rows, or None if the table is empty.
    r   Nr   )nbytesnum_rowsmaxint)rX   r   avg_row_sizer2   r2   r3   _get_max_chunk_size   s   
r   c                	       s  e Zd ZeZda fddZdedefddZdee	 fd	d
Z
de	dedefddZededd fddZededede	dejfddZdbdedededdfddZdee ddfddZdcd!d"Zddd$d%Z	&ded'eee	ee	 f  deejee	ejf f fd(d)Zdfd*d+Zdefd,d-Zdefd.d/Z d0e!dd1fd2d3Z"d4e	d5e#ddfd6d7Z$ede%fd8d9Z&edfd:d;Z'd<eee d=d>f ddfd?d@Z(d'ee	 defdAdBZ)d'ee	 ddfdCdDZ*dEee	e	f ddfdFdGZ+dgdIdJZ,dKedLdMddfdNdOZ-dLdMdefdPdQZ.dRee/ dLdMded1 fdSdTZ0edUee dLdMde1ee2f fdVdWZ3de4fdXdYZ5dZede6ee7ejf  fd[d\Z8dhd_d`Z9  Z:S )irC   rX   r   c                    s&   t d u rtdt | d | _d S rv   )rw   rx   ry   r<   _max_chunk_size)r;   rX   r|   r2   r3   r<      s   
zArrowBlockAccessor.__init__indexr+   c                 C   s   | j ||d dd}t|S )Nr   F)copy)slicer8   )r;   r   base_rowr2   r2   r3   _get_row   s   zArrowBlockAccessor._get_rowc                 C   rd   r-   )_tablera   rF   r2   r2   r3   ra      rg   zArrowBlockAccessor.column_namesnamevaluec                 C   sv   dd l m} t|tjtjfr| ||S t|tjr|j}nt	|g}tj
t| j|d}|||}| ||S )Nr   )rR   )pyarrow.computecomputerP   rw   ArrayChunkedArrayupsert_columnScalarrR   
infer_typenullsrU   r   	fill_null)r;   r   r   pcrR   arrayr2   r2   r3   fill_column   s   zArrowBlockAccessor.fill_columndatac                 C   s   t j|}| | S r-   )rw   ipcopen_streamread_all)clsr   readerr2   r2   r3   
from_bytes   s   zArrowBlockAccessor.from_bytesr9   rB   rA   c                 C   s0   | | | }|  }t|tjsJ t||S r-   )rM   rP   npndarrayrR   )r9   rB   rA   elementarrr2   r2   r3   rD      s   z$ArrowBlockAccessor._build_tensor_rowFstartendr   c                 C   s&   | j ||| }|rt||}|S r-   )r   r   r   combine_chunks)r;   r   r   r   viewr2   r2   r3   r     s   zArrowBlockAccessor.slicerandom_seedc                 C   s   t | j|S r-   )r   r   )r;   r   r2   r2   r3   random_shuffle	  rj   z!ArrowBlockAccessor.random_shufflepyarrow.lib.Schemac                 C   rd   r-   )r   rO   rF   r2   r2   r3   rO     rg   zArrowBlockAccessor.schemapandas.DataFramec                 C   s6   ddl m} t }| jj|jd}|jr||}|S )Nr   ) _cast_tensor_columns_to_ndarrays)ignore_metadata)#ray.data.util.data_batch_conversionr   r#   get_currentr   	to_pandaspandas_block_ignore_metadataenable_tensor_extension_casting)r;   r   ctxdfr2   r2   r3   r     s   zArrowBlockAccessor.to_pandasNrV   c           	      C   s   |d u r| j j}d}nt|trd}n|g}d}t| j j}|D ]}||vr0td| d| q g }|D ]}| j | }t|}|tj	|dd q5|rZt
|dksVJ |d S tt||S )NFTzCannot find column z, available columns: zero_copy_onlyr   r   )r   ra   rP   listset
ValueErrorr   combine_chunked_arrayappendto_numpyrU   rh   zip)	r;   rV   should_be_single_ndarraycolumn_names_setcolumncolumn_values_ndarraysrA   rJ   combined_arrayr2   r2   r3   r     s6   


zArrowBlockAccessor.to_numpyc                 C      | j S r-   )r   rF   r2   r2   r3   to_arrowA  r   zArrowBlockAccessor.to_arrowc                 C   s   | j jdkr
| j jS dS Nr   )r   re   r   rF   r2   r2   r3   r   D  s   zArrowBlockAccessor.num_rowsc                 C   rd   r-   )r   r   rF   r2   r2   r3   
size_bytesI  rg   zArrowBlockAccessor.size_bytesaccr   c                 C   st   |   }|  }|jD ],}||}||jv r1d}|}||jv r/d||}|d7 }||jv s |}|||}q|S )Nr   z{}_{})r   ra   r   formatappend_column)r;   r   rsrA   rJ   inew_namer2   r2   r3   _zipL  s   




zArrowBlockAccessor._zipr~   column_datac                 C   sX   t |tjtjfsJ dt| | jj|}|dkr$| j||S | j	|||S )Nz>Expected either a pyarrow.Array or pyarrow.ChunkedArray, got: )
rP   rw   r   r   rR   r   rO   get_field_indexr   
set_column)r;   r~   r   
column_idxr2   r2   r3   r   \  s   z ArrowBlockAccessor.upsert_columnc                   C   s   t  S r-   )ru   r2   r2   r2   r3   builderi  s   zArrowBlockAccessor.builderc                   C   s   t  S r-   )ru   r   r2   r2   r2   r3   r   m  r   zArrowBlockAccessor._empty_tableindicespyarrow.Arraypyarrow.ChunkedArrayc                 C   s   t | j|S )zSelect rows from the underlying table.

        This method is an alternative to pyarrow.Table.take(), which breaks for
        extension arrays.
        )r   
take_tabler   )r;   r   r2   r2   r3   takeq  s   	zArrowBlockAccessor.takec                 C      | j |S r-   )r   dropr;   rV   r2   r2   r3   r   |  rj   zArrowBlockAccessor.dropc                 C   sF   t dd |D std| dt|dkr| td S | j|S )Nc                 s   s    | ]}t |tV  qd S r-   )rP   r^   rI   r2   r2   r3   	<genexpr>  s    z,ArrowBlockAccessor.select.<locals>.<genexpr>zYColumns must be a list of column name strings when aggregating on Arrow blocks, but got: .r   )allr   rU   r   $_BATCH_SIZE_PRESERVING_STUB_COL_NAMEr   rT   r   r2   r2   r3   rT     s   zArrowBlockAccessor.selectcolumns_renamec                 C   r   r-   )r   rename_columns)r;   r   r2   r2   r3   r     rj   z!ArrowBlockAccessor.rename_columnsother_blockc                 C   s.   | j }t|j|jD ]
\}}|||}q
|S r-   )r   r   ra   rV   r   )r;   r   result_tabler   r   r2   r2   r3   hstack  s   zArrowBlockAccessor.hstack	n_samplessort_keyr%   c                 C   s0   t t| jj|}| j| }t||S r-   )	randomsampleranger   r   rT   get_columnsr   r   )r;   r   r   r   rX   r2   r2   r3   _sample  s   zArrowBlockAccessor._samplec                 C   sL   |  sJ d|   d| jjdkr|  S t }t|}|| j|S )Nz'Sorting columns couldn't be empty (got )r   )r  r   r   r   r#   r   r4   )r;   r   r*   r0   r2   r2   r3   r0     s   zArrowBlockAccessor.sort
boundariesc                    sV     |}|jdkr fddtt|d D S t|dkr"|gS t|||S )Nr   c                    s   g | ]}   qS r2   )r   )rE   _rF   r2   r3   rG     rK   z9ArrowBlockAccessor.sort_and_partition.<locals>.<listcomp>r   )r0   r   r  rU   r   	for_block_find_partitions_sorted)r;   r  r   rX   r2   rF   r3   sort_and_partition  s   


z%ArrowBlockAccessor.sort_and_partitionblocksc                 C   sj   t  }dd | D } t| dkrt }nt| tj} t	t
 }|| |dd}|tj|| dfS )Nc                 S   s   g | ]	}|j d kr|qS rH   )r   )rE   br2   r2   r3   rG     s    z:ArrowBlockAccessor.merge_sorted_blocks.<locals>.<listcomp>r   Tr   )stats)r   r   rU   rC   r   r   normalize_block_typesr    r   r7   r#   r   r   
from_blockbuild)r  r   r  retr6   r2   r2   r3   merge_sorted_blocks  s   
z&ArrowBlockAccessor.merge_sorted_blocksc                 C   r   r-   r   rF   r2   r2   r3   r     r   zArrowBlockAccessor.block_typepublic_row_formatc                 c   sn    | j }|r$| jd u rt|t| _|j| jdD ]	}| E d H  qd S |  }t|D ]}| |V  q,d S )N)max_chunksize)	r   r   r   ARROW_MAX_CHUNK_SIZE_BYTES
to_batches	to_pylistr   r  r   )r;   r  rX   batchr   r   r2   r2   r3   	iter_rows  s   
zArrowBlockAccessor.iter_rowspredicate_exprr$   c                 C   s6   | j jdkr	| j S ddlm} ||| j }| j |S )z,Filter rows based on a predicate expression.r   )	eval_expr)r   r   ?ray.data._internal.planner.plan_expression.expression_evaluatorr  filter)r;   r  r  maskr2   r2   r3   r    s
   zArrowBlockAccessor.filter)rX   r   F)r+   r   )r+   r   r-   r   )r   r   r+   r   )r  r$   r+   r   );rq   rr   rs   r8   ROW_TYPEr<   r   r   r   r^   ra   r   r   r   classmethodr{   r   r   r   r   rD   r   r   r	   r   rO   r   r   r   r   r   r   r   r   r   r   r   ru   r   r   r   r   rT   r   r   r  r0   r&   r
  r
   r   r  r    r   r   r   r  r  r   r2   r2   r|   r3   rC      s    



'





rC   c                
       s  e Zd Zded f fddZdddeded	ee fd
dZdddeded	ee fddZ	dddeded	ee fddZ
dddeded	ee fddZdddeded	ee fddZ	d0dedee ded	ee fddZdddededed	ee fddZd	efddZd	eeeef  fddZd	efdd Zd	efd!d"Zd	efd#d$Zd	efd%d&Zd	ee fd'd(Zd1d*ed	ejfd+d,Zd	eee d-f fd.d/Z  Z S )2ArrowBlockColumnAccessorrJ   )r   r   c                    s   t  | d S r-   )ry   r<   )r;   rJ   r|   r2   r3   r<     s   z!ArrowBlockColumnAccessor.__init__TrL   ignore_nullsrM   r+   c                C   s4   dd l m} |j| j|rdndd}|r| S |S )Nr   
only_validr   )mode)r   r   count_columnrM   r;   r#  rM   pacresr2   r2   r3   r&    s   zArrowBlockColumnAccessor.countc                C   ,   dd l m} |j| j|d}|r| S |S Nr   
skip_nulls)r   r   sumr'  rM   r(  r2   r2   r3   r/       zArrowBlockColumnAccessor.sumc                C   r+  r,  )r   r   minr'  rM   r(  r2   r2   r3   r1    r0  zArrowBlockColumnAccessor.minc                C   r+  r,  )r   r   r   r'  rM   r(  r2   r2   r3   r     r0  zArrowBlockColumnAccessor.maxc                C   r+  r,  )r   r   meanr'  rM   r(  r2   r2   r3   r2    r0  zArrowBlockColumnAccessor.meanNr2  c                 C   s\   dd l m} |d u r| j|d}|d u rd S |j||| j|d|d}|r,| S |S )Nr   )r#     r-  )r   r   r2  r/  powersubtractr'  rM   )r;   r#  r2  rM   r)  r*  r2   r2   r3   sum_of_squared_diffs_from_mean  s   z7ArrowBlockColumnAccessor.sum_of_squared_diffs_from_meanqc                C   s6   dd l m} |j| j||d}|d }|r| S |S )Nr   )r7  r.  )r   r   quantiler'  rM   )r;   r7  r#  rM   r)  r   r*  r2   r2   r3   r8    s   z!ArrowBlockColumnAccessor.quantilec                 C   s<   dd l m} |  rdd l}|| j  S || jS r   )r   r   is_composed_of_listspolars
from_arrowr'  uniquer   )r;   r)  r:  r2   r2   r3   r<  )  s
   zArrowBlockColumnAccessor.uniquec                 C   sF   dd l m} || j}t|dkrd S |d |d dS )Nr   valuescounts)r=  r>  )r   r   value_countsr'  rU   rQ   r  )r;   r)  r?  r2   r2   r3   r?  6  s   z%ArrowBlockColumnAccessor.value_countsc                 C   s4   dd l }|d| ji}| j|jdd}| S )Nr   rJ   T)wrap_numerical)r:  	DataFramer'  	hash_rowscastInt64r   )r;   plr   hashesr2   r2   r3   hashA  s   zArrowBlockColumnAccessor.hashc                 C      dd l m} || jS r   )r   r   list_flattenr'  r;   r)  r2   r2   r3   flattenH     z ArrowBlockColumnAccessor.flattenc                 C   rH  r   )r   r   	drop_nullr'  rJ  r2   r2   r3   dropnaM  rL  zArrowBlockColumnAccessor.dropnac                 C   s   t jjt jjf}t| jj|S r-   )rw   libListTypeLargeListTyperP   r'  rR   )r;   typesr2   r2   r3   r9  R  s   z-ArrowBlockColumnAccessor.is_composed_of_listsc                 C   s
   | j  S r-   )r'  r  rF   r2   r2   r3   r  V  r=   z"ArrowBlockColumnAccessor.to_pylistFr   c                 C   s>   t  tk rt| jtjr| j S | jj|dS | jj|dS )Nr   )r   ,_MIN_PYARROW_VERSION_TO_NUMPY_ZERO_COPY_ONLYrP   r'  rw   r   r   )r;   r   r2   r2   r3   r   Y  s   

z!ArrowBlockColumnAccessor.to_numpyr   c                 C   r   r-   )r'  rF   r2   r2   r3   _as_arrow_compatibled  r   z-ArrowBlockColumnAccessor._as_arrow_compatible)NTr  )!rq   rr   rs   r   r<   r   r	   r!   r&  r/  r1  r   r2  r6  floatr8  r   r<  r   r^   r   r?  rG  rK  rN  r9  r   r  r   r   r   rT  r   r2   r2   r|   r3   r"    sH         


"r"  )Hloggingr   typingr   r   r   r   r   r   r   r	   r
   r   r   numpyr   packaging.versionr   parse_versionray._private.ray_constantsr   ray.data._internal.arrow_opsr   r   .ray.data._internal.arrow_ops.transform_pyarrowr   ray.data._internal.rowr   r   r   ray.data._internal.table_blockr   r   *ray.data._internal.tensor_extensions.arrowr   r   $ray.data._internal.utils.arrow_utilsr   ray.data.blockr   r   r   r   r   r   r    r!   ray.data.contextr"   r#   ray.data.expressionsr$   rw   rx   pandas2ray.data._internal.planner.exchange.sort_task_specr%   r&   	getLoggerrq   loggerrS  r   r   r  r4   r7   r8   ru   r   rC   r"  r2   r2   r2   r3   <module>   s\    4(


H"
  