o
    `۷i!                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ d dlmZ er<d dlmZ eded	e	e fd
dZededed	efddZG dd dZG dd deZG dd deZG dd dZde	e fddZdS )    N)deque)TYPE_CHECKINGAnyCallableDequeDictListOptionalUnion)BatchFormat)AggregateFnV2)DeveloperAPI)Datasetvaluereturnc                 C   s
   |  dS )z*Tokenize a string using a split on spaces. )split)r    r   R/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/preprocessors/utils.pysimple_split_tokenizer   s   
r   num_featuresc                 C   s,   t |  }t|}t| d}|| S )z6Deterministically hash a value into the integer space.   )strencodehashlibsha256int	hexdigest)r   r   encoded_valuehashed_valuehashed_value_intr   r   r   simple_hash   s   
r!   c                   @   s4   e Zd ZdZdd ddeeef defddZd	S )
BaseStatSpeczEEncapsulates a statistical computation with optional post-processing.c                 C      | S Nr   xr   r   r   <lambda>$       zBaseStatSpec.<lambda>post_process_fnstat_fnr*   c                C   s   || _ || _d S r$   r+   r*   )selfr+   r*   r   r   r   __init__    s   
zBaseStatSpec.__init__N)__name__
__module____qualname____doc__r
   r   r   r.   r   r   r   r   r"      s    
r"   c                
       sZ   e Zd ZdZdd ddddeeeegef f dedee d	ee	 f fd
dZ
  ZS )AggregateStatSpecz5Represents an AggregateFnV2 spec for a single column.c                 C   r#   r$   r   r%   r   r   r   r'   1   r(   zAggregateStatSpec.<lambda>N)r*   columnbatch_formataggregator_fnr*   r4   r5   c                   s    t  j||d || _|| _d S Nr,   )superr.   r4   r5   )r-   r6   r*   r4   r5   	__class__r   r   r.   -   s   
zAggregateStatSpec.__init__)r/   r0   r1   r2   r
   r   r   r   r	   r   r.   __classcell__r   r   r9   r   r3   *   s    r3   c                       s`   e Zd ZdZdd ddedeeegef  deeegef  ded	ee f
 fd
dZ  Z	S )CallableStatSpeczPRepresents a user-defined stat function that operates outside Dataset.aggregate.c                 C   r#   r$   r   r%   r   r   r   r'   F   r(   zCallableStatSpec.<lambda>r)   r+   stat_key_fnpost_key_fnr*   columnsc                   s&   t  j||d || _|| _|| _d S r7   )r8   r.   r?   r=   r>   r-   r+   r=   r>   r*   r?   r9   r   r   r.   @   s   	
zCallableStatSpec.__init__)
r/   r0   r1   r2   r   r	   r   r   r.   r;   r   r   r9   r   r<   =   s    r<   c                   @   s  e Zd ZdZdd Zdd Zdd dd	d
eegef dede	e de
e ddf
ddZddd ddeg ef deegef de
eegef  dede	e ddfddZdddeeef fddZde	e fddZde	e fddZde	e fd d!Zd"d# Zd$d% ZdS )&StatComputationPlana\  
    Encapsulates a set of aggregators (AggregateFnV2) and legacy stat functions
    to compute statistics over a Ray dataset.

    Supports two types of aggregations:
    1. AggregateFnV2-based aggregators, which are batch-executed using `Dataset.aggregate(...)`.
    2. Callable-based stat functions, executed sequentially (legacy use case).
    c                 C   s   t  | _d S r$   )r   _aggregatorsr-   r   r   r   r.   \   s   zStatComputationPlan.__init__c                 C   s   | j   d S r$   )rB   clearrC   r   r   r   reset_   s   zStatComputationPlan.resetc                 C   r#   r$   r   r%   r   r   r   r'   f   r(   zStatComputationPlan.<lambda>N)r*   r5   r6   r*   r?   r5   r   c             	   C   s.   |D ]}||}| j t||||d qdS )a  
        Registers an AggregateFnV2 factory for one or more columns.

        Args:
            aggregator_fn: A callable (typically a lambda or class) that accepts a column name and returns an instance of AggregateFnV2.
                          The aggregator should set its name using alias_name parameter to control the output key.
            post_process_fn: Function to post-process the aggregated result.
            columns: List of column names to aggregate.
            batch_format: The batch format for aggregation results. If ARROW, results
                         are kept in Arrow format for post_process_fn. Otherwise,
                         results are converted to Python/pandas format.
        )r6   r*   r4   r5   N)rB   appendr3   )r-   r6   r*   r?   r5   r4   agg_instancer   r   r   add_aggregatorb   s   z"StatComputationPlan.add_aggregatorc                 C   r#   r$   r   r%   r   r   r   r'      r(   )r>   r*   r+   r=   r>   c             	   C   s"   | j t|||||p|d dS )ao  
        Registers a custom stat function to be run sequentially.

        This supports legacy use cases where arbitrary callables are needed
        and cannot be run via Dataset.aggregate().

        Args:
            stat_fn: A zero-argument callable that returns the stat.
            stat_key_fn: A callable that takes a column name and returns the key for the stat.
            post_key_fn: Optional; a callable to post-process the key. If not provided, stat_key_fn is used.
            post_process_fn: Function to post-process the result.
            columns: List of column names to compute the stat for.
        )r+   r*   r?   r=   r>   N)rB   rF   r<   r@   r   r   r   add_callable_stat   s   z%StatComputationPlan.add_callable_statdatasetr   c                 C   s   i }|   }|rJ|dj| }| }|stdt|d }|  D ]$}|jj	}|
