o
    i+b                    @  s  d dl mZ d dlZd dlZd dlZd dlZd dlmZmZ d dl	m
Z
mZmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZ d dlm  mZ d dlmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z, d dl-m.Z.m/Z/ d dl0m1Z1m2Z2m3Z3m4Z4m5Z5 d dl6m7Z7m8Z8m9Z9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZBmCZC d dlDmEZE d dlFmGZGmHZH d dlImJZJ d dlKmLZLmMZM d dlNmOZOmPZP d dlQmRZRmSZS d dlTmUZUmVZVmWZWmXZXmYZYmZZZm[Z[m\Z\m]Z]m^Z^m_Z_m`Z`maZambZbmcZcmdZdmeZemfZfmgZg d dlhmiZi d dljmkZkmlZlmmZm d dlnmoZo d dlpmqZq d dlrmsZs d d ltmuZu d d!lvmwZwmxZxmyZymzZzm{Z{m|Z|m}Z}m~Z~ d d"lmZ d d#lmZ d d$lmZ d d%lmZ erd dlmZ d dlZd dlZd dlZd dlZd dlZd d&lmZ d d'lmZ d d(lmZ d d)lmZmZmZ e Zd*Ze G d+d, d,ZG d-d. d.ZG d/d0 d0ZG d1d2 d2eZG d3d4 d4eyee  ZG d5d6 d6exZG d7d8 d8exZG d9d: d:exZG d;d< d<ZG d=d> d>eZG d?d@ d@eZdrdEdFZedGdHdIdJZG dKdH dHeZG dLdM dMZedNdOG dPdQ dQeZdsdUdVZdtd_d`ZdudddeZG dfdg dgeZedIdhG didj djZdvdpdqZdS )w    )annotationsN)ABCabstractmethod)CallableIterableIterator)	dataclass)cached_property)chain)TracebackType)TYPE_CHECKINGAnyTypeVar)Field)AlwaysFalse
AlwaysTrueAndBooleanExpressionEqualToIsNullOr	Reference)ResidualEvaluator_InclusiveMetricsEvaluatorbindexpression_evaluatorinclusive_projectionmanifest_evaluator)FileIOload_file_io)DataFileDataFileContentManifestContentManifestEntryManifestFile)PARTITION_FIELD_ID_STARTUNPARTITIONED_PARTITION_SPECPartitionKeyPartitionSpec)Schema)DeleteFileIndexInspectTable)LocationProviderload_location_providerMaintenanceTable)INITIAL_SEQUENCE_NUMBERTableMetadata)NameMapping)MAIN_BRANCHSnapshotRef)SnapshotSnapshotLogEntry)UNSORTED_SORT_ORDER	SortOrder)AddPartitionSpecUpdateAddSchemaUpdateAddSortOrderUpdateAssertCreateAssertRefSnapshotIdAssertTableUUIDAssignUUIDUpdateRemovePropertiesUpdateSetCurrentSchemaUpdateSetDefaultSortOrderUpdateSetDefaultSpecUpdateSetLocationUpdateSetPropertiesUpdateSetSnapshotRefUpdateTableRequirementTableUpdateUpdatesAndRequirementsUpgradeFormatVersionUpdateupdate_table_metadata)UpdateSchema)ManageSnapshotsUpdateSnapshot_FastAppendFilesUpdateSortOrder
UpdateSpecUpdateStatistics)IdentityTransform)
EMPTY_DICTIcebergBaseModelIcebergRootModel
IdentifierKeyDefaultDict
PropertiesRecordTableVersion)	strtobool)ExecutorFactory)Config)property_as_bool)DuckDBPyConnectionIcebergDataFusionTable)Catalog)RESTContentFileRESTDeleteFileRESTFileScanTaskz$downcast-ns-timestamp-to-us-on-writec                   @  s*   e Zd ZU dZdZded< dZded< dS )UpsertResultzSummary the upsert operation.r   introws_updatedrows_insertedN)__name__
__module____qualname____doc__rm   __annotations__rn    rt   rt   V/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/pyiceberg/table/__init__.pyrk      s   
 rk   c                   @  s   e Zd ZU dZdZdZdZdZdZdZ	dZ
dZd	Zd
ZdZdZdZdZdZdZdZdZdZdZdZdZdZdZdZdZdZdZdZ dZ!dZ"dZ#d Z$d!Z%d"Z&d#Z'd$Z(e'Z)d%Z*d&Z+d'Z,d(e-d)< d*Z.d+Z/d,Z0d-Z1d.Z2dZ3d/Z4d-Z5d0Z6dZ7d1Z8d2Z9d3Z:d4Z;dS )5TablePropertiesz"write.parquet.row-group-size-bytesi   zwrite.parquet.row-group-limiti   zwrite.parquet.page-size-byteszwrite.parquet.page-row-limiti N  zwrite.parquet.dict-size-bytesi    zwrite.parquet.compression-codeczstdzwrite.parquet.compression-levelNz$write.parquet.bloom-filter-max-bytesz)write.parquet.bloom-filter-enabled.columnzwrite.target-file-size-bytesi    zwrite.avro.compression-codecgzipzwrite.metadata.metrics.defaultztruncate(16)zwrite.metadata.metrics.columnzwrite.summary.partition-limitr   zwrite.py-location-provider.implzwrite.object-storage.enabledFz&write.object-storage.partitioned-pathsTzwrite.data.pathzwrite.format.defaultparquetzwrite.metadata.pathzwrite.delete.modezcopy-on-writezmerge-on-readzschema.name-mapping.defaultzformat-version   r_   DEFAULT_FORMAT_VERSIONz!commit.manifest.target-size-bytesi   z"commit.manifest.min-count-to-merged   zcommit.manifest-merge.enabledz$write.metadata.previous-versions-maxz*write.metadata.delete-after-commit.enabledz"history.expire.max-snapshot-age-msi ̿z$history.expire.min-snapshots-to-keep   )<ro   rp   rq   PARQUET_ROW_GROUP_SIZE_BYTES$PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULTPARQUET_ROW_GROUP_LIMITPARQUET_ROW_GROUP_LIMIT_DEFAULTPARQUET_PAGE_SIZE_BYTESPARQUET_PAGE_SIZE_BYTES_DEFAULTPARQUET_PAGE_ROW_LIMITPARQUET_PAGE_ROW_LIMIT_DEFAULTPARQUET_DICT_SIZE_BYTESPARQUET_DICT_SIZE_BYTES_DEFAULTPARQUET_COMPRESSIONPARQUET_COMPRESSION_DEFAULTPARQUET_COMPRESSION_LEVEL!PARQUET_COMPRESSION_LEVEL_DEFAULTPARQUET_BLOOM_FILTER_MAX_BYTES&PARQUET_BLOOM_FILTER_MAX_BYTES_DEFAULT*PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIXWRITE_TARGET_FILE_SIZE_BYTES$WRITE_TARGET_FILE_SIZE_BYTES_DEFAULTWRITE_AVRO_COMPRESSIONWRITE_AVRO_COMPRESSION_DEFAULTDEFAULT_WRITE_METRICS_MODE"DEFAULT_WRITE_METRICS_MODE_DEFAULTMETRICS_MODE_COLUMN_CONF_PREFIXWRITE_PARTITION_SUMMARY_LIMIT%WRITE_PARTITION_SUMMARY_LIMIT_DEFAULTWRITE_PY_LOCATION_PROVIDER_IMPLOBJECT_STORE_ENABLEDOBJECT_STORE_ENABLED_DEFAULT$WRITE_OBJECT_STORE_PARTITIONED_PATHS,WRITE_OBJECT_STORE_PARTITIONED_PATHS_DEFAULTWRITE_DATA_PATHWRITE_FILE_FORMATWRITE_FILE_FORMAT_DEFAULTWRITE_METADATA_PATHDELETE_MODEDELETE_MODE_COPY_ON_WRITEDELETE_MODE_MERGE_ON_READDELETE_MODE_DEFAULTDEFAULT_NAME_MAPPINGFORMAT_VERSIONr{   rs   MANIFEST_TARGET_SIZE_BYTES"MANIFEST_TARGET_SIZE_BYTES_DEFAULTMANIFEST_MIN_MERGE_COUNT MANIFEST_MIN_MERGE_COUNT_DEFAULTMANIFEST_MERGE_ENABLEDMANIFEST_MERGE_ENABLED_DEFAULTMETADATA_PREVIOUS_VERSIONS_MAX&METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT$METADATA_DELETE_AFTER_COMMIT_ENABLED,METADATA_DELETE_AFTER_COMMIT_ENABLED_DEFAULTMAX_SNAPSHOT_AGE_MSMAX_SNAPSHOT_AGE_MS_DEFAULTMIN_SNAPSHOTS_TO_KEEPMIN_SNAPSHOTS_TO_KEEP_DEFAULTrt   rt   rt   ru   rv      sr   
 rv   c                   @  s  e Zd ZU ded< ded< ded< ded< ddddZedddZdddZdddZ	ddd d!Z		ddd"d#Z
