o
    iC                     @  sp  d dl mZ d dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
m
Z
 d dlmZ d dlmZmZ d d	lmZ d d
lmZmZmZ d dlmZmZmZmZmZmZ d dlmZmZ d dl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z) d dl*m+Z+ d dl,m-Z-m.Z. d dl/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6 d dl7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZA d dlBmCZCmDZD d dlEmFZF d dlGmHZH d dlImJZJ d dlKmLZLmMZM erd dlNmOZO d6ddZPd7d"d#ZQG d$d% d%eAe? ee? ZRG d&d' d'eRd' ZSG d(d) d)eRd) ZTG d*d+ d+eTZUG d,d- d-eRd- ZVG d.d/ d/ZWG d0d1 d1ee? ZXG d2d3 d3eAd3 ZYG d4d5 d5eAd5 ZZdS )8    )annotationsN)abstractmethod)defaultdict)Callable)datetime)cached_property)TYPE_CHECKINGGeneric)AvroCompressionCodec)AlwaysFalseBooleanExpressionOr)ROWS_MIGHT_NOT_MATCHROWS_MUST_MATCH_InclusiveMetricsEvaluator_StrictMetricsEvaluatorinclusive_projectionmanifest_evaluator)FileIO
OutputFile)	DataFileDataFileContentManifestContentManifestEntryManifestEntryStatusManifestFileManifestWriterwrite_manifestwrite_manifest_list)PartitionSpec)MAIN_BRANCHSnapshotRefType)	OperationSnapshotSnapshotSummaryCollectorSummaryancestors_of latest_ancestor_before_timestampupdate_snapshot_summaries)
AddSnapshotUpdateAssertRefSnapshotIdRemoveSnapshotRefUpdateRemoveSnapshotsUpdateSetSnapshotRefUpdateTableRequirementTableUpdateUUpdatesAndRequirementsUpdateTableMetadata)
EMPTY_DICTKeyDefaultDict)
ListPacker)ExecutorFactory)datetime_to_millis)property_as_boolproperty_as_int)Transactionnumintcommit_uuid	uuid.UUIDreturnstrc                 C  s   | d|  dS )Nz-m.avro r;   r=   rB   rB   ]/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/pyiceberg/table/update/snapshot.py_new_manifest_file_name\      rE   snapshot_idattemptc                 C  s   d|  d| d| dS )Nzsnap--rA   rB   rG   rH   r=   rB   rB   rD   _new_manifest_list_file_name`   s   rK   c                      s   e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< ded< ded< ded< deefdR fd d!ZdSd"d#ZdTd'd(ZdTd)d*Z	dUd-d.Z
edVd0d1ZedWd2d3ZdXd4d5ZdWd6d7ZefdYd9d:ZdZd<d=Zed[d>d?Zd\dBdCZd]dFdGZd^dIdJZd_d`dPdQZ  ZS )a_SnapshotProducerr>   r=   r   _ior"   
_operationr<   _snapshot_id
int | None_parent_snapshot_idzlist[DataFile]_added_data_fileszitertools.count[int]_manifest_num_counterzset[DataFile]_deleted_data_filesr
   _compression
str | None_target_branchN	operationtransactionr:   iouuid.UUID | Nonesnapshot_propertiesdict[str, str]branchr?   Nonec           	        s   t  | |pt | _|| _|| _| jj	 | _
g | _t | _|| _td| _ddlm} | jjj|j|j| _| j|d| _| jj| j }rS|j| _d S d | _d S )Nr   TableProperties)r^   )super__init__uuiduuid4r=   rM   rN   _transactiontable_metadatanew_snapshot_idrO   rR   setrT   r\   	itertoolscountrS   pyiceberg.tablera   
propertiesgetWRITE_AVRO_COMPRESSIONWRITE_AVRO_COMPRESSION_DEFAULTrU   _validate_target_branchrW   snapshot_by_namerG   rQ   )	selfrX   rY   rZ   r=   r\   r^   ra   snapshot	__class__rB   rD   rc   r   s$   	
z_SnapshotProducer.__init__c                 C  sB   |d ur|| j jjv r| j jj| }|jtjkrt| d|S )NzG is a tag, not a branch. Tags cannot be targets for producing snapshots)rf   rg   refssnapshot_ref_typer!   BRANCH
ValueError)rs   r^   refrB   rB   rD   rq      s   z)_SnapshotProducer._validate_target_branch	data_filer   _SnapshotProducer[U]c                 C     | j | | S N)rR   appendrs   r|   rB   rB   rD   append_data_file      z"_SnapshotProducer.append_data_filec                 C  r~   r   )rT   addr   rB   rB   rD   delete_data_file   r   z"_SnapshotProducer.delete_data_file	manifestslist[ManifestFile]c                 C  sN   d}|D ] }|j du s|j | jkr$|jdu rtd|j d||j7 }q|S )zACalculate the number of added rows from a list of manifest files.r   NzQCannot determine number of added rows in snapshot because the entry for manifest z( is missing the field `added-rows-count`)added_snapshot_idrO   added_rows_countrz   manifest_path)rs   r   
added_rowsmanifestrB   rB   rD   _calculate_added_rows   s   

