o
    biT                     @   s  d dl Z d dlZd dlZd dlmZmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlZd dlmZmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZm Z  d d	l!m"Z" d d
l#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z* d dl+m,Z, erd dlZd dl-Z-d dl.m/Z/ d dl#m0Z0 edZ1dZ2e3e4Z5da6dd Z7G dd deZ8G dd de'Z9G dd de Z:e ;dddgZ<G dd deZ=dS )    N)
TYPE_CHECKINGAnyDictIteratorListMappingOptionalTupleTypeVarUnion)is_object_dtype	is_scalaris_string_dtype)TENSOR_COLUMN_NAME)_should_convert_to_tensor)convert_to_numpy)TableRow)TableBlockAccessorTableBlockBuilder)is_null)BlockBlockAccessorBlockColumnBlockColumnAccessorBlockExecStats	BlockTypeU)DataContext)SortKeyBlockMetadataWithSchemaT   c                  C   s   t d u r
dd l} | a t S Nr   )_pandaspandasr%    r'   S/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/pandas_block.pylazy_import_pandas6   s   r)   c                   @   sZ   e Zd ZdZdeeee f defddZde	fddZ
dd	 Zdeeef fd
dZdS )	PandasRowzF
    Row of a tabular Dataset backed by a Pandas DataFrame block.
    keyreturnc                    sd   ddl m  dtt dtf fdd}t|t}|r|gn|}||}|d u r*d S |r0|d S |S )Nr   TensorArrayElementkeysr,   c              
      s   j |  }t|dkrd S |jd }t|jd  r#tdd |D S z
tdd |D W S  ttfyN } ztjd| d|d |W  Y d }~S d }~ww )Nr   c                 s   s    | ]}|  V  qd S N)to_numpy.0itemr'   r'   r(   	<genexpr>P   s    z:PandasRow.__getitem__.<locals>.get_item.<locals>.<genexpr>c                 s   s    | ]}|V  qd S r0   r'   r2   r'   r'   r(   r5   U   s    zFailed to convert z to a tuple)exc_info)	_rowleniloc
isinstancetupleAttributeError
ValueErrorloggerwarning)r/   colitemser.   selfr'   r(   get_itemG   s   

z'PandasRow.__getitem__.<locals>.get_item)ray.data.extensionsr.   r   strr   r:   )rD   r+   rE   is_single_itemr/   rA   r'   rC   r(   __getitem__D   s   
zPandasRow.__getitem__c                 c   s    | j jD ]}|V  qd S r0   )r7   columns)rD   kr'   r'   r(   __iter__j   s   zPandasRow.__iter__c                 C      | j jd S )N   )r7   shaperD   r'   r'   r(   __len__n      zPandasRow.__len__c                 C   s>   i }|   D ]\}}t|r|tju rd ||< q|||< q|S r0   )rA   r   pdNA)rD   pydictr+   valuer'   r'   r(   	as_pydictq   s   

zPandasRow.as_pydictN)__name__
__module____qualname____doc__r   rG   r   r   rI   r   rL   rQ   r   rW   r'   r'   r'   r(   r*   ?   s    &r*   c                
       sp  e Zd Zd* fddZdddeded	ee fd