ed$fdd)d*Zdd-d.Zefdd3d4Z	5	5	5ddd@dAZddEdFZefddLdMZdddPdQZdddSdTZeefddVdWZddYdZZeefdd]d^Zeefdd_d`Zeed$efddcddZed$efddfdgZd5d$d$d$eefddmdnZed$efddrdsZddudvZddxdyZdd{d|Z dd}d~Z!d5S )TransactionTable_tablebool_autocommittuple[TableUpdate, ...]_updatestuple[TableRequirement, ...]_requirementsFtable
autocommitc                 C  s   || _ || _d| _d| _dS )zOpen a transaction to stage and commit changes to a table.

        Args:
            table: The table that will be altered.
            autocommit: Option to automatically commit the changes when they are staged.
        rt   N)r   r   r   r   )selfr   r   rt   rt   ru   __init__  s   
zTransaction.__init__returnr2   c                 C  s   t | jj| jS N)rL   r   metadatar   r   rt   rt   ru   table_metadata  s   zTransaction.table_metadatac                 C  s   | S )z(Start a transaction to update the table.rt   r   rt   rt   ru   	__enter__  s   zTransaction.__enter__exctypetype[BaseException] | NoneexcinstBaseException | NoneexctbTracebackType | NoneNonec                 C  s0   |du r|du r|du r|    dS dS dS dS )zCClose and commit the transaction if no exceptions have been raised.N)commit_transaction)r   r   r   r   rt   rt   ru   __exit__  s   zTransaction.__exit__rt   updatesrequirementsc                 C  s\   |D ]}| | j q|  j|7  _dd | jD }|D ]}t||vr+| j|f | _q| S )a  Stage updates to the transaction state without committing to the catalog.

        Args:
            updates: The updates to stage.
            requirements: The requirements that must be met.

        Returns:
            This transaction for method chaining.
        c                 S  s   h | ]}t |qS rt   )type).0requirementrt   rt   ru   	<setcomp>/      z%Transaction._stage.<locals>.<setcomp>)validater   r   r   r   )r   r   r   r   existing_requirementsnew_requirementrt   rt   ru   _stage  s   zTransaction._stagec                 C  s   |  || | jr|   | S )zKCheck if the requirements are met, and applies the updates to the metadata.)r   r   r   )r   r   r   rt   rt   ru   _apply6  s   zTransaction._applyT
row_filterstr | BooleanExpressioncase_sensitiveDataScanc                 C  s   t | j| jj||dS )zIMinimal data scan of the table with the current state of the transaction.r   ior   r   )r   r   r   r   )r   r   r   rt   rt   ru   _scanC  s   zTransaction._scanformat_versionr_   c                 C  s\   |dvrt d| || jjk rt d| jj d| || jjkr,| t|dfS | S )zSet the table to a certain version.

        Args:
            format_version: The newly set version.

        Returns:
            The alter table builder.
        >   r}   rz   z"Unsupported table format version: zCannot downgrade vz table to vr   )
ValueErrorr   r   r   rK   )r   r   rt   rt   ru   upgrade_table_versionI  s   	z!Transaction.upgrade_table_version
propertiesr]   kwargsr   c                 K  s*   |r|rt d|p|}| t|dfS )a  Set properties.

        When a property is already set, it will be overwritten.

        Args:
            properties: The properties set on the table.
            kwargs: properties can also be pass as kwargs.

        Returns:
            The alter table builder.
        z&Cannot pass both properties and kwargsr   )r   r   rF   )r   r   r   r   rt   rt   ru   set_properties]  s   zTransaction.set_propertiesNsnapshot_idrl   ref_namestrr   max_ref_age_ms
int | Nonemax_snapshot_age_msmin_snapshots_to_keeprJ   c           	      C  sF   t ||||||df}t|| jjv r| jj| jnd|df}||fS )zUpdate a ref to a snapshot.

        Returns:
            The updates and requirements for the set-snapshot-ref staged
        )r   r   r   r   r   r   N)r   ref)rG   r>   r   refsr   )	r   r   r   r   r   r   r   r   r   rt   rt   ru   _set_ref_snapshotn  s   zTransaction._set_ref_snapshotpartition_recordsset[Record]r   c           
        s   | j  }| j    fdd|jD }t }|D ]-}t }t|D ]\}}|| dur5tt||| nt	t|}	t
||	}q"t||}q|S )zBuild a filter predicate matching any of the input partition records.

        Args:
            partition_records: A set of partition records to match
        Returns:
            A predicate matching any of the input partition records.
        c                   s   g | ]	}  |jjqS rt   )
find_field	source_idnamer   fieldschemart   ru   
<listcomp>  s    z:Transaction._build_partition_predicate.<locals>.<listcomp>N)r   specr   fieldsr   r   	enumerater   r   r   r   r   )
r   r   partition_specpartition_fieldsexprpartition_recordmatch_partition_expressionpospartition_field	predicatert   r   ru   _build_partition_predicate  s   


z&Transaction._build_partition_predicatesnapshot_propertiesdict[str, str]branch
str | NonerP   c                 C  s6   t | jjtjtj}| j||d}|r| S | S )zDetermine the append type based on table properties.

        Args:
            snapshot_properties: Custom properties to be added to the snapshot summary
        Returns:
            Either a fast-append or a merge-append snapshot producer.
        r  r  )	rc   r   r   rv   r   r   update_snapshotmerge_appendfast_append)r   r  r  manifest_merge_enabledr  rt   rt   ru   _append_snapshot_producer  s   
z%Transaction._append_snapshot_producerallow_incompatible_changesrM   c                 C  s   t | ||| j dS ),  Create a new UpdateSchema to alter the columns of this table.

        Args:
            allow_incompatible_changes: If changes are allowed that might break downstream consumers.
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSchema.
        )r  r   name_mapping)rM   r   r  r   r  r   rt   rt   ru   update_schema  s   
zTransaction.update_schemarR   c                 C  s   t | |dS )zCreate a new UpdateSortOrder to update the sort order of this table.

        Args:
            case_sensitive: If field names are case-sensitive.

        Returns:
            A new UpdateSortOrder.
        r   rQ   r   r   rt   rt   ru   update_sort_order  s   	zTransaction.update_sort_orderrO   c                 C  s   t | | jj||dS )zCreate a new UpdateSnapshot to produce a new snapshot for the table.

        Returns:
            A new UpdateSnapshot
        )r   r  r  )rO   r   r   )r   r  r  rt   rt   ru   r    s   zTransaction.update_snapshotrV   c                 C  s
   t | dS )z
        Create a new UpdateStatistics to update the statistics of the table.

        Returns:
            A new UpdateStatistics
        transactionrU   r   rt   rt   ru   update_statistics  s   
zTransaction.update_statisticsdfpa.Tablec              
   C  s  zddl }W n ty } ztd|d}~ww ddlm}m} t||js-td| t 	t
p4d}|| j |j|| jjd | j||d0}	|jd dkrjt|| j|	j|| jjd	}
|
D ]}|	| qbW d   dS W d   dS 1 s}w   Y  dS )
aM  
        Shorthand API for appending a PyArrow table to a table transaction.

        Args:
            df: The Arrow dataframe that will be appended to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the append operation
        r   N(For writes PyArrow needs to be installed _check_pyarrow_schema_compatible_dataframe_to_data_filesExpected PyArrow table, got: Fprovided_schemadowncast_ns_timestamp_to_usr   r  r   
write_uuidr!  r   )pyarrowModuleNotFoundErrorpyiceberg.io.pyarrowr%  r&  
isinstancer   r   rb   get_bool$DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITEr   r   r   r  shapelistcommit_uuidr   r   append_data_file)r   r!  r  r  paer%  r&  r*  append_files
data_files	data_filert   rt   ru   append  s:   	
"zTransaction.appendc              
   C  sv  zddl }W n ty } ztd|d}~ww ddlm}m} t||js-td| | j	 
 r8td| j	 jD ]}t|jtsMtd| q>t tpUd}	|| j |j|	| jjd	 |jd dkrmdS t }
t|| jj|
|| jjd
}dd |D }| j|d}| j|||d | j||d}|
|_|D ]}|| qW d   dS 1 sw   Y  dS )aK  
        Shorthand for overwriting existing partitions with a PyArrow table.

        The function detects partition values in the provided arrow table using the current
        partition spec, and deletes existing partitions matching these values. Finally, the
        data in the table is appended to the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the dynamic partition overwrite operation
        r   Nr#  r$  r'  z9Cannot apply dynamic overwrite on an unpartitioned table.zsFor now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: Fr(  r,  c                 S  s   h | ]}|j qS rt   )	partition)r   r<  rt   rt   ru   r   I  s    z:Transaction.dynamic_partition_overwrite.<locals>.<setcomp>)r   )delete_filterr  r  r+  )r.  r/  r0  r%  r&  r1  r   r   r   r   is_unpartitionedr  	transformrW   rb   r2  r3  r   r   r4  uuiduuid4r5  r   r   r   r  deleter  r6  r7  )r   r!  r  r  r8  r9  r%  r&  r   r*  append_snapshot_commit_uuidr;  partitions_to_overwriter?  r:  r<  rt   rt   ru   dynamic_partition_overwrite  sV   
"z'Transaction.dynamic_partition_overwriteoverwrite_filterBooleanExpression | strc              
   C  s   zddl }W n ty } ztd|d}~ww ddlm}m}	 t||js-td| t 	t
p4d}
|| j |j|
| jjd |t krQ| j||||d | j||d	.}|jd dkrv|	| j|j|| jjd
}|D ]}|| qnW d   dS W d   dS 1 sw   Y  dS )a  
        Shorthand for adding a table overwrite with a PyArrow table to the transaction.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - OVERWRITE: In case existing Parquet files need to be rewritten to drop rows that match the overwrite filter.
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
            branch: Branch Reference to run the overwrite operation
        r   Nr#  r$  r'  Fr(  r?  r   r  r  r+  r,  )r.  r/  r0  r%  r&  r1  r   r   rb   r2  r3  r   r   r   r   rD  r  r4  r6  r   r   r7  )r   r!  rH  r  r   r  r8  r9  r%  r&  r*  r:  r;  r<  rt   rt   ru   	overwriteR  sD   

"zTransaction.overwriter?  c                 C  s"  ddl m}m}m} | jjtjtj	tj
krtjddd t|tr't|}| j||d }||| W d   n1 sAw   Y  |jdu rt| j ||}	||	| j }
| j||d	}|durl||}| }t }td}g }|D ]F}|| j| jj| j t d
j |gd}|!|
}t"|dkr|#|j$g f q}t"|t"|kr|#|j$t%|| jj|| j||df q}t"|dkr| j||d& !}||_'|D ]\}}|(| |D ]}|)| qqW d   n1 sw   Y  |j*s|jstjddd dS dS dS )a  
        Shorthand for deleting record from a table.

        A delete may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - OVERWRITE: In case existing Parquet files need to be rewritten to drop rows that match the delete filter.

        Args:
            delete_filter: A boolean expression to delete rows from a table
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
            branch: Branch Reference to run the delete operation
        r   )	ArrowScanr&  $_expression_to_complementary_pyarrowzAMerge on read is not yet supported, falling back to copy-on-writerz   
stacklevelr  NT)r   r   )r   r   projected_schemar   )tasks)r   r!  r   r-  counterz*Delete operation did not match any records)+r0  rL  r&  rM  r   r   getrv   r   r   r   warningswarnr1  r   _parse_row_filterr  rD  delete_by_predicaterewrites_neededr   r   r   use_ref
plan_filesrB  rC  	itertoolscountr   r   r   to_tablefilterlenr=  filer5  rK  r6  delete_data_filer7  files_affected)r   r?  r  r   r  rL  r&  rM  delete_snapshotbound_delete_filterpreserve_row_filter	file_scanfilesr6  rR  replaced_filesoriginal_filer!  filtered_dfoverwrite_snapshotoriginal_data_filereplaced_data_filesreplaced_data_filert   rt   ru   rD    s   





	zTransaction.delete	join_colslist[str] | Nonewhen_matched_update_allwhen_not_matched_insert_allrk   c              
   C  s\  zddl }W n ty }	 ztd|	d}	~	ww ddlm}
 ddlm} |du rKg }| j jD ]}| j 	|}|durD|
| q0td| t|dkrUtd|s]|s]td|||rgtd	dd
lm} t tptd}|| j |j|| jjd |||}t| j| jj||d}|| jjv r||}| }g }g }|}|D ]D}|j|g}|r||||}t|dkr|||}|
| |
| |r|||}t| j ||d}|
|}|| }qd}d}|r||}t|}| j |t|dkrt!| n|d ||d |r(t|}|r(| j
|||d t"||dS )L  Shorthand API for performing an upsert to an iceberg table.

        Args:

            df: The input dataframe to upsert with the table's data.
            join_cols: Columns to join on, if not provided, it will use the identifier-field-ids.
            when_matched_update_all: Bool indicating to update rows that are matched but require an update
                due to a value in a non-key column changing
            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any
                existing rows in the table
            case_sensitive: Bool indicating if the match should be case-sensitive
            branch: Branch Reference to run the upsert operation
            snapshot_properties: Custom properties to be added to the snapshot summary

            To learn more about the identifier-field-ids: https://iceberg.apache.org/spec/#identifier-field-ids

                Example Use Cases:
                    Case 1: Both Parameters = True (Full Upsert)
                    Existing row found → Update it
                    New row found → Insert it

                    Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
                    Existing row found → Do nothing (no updates)
                    New row found → Insert it

                    Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
                    Existing row found → Update it
                    New row found → Do nothing (no inserts)

                    Case 4: Both Parameters = False (No Merge Effect)
                    Existing row found → Do nothing
                    New row found → Do nothing
                    (Function effectively does nothing)


        Returns:
            An UpsertResult class (contains details of rows updated and inserted)
        r   Nr#  )expression_to_pyarrow)upsert_utilzField-ID could not be found: zWJoin columns could not be found, please set identifier-field-ids or pass in explicitly.z$no upsert options selected...exitingzSDuplicate rows found in source dataset based on the key columns. No upsert executed)r%  Fr(  r   r  r}   )rH  r  r  )r  r  )rm   rn   )#r.  r/  r0  rt  pyiceberg.tableru  r   r   identifier_field_idsfind_column_namer=  r   r_  has_duplicate_rowsr%  rb   r2  r3  r   create_match_filterr   r   r   r   rY  to_arrow_batch_readerr   from_batchesget_rows_to_updater   r^  concat_tablesrK  r   rk   )r   r!  ro  rq  rr  r   r  r  r8  r9  rt  ru  field_idcolr%  r*  matched_predicate#matched_iceberg_record_batches_scanmatched_iceberg_record_batchesbatches_to_overwriteoverwrite_predicatesrows_to_insertbatchrowsrows_to_updateoverwrite_mask_predicate
expr_matchexpr_match_boundexpr_match_arrowupdate_row_cntinsert_row_cntrt   rt   ru   upsert  s   0




zTransaction.upsert
file_paths	list[str]check_duplicate_filesc                 C  s   t |t t|krtd|r:ddlm} |d|}dd | jj	 
| D }|r:tdd| | j du rR| jdi tj| j j i | j||d	}t| j|| jjd
}	|	D ]}
||
 qfW d   dS 1 syw   Y  dS )a  
        Shorthand API for adding files as data files to the table transaction.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: Raises a ValueError given file_paths contains duplicate files
            ValueError: Raises a ValueError given file_paths already referenced by table
        zFile paths must be uniquer   N	file_pathc                 S  s   g | ]}|d  qS )r  rt   )r   r`  rt   rt   ru   r     r   z)Transaction.add_files.<locals>.<listcomp>z>Cannot add files that are already referenced by table, files: , r+  )r   r  r   rt   )r_  setr   pyarrow.computecomputer   isinr   inspectr;  r^  	to_pylistjoinr   r  r   rv   r   r   model_dump_jsonr  _parquet_files_to_data_filesr   r7  )r   r  r  r  r  pcr  referenced_filesr:  r;  r<  rt   rt   ru   	add_files  s(    "zTransaction.add_filesrT   c                 C     t | S )zyCreate a new UpdateSpec to update the partitioning of the table.

        Returns:
            A new UpdateSpec.
        rS   r   rt   rt   ru   update_spec     zTransaction.update_specremovalsc                 G  s   |  t|dfS )zRemove properties.

        Args:
            removals: Properties to be removed.

        Returns:
            The alter table builder.
        )r  )r   rA   )r   r  rt   rt   ru   remove_properties  s   	zTransaction.remove_propertieslocationc                 C     t d)zSet the new table location.

        Args:
            location: The new location of the table.

        Returns:
            The alter table builder.
        zNot yet implementedNotImplementedError)r   r  rt   rt   ru   update_location     	zTransaction.update_locationc                 C  sN   t | jdkr|  jt| jjdf7  _| jj| j| jd d| _d| _| jS )zmCommit the changes to the catalog.

        Returns:
            The table with the updates applied.
        r   rB  r   r   rt   )r_  r   r   r?   r   
