o
    bi'                     @   sJ  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
 d dlZd dlmZ d dlmZmZ d dlmZmZmZ d dlmZ d dlmZmZmZ d d	lmZ d d
lmZ e e Z!deee  de
e"e"e"f fddZ#	d0deee  de	e dee fddZ$						d1dee de	e de	e" de%de	e" de	e" de%dee fddZ&	d0dee de	e' de	e dee fddZ(	d0dee d e	eegef  de	e dee fd!d"Z)	d0dee d#eegef de	e dee fd$d%Z*dee dee fd&d'Z+d(Z,G d)d* d*eZ-G d+d, d,eZ.ej/d d-G d.d/ d/Z0dS )2    N)nullcontext)AnyCallableIteratorListOptionalTuple)ActorHandle)BatcherShufflingBatcher)BatchBlockPrefetcherCollatedBatch)DatasetStats)BlockBlockAccessor	DataBatch)	ObjectRef)NodeAffinitySchedulingStrategyrefsreturnc                    s   t    t jjj }|jr@t j	| }dd |
 D }t fdd|D }tdd |D }t|| | }|||fS dS )aJ  Given a list of object references, returns how many are already on the local
    node, how many require fetching from another node, and how many have unknown
    locations. If `DataContext.get_current().enable_get_object_locations_for_metrics` is
    False, this will return `(-1, -1, -1)` as getting object locations is disabled.c                 S   s   g | ]}|d  qS )node_ids ).0locr   r   Z/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/block_batching/util.py
<listcomp>    s    z'_calculate_ref_hits.<locals>.<listcomp>c                 3   s    | ]} |v V  qd S Nr   r   r   current_node_idr   r   	<genexpr>!       z&_calculate_ref_hits.<locals>.<genexpr>c                 s   s    | ]}|sd V  qdS )   Nr   r   r   r   r   r!   "   r"   )r$   r$   )rayget_runtime_contextget_node_iddatacontextDataContextget_current'enable_get_object_locations_for_metricsexperimentalget_object_locationsvaluessumlen)r   ctxlocsnodeshitsunknownsmissesr   r   r   _calculate_ref_hits   s   
r8   block_ref_iterstatsc           
   	   c   s    d}d}d}| D ]8}t |g\}}}||7 }||7 }||7 }|r&|j nt  t|}	W d   n1 s9w   Y  |	V  q	|rO||_||_||_dS dS )zResolves the block references for each logical batch.

    Args:
        block_ref_iter: An iterator over block object references.
        stats: An optional stats object to recording block hits and misses.
    r   N)	r8   
iter_get_stimerr   r%   getiter_blocks_localiter_blocks_remoteiter_unknown_location)
r9   r:   r5   r7   r6   	block_refcurrent_hitcurrent_misscurrent_unknownblockr   r   r   resolve_block_refs)   s$   

rF   F
block_iter
batch_size	drop_lastshuffle_buffer_min_sizeshuffle_seedensure_copyc              	   #   sP   |durt |||d}nt||d} fdd}d}	| D ]0}
||
 | rM|  | }W d   n1 s:w   Y  t|	|V  |	d7 }	| s(q|  | r{|  | }W d   n1 shw   Y  t|	|V  |	d7 }	| sV|s| r|  | }W d   n1 sw   Y  t|	|V  |	d7 }	dS dS dS )a  Given an iterator over blocks, returns an iterator over blocks
    of the appropriate bacth size.

    If the shuffling configurations are specified, then the
    output blocks contain shuffled data.

    Args:
        block_iter: An iterator over blocks.
        stats: Dataset stats object used to store block batching time.
        batch_size: Record batch size, or None to let the system pick.
        drop_last: Whether to drop the last batch if it's incomplete.
        shuffle_buffer_min_size: If non-None, the data will be randomly shuffled
            using a local in-memory shuffle buffer, and this value will serve as the
            minimum number of rows that must be in the local in-memory shuffle buffer in
            order to yield a batch.
        shuffle_seed: The seed to use for the local random shuffle.
        ensure_copy: Whether batches are always copied from the underlying base
            blocks (not zero-copy views).

    Returns:
        An iterator over blocks of the given size that are potentially shuffled.
    N)rH   rJ   rK   )rH   rL   c                      s    r j  S t S r   )iter_next_batch_sr<   r   r   r:   r   r   get_iter_next_batch_s_timerq   s   z6blocks_to_batches.<locals>.get_iter_next_batch_s_timerr   r#   )r   r
   add	has_batch
next_batchr   done_addinghas_any)rG   r:   rH   rI   rJ   rK   rL   batcherrO   global_counterrE   batchr   rN   r   blocks_to_batchesI   sF   



rX   batch_formatc              	   c   sd    | D ],}|r|j  nt  t|j|}W d   n1 s#w   Y  t|j|V  qdS )aB  Given an iterator of blocks, returns an iterator of formatted batches.

    Args:
        block_iter: An iterator over blocks.
        batch_format: The batch format to use.
        stats: An optional stats object to record formatting times.

    Returns:
        An iterator over batch index and the formatted batch.
    N)	iter_format_batch_sr<   r   r   	for_blockr(   to_batch_formatr   	batch_idx)rG   rY   r:   rW   formatted_batchr   r   r   format_batches   s   r_   