z'_SnapshotProducer._calculate_added_rowslist[ManifestEntry]c                 C     d S r   rB   rs   rB   rB   rD   _deleted_entries      z"_SnapshotProducer._deleted_entriesc                 C  r   r   rB   r   rB   rB   rD   _existing_manifests   r   z%_SnapshotProducer._existing_manifestsc                 C  s   |S )zXTo perform any post-processing on the manifests before writing them to the new snapshot.rB   )rs   r   rB   rB   rD   _process_manifests   s   z$_SnapshotProducer._process_manifestsc                   sb   d fdd}d fdd}t  }||}||}| j} | |  |  S )Nr?   r   c                    s    j rFt jjj jj  jj    j j	d}  j D ]}| 
tjtj jd d |d q W d    n1 s<w   Y  |  gS g S )Nformat_versionspecschemaoutput_filerG   avro_compressionstatusrG   sequence_numberfile_sequence_numberr|   )rR   r   rf   rg   r   r   r   new_manifest_outputrO   rU   r   r   	from_argsr   ADDEDto_manifest_file)writerr|   r   rB   rD   _write_added_manifest   s0   



z;_SnapshotProducer._manifests.<locals>._write_added_manifestc               	     s      } t| dkreg }tt}| D ]}||jj | q| D ]@\}}t j	j
j j	j
 |  j	j
    j jd}|D ]}|| qDW d    n1 sVw   Y  ||  q"|S g S )Nr   r   )r   lenr   listr|   spec_idr   itemsr   rf   rg   r   specsr   r   rO   rU   	add_entryr   )deleted_entriesdeleted_manifestspartition_groupsdeleted_entryr   entriesr   entryr   rB   rD   _write_delete_manifest   s.   

z<_SnapshotProducer._manifests.<locals>._write_delete_manifestr?   r   )r6   get_or_createsubmitr   r   result)rs   r   r   executoradded_manifestsdelete_manifestsexisting_manifestsrB   r   rD   
_manifests   s   