table_uuidr   
_do_commitr   rt   rt   ru   r     s   zTransaction.commit_transaction)F)r   r   r   r   )r   r2   r   r   )r   r   r   r   r   r   r   r   )rt   )r   r   r   r   r   r   )r   r   r   r   r   r   )r   r_   r   r   )r   r]   r   r   r   r   )NNN)r   rl   r   r   r   r   r   r   r   r   r   r   r   rJ   )r   r   r   r   )r  r  r  r  r   rP   FTr  r   r   r   r   rM   Tr   r   r   rR   )r  r  r  r  r   rO   r   rV   r!  r"  r  r  r  r  r   r   r!  r"  rH  rI  r  r  r   r   r  r  r   r   )
r?  r   r  r  r   r   r  r  r   r   r!  r"  ro  rp  rq  r   rr  r   r   r   r  r  r  r  r   rk   
r  r  r  r  r  r   r  r  r   r   )r   rT   )r  r   r   r   )r  r   r   r   r   r   )"ro   rp   rq   rs   r   propertyr   r   r   r   r   ALWAYS_TRUEr   r   rX   r   r   r  r4   r  r  r  r  r   r=  rG  rK  rD  r  r  r  r  r  r   rt   rt   rt   ru   r      sr   
 



!

	'B@h 
)