batch_iter
collate_fnc              	   c   \    | D ](}|r|j  nt  ||j}W d   n1 sw   Y  t|j|V  qdS )a  Returns an iterator with the provided collate_fn applied to items of the batch
    iterator.

    Args:
        batch_iter: An iterator over formatted batches.
        collate_fn: A function to apply to each batch.
        stats: An optional stats object to record formatting times.
    N)iter_collate_batch_sr<   r   r(   r   r]   )r`   ra   r:   rW   collated_batchr   r   r   collate   s   re   finalize_fnc              	   c   rb   )a  Returns an iterator with the provided finalize_fn applied to items of the batch
    iterator.

    This is the same as `collate` except the input batches can be of type Any.

    Args:
        batch_iter: An iterator over processed batches.
        finalize_fn: A function to apply to each batch.
        stats: An optional stats object to record formatting times.

    Returns:
        An iterator over batch index and the finalized batch.
    N)iter_finalize_batch_sr<   r   r(   r   r]   )r`   rf   r:   rW   finalized_batchr   r   r   finalize_batches   s   ri   c                 c   s    | D ]}|j V  qd S r   )r(   )r`   rW   r   r   r   extract_data_from_batch   s   
rj   zray.datasetc                   @   sF   e Zd ZdZdd Zdd Zdeee  fddZ	d	d
 Z
dd ZdS )WaitBlockPrefetcherz Block prefetcher using ray.wait.c                 C   s8   g | _ d| _t | _tj| jddd| _| j  d S )NF
PrefetcherT)targetnamedaemon)	_blocks_stopped	threading	Condition
_conditionThread_run_threadstartselfr   r   r   __init__   s   
zWaitBlockPrefetcher.__init__c                 C   s   	 zLg }| j / t| jdkr| jd d  g }| _n| jr(	 W d    W d S g }| j   W d    n1 s9w   Y  t|dkrLtj|ddd W n ty[   td Y nw q)NTr   r#   )num_returnsfetch_localzError in prefetcher thread.)	rt   r1   rp   rq   waitr%   	Exceptionlogger	exception)rz   blocks_to_waitr   r   r   rv      s(   
zWaitBlockPrefetcher._runblocksc                 C   sJ   | j  | jrtd|| _| j   W d    d S 1 sw   Y  d S )NzPrefetcher is stopped.)rt   rq   RuntimeErrorrp   notifyrz   r   r   r   r   prefetch_blocks   s   "z#WaitBlockPrefetcher.prefetch_blocksc                 C   sT   | j  | jr	 W d    d S d| _| j   W d    d S 1 s#w   Y  d S )NT)rt   rq   r   ry   r   r   r   stop   s   "zWaitBlockPrefetcher.stopc                 C   s   |    d S r   )r   ry   r   r   r   __del__  s   zWaitBlockPrefetcher.__del__N)__name__
__module____qualname____doc__r{   rv   r   r   r   r   r   r   r   r   r   r   rk      s    rk   c                   @   s<   e Zd ZdZdd ZedddZdeee	  fd	d
Z
dS )ActorBlockPrefetcherz%Block prefetcher using a local actor.c                 C   s   |   | _d S r   )_get_or_create_actor_prefetcherprefetch_actorry   r   r   r   r{     s   zActorBlockPrefetcher.__init__r   r	   c                  C   s4   t   } d|  }tjt| dd|tdd S )Nzdataset-block-prefetcher-F)softT)scheduling_strategyrn   	namespaceget_if_exists)r%   r&   r'   _BlockPretcheroptionsr   PREFETCHER_ACTOR_NAMESPACEremote)node_id
actor_namer   r   r   r     s   

z4ActorBlockPrefetcher._get_or_create_actor_prefetcherr   c                 C   s   | j jj|  d S r   )r   prefetchr   r   r   r   r   r     s   z$ActorBlockPrefetcher.prefetch_blocksN)r   r	   )r   r   r   r   r{   staticmethodr   r   r   r   r   r   r   r   r   r     s    
r   )num_cpusc                   @   s   e Zd ZdZdddZdS )r   z3Helper actor that prefetches blocks asynchronously.r   Nc                 G   s   d S r   r   r   r   r   r   r   $  s   z_BlockPretcher.prefetch)r   N)r   r   r   r   r   r   r   r   r   r      s    r   r   )NNFNNF)1loggingrr   
contextlibr   typingr   r   r   r   r   r   r%   	ray.actorr	   ray.data._internal.batcherr
   r   ,ray.data._internal.block_batching.interfacesr   r   r   ray.data._internal.statsr   ray.data.blockr   r   r   	ray.typesr   ray.util.scheduling_strategiesr   	getLoggerr   r   intr8   rF   boolrX   strr_   re   ri   rj   r   rk   r   r   r   r   r   r   r   <module>   s     
$

"
J


1
