o
    `۷i+                     @   s*  d dl Z d dlZd dlmZmZmZmZ d dlZd dlm	Z	 d dl
mZ d dlmZmZmZmZmZ d dlmZ eeZdedee fd	d
Zdee dee dee fddZdee dee deee  fddZdedededee deeeeee f ef df f
ddZdee dedee fddZdeeee ef  deee  dedeeee ef  fddZdeeee ef  d ee deeeee   eee  f fd!d"Z	#	d*deeee ef  d$ee ded%ee deeeee   eee  f f
d&d'Z dedefd(d)Z!dS )+    N)IterableListTupleUnion)trace_deallocation)cached_remote_fn)BlockBlockAccessorBlockExecStatsBlockMetadataBlockPartition)	ObjectRefblocks_with_metadatareturnc                 C   sP   t t}g }| D ]\}}|jdu rt||}||_n|j}|| q|S )z@Calculate the number of rows for a list of blocks with metadata.N)r   _get_num_rowsnum_rowsraygetremoteappend)r   get_num_rows
block_rowsblockmetadatar    r   N/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/_internal/split.py_calculate_blocks_rows   s   
r   num_rows_per_blocksplit_indicesc                    s   t |   fdd|D S )zTGenerate valid split indices by apply min(index, total_num_rows)
    to every index.c                    s   g | ]}t | qS r   )min).0index
total_rowsr   r   
<listcomp>,   s    z+_generate_valid_indices.<locals>.<listcomp>)sum)r   r   r   r"   r   _generate_valid_indices%   s   r&   c           	      C   s   g }d}g }d}d}|t |k rA|| }| | }|| |kr*|||  |d7 }q
|| g }|| | 7 }|d7 }|t |k st |t | k rX|| g }t |t | k sI|S )a:  Given num rows per block and valid split indices, generate per block split indices.

    Args:
        num_rows_per_block: num of rows per block.
        split_indices: The (global) indices at which to split the blocks.
    Returns:
        Per block split indices indicates each input block's split point(s).
    r      )lenr   )	r   r   per_block_split_indicescurrent_input_block_idcurrent_block_split_indicescurrent_block_global_offsetcurrent_index_idsplit_indexcurrent_block_rowr   r   r   !_generate_per_block_split_indices/   s0   

r0   block_idr   meta.c                 C   s   g }g }t |}d}||j |D ]7}td| d|  t }	|||}
t |
}t	| |
 |j|	 d}|| ||
 |}q| |fg}|| t|S )aP  Split the provided block at the given indices.

    Args:
        block_id: the id of this block in the block list.
        block: block to be split.
        meta: metadata of the block, we expect meta.num is valid.
        split_indices: the indices where the block should be split.
    Returns:
        returns block_id, split blocks metadata, and a list of blocks
        in the following form. We return blocks in this way
        so that the owner of blocks could be the caller(driver)
        instead of worker itself.
        Tuple(block_id, split_blocks_meta), block0, block1 ...
    r   zslicing block :)r   
size_bytesinput_files
exec_stats)r	   	for_blockr   r   loggerdebugr
   builderslicer   r4   r5   buildextendtuple)r1   r   r2   r   
split_metasplit_blocksblock_accessor
prev_indexr!   statssplit_blockaccessor_metaresultsr   r   r   _split_single_block[   s,   





rH   block_split_indicesr   c                 C   s@   d}g }| D ]}|dks||krq||krq| | |}q|S )zdrop split indices that creates empty block split. This could happen when there
    are duplicated indices, or index equal to 0 (start of the block) or num_block_rows
    (end of the block).
    r   )r   )rI   r   rB   optimized_indicesr!   r   r   r   _drop_empty_block_split   s   
rL   r)   owned_by_consumerc                 C   s>  t t}dgt|  }g }g }g }t|D ]F\}}	| | \}
}|j}t|	|}	t|	dkr5|
|fg||< q|jddt|	 d||
||	}||d  ||dd  ||
 q|rt	
|}t||D ]\\}}}t|t|ksxJ t||||< qh|r|D ]}t|d qn|D ]	}t|ddd	 qtj|S )
z5Split all the input blocks based on the split indicesNr   SPREAD   )scheduling_strategynum_returnsr'   zsplit._split_all_blocksF)free)r   rH   r(   	enumerater   rL   optionsr   r   r   r   zipr   	itertoolschainfrom_iterable)r   r)   rM   split_single_blockall_blocks_split_results per_block_split_metadata_futuresper_block_split_block_refsblocks_splittedr1   rI   	block_refr2   	block_rowobject_refsper_block_split_metadata
block_refsbr   r   r   _split_all_blocks   sL   

rd   rZ   global_split_sizesc           
      C   s   g }g }g }g }d}d}|t |k rP||| kr5||| ks J || || g }g }d}|d7 }nt| \}}	|| ||	 ||	j7 }|t |k s||fS )z<Reassemble per block's split result into final split result.r   r'   )r(   r   nextr   )
rZ   re   result_blocksresult_metascurrent_blockscurrent_metacurrent_split_sizecurrent_split_idr^   r2   r   r   r   _generate_global_split_results   s*   





rm   Tindicesr   c                    s   t | } t| dkrg gt|d  g gt|d  fS |du r$t| }t||}t||}t| ||}dg| t|g   fddtdt D }t||S )a!  Split blocks at the provided indices.

    Args:
        blocks_with_metadata: Block futures to split, including the associated metadata.
        indices: The (global) indices at which to split the blocks.
        owned_by_consumer: Whether the provided blocks are owned by the consumer.
        block_rows: The number of rows for each block, in case it has already been
            computed.

    Returns:
        The block split futures and their metadata. If an index split is empty, the
        corresponding block split will be empty .
    r   r'   Nc                    s    g | ]} |  |d    qS )r'   r   )r    ihelperr   r   r$   !  s     z%_split_at_indices.<locals>.<listcomp>)	listr(   r   r&   r0   rd   r%   rangerm   )r   rn   rM   r   valid_indicesr)   rZ   split_sizesr   rp   r   _split_at_indices   s    $
	
rv   c                 C   s   t |  S )z7Get the number of rows contained in the provided block.)r	   r7   r   )r   r   r   r   r   &  s   r   )TN)"rV   loggingtypingr   r   r   r   r   !ray.data._internal.memory_tracingr   ray.data._internal.remote_fnr   ray.data.blockr   r	   r
   r   r   	ray.typesr   	getLogger__name__r8   intr   r&   r0   rH   rL   boolrd   rm   rv   r   r   r   r   r   <module>   s    





,
.

<
#
0