z_SnapshotProducer._manifestsr%   c           	      C  s   ddl m} | jj}t|j|j|j}t	|d}| j
D ]}|j|| | d qt| jdkrJ| }| jD ]}|j|||j | d q:| jd urU|| jnd }ttdd| ji| ||d urn|jdS d dS )Nr   r`   )partition_summary_limit)r|   partition_specr   rX   )summaryprevious_summaryrB   )rl   ra   rf   rg   r<   rm   rn   WRITE_PARTITION_SUMMARY_LIMIT%WRITE_PARTITION_SUMMARY_LIMIT_DEFAULTr$   rR   add_filer   r   r   rT   r   remove_filer   rQ   snapshot_by_idr(   r%   rN   buildr   )	rs   r\   ra   rg   r   sscr|   r   previous_snapshotrB   rB   rD   _summary   s<   


z_SnapshotProducer._summaryr1   c              	   C  sB  |   }| jj }| | j}t| jd| jd}| jj	
 }||}t| jjj| j|| j| j|| jd}|| W d    n1 sHw   Y  d }| jjjdkr[| jjj}t| j| j|||| jjj|d}	t|	d}
| jd u rz|
fdfS |
t| j| j| jtjdft| j| jjjv r| jjj| j jnd | jd	ffS )
Nr   rJ   )r   r   rG   parent_snapshot_idr   r      )rG   r   manifest_listr   r   	schema_idfirst_row_id)rt   rB   )rG   r   ref_nametyperG   r{   )r   rf   rg   next_sequence_numberr   r\   rK   rO   r=   _tablelocation_providernew_metadata_locationr   r   rM   
new_outputrQ   rU   add_manifestsnext_row_idr#   current_schema_idr)   rW   r-   r!   ry   r*   rw   rG   )rs   new_manifestsr   r   	file_namer   manifest_list_file_pathr   r   rt   add_snapshot_updaterB   rB   rD   _commit  sl   







z_SnapshotProducer._commitc                 C     | j S r   )rO   r   rB   rB   rD   rG   W  s   z_SnapshotProducer.snapshot_idr   r   c                 C  s   | j j | S r   )rf   rg   r   )rs   r   rB   rB   rD   r   [  rF   z_SnapshotProducer.specr   r   c                 C  s*   t | jjj|| jj |  | j| jdS )Nr   )r   rf   rg   r   r   r   rO   rU   )rs   r   rB   rB   rD   new_manifest_writer^  s   
z%_SnapshotProducer.new_manifest_writerr   c                 C  s6   | j j }tt| j| jd}||}| j	|S )NrC   )
rf   r   r   rE   nextrS   r=   r   rM   r   )rs   r   r   	file_pathrB   rB   rD   r   h  s   
z%_SnapshotProducer.new_manifest_outputTr   r   discard_deletedboolc                 C  s   |j | j|dS )NrZ   r   fetch_manifest_entryrM   )rs   r   r   rB   rB   rD   r   n  rF   z&_SnapshotProducer.fetch_manifest_entry)rX   r"   rY   r:   rZ   r   r=   r[   r\   r]   r^   rV   r?   r_   )r^   rV   r?   rV   )r|   r   r?   r}   )r   r   r?   r<   r?   r   r   r   r   r?   r   )r\   r]   r?   r%   r?   r1   )r?   r<   )r   r<   r?   r   )r   r   r?   r   )r?   r   T)r   r   r   r   r?   r   )__name__
__module____qualname____annotations__r3   r    rc   rq   r   r   r   r   r   r   r   r   r   r   propertyrG   r   r   r   r   __classcell__rB   rB   ru   rD   rL   f   sB   
 

	



9
&B



rL   c                      s   e Zd ZU dZded< ded< edefd6 fddZd7 fddZd8ddZ	e
d9ddZd:d!d"Zd;d<d'd(Ze
d=d*d+Zd>d-d.Zd?d0d1Zed@d2d3Zed@d4d5Z  ZS )A_DeleteFilesa  Will delete manifest entries from the current snapshot based on the predicate.

    This will produce a DELETE snapshot:
        Data files were removed and their contents logically deleted and/or delete
        files were added to delete rows.

    From the specification
    r   
_predicater   _case_sensitiveNrX   r"   rY   r:   rZ   r   r^   rV   r=   r[   r\   r]   c                   s(   t  |||||| t | _d| _d S )NT)rb   rc   r   r   r   )rs   rX   rY   rZ   r^   r=   r\   ru   rB   rD   rc     s   	
z_DeleteFiles.__init__r?   r1   c                   s   | j rt  S dS )N)rB   rB   )files_affectedrb   r   r   ru   rB   rD   r     s   
z_DeleteFiles._commitr   r<   c                 C  s4   | j j }| j j | }t||| j}|| jS r   )rf   rg   r   r   r   r   r   )rs   r   r   r   projectrB   rB   rD   _build_partition_projection  s   
z(_DeleteFiles._build_partition_projection&KeyDefaultDict[int, BooleanExpression]c                 C  s
   t | jS r   )r4   r   r   rB   rB   rD   partition_filters  s   
z_DeleteFiles.partition_filtersCallable[[ManifestFile], bool]c                 C  s2   | j j }| j j | }t||| j| | jS r   )rf   rg   r   r   r   r   r   )rs   r   r   r   rB   rB   rD   _build_manifest_evaluator  s   z&_DeleteFiles._build_manifest_evaluatorT	predicatecase_sensitiver_   c                 C  s   t | j|| _|| _d S r   )r   r   r   )rs   r   r   rB   rB   rD   delete_by_predicate  s   
z _DeleteFiles.delete_by_predicate4tuple[list[ManifestFile], list[ManifestEntry], bool]c              	     s   j j }d fdd}t j}t| j jdj}t	| j jdj}g }g }d	}t
  _ j}	|	d
ur j j|	}
|
r|
j jdD ]}|jtjkr||j |s\|| qGg }g }|j jddD ],}||jtkr|||tj  j|j qh|||tj ||jtkrd}qht|dkr||7 }t|dkrt j jj j j  |j  j j  !  j" j#d}|D ]}|$| qW d
   n1 sw   Y  ||%  qG|| qG|| qG|||fS )aH  Computes all the delete operation and cache it when nothing changes.

        Returns:
            - List of existing manifests that are not affected by the delete operation.
            - The manifest-entries that are deleted based on the metadata.
            - Flag indicating that rewrites of data-files are needed.
        r   r   r   r   r?   c                   s,   t j||tjkr jn| j| j| j| jdS )Nr   )r   r   r   DELETEDrG   r   r   r|   )r   r   r   rB   rD   _copy_with_new_status  s   z<_DeleteFiles._compute_deletes.<locals>._copy_with_new_status)r   FNrZ   Tr   r   r   )r   r   r   r   r?   r   )&rf   rg   r   r4   r   r   r   r   evalr   ri   rT   rQ   r   r   rM   contentr   DATApartition_spec_idr   r   r|   r   r   r   r   EXISTINGr   r   r   r   r   r   rO   rU   r   r   )rs   r   r   manifest_evaluatorsstrict_metrics_evaluatorinclusive_metrics_evaluatorr   total_deleted_entriespartial_rewrites_needed$parent_snapshot_id_for_delete_sourcert   manifest_filer   existing_entriesr   r   existing_entryrB   r   rD   _compute_deletes  sh   	




z_DeleteFiles._compute_deletesr   c                 C  
   | j d S Nr   r  r   rB   rB   rD   r        
z _DeleteFiles._existing_manifestsr   c                 C  r  )N   r  r   rB   rB   rD   r     r  z_DeleteFiles._deleted_entriesc                 C  r  )z,Indicate if data files need to be rewritten.   r  r   rB   rB   rD   rewrites_needed  s   
z_DeleteFiles.rewrites_neededc                 C  s   t |  dkS )z0Indicate if any manifest-entries can be dropped.r   )r   r   r   rB   rB   rD   r     s   z_DeleteFiles.files_affected)rX   r"   rY   r:   rZ   r   r^   rV   r=   r[   r\   r]   r   )r   r<   r?   r   )r?   r   )r   r<   r?   r   r   )r   r   r   r   r?   r_   )r?   r   r   r   )r?   r   )r   r   r   __doc__r   r    r3   rc   r   r   r   r   r   r   r  r   r   r   r  r   r   rB   rB   ru   rD   r   r  s,   
 	


Q
r   c                   @  s    e Zd Zd	ddZd
ddZdS )_FastAppendFilesr?   r   c                 C  st   g }| j dur8| jj| j }|du rtd| j  |j| jdD ]}| s2| s2|j	| j
kr7|| q"|S )zTo determine if there are any existing manifest files.

        A fast append will add another ManifestFile to the ManifestList.
        All the existing manifest files are considered existing.
        NzSnapshot could not be found: r   )rQ   rf   rg   r   rz   r   rM   has_added_fileshas_existing_filesr   rO   r   )rs   r   r   r   rB   rB   rD   r   
  s   

z$_FastAppendFiles._existing_manifestsr   c                 C  s   g S )z{To determine if we need to record any deleted manifest entries.

        In case of an append, nothing is deleted.
        rB   r   rB   rB   rD   r     s   z!_FastAppendFiles._deleted_entriesNr   r   )r   r   r   r   r   rB   rB   rB   rD   r  	  s    
r  c                      sJ   e Zd ZU ded< ded< ded< edefd fddZdddZ  ZS )_MergeAppendFilesr<   _target_size_bytes_min_count_to_merger   _merge_enabledNrX   r"   rY   r:   rZ   r   r^   rV   r=   r[   r\   r]   r?   r_   c                   sn   ddl m} t |||||| t| jjj|j|j	| _
t| jjj|j|j| _t| jjj|j|j| _d S )Nr   r`   )rl   ra   rb   rc   r9   rf   rg   rm   MANIFEST_TARGET_SIZE_BYTES"MANIFEST_TARGET_SIZE_BYTES_DEFAULTr  MANIFEST_MIN_MERGE_COUNT MANIFEST_MIN_MERGE_COUNT_DEFAULTr  r8   MANIFEST_MERGE_ENABLEDMANIFEST_MERGE_ENABLED_DEFAULTr  )rs   rX   rY   rZ   r^   r=   r\   ra   ru   rB   rD   rc   +  s"   	
z_MergeAppendFiles.__init__r   r   c                 C  s@   dd |D }dd |D }t | j| j| j| d}||| S )zTo perform any post-processing on the manifests before writing them to the new snapshot.

        In _MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
        if automatic merge is enabled.
        c                 S     g | ]
}|j tjkr|qS rB   )r  r   r  .0r   rB   rB   rD   
<listcomp>M      z8_MergeAppendFiles._process_manifests.<locals>.<listcomp>c                 S  r%  rB   )r  r   DELETESr&  rB   rB   rD   r(  N  r)  )target_size_bytesmin_count_to_mergemerge_enabledsnapshot_producer)_ManifestMergeManagerr  r  r  merge_manifests)rs   r   unmerged_data_manifestsunmerged_deletes_manifestsdata_manifest_merge_managerrB   rB   rD   r   G  s   z$_MergeAppendFiles._process_manifests)rX   r"   rY   r:   rZ   r   r^   rV   r=   r[   r\   r]   r?   r_   r   )	r   r   r   r   r    r3   rc   r   r   rB   rB   ru   rD   r  &  s   
 r  c                   @  s$   e Zd ZdZd
