o
    uyi                     @  s  d dl mZ d dlZd dlZd dlZd dlmZ d dlm	Z	 d dlm
Z
 d dlmZ d dlmZ d dlmZmZmZmZmZmZmZmZ d d	lmZ d d
lmZ d dlmZmZmZ d dlm Z m!Z!m"Z"m#Z#m$Z$m%Z% d dl&m'Z'm(Z( d dl)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7 d dl8m9Z9m:Z:m;Z;m<Z<m=Z= d dl>m?Z?m@Z@mAZAmBZBmCZCmDZDmEZEmFZFmGZGmHZH d dlImJZJmKZK d dlLmMZM d dlNmOZO d dlPmQZQ d dlRmSZSmTZT erd dlUmVZV d7dd ZWd8d#d$ZXG d%d& d&eHeF eeF ZYG d'd( d(eYd( ZZG d)d* d*eYd* Z[G d+d, d,e[Z\G d-d. d.eYd. Z]G d/d0 d0Z^G d1d2 d2eeF Z_G d3d4 d4eHd4 Z`G d5d6 d6eHd6 ZadS )9    )annotationsN)abstractmethod)defaultdict)Future)datetime)cached_property)TYPE_CHECKINGCallableDictGenericListOptionalSetTuple)
SortedList)AvroCompressionCodec)AlwaysFalseBooleanExpressionOr)ROWS_MIGHT_NOT_MATCHROWS_MUST_MATCH_InclusiveMetricsEvaluator_StrictMetricsEvaluatorinclusive_projectionmanifest_evaluator)FileIO
OutputFile)	DataFileDataFileContentManifestContentManifestEntryManifestEntryStatusManifestFileManifestWriterwrite_manifestwrite_manifest_list)PartitionSpec)MAIN_BRANCHSnapshotRefType)	OperationSnapshotSnapshotSummaryCollectorSummaryupdate_snapshot_summaries)
AddSnapshotUpdateAssertRefSnapshotIdRemoveSnapshotRefUpdateRemoveSnapshotsUpdateSetSnapshotRefUpdateTableRequirementTableUpdateUUpdatesAndRequirementsUpdateTableMetadata)
EMPTY_DICTKeyDefaultDict)
ListPacker)ExecutorFactory)datetime_to_millis)property_as_boolproperty_as_int)Transactionnumintcommit_uuid	uuid.UUIDreturnstrc                 C  s   | d|  dS )Nz-m.avro r@   rB   rG   rG   b/home/ubuntu/maya3_transcribe/venv/lib/python3.10/site-packages/pyiceberg/table/update/snapshot.py_new_manifest_file_name]      rJ   snapshot_idattemptc                 C  s   d|  d| d| dS )Nzsnap--rF   rG   rL   rM   rB   rG   rG   rI   _new_manifest_list_file_namea   s   rP   c                      s  e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< ded< ded< ded< deefdP fd d!ZdQd"d#ZdRd'd(ZdRd)d*Z	e
dSd,d-Ze
dTd/d0ZdUd2d3ZdTd4d5ZefdVd7d8ZdWd:d;ZedXd<d=ZdYd@dAZdZdDdEZd[dGdHZd\d]dNdOZ  ZS )^_SnapshotProducerrC   rB   r   _ior)   
_operationrA   _snapshot_idOptional[int]_parent_snapshot_idzList[DataFile]_added_data_fileszitertools.count[int]_manifest_num_counterzSet[DataFile]_deleted_data_filesr   _compressionOptional[str]_target_branchN	operationtransactionr?   ioOptional[uuid.UUID]snapshot_propertiesDict[str, str]branchrD   Nonec           	        s   t  | |pt | _|| _|| _| jj	 | _
g | _t | _|| _td| _ddlm} | jjj|j|j| _| j|d| _| jj| j }rS|j| _d S d | _d S )Nr   TableProperties)rc   )super__init__uuiduuid4rB   rR   rS   _transactiontable_metadatanew_snapshot_idrT   rW   setrY   ra   	itertoolscountrX   pyiceberg.tablerf   
propertiesgetWRITE_AVRO_COMPRESSIONWRITE_AVRO_COMPRESSION_DEFAULTrZ   _validate_target_branchr\   snapshot_by_namerL   rV   )	selfr]   r^   r_   rB   ra   rc   rf   snapshot	__class__rG   rI   rh   s   s$   	
z_SnapshotProducer.__init__c                 C  sB   |d ur|| j jjv r| j jj| }|jtjkrt| d|S )NzG is a tag, not a branch. Tags cannot be targets for producing snapshots)rk   rl   refssnapshot_ref_typer(   BRANCH
ValueError)rx   rc   refrG   rG   rI   rv      s   z)_SnapshotProducer._validate_target_branch	data_filer   _SnapshotProducer[U]c                 C     | j | | S N)rW   appendrx   r   rG   rG   rI   append_data_file      z"_SnapshotProducer.append_data_filec                 C  r   r   )rY   addr   rG   rG   rI   delete_data_file   r   z"_SnapshotProducer.delete_data_fileList[ManifestEntry]c                 C     d S r   rG   rx   rG   rG   rI   _deleted_entries      z"_SnapshotProducer._deleted_entriesList[ManifestFile]c                 C  r   r   rG   r   rG   rG   rI   _existing_manifests   r   z%_SnapshotProducer._existing_manifests	manifestsc                 C  s   |S )zXTo perform any post-processing on the manifests before writing them to the new snapshot.rG   )rx   r   rG   rG   rI   _process_manifests   s   z$_SnapshotProducer._process_manifestsc                   sb   d fdd}d fdd}t  }||}||}| j} | |  |  S )NrD   r   c                    s    j rFt jjj jj  jj    j j	d}  j D ]}| 
tjtj jd d |d q W d    n1 s<w   Y  |  gS g S )Nformat_versionspecschemaoutput_filerL   avro_compressionstatusrL   sequence_numberfile_sequence_numberr   )rW   r$   rk   rl   r   r   r   new_manifest_outputrT   rZ   r   r    	from_argsr!   ADDEDto_manifest_file)writerr   r   rG   rI   _write_added_manifest   s0   



