o
    $iA                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ ededee fd	d
ZedededefddZG dd dZG dd deZG dd deZG dd dZdee fddZdS )    N)deque)AnyCallableDequeDictListOptionalUnion)Dataset)AggregateFnV2)DeveloperAPIvaluereturnc                 C   s
   |  dS )z*Tokenize a string using a split on spaces. )split)r    r   Y/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/preprocessors/utils.pysimple_split_tokenizer
   s   
r   num_featuresc                 C   s,   t |  }t|}t| d}|| S )z6Deterministically hash a value into the integer space.   )strencodehashlibsha1int	hexdigest)r   r   encoded_valuehashed_valuehashed_value_intr   r   r   simple_hash   s   
r   c                	   @   sB   e Zd ZdZdd ddeeef dedeegef fdd	Zd
S )BaseStatSpeczEEncapsulates a statistical computation with optional post-processing.c                 C      | S Nr   xr   r   r   <lambda>        zBaseStatSpec.<lambda>post_process_fnstat_fnr(   post_key_fnc                C   s   || _ || _|| _d S r"   r)   r(   r*   )selfr)   r(   r*   r   r   r   __init__   s   
zBaseStatSpec.__init__N)	__name__
__module____qualname____doc__r	   r   r   r   r-   r   r   r   r   r       s    
r    c                
       s^   e Zd ZdZdd dddeeeegef f dedeegef d	ee f fd
