o
    $it                     @   s   d dl mZmZ d dlmZ d dlmZmZ d dlm	Z	m
Z
mZmZ d dlmZ dee dedee fd	d
Zdedee dedeeeef fddZdee deee  dedeee ee ef fddZdedee dee fddZdS )    )ListTuple)	RefBundle)_calculate_blocks_rows_split_at_indices)BlockBlockMetadataBlockPartition_take_first_non_empty_schema)	ObjectRefper_split_bundlesowned_by_consumerreturnc                 C   s0  t | dkr| S dd | D }dd |D }tdd |D }|t | }t|||\}}}t||D ]\}	}
tdd |	D }||ksFJ ||
 |ksNJ q3tdd | D }t|||d	}t||}t|D ]\}}|| | td
d || D }||ksJ qhg }|D ]}|	t|||d	 q|S )zEqualize split ref bundles into equal number of rows.

    Args:
        per_split_bundles: ref bundles to equalize.
    Returns:
        the equalized ref bundles.
    r   c                 S   s   g | ]}|j qS  )blocks.0bundler   r   X/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/equalize.py
<listcomp>   s    z_equalize.<locals>.<listcomp>c                 S      g | ]}t |qS r   )r   )r   splitr   r   r   r      s    c                 S   r   r   )sum)r   blocks_rowsr   r   r   r      s    c                 S      g | ]\}}|j qS r   num_rowsr   _metar   r   r   r   *       c                 s   s    | ]}|j V  qd S )N)schemar   r   r   r   	<genexpr>0   s    z_equalize.<locals>.<genexpr>)owns_blocksr!   c                 S   r   r   r   r   r   r   r   r   9   r    )
lenr   _shave_all_splitszipr
   r   _split_leftovers	enumerateextendappend)r   r   per_split_blocks_with_metadataper_split_num_rows
total_rowstarget_split_sizeshaved_splitsper_split_needed_rows	leftoversshaved_splitsplit_needed_rownum_shaved_rowsr!   leftover_bundleleftover_splitsileftover_splitequalized_ref_bundlesr   r   r   r   	_equalize   s8   

r:   r   num_rows_per_blocktarget_sizec           	      C   s\   g }g }d}t | |D ]\}}|| |kr|| ||7 }q|| q|| }|||fS )a  Shave a block list to the target size.

    Args:
        split: the block list to shave.
        num_rows_per_block: num rows for each block in the list.
        target_size: the upper bound target size of the shaved list.
    Returns:
        A tuple of:
            - shaved block list.
            - num of rows needed for the block list to meet the target size.
            - leftover blocks.

    r   )r&   r*   )	r   r;   r<   shavedr1   shaved_rowsblock_with_meta
block_rowsnum_rows_neededr   r   r   _shave_one_splitE   s   


rB   input_splitsr,   c                 C   sZ   g }g }g }t | |D ]\}}t|||\}}	}
|| ||	 ||
 q|||fS )a  Shave all block list to the target size.

    Args:
        input_splits: all block list to shave.
        input_splits: num rows (per block) for each block list.
        target_size: the upper bound target size of the shaved lists.
    Returns:
        A tuple of:
            - all shaved block list.
            - num of rows needed for the block list to meet the target size.
            - leftover blocks.
    )r&   rB   r*   r)   )rC   r,   r<   r/   r0   r1   r   r;   r=   rA   
_leftoversr   r   r   r%   c   s   



r%   r1   r0   c                 C   sb   t |}g }d}t|D ]\}}|||  || }qt| j|| j}dd t| D d| S )z0Split leftover blocks by the num of rows needed.r   c                 S   s   g | ]\}}t t||qS r   )listr&   )r   
block_refsr   r   r   r   r      s    z$_split_leftovers.<locals>.<listcomp>N)r$   r(   r*   r   r   r#   r&   )r1   r0   
num_splitssplit_indicesprevr7   rA   split_resultr   r   r   r'      s   
r'   N)typingr   r   'ray.data._internal.execution.interfacesr   ray.data._internal.splitr   r   ray.data.blockr   r   r	   r
   	ray.typesr   boolr:   intrB   r%   r'   r   r   r   r   <module>   sJ    
7


 