o
    $iT                     @   s  d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlZd dlmZ d dlmZmZmZmZmZmZmZmZmZmZ d dlmZ erZd dlm Z  d dl!m"Z"m#Z# e $e%Z&ed	d
eG dd dZ'eG dd dZ(de)dee fddZ*de)dee fddZ+de)dee fddZ,deed ee)gee f f fddZ-de)dddee fddZ.de)dddeed ee)gee f f dee fddZ/		d;d e	d! d"e	ee)  de	eed ee)gee f f  de(fd#d$Z0d%ed&ed'ej"dee)e
eej"f f fd(d)Z1d*ee)e2f d+ej d,ej d-ee de3f
d.d/Z4	0d<d1ed2e	ej" d3e)dej5fd4d5Z6d6ee)ee)e3f f d7e7d+ej d8e8dej9f
d9d:Z:dS )=    N)	dataclass)TYPE_CHECKINGAnyCallableDictListOptionalTupleUnion)convert_to_pyarrow_array)
AggregateFnV2ApproximateQuantileApproximateTopKCountMaxMeanMinMissingValuePercentageStdZeroPercentage)	PublicAPI)SchemaDataTypeTypeCategoryalpha)	stabilityc                   @   s   e Zd ZU dZdZejed< ejed< ejed< e	e
 ed< dejfdd	Zd
ejdejfddZdd Zdejde
dee fddZde
fddZdS )DatasetSummaryzWrapper for dataset summary statistics.

    Provides methods to access computed statistics.

    Attributes:
        dataset_schema: PyArrow schema of the original dataset
    	statistic_stats_matching_column_dtype_stats_mismatching_column_dtypedataset_schemacolumnstablec                 C   s   ddl m} z|| W S  tttjfyf } zHt	d| d i }|j
jD ]-}||}z| ||< W q) tttjfyV   tjt|t d}| ||< Y q)w t|W  Y d}~S d}~ww )zSafely convert a PyArrow table to pandas, handling problematic extension types.

        Args:
            table: PyArrow table to convert

        Returns:
            pandas DataFrame with converted data
        r   )BlockAccessorz$Direct conversion to pandas failed (z)), attempting column-by-column conversiontypeN)ray.data.blockr$   	for_block	to_pandas	TypeError
ValueErrorpaArrowInvalidloggerwarningschemanamescolumnnullslennullpd	DataFrame)selfr#   r$   eresult_datacol_namecolnull_col r>   K/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/stats.py_safe_convert_table:   s&   	

z"DatasetSummary._safe_convert_tabledfreturnc                 C   s    | j |jv r|| j S t S )zSet the statistic column as index if it exists, else return empty DataFrame.

        Args:
            df: DataFrame to set index on

        Returns:
            DataFrame with statistic column as index, or empty DataFrame if column missing
        )STATISTIC_COLUMNr"   	set_indexr6   r7   )r8   rA   r>   r>   r?   _set_statistic_indexY   s   	z#DatasetSummary._set_statistic_indexc                 C   sb   |  | | j}|  | | j}|jr |jr tj| jgdS ||}|	 
| jj	ddS )a  Convert summary to a single pandas DataFrame.

        Combines statistics from both schema-matching and schema-changing tables.

        Note: Some PyArrow extension types (like TensorExtensionType) may fail to convert
        to pandas when all values in a column are None. In such cases, this method
        attempts to convert column-by-column, casting problematic columns to null type.

        Returns:
            DataFrame with all statistics, where rows are unique statistics from both tables
        r"   Tdrop)rE   r@   r   r    emptyr6   r7   rC   combine_firstreset_indexsort_values)r8   df_matchingdf_changingresultr>   r>   r?   r)   f   s   