z;_SnapshotProducer._manifests.<locals>._write_added_manifestc               	     s      } t| dkreg }tt}| D ]}||jj | q| D ]@\}}t j	j
j j	j
 |  j	j
    j jd}|D ]}|| qDW d    n1 sVw   Y  ||  q"|S g S )Nr   r   )r   lenr   listr   spec_idr   itemsr$   rk   rl   r   specsr   r   rT   rZ   	add_entryr   )deleted_entriesdeleted_manifestspartition_groupsdeleted_entryr   entriesr   entryr   rG   rI   _write_delete_manifest   s.   

z<_SnapshotProducer._manifests.<locals>._write_delete_manifestrD   r   )r;   get_or_createsubmitr   r   result)rx   r   r   executoradded_manifestsdelete_manifestsexisting_manifestsrG   r   rI   
_manifests   s   

z_SnapshotProducer._manifestsr,   c                 C  s   ddl m} t| jjj|j|j}t	|d}| j
D ]}|j|| jj | jj d qt| jdkrP| jj }| jD ]}|j|||j | jj d q>| jd ur]| jj| jnd }ttdd| ji| ||d urv|jdS d dS )Nr   re   )partition_summary_limit)r   partition_specr   r]   )summaryprevious_summaryrG   )rq   rf   rA   rk   rl   rr   rs   WRITE_PARTITION_SUMMARY_LIMIT%WRITE_PARTITION_SUMMARY_LIMIT_DEFAULTr+   rW   add_filer   r   r   rY   r   remove_filer   rV   snapshot_by_idr-   r,   rS   buildr   )rx   ra   rf   r   sscr   r   previous_snapshotrG   rG   rI   _summary   s>   







z_SnapshotProducer._summaryr6   c           
      C  s$  |   }| jj }| | j}t| jd| jd}| jj	
 }||}t| jjj| j|| j| j|| jd}|| W d    n1 sHw   Y  t| j| j|||| jjjd}t|d}	| jd u rk|	fdfS |	t| j| j| jtjdft| j| jjjv r| jjj| j jnd | jdffS )	Nr   rO   )r   r   rL   parent_snapshot_idr   r   )rL   r   manifest_listr   r   	schema_id)ry   rG   )rL   r   ref_nametyperL   r   )r   rk   rl   next_sequence_numberr   ra   rP   rT   rB   _tablelocation_providernew_metadata_locationr%   r   rR   
