o
    `۷i                      @   st   d dl Z d dlmZ d dlmZmZ d dlmZ d dlm	Z	m
Z
mZ d dlmZ eG dd dZG d	d
 d
ZdS )    N)	dataclass)AnyOptional)DelegatingBlockBuilder)BlockBlockAccessor	DataBatch)MAX_SAFE_BLOCK_SIZE_FACTORc                   @   st   e Zd ZU dZee ed< dZee ed< dZe	ed< dd Z
e			ddee dee de	ded  fd	d
ZdS )OutputBlockSizeOptionNtarget_max_block_sizetarget_num_rows_per_blockFdisable_block_shapingc                 C   s.   | j d u r| jd u r| jstdd S d S d S )NzOEither `target_max_block_size` or `target_num_rows_per_block` must be specified)r   r   r   
ValueErrorself r   V/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/_internal/output_buffer.py__post_init__   s   

z#OutputBlockSizeOption.__post_init__returnc                 C   s&   |d u r|d u r|sd S t |||dS )N)r   r   r   )r
   )clsr   r   r   r   r   r   of   s   zOutputBlockSizeOption.of)NNF)__name__
__module____qualname__r   r   int__annotations__r   r   boolr   classmethodr   r   r   r   r   r
   
   s$   
 r
   c                   @   s   e Zd ZdZdee fddZdeddfdd	Zd
e	ddfddZ
deddfddZd"ddZdefddZdefddZdee fddZdee fddZdefddZdedefddZdedefddZdefd d!ZdS )#BlockOutputBuffera$  Generates output blocks of a given size or number of rows given a stream of
    inputs.

    This class is used to turn a stream of items / blocks of arbitrary size
    into a stream of blocks of target max block size or
    target max rows per block. The caller should check ``has_next()`` after each
    ``add()`` call, and call ``next()`` to get the next block when ``has_next()``
    returns True.

    When all items have been added, the caller must call ``finalize()`` and
    then check ``has_next()`` one last time.

    Examples:
        >>> from ray.data._internal.output_buffer import BlockOutputBuffer
        >>> udf = ... # doctest: +SKIP
        >>> generator = ... # doctest: +SKIP
        >>> # Yield a stream of output blocks.
        >>> output_block_size_option = OutputBlockSizeOption(target_max_block_size=500 * 1024 * 1024) # doctest: +SKIP
        >>> output = BlockOutputBuffer(output_block_size_option) # doctest: +SKIP
        >>> for item in generator(): # doctest: +SKIP
        ...     output.add(item) # doctest: +SKIP
        ...     if output.has_next(): # doctest: +SKIP
        ...         yield output.next() # doctest: +SKIP
        >>> output.finalize() # doctest: +SKIP
        >>> if output.has_next() # doctest: +SKIP
        ...     yield output.next() # doctest: +SKIP
    output_block_size_optionc                 C   s   || _ t | _d| _d| _d S NF)_output_block_size_optionr   _buffer
_finalized_has_yielded_blocks)r   r   r   r   r   __init__R   s   
zBlockOutputBuffer.__init__itemr   Nc                 C      | j rJ | j| dS )z(Add a single item to this output buffer.N)r#   r"   add)r   r&   r   r   r   r(   X      
zBlockOutputBuffer.addbatchc                 C   r'   )z'Add a data batch to this output buffer.N)r#   r"   	add_batch)r   r*   r   r   r   r+   ]   r)   zBlockOutputBuffer.add_batchblockc                 C   r'   )z'Add a data block to this output buffer.N)r#   r"   	add_blockr   r,   r   r   r   r-   b   r)   zBlockOutputBuffer.add_blockc                 C   s   | j rJ d| _ dS )z.Must be called once all items have been added.TN)r#   r   r   r   r   finalizeg   s   

zBlockOutputBuffer.finalizec                 C   *   | j jrdS |  d uo| j |  kS r    )r!   r   _max_num_rows_per_blockr"   num_rowsr   r   r   r   _exceeded_buffer_row_limitl   
   z,BlockOutputBuffer._exceeded_buffer_row_limitc                 C   r0   r    )r!   r   _max_bytes_per_blockr"   get_estimated_memory_usager   r   r   r   _exceeded_buffer_size_limitu   r4   z-BlockOutputBuffer._exceeded_buffer_size_limitc                 C   "   | j d u rd S | j jrd S | j jS N)r!   r   r   r   r   r   r   r1   ~   
   
z)BlockOutputBuffer._max_num_rows_per_blockc                 C   r8   r9   )r!   r   r   r   r   r   r   r5      r:   z&BlockOutputBuffer._max_bytes_per_blockc                 C   sP   | j r| j p| j dkS | jdu rdS | jjr | j dkS |  p'|  S )z6Returns true when a complete output block is produced.r   NF)r#   r$   r"   r2   r!   r   r3   r7   r   r   r   r   has_next   s   
zBlockOutputBuffer.has_nextc                 C   s    |   d uo| t|    kS r9   )r5   
size_bytesr	   r.   r   r   r    _exceeded_block_size_slice_limit   s
   
z2BlockOutputBuffer._exceeded_block_size_slice_limitc                 C   s   |   d uo| |   kS r9   )r1   r2   r.   r   r   r   _exceeded_block_row_slice_limit   s   z1BlockOutputBuffer._exceeded_block_row_slice_limitc                 C   s   |   sJ | j }t|}d}d}| |r|  }n#| |rA| dks-J d|	 |  }t
dt|  | }|dur]|| k r]|jd|dd}|j|| dd}t | _|durk| j| d| _|S )z'Returns the next complete output block.Nr   zBlock may not be empty   F)copyT)r;   r"   buildr   	for_blockr>   r1   r=   r2   r<   maxmathceilr5   slicer   r-   r$   )r   r,   accessorblock_remaindertarget_num_rowsnum_bytes_per_rowr   r   r   next   s.   





zBlockOutputBuffer.next)r   N)r   r   r   __doc__r   r
   r%   r   r(   r   r+   r   r-   r/   r   r3   r7   r   r1   r5   r;   r   r=   r>   rK   r   r   r   r   r   5   s    
				
	r   )rD   dataclassesr   typingr   r   +ray.data._internal.delegating_block_builderr   ray.data.blockr   r   r   ray.data.contextr	   r
   r   r   r   r   r   <module>   s    *