o
    biD"                     @   s   d dl mZmZmZmZ d dlZd dlmZ d dl	m
Z
mZmZ d dlmZmZmZ eG dd dZeG dd	 d	ZG d
d deZeG dd deg ee
 f ZeG dd deZdS )    )CallableIterableListOptionalN)_check_pyarrow_version)BlockBlockMetadataSchema)
DeprecatedDeveloperAPI	PublicAPIc                   @   s   e Zd ZdZedddZededed fdd	Zde	fd
dZ
dee fddZdeded fddZedefddZedefddZdS )
DatasourcezInterface for defining a custom :class:`~ray.data.Dataset` datasource.

    To read a datasource into a dataset, use :meth:`~ray.data.read_datasource`.
    returnReaderc                 K   s   t | fi |S z
        Deprecated: Implement :meth:`~ray.data.Datasource.get_read_tasks` and
        :meth:`~ray.data.Datasource.estimate_inmemory_data_size` instead.
        )_LegacyDatasourceReader)self	read_args r   R/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/datasource/datasource.pycreate_reader   s   zDatasource.create_readerparallelismReadTaskc                 K      t r   NotImplementedError)r   r   r   r   r   r   prepare_read   s   zDatasource.prepare_readc                 C   s.   t | j}d}||r|dt|  }|S )ztReturn a human-readable name for this datasource.
        This will be used as the names of the read tasks.
        r   N)type__name__endswithlen)r   namedatasource_suffixr   r   r   get_name!   s
   

zDatasource.get_namec                 C   r   zReturn an estimate of the in-memory data size, or None if unknown.

        Note that the in-memory data size may be larger than the on-disk data size.
        r   r   r   r   r   estimate_inmemory_data_size+      z&Datasource.estimate_inmemory_data_sizec                 C   r   )aM  Execute the read and return read tasks.

        Args:
            parallelism: The requested read parallelism. The number of read
                tasks should equal to this value if possible.

        Returns:
            A list of read tasks that can be executed to read blocks from the
            datasource in parallel.
        r   r   r   r   r   r   get_read_tasks2   s   zDatasource.get_read_tasksc                 C   s,   t | jtju}t | jtju}| p| S N)r   r)   r   r&   )r   has_implemented_get_read_tasks+has_implemented_estimate_inmemory_data_sizer   r   r   should_create_reader?   s   zDatasource.should_create_readerc                 C      dS )z:If ``False``, only launch read tasks on the driver's node.Tr   r%   r   r   r   supports_distributed_readsM   s   z%Datasource.supports_distributed_readsN)r   r   )r   
__module____qualname____doc__r
   r   intr   r   strr#   r   r&   r)   propertyboolr-   r/   r   r   r   r   r   
   s    
r   c                   @   s8   e Zd ZdZdee fddZdeded fddZd	S )
r   a  A bound read operation for a :class:`~ray.data.Datasource`.

    This is a stateful class so that reads can be prepared in multiple stages.
    For example, it is useful for :class:`Datasets <ray.data.Dataset>` to know the
    in-memory size of the read prior to executing it.
    r   c                 C   r   r$   r   r%   r   r   r   r&   \   r'   z"Reader.estimate_inmemory_data_sizer   r   c                 C   r   )a  Execute the read and return read tasks.

        Args:
            parallelism: The requested read parallelism. The number of read
                tasks should equal to this value if possible.
            read_args: Additional kwargs to pass to the datasource impl.

        Returns:
            A list of read tasks that can be executed to read blocks from the
            datasource in parallel.
        r   r(   r   r   r   r)   c   s   zReader.get_read_tasksN)	r   r0   r1   r2   r   r3   r&   r   r)   r   r   r   r   r   S   s    r   c                   @   sB   e Zd ZdefddZdee fddZdeded fd	d
Z	dS )r   
datasourcec                 K      || _ || _d S r*   )_datasource
_read_args)r   r7   r   r   r   r   __init__s      
z _LegacyDatasourceReader.__init__r   c                 C   s   d S r*   r   r%   r   r   r   r&   w   s   z3_LegacyDatasourceReader.estimate_inmemory_data_sizer   r   c                 C   s   | j j|fi | jS r*   )r9   r   r:   r(   r   r   r   r)   z   s   z&_LegacyDatasourceReader.get_read_tasksN)
r   r0   r1   r   r;   r   r3   r&   r   r)   r   r   r   r   r   r   s    r   c                   @   s   e Zd ZdZ	ddeg ee f deded fddZ	e
d	efd
dZe
d	ed fddZe
d	eg ee f fddZd	ee fddZdS )r   a  A function used to read blocks from the :class:`~ray.data.Dataset`.

    Read tasks are generated by :meth:`~ray.data.Datasource.get_read_tasks`,
    and return a list of ``ray.data.Block`` when called. Initial metadata about the read
    operation can be retrieved via the ``metadata`` attribute prior to executing the
    read. Final metadata is returned after the read along with the blocks.

    Ray will execute read tasks in remote functions to parallelize execution.
    Note that the number of blocks returned can vary at runtime. For example,
    if a task is reading a single large file it can return multiple blocks to
    avoid running out of memory during the read.

    The initial metadata should reflect all the blocks returned by the read,
    e.g., if the metadata says ``num_rows=1000``, the read can return a single
    block of 1000 rows, or multiple blocks with 1000 rows altogether.

    The final metadata (returned with the actual block) reflects the exact
    contents of the block itself.
    Nread_fnmetadataschemar	   c                 C   s   || _ || _|| _d S r*   )	_metadata_read_fn_schema)r   r=   r>   r?   r   r   r   r;      s   