new_outputrV   rZ   add_manifestsr*   current_schema_idr.   r\   r2   r(   r~   r/   r|   rL   )
rx   new_manifestsr   r   	file_namer   manifest_list_file_pathr   ry   add_snapshot_updaterG   rG   rI   _commit  sd   



	

z_SnapshotProducer._commitc                 C     | j S r   )rT   r   rG   rG   rI   rL   D  s   z_SnapshotProducer.snapshot_idr   r&   c                 C  s   | j j | S r   )rk   rl   r   )rx   r   rG   rG   rI   r   H  rK   z_SnapshotProducer.specr   r#   c                 C  s*   t | jjj|| jj |  | j| jdS )Nr   )r$   rk   rl   r   r   r   rT   rZ   )rx   r   rG   rG   rI   new_manifest_writerK  s   
z%_SnapshotProducer.new_manifest_writerr   c                 C  s6   | j j }tt| j| jd}||}| j	|S )NrH   )
rk   r   r   rJ   nextrX   rB   r   rR   r   )rx   r   r   	file_pathrG   rG   rI   r   U  s   
z%_SnapshotProducer.new_manifest_outputTmanifestr"   discard_deletedboolc                 C  s   |j | j|dS )Nr_   r   fetch_manifest_entryrR   )rx   r   r   rG   rG   rI   r   [  rK   z&_SnapshotProducer.fetch_manifest_entry)r]   r)   r^   r?   r_   r   rB   r`   ra   rb   rc   r[   rD   rd   )rc   r[   rD   r[   )r   r   rD   r   rD   r   r   r   r   rD   r   )ra   rb   rD   r,   rD   r6   )rD   rA   )r   rA   rD   r&   )r   r&   rD   r#   )rD   r   T)r   r"   r   r   rD   r   )__name__
__module____qualname____annotations__r8   r'   rh   rv   r   r   r   r   r   r   r   r   r   propertyrL   r   r   r   r   __classcell__rG   rG   rz   rI   rQ   g   s@   
 

	


9
%<



rQ   c                      s   e Zd ZU dZded< ded< edefd6 fddZd7 fddZd8ddZ	e
d9ddZd:d!d"Zd;d<d'd(Ze
d=d*d+Zd>d-d.Zd?d0d1Zed@d2d3Zed@d4d5Z  ZS )A_DeleteFilesa  Will delete manifest entries from the current snapshot based on the predicate.

    This will produce a DELETE snapshot:
        Data files were removed and their contents logically deleted and/or delete
        files were added to delete rows.

    From the specification
    r   
