o
    bi                     @   s   d dl Z d dlZd dlmZmZmZmZ d dlZd dl	m
Z
 d dlmZmZmZ d dlmZ d dlmZmZ G dd deZdS )	    N)IterableListOptionalTuple)_check_pyarrow_version)BlockBlockAccessorBlockMetadata)DataContext)
DatasourceReadTaskc                
   @   sl   e Zd ZdZ			ddedededee fd	d
Zdee fddZ	dede
e fddZejdd ZdS )RangeDatasourcezCAn example datasource that generates ranges of numbers from [0..n).arrow   Nnblock_formattensor_shapecolumn_namec                 C   s    t || _|| _|| _|| _d S N)int_n_block_format_tensor_shape_column_name)selfr   r   r   r    r   b/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/range_datasource.py__init__   s   

zRangeDatasource.__init__returnc                 C   s.   | j dkrtt| j}nd}d| j | S )Ntensorr      )r   r   npprodr   r   )r   element_sizer   r   r   estimate_inmemory_data_size   s   
z+RangeDatasource.estimate_inmemory_data_sizeparallelismc              	      s,  g }j }j jtd|| }t }j dkrdn j  }t|d}td|j| dtdtdt	f fdddtdtdtdt
t	 ffd	d
 dkr^tt}nd}d}||k rt||| }	t|	d|	 | d d d}
|t||	ffdd	|
jd ||7 }||k sf|S )Nr   r   startcountr   c              
      s    dkrdd l }|jjt| | | gjpdgdS  dkrNdd l }tjtjdtt| | | t	t
ddt  }tjrKj|iS |S tt
| | | S )Nr   r   value)namesr    dtyper   )pyarrowTablefrom_arraysr"   aranger   onesint64expand_dimstuplerangelenr   batch_to_blocklistbuiltins)r'   r(   par    )r   r   r   r   r   
make_block9   s$   
z2RangeDatasource.get_read_tasks.<locals>.make_blocktarget_rows_per_blockc                 3   s@    |dkrt ||} | |V  | |7 } ||8 }|dksd S d S )Nr   )min)r'   r(   r<   num_rows)r;   r   r   make_blocksN   s   
z3RangeDatasource.get_read_tasks.<locals>.make_blocksr    r!   )r>   
size_bytesinput_files
exec_statsc                    s    | |S r   r   )ir(   )r?   r<   r   r   <lambda>g   s    z0RangeDatasource.get_read_tasks.<locals>.<lambda>)schema)r   r   r   maxr
   get_currentr%   target_max_block_sizer   r   r   r"   r#   r=   r	   appendr   _schema)r   r&   
read_tasksr   
block_sizectxrow_size_bytesr$   rC   r(   metar   )r   r;   r?   r   r<   r   r   get_read_tasks#   sV   

	
	zRangeDatasource.get_read_tasksc              
   C   s   | j dkrd S | jdkr"t  dd l}|j| jpddgij}|S | jdkr[t  dd l}tj	| j
tjdttddttddt| j
  }t| jrU| j|in|j}|S | jdkrdt}|S td	| j)
Nr   r   r)   r    r+   
   r   r8   zUnsupported block type)r   r   r   r-   r.   from_pydictr   rE   r"   r1   r   r2   r3   r0   r4   r5   r6   r   r7   r   
ValueError)r   r:   rE   r    r   r   r   rJ   r   s.   


 
zRangeDatasource._schema)r   r   N)__name__
__module____qualname____doc__r   strr   r   r   r%   r   r   rP   	functoolscached_propertyrJ   r   r   r   r   r      s,    

Or   )r9   rY   typingr   r   r   r   numpyr"   ray.data._internal.utilr   ray.data.blockr   r   r	   ray.data.contextr
   ray.data.datasourcer   r   r   r   r   r   r   <module>   s    