ddZdddZd	S )_OverwriteFileszOverwrites data from the table. This will produce an OVERWRITE snapshot.

    Data and delete files were added and removed in a logical overwrite operation.
    r?   r   c                   s   g }j jjjd }r|jjdD ]y}|jjdd}fdd|D  t dkr3|| qt	 fdd	|D rt
j jjj j |j j j  jjd
$}|D ]}|j vrv|tjtj|j|j|j|jd q]W d   n1 sw   Y  ||  q|S )z3Determine if there are any existing manifest files.namer   Tr   c                   s   g | ]}|j  jv r|j qS rB   )r|   rT   r'  r   r   rB   rD   r(  g  s    z7_OverwriteFiles._existing_manifests.<locals>.<listcomp>r   c                 3  s    | ]}|j  vV  qd S r   )r|   r7  )found_deleted_data_filesrB   rD   	<genexpr>m  s    z6_OverwriteFiles._existing_manifests.<locals>.<genexpr>r   r   N)rf   rg   rr   rW   r   rM   r   r   r   anyr   r   r   r  r   r   rO   rU   r|   r   r   r   r   r  rG   r   r   r   )rs   existing_filesrt   r  r   r   r   rB   )r8  rs   rD   r   `  sD   

z#_OverwriteFiles._existing_manifestsr   c                   sn    j dur5 jj j }|du rtd j  t }d	 fdd}||| j	}t
tj| S g S )
zTo determine if we need to record any deleted entries.

        With a full overwrite all the entries are considered deleted.
        With partial overwrites we have to use the predicate to evaluate
        which entries are affected.
        Nz&Could not find the previous snapshot: r   r   r?   r   c                   s    fdd| j  jddD S )Nc              	     sD   g | ]}|j jtjkr|j  jv rtjtj|j	|j