_predicater   _case_sensitiveNr]   r)   r^   r?   r_   r   rc   r[   rB   r`   ra   rb   c                   s(   t  |||||| t | _d| _d S )NT)rg   rh   r   r   r   )rx   r]   r^   r_   rc   rB   ra   rz   rG   rI   rh   l  s   	
z_DeleteFiles.__init__rD   r6   c                   s   | j rt  S dS )N)rG   rG   )files_affectedrg   r   r   rz   rG   rI   r   y  s   
z_DeleteFiles._commitr   rA   c                 C  s4   | j j }| j j | }t||| j}|| jS r   )rk   rl   r   r   r   r   r   )rx   r   r   r   projectrG   rG   rI   _build_partition_projection  s   
z(_DeleteFiles._build_partition_projection&KeyDefaultDict[int, BooleanExpression]c                 C  s
   t | jS r   )r9   r   r   rG   rG   rI   partition_filters  s   
z_DeleteFiles.partition_filtersCallable[[ManifestFile], bool]c                 C  s2   | j j }| j j | }t||| j| | jS r   )rk   rl   r   r   r   r   r   )rx   r   r   r   rG   rG   rI   _build_manifest_evaluator  s   z&_DeleteFiles._build_manifest_evaluatorT	predicatecase_sensitiverd   c                 C  s   t | j|| _|| _d S r   )r   r   r   )rx   r   r   rG   rG   rI   delete_by_predicate  s   
z _DeleteFiles.delete_by_predicate4Tuple[List[ManifestFile], List[ManifestEntry], bool]c              	     s   j j }d fdd}t j}t| j jdj}t	| j jdj}g }g }d	}t
  _ j}	|	d
ur j j|	}
|
r|
j jdD ]}|jtjkr||j |s\|| qGg }g }|j jddD ],}||jtkr|||tj  j|j qh|||tj ||jtkrd}qht|dkr||7 }t|dkrt j jj j j  |j  j j  !  j" j#d}|D ]}|$| qW d
   n1 sw   Y  ||%  qG|| qG|| qG|||fS )aH  Computes all the delete operation and cache it when nothing changes.

        Returns:
            - List of existing manifests that are not affected by the delete operation.
            - The manifest-entries that are deleted based on the metadata.
            - Flag indicating that rewrites of data-files are needed.
        r   r    r   r!   rD   c                   s,   t j||tjkr jn| j| j| j| jdS )Nr   )r    r   r!   DELETEDrL   r   r   r   )r   r   r   rG   rI   _copy_with_new_status  s   z<_DeleteFiles._compute_deletes.<locals>._copy_with_new_status)r   FNr_   Tr   r   r   )r   r    r   r!   rD   r    )&rk   rl   r   r9   r   r   r   r   evalr   rn   rY   rV   r   r   rR   contentr   DATApartition_spec_idr   r   r   r   r!   r   r   EXISTINGr   r   r$   r   r   r   rT   rZ   r   r   )rx   r   r   manifest_evaluatorsstrict_metrics_evaluatorinclusive_metrics_evaluatorr   total_deleted_entriespartial_rewrites_needed$parent_snapshot_id_for_delete_sourcery   manifest_filer   existing_entriesr   r   existing_entryrG   r   rI   _compute_deletes  sh   	





z_DeleteFiles._compute_deletesr   c                 C  
   | j d S Nr   r  r   rG   rG   rI   r        
z _DeleteFiles._existing_manifestsr   c                 C  r  )N   r  r   rG   rG   rI   r     r  z_DeleteFiles._deleted_entriesc                 C  r  )z,Indicate if data files need to be rewritten.   r  r   rG   rG   rI   rewrites_needed  s   
z_DeleteFiles.rewrites_neededc                 C  s   t |  dkS )z0Indicate if any manifest-entries can be dropped.r   )r   r   r   rG   rG   rI   r     s   z_DeleteFiles.files_affected)r]   r)   r^   r?   r_   r   rc   r[   rB   r`   ra   rb   r   )r   rA   rD   r   )rD   r   )r   rA   rD   r   r   )r   r   r   r   rD   rd   )rD   r   r   r   )rD   r   )r   r   r   __doc__r   r'   r8   rh   r   r   r   r   r   r   r  r   r   r   r  r   r   rG   rG   rz   rI   r   _  s,   
 	


P
r   c                   @  s    e Zd Zd	ddZd
ddZdS )_FastAppendFilesrD   r   c                 C  st   g }| j dur8| jj| j }|du rtd| j  |j| jdD ]}| s2| s2|j	| j
kr7|| q"|S )zTo determine if there are any existing manifest files.

        A fast append will add another ManifestFile to the ManifestList.
        All the existing manifest files are considered existing.
        NzSnapshot could not be found: r   )rV   rk   rl   r   r   r   rR   has_added_fileshas_existing_filesadded_snapshot_idrT   r   )rx   r   r   r   rG   rG   rI   r     s   

z$_FastAppendFiles._existing_manifestsr   c                 C  s   g S )z{To determine if we need to record any deleted manifest entries.

        In case of an append, nothing is deleted.
        rG   r   rG   rG   rI   r   
  s   z!_FastAppendFiles._deleted_entriesNr   r   )r   r   r   r   r   rG   rG   rG   rI   r    s    
