o
    bi)                     @   sX  d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
mZmZmZmZmZ ddlZddlmZ ddlmZ ddlmZmZ ddlmZmZ dd	lmZ erdd
lmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z)m*Z*m+Z+ ddl,m-Z- e.e/Z0de
d ddddddde1dee2 ddde
e fddZ3eG d d! d!eZ4dS )"zV
Module to read an iceberg table into a Ray Dataset, by using the Ray Datasource API.
    N)partial)	TYPE_CHECKINGAnyDictIterableListOptionalSetTupleUnion)version)_check_import)BlockBlockMetadata)
DatasourceReadTask)DeveloperAPI)Catalog)BooleanExpression)FileIO)DataFile)Schema)DataScanFileScanTaskTable)TableMetadatatasksr   table_ior   table_metadatar   
row_filterr   case_sensitivelimitschemar   returnc              	   c   s    dd l }t|jtdkr7ddlm} |||||||d}	|	j| d}
|
 D ]
}tj	
|gV  q*d S ddlm} |j| ||||||dV  d S )Nr   z0.9.0)	ArrowScan)r   ior   projected_schemar    r!   r   pyarrow)r   r   r%   r   r&   r    r!   )	pyicebergr   parse__version__pyiceberg.io.pyarrowr$   to_table
to_batchespar   from_batchespyiceberg.ior)   project_table)r   r   r   r   r    r!   r"   r*   r$   scannerresult_tablebatch	pyi_pa_io r8   d/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/iceberg_datasource.py_get_read_task   s4   

r:   c                   @   s   e Zd ZdZ					d$dedeedf deedf d	ee d
ee	ee
f  dee	ee
f  fddZd%ddZed&ddZeded fddZd'ddZdee fddZeded dedeed  fdd Zd!edee fd"d#ZdS )(IcebergDatasourcez
    Iceberg datasource to read Iceberg tables into a Ray Dataset. This module heavily
    uses PyIceberg to read iceberg tables. All the routines in this class override
    `ray.data.Datasource`.
    N*table_identifierr   r   selected_fields.snapshot_idscan_kwargscatalog_kwargsc                 C   s   t | ddd ddlm} |dur|ni | _|dur|ni | _d| jv r,| jd| _nd| _|| _|dur8|n| | _|| _	|rF|| jd< d| _
d| _dS )	a  
        Initialize an IcebergDatasource.

        Args:
            table_identifier: Fully qualified table identifier (i.e.,
                "db_name.table_name")
            row_filter: A PyIceberg BooleanExpression to use to filter the data *prior*
                 to reading
            selected_fields: Which columns from the data to read, passed directly to
                PyIceberg's load functions
            snapshot_id: Optional snapshot ID for the Iceberg table
            scan_kwargs: Optional arguments to pass to PyIceberg's Table.scan()
                function
            catalog_kwargs: Optional arguments to use when setting up the Iceberg
                catalog
        r*   )modulepackager   )
AlwaysTrueNnamedefaultr@   )r   pyiceberg.expressionsrE   _scan_kwargs_catalog_kwargspop_catalog_namer>   _row_filter_selected_fields_plan_files_table)selfr>   r   r?   r@   rA   rB   rE   r8   r8   r9   __init__[   s   


zIcebergDatasource.__init__r#   r   c                 C   s"   ddl m} |j| jfi | jS )Nr   )catalog)r*   rS   load_catalogrL   rJ   rQ   rS   r8   r8   r9   _get_catalog   s   zIcebergDatasource._get_catalogr   c                 C   s&   | j du r|  }|| j| _ | j S )z=
        Return the table reference from the catalog
        N)rP   rV   
load_tabler>   rU   r8   r8   r9   table   s   
zIcebergDatasource.tabler   c                 C   s"   | j du r|  }| | _ | j S )z?
        Return the plan files specified by this query
        N)rO   _get_data_scan
plan_filesrQ   	data_scanr8   r8   r9   rZ      s   