|j|j d qS )r   )r|   r  r   r  rT   r   r   r   r   rG   r   r   r7  r   rB   rD   r(    s    zJ_OverwriteFiles._deleted_entries.<locals>._get_entries.<locals>.<listcomp>T)r   r   )r   r   rB   rD   _get_entries  s   
z6_OverwriteFiles._deleted_entries.<locals>._get_entries)r   r   r?   r   )rQ   rf   rg   r   rz   r6   r   mapr   rM   r   rj   chain)rs   r   r   r<  list_of_entriesrB   r   rD   r     s   
z _OverwriteFiles._deleted_entriesNr   r   )r   r   r   r  r   r   rB   rB   rB   rD   r4  Z  s    
$r4  c                   @  sh   e Zd ZU ded< ded< ded< ded< eefd ddZd!ddZd"ddZd#d$ddZ	d%ddZ
dS )&UpdateSnapshotr:   rf   r   rM   rV   _branchr]   _snapshot_propertiesrY   rZ   r^   r\   r?   r_   c                 C  s   || _ || _|| _|| _d S r   )rf   rM   rB  rA  )rs   rY   rZ   r^   r\   rB   rB   rD   rc     s   
zUpdateSnapshot.__init__r  c                 C     t tj| j| j| j| jdS N)rX   rY   rZ   r^   r\   )r  r"   APPENDrf   rM   rA  rB  r   rB   rB   rD   fast_append     zUpdateSnapshot.fast_appendr  c                 C  rC  rD  )r  r"   rE  rf   rM   rA  rB  r   rB   rB   rD   merge_append  rG  zUpdateSnapshot.merge_appendNr=   r[   r4  c                 C  s:   t || jjj| jdd urtjntj| j| j| j| j	dS )Nr5  )r=   rX   rY   rZ   r^   r\   )
r4  rf   rg   rr   rA  r"   	OVERWRITErE  rM   rB  )rs   r=   rB   rB   rD   	overwrite  s   zUpdateSnapshot.overwriter   c                 C  rC  rD  )r   r"   DELETErf   rM   rA  rB  r   rB   rB   rD   delete  rG  zUpdateSnapshot.delete)
rY   r:   rZ   r   r^   rV   r\   r]   r?   r_   )r?   r  )r?   r  r   )r=   r[   r?   r4  )r?   r   )r   r   r   r   r    r3   rc   rF  rH  rJ  rL  rB   rB   rB   rD   r@    s   
 

		r@  c                   @  s`   e Zd ZU ded< ded< ded< ded< d ddZd!ddZd"ddZd#ddZd$ddZdS )%r/  r<   r  r  r   r  r}   _snapshot_producerr+  r,  r-  r.  r?   r_   c                 C  s   || _ || _|| _|| _d S r   )r  r  r  rM  )rs   r+  r,  r-  r.  rB   rB   rD   rc     s   
z_ManifestMergeManager.__init__r   r   dict[int, list[ManifestFile]]c                 C  s&   t t}|D ]
}||j | q|S r   )r   r   r  r   )rs   r   groupsr   rB   rB   rD   _group_by_spec  s   z$_ManifestMergeManager._group_by_specr   manifest_binr   c                 C  s   | j j| j |dL}|D ]>}| j j|ddD ]3}|jtjkr-|j| j jkr-|| q|jtj	kr@|j| j jkr@|
| q|jtjkrK|| qqW d    | S 1 sZw   Y  | S )N)r   F)r   r   )rM  r   r   r   r   r   r   rG   rL  r   r   existingr   )rs   r   rQ  r   r   r   rB   rB   rD   _create_manifest  s    

z&_ManifestMergeManager._create_manifestfirst_manifestc                   sn   t jddd}||dd }dfd	d
t   fdd|D }fdd|D }dd |D S )Nr  F)target_weightlookbacklargest_bin_firstc                 S  r   r   )manifest_length)mrB   rB   rD   <lambda>  s    z4_ManifestMergeManager._merge_group.<locals>.<lambda>rQ  r   r?   c                   s\   g }t | dkr|| d  |S  | v r#t | jk r#||  |S ||  |S )Nr  r   )r   r   r  extendrS  )rQ  output_manifests)rT  rs   r   rB   rD   	merge_bin  s   	
z5_ManifestMergeManager._merge_group.<locals>.merge_binc                   s   g | ]}  |qS rB   )r   )r'  b)r   r]  rB   rD   r(    s    z6_ManifestMergeManager._merge_group.<locals>.<listcomp>c                   s   g | ]
}|    r qS rB   )r   )r'  f)rrB   rD   r(    r)  c                 S  s   g | ]	}|D ]}|qqS rB   rB   )r'  
bin_resultr   rB   rB   rD   r(    s    )rQ  r   r?   r   )r5   r  pack_endr6   r   )rs   rT  r   r   packerbinsfuturesbin_resultsrB   )r   rT  r]  r`  rs   r   rD   _merge_group  s   z"_ManifestMergeManager._merge_groupc              	   C  sZ   | j r	t|dkr|S |d }| |}g }t| D ]}|| ||||  q|S r  )r  r   rP  reversedkeysr[  rg  )rs   r   rT  rO  merged_manifestsr   rB   rB   rD   r0    s   
z%_ManifestMergeManager.merge_manifestsN)
r+  r<   r,  r<   r-  r   r.  r}   r?   r_   )r   r   r?   rN  )r   r<   rQ  r   r?   r   )rT  r   r   r<   r   r   r?   r   r   )	r   r   r   r   rc   rP  rS  rg  r0  rB   rB   rB   rD   r/    s   
 



