o
    $iNS                     @   s  d dl Z d dlZd dlmZmZmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZ d dlmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z% d d	l&m'Z' errd d
l(m)Z) d dl*m+Z+ edZ,ede'Z-G dd deZ.G dd deZ/dS )    N)TYPE_CHECKINGAnyDictIteratorListMappingOptionalSequenceTupleTypeVarUnion)env_integer)TENSOR_COLUMN_NAME)BlockBuilder)SizeEstimator)NULL_SENTINELfind_partition_indexis_nan
keys_equal)BlockBlockAccessorBlockColumnAccessorBlockExecStatsBlockMetadataWithSchema	BlockTypeKeyTypeU)DEFAULT_TARGET_MAX_BLOCK_SIZE)SortKey)AggregateFnT#RAY_DATA_MAX_UNCOMPACTED_SIZE_BYTESc                   @   s   e Zd Zdd Zdeeeejf ddfddZ	de
ddfd	d
Zedeeee
 f defddZedee defddZede
fddZedefddZdefddZdefddZdefddZdefddZdddZdS ) TableBlockBuilderc                 C   sB   t t| _g | _d| _d| _t | _d| _	d| _
d| _|| _d S Nr   )collectionsdefaultdictlist_columns_tables_tables_size_cursor_tables_size_bytesr   _uncompacted_size	_num_rows_num_uncompacted_rows_num_compactions_block_type)self
block_type r2   [/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/table_block.py__init__7   s   
zTableBlockBuilder.__init__itemreturnNc                 C   s   t |dr
| }n
t|tjrt|i}t|tjjs%t	d
|t||D ]}|| jvr7d g| j | j|< q'| jD ]}||}| j| | q;|  jd7  _|  jd7  _|   | j| d S )N	as_pydictzLReturned elements of an TableBlock must be of type `dict`, got {} (type {}).   )hasattrr7   
isinstancenpndarrayr   r$   abcr   
ValueErrorformattyper'   r-   getappendr,   _compact_if_neededr+   add)r0   r5   column_namevaluer2   r2   r3   rD   L   s(   




zTableBlockBuilder.addblockc                 C   sX   t || jstdt| d| j d| t|}| j| |  j|	 7  _d S )NzGot a block of type z, expected z[.If you are mapping a function, ensure it returns an object with the expected type. Block:
)
r:   r/   	TypeErrorr@   r   	for_blockr(   rB   r,   num_rows)r0   rG   accessorr2   r2   r3   	add_blocke   s   
zTableBlockBuilder.add_blockcolumnsc                 C      t NNotImplementedError)rM   r2   r2   r3   _table_from_pydictq      z$TableBlockBuilder._table_from_pydicttablesc                 C   rN   rO   rP   )rT   r2   r2   r3   _combine_tablesu   rS   z!TableBlockBuilder._combine_tablesc                   C   rN   rO   rP   r2   r2   r2   r3   _empty_tabley   rS   zTableBlockBuilder._empty_tablec                   C   rN   rO   rP   r2   r2   r2   r3   _concat_would_copy}   rS   z$TableBlockBuilder._concat_would_copyc                 C   s    | j rdS |  ot| jdkS )NTr8   )r'   rW   lenr(   r0   r2   r2   r3   will_build_yield_copy   s   z'TableBlockBuilder.will_build_yield_copyc                 C   sD   | j r| | j g}ng }|| j t|dkr|  S | |S r#   )r'   rR   extendr(   rX   rV   rU   )r0   rT   r2   r2   r3   build   s   
zTableBlockBuilder.buildc                 C      | j S rO   )r,   rY   r2   r2   r3   rJ         zTableBlockBuilder.num_rowsc                 C   sX   | j dkrdS | j| jd  D ]}|  jt| 7  _qt| j| _| j| j  S r#   )	r,   r(   r)   r*   r   rI   
size_bytesrX   r+   r0   tabler2   r2   r3   get_estimated_memory_usage   s   
z,TableBlockBuilder.get_estimated_memory_usagec                 C   s\   | j sJ | j tk rd S | | j }| | t | _| j   |  jd7  _d| _	d S )Nr8   r   )
r'   r+   r_   MAX_UNCOMPACTED_SIZE_BYTESrR   rL   r   clearr.   r-   )r0   rG   r2   r2   r3   rC      s   



