o
    bi8                     @   sp   d dl mZ d dlmZmZ d dlmZ d dlmZm	Z	m
Z
 d dlmZmZ eG dd dZG dd	 d	Zd
S )    )	dataclass)AnyOptional)DelegatingBlockBuilder)BlockBlockAccessor	DataBatch)MAX_SAFE_BLOCK_SIZE_FACTORMAX_SAFE_ROWS_PER_BLOCK_FACTORc                   @   s8   e Zd ZU dZee ed< dZee ed< dddZdS )OutputBlockSizeOptionNtarget_max_block_sizetarget_num_rows_per_blockreturnc                 C   s    | j d u | jd u ksJ dd S )NzNExactly one of target_max_block_size or target_num_rows_per_block must be set.)r   r   self r   T/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/output_buffer.py__post_init__   s
   z#OutputBlockSizeOption.__post_init__r   N)	__name__
__module____qualname__r   r   int__annotations__r   r   r   r   r   r   r   	   s   
 r   c                   @   s   e Zd ZdZdefddZdeddfdd	Zd
eddfddZ	de
ddfddZdddZdefddZdefddZdefddZde
defddZde
defddZde
fddZdS )BlockOutputBuffera$  Generates output blocks of a given size or number of rows given a stream of
    inputs.

    This class is used to turn a stream of items / blocks of arbitrary size
    into a stream of blocks of target max block size or
    target max rows per block. The caller should check ``has_next()`` after each
    ``add()`` call, and call ``next()`` to get the next block when ``has_next()``
    returns True.

    When all items have been added, the caller must call ``finalize()`` and
    then check ``has_next()`` one last time.

    Examples:
        >>> from ray.data._internal.output_buffer import BlockOutputBuffer
        >>> udf = ... # doctest: +SKIP
        >>> generator = ... # doctest: +SKIP
        >>> # Yield a stream of output blocks.
        >>> output_block_size_option = OutputBlockSizeOption(target_max_block_size=500 * 1024 * 1024) # doctest: +SKIP
        >>> output = BlockOutputBuffer(output_block_size_option) # doctest: +SKIP
        >>> for item in generator(): # doctest: +SKIP
        ...     output.add(item) # doctest: +SKIP
        ...     if output.has_next(): # doctest: +SKIP
        ...         yield output.next() # doctest: +SKIP
        >>> output.finalize() # doctest: +SKIP
        >>> if output.has_next() # doctest: +SKIP
        ...     yield output.next() # doctest: +SKIP
    output_block_size_optionc                 C   s   || _ t | _d| _d| _d S )NF)_output_block_size_optionr   _buffer_returned_at_least_one_block
_finalized)r   r   r   r   r   __init__1   s   
zBlockOutputBuffer.__init__itemr   Nc                 C      | j rJ | j| dS )z(Add a single item to this output buffer.N)r   r   add)r   r!   r   r   r   r#   7      
zBlockOutputBuffer.addbatchc                 C   r"   )z'Add a data batch to this output buffer.N)r   r   	add_batch)r   r%   r   r   r   r&   <   r$   zBlockOutputBuffer.add_batchblockc                 C   r"   )z'Add a data block to this output buffer.N)r   r   	add_blockr   r'   r   r   r   r(   A   r$   zBlockOutputBuffer.add_blockc                 C   s   | j rJ d| _ dS )z.Must be called once all items have been added.TN)r   r   r   r   r   finalizeF   s   

zBlockOutputBuffer.finalizec                 C      | j jd uo| j | j jkS N)r   r   r   num_rowsr   r   r   r   _exceeded_buffer_row_limitK   
   z,BlockOutputBuffer._exceeded_buffer_row_limitc                 C   r+   r,   )r   r   r   get_estimated_memory_usager   r   r   r   _exceeded_buffer_size_limitR   r/   z-BlockOutputBuffer._exceeded_buffer_size_limitc                 C   s,   | j r| j p| j dkS |  p|  S )z6Returns true when a complete output block is produced.r   )r   r   r   r-   r.   r1   r   r   r   r   has_nextY   s   zBlockOutputBuffer.has_nextc                 C       | j jd uo| t| j j kS r,   )r   r   
size_bytesr	   r)   r   r   r    _exceeded_block_size_slice_limitb      z2BlockOutputBuffer._exceeded_block_size_slice_limitc                 C   r3   r,   )r   r   r-   r
   r)   r   r   r   _exceeded_block_row_slice_limitm   r6   z1BlockOutputBuffer._exceeded_block_row_slice_limitc                 C   s   |   sJ | j }d}t|}d}| |r| jj}n| |r4|	 |
  }td| jj| }|durP||
 k rP|jd|dd}|j||
 dd}t | _|dur^| j| d| _|S )z'Returns the next complete output block.N   r   T)copyF)r2   r   buildr   	for_blockr7   r   r   r5   r4   r-   maxr   slicer   r(   r   )r   block_to_yieldblock_remainderr'   target_num_rowsnum_bytes_per_rowr   r   r   nextx   s.   




zBlockOutputBuffer.nextr   )r   r   r   __doc__r   r    r   r#   r   r&   r   r(   r*   boolr.   r1   r2   r5   r7   rB   r   r   r   r   r      s    
	r   N)dataclassesr   typingr   r   +ray.data._internal.delegating_block_builderr   ray.data.blockr   r   r   ray.data.contextr	   r
   r   r   r   r   r   r   <module>   s    