zDatasetSummary.to_pandasr2   c                 C   s4   ||j jvrdS | || j|g }|j|didS )a  Extract a column from a PyArrow table if it exists.

        Args:
            table: PyArrow table to extract from
            column: Column name to extract

        Returns:
            DataFrame with 'statistic' and 'value' columns, or None if column doesn't exist
        NvaluerF   )r0   r1   r@   rC   rename)r8   r#   r2   rA   r>   r>   r?   _extract_column_from_table   s   z)DatasetSummary._extract_column_from_tablec                    sz    fddj jfD }|std  dtj|dd}dd }|jjd	d
d | 	jjdd}|S )zGet all statistics for a specific column, merging from both tables.

        Args:
            column: Column name to get statistics for

        Returns:
            DataFrame with all statistics for the column
        c                    s$   g | ]} |  d urqS )N)rR   ).0r#   r2   rA   r8   r>   r?   
<listcomp>   s
    z3DatasetSummary.get_column_stats.<locals>.<listcomp>zColumn 'z' not found in summary tablesT)ignore_indexc                 S   s"   |   }t|dkr|jd S d S )Nr   )dropnar4   iloc)seriesnon_nullr>   r>   r?   first_non_null   s   z7DatasetSummary.get_column_stats.<locals>.first_non_nullF)sortrP   rG   )
r   r    r+   r6   concatgroupbyrC   applyrK   rL   )r8   r2   dfscombinedr[   rO   r>   rT   r?   get_column_stats   s"   		zDatasetSummary.get_column_statsN)__name__
__module____qualname____doc__rC   r,   Table__annotations__r   liststrr@   r6   r7   rE   r)   r   dictrR   rb   r>   r>   r>   r?   r      s$   
 


 
r   c                   @   s.   e Zd ZU dZeeef ed< ee ed< dS )_DtypeAggregatorszContainer for columns and their aggregators.

    Attributes:
        column_to_dtype: Mapping from column name to dtype string representation
        aggregators: List of all aggregators to apply
    column_to_dtypeaggregatorsN)	rc   rd   re   rf   r   rj   rh   r   r   r>   r>   r>   r?   rl      s   
 rl   r2   rB   c                 C   sV   t | ddt| ddt| ddt| ddt| dddt| dgdt| dt| ddgS )	a  Generate default metrics for numerical columns.

    This function returns a list of aggregators that compute the following metrics:
    - count
    - mean
    - min
    - max
    - std
    - approximate_quantile (median)
    - missing_value_percentage
    - zero_percentage

    Args:
        column: The name of the numerical column to compute metrics for.

    Returns:
        A list of AggregateFnV2 instances that can be used with Dataset.aggregate()
    Fonignore_nullsTr   )rp   rq   ddofg      ?)rp   	quantilesrp   )r   r   r   r   r   r   r   r   r2   r>   r>   r?   _numerical_aggregators   s   




rv   c                 C   s*   t | ddt| ddt| ddt| dgS )a|  Generate default metrics for temporal columns.

    This function returns a list of aggregators that compute the following metrics:
    - count
    - min
    - max
    - missing_value_percentage

    Args:
        column: The name of the temporal column to compute metrics for.

    Returns:
        A list of AggregateFnV2 instances that can be used with Dataset.aggregate()
    Fro   Trt   )r   r   r   r   ru   r>   r>   r?   _temporal_aggregators   s
   


rw   c                 C   s    t | ddt| dt| ddgS )a  Generate default metrics for all columns.

    This function returns a list of aggregators that compute the following metrics:
    - count
    - missing_value_percentage
    - approximate_top_k (top 10 most frequent values)

    Args:
        column: The name of the column to compute metrics for.

    Returns:
        A list of AggregateFnV2 instances that can be used with Dataset.aggregate()
    Fro   rt   
   )rp   k)r   r   r   ru   r>   r>   r?   _basic_aggregators  s   