dZ	  Z
S )AggregateStatSpecz5Represents an AggregateFnV2 spec for a single column.c                 C   r!   r"   r   r#   r   r   r   r%   /   r&   zAggregateStatSpec.<lambda>N)r(   columnaggregator_fnr(   r*   r3   c                   s   t  j|||d || _d S Nr+   )superr-   r3   )r,   r4   r(   r*   r3   	__class__r   r   r-   +   s   
zAggregateStatSpec.__init__)r.   r/   r0   r1   r	   r   r   r   r   r-   __classcell__r   r   r7   r   r2   (   s    r2   c                       s`   e Zd ZdZdd ddedeeegef  deeegef  ded	ee f
 fd
dZ  Z	S )CallableStatSpeczPRepresents a user-defined stat function that operates outside Dataset.aggregate.c                 C   r!   r"   r   r#   r   r   r   r%   D   r&   zCallableStatSpec.<lambda>r'   r)   stat_key_fnr*   r(   columnsc                   s"   t  j|||d || _|| _d S r5   )r6   r-   r<   r;   )r,   r)   r;   r*   r(   r<   r7   r   r   r-   >   s
   	
zCallableStatSpec.__init__)
r.   r/   r0   r1   r   r   r   r   r-   r9   r   r   r7   r   r:   ;   s    r:   c                   @   s  e Zd ZdZdd Zdd Zdd dd	d
eegef dede	eegef  de
e ddf
ddZdd dd	deg ef dedeegef de	eegef  de
e ddfddZdedeeef fddZde
e fddZde
e fddZde
e fddZdd  Zd!d" ZdS )#StatComputationPlana\  
    Encapsulates a set of aggregators (AggregateFnV2) and legacy stat functions
    to compute statistics over a Ray dataset.

    Supports two types of aggregations:
    1. AggregateFnV2-based aggregators, which are batch-executed using `Dataset.aggregate(...)`.
    2. Callable-based stat functions, executed sequentially (legacy use case).
    c                 C   s   t  | _d S r"   )r   _aggregatorsr,   r   r   r   r-   X   s   zStatComputationPlan.__init__c                 C   s   | j   d S r"   )r>   clearr?   r   r   r   reset[   s   zStatComputationPlan.resetc                 C   r!   r"   r   r#   r   r   r   r%   b   r&   zStatComputationPlan.<lambda>N)r(   r*   r4   r(   r*   r<   r   c             	   C   s.   |D ]}||}| j t||||d qdS )a  
        Registers an AggregateFnV2 factory for one or more columns.

        Args:
            aggregator_fn: A callable (typically a lambda or class) that accepts a column name and returns an instance of AggregateFnV2.
            post_process_fn: Function to post-process the aggregated result.
            post_key_fn: Optional key generator to use to save aggregation results after post-processing.
            columns: List of column names to aggregate.
        )r4   r(   r*   r3   N)r>   appendr2   )r,   r4   r(   r*   r<   r3   agg_instancer   r   r   add_aggregator^   s   z"StatComputationPlan.add_aggregatorc                 C   r!   r"   r   r#   r   r   r   r%   ~   r&   r)   r;   c             	   C   s"   | j t|||||p|d dS )a  
        Registers a custom stat function to be run sequentially.

        This supports legacy use cases where arbitrary callables are needed
        and cannot be run via Dataset.aggregate().

        Args:
            stat_fn: A zero-argument callable that returns the stat.
            post_process_fn: Function to apply to the result.
            stat_key_fn:
            post_key_fn:
            columns:
        )r)   r(   r<   r;   r*   N)r>   rB   r:   )r,   r)   r(   r;   r*   r<   r   r   r   add_callable_statz   s   z%StatComputationPlan.add_callable_statdatasetc           
      C   s   i }|   }|r.|j| }|  D ]}|jj}|jdur"||jn|}||| ||< q|  D ]!}||j	}|j
D ]}	|	|	}||	}||| ||< q=q2|S )aq  
        Executes all registered aggregators and stat functions.

        AggregateFnV2-based aggregators are batched and executed via Dataset.aggregate().
        Callable-based stat functions are run sequentially.

        Args:
            dataset: The Ray Dataset to compute statistics on.

        Returns:
            A dictionary of computed statistics.
        N)_get_aggregate_fn_list	aggregate_get_aggregate_specsr)   namer*   r3   r(   _get_custom_stat_fn_specsr;   r<   )
r,   rF   statsaggregators
raw_resultspecstat_keypost_keyresultcolr   r   r   compute   s&   




zStatComputationPlan.computec                 C      dd | j D S )Nc                 S   s   g | ]
}t |tr|jqS r   )
isinstancer2   r)   .0rO   r   r   r   
<listcomp>   s    z>StatComputationPlan._get_aggregate_fn_list.<locals>.<listcomp>r>   r?   r   r   r   rG      s   z*StatComputationPlan._get_aggregate_fn_listc                 C   rU   )Nc                 S      g | ]	}t |tr|qS r   )rV   r2   rW   r   r   r   rY      
    
z<StatComputationPlan._get_aggregate_specs.<locals>.<listcomp>rZ   r?   r   r   r   rI         z(StatComputationPlan._get_aggregate_specsc                 C   rU   )Nc                 S   r[   r   )rV   r:   rW   r   r   r   rY      r\   zAStatComputationPlan._get_custom_stat_fn_specs.<locals>.<listcomp>rZ   r?   r   r   r   rK      r]   z-StatComputationPlan._get_custom_stat_fn_specsc                 C   s   t |  dkS )Nr   )lenrK   r?   r   r   r   has_custom_stat_fn   s   z&StatComputationPlan.has_custom_stat_fnc                 C   s   t |  S )z4
        Iterates over all AggregatorSpecs.
        )iterrI   r?   r   r   r   __iter__   s   zStatComputationPlan.__iter__)r.   r/   r0   r1   r-   rA   r   r   r   r   r   rD   r   rE   r
   r   rT   rG   r2   rI   r:   rK   r_   ra   r   r   r   r   r=   N   sL    	
 

 %r=   	callbacksc                    s    fdd}|S )z
    Wraps a base post-processing function with a sequence of callback functions.
    Useful when multiple post-processing steps need to be applied in order.
    c                    s    | }D ]}||}q|S r"   r   )rR   	processedcbbase_fnrb   r   r   wrapper   s   
z$make_post_processor.<locals>.wrapperr   )rf   rb   rg   r   re   r   make_post_processor   s   rh   )r   collectionsr   typingr   r   r   r   r   r   r	   ray.datar
   ray.data.aggregater   ray.util.annotationsr   r   r   objectr   r   r    r2   r:   r=   rh   r   r   r   r   <module>   s     $ 