r   c                      s6   e Zd ZdZdddZd fd
dZdddZ  ZS )CreateTableTransactionz8A transaction that involves the creation of a new table.r   r2   r   r   c                 C  s   |  j t|jdt|jdf7  _ | }|  j t|dtddf7  _ | }|	 r8|  j t
tdf7  _ n|  j t
|df7  _ |  j tddf7  _ ||j}|du s[|jrg|  j ttd	f7  _ n|  j t|d	f7  _ |  j tdd
f7  _ |  j t|jdt|jdf7  _ dS )zqSet the initial changes that can reconstruct the initial table metadata when creating the CreateTableTransaction.r  r   )schema_	schema_id)r   spec_idN)
sort_order)sort_order_id)r  r   )r   r@   r  rK   r   r   r;   rB   r   r@  r:   r&   rD   sort_order_by_iddefault_sort_order_idis_unsortedr<   r8   rC   rE   r  rF   r   )r   r   r   r   r  rt   rt   ru   _initial_changes  s.   



z'CreateTableTransaction._initial_changesr   StagedTablec                   s    t  j|dd | |j d S )NFr   )superr   r  r   )r   r   	__class__rt   ru   r     s   zCreateTableTransaction.__init__r   c                 C  s6   t | jdkr| jj| jt fd d| _d| _| jS )zCommit the changes to the catalog.

        In the case of a CreateTableTransaction, the only requirement is AssertCreate.
        Returns:
            The table with the updates applied.
        r   r  rt   )r_  r   r   r  r=   r   r   rt   rt   ru   r     s   z)CreateTableTransaction.commit_transaction)r   r2   r   r   )r   r  r  )ro   rp   rq   rr   r  r   r   __classcell__rt   rt   r  ru   r    s
    
 r  c                   @  s&   e Zd ZU dZedddZded< dS )	Namespacez/Reference to one or more levels of a namespace..z.Reference to one or more levels of a namespace)descriptionr  rootN)ro   rp   rq   rr   r   r  rs   rt   rt   rt   ru   r    s   
 r  c                   @  s"   e Zd ZU dZded< ded< dS )TableIdentifierz&Fully Qualified identifier to a table.r  	namespacer   r   N)ro   rp   rq   rr   rs   rt   rt   rt   ru   r  '  s   
 r  c                   @  sD   e Zd ZU dZe Zded< eedZded< eedZ	ded< d	S )
CommitTableRequestz0A pydantic BaseModel for a table commit request.r  
identifier)default_factoryr   r   r   r   N)
ro   rp   rq   rr   r   r  rs   tupler   r   rt   rt   rt   ru   r  .  s
   
 r  c                   @  s,   e Zd ZU dZded< eddZded< dS )	CommitTableResponsez1A pydantic BaseModel for a table commit response.r2   r   zmetadata-location)aliasr   metadata_locationN)ro   rp   rq   rr   rs   r   r  rt   rt   rt   ru   r  6  s   
 r  c                   @  s  e Zd ZU dZe Zded< ded< e Zded< ded	< d
