o
    bi                     @   sn   d dl Z d dlZd dlmZmZmZmZ d dlmZ d dl	m
Z
 d dlmZmZ e eZG dd deZdS )    N)DictIteratorListOptional)_check_import)BlockMetadata)
DatasourceReadTaskc                   @   sZ   e Zd ZdZ	ddedeeeef  fddZdede	d	 fd
dZ
dee fddZdS )HudiDatasourcez/Hudi datasource, for reading Apache Hudi table.N	table_uristorage_optionsc                 C   s   t | ddd || _|| _d S )Nhudizhudi-python)modulepackage)r   
_table_uri_storage_options)selfr   r    r   a/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/hudi_datasource.py__init__   s   
zHudiDatasource.__init__parallelismreturnr	   c              	      s  dd l ddlm} dtdtt dtttf dtd ffdd	 |jj}i |	 |
 | }g }||D ]K}d}g }g }	d}
|D ]"}||j7 }| }|| tjj|}|	| |
|j7 }
qIt||	|
d d
}t|f fdd	||d}|| q=|S )Nr   )	HudiTabler   base_file_pathsoptionsr   zpyarrow.Tablec                 3   s@    ddl m} |D ]}|| |}||} j|gV  q	d S )Nr   )HudiFileGroupReader)r   r   !read_file_slice_by_base_file_pathTablefrom_batches)r   r   r   r   pfile_group_readerbatch)pyarrowr   r   _perform_read   s   

z4HudiDatasource.get_read_tasks.<locals>._perform_read)num_rowsinput_files
size_bytes
exec_statsc                    s    j | S N)r   )paths)r#   reader_optionsr   r   r   <lambda>K   s    z/HudiDatasource.get_read_tasks.<locals>.<lambda>)read_fnmetadataschema)r"   r   r   strr   r   r   r   r   r   hudi_options
get_schemaget_file_slices_splitsnum_recordsbase_file_relative_pathappendospathjoinbase_file_sizer   r	   )r   r   r   
hudi_tabler.   
read_tasksfile_slices_splitr$   relative_pathsr%   r&   
file_slicerelative_path	full_pathr-   	read_taskr   )r#   r"   r*   r   r   get_read_tasks   sX   



zHudiDatasource.get_read_tasksc                 C   s   d S r(   r   )r   r   r   r   estimate_inmemory_data_sizeU   s   z*HudiDatasource.estimate_inmemory_data_sizer(   )__name__
__module____qualname____doc__r/   r   r   r   intr   rB   rC   r   r   r   r   r
      s    

<r
   )loggingr6   typingr   r   r   r   ray.data._internal.utilr   ray.data.blockr   ray.data.datasource.datasourcer   r	   	getLoggerrD   loggerr
   r   r   r   r   <module>   s    