r/  c                      s   e Zd ZU dZded< ded< d4 fd
dZd5ddZd6ddZd7ddZd8d9ddZ	d:ddZ
			d;d<d"d#Zd=d$d%Zd>d?d'd(Zd@d)d*ZdAd,d-ZdBd/d0ZdCd2d3Z  ZS )DManageSnapshotsa  
    Run snapshot management operations using APIs.

    APIs include create branch, create tag, etc.

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
       ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    tuple[TableUpdate, ...]_updatestuple[TableRequirement, ...]_requirementsrY   r:   r?   r_   c                   s   t  | d| _d| _d S NrB   )rb   rc   rm  ro  rs   rY   ru   rB   rD   rc   =  s   
zManageSnapshots.__init__r1   c                 C  s   | j | jfS )z%Apply the pending changes and commit.)rm  ro  r   rB   rB   rD   r   B  s   zManageSnapshots._commitc                 C  s*   | j r| jj|    d| _ d| _dS dS )z7Stage any pending ref updates to the transaction state.rB   N)rm  rf   _stager   ro  r   rB   rB   rD   _commit_if_ref_updates_existF  s
   
z,ManageSnapshots._commit_if_ref_updates_existr   r@   c                 C  sX   t |df}t|| jjjv r| jjj| jnd|df}|  j|7  _|  j|7  _| S )zRemove a snapshot ref.

        Args:
            ref_name: branch / tag name to remove
        Stages the updates and requirements for the remove-snapshot-ref.
        Returns
            This method for chaining
        r   Nr   )r+   r*   rf   rg   rw   rG   rm  ro  )rs   r   updatesrequirementsrB   rB   rD   _remove_ref_snapshotM  s   	z$ManageSnapshots._remove_ref_snapshotNrG   r<   tag_namemax_ref_age_msrP   c                 C  s:   | j j||tj|d\}}|  j|7  _|  j|7  _| S )aJ  
        Create a new tag pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of the existing snapshot to tag
            tag_name (str): name of the tag
            max_ref_age_ms (Optional[int]): max ref age in milliseconds

        Returns:
            This for method chaining
        )rG   r   r   ry  )rf   _set_ref_snapshotr!   TAGrm  ro  )rs   rG   rx  ry  updaterequirementrB   rB   rD   