zIcebergDatasource.plan_filesr   c                 C   s"   | j jd| j| jd| j}|S )N)r   r?   r8   )rX   scanrM   rN   rI   r[   r8   r8   r9   rY      s   z IcebergDatasource._get_data_scanc                 C   s   t dd | jD S )Nc                 s       | ]}|j jV  qd S Nfilefile_size_in_bytes.0taskr8   r8   r9   	<genexpr>       z@IcebergDatasource.estimate_inmemory_data_size.<locals>.<genexpr>)sumrZ   )rQ   r8   r8   r9   estimate_inmemory_data_size   s   z-IcebergDatasource.estimate_inmemory_data_sizerZ   n_chunksc                 C   s   dd t |D }dd t |D }t| t| dd ddD ] }t|}||d  | t||d	 |jj |d f q |S )
z
        Implement a greedy knapsack algorithm to distribute the files in the scan
        across tasks, based on their file size, as evenly as possible
        c                 S   s   g | ]}t  qS r8   )list)rd   _r8   r8   r9   
<listcomp>       zIIcebergDatasource._distribute_tasks_into_equal_chunks.<locals>.<listcomp>c                 S   s   g | ]}d |fqS )r   r8   )rd   chunk_idr8   r8   r9   rm          c                 S   s   | j jS r_   r`   )fr8   r8   r9   <lambda>       zGIcebergDatasource._distribute_tasks_into_equal_chunks.<locals>.<lambda>T)keyreverse   r   )	rangeheapqheapifysortedheappopappendheappushra   rb   )rZ   rj   chunkschunk_sizes	plan_filesmallest_chunkr8   r8   r9   #_distribute_tasks_into_equal_chunks   s   



z5IcebergDatasource._distribute_tasks_into_equal_chunksparallelismc              	      sP  ddl m} ddlm  |  }| j}| }||}|tt	|kr3tt	|}t
d| d | jj}| jj}| j}	| jdd}
| jd}tt|||	|
||d	g }t||D ]G}ttjd
d |D }t fdd|D }ttdd |D | tdd |D dd |D d d}|t|ffdd	||d q^|S )Nr   r(   DataFileContentzReducing the parallelism to z, as that is thenumber of filesr    Tr!   )r   r   r   r    r!   r"   c                 S   s   g | ]}|j qS r8   )delete_filesrc   r8   r8   r9   rm     rn   z4IcebergDatasource.get_read_tasks.<locals>.<listcomp>c                 3   s"    | ]}|j  jkr|jV  qd S r_   )contentPOSITION_DELETESrecord_count)rd   deleter   r8   r9   rf     s    z3IcebergDatasource.get_read_tasks.<locals>.<genexpr>c                 s   r^   r_   )ra   r   rc   r8   r8   r9   rf     rg   c                 s   s    | ]}|j V  qd S r_   )lengthrc   r8   r8   r9   rf     s    c                 S   s   g | ]}|j jqS r8   )ra   	file_pathrc   r8   r8   r9   rm     rp   )num_rows
size_bytesinput_files
exec_statsc                    s    | S r_   r8   r'   )get_read_taskr8   r9   rr     rs   z2IcebergDatasource.get_read_tasks.<locals>.<lambda>)read_fnmetadatar"   )r2   r)   pyiceberg.manifestr   rY   rZ   
projectionschema_to_pyarrowlenrk   loggerwarningrX   r%   r   rM   rI   getr   r:   r;   r   set	itertoolschainfrom_iterablerh   r   r|   r   )rQ   r   r7   r\   rZ   r&   
pya_schemar   r   r   r    r!   
read_taskschunk_tasksunique_deletesposition_delete_countr   r8   )r   r   r9   get_read_tasks   sj   


z IcebergDatasource.get_read_tasks)Nr<   NNN)r#   r   )r#   r   )r#   r   )__name__
__module____qualname____doc__strr   r
   r   intr   r   rR   rV   propertyrX   r   rZ   rY   ri   staticmethodr   r   r   r   r8   r8   r8   r9   r;   S   sH    	



/	


r;   )5r   rx   r   logging	functoolsr   typingr   r   r   r   r   r   r	   r
   r   r)   r0   	packagingr   ray.data._internal.utilr   ray.data.blockr   r   ray.data.datasource.datasourcer   r   ray.util.annotationsr   pyiceberg.catalogr   rH   r   r2   r   r   r   pyiceberg.schemar   pyiceberg.tabler   r   r   pyiceberg.table.metadatar   	getLoggerr   r   boolr   r:   r;   r8   r8   r8   r9   <module>   sP    ,

4