o
    `۷i'                     @   s.  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZm	Z	m
Z
 d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ zd dlZW n ey_   dZY nw erhd d
lmZ ee Z!eddG dd dZ"ej#d dG dd dZ$dd Z%G dd dZ&dd Z'dS )    N)defaultdict)TYPE_CHECKINGAnyListOptional)(_ref_bundles_iterator_to_block_refs_list)cached_remote_fn)BlockAccessor)DataContext)	ObjectRef)	PublicAPI)Datasetalpha)	stabilityc                   @   s   e Zd ZdZdddedefddZdd	 Zded
e	e fddZ
dee d
eee  fddZd
efddZdefddZded
efddZdS )RandomAccessDatasetzuA class that provides distributed, random access to a Dataset.

    See: ``Dataset.to_random_access_dataset()``.
    dsr   keynum_workersc                    sj  |j dd}|du st|trtdt }td |}t	t
 | }t|}td t fdd|D }	g _d_g _t|	D ] \}
}|rlj||
  jdu rd|d	 _j|d
  qLtd| t }|jfddt|D _ \__tdj tfddjD  td t | _dS )zConstruct a RandomAccessDataset (internal API).

        The constructor is a private API. Use ``ds.to_random_access_dataset()``
        to construct a RandomAccessDataset.
        T)fetch_if_missingNz6RandomAccessDataset only supports Arrow-format blocks.z%[setup] Indexing dataset by sort key.z%[setup] Computing block range bounds.c                    s   g | ]}  |qS  )remote).0b)
get_boundsr   r   T/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/random_access_dataset.py
<listcomp>=   s    z0RandomAccessDataset.__init__.<locals>.<listcomp>r      z*[setup] Creating {} random access workers.c                    s   g | ]}t jd  qS ))scheduling_strategy)_RandomAccessWorkeroptionsr   )r   _)r   r   r   r   r   K   s    z'[setup] Worker to blocks assignment: {}c                    s,   g | ]}|j  fd d j| D qS )c                    s   i | ]}| j | qS r   )_non_empty_blocks)r   iselfr   r   
<dictcomp>\   s    
z;RandomAccessDataset.__init__.<locals>.<listcomp>.<dictcomp>)assign_blocksr   _worker_to_blocks_mapr   wr#   r   r   r   Z   s    
z-[setup] Finished assigning blocks to workers.)schema
isinstancetype
ValueErrortimeperf_counterloggerinfosortr   _get_boundsiter_internal_ref_bundlesr   raygetr!   _lower_bound_upper_bounds	enumerateappendformatr
   get_currentr   range_workers$_compute_block_to_worker_assignments_block_to_workers_mapr'   _build_time)r$   r   r   r   r*   start	sorted_dsbundlesblocksboundsr"   r   ctxr   )r   r   r   r$   r   __init__&   sR   




	


zRandomAccessDataset.__init__c                 C   s  t t}t t}t t}tdd | jD }t|D ]\}}|| | j|  qtj| j	}t| j	D ](\}}	||	 }
|
dg }|D ]}|| D ]}|| | || | qLqFq6t| j	D ] \}}	t
|| dkrt| j}|| | || | qd||fS )Nc                 S      g | ]}|j  qS r   )pingr   r(   r   r   r   r   o       zLRandomAccessDataset._compute_block_to_worker_assignments.<locals>.<listcomp>node_idsr   )r   listr5   r6   r>   r9   r:   experimentalget_object_locationsr!   lenrandomchoice)r$   block_to_workersworker_to_blocksloc_to_workerslocsr"   loc
block_locs	block_idxblock
block_infoworkerr   r   r   r?   h   s.   z8RandomAccessDataset._compute_block_to_worker_assignmentsreturnc                 C   s0   |  |}|du rtdS | |j||S )zAsynchronously finds the record for a single key.

        Args:
            key: The key of the record to find.

        Returns:
            ObjectRef containing the record (in pydict form), or None if not found.
        N)_find_ler5   put_worker_forr6   r   )r$   r   block_indexr   r   r   	get_async   s   
	
zRandomAccessDataset.get_asynckeysc                    s   t t}|D ]}|| | | qi }| D ]\}}|du r"q| |j|gt| |}|||< qi  | D ]\}}|| }t	
|}	t||	D ]\}}
|
 |< qNq< fdd|D S )zSynchronously find the records for a list of keys.

        Args:
            keys: List of keys to find the records for.

        Returns:
            List of found records (in pydict form), or None for missing records.
        Nc                    s   g | ]}  |qS r   )r6   )r   kresultsr   r   r      rK   z0RandomAccessDataset.multiget.<locals>.<listcomp>)r   rM   r^   r:   itemsr`   multigetr   rP   r5   r6   zip)r$   rc   batchesrd   futuresindexkeybatchfutr"   valuesvr   re   r   rh      s&   	