dZdddeded	ee fddZdddeded	ee fddZ	dddeded	ee fddZ
dddeded	ee fddZdddededed	ee fddZd	efddZd	efddZ		d+dedee ded	ee fddZd	ee fdd Zd,d"ed	ejfd#d$Zd	eee d%f fd&d'Zd(d) Z  ZS )-PandasBlockColumnAccessorr@   pandas.Seriesc                       t  | d S r0   super__init__)rD   r@   	__class__r'   r(   ra         z"PandasBlockColumnAccessor.__init__T)as_pyignore_nullsre   r,   c                C   s   |r| j  S t| j S r0   )_columncountr8   rD   rf   re   r'   r'   r(   rh         zPandasBlockColumnAccessor.countc                C   s   |   rd S | jj|ddS )NrN   )skipna	min_count)_is_all_nullrg   sumri   r'   r'   r(   rn      s   zPandasBlockColumnAccessor.sumc                C      |   rd S | jj|dS Nrk   )rm   rg   minri   r'   r'   r(   rr         zPandasBlockColumnAccessor.minc                C   ro   rp   )rm   rg   maxri   r'   r'   r(   rt      rs   zPandasBlockColumnAccessor.maxc                C   s(   | j |d}t|s|| j|d S |S )Nrf   )rn   r   rh   )rD   rf   re   sum_r'   r'   r(   mean   s
   zPandasBlockColumnAccessor.meanqc                C   s   | j j|dS )N)rx   )rg   quantile)rD   rx   rf   re   r'   r'   r(   ry      s   z"PandasBlockColumnAccessor.quantilec                 C   s   t  }|| j S r0   )r)   Seriesrg   unique)rD   rS   r'   r'   r(   r{      s   z PandasBlockColumnAccessor.uniquec                 C      | j j S r0   )rg   listflattenrP   r'   r'   r(   r~      rR   z!PandasBlockColumnAccessor.flattenNrw   c                 C   s6   |d u r
| j |d}t|r|S | j| d j|dS )Nru      rq   )rw   r   rg   rn   )rD   rf   rw   re   r'   r'   r(   sum_of_squared_diffs_from_mean   s
   z8PandasBlockColumnAccessor.sum_of_squared_diffs_from_meanc                 C   s
   | j  S r0   )rg   to_listrP   r'   r'   r(   	to_pylist   s   
z#PandasBlockColumnAccessor.to_pylistFzero_copy_onlyc                 C   s   | j j| dS )zqNOTE: Unlike Arrow, specifying `zero_copy_only=True` isn't a guarantee
        that no copy will be made
        )copy)rg   r1   )rD   r   r'   r'   r(   r1      s   z"PandasBlockColumnAccessor.to_numpyzpyarrow.Arrayc                 C   s   |   S r0   )r   rP   r'   r'   r(   _as_arrow_compatible   s   z.PandasBlockColumnAccessor._as_arrow_compatiblec                 C   s   | j    S r0   )rg   notnaanyrP   r'   r'   r(   rm      rd   z&PandasBlockColumnAccessor._is_all_null)r@   r]   NTF)rX   rY   rZ   ra   boolr   r   rh   rn   rr   rt   rw   floatry   r   r{   r~   r   r   r   r   npndarrayr1   r   r   rm   __classcell__r'   r'   rb   r(   r\      sD         

r\   c                       s   e Zd Z fddZedeeee f ddfddZ	eded ddfd	d
Z
edefddZedddZdefddZ  ZS )PandasBlockBuilderc                    s   t  }t |j d S r0   )r)   r`   ra   	DataFrame)rD   r%   rb   r'   r(   ra      s   zPandasBlockBuilder.__init__rJ   r,   pandas.DataFramec                    s.   ddl m  t }| fdd|  D S )Nr   TensorArrayc                    s8   i | ]\}}|t |d krt||r t|n|qS r   )r8   r   r   )r3   column_namecolumn_valuesr   r'   r(   
<dictcomp>   s    z9PandasBlockBuilder._table_from_pydict.<locals>.<dictcomp>)$ray.data.extensions.tensor_extensionr   r)   r   rA   )rJ   r%   r'   r   r(   _table_from_pydict   s   
z%PandasBlockBuilder._table_from_pydicttablesc                 C   s^   t  }ddlm} t| dkr|j| dd}|jddd n| d }t }|jr-||}|S )Nr   ))_cast_ndarray_columns_to_tensor_extensionrN   Tignore_indexdropinplace)	r)   "ray.air.util.data_batch_conversionr   r8   concatreset_indexr   get_currentenable_tensor_extension_casting)r   r%   r   dfctxr'   r'   r(   _concat_tables   s   z!PandasBlockBuilder._concat_tablesc                   C   s   dS r   r'   r'   r'   r'   r(   _concat_would_copy   s   z%PandasBlockBuilder._concat_would_copyc                  C   s   t  } |  S r0   )r)   r   r&   r'   r'   r(   _empty_table  s   zPandasBlockBuilder._empty_tablec                 C      t jS r0   r   PANDASrP   r'   r'   r(   
block_type     zPandasBlockBuilder.block_typer,   r   )rX   rY   rZ   ra   staticmethodr   rG   r   r   r   r   r   r   r   r   r   r   r'   r'   rb   r(   r      s     r   PandasBlockSchemanamestypesc                	       s  e Zd ZeZdI fddZdee fddZded	e	de
fd
dZededejfddZdJdedededdfddZdee ddfddZdee ddfddZdeeef ddfddZdee ddfdd Zdefd!d"ZdKd#d$Z	%dLdeeeee f  deejeeejf f fd&d'ZdMd)d*Zdefd+d,Zdefd-d.Z d/e!ddfd0d1Z"ede#fd2d3Z$edKd4d5Z%d6ed7d8ddfd9d:Z&dNd;d<Z'd=ee( d7d8dee
 fd>d?Z)ed@ee
 d7d8de*e
dAf fdBdCZ+de,fdDdEZ-dFede.ee/ejf  fdGdHZ0  Z1S )OPandasBlockAccessortabler   c                    r^   r0   r_   )rD   r   rb   r'   r(   ra     rd   zPandasBlockAccessor.__init__r,   c                 C   r|   r0   )_tablerJ   tolistrP   r'   r'   r(   column_names  rR   z PandasBlockAccessor.column_namesnamerV   c                 C   s&   || j jvsJ | j jdi ||iS )Nr'   )r   rJ   assign)rD   r   rV   r'   r'   r(   fill_column  s   zPandasBlockAccessor.fill_columnrowc                 C   s0   ddl m} | t jd }t||r| }|S )Nr   r-   )rF   r.   r   r9   r:   r1   )r   r.   tensorr'   r'   r(   _build_tensor_row  s
   
z%PandasBlockAccessor._build_tensor_rowFstartendr   c                 C   s0   | j || }|jddd |r|jdd}|S )NTr   deep)r   r   r   )rD   r   r   r   viewr'   r'   r(   slice)  s
   zPandasBlockAccessor.sliceindicesc                 C   s   | j |}|jddd |S )NTr   )r   taker   )rD   r   r   r'   r'   r(   r   0  s   zPandasBlockAccessor.takerJ   c                 C   s,   t dd |D std| d| j| S )Nc                 s   s    | ]}t |tV  qd S r0   r:   rG   )r3   r@   r'   r'   r(   r5   6  s    z-PandasBlockAccessor.select.<locals>.<genexpr>zZColumns must be a list of column name strings when aggregating on Pandas blocks, but got: .)allr=   r   )rD   rJ   r'   r'   r(   select5  s   
zPandasBlockAccessor.selectcolumns_renamec                 C   s   | j j|dddS )NF)rJ   r   r   )r   rename)rD   r   r'   r'   r(   rename_columns=  s   z"PandasBlockAccessor.rename_columnsrandom_seedc                 C   s"   | j jd|d}|jddd |S )NrN   )fracrandom_stateTr   )r   sampler   )rD   r   r   r'   r'   r(   random_shuffle@  s   z"PandasBlockAccessor.random_shufflec                 C   sJ   | j j}t|j |j d}tdd |jD r#td|jd|S )N)r   r   c                 s   s    | ]	}t |t V  qd S r0   r   )r3   r   r'   r'   r(   r5   L  s    z-PandasBlockAccessor.schema.<locals>.<genexpr>zwA Pandas DataFrame with column names of non-str types is not supported by Ray Dataset. Column names of this DataFrame: r   )	r   dtypesr   indexr   valuesr   r   r=   )rD   r   schemar'   r'   r(   r   E  s   zPandasBlockAccessor.schemac                 C   s,   ddl m} t }| j}|jr||}|S )Nr   ) _cast_tensor_columns_to_ndarrays)r   r   r   r   r   r   )rD   r   r   r   r'   r'   r(   	to_pandasT  s   zPandasBlockAccessor.to_pandasNc                 C   s   |d u r| j j }d}nt|trd}n|g}d}t| j j}|D ]}||vr6td| d| j j  q"g }|D ]}|| j |   q;|rP|d }|S t	t
||}|S )NFTzCannot find column z, available columns: r   )r   rJ   r   r:   r}   setr=   appendr1   dictzip)rD   rJ   should_be_single_ndarraycolumn_names_setcolumnarraysr'   r'   r(   r1   ]  s0   

zPandasBlockAccessor.to_numpypyarrow.Tablec                 C   s   dd l }|jj| jdd}i }t| jjD ]\}}| j| }|  s3|jt	||
 d|||f< q| D ]\\}}}||||}q8|S )Nr   F)preserve_index)type)pyarrowTablefrom_pandasr   	enumeraterJ   r   r   nullsr8   nullrA   
set_column)rD   paarrow_tablenull_coerced_columnsidxcol_namer@   null_colr'   r'   r(   to_arrow{  s   

zPandasBlockAccessor.to_arrowc                 C   rM   r#   )r   rO   rP   r'   r'   r(   num_rows  rR   zPandasBlockAccessor.num_rowsc                    s`  ddl m} ddlm m} t  fdd| jjddd}|f}t}| jj	D ]~}| j| j
}t|s>t|s>t||rt| j| }t||}	| j| j|	d	j}
z5t|
|rht|
d jtjrh|
j}ntfd
d}t||
}|||	  }||  t|7  < W q) ty } ztd| d|  W Y d }~q)d }~ww q)| }t|S )Nr   r   )r.   TensorDtypec                    s:  t  }d}t| g}|r| }t|ttttfr$t	
|}||7 }qt||v r+q|t| zt	
|}W n tyD   d}Y nw ||7 }t|tjrW||j| 7 }nBt|jrk||jddd | 7 }n.t|ttt fry|| n t|tr||  ||  nt| r||  |s|S )zhCalculates the memory size of objects,
            including nested objects using an iterative approach.r   Tr   r   )r   collectionsdequepopr:   rG   bytesintr   sys	getsizeofidadd	TypeErrorr   r   nbytesr   memory_usagern   r}   r;   extendr   r/   r   r1   )objseen
