o
    `۷i                     @   sN   d dl mZ d dlmZmZmZ d dlmZ d dlm	Z	 	 G dd de	Z
dS )    )deque)DequeListTuple)	RefBundle)BaseRefBundlerc                   @   s   e Zd ZdZdefddZddefddZd	efd
dZ	defddZ
deee ef fddZdd Zdd ZdefddZdS )StreamingRepartitionRefBundlerzNIncrementally builds task inputs to produce multiples of target-sized outputs.target_num_rows_per_blockc                 C   s6   |dksJ d|| _ t | _t | _g | _d| _d S )Nr   zEtarget_num_rows_per_block must be positive for streaming repartition.)_target_num_rowsr   _pending_bundles_ready_bundles_consumed_input_bundles_total_pending_rows)selfr	    r   ^/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/_internal/streaming_repartition.py__init__"   s   

z'StreamingRepartitionRefBundler.__init__Fflush_remainingc                 C   s
  | j | jkrd| jd  | j | j  }|dksJ t| j}d }|dkr<||d  k r<| }||\}}|| | jt	
| | j  d| _ |rd| dkrd| j| |  j | 7  _ |rt| jdkr| jt	
| j | j  d| _ d S d S d S )Nr   )r   r
   r   num_rowslistpopsliceappendr   r   merge_ref_bundlesclearlen)r   r   rows_needed_from_last_bundlepending_bundlesremaining_bundlelast_bundlesliced_bundler   r   r   _try_build_ready_bundle,   s8   






z6StreamingRepartitionRefBundler._try_build_ready_bundle
ref_bundlec                 C   s6   |  j | 7  _ | j| |   | j| d S N)r   r   r   r   r"   r   )r   r#   r   r   r   
add_bundleK   s   z)StreamingRepartitionRefBundler.add_bundlereturnc                 C   s   t | jdkS )Nr   )r   r   r   r   r   r   
has_bundleQ   s   z)StreamingRepartitionRefBundler.has_bundlec                 C   s   | j }g | _ || j fS r$   )r   r   popleft)r   consumed_input_bundlesr   r   r   get_next_bundleT   s   z.StreamingRepartitionRefBundler.get_next_bundlec                 C   s"   t | jdkr| jdd d S d S )Nr   T)r   )r   r   r"   r'   r   r   r   done_adding_bundles[   s   z2StreamingRepartitionRefBundler.done_adding_bundlesc                 C   (   t dd | jD t dd | jD  S )Nc                 s   s    | ]}t |V  qd S r$   )r   .0bundler   r   r   	<genexpr>`       z<StreamingRepartitionRefBundler.num_blocks.<locals>.<genexpr>sumr   r   r'   r   r   r   
num_blocks_      
z)StreamingRepartitionRefBundler.num_blocksc                 C   r-   )Nc                 s   s    | ]}|  V  qd S r$   )
size_bytesr.   r   r   r   r1   e   r2   z<StreamingRepartitionRefBundler.size_bytes.<locals>.<genexpr>r3   r'   r   r   r   r7   d   r6   z)StreamingRepartitionRefBundler.size_bytesN)F)__name__
__module____qualname____doc__intr   boolr"   r   r%   r(   r   r   r+   r,   r5   r7   r   r   r   r   r      s    

r   N)collectionsr   typingr   r   r   'ray.data._internal.execution.interfacesr   3ray.data._internal.execution.operators.map_operatorr   r   r   r   r   r   <module>   s    