o
    $i<                     @   s   d dl Z d dlmZ d dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d d	lmZmZ d d
lmZ dZG dd dZG dd deZG dd deZdS )    N)Optional)ArrowBlockAccessor)transform_pyarrow)try_combine_chunked_columns)DelegatingBlockBuilder)memory_string)get_total_obj_store_mem_on_node)BlockBlockAccessor)log_onceg      ?c                   @   sR   e Zd ZdefddZdefddZdefddZdefd	d
ZdefddZ	dS )BatcherInterfaceblockc                 C      t  )zmAdd a block to the block buffer.

        Args:
            block: Block to add to the block buffer.
        NotImplementedErrorselfr    r   W/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/_internal/batcher.pyadd      zBatcherInterface.addreturnc                 C   r   )zHIndicate to the batcher that no more blocks will be added to the buffer.r   r   r   r   r   done_adding      zBatcherInterface.done_addingc                 C   r   )*Whether this Batcher has any full batches.r   r   r   r   r   	has_batch!   r   zBatcherInterface.has_batchc                 C   r   )"Whether this Batcher has any data.r   r   r   r   r   has_any%   r   zBatcherInterface.has_anyc                 C   r   )pGet the next batch from the block buffer.

        Returns:
            A batch represented as a Block.
        r   r   r   r   r   
next_batch)   r   zBatcherInterface.next_batchN)
__name__
__module____qualname__r	   r   boolr   r   r   r    r   r   r   r   r      s    r   c                   @   sn   e Zd ZdZddee defddZdefdd	Z	d
efddZ
d
efddZd
efddZd
efddZdS )BatcherzChunks blocks into batches.F
batch_sizeensure_copyc                 C   s"   || _ g | _d| _d| _|| _dS )a  
        Construct a batcher that yields batches of batch_sizes rows.

        Args:
            batch_size: The size of batches to yield.
            ensure_copy: Whether batches are always copied from the underlying base
                blocks (not zero-copy views).
        r   FN)_batch_size_buffer_buffer_size_done_adding_ensure_copy)r   r&   r'   r   r   r   __init__<   s
   	
zBatcher.__init__r   c                 C   s>   t | dkr| j| |  jt | 7  _dS dS )zAdd a block to the block buffer.

        Note empty block is not added to buffer.

        Args:
            block: Block to add to the block buffer.
        r   N)r
   	for_blocknum_rowsr)   appendr*   r   r   r   r   r   K   s   zBatcher.addr   c                 C   
   d| _ dS )zIIndicate to the batcher that no more blocks will be added to the batcher.TNr+   r   r   r   r   r   W      
zBatcher.done_addingc                 C   s   |   o| jdu p| j| jkS )r   N)r   r(   r*   r   r   r   r   r   [   s   zBatcher.has_batchc                 C   s
   | j dkS )r   r   )r*   r   r   r   r   r   a   r3   zBatcher.has_anyc                 C   sz  |   s| jr|  sJ | j}| jdu r<t| jdksJ | jd }|r4t|}|j	d|
 dd}g | _d| _|S t }g }| j}| jD ]L}t|}|dkrX|| qG|
 |krl||  ||
 8 }qGt|trytt|}||j	d|dd ||j	||
 dd d}qG|| _|  j| j8  _|o|  }| }|rt|}|j	d|
 dd}|S )r   N   r   T)copyF)r   r+   r   r,   r(   lenr)   r
   r.   slicer/   r*   r   r0   	add_blockto_block
isinstancer   r   r   will_build_yield_copybuild)r   
needs_copyr   outputleftoverneededaccessorbatchr   r   r   r    e   sH   






zBatcher.next_batchN)F)r!   r"   r#   __doc__r   intr$   r-   r	   r   r   r   r   r    r   r   r   r   r%   2   s    	r%   c                   @   s   e Zd ZdZ	ddee dedee fddZdefd	d
Ze	dee fddZ
e	dee fddZdefddZdefddZdefddZdefddZdefddZdefddZdefddZdS )ShufflingBatcherzLChunks blocks into shuffled batches, using a local in-memory shuffle buffer.Nr&   shuffle_buffer_min_sizeshuffle_seedc                 C   sn   |du rt d|| _|| _||k r|}|| _t|t | _t | _d| _	d| _
d| _t | _d| _d| _dS )a  Constructs a random-shuffling block batcher.

        Args:
            batch_size: Record batch size.
            shuffle_buffer_min_size: Minimum number of rows that must be in the local
                in-memory shuffle buffer in order to yield a batch. When there are no
                more rows to be added to the buffer, the number of rows in the buffer
                *will* decrease below this value while yielding the remaining batches,
                and the final batch may have less than ``batch_size`` rows. Increasing
                this will improve the randomness of the shuffle but may increase the
                latency to the first batch.
            shuffle_seed: The seed to use for the local random shuffle.
        Nz3Must specify a batch_size if using a local shuffle.r   F)