create_tagc  s   
zManageSnapshots.create_tagc                 C     | j |dS )z
        Remove a tag.

        Args:
            tag_name (str): name of tag to remove
        Returns:
            This for method chaining
        rt  rw  )rs   rx  rB   rB   rD   
remove_tagy     	zManageSnapshots.remove_tagbranch_namemax_snapshot_age_msmin_snapshots_to_keepc                 C  s>   | j j||tj|||d\}}|  j|7  _|  j|7  _| S )a+  
        Create a new branch pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of existing snapshot at which the branch is created.
            branch_name (str): name of the new branch
            max_ref_age_ms (Optional[int]): max ref age in milliseconds
            max_snapshot_age_ms (Optional[int]): max age of snapshots to keep in milliseconds
            min_snapshots_to_keep (Optional[int]): min number of snapshots to keep for the branch
        Returns:
            This for method chaining
        )rG   r   r   ry  r  r  )rf   rz  r!   ry   rm  ro  )rs   rG   r  ry  r  r  r|  r}  rB   rB   rD   create_branch  s   
zManageSnapshots.create_branchc                 C  r  )z
        Remove a branch.

        Args:
            branch_name (str): name of branch to remove
        Returns:
            This for method chaining
        rt  r  )rs   r  rB   rB   rD   remove_branch  r  zManageSnapshots.remove_branchrV   c                 C  s   |    |du |du krtd|dur|}n|| jjjvr%td| | jjj| j}| jj|du r=td| | jj|tt	j
d\}}| j|| | S )a  Set the current snapshot to a specific snapshot ID or ref.

        Args:
            snapshot_id: The ID of the snapshot to set as current.
            ref_name: The snapshot reference (branch or tag) to set as current.

        Returns:
            This for method chaining.

        Raises:
            ValueError: If neither or both arguments are provided, or if the snapshot/ref does not exist.
        Nz9Either snapshot_id or ref_name must be provided, not bothz*Cannot find matching snapshot ID for ref: z4Cannot set current snapshot to unknown snapshot id: )rG   r   r   )rs  rz   rf   rg   rw   rG   r   rz  r    r!   ry   rr  )rs   rG   r   target_snapshot_idr|  r}  rB   rB   rD   set_current_snapshot  s"   
z$ManageSnapshots.set_current_snapshotc                 C  s@   | j j|std| | |std| | j|dS )a  Rollback the table to the given snapshot id.

        The snapshot needs to be an ancestor of the current table state.

        Args:
            snapshot_id (int): rollback to this snapshot_id that used to be current.

        Returns:
            This for method chaining

        Raises:
            ValueError: If the snapshot does not exist or is not an ancestor of the current table state.
        z)Cannot roll back to unknown snapshot id: zDCannot roll back to snapshot, not an ancestor of the current state: rG   )rf   rg   r   rz   _is_current_ancestorr  rs   rG   rB   rB   rD   rollback_to_snapshot  s
   
z$ManageSnapshots.rollback_to_snapshottimestamp_msc                 C  s2   t | jj|}|du rtd| | j|jdS )a  Rollback the table to the latest snapshot before the given timestamp.

        Finds the latest ancestor snapshot whose timestamp is before the given timestamp and rolls back to it.

        Args:
            timestamp_ms: Rollback to the latest snapshot before this timestamp in milliseconds.

        Returns:
            This for method chaining

        Raises:
            ValueError: If no valid snapshot exists older than the given timestamp.
        Nz0Cannot roll back, no valid snapshot older than: r  )r'   rf   rg   rz   r  rG   )rs   r  rt   rB   rB   rD   rollback_to_timestamp  s   z%ManageSnapshots.rollback_to_timestampr   c                 C  s   ||   v S r   )_current_ancestorsr  rB   rB   rD   r    s   z$ManageSnapshots._is_current_ancestorset[int]c                 C  s    dd t | jj | jjD S )Nc                 S  s   h | ]}|j qS rB   r  )r'  arB   rB   rD   	<setcomp>  s    z5ManageSnapshots._current_ancestors.<locals>.<setcomp>)r&   rf   rg   current_snapshotr   rB   rB   rD   r    s   
z"ManageSnapshots._current_ancestorsrY   r:   r?   r_   r   )r?   r_   )r   r@   r?   rk  r   )rG   r<   rx  r@   ry  rP   r?   rk  )rx  r@   r?   rk  )NNN)rG   r<   r  r@   ry  rP   r  rP   r  rP   r?   rk  )r  r@   r?   rk  )NN)rG   rP   r   rV   r?   rk  )rG   r<   r?   rk  )r  r<   r?   rk  )rG   r<   r?   r   r?   r  )r   r   r   r  r   rc   r   rs  rw  r~  r  r  r  r  r  r  r  r  r   rB   rB   ru   rD   rk  *  s(   
 




 
%