r  c                      sJ   e Zd ZU ded< ded< ded< edefd fddZdddZ  ZS )_MergeAppendFilesrA   _target_size_bytes_min_count_to_merger   _merge_enabledNr]   r)   r^   r?   r_   r   rc   r[   rB   r`   ra   rb   rD   rd   c                   sn   ddl m} t |||||| t| jjj|j|j	| _
t| jjj|j|j| _t| jjj|j|j| _d S )Nr   re   )rq   rf   rg   rh   r>   rk   rl   rr   MANIFEST_TARGET_SIZE_BYTES"MANIFEST_TARGET_SIZE_BYTES_DEFAULTr  MANIFEST_MIN_MERGE_COUNT MANIFEST_MIN_MERGE_COUNT_DEFAULTr  r=   MANIFEST_MERGE_ENABLEDMANIFEST_MERGE_ENABLED_DEFAULTr  )rx   r]   r^   r_   rc   rB   ra   rf   rz   rG   rI   rh     s"   	
z_MergeAppendFiles.__init__r   r   c                 C  s@   dd |D }dd |D }t | j| j| j| d}||| S )zTo perform any post-processing on the manifests before writing them to the new snapshot.

        In _MergeAppendFiles, we merge manifests based on the target size and the minimum count to merge
        if automatic merge is enabled.
        c                 S     g | ]
}|j tjkr|qS rG   )r   r   r   .0r   rG   rG   rI   
<listcomp>9      z8_MergeAppendFiles._process_manifests.<locals>.<listcomp>c                 S  r#  rG   )r   r   DELETESr$  rG   rG   rI   r&  :  r'  )target_size_bytesmin_count_to_mergemerge_enabledsnapshot_producer)_ManifestMergeManagerr  r  r  merge_manifests)rx   r   unmerged_data_manifestsunmerged_deletes_manifestsdata_manifest_merge_managerrG   rG   rI   r   3  s   z$_MergeAppendFiles._process_manifests)r]   r)   r^   r?   r_   r   rc   r[   rB   r`   ra   rb   rD   rd   r   )	r   r   r   r   r'   r8   rh   r   r   rG   rG   rz   rI   r    s   
 r  c                   @  s$   e Zd ZdZd
ddZdddZd	S )_OverwriteFileszOverwrites data from the table. This will produce an OVERWRITE snapshot.

    Data and delete files were added and removed in a logical overwrite operation.
    rD   r   c                   s   g }j jjjd }r|jjdD ]y}|jjdd}fdd|D  t dkr3|| qt	 fdd	|D rt
j jjj j |j j j  jjd
$}|D ]}|j vrv|tjtj|j|j|j|jd q]W d   n1 sw   Y  ||  q|S )z3Determine if there are any existing manifest files.namer   Tr   c                   s   g | ]}|j  jv r|j qS rG   )r   rY   r%  r   r   rG   rI   r&  S  s    z7_OverwriteFiles._existing_manifests.<locals>.<listcomp>r   c                 3  s    | ]}|j  vV  qd S r   )r   r5  )found_deleted_data_filesrG   rI   	<genexpr>Y  s    z6_OverwriteFiles._existing_manifests.<locals>.<genexpr>r   r   N)rk   rl   rw   r\   r   rR   r   r   r   anyr$   r   r   r  r   r   rT   rZ   r   r   r    r   r!   r  rL   r   r   r   )rx   existing_filesry   r	  r   r   r   rG   )r6  rx   rI   r   L  sD   

z#_OverwriteFiles._existing_manifestsr   c                   sn    j dur5 jj j }|du rtd j  t }d	 fdd}||| j	}t
tj| S g S )
zTo determine if we need to record any deleted entries.

        With a full overwrite all the entries are considered deleted.
        With partial overwrites we have to use the predicate to evaluate
        which entries are affected.
        Nz&Could not find the previous snapshot: r   r"   rD   r   c                   s    fdd| j  jddD S )Nc              	     sD   g | ]}|j jtjkr|j  jv rtjtj|j	|j