ValueErrorr(   _shuffle_seed_min_rows_to_yield_batchrD   SHUFFLE_BUFFER_COMPACTION_RATIO_min_rows_to_trigger_compactionr   _builder_shuffle_buffer_batch_headr+   r   _total_object_store_nbytes_total_num_rows_added_total_nbytes_added)r   r&   rF   rG   r   r   r   r-      s"   
zShufflingBatcher.__init__r   c              	   C   s   | j dur%| j | jkr%tdr%tdt| j dt| j  d| j d t|}|	 dkrJ| j
| |  j|	 7  _|  j| 7  _dS dS )zAdd a block to the shuffle buffer.

        Note empty block is not added to buffer.

        Args:
            block: Block to add to the shuffle buffer.
        Nshuffle_buffer_mem_warningz!The node you're iterating on has zA object store memory, but the shuffle buffer is estimated to use z5. If you don't decrease the shuffle buffer size from z$ rows, you might encounter spilling.r   ) _estimated_min_nbytes_in_buffersrP   r   warningswarnr   rJ   r
   r.   r/   rM   r8   rQ   rR   
size_bytes)r   r   block_accessorr   r   r   r      s&   

	zShufflingBatcher.addr   c                 C   s   | j dkr| j| j  S dS )zAReturn the average number of bytes per row added to this batcher.r   N)rQ   rR   r   r   r   r   _average_row_nbytes   s
   

z$ShufflingBatcher._average_row_nbytesc                 C   s   | j du rdS | j | j S )zReturn the estimated minimum number of bytes across all buffers.

        This includes data in both the compacted and uncompacted buffers.
        N)rY   rL   r   r   r   r   rT     s   
z1ShufflingBatcher._estimated_min_nbytes_in_buffersc                 C   r1   )zIndicate to the batcher that no more blocks will be added to the batcher.

        No more blocks should be added to the batcher after calling this.
        TNr2   r   r   r   r   r     s   
zShufflingBatcher.done_addingc                 C   s   |   dkS )z"Whether this batcher has any data.r   )	_num_rowsr   r   r   r   r     s   zShufflingBatcher.has_anyc                 C   s6   |   }| js|  | jkp|| j | jkS || jkS )z%Whether this batcher has any batches.)rZ   r+   _num_compacted_rowsrJ   r(   rL   )r   r/   r   r   r   r     s   
zShufflingBatcher.has_batchc                 C   s   |   |   S )zReturn the total number of rows that haven't been yielded yet.

        This includes rows in both the compacted and uncompacted buffers.
        )r[   _num_uncompacted_rowsr   r   r   r   rZ   )  s   zShufflingBatcher._num_rowsc                 C   s*   | j du rdS tdt| j  | j S )zBReturn number of unyielded rows in the compacted (shuffle) buffer.Nr   )rN   maxr
   r.   r/   rO   r   r   r   r   r[   0  s   
z$ShufflingBatcher._num_compacted_rowsc                 C   s
   | j  S )z:Return number of unyielded rows in the uncompacted buffer.)rM   r/   r   r   r   r   r\   <  r3   z&ShufflingBatcher._num_uncompacted_rowsc                 C   s6  |   s| jr|  sJ |  dkrq| js|  | jkrq| jdur>| jdkr7t	| j}|
| j| | _| j| j | j | _t	| j| j| _| jdur[|  jd7  _tt	| jtrjt| j| _t | _d| _| jdusxJ t	| j }t| j|}| j}|  j|7  _t	| j
|| jS )z{Get the next shuffled batch from the shuffle buffer.

        Returns:
            A batch represented as a Block.
        r   Nr4   )r   r+   r   r\   r[   rJ   rN   rO   r
   r.   r7   r/   rM   r8   r<   random_shufflerI   r:   r   r   r   minr(   )r   r   buffer_sizer&   slice_startr   r   r   r    @  sD   



zShufflingBatcher.next_batch)N)r!   r"   r#   rC   r   rD   r-   r	   r   propertyrY   rT   r$   r   r   r   rZ   r[   r\   r    r   r   r   r   rE      s,    
( 
rE   )rU   typingr   ray.data._internal.arrow_blockr   ray.data._internal.arrow_opsr   .ray.data._internal.arrow_ops.transform_pyarrowr   +ray.data._internal.delegating_block_builderr   !ray.data._internal.execution.utilr   ray.data._internal.utilr   ray.data.blockr	   r
   ray.utilr   rK   r   r%   rE   r   r   r   r   <module>   s    n