|d }	|jtjkr@||	||< q%||	 ||< q%|  D ]!}||j}
|jD ]}||}||}||
| ||< qYqN|S )aq  
        Executes all registered aggregators and stat functions.

        AggregateFnV2-based aggregators are batched and executed via Dataset.aggregate().
        Callable-based stat functions are run sequentially.

        Args:
            dataset: The Ray Dataset to compute statistics on.

        Returns:
            A dictionary of computed statistics.
        NzAggregation returned no resultsr   )_get_aggregate_fn_listgroupby	aggregateto_arrow_refs
ValueErrorrayget_get_aggregate_specsr+   namer4   r5   r   ARROWr*   as_py_get_custom_stat_fn_specsr=   r?   r>   )r-   rJ   statsaggregatorsagg_ds
arrow_refsarrow_tablespecstat_key
agg_resultresultcolpost_keyr   r   r   compute   s,   


zStatComputationPlan.computec                 C      dd | j D S )Nc                 S   s   g | ]
}t |tr|jqS r   )
isinstancer3   r+   .0r\   r   r   r   
<listcomp>   s    z>StatComputationPlan._get_aggregate_fn_list.<locals>.<listcomp>rB   rC   r   r   r   rK      s   z*StatComputationPlan._get_aggregate_fn_listc                 C   rc   )Nc                 S      g | ]	}t |tr|qS r   )rd   r3   re   r   r   r   rg      
    
z<StatComputationPlan._get_aggregate_specs.<locals>.<listcomp>rh   rC   r   r   r   rR         z(StatComputationPlan._get_aggregate_specsc                 C   rc   )Nc                 S   ri   r   )rd   r<   re   r   r   r   rg      rj   zAStatComputationPlan._get_custom_stat_fn_specs.<locals>.<listcomp>rh   rC   r   r   r   rV      rk   z-StatComputationPlan._get_custom_stat_fn_specsc                 C   s   t |  dkS )Nr   )lenrV   rC   r   r   r   has_custom_stat_fn   s   z&StatComputationPlan.has_custom_stat_fnc                 C   s   t |  S )z4
        Iterates over all AggregatorSpecs.
        )iterrR   rC   r   r   r   __iter__   s   zStatComputationPlan.__iter__)r/   r0   r1   r2   r.   rE   r   r   r   r   r	   r   rH   r   rI   r   rb   rK   r3   rR   r<   rV   rm   ro   r   r   r   r   rA   R   sL    	
$

 -rA   	callbacksc                    s    fdd}|S )z
    Wraps a base post-processing function with a sequence of callback functions.
    Useful when multiple post-processing steps need to be applied in order.
    c                    s    | }D ]}||}q|S r$   r   )r_   	processedcbbase_fnrp   r   r   wrapper   s   
z$make_post_processor.<locals>.wrapperr   )rt   rp   ru   r   rs   r   make_post_processor   s   rv   )r   collectionsr   typingr   r   r   r   r   r   r	   r
   rP   "ray.air.util.data_batch_conversionr   ray.data.aggregater   ray.util.annotationsr   ray.data.datasetr   r   r   objectr   r!   r"   r3   r<   rA   rv   r   r   r   r   <module>   s&    ( 