ed< ded< efdddZ	dddZ
edddZedddZdddZdddZed d!d"ed"fdd/d0Zedd2d3Zdd5d6Zdd8d9Zdd;d<Zdd>d?ZddAdBZddDdEZddGdHZeddIdJZddKdLZddNdOZeddPdQZddSdTZddVdWZddXdYZ dd[d\Z!ddd_d`Z"ddbdcZ#ddedfZ$ddhdiZ%dddmdnZ&dddpdqZ'ddsdtZ(d"d!d!d!e)efdddZ*ee)fdddZ+ee)fdddZ,eed!e)fdddZ-eed!e)fdddZ.ed!e)fdddZ/ddddZ0dddZ1e2dddZ3dddZ4dddZ5dddZ6dddZ7dddZ8dddZ9dddZ:d"S )r   zAn Iceberg table.r[   _identifierr2   r   r   r  r   r   rg   catalogr  configr  r   r   c                 C  s(   || _ || _|| _|| _|| _|| _d S r   )r  r   r  r   r  r  )r   r  r   r  r   r  r  rt   rt   ru   r   G  s   	
zTable.__init__r   c                 C  r  )zCreate a new transaction object to first stage the changes, and then commit them to the catalog.

        Returns:
            The transaction object
        )r   r   rt   rt   ru   r  W  r  zTable.transactionr,   c                 C  r  )zReturn the InspectTable object to browse the table metadata.

        Returns:
            InspectTable object based on this Table.
        r+   r   rt   rt   ru   r  _     zTable.inspectr0   c                 C  r  )zReturn the MaintenanceTable object for maintenance.

        Returns:
            MaintenanceTable object based on this Table.
        r/   r   rt   rt   ru   maintenanceh  r  zTable.maintenancec                 C  s:   | j | j}| | j|j |j| _|j| _|j| _| S )zxRefresh the current table metadata.

        Returns:
            An updated instance of the same Iceberg table
        )r  
load_tabler  _check_uuidr   r   r  )r   freshrt   rt   ru   refreshq  s   zTable.refreshc                 C  s   | j S )zqReturn the identifier of this table.

        Returns:
            An Identifier tuple of the table name
        )r  r   rt   rt   ru   r   ~  s   z
Table.name*TNr   r   selected_fieldstuple[str, ...]r   r   r   r   optionsr]   limitr   c                 C  s$   t | j| j||||||| j| jd
S )aw  Fetch a DataScan based on the table's current metadata.

            The data scan can be used to project the table's data
            that matches the provided row_filter onto the table's
            current schema.

        Args:
            row_filter:
                A string or BooleanExpression that describes the
                desired rows
            selected_fields:
                A tuple of strings representing the column names
                to return in the output dataframe.
            case_sensitive:
                If True column matching is case sensitive
            snapshot_id:
                Optional Snapshot ID to time travel to. If None,
                scans the table as of the current snapshot ID.
            options:
                Additional Table properties as a dictionary of
                string key value pairs to use for this scan.
            limit:
                An integer representing the number of rows to
                return in the scan result. If None, fetches all
                matching rows.

        Returns:
            A DataScan based on the table's current metadata.
        )
r   r   r   r  r   r   r  r  r  table_identifier)r   r   r   r  r  r   r   r  r   r   r  r  rt   rt   ru   scan  s   &z
Table.scanr_   c                 C     | j jS r   )r   r   r   rt   rt   ru   r        zTable.format_versionr)   c                      t  fdd jjD S )z!Return the schema for this table.c                 3  "    | ]}|j  jjkr|V  qd S r   )r  r   current_schema_idr   r   r   rt   ru   	<genexpr>       zTable.schema.<locals>.<genexpr>)nextr   schemasr   rt   r   ru   r        zTable.schemadict[int, Schema]c                 C     dd | j jD S )z*Return a dict of the schema of this table.c                 S     i | ]}|j |qS rt   r  r  rt   rt   ru   
<dictcomp>  r   z!Table.schemas.<locals>.<dictcomp>)r   r  r   rt   rt   ru   r       zTable.schemasr(   c                   r  )z(Return the partition spec of this table.c                 3  r  r   )r  r   default_spec_idr   r   r   rt   ru   r    r  zTable.spec.<locals>.<genexpr>)r  r   partition_specsr   rt   r   ru   r     r  z
Table.specdict[int, PartitionSpec]c                 C  r  )z-Return a dict the partition specs this table.c                 S  r  rt   r  r   rt   rt   ru   r    r   zTable.specs.<locals>.<dictcomp>)r   r  r   rt   rt   ru   specs  r  zTable.specsr9   c                   r  )z$Return the sort order of this table.c                 3  r  r   )order_idr   r  r   r  r   rt   ru   r    s    z#Table.sort_order.<locals>.<genexpr>)r  r   sort_ordersr   rt   r   ru   r    s   zTable.sort_orderdict[int, SortOrder]c                 C  r  )z/Return a dict of the sort orders of this table.c                 S  r  rt   )r  r  rt   rt   ru   r    r   z%Table.sort_orders.<locals>.<dictcomp>)r   r  r   rt   rt   ru   r    r  zTable.sort_ordersrl   c                 C  s   | j jr| j jS td S )zmReturn the highest assigned partition field ID across all specs or 999 if only the unpartitioned spec exists.r}   )r   last_partition_idr%   r   rt   rt   ru   r    s   zTable.last_partition_idc                 C  r  )zProperties of the table.)r   r   r   rt   rt   ru   r     s   zTable.propertiesc                 C  r  )z!Return the table's base location.)r   r  r   rt   rt   ru   r    r  zTable.locationr-   c                 C  s   t | jj| jjdS )z%Return the table's location provider.)table_locationtable_properties)r.   r   r  r   r   rt   rt   ru   location_provider  s   zTable.location_providerc                 C  r  r   )r   last_sequence_numberr   rt   rt   ru   r    r  zTable.last_sequence_numberSnapshot | Nonec                 C  s   | j jdur| | j jS dS )zQGet the current snapshot for this table, or None if there is no current snapshot.N)r   current_snapshot_idsnapshot_by_idr   rt   rt   ru   current_snapshot  s   zTable.current_snapshotlist[Snapshot]c                 C  r  r   )r   	snapshotsr   rt   rt   ru   r       zTable.snapshotsc                 C  s   | j |S )z[Get the snapshot of this table with the given id, or None if there is no matching snapshot.)r   r  )r   r   rt   rt   ru   r    s   zTable.snapshot_by_idr   c                 C  s"   | j j| }r| |jS dS )zUReturn the snapshot referenced by the given name or null if no such reference exists.N)r   r   rS  r  r   )r   r   r   rt   rt   ru   snapshot_by_name  s   zTable.snapshot_by_nametimestamp_ms	inclusivec                 C  s>   t |  D ]}|r|j|ks|j|k r| |j  S qdS )aC  Get the snapshot that was current as of or right before the given timestamp, or None if there is no matching snapshot.

        Args:
            timestamp_ms: Find snapshot that was current at/before this timestamp
            inclusive: Includes timestamp_ms in search when True. Excludes timestamp_ms when False
        N)reversedhistoryr  r  r   )r   r  r  	log_entryrt   rt   ru   snapshot_as_of_timestamp  s
   zTable.snapshot_as_of_timestamplist[SnapshotLogEntry]c                 C  r  )z'Get the snapshot history of this table.)r   snapshot_logr   rt   rt   ru   r    r  zTable.historyrN   c                 C     t t| dddS )a*  
        Shorthand to run snapshot management operations like create branch, create tag, etc.

        Use table.manage_snapshots().<operation>().commit() to run a specific operation.
        Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example,

        with table.manage_snapshots() as ms:
           ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
        Tr  r  )rN   r   r   rt   rt   ru   manage_snapshots  s   zTable.manage_snapshotsrV   c                 C  r  )a_  
        Shorthand to run statistics management operations like add statistics and remove statistics.

        Use table.update_statistics().<operation>().commit() to run a specific operation.
        Use table.update_statistics().<operation-one>().<operation-two>().commit() to run multiple operations.

        Pending changes are applied on commit.

        We can also use context managers to make more changes. For example:

        with table.update_statistics() as update:
            update.set_statistics(statistics_file=statistics_file)
            update.remove_statistics(snapshot_id=2)
        Tr  r  )rV   r   r   rt   rt   ru   r      s   zTable.update_statisticsFr  rM   c                 C  s   t t| dd|||  dS )r  Tr  )r  r  r   r  )rM   r   r  r  rt   rt   ru   r  1  s   

zTable.update_schemarR   c                 C     t t| dd|dS )zCreate a new UpdateSortOrder to update the sort order of this table.

        Returns:
            A new UpdateSortOrder.
        Tr  )r  r   )rR   r   r  rt   rt   ru   r  B     zTable.update_sort_orderNameMapping | Nonec                 C  s
   | j  S )z(Return the table's field-id NameMapping.)r   r  r   rt   rt   ru   r  J     
zTable.name_mappingr!  r"  ro  rp  rq  rr  r  r  r  rk   c           	   
   C  sD   |   }|j|||||||dW  d   S 1 sw   Y  dS )rs  )r!  ro  rq  rr  r   r  r  N)r  r  )	r   r!  ro  rq  rr  r   r  r  txrt   rt   ru   r  N  s   
0$zTable.upsertc                 C  >   |   }|j|||d W d   dS 1 sw   Y  dS )aC  
        Shorthand API for appending a PyArrow table to the table.

        Args:
            df: The Arrow dataframe that will be appended to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the append operation
        r!  r  r  N)r  r=  r   r!  r  r  r#  rt   rt   ru   r=    s   
	"zTable.appendc                 C  r$  )a  Shorthand for dynamic overwriting the table with a PyArrow table.

        Old partitions are auto detected and replaced with data files created for input arrow table.
        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
            branch: Branch Reference to run the dynamic partition overwrite operation
        r%  N)r  rG  r&  rt   rt   ru   rG    s   
"z!Table.dynamic_partition_overwriterH  rI  c                 C  sB   |   }|j|||||d W d   dS 1 sw   Y  dS )a  
        Shorthand for overwriting the table with a PyArrow table.

        An overwrite may produce zero or more snapshots based on the operation:

            - DELETE: In case existing Parquet files can be dropped completely.
            - OVERWRITE: In case existing Parquet files need to be rewritten to drop rows that match the overwrite filter..
            - APPEND: In case new data is being inserted into the table.

        Args:
            df: The Arrow dataframe that will be used to overwrite the table
            overwrite_filter: ALWAYS_TRUE when you overwrite all the data,
                              or a boolean expression in case of a partial overwrite
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determine if the provided `overwrite_filter` is case-sensitive
            branch: Branch Reference to run the overwrite operation
        )r!  rH  r   r  r  N)r  rK  )r   r!  rH  r  r   r  r#  rt   rt   ru   rK    s   
"zTable.overwriter?  c                 C  s@   |   }|j||||d W d   dS 1 sw   Y  dS )a  
        Shorthand for deleting rows from the table.

        Args:
            delete_filter: The predicate that used to remove rows
            snapshot_properties: Custom properties to be added to the snapshot summary
            case_sensitive: A bool determine if the provided `delete_filter` is case-sensitive
            branch: Branch Reference to run the delete operation
        rJ  N)r  rD  )r   r?  r  r   r  r#  rt   rt   ru   rD    s
   
"zTable.deleter  r  r  c                 C  s@   |   }|j||||d W d   dS 1 sw   Y  dS )a  
        Shorthand API for adding files as data files to the table.

        Args:
            file_paths: The list of full file paths to be added as data files to the table

        Raises:
            FileNotFoundError: If the file does not exist.
        )r  r  r  r  N)r  r  )r   r  r  r  r  r#  rt   rt   ru   r    s   
"zTable.add_filesrT   c                 C  r  )NTr  r  )rT   r   r  rt   rt   ru   r    s   zTable.update_specdict[str, SnapshotRef]c                 C  r  )z,Return the snapshot references in the table.)r   r   r   rt   rt   ru   r     r  z
Table.refscurrent_metadatanew_metadatac                 C  s,   | j }|j }||krtd| d| dS )z3Validate that the table UUID matches after refresh.z#Table UUID does not match: current=z != refreshed=N)r  r   )r(  r)  current	refreshedrt   rt   ru   r    s
   zTable._check_uuidr   r   r   r   c              
   C  s   | j | ||}| | j|j z| j | j| j|j W n ty9 } ztjd| dd W Y d }~nd }~ww |j| _|j	| _	d S )Nz,Failed to delete old metadata after commit: rz   rN  )
r  commit_tabler  r   _delete_old_metadatar   	ExceptionrT  rU  r  )r   r   r   responser9  rt   rt   ru   r    s    zTable._do_commitotherr   c                 C  s6   t |tr|  | ko| j|jko| j|jkS dS )z8Return the equality of two instances of the Table class.F)r1  r   r   r   r  )r   r0  rt   rt   ru   __eq__  s
   (zTable.__eq__c              	     s    j  j}d fdd  jD }dd fdd  jD  d}dd fd	d  jD  d}d
 	 rGt
 	 nd }| d| d| d| d| 	}|S )z4Return the string representation of the Table class.z,
  c                 3       | ]}   rt|V  qd S r   )r   r   )r   columnr   rt   ru   r        z!Table.__repr__.<locals>.<genexpr>zpartition by: [r  c                 3  s    | ]
}   r|jV  qd S r   )r   r   r   r   rt   ru   r     s    ]zsort order: [c                 3  r2  r   )r  r   r   r   rt   ru   r  !  r4  z
snapshot: nullz(
  z
),
z,
)r  table_name_fromr  r  r   columnsr   r  r  r  r   )r   
table_name
schema_strpartition_strsort_order_strsnapshot_str
result_strrt   r   ru   __repr__  s   && zTable.__repr__daft.DataFramec                 C     ddl }|| S )zRead a Daft DataFrame lazily from this Iceberg table.

        Returns:
            daft.DataFrame: Unmaterialized Daft Dataframe created from the Iceberg table
        r   N)daftread_iceberg)r   rB  rt   rt   ru   to_daft&     
zTable.to_daftbd.DataFramec                 C  s   ddl m} || S )zRead a bodo DataFrame lazily from this Iceberg table.

        Returns:
            bd.DataFrame: Unmaterialized Bodo Dataframe created from the Iceberg table
        r   N)bodo.pandaspandasread_iceberg_table)r   bdrt   rt   ru   to_bodo0  s   
zTable.to_bodopl.LazyFramec                 C  rA  )zLazily read from this Apache Iceberg table.

        Returns:
            pl.LazyFrame: Unmaterialized Polars LazyFrame created from the Iceberg table
        r   N)polarsscan_iceberg)r   plrt   rt   ru   	to_polars:  rE  zTable.to_polarsrf   c                 C  s(   ddl m} ||  | j| jjd S )aX  Return the DataFusion table provider PyCapsule interface.

        To support DataFusion features such as push down filtering, this function will return a PyCapsule
        interface that conforms to the FFI Table Provider required by DataFusion. From an end user perspective
        you should not need to call this function directly. Instead you can use ``register_table`` in
        the DataFusion SessionContext.

        Returns:
            A PyCapsule DataFusion TableProvider interface.

        Example:
            ```python
            from datafusion import SessionContext
            from pyiceberg.catalog import load_catalog
            import pyarrow as pa
            catalog = load_catalog("catalog", type="in-memory")
            catalog.create_namespace_if_not_exists("default")
            data = pa.table({"x": [1, 2, 3], "y": [4, 5, 6]})
            iceberg_table = catalog.create_table("default.test", schema=data.schema)
            iceberg_table.append(data)
            ctx = SessionContext()
            ctx.register_table("test", iceberg_table)
            ctx.table("test").show()
            ```
            Results in
            ```
            DataFrame()
            +---+---+
            | x | y |
            +---+---+
            | 1 | 4 |
            | 2 | 5 |
            | 3 | 6 |
            +---+---+
            ```
        r   re   )r  r  file_io_properties)pyiceberg_core.datafusionrf   r   r  r   r   __datafusion_table_provider__)r   rf   rt   rt   ru   rS  D  s   %z#Table.__datafusion_table_provider__)r  r[   r   r2   r  r   r   r   r  rg   r  r  r   r   r  )r   r,   )r   r0   r  )r   r[   r   r   r  r  r   r   r   r   r  r]   r  r   r   r   )r   r_   r   r)   )r   r  )r   r(   )r   r  )r   r9   )r   r  r   rl   )r   r  )r   r   )r   r-   r   r  )r   r  )r   rl   r   r  )r   r   r   r  r  )r  rl   r  r   r   r  )r   r  )r   rN   r  r  r  r  )r   r!  r  r  r  )
r?  rI  r  r  r   r   r  r  r   r   r  )r   r   r   rT   )r   r'  )r(  r2   r)  r2   r   r   )r   r   r   r   r   r   )r0  r   r   r   r   r@  )r   rF  )r   rL  )r   rf   );ro   rp   rq   rr   r   r  rs   r  rX   r   r  r  r  r  r  r   r  r  r   r   r  r   r  r  r  r  r   r  r  r  r  r  r  r  r  r  r  r   r  r  r  r4   r  r=  rG  rK  rD  r  r  r   staticmethodr  r  r1  r?  rD  rK  rP  rS  rt   rt   rt   ru   r   =  s   
 	



3
















;$










r   c                   @  s>   e Zd ZdZdddZeefdd
dZeefdddZdS )StaticTablezKLoad a table directly from a metadata file (i.e., without using a catalog).r   r   c                 C  r  )z#Refresh the current table metadata.zTo be implementedr  r   rt   rt   ru   r  u  r  zStaticTable.refreshr  r   r   r]   c                 C  s   t j|dd}t||d}||}| }| d}W d    n1 s)w   Y  |dr;t j|d|S |	 rKt j|dd| dS t j|d| dS )Nr   zversion-hint.textr   r  zutf-8.metadata.jsonv)
ospathr  r   	new_inputopenreaddecodeendswith	isnumeric)clsr  r   version_hint_locationr   r`  streamcontentrt   rt   ru   $_metadata_location_from_version_hinty  s   


z0StaticTable._metadata_location_from_version_hintc                 C  sv   | dst||}t||d}||}ddlm} ||}ddlm	} | d|f||ti ||j
|ddS )Nr\  r[  r   )FromInputFile)NoopCatalogzstatic-table)r  r  r   r   r  )rd  rZ  rj  r   r`  pyiceberg.serializersrk  r   pyiceberg.catalog.nooprl  r   )rf  r  r   r   r`  rk  r   rl  rt   rt   ru   from_metadata  s   