|j|j d qS )r   )r   r   r   r   rY   r    r   r!   r   rL   r   r   r5  r   rG   rI   r&    s    zJ_OverwriteFiles._deleted_entries.<locals>._get_entries.<locals>.<listcomp>T)r   r   )r   r   rG   rI   _get_entries  s   
z6_OverwriteFiles._deleted_entries.<locals>._get_entries)r   r"   rD   r   )rV   rk   rl   r   r   r;   r   mapr   rR   r   ro   chain)rx   r   r   r:  list_of_entriesrG   r   rI   r   p  s   
z _OverwriteFiles._deleted_entriesNr   r   )r   r   r   r  r   r   rG   rG   rG   rI   r2  F  s    
$r2  c                   @  sh   e Zd ZU ded< ded< ded< ded< eefd ddZd!ddZd"ddZd#d$ddZ	d%ddZ
dS )&UpdateSnapshotr?   rk   r   rR   r[   _branchrb   _snapshot_propertiesr^   r_   rc   ra   rD   rd   c                 C  s   || _ || _|| _|| _d S r   )rk   rR   r@  r?  )rx   r^   r_   rc   ra   rG   rG   rI   rh     s   
zUpdateSnapshot.__init__r  c                 C     t tj| j| j| j| jdS N)r]   r^   r_   rc   ra   )r  r)   APPENDrk   rR   r?  r@  r   rG   rG   rI   fast_append     zUpdateSnapshot.fast_appendr  c                 C  rA  rB  )r  r)   rC  rk   rR   r?  r@  r   rG   rG   rI   merge_append  rE  zUpdateSnapshot.merge_appendNrB   r`   r2  c                 C  s:   t || jjj| jdd urtjntj| j| j| j| j	dS )Nr3  )rB   r]   r^   r_   rc   ra   )
r2  rk   rl   rw   r?  r)   	OVERWRITErC  rR   r@  )rx   rB   rG   rG   rI   	overwrite  s   zUpdateSnapshot.overwriter   c                 C  rA  rB  )r   r)   DELETErk   rR   r?  r@  r   rG   rG   rI   delete  rE  zUpdateSnapshot.delete)
r^   r?   r_   r   rc   r[   ra   rb   rD   rd   )rD   r  )rD   r  r   )rB   r`   rD   r2  )rD   r   )r   r   r   r   r'   r8   rh   rD  rF  rH  rJ  rG   rG   rG   rI   r>    s   
 

		r>  c                   @  s`   e Zd ZU ded< ded< ded< ded< d ddZd!ddZd"ddZd#ddZd$ddZdS )%r-  rA   r  r  r   r  r   _snapshot_producerr)  r*  r+  r,  rD   rd   c                 C  s   || _ || _|| _|| _d S r   )r  r  r  rK  )rx   r)  r*  r+  r,  rG   rG   rI   rh     s   
z_ManifestMergeManager.__init__r   r   Dict[int, List[ManifestFile]]c                 C  s&   t t}|D ]
}||j | q|S r   )r   r   r  r   )rx   r   groupsr   rG   rG   rI   _group_by_spec  s   z$_ManifestMergeManager._group_by_specr   manifest_binr"   c                 C  s   | j j| j |dL}|D ]>}| j j|ddD ]3}|jtjkr-|j| j jkr-|| q|jtj	kr@|j| j jkr@|
| q|jtjkrK|| qqW d    | S 1 sZw   Y  | S )N)r   F)r   r   )rK  r   r   r   r   r!   r   rL   rJ  r   r   existingr   )rx   r   rO  r   r   r   rG   rG   rI   _create_manifest  s    

z&_ManifestMergeManager._create_manifestfirst_manifestc           
        s   t jddd}||dd }dfd	d
t   fdd|D }dd t|D tg fddd}tj	|D ]}|
| q@dd |D }	dd |	D S )Nr  F)target_weightlookbacklargest_bin_firstc                 S  r   r   )manifest_length)mrG   rG   rI   <lambda>  s    z4_ManifestMergeManager._merge_group.<locals>.<lambda>rO  r   rD   c                   s\   g }t | dkr|| d  |S  | v r#t | jk r#||  |S ||  |S )Nr  r   )r   r   r  extendrQ  )rO  output_manifests)rR  rx   r   rG   rI   	merge_bin  s   	
z5_ManifestMergeManager._merge_group.<locals>.merge_binc                   s   g | ]}  |qS rG   )r   )r%  b)r   r[  rG   rI   r&    s    z6_ManifestMergeManager._merge_group.<locals>.<listcomp>c                 S  s   i | ]\}}||qS rG   rG   )r%  ifrG   rG   rI   
<dictcomp>  s    z6_ManifestMergeManager._merge_group.<locals>.<dictcomp>c                   s    |  S r   rG   )r^  )futures_indexrG   rI   rX    s    )iterablekeyc                 S  s   g | ]
}|  r|  qS rG   )r   )r%  r^  rG   rG   rI   r&    r'  c                 S  s   g | ]	}|D ]}|qqS rG   rG   )r%  
bin_resultr   rG   rG   rI   r&    s    )rO  r   rD   r   )r:   r  pack_endr;   r   	enumerater   
concurrentfuturesas_completedr   )
rx   rR  r   r   packerbinsrg  completed_futuresfuturebin_resultsrG   )r   rR  r`  r[  rx   r   rI   _merge_group  s   z"_ManifestMergeManager._merge_groupc              	   C  sZ   | j r	t|dkr|S |d }| |}g }t| D ]}|| ||||  q|S r  )r  r   rN  reversedkeysrY  rn  )rx   r   rR  rM  merged_manifestsr   rG   rG   rI   r.    s   
z%_ManifestMergeManager.merge_manifestsN)
r)  rA   r*  rA   r+  r   r,  r   rD   rd   )r   r   rD   rL  )r   rA   rO  r   rD   r"   )rR  r"   r   rA   r   r   rD   r   r   )	r   r   r   r   rh   rN  rQ  rn  r.  rG   rG   rG   rI   r-    s   
 



