o
    $i                     @   s   d dl mZmZmZ d dlmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZmZ 	ddeeeee f  dee dededee defddZdS )    )ListOptionalUnion)AllToAllTransformFn	RefBundleTaskContext)AllToAllTransformFnResult)SortAggregateTaskSpec)PullBasedShuffleTaskScheduler)PushBasedShuffleTaskScheduler)SortKeySortTaskSpec)unify_ref_bundles_schema)AggregateFn)DataContextShuffleStrategyNkeyaggsbatch_formatdata_context,_debug_limit_shuffle_execution_to_num_blocksreturnc                    sT   j tjtjfv sJ tdkrtddtt dtdt	f fdd}|S )z[Generate function to aggregate blocks by the specified key column or key
    function.
    r   z+Aggregate requires at least one aggregationrefsctxr   c                    s   g }g }| D ]}| |j | |j qt|dkr|i fS t| }D ]}|| q%t|}t}d u r>d}	g }
n|}	|jtj	 }t
|||	|}
t|
|d}jtjkrat|}njtjkrlt|}n	tdj d|j| |	| dS )Nr      )
boundariesr   r   r   zInvalid shuffle strategy '')$_debug_limit_execution_to_num_blocks)extend
block_refsmetadatalenr   	_validater   sub_progress_bar_dictr   !SORT_SAMPLE_SUB_PROGRESS_BAR_NAMEsample_boundariesr	   shuffle_strategyr   SORT_SHUFFLE_PUSH_BASEDr   SORT_SHUFFLE_PULL_BASEDr
   
ValueErrorexecute)r   r   blocksr    
ref_bundleunified_schemaagg_fnnum_mapperssort_keynum_outputsr   
sample_baragg_spec	schedulerr   r   r   r   r    a/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/planner/aggregate.pyfn,   sR   

z!generate_aggregate_fn.<locals>.fn)
r&   r   r(   r'   r!   r)   r   r   r   r   )r   r   r   r   r   r8   r6   r5   r7   generate_aggregate_fn   s   

;r9   )N)typingr   r   r   'ray.data._internal.execution.interfacesr   r   r   4ray.data._internal.execution.interfaces.transform_fnr   7ray.data._internal.planner.exchange.aggregate_task_specr	   Eray.data._internal.planner.exchange.pull_based_shuffle_task_schedulerr
   Eray.data._internal.planner.exchange.push_based_shuffle_task_schedulerr   2ray.data._internal.planner.exchange.sort_task_specr   r   ray.data._internal.utilr   ray.data.aggregater   ray.data.contextr   r   strintr9   r6   r6   r6   r7   <module>   s0    