z$TableBlockBuilder._compact_if_needed)r6   N)__name__
__module____qualname__r4   r   dictr   r;   r<   rD   r   rL   staticmethodr   strr   r   rR   rU   rV   boolrW   rZ   r\   intrJ   rb   rC   r2   r2   r2   r3   r"   6   s"     r"   c                   @   sT  e Zd ZdefddZedd Zededede	j
fd	d
ZdefddZdee fddZdededefddZdefddZdeddfddZdPddZedefddZdedd defd!d"Zdedd defd#d$ZdQd&ed'edee fd(d)Zd&ed'edee fd*d+Zd&ed'edee fd,d-Zd&ed'edee fd.d/Zd&ed'edee fd0d1Z 	2dRd&ed'ed3ee dee fd4d5Z!d6efd7d8Z"dd d9e#d: defd;d<Z$e%	=dSd>ee dd d9e#d: d?ede#ed@f f
dAdBZ&dCee#e  dd fdDdEZ'e%	2dRd>ee dFee( dee fdGdHZ)e%dIedJe(fdKdLZ*dMedefdNdOZ+d2S )TTableBlockAccessorra   c                 C   s
   || _ d S rO   _tabler`   r2   r2   r3   r4      s   
zTableBlockAccessor.__init__c                 C   s   |  d|d  S )N_r8   r2   )namecountr2   r2   r3   _munge_conflict   s   z"TableBlockAccessor._munge_conflictrowrow_idxr6   c                 C   rN   rO   rP   )rt   ru   r2   r2   r3   _build_tensor_row   rS   z$TableBlockAccessor._build_tensor_rowc                 C   s   |   }|S rO   )	to_pandas)r0   defaultr2   r2   r3   
to_default   s   zTableBlockAccessor.to_defaultc                 C   rN   rO   rP   rY   r2   r2   r3   column_names      zTableBlockAccessor.column_namesrq   rF   c                 C   rN   rO   rP   )r0   rq   rF   r2   r2   r3   fill_column   r{   zTableBlockAccessor.fill_columnc                 C   r]   rO   rn   rY   r2   r2   r3   to_block   r^   zTableBlockAccessor.to_blockaccr   c                 C   rN   rO   rP   )r0   r~   r2   r2   r3   _zip   r{   zTableBlockAccessor._zipotherc                 C   s   t |}t|t| s4t| tr(t|tr(t| j|g\}}t ||S td	t| t||
 | 
 krHtd	| 
 |
 | |S )Nz#Cannot zip {} with block of type {}z3Cannot zip self (length {}) with block of length {})r   rI   r:   r@   rm   normalize_block_typesro   zipr>   r?   rJ   r   )r0   r   r~   	self_norm
other_normr2   r2   r3   r      s*   

zTableBlockAccessor.zipc                   C   rN   rO   rP   r2   r2   r2   r3   rV      rS   zTableBlockAccessor._empty_table	n_samplessort_keyr   c                 C   rN   rO   rP   )r0   r   r   r2   r2   r3   _sample   r{   zTableBlockAccessor._samplec                 C   sL   |d u st |rtd| |  dkr|  S t||  }| ||S )Nz+Table sort key must be a column name, was: r   )callablerQ   rJ   rV   minr   )r0   r   r   kr2   r2   r3   sample   s   zTableBlockAccessor.sampleFonignore_nullsc                 C   s   t | j| }|j|dS N)r   )r   
for_columnro   rr   r0   r   r   rK   r2   r2   r3   rr      s   zTableBlockAccessor.countc                 C   &   |  | t| j| }|j|dS r   )_validate_columnr   r   ro   sumr   r2   r2   r3   r         
zTableBlockAccessor.sumc                 C   r   r   )r   r   r   ro   r   r   r2   r2   r3   r     r   zTableBlockAccessor.minc                 C   r   r   )r   r   r   ro   maxr   r2   r2   r3   r     r   zTableBlockAccessor.maxc                 C   r   r   )r   r   r   ro   meanr   r2   r2   r3   r     r   zTableBlockAccessor.meanNr   c                 C   s&   |  | t| j| }|j|dS r   )r   r   r   ro   sum_of_squared_diffs_from_mean)r0   r   r   r   rK   r2   r2   r3   r     s   
z1TableBlockAccessor.sum_of_squared_diffs_from_meancolc                 C   s@   |d u rt d| d||  vrt d| d|   d S )Nz-Provided `on` value has to be non-null (got 'z')zReferencing column 'z' not present in the schema: )r>   rz   schema)r0   r   r2   r2   r3   r     s   z#TableBlockAccessor._validate_columnaggsr   c                    s<  |  dtttt tf  ffdd} }| D ]|\}}| t|dkr-|d   fdd|D }tt|D ]}t	
|}	|	 dkrU|| || |||< q<i }
rht|D ]\}}||
|< q_tt}t||D ]!\}}|j}|| dkr||| }||  d7  < ||
|< qr||
 q| S )aY  Applies provided aggregations to groups of rows with the same key.

        This assumes the block is already sorted by key in ascending order.

        Args:
            sort_key: A column name or list of column names.
               If this is ``None``, place all rows in a single group.

            aggs: The aggregations to do.

        Returns:
            A sorted block of [k, v_1, ..., v_n] columns where k is the groupby
            key and v_i is the partially combined accumulator for the ith given
            aggregation.
            If key is None then the k column is omitted.
        r6   c                  3   s     st   fV  dS d } }jdd}d}	 z=|du r#t|}|  }t|  |rK|d7 }zt|}W n tyC   d}Y nw t|  |s.|| |fV  |} W n
 tya   Y dS w q)z/Creates an iterator over zero-copy group views.Nr   Fpublic_row_formatTr8   )tupler}   	iter_rowsnextr   StopIterationslice)startenditernext_row	next_keys)keysr0   r2   r3   iter_groups9  s6   z2TableBlockAccessor._aggregate.<locals>.iter_groupsr8   r   c                    s   g | ]}|  qS r2   )init).0agg)	init_valsr2   r3   
<listcomp>[  s    z1TableBlockAccessor._aggregate.<locals>.<listcomp>)get_columnsr   r
   r	   r   r   builderrX   ranger   rI   rJ   accumulate_blockr   r$   r%   rl   rq   rs   rD   r\   )r0   r   r   r   r   
group_keys
group_viewaccumulatorsirK   rt   r   gkrr   r   accumulatorrq   r2   )r   r   r0   r3   
_aggregate&  s8   $



zTableBlockAccessor._aggregateTblocksfinalizer   c                    s  t |}t }| fdd  fdd}tjdd |D d|idt|d	  }	 zdu r;t	 } fdd}	d
}
dgt
| }dgt
| }|	 D ]S}|
rtt}tt
|D ]'}|| j}|| d	krt ||| }||  d7  < |||< || ||< qld}
q]tt
|D ]}|| || |||  ||< qq]i }rt|D ]\}}|||< qt|||D ]\}}}|r||||< q|||< q|| W n	 ty   Y nw q2| }|tj|| dfS )aX  Combine previously aggregated blocks.

        This assumes blocks are already sorted by key in ascending order,
        so we can do merge sort to get all the rows with the same key.

        Args:
            blocks: A list of partially combined and sorted blocks.
            sort_key: The column name of key or None for global aggregation.
            aggs: The aggregations to do.
            finalize: Whether to finalize the aggregation. This is used as an
                optimization for cases where we repeatedly combine partially
                aggregated groups.

        Returns:
            A block of [k, v_1, ..., v_n] columns and its metadata where k is
            the groupby key and v_i is the corresponding aggregation result for
            the ith given aggregation.
            If key is None then the k column is omitted.
        c                    s    rt |   S dS )N)r   r   )r)r   r2   r3   _key_fn  s   z>TableBlockAccessor._combine_aggregated_blocks.<locals>._key_fnc                    s    | }t dd |D S )Nc                 S   s$   g | ]}|d u st |rtn|qS rO   )r   r   )r   vr2   r2   r3   r     s   $ zVTableBlockAccessor._combine_aggregated_blocks.<locals>.safe_key_fn.<locals>.<listcomp>r   )r   values)r   r2   r3   safe_key_fn  s   zBTableBlockAccessor._combine_aggregated_blocks.<locals>.safe_key_fnc                 S   s   g | ]}t |jd dqS )Fr   )r   rI   r   r   rG   r2   r2   r3   r     s    zATableBlockAccessor._combine_aggregated_blocks.<locals>.<listcomp>keyNr   Tc                   3   sR    t  r'V  ztW n ty   d Y d S w t  sd S d S rO   )r   r   r   r2   )r   r   r   r   r2   r3   gen  s   z:TableBlockAccessor._combine_aggregated_blocks.<locals>.genr8   F)stats)rm   r   r   r   r   heapqmerger   rI   r   rX   r$   r%   rl   r   rq   rs   r   r   rD   r   r\   r   
from_block)clsr   r   r   r   r   r   r   next_key_columnsr   firstr   resolved_agg_namesr   rr   r   rq   rt   col_namenext_keyr   agg_namer   retr2   )r   r   r   r   r   r3   _combine_aggregated_blocksw  sv   






=z-TableBlockAccessor._combine_aggregated_blocks
boundariesc                    sV   g } fdd|D }d}|D ]}|  j||  |}q|  j|d   |S )Nc                    s   g | ]	}t  j|qS r2   )r   ro   )r   boundaryr0   r   r2   r3   r     s    z>TableBlockAccessor._find_partitions_sorted.<locals>.<listcomp>r   )rB   ro   )r0   r   r   
partitionsboundslast_idxidxr2   r   r3   _find_partitions_sorted  s   z*TableBlockAccessor._find_partitions_sortedtarget_block_typec                    s   t t}|D ] }t|}t|tstdt| d||	   d7  < qt
|dkr=du s;gt| kr=|S du rPt| dd ddd	 \} fd
d|D tfddD rrtddd D  dS )a  Normalize input blocks to the specified `normalize_type`. If the blocks
        are already all of the same type, returns original blocks.

        Args:
            blocks: A list of TableBlocks to be normalized.
            target_block_type: The type to normalize the blocks to. If None,
               Ray Data chooses a type to minimize the amount of data conversions.

        Returns:
            A list of blocks of the same type.
        zWBlock type normalization is only supported for TableBlock, but received block of type: .r8   Nc                 S   s   | d S )Nr8   r2   )xr2   r2   r3   <lambda>2  s    z:TableBlockAccessor.normalize_block_types.<locals>.<lambda>T)r   reverser   c                    s   g | ]}  |qS r2   )try_convert_block_typer   )r   r   r2   r3   r   6  s    z<TableBlockAccessor.normalize_block_types.<locals>.<listcomp>c                 3   s$    | ]}t |t d   V  qdS )r   N)r:   r@   r   )resultsr2   r3   	<genexpr>:  s   " z;TableBlockAccessor.normalize_block_types.<locals>.<genexpr>zYExpected all blocks to be of the same type after normalization, but got different types: c                 S   s   g | ]}t |qS r2   )r@   )r   br2   r2   r3   r   =  s    zP. Try using blocks of the same type to avoid the issue with block normalization.)r$   r%   rl   r   rI   r:   rm   r>   r@   r1   rX   r&   r   sorteditemsany)r   r   r   
seen_typesrG   block_accessorrp   r2   )r   r   r   r3   r     s>   


z(TableBlockAccessor.normalize_block_typesrG   r1   c                 C   s>   |t jkrt| S |t jkrt| S t| S rO   )r   ARROWr   rI   to_arrowPANDASrw   ry   )r   rG   r1   r2   r2   r3   r   C  s
   

z)TableBlockAccessor.try_convert_block_typeother_blockc                 C   rN   )a  Combine this table with another table horizontally (column-wise).
        This will append the columns.

        Args:
            other_block: The table to hstack side-by-side with.

        Returns:
            A new table with columns from both tables combined.
        rP   )r0   r   r2   r2   r3   hstackL  s   
zTableBlockAccessor.hstack)r   r   r6   r   )FrO   )T),re   rf   rg   r   r4   ri   rs   r   rl   r;   r<   rv   r   ry   r   rj   rz   r|   r}   r   r   r   rV   r   r   rk   r   r   rr   r   r   r   r   r   r   r
   r   classmethodr   r   r   r   r   r   r2   r2   r2   r3   rm      s~    



Q
z

7rm   )0r$   r   typingr   r   r   r   r   r   r   r	   r
   r   r   numpyr;   ray._private.ray_constantsr   ray.air.constantsr    ray.data._internal.block_builderr   !ray.data._internal.size_estimatorr   ray.data._internal.utilr   r   r   r   ray.data.blockr   r   r   r   r   r   r   r   ray.data.contextr   2ray.data._internal.planner.exchange.sort_task_specr   ray.data.aggregater   r    rc   r"   rm   r2   r2   r2   r3   <module>   s(    4(
u