r-  c                   @  sp   e Zd ZU dZdZded< dZded< d d	d
Zd!ddZd"d#ddZ	d$ddZ
			d%d&ddZd'ddZdS )(ManageSnapshotsa  
    Run snapshot management operations using APIs.

    APIs include create branch, create tag, etc.

    Use table.manage_snapshots().<operation>().commit() to run a specific operation.
    Use table.manage_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.

    We can also use context managers to make more changes. For example,

    with table.manage_snapshots() as ms:
       ms.create_tag(snapshot_id1, "Tag_A").create_tag(snapshot_id2, "Tag_B")
    rG   Tuple[TableUpdate, ...]_updatesTuple[TableRequirement, ...]_requirementsrD   r6   c                 C  s   | j | jfS )z%Apply the pending changes and commit.)rt  rv  r   rG   rG   rI   r   0  s   zManageSnapshots._commitr   rE   c                 C  sX   t |df}t|| jjjv r| jjj| jnd|df}|  j|7  _|  j|7  _| S )zRemove a snapshot ref.

        Args:
            ref_name: branch / tag name to remove
        Stages the updates and requirements for the remove-snapshot-ref.
        Returns
            This method for chaining
        r   Nr   )r0   r/   rk   rl   r|   rL   rt  rv  )rx   r   updatesrequirementsrG   rG   rI   _remove_ref_snapshot4  s   	z$ManageSnapshots._remove_ref_snapshotNrL   rA   tag_namemax_ref_age_msrU   c                 C  s8   | j j||d|d\}}|  j|7  _|  j|7  _| S )aJ  
        Create a new tag pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of the existing snapshot to tag
            tag_name (str): name of the tag
            max_ref_age_ms (Optional[int]): max ref age in milliseconds

        Returns:
            This for method chaining
        tag)rL   r   r   r|  rk   _set_ref_snapshotrt  rv  )rx   rL   r{  r|  updaterequirementrG   rG   rI   
create_tagJ  s   
zManageSnapshots.create_tagc                 C     | j |dS )z
        Remove a tag.

        Args:
            tag_name (str): name of tag to remove
        Returns:
            This for method chaining
        rw  rz  )rx   r{  rG   rG   rI   
remove_tag`     	zManageSnapshots.remove_tagbranch_namemax_snapshot_age_msmin_snapshots_to_keepc                 C  s<   | j j||d|||d\}}|  j|7  _|  j|7  _| S )a+  
        Create a new branch pointing to the given snapshot id.

        Args:
            snapshot_id (int): snapshot id of existing snapshot at which the branch is created.
            branch_name (str): name of the new branch
            max_ref_age_ms (Optional[int]): max ref age in milliseconds
            max_snapshot_age_ms (Optional[int]): max age of snapshots to keep in milliseconds
            min_snapshots_to_keep (Optional[int]): min number of snapshots to keep for the branch
        Returns:
            This for method chaining
        rc   )rL   r   r   r|  r  r  r~  )rx   rL   r  r|  r  r  r  r  rG   rG   rI   create_branchk  s   
zManageSnapshots.create_branchc                 C  r  )z
        Remove a branch.

        Args:
            branch_name (str): name of branch to remove
        Returns:
            This for method chaining
        rw  r  )rx   r  rG   rG   rI   remove_branch  r  zManageSnapshots.remove_branchr   )r   rE   rD   rr  r   )rL   rA   r{  rE   r|  rU   rD   rr  )r{  rE   rD   rr  )NNN)rL   rA   r  rE   r|  rU   r  rU   r  rU   rD   rr  )r  rE   rD   rr  )r   r   r   r  rt  r   rv  r   rz  r  r  r  r  rG   rG   rG   rI   rr    s   
 


 rr  c                   @  sj   e Zd ZU dZe Zded< dZded< dZded< dddZ	dddZ
dddZd ddZd!ddZdS )"ExpireSnapshotsa  Expire snapshots by ID.

    Use table.expire_snapshots().<operation>().commit() to run a specific operation.
    Use table.expire_snapshots().<operation-one>().<operation-two>().commit() to run multiple operations.
    Pending changes are applied on commit.
    Set[int]_snapshot_ids_to_expirerG   rs  rt  ru  rv  rD   r6   c                 C  s>   |   }|  j|8  _t| jd}|  j|f7  _| j| jfS )aF  
        Commit the staged updates and requirements.

        This will remove the snapshots with the given IDs, but will always skip protected snapshots (branch/tag heads).

        Returns:
            Tuple of updates and requirements to be committed,
            as required by the calling parent apply functions.
        )snapshot_ids)_get_protected_snapshot_idsr  r1   rt  rv  )rx   protected_idsr  rG   rG   rI   r     s
   zExpireSnapshots._commitc                 C  s   dd | j jj D S )a  
        Get the IDs of protected snapshots.

        These are the HEAD snapshots of all branches and all tagged snapshots.  These ids are to be excluded from expiration.

        Returns:
            Set of protected snapshot IDs to exclude from expiration.
        c                 S  s$   h | ]}|j tjtjfv r|jqS rG   )r}   r(   TAGr~   rL   )r%  r   rG   rG   rI   	<setcomp>  s
    z>ExpireSnapshots._get_protected_snapshot_ids.<locals>.<setcomp>)rk   rl   r|   valuesr   rG   rG   rI   r    s   	z+ExpireSnapshots._get_protected_snapshot_idsrL   rA   c                 C  sN   | j j|du rtd| d||  v rtd| d| j| | S )z
        Expire a snapshot by its ID.

        This will mark the snapshot for expiration.

        Args:
            snapshot_id (int): The ID of the snapshot to expire.
        Returns:
            This for method chaining.
        NzSnapshot with ID z does not exist.z$ is protected and cannot be expired.)rk   rl   r   r   r  r  r   )rx   rL   rG   rG   rI   by_id  s   zExpireSnapshots.by_idr  	List[int]'ExpireSnapshots'c                 C  s   |D ]}|  | q| S )z
        Expire multiple snapshots by their IDs.

        This will mark the snapshots for expiration.

        Args:
            snapshot_ids (List[int]): List of snapshot IDs to expire.
        Returns:
            This for method chaining.
        )r  )rx   r  rL   rG   rG   rI   by_ids  s   zExpireSnapshots.by_idsdtr   c                 C  sF   |   }t|}| jjjD ]}|j|k r |j|vr | j|j q| S )z
        Expire all unprotected snapshots with a timestamp older than a given value.

        Args:
            dt (datetime): Only snapshots with datetime < this value will be expired.

        Returns:
            This for method chaining.
        )	r  r<   rk   rl   	snapshotstimestamp_msrL   r  r   )rx   r  r  expire_fromry   rG   rG   rI   
older_than  s   
zExpireSnapshots.older_thanNr   )rD   r  )rL   rA   rD   r  )r  r  rD   r  )r  r   rD   r  )r   r   r   r  rn   r  r   rt  rv  r   r  r  r  r  rG   rG   rG   rI   r    s   
 



r  )r@   rA   rB   rC   rD   rE   )rL   rA   rM   rA   rB   rC   rD   rE   )b
__future__r   concurrent.futuresrf  ro   ri   abcr   collectionsr   r   r   	functoolsr   typingr   r	   r
   r   r   r   r   r   sortedcontainersr   pyiceberg.avro.codecsr   pyiceberg.expressionsr   r   r   pyiceberg.expressions.visitorsr   r   r   r   r   r   pyiceberg.ior   r   pyiceberg.manifestr   r   r   r    r!   r"   r#   r$   r%   pyiceberg.partitioningr&   pyiceberg.table.refsr'   r(   pyiceberg.table.snapshotsr)   r*   r+   r,   r-   pyiceberg.table.updater.   r/   r0   r1   r2   r3   r4   r5   r6   r7   pyiceberg.typedefr8   r9   pyiceberg.utils.bin_packingr:   pyiceberg.utils.concurrentr;   pyiceberg.utils.datetimer<   pyiceberg.utils.propertiesr=   r>   rq   r?   rJ   rP   rQ   r   r  r  r2  r>  r-  rr  r  rG   rG   rG   rI   <module>   sP   ( ,0

 y 4L:Qz