zStaticTable.from_metadataNr  )r  r   r   r]   r   r   )r  r   r   r]   r   rZ  )	ro   rp   rq   rr   r  classmethodrX   rj  ro  rt   rt   rt   ru   rZ  r  s    
rZ  c                   @  s8   e Zd ZdddZedddedfdddZdddZdS )r  r   r   c                 C  r  )NzCannot refresh a staged tabler   r   rt   rt   ru   r    r  zStagedTable.refreshr  TNr   r   r  r  r   r   r   r   r  r]   r  r   c                 C  r  )NzCannot scan a staged tablerq  r  rt   rt   ru   r    r  zStagedTable.scanr@  c                 C  r  )Nz1Cannot convert a staged table to a Daft DataFramerq  r   rt   rt   ru   rD    r  zStagedTable.to_daftr  rT  rX  )ro   rp   rq   r  r  rX   r  rD  rt   rt   rt   ru   r    s    
r  r  r   r   r   c                 C  s   t | tr
t| S | S )a  Accept an expression in the form of a BooleanExpression or a string.

    In the case of a string, it will be converted into a unbound BooleanExpression.

    Args:
        expr: Expression as a BooleanExpression or a string.

    Returns: An unbound BooleanExpression.
    )r1  r   parserparse)r  rt   rt   ru   rV    s   
rV  S	TableScanT)bound	covariantc                   @  s  e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< ded< ded< ded< edddedddfdCddZdDddZdEdd Ze	dFd"d#Z
e	dGd%d&Ze	dHd*d+Ze	dId-d.ZdJd2d3ZdKd6d7ZdLd9d:ZdMd<d=ZdNdOd>d?Ze	dPdAdBZdS )Qru  r2   r   r   r   r   r   r  r  r   r   r   r   r]   r  r  Catalog | Noner  Identifier | Noner  r  TNr   c                 C  sD   || _ || _t|| _|| _|| _|| _|| _|| _|	| _	|
| _
d S r   )r   r   rV  r   r  r   r   r  r  r  r  )r   r   r   r   r  r   r   r  r  r  r  rt   rt   ru   r     s   

zTableScan.__init__r   r  c                 C  s   | j r
| j| j S | j S r   )r   r   r  r  r   rt   rt   ru   snapshot  s   
zTableScan.snapshotr)   c                   s   | j  }| jd urG| j | j  d ur? jd ur>zt fdd| j jD }W n ty=   tj	d j dd Y n
w nt
d| j d| jv rN|S |j| jd| jiS )	Nc                 3  s     | ]}|j  j kr|V  qd S r   r  r  rz  rt   ru   r    s    z'TableScan.projection.<locals>.<genexpr>z*Metadata does not contain schema with id: rz   rN  zSnapshot not found: r  r   )r   r   r   r  r  r  r  StopIterationrT  rU  r   r  selectr   )r   current_schemart   r{  ru   
projection  s"   



