o
    biz                     @   s   d dl Z d dlmZmZmZmZmZmZ d dlZ	d dl
mZmZ d dlmZ d dlmZ d dlmZmZ er:d dlZe eZG dd deZd	ed
 fddZd	ed
 fddZdS )    N)TYPE_CHECKINGAnyDictIteratorListOptional)_check_importcall_with_retry)BlockMetadata)DataContext)
DatasourceReadTaskc                   @   s   e Zd ZdZdgZdZdZ				ddedee	e  dee d	ee
eef  d
ee
eef  f
ddZdede	e fddZdee fddZdS )LanceDatasourcez,Lance datasource, for reading Lance dataset.zLanceError(IO)
       Nuricolumnsfilterstorage_optionsscanner_optionsc                 C   s   t | ddd dd l}|| _|pi | _|d ur|| jd< |d ur%|| jd< || _|j||d| _g }|| j |t	
 j d|| j| jd	| _d S )
Nlancepylance)modulepackager   r   r   )r   r   zread lance fragments)descriptionmatchmax_attemptsmax_backoff_s)r   r   r   r   r   datasetlance_dsextendREAD_FRAGMENTS_ERRORS_TO_RETRYr   get_currentretried_io_errorsREAD_FRAGMENTS_MAX_ATTEMPTS(READ_FRAGMENTS_RETRY_MAX_BACKOFF_SECONDS_retry_params)selfr   r   r   r   r   r   r    r(   b/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/lance_datasource.py__init__   s$   


zLanceDatasource.__init__parallelismreturnc           	         s   g }t | j |D ]H}t|dkrqdd |D }tdd |D }dd |D }t||d d d}| j| j | jt	|f fdd		||d j
d
}|| q|S )Nr   c                 S   s   g | ]}|j jqS r(   )metadataid.0fr(   r(   r)   
<listcomp>D   s    z2LanceDatasource.get_read_tasks.<locals>.<listcomp>c                 s   s    | ]}|  V  qd S N)
count_rowsr/   r(   r(   r)   	<genexpr>E   s    z1LanceDatasource.get_read_tasks.<locals>.<genexpr>c                 S   s"   g | ]}|  D ]}| qqS r(   )
data_filespath)r0   r1   	data_filer(   r(   r)   r2   F   s
    )num_rowsinput_files
size_bytes
exec_statsc                    s   t |  S r3   )_read_fragments_with_retry)r1   r   retry_paramsr   r(   r)   <lambda>V   s    z0LanceDatasource.get_read_tasks.<locals>.<lambda>)schema)nparray_splitr   get_fragmentslensumr
   r   r&   r   rA   append)	r'   r+   
read_tasks	fragmentsfragment_idsr9   r:   r-   	read_taskr(   r>   r)   get_read_tasks>   s2   
zLanceDatasource.get_read_tasksc                 C   s   d S r3   r(   )r'   r(   r(   r)   estimate_inmemory_data_sizeb   s   z+LanceDatasource.estimate_inmemory_data_size)NNNN)__name__
__module____qualname____doc__r!   r$   r%   strr   r   r   r   r*   intr   rL   rM   r(   r(   r(   r)   r      s,    

$r   r,   zpyarrow.Tablec                    s   t  fddfi |S )Nc                      s   t  S r3   )_read_fragmentsr(   rJ   r   r   r(   r)   r@   n   s    z,_read_fragments_with_retry.<locals>.<lambda>)r	   )rJ   r   r   r?   r(   rU   r)   r=   g   s
   r=   c                 #   sV    ddl } fdd| D }||d<  jdi |}| D ]
}|j|gV  qdS )zRead Lance fragments in batches.

    NOTE: Use fragment ids, instead of fragments as parameter, because pickling
    LanceFragment is expensive.
    r   Nc                    s   g | ]}  |qS r(   )get_fragment)r0   r.   r   r(   r)   r2      s    z#_read_fragments.<locals>.<listcomp>rI   r(   )pyarrowscanner	to_readerTablefrom_batches)rJ   r   r   rX   rI   rY   batchr(   rW   r)   rT   s   s   
rT   )loggingtypingr   r   r   r   r   r   numpyrB   ray.data._internal.utilr   r	   ray.data.blockr
   ray.data.contextr   ray.data.datasource.datasourcer   r   rX   	getLoggerrN   loggerr   r=   rT   r(   r(   r(   r)   <module>   s"     
R
