o
    bi                     @   s   d Z ddlZddlZddlmZmZmZmZmZm	Z	 ddl
mZ ddlmZ ddlmZmZ ddlmZmZ ddlmZ erJdd	lmZ dd
lmZ eeZeG dd deed  ZdS )zU
Module to write a Ray Dataset into an iceberg table, by using the Ray Datasink API.
    N)TYPE_CHECKINGAnyDictIterableListOptional)version)TaskContext)BlockBlockAccessor)DatasinkWriteResult)DeveloperAPI)Catalog)DataFilec                
   @   s   e Zd ZdZ		ddedeeeef  deeeef  fddZde	fd	d
Z
de	ddfddZdddZdddZdee dedeed  fddZdeed  fddZdS )IcebergDatasinkz
    Iceberg datasink to write a Ray Dataset into an existing Iceberg table. This module
    heavily uses PyIceberg to write to iceberg table. All the routines in this class override
    `ray.data.Datasink`.

    Ntable_identifiercatalog_kwargssnapshot_propertiesc                 C   s   ddl m} ddlm} ddlm} || _|dur|ni | _|dur$|ni | _d| jv r4| j	d| _
nd| _
d| _d| _d| _d| _dS )a  
        Initialize the IcebergDatasink

        Args:
            table_identifier: The identifier of the table to read e.g. `default.taxi_dataset`
            catalog_kwargs: Optional arguments to use when setting up the Iceberg
                catalog
            snapshot_properties: custom properties write to snapshot when committing
            to an iceberg table, e.g. {"commit_time": "2021-01-01T00:00:00Z"}
        r   )FileIO)Transaction)TableMetadataNnamedefault)pyiceberg.ior   pyiceberg.tabler   pyiceberg.table.metadatar   r   _catalog_kwargs_snapshot_propertiespop_catalog_name_uuid_io_txn_table_metadata)selfr   r   r   r   r   r    r&   b/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/iceberg_datasink.py__init__    s   

zIcebergDatasink.__init__returnc                 C   s   | j  }|d= |S )zExclude `_txn` during pickling.r#   )__dict__copyr%   stater&   r&   r'   __getstate__G   s   
zIcebergDatasink.__getstate__r-   c                 C   s   | j | d | _d S )N)r*   updater#   r,   r&   r&   r'   __setstate__M   s   
zIcebergDatasink.__setstate__r   c                 C   s"   ddl m} |j| jfi | jS )Nr   )catalog)	pyicebergr1   load_catalogr    r   )r%   r1   r&   r&   r'   _get_catalogQ   s   zIcebergDatasink._get_catalogc                 C   s   ddl }ddlm} t|jtdkrddlm} n	ddlm} |j}| 	 }|
| j}| | _| jjj| _| jj| _t | _dd | j jD  }rYtd	| d
|| jj|j|j| _dS )zPrepare for the transactionr   N)TablePropertiesz0.9.0)property_as_bool)PropertyUtilc                 S   s   g | ]}|j js|qS r&   )	transformsupports_pyarrow_transform).0fieldr&   r&   r'   
<listcomp>i   s    z2IcebergDatasink.on_write_start.<locals>.<listcomp>zhNot all partition types are supported for writes. Following partitions cannot be written using pyarrow: .)r2   r   r5   r   parse__version__pyiceberg.utils.propertiesr6   r7   r4   
load_tabler   transactionr#   _tableior"   table_metadatar$   uuiduuid4r!   specfields
ValueError
propertiesMANIFEST_MERGE_ENABLEDMANIFEST_MERGE_ENABLED_DEFAULT_manifest_merge_enabled)r%   r2   r5   r6   r7   r1   tableunsupported_partitionsr&   r&   r'   on_write_startV   s.   






zIcebergDatasink.on_write_startblocksctxr   c                 C   s   ddl m}m} ddlm} ddlm} g }|D ]6}t|	 }	| 
|p(d}
|| j |	j|
d |	jd dkr<qt }|| j|	| j|}|| q|S )Nr   ) _check_pyarrow_schema_compatible_dataframe_to_data_files)$DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE)ConfigF)provided_schemadowncast_ns_timestamp_to_us)pyiceberg.io.pyarrowrT   rU   r   rV   pyiceberg.utils.configrW   r   	for_blockto_arrowget_boolr$   schemashaperF   rG   r"   extend)r%   rR   rS   rT   rU   rV   rW   data_files_listblockpa_tablerY   	task_uuid
data_filesr&   r&   r'   writex   s*   zIcebergDatasink.writewrite_resultc                 C   s~   | j j| jd}| jr|jn|j}| }| j|_|jD ]}|D ]}|	| q qW d    n1 s3w   Y  | j 
  d S )N)r   )r#   update_snapshotr   rN   merge_appendfast_appendr!   commit_uuidwrite_returnsappend_data_filecommit_transaction)r%   rh   ri   append_methodappend_filesrf   	data_filer&   r&   r'   on_write_complete   s    
z!IcebergDatasink.on_write_complete)NN)r)   r   )r)   N)__name__
__module____qualname____doc__strr   r   r   r(   dictr.   r0   r4   rQ   r   r
   r	   r   r   rg   rs   r&   r&   r&   r'   r      s.    

'

"

"r   r   )rw   loggingrF   typingr   r   r   r   r   r   	packagingr   'ray.data._internal.execution.interfacesr	   ray.data.blockr
   r   ray.data.datasource.datasinkr   r   ray.util.annotationsr   pyiceberg.catalogr   pyiceberg.manifestr   	getLoggerrt   loggerr   r&   r&   r&   r'   <module>   s     