zTableScan.projectionIterable[ScanTask]c                 C     d S r   rt   r   rt   rt   ru   rZ        zTableScan.plan_filesr"  c                 C  r  r   rt   r   rt   rt   ru   to_arrow  r  zTableScan.to_arrowr   r   pd.DataFramec                 K  r  r   rt   r   r   rt   rt   ru   	to_pandas  r  zTableScan.to_pandaspl.DataFramec                 C  r  r   rt   r   rt   rt   ru   rP  	  r  zTableScan.to_polarsr   rt  	overridesc                   sR   ddl m} |t jj dh } fdd|D }t di i ||S )z5Create a copy of this table scan with updated fields.r   )	signaturer   c                   s   i | ]}|t  |qS rt   )getattr)r   paramr   rt   ru   r    s    z$TableScan.update.<locals>.<dictcomp>Nrt   )r  r  r   r   
parameterskeys)r   r  r  paramsr   rt   r   ru   update  s   zTableScan.updater   r   c                 C  sB   | j rtd| j  | j| }r| j|j dS td| )Nz-Cannot override ref, already set snapshot id=)r   zCannot scan unknown ref=)r   r   r   r  r  )r   r   rz  rt   rt   ru   rY    s
   zTableScan.use_reffield_namesc                 G  s6   d| j v r| j|dS | jtt| j t|dS )Nr  )r  )r  r  r  r  intersection)r   r  rt   rt   ru   r}    s   
 zTableScan.selectr  c                 C  s   | j t| jt|dS )N)r   )r  r   r   rV  )r   r  rt   rt   ru   r^  $  s   zTableScan.filterc                 C  s   | j |dS )Nr  )r  r  rt   rt   ru   with_case_sensitive'  s   zTableScan.with_case_sensitiverl   c                 C  r  r   rt   r   rt   rt   ru   r\  *  r  zTableScan.count)r   r2   r   r   r   r   r  r  r   r   r   r   r  r]   r  r   r  rx  r  ry  rW  rU  )r   r  r   r"  r   r   r   r  r   r  )r   rt  r  r   r   rt  )r   rt  r   r   r   rt  )r   rt  r  r   r   rt  )r   rt  r  r   r   rt  r  )r   rt  r   r   r   rt  rV  )ro   rp   rq   rs   r  rX   r   rz  r  r   rZ  r  r  rP  r  rY  r}  r^  r  r\  rt   rt   rt   ru   ru    sJ   
 





c                   @  s   e Zd ZdS )ScanTaskN)ro   rp   rq   rt   rt   rt   ru   r  .  s    r  F)initc                   @  sH   e Zd ZU dZded< ded< ded< defdddZedddZdS )FileScanTaskzATask representing a data file and its corresponding delete files.r    r`  zset[DataFile]delete_filesr   residualNr<  set[DataFile] | Noner   r   c                 C  s   || _ |pt | _|| _d S r   )r`  r  r  r  )r   r<  r  r  rt   rt   ru   r   :  s   
zFileScanTask.__init__	rest_taskrj   list[RESTDeleteFile]c                 C  s|   ddl m} t| j}t }| jr/| jD ]}|| }t||r'td|j |	t| qt
||| jr:| jdS tdS )ax  Convert a RESTFileScanTask to a FileScanTask.

        Args:
            rest_task: The REST file scan task.
            delete_files: The list of delete files from the ScanTasks response.

        Returns:
            A FileScanTask with the converted data and delete files.

        Raises:
            NotImplementedError: If equality delete files are encountered.
        r   )RESTEqualityDeleteFilez1PyIceberg does not yet support equality deletes: )r<  r  r  )$pyiceberg.catalog.rest.scan_planningr  _rest_file_to_data_filer<  r  delete_file_referencesr1  r  r  addr  residual_filterr  )r  r  r  r<  resolved_deletesidxdelete_filert   rt   ru   from_rest_responseD  s    



zFileScanTask.from_rest_response)r<  r    r  r  r  r   r   r   )r  rj   r  r  r   r  )	ro   rp   rq   rr   rs   r  r   rY  r  rt   rt   rt   ru   r  2  s   
 
r  	rest_filerh   r    c                 C  s   ddl m} t| |r4| jr| j nd}| jr| j nd}| jr'| j nd}| jr1| j nd}nd}d}d}d}tj	t
| j| j| j| jrOt| j nt | j| j||||| j| jd}| j|_|S )z3Convert a REST content file to a manifest DataFile.r   )RESTDataFileN)ri  r  file_formatr>  record_countfile_size_in_bytescolumn_sizesvalue_countsnull_value_countsnan_value_countssplit_offsetsr  )r  r  r1  r  to_dictr  r  r  r    	from_argsr!   from_rest_typeri  r  r  r>  r^   r  r  r  r  r  )r  r  r  r  r  r  r<  rt   rt   ru   r  h  s4   

r  r   r   manifestr$   partition_filterCallable[[DataFile], bool]metrics_evaluatorlist[ManifestEntry]c                   s    fdd|j | ddD S )zOpen a manifest file and return matching manifest entries.

    Returns:
        A list of ManifestEntry that matches the provided filters.
    c                   s$   g | ]}|j r |j r|qS rt   r<  )r   manifest_entryr  r  rt   ru   r     s    z"_open_manifest.<locals>.<listcomp>T)discard_deleted)fetch_manifest_entry)r   r  r  r  rt   r  ru   _open_manifest  s   r  	manifestslist[ManifestFile]rl   c                 C  s,   z
t dd | D W S  ty   t Y S w )Nc                 s  s&    | ]}|j tjkr|jptV  qd S r   )ri  r"   DATAmin_sequence_numberr1   r   r  rt   rt   ru   r    s    z'_min_sequence_number.<locals>.<genexpr>)minr   r1   )r  rt   rt   ru   _min_sequence_number  s   
r  c                   @  s   e Zd ZdBddZedCdd	ZdDddZdEddZdFddZdGddZ	e
dHddZdIddZdJddZdKd!d"ZdKd#d$ZdKd%d&ZdLd(d)ZdMd+d,ZdNd0d1ZdOdPd8d9ZdQd;d<ZdRd>d?ZdSd@dAZd2S )Tr   r  rl   r   r   c                 C  s(   t | j | j | | j}|| jS r   )r   r   r   r  r   r   )r   r  projectrt   rt   ru   _build_partition_projection  s   
z$DataScan._build_partition_projection&KeyDefaultDict[int, BooleanExpression]c                 C  s
   t | jS r   )r\   r  r   rt   rt   ru   partition_filters  r"  zDataScan.partition_filtersCallable[[ManifestFile], bool]c                 C  s*   | j  | }t|| j  | j| | jS r   )r   r  r   r   r  r   )r   r  r   rt   rt   ru   _build_manifest_evaluator  s   z"DataScan._build_manifest_evaluatorr  c                   sB   j  | }|j  }t|j j|   fddS )Nc                   s   t  j| jS r   )r   r   r>  r  partition_exprpartition_schemar   rt   ru   <lambda>  r   z5DataScan._build_partition_evaluator.<locals>.<lambda>)r   r  partition_typer   r)   r  r  )r   r  r   r  rt   r  ru   _build_partition_evaluator  s
   

z#DataScan._build_partition_evaluatorc                   s,   j  tjdd  fddS )Ninclude_empty_filesfalsec                   s   t jj | S r   )r   r   r   evalr  r  r   r   rt   ru   r    s    z3DataScan._build_metrics_evaluator.<locals>.<lambda>)r   r   r`   r  rS  r   rt   r  ru   _build_metrics_evaluator  s   
z!DataScan._build_metrics_evaluator'Callable[[DataFile], ResidualEvaluator]c                   s*   j  | ddlm   fddS )Nr   )residual_evaluator_ofc                   s    j jj dS )N)r   r  r   r   )r   r   r   r   )datafiler  r   r   rt   ru   r    s   z4DataScan._build_residual_evaluator.<locals>.<lambda>)r   r  pyiceberg.expressions.visitorsr  )r   r  rt   r  ru   _build_residual_evaluator  s   z"DataScan._build_residual_evaluatorr  r  r$   r   c                 C  s&   |j tjkp|j tjko|jpt| kS )as  Ensure that no manifests are loaded that contain deletes that are older than the data.

        Args:
            min_sequence_number (int): The minimal sequence number.
            manifest (ManifestFile): A ManifestFile that can be either data or deletes.

        Returns:
            Boolean indicating if it is either a data file, or a relevant delete file.
        )ri  r"   r  DELETESsequence_numberr1   )r  r  rt   rt   ru   _check_sequence_number  s   zDataScan._check_sequence_numberIterator[list[ManifestEntry]]c                   st     }|s
tg S tj  fdd|jD }tjt|t	 }|
dd fdd|D S )zFilter and return manifest entries based on partition and metrics evaluators.

        Returns:
            Iterator of ManifestEntry objects that match the scan's partition filter.
        c                   s   g | ]} |j  |r|qS rt   )partition_spec_id)r   manifest_file)manifest_evaluatorsrt   ru   r     s    z-DataScan.scan_plan_helper.<locals>.<listcomp>c                 S  s   t |  S r   )r  )argsrt   rt   ru   r    s    z+DataScan.scan_plan_helper.<locals>.<lambda>c                   s0   g | ]}  |rj||j  fqS rt   )r  r   r  r  r  )r  partition_evaluatorsr   rt   ru   r     s    
)rz  iterr\   r  r  r   r  r  ra   get_or_createmap)r   rz  r  executorrt   )r  r  r  r   ru   scan_plan_helper  s    



	zDataScan.scan_plan_helperc                 C  s   | j sdS | j  S )z@Check if server-side scan planning should be used for this scan.F)r  supports_server_side_planningr   rt   rt   ru    _should_use_server_side_planning  s   