total_sizeobjectscurrentsize)r.   rS   r'   r(   get_deep_size  sB   


$z5PandasBlockAccessor.size_bytes.<locals>.get_deep_sizeTFr   )nc                    s    | S r0   r'   )x)r	  r'   r(   <lambda>  s    z0PandasBlockAccessor.size_bytes.<locals>.<lambda>z#Error calculating size for column 'z': )%ray.air.util.tensor_extensions.pandasr   rF   r.   r   r)   r   r  #_PANDAS_SIZE_BYTES_MAX_SAMPLE_COUNTrJ   dtyper   r   r:   r8   rr   r   r   r   
issubdtypenumpy_dtypenumberr   	vectorizern   r   	Exceptionr>   r?   )rD   r   r   r  object_need_checkmax_sample_countr   r  r  sample_sizesampled_datacolumn_memory_samplevectorized_size_calccolumn_memoryrB   total_memory_usager'   )r.   r	  rS   r(   
size_bytes  sH   /
"zPandasBlockAccessor.size_bytesaccc           	      C   sz   |   jdd}|  }|jD ]+}|| }t|j}||v r6d}|}||v r4d||}|d7 }||v s&|}|||< q|S )NFr   rN   z{}_{})r   r   rJ   r}   format)	rD   r  rsr   r@   r   inew_namer'   r'   r(   _zip  s   


