o
    bi{                     @   s   d dl Z d dlmZmZmZmZ d dlmZmZm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ 			ddedee dee deeeef  dee defddZdS )    N)AnyDictListOptional)AllToAllTransformFn	RefBundleTaskContext)AllToAllTransformFnResult)MapTransformer)PullBasedShuffleTaskScheduler)PushBasedShuffleTaskScheduler)ShuffleTaskSpec)DataContextShuffleStrategy)	INT32_MAXdata_contextseednum_outputsray_remote_args,_debug_limit_shuffle_execution_to_num_blocksreturnc                    sB   durnt  t dtt dtdtf fdd}|S )z=Generate function to randomly shuffle each records of blocks.Nrefsctxr   c                    s   t dd | D } jd }r!td  fdd} jt jd|d}jtj	kr=d ur8t
dt|}nt|}|j| pG| d	S )
Nc                 s   s    | ]}t |jV  qd S N)lenblocks).0r r   ]/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/planner/random_shuffle.py	<genexpr>)   s    z9generate_random_shuffle_fn.<locals>.fn.<locals>.<genexpr>infc                    s    |  S r   )apply_transform)r   r   map_transformerr   r   upstream_map_fn;   s   z?generate_random_shuffle_fn.<locals>.fn.<locals>.upstream_map_fnT)random_shufflerandom_seedr%   z:Push-based shuffle doesn't support setting num_blocks yet.)task_ctxmap_ray_remote_argsreduce_ray_remote_args$_debug_limit_execution_to_num_blocks)sumupstream_map_transformerset_target_max_block_sizefloatupstream_map_ray_remote_argsr   target_max_block_sizeshuffle_strategyr   SORT_SHUFFLE_PUSH_BASEDNotImplementedErrorr   r   execute)r   r   num_input_blocksr%   shuffle_spec	schedulerr   r   r   r   r   r#   r   fn%   s8   
z&generate_random_shuffle_fn.<locals>.fn)timetime_nsr   r   r   r   r	   )r   r   r   r   r   r:   r   r9   r   generate_random_shuffle_fn   s   8r=   )NNN)r;   typingr   r   r   r   'ray.data._internal.execution.interfacesr   r   r   4ray.data._internal.execution.interfaces.transform_fnr	   6ray.data._internal.execution.operators.map_transformerr
   Eray.data._internal.planner.exchange.pull_based_shuffle_task_schedulerr   Eray.data._internal.planner.exchange.push_based_shuffle_task_schedulerr   5ray.data._internal.planner.exchange.shuffle_task_specr   ray.data.contextr   r   ray.util.commonr   intstrr=   r   r   r   r   <module>   s4    