z)DataScan._should_use_server_side_planningIterable[FileScanTask]c                 C  s   ddl m} ddlm} t| j|std| jdu rtd|| j	| j
dkr,t| j
nd| jtkr5| jnd| jd}| j| j|S )	z0Plan files using REST server-side scan planning.r   )RestCatalog)PlanTableScanRequestz)REST scan planning requires a RestCatalogNz.REST scan planning requires a table identifierr  )r   r}  r^  r   )pyiceberg.catalog.restr  r  r  r1  r  	TypeErrorr  r   r   r  r5  r   r  r   	plan_scan)r   r  r  requestrt   rt   ru   _plan_files_server_side!  s   
z DataScan._plan_files_server_sidec                   s   g }t   t| jt|  D ]4}|j}|jtj	kr"|
| q|jtjkr1 j||jd q|jtjkr;tdtd|j d|  fdd|D S )z(Plan files locally by reading manifests.partition_keyz^PyIceberg does not yet support equality deletes: https://github.com/apache/iceberg/issues/6568zUnknown DataFileContent (z): c              	     sL   g | ]"}t |j j|jpt|j|jjd |jj |j|jjdqS )r  )r  r  )r  r<  for_data_filer  r1   r>  r  residual_for)r   
data_entrydelete_indexresidual_evaluatorsrt   ru   r   E  s    z.DataScan._plan_files_local.<locals>.<listcomp>)r*   r\   r  r
   from_iterabler  r<  ri  r!   r  r=  POSITION_DELETESadd_delete_filer>  EQUALITY_DELETESr   )r   data_entriesr  r<  rt   r  ru   _plan_files_local4  s   
zDataScan._plan_files_localc                 C  s   |   r|  S |  S )aS  Plans the relevant files by filtering on the PartitionSpecs.

        If the table comes from a REST catalog with scan planning enabled,
        this will use server-side scan planning. Otherwise, it falls back
        to local planning.

        Returns:
            List of FileScanTasks that contain both data and delete files.
        )r  r  r  r   rt   rt   ru   rZ  T  s   
zDataScan.plan_filesr"  c                 C  s6   ddl m} || j| j|  | j| j| j| 	 S )zRead an Arrow table eagerly from this DataScan.

        All rows will be loaded into memory at once.

        Returns:
            pa.Table: Materialized Arrow Table from the Iceberg table's DataScan
        r   rL  )
r0  rL  r   r   r  r   r   r  r]  rZ  )r   rL  rt   rt   ru   r  b  s   
zDataScan.to_arrowpa.RecordBatchReaderc                 C  sb   ddl }ddlm}m} ||  }|| j| j|  | j| j| j	
|  }|j|||S )a  Return an Arrow RecordBatchReader from this DataScan.

        For large results, using a RecordBatchReader requires less memory than
        loading an Arrow Table for the same DataScan, because a RecordBatch
        is read one at a time.

        Returns:
            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table's DataScan
                which can be used to read a stream of record batches one by one.
        r   N)rL  schema_to_pyarrow)r.  r0  rL  r  r  r   r   r   r   r  to_record_batchesrZ  RecordBatchReaderr|  cast)r   r8  rL  r  target_schemabatchesrt   rt   ru   r{  p  s   
zDataScan.to_arrow_batch_readerr   r   r  c                 K  s   |   jdi |S )zRead a Pandas DataFrame eagerly from this Iceberg table.

        Returns:
            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
        Nrt   )r  r  r  rt   rt   ru   r    r   zDataScan.to_pandasNr9  r   
connectionDuckDBPyConnection | Nonerd   c                 C  s,   ddl }|p|jdd}|||   |S )zShorthand for loading the Iceberg Table in DuckDB.

        Returns:
            DuckDBPyConnection: In memory DuckDB connection with the Iceberg table.
        r   Nz:memory:)database)duckdbconnectregisterr  )r   r9  r  r  conrt   rt   ru   	to_duckdb  s   zDataScan.to_duckdbray.data.dataset.Datasetc                 C  s   ddl }|j|  S )zRead a Ray Dataset eagerly from this Iceberg table.

        Returns:
            ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table
        r   N)raydata
from_arrowr  )r   r  rt   rt   ru   to_ray  s   zDataScan.to_rayr  c                 C  s.   ddl }||  }t||jr| }|S )zRead a Polars DataFrame from this Iceberg table.

        Returns:
            pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
        r   N)rM  r  r  r1  Seriesto_frame)r   rO  resultrt   rt   ru   rP    s
   zDataScan.to_polarsc                 C  s   ddl m} d}|  }|D ]1}|jt kr$t|jdkr$||jj7 }q|| j	| j
|  | j| jd}||g}|t|7 }q|S )Nr   r  )r   r   rP  r   r   )r0  rL  rZ  r  r   r_  r  r`  r  r   r   r  r   r   r]  )r   rL  resrQ  task
arrow_scantblrt   rt   ru   r\    s    zDataScan.count)r  rl   r   r   )r   r  )r  rl   r   r  )r  rl   r   r  )r   r  )r  rl   r   r  )r  rl   r  r$   r   r   )r   r  )r   r   )r   r  r  )r   r  r  r   )r9  r   r  r  r   rd   )r   r  r  rV  )ro   rp   rq   r  r	   r  r  r  r  r  rY  r  r  r  r  r  rZ  r  r{  r  r  r  rP  r\  rt   rt   rt   ru   r     s,    






,


 





r   )frozenc                   @  sT   e Zd ZU dZded< ded< ded< ded	< d
Zded< d
Zded< dddZd
S )	WriteTaskz0Task with the parameters for writing a DataFile.z	uuid.UUIDr-  rl   task_idr)   r   zlist[pa.RecordBatch]record_batchesNr   r  zPartitionKey | Noner  	extensionr   r   c                 C  s   d| j  d| j d| S )Nz00000--.)r"  r-  )r   r$  rt   rt   ru   generate_data_file_filename  s   z%WriteTask.generate_data_file_filename)r$  r   r   r   )ro   rp   rq   rr   rs   r  r  r'  rt   rt   rt   ru   r!    s   
 r!  r   r2   r  r  Iterable[DataFile]c                   s:   ddl m t   fdd|D }dd |D S )zConvert a list files into DataFiles.

    Returns:
        An iterable that supplies DataFiles that describe the parquet files.
    r   )parquet_file_to_data_filec                   s   g | ]
}  |qS rt   )submit)r   r  r  r   r)  r   rt   ru   r         z0_parquet_files_to_data_files.<locals>.<listcomp>c                 S  s   g | ]
}|  r|  qS rt   )r  )r   frt   rt   ru   r     r,  )r0  r)  ra   r  )r   r  r   futuresrt   r+  ru   r    s   r  )r  r   r   r   )r  rh   r   r    )
r   r   r  r$   r  r  r  r  r   r  )r  r  r   rl   )r   r2   r  r  r   r   r   r(  )
__future__r   r[  r^  rB  rT  abcr   r   collections.abcr   r   r   dataclassesr   	functoolsr	   r
   typesr   typingr   r   r   pydanticr   pyiceberg.expressions.parserexpressionsrr  pyiceberg.expressionsr   r   r   r   r   r   r   r   r  r   r   r   r   r   r   pyiceberg.ior   r   pyiceberg.manifestr    r!   r"   r#   r$   pyiceberg.partitioningr%   r&   r'   r(   pyiceberg.schemar)   !pyiceberg.table.delete_file_indexr*   pyiceberg.table.inspectr,   pyiceberg.table.locationsr-   r.   pyiceberg.table.maintenancer0   pyiceberg.table.metadatar1   r2   pyiceberg.table.name_mappingr3   pyiceberg.table.refsr4   r5   pyiceberg.table.snapshotsr6   r7   pyiceberg.table.sortingr8   r9   pyiceberg.table.updater:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   rK   rL   pyiceberg.table.update.schemarM   pyiceberg.table.update.snapshotrN   rO   rP   pyiceberg.table.update.sortingrR   pyiceberg.table.update.specrT   !pyiceberg.table.update.statisticsrV   pyiceberg.transformsrW   pyiceberg.typedefrX   rY   rZ   r[   r\   r]   r^   r_   pyiceberg.typesr`   pyiceberg.utils.concurrentra   pyiceberg.utils.configrb   pyiceberg.utils.propertiesrc   rG  rH  rJ  rB  pdrM  rO  r.  r8  r  r  rd   rR  rf   pyiceberg.catalogrg   r  rh   ri   rj   r  r3  rk   rv   r   r  r5  r   r  r  r  r  r   rZ  r  rV  rt  ru  r  r  r  r  r  r   r!  r  rt   rt   rt   ru   <module>   s   ( 
T(
V     n:	    9.
k
5
!
  -