o
    bi^                     @   s   d dl mZmZ d dlmZ d dlmZmZ d dlm	Z	 d dl
mZmZmZ d dlmZ dee ded	ee fd
dZdedee ded	eeeef fddZdee deee  ded	eee ee ef fddZdedee d	ee fddZdS )    )ListTuple)	RefBundle)_calculate_blocks_rows_split_at_indices)unify_ref_bundles_schema)BlockBlockMetadataBlockPartition)	ObjectRefper_split_bundlesowned_by_consumerreturnc                 C   s&  t | dkr| S dd | D }dd |D }tdd |D }|t | }t|||\}}}t||D ]\}	}
tdd |	D }||ksFJ ||
 |ksNJ q3t| }t|||d}t||}t|D ]\}}|| | tdd || D }||ksJ qcg }|D ]}|	t|||d q|S )	zEqualize split ref bundles into equal number of rows.

    Args:
        per_split_bundles: ref bundles to equalize.
    Returns:
        the equalized ref bundles.
    r   c                 S   s   g | ]}|j qS  )blocks).0bundler   r   O/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/equalize.py
<listcomp>   s    z_equalize.<locals>.<listcomp>c                 S      g | ]}t |qS r   )r   )r   splitr   r   r   r      s    c                 S   r   r   )sum)r   blocks_rowsr   r   r   r      s    c                 S      g | ]\}}|j qS r   num_rowsr   _metar   r   r   r   &       )owns_blocksschemac                 S   r   r   r   r   r   r   r   r   5   r   )
lenr   _shave_all_splitszipr   r   _split_leftovers	enumerateextendappend)r   r   per_split_blocks_with_metadataper_split_num_rows
total_rowstarget_split_sizeshaved_splitsper_split_needed_rows	leftoversshaved_splitsplit_needed_rownum_shaved_rowsr!   leftover_bundleleftover_splitsileftover_splitequalized_ref_bundlesr   r   r   r   	_equalize
   s8   

r8   r   num_rows_per_blocktarget_sizec           	      C   s\   g }g }d}t | |D ]\}}|| |kr|| ||7 }q|| q|| }|||fS )a  Shave a block list to the target size.

    Args:
        split: the block list to shave.
        num_rows_per_block: num rows for each block in the list.
        target_size: the upper bound target size of the shaved list.
    Returns:
        A tuple of:
            - shaved block list.
            - num of rows needed for the block list to meet the target size.
            - leftover blocks.

    r   )r$   r(   )	r   r9   r:   shavedr/   shaved_rowsblock_with_meta
block_rowsnum_rows_neededr   r   r   _shave_one_splitA   s   


r@   input_splitsr*   c                 C   sZ   g }g }g }t | |D ]\}}t|||\}}	}
|| ||	 ||
 q|||fS )a  Shave all block list to the target size.

    Args:
        input_splits: all block list to shave.
        input_splits: num rows (per block) for each block list.
        target_size: the upper bound target size of the shaved lists.
    Returns:
        A tuple of:
            - all shaved block list.
            - num of rows needed for the block list to meet the target size.
            - leftover blocks.
    )r$   r@   r(   r'   )rA   r*   r:   r-   r.   r/   r   r9   r;   r?   
_leftoversr   r   r   r#   _   s   



r#   r/   r.   c                 C   sb   t |}g }d}t|D ]\}}|||  || }qt| j|| j}dd t| D d| S )z0Split leftover blocks by the num of rows needed.r   c                 S   s   g | ]\}}t t||qS r   )listr$   )r   
block_refsr   r   r   r   r      s    z$_split_leftovers.<locals>.<listcomp>N)r"   r&   r(   r   r   r    r$   )r/   r.   
num_splitssplit_indicesprevr5   r?   split_resultr   r   r   r%      s   
r%   N)typingr   r   'ray.data._internal.execution.interfacesr   ray.data._internal.splitr   r   ray.data._internal.utilr   ray.data.blockr   r	   r
   	ray.typesr   boolr8   intr@   r#   r%   r   r   r   r   <module>   sL    
7


 