zReadTask.__init__r   c                 C      | j S r*   )r@   r%   r   r   r   r>         zReadTask.metadatac                 C   rC   r*   )rB   r%   r   r   r   r?      rD   zReadTask.schemac                 C   rC   r*   )rA   r%   r   r   r   r=      rD   zReadTask.read_fnc                 c   s0    |   }t|dstd| |E d H  d S )N__iter__zlRead function must return Iterable[Block], got {}. Probably you need to return `[block]` instead of `block`.)rA   hasattrDeprecationWarningformat)r   resultr   r   r   __call__   s   
zReadTask.__call__r*   )r   r0   r1   r2   r   r   r   r   r   r;   r5   r>   r?   r=   rJ   r   r   r   r   r   ~   s"    

r   c                   @   sX   e Zd ZdZdedefddZdee fddZd	edee	 fd
dZ
defddZdS )RandomIntRowDatasourcea  An example datasource that generates rows with random int64 columns.

    Examples:
        >>> import ray
        >>> from ray.data.datasource import RandomIntRowDatasource
        >>> source = RandomIntRowDatasource() # doctest: +SKIP
        >>> ray.data.read_datasource( # doctest: +SKIP
        ...     source, n=10, num_columns=2).take()
        {'c_0': 1717767200176864416, 'c_1': 999657309586757214}
        {'c_0': 4983608804013926748, 'c_1': 1160140066899844087}
    nnum_columnsc                 C   r8   r*   _n_num_columns)r   rL   rM   r   r   r   r;      r<   zRandomIntRowDatasource.__init__r   c                 C   s   | j | j d S )N   rN   r%   r   r   r   r&      s   z2RandomIntRowDatasource.estimate_inmemory_data_sizer   c           
         s   t   dd lg }| j}| j}td|| }dtdtdtffdd jdd	 t	|D j
}d}||k rct||| }t|d
| | d d d}	|t||f fdd	|	|d ||7 }||k s7|S )Nr      countrM   r   c                    s<    j jtjjttjj|| ftjddd t|D dS )N)sizedtypec                 S   s   g | ]}d | qS )c_r   .0ir   r   r   
<listcomp>   s    zMRandomIntRowDatasource.get_read_tasks.<locals>.make_block.<locals>.<listcomp>)names)	Tablefrom_arraysnprandomrandintiinfoint64maxrangerS   rM   )pyarrowr   r   
make_block   s   z9RandomIntRowDatasource.get_read_tasks.<locals>.make_blockc                 S   s   i | ]	}d | dgqS )rV   r   r   rW   r   r   r   
<dictcomp>   s    z9RandomIntRowDatasource.get_read_tasks.<locals>.<dictcomp>rQ   )num_rows
size_bytesinput_files
exec_statsc                    s    | |gS r*   r   re   )rg   r   r   <lambda>   s   z7RandomIntRowDatasource.get_read_tasks.<locals>.<lambda>)r?   )r   rf   rO   rP   rc   r3   r   r\   from_pydictrd   r?   minr   appendr   )
r   r   
read_tasksrL   rM   
block_sizer?   rY   rS   metar   )rg   rf   r   r)      s>   
	z%RandomIntRowDatasource.get_read_tasksc                 C   r.   )zReturn a human-readable name for this datasource.
        This will be used as the names of the read tasks.
        Note: overrides the base `Datasource` method.
        	RandomIntr   r%   r   r   r   r#      r'   zRandomIntRowDatasource.get_nameN)r   r0   r1   r2   r3   r;   r   r&   r   r   r)   r4   r#   r   r   r   r   rK      s    
.rK   )typingr   r   r   r   numpyr^   ray.data._internal.utilr   ray.data.blockr   r   r	   ray.util.annotationsr
   r   r   r   r   r   r   rK   r   r   r   r   <module>   s    H7