rz   c                  C   s   ddl m} m} |  t|  t|  t|  t|  t| 	 t| 
 t|  t|  t|  t|  t|  t|  t|jtiS )a  Get default mapping from Ray Data DataType to aggregator factory functions.

    This function returns factory functions that create aggregators for specific columns.

    Returns:
        Dict mapping DataType or TypeCategory to factory functions that take a column name
        and return a list of aggregators for that column.

    Examples:
        >>> from ray.data.datatype import DataType
        >>> from ray.data.stats import _default_dtype_aggregators
        >>> mapping = _default_dtype_aggregators()
        >>> factory = mapping.get(DataType.int32())
        >>> aggs = factory("my_column")  # Creates aggregators for "my_column"
    r   r   )ray.data.datatyper   r   int8rv   int16int32int64uint8uint16uint32uint64float32float64boolstringrz   binaryTEMPORALrw   r   r>   r>   r?   _default_dtype_aggregators  s    r   dtyper   c                 C   s   z*|  rtj|jrt| ddgW S | rt| W S | r&t	| W S t
| W S  tyO } ztd|  d| d| d t
| W  Y d}~S d}~ww )a!  Get aggregators using heuristic-based type detection.

    This is a fallback when no explicit mapping is found for the dtype.

    Args:
        column: Column name
        dtype: Ray Data DataType for the column

    Returns:
        List of aggregators suitable for the column type
    Fro   z,Could not determine aggregators for column 'z' with dtype z: z. Using basic aggregators.N)is_arrow_typer,   typesis_null_physical_dtyper   is_numerical_typerv   is_temporal_typerw   rz   	Exceptionr.   r/   )r2   r   r9   r>   r>   r?   _get_fallback_aggregatorsD  s   


r   dtype_agg_mappingc                 C   sn   ddl m}m} | D ]%\}}t||r||kr||   S t||tfr1||r1||   S qt| |S )a  Get aggregators for a specific column based on its DataType.

    Attempts to match the dtype against the provided mapping first, then
    falls back to heuristic-based selection if no match is found.

    Args:
        column: Column name
        dtype: Ray Data DataType for the column
        dtype_agg_mapping: Mapping from DataType to factory functions

    Returns:
        List of aggregators with the column name properly set
    r   r   )r{   r   r   items
isinstancerj   is_ofr   )r2   r   r   r   r   mapping_keyfactoryr>   r>   r?   _get_aggregators_for_dtyped  s   
r   r0   r   r"   c                 C   s
  ddl m} | std|du r| j}t|t| j }|r&td| dt }|rA| }| D ]\}}||vr?|||< q3n|}i }	g }