zPandasBlockAccessor._zipc                   C   s   t  S r0   )r   r'   r'   r'   r(   builder  s   zPandasBlockAccessor.builderc                   C   s   t  S r0   )r   r   r'   r'   r'   r(   r     s   z PandasBlockAccessor._empty_table	n_samplessort_keyr   c                 C   s   | j |  j|ddS )NTr   )r   get_columnsr   )rD   r&  r'  r'   r'   r(   _sample  rj   zPandasBlockAccessor._samplec                 C   sP   |  sJ d|   d| jjd dkr|  S | \}}| jj||dS )Nz'Sorting columns couldn't be empty (got )r   by	ascending)r(  r   rO   r   to_pandas_sort_argssort_values)rD   r'  rJ   r-  r'   r'   r(   sort  s   zPandasBlockAccessor.sort
boundariesc                    sZ     |}|jd dkr fddtt|d D S t|dkr$|gS t|||S )Nr   c                    s   g | ]}   qS r'   )r   )r3   _rP   r'   r(   
<listcomp>1  s    z:PandasBlockAccessor.sort_and_partition.<locals>.<listcomp>rN   )r0  rO   ranger8   r   	for_block_find_partitions_sorted)rD   r1  r'  r   r'   rP   r(   sort_and_partition)  s   

z&PandasBlockAccessor.sort_and_partitionblocksr    c                 C   s   t  }t }dd | D } t| dkrt }nt| tj	} |j
| dd}| \}}|j||d}ddlm} ||j|| dfS )	Nc                 S   s   g | ]}|j d  d kr|qS r   )rO   )r3   br'   r'   r(   r3  ?  s    z;PandasBlockAccessor.merge_sorted_blocks.<locals>.<listcomp>r   Tr   r+  r   )stats)r)   r   r%  r8   r   r   r   normalize_block_typesr   r   r   r.  r/  ray.data.blockr    
from_blockbuild)r8  r'  rS   r:  retrJ   r-  r    r'   r'   r(   merge_sorted_blocks9  s   
z'PandasBlockAccessor.merge_sorted_blocksc                 C   r   r0   r   rP   r'   r'   r(   r   L  r   zPandasBlockAccessor.block_typepublic_row_formatc                 c   sB    t |  D ]}| |}|rt|tr| V  q|V  qd S r0   )r4  r   _get_rowr:   r   rW   )rD   rA  r"  r   r'   r'   r(   	iter_rowsO  s   
zPandasBlockAccessor.iter_rows)r   r   r   r   r0   )r,   r   )r'  r   )2rX   rY   rZ   r*   ROW_TYPEra   r   rG   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r1   r   r   r  r   r$  r   r%  r   r)  r0  r!   r7  r	   r@  r   r   r   r   rC  r   r'   r'   rb   r(   r     sf    




!c


r   )>r   loggingr   typingr   r   r   r   r   r   r   r	   r
   r   numpyr   r%   rS   pandas.api.typesr   r   r   ray.air.constantsr   $ray.air.util.tensor_extensions.utilsr    ray.data._internal.numpy_supportr   ray.data._internal.rowr   ray.data._internal.table_blockr   r   ray.data._internal.utilr   r<  r   r   r   r   r   r   r   ray.data.contextr   r   2ray.data._internal.planner.exchange.sort_task_specr   r    r!   r  	getLoggerrX   r>   r$   r)   r*   r\   r   
namedtupler   r   r'   r'   r'   r(   <module>   s<    0$	
	@W7