zRandomAccessDataset.multigetc              	   C   s   t dd | jD }tdd |D }dd |D }dd |D }d}|dt| jd	7 }|d
t|7 }|dt|t	|t
t|t| 7 }|dt|t	|t
t|t| 7 }|dt
|dt|  d 7 }|S )z6Returns a string containing access timing information.c                 S   rI   r   )statsr   r(   r   r   r   r      rK   z-RandomAccessDataset.stats.<locals>.<listcomp>c                 s   s    | ]}|d  V  qdS )
total_timeNr   r   sr   r   r   	<genexpr>   s    z,RandomAccessDataset.stats.<locals>.<genexpr>c                 S      g | ]}|d  qS )num_accessesr   rs   r   r   r   r          c                 S   rv   )
num_blocksr   rs   r   r   r   r      rx   zRandomAccessDataset:
z- Build time: {}s
   z- Num workers: {}
z-- Blocks per worker: {} min, {} max, {} mean
z/- Accesses per worker: {} min, {} max, {} mean
z- Mean access time: {}us
r   g    .A)r5   r6   r>   sumr;   roundrA   rP   minmaxint)r$   rq   rr   accessesrE   msgr   r   r   rq      s"   zRandomAccessDataset.statsra   c                 C   s   t | j| S N)rQ   rR   r@   )r$   ra   r   r   r   r`      s   zRandomAccessDataset._worker_forxc                 C   s.   t | j|}|t| jks|| jk rd S |S r   )bisectbisect_leftr8   rP   r7   )r$   r   r"   r   r   r   r^      s   zRandomAccessDataset._find_leN)__name__
__module____qualname____doc__strr   rH   r?   r   r   rb   r   r   rh   rq   r`   r^   r   r   r   r   r      s    
Br   )num_cpusc                   @   sJ   e Zd Zdd Zdd Zdd Zdd Zd	d
 ZdefddZ	dd Z
dS )r   c                 C   s   d | _ || _d| _d| _d S )Nr   )rE   	key_fieldrw   rr   )r$   r   r   r   r   rH      s   
z_RandomAccessWorker.__init__c                 C   s   dd |  D | _d S )Nc                 S   s   i | ]
\}}|t |qS r   )r5   r6   )r   rd   refr   r   r   r%          z5_RandomAccessWorker.assign_blocks.<locals>.<dictcomp>)rg   rE   )r$   block_ref_dictr   r   r   r&      s   z!_RandomAccessWorker.assign_blocksc                 C   s<   t  }| ||}|  jt  | 7  _|  jd7  _|S )Nr   )r.   r/   _getrr   rw   )r$   ra   r   rB   resultr   r   r   r6      s
   z_RandomAccessWorker.getc                    s   t  }j|d  }tt|dkrFtj|d  tjrFj|d  }|j }t	
||}t|  fddt||||D }nfddt||D } jt  | 7  _ jd7  _|S )Nr   r   c                    s,   g | ]\}}}|  |kr |nd qS r   )as_py_get_row)r   r"   k1k2)accr   r   r      s    z0_RandomAccessWorker.multiget.<locals>.<listcomp>c                    s   g | ]
\}}  ||qS r   )r   )r   r"   rd   r#   r   r   r      r   )r.   r/   rE   rP   setr+   paTabler   npsearchsortedr	   	for_blockri   takerr   rw   )r$   block_indicesrc   rB   rZ   colindicesr   r   )r   r$   r   rh      s    


z_RandomAccessWorker.multigetc                 C   s   t   S r   )r5   get_runtime_contextget_node_idr#   r   r   r   rJ      s   z_RandomAccessWorker.pingr]   c                 C   s   t | j| j| jdS )N)ry   rw   rr   )rP   rE   rw   rr   r#   r   r   r   rq      s   z_RandomAccessWorker.statsc                 C   s^   |d u rd S | j | }|| j }t|tjrt|}t||}|d u r%d S t|}|	|S r   )
rE   r   r+   r   r   _ArrowListWrapper_binary_search_findr	   r   r   )r$   ra   r   rZ   columnr"   r   r   r   r   r     s   




z_RandomAccessWorker._getN)r   r   r   rH   r&   r6   rh   rJ   dictrq   r   r   r   r   r   r      s    r   c                 C   s,   t | |}|t| kr| | |kr|S d S r   )r   r   rP   )r   r   r"   r   r   r   r     s   r   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )r   c                 C   s
   || _ d S r   )	arrow_col)r$   r   r   r   r   rH        
z_ArrowListWrapper.__init__c                 C   s   | j |  S r   )r   r   )r$   r"   r   r   r   __getitem__  s   z_ArrowListWrapper.__getitem__c                 C   s
   t | jS r   )rP   r   r#   r   r   r   __len__  r   z_ArrowListWrapper.__len__N)r   r   r   rH   r   r   r   r   r   r   r     s    r   c                 C   sX   t | dkrd S | | d | | t | d  f}t| tjr*|d  |d  f}|S )Nr   r   )rP   r+   r   r   r   )rZ   r   r   r   r   r   r3   !  s    r3   )(r   loggingrQ   r.   collectionsr   typingr   r   r   r   numpyr   r5   2ray.data._internal.execution.interfaces.ref_bundler   ray.data._internal.remote_fnr   ray.data.blockr	   ray.data.contextr
   	ray.typesr   ray.util.annotationsr   pyarrowr   ImportErrorray.data.datasetr   	getLoggerr   r0   r   r   r   r   r   r3   r   r   r   r   <module>   s<    
 
0?