tt	| j| j
}|D ],}|| }|du s`|tu rjtd| d qR||}t||	|< |
t||| qRt|	|
d	S )
a  Generate aggregators for columns in a dataset based on their DataTypes.

    Args:
        schema: A Ray Schema instance
        columns: List of columns to include. If None, all columns will be included.
        dtype_agg_mapping: Optional user-provided mapping from DataType to aggregator factories.
            Each value should be a callable that takes a column name and returns aggregators.
            This will be merged with the default mapping (user mapping takes precedence).

    Returns:
        _DtypeAggregators containing column-to-dtype mapping and aggregators

    Raises:
        ValueError: If schema is None or if specified columns don't exist in schema
    r   r   z4Dataset must have a schema to determine column typesNzColumns z not found in dataset schemazSkipping field 'z': type is None or unsupported)rm   rn   )r{   r   r+   r1   setr   copyr   rk   zipr   objectr.   r/   
from_arrowrj   extendr   rl   )r0   r"   r   r   missing_colsdefaultsfinal_mappingry   vrm   all_aggsname_to_typenamepa_type	ray_dtyper>   r>   r?   _dtype_aggregators_for_dataset  s@   
r   aggrP   agg_typec                    s   ddl m} |   tj|p|| }t|t	s"|du rK|rK|| r,|j
n||du r3ndd tt|D } fddt||D S  ||fiS )a  Format aggregation result into stat entries.

    Takes the raw aggregation result and formats it into one or more stat
    entries. For scalar results, returns a single entry. For list results,
    expands into multiple indexed entries.

    Args:
        agg: The aggregator instance
        value: The aggregation result value
        agg_type: PyArrow type of the aggregation result

    Returns:
        Dictionary mapping stat names to (value, type) tuples
    r   r   Nc                 S   s   g | ]}t |qS r>   )rj   )rS   idxr>   r>   r?   rU         z!_format_stats.<locals>.<listcomp>c                    s&   i | ]\}}  d | d|fqS )[]r>   )rS   labellist_valagg_namescalar_typer>   r?   
<dictcomp>  s    z!_format_stats.<locals>.<dictcomp>)r{   r   get_agg_namer,   r   is_listr   is_list_typer   ri   
value_typeranger4   r   )r   rP   r   r   r   labelsr>   r   r?   _format_stats  s    r   
agg_resultoriginal_schema
agg_schemarn   c                 C   s   i }i }t  }dd |D }|  D ]Q\}}	d|vs|ds q||}
|
s(q|
 }|s/q||j}||j}t|
|	|}| D ]\}\}}||krQ|n|}||f||i |< qE|	| q|||fS )a  Parse aggregation results into schema-matching and schema-changing stats.

    Args:
        agg_result: Dictionary of aggregation results with keys like "count(col)"
        original_schema: Original dataset schema
        agg_schema: Schema of aggregation results
        aggregators: List of aggregators used to generate the results

    Returns:
        Tuple of (schema_matching_stats, schema_changing_stats, column_names)
    c                 S   s   i | ]}|j |qS r>   )r   )rS   r   r>   r>   r?   r     r   z(_parse_summary_stats.<locals>.<dictcomp>())
r   r   endswithgetget_target_columnfieldr&   r   
setdefaultadd)r   r   r   rn   schema_matchingschema_changingr"   
agg_lookupkeyrP   r   r;   r   original_typeformatted_stats	stat_name
stat_value	stat_type
stats_dictr>   r>   r?   _parse_summary_stats  s,   

r    col_datacol_typer;   c              	   C   sB   |durzt j| |dW S  t jt jfy   Y nw t| |pdS )a{  Create a PyArrow array with fallback strategies.

    Uses convert_to_pyarrow_array from arrow_block.py for type inference and
    error handling when no specific type is provided.

    Args:
        col_data: List of column values
        col_type: Optional PyArrow type to use
        col_name: Column name for error messages (optional)

    Returns:
        PyArrow array
    Nr%   r2   )r,   arrayArrowTypeErrorr-   r   )r   r   r;   r>   r>   r?   _create_pyarrow_array/  s   r   r   all_columnspreserve_typesc                 C   s   | st i S t|  }tj|i}t|D ]B}g }d}|D ]!}	|| |	 v r:| |	 | \}
}||
 |du r9|}q|d q|rN||jv rN||j	}n|}t
|||||< qt |S )as  Build a PyArrow table from parsed statistics.

    Args:
        stats_dict: Nested dict of {stat_name: {col_name: (value, type)}}
        all_columns: Set of all column names across both tables
        original_schema: Original dataset schema
        preserve_types: If True, use original schema types for columns

    Returns:
        PyArrow table with statistics
    N)r,   r#   sortedkeysr   rC   appendr1   r   r&   r   )r   r   r   r   
stat_names
table_datar;   r   
first_typer   rP   r   r   r>   r>   r?   _build_summary_tableK  s(   



r   )NN)Nr   );loggingdataclassesr   typingr   r   r   r   r   r   r	   r
   pandasr6   pyarrowr,   $ray.air.util.tensor_extensions.arrowr   ray.data.aggregater   r   r   r   r   r   r   r   r   r   ray.util.annotationsr   ray.data.datasetr   r{   r   r   	getLoggerrc   r.   r   rl   rj   rv   rw   rz   r   r   r   r   r   anytupler   Arrayr   r   r   rg   r   r>   r>   r>   r?   <module>   s    (0
  , 
#

C
0

8