rk  c                      sn   e Zd ZU dZded< ded< ded< d fddZd ddZd!ddZd"ddZd#ddZ	d$ddZ
  ZS )%ExpireSnapshotsa  Expire snapshots by ID.

    Use table.expire_snapshots().<operation>().commit() to run a specific operation.
    Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.
    rl  rm  rn  ro  r  _snapshot_ids_to_expirerY   r:   r?   r_   c                   s$   t  | d| _d| _t | _d S rp  )rb   rc   rm  ro  ri   r  rq  ru   rB   rD   rc     s   zExpireSnapshots.__init__r1   c                 C  s>   |   }|  j|8  _t| jd}|  j|f7  _| j| jfS )aF  
        Commit the staged updates and requirements.

        This will remove the snapshots with the given IDs, but will always skip protected snapshots (branch/tag heads).

        Returns:
            Tuple of updates and requirements to be committed,
            as required by the calling parent apply functions.
        )snapshot_ids)_get_protected_snapshot_idsr  r,   rm  ro  )rs   protected_idsr|  rB   rB   rD   r     s
   zExpireSnapshots._commitc                 C  s   dd | j jj D S )a  
        Get the IDs of protected snapshots.

        These are the HEAD snapshots of all branches and all tagged snapshots.  These ids are to be excluded from expiration.

        Returns:
            Set of protected snapshot IDs to exclude from expiration.
        c                 S  s$   h | ]}|j tjtjfv r|jqS rB   )rx   r!   r{  ry   rG   )r'  r{   rB   rB   rD   r  7  s
    z>ExpireSnapshots._get_protected_snapshot_ids.<locals>.<setcomp>)rf   rg   rw   valuesr   rB   rB   rD   r  .  s   	z+ExpireSnapshots._get_protected_snapshot_idsrG   r<   c                 C  sN   | j j|du rtd| d||  v rtd| d| j| | S )z
        Expire a snapshot by its ID.

        This will mark the snapshot for expiration.

        Args:
            snapshot_id (int): The ID of the snapshot to expire.
        Returns:
            This for method chaining.
        NzSnapshot with ID z does not exist.z$ is protected and cannot be expired.)rf   rg   r   rz   r  r  r   r  rB   rB   rD   by_id=  s   zExpireSnapshots.by_idr  	list[int]c                 C  s   |D ]}|  | q| S )z
        Expire multiple snapshots by their IDs.

        This will mark the snapshots for expiration.

        Args:
            snapshot_ids (List[int]): List of snapshot IDs to expire.
        Returns:
            This for method chaining.
        )r  )rs   r  rG   rB   rB   rD   by_idsR  s   zExpireSnapshots.by_idsdtr   c                 C  sF   |   }t|}| jjjD ]}|j|k r |j|vr | j|j q| S )z
        Expire all unprotected snapshots with a timestamp older than a given value.

        Args:
            dt (datetime): Only snapshots with datetime < this value will be expired.

        Returns:
            This for method chaining.
        )	r  r7   rf   rg   	snapshotsr  rG   r  r   )rs   r  r  expire_fromrt   rB   rB   rD   
older_thana  s   
zExpireSnapshots.older_thanr  r   r  )rG   r<   r?   r  )r  r  r?   r  )r  r   r?   r  )r   r   r   r  r   rc   r   r  r  r  r  r   rB   rB   ru   rD   r    s   
 



r  )r;   r<   r=   r>   r?   r@   )rG   r<   rH   r<   r=   r>   r?   r@   )[
__future__r   rj   rd   abcr   collectionsr   collections.abcr   r   	functoolsr   typingr   r	   pyiceberg.avro.codecsr
   pyiceberg.expressionsr   r   r   pyiceberg.expressions.visitorsr   r   r   r   r   r   pyiceberg.ior   r   pyiceberg.manifestr   r   r   r   r   r   r   r   r   pyiceberg.partitioningr   pyiceberg.table.refsr    r!   pyiceberg.table.snapshotsr"   r#   r$   r%   r&   r'   r(   pyiceberg.table.updater)   r*   r+   r,   r-   r.   r/   r0   r1   r2   pyiceberg.typedefr3   r4   pyiceberg.utils.bin_packingr5   pyiceberg.utils.concurrentr6   pyiceberg.utils.datetimer7   pyiceberg.utils.propertiesr8   r9   rl   r:   rE   rK   rL   r   r  r  r4  r@  r/  rk  r  rB   rB   rB   rD   <module>   sP    ,$0	

   4L:J b