o
    ˳i0                     @   s,  d dl mZmZmZmZmZ d dlZd dlmZ d dl	m
Z
mZmZmZmZmZmZmZmZ d dlmZmZmZmZmZmZmZmZ d dlmZmZmZm Z m!Z! d dl"m#Z#m$Z$ d dl%m&Z&m'Z'm(Z( d d	l)m*Z* d d
l+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1m2Z2 d dl3m4Z4m5Z5 d dl6m7Z7m8Z8m9Z9 d dl:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZBmCZCmDZDmEZEmFZFmGZGmHZHmIZImJZJmKZKmLZLmMZM d dlNmOZOmPZP erd dlQZRd dlSmTZT d dlUmVZVmWZWmXZXmYZYmZZZm[Z[ dZ\dZ]dZ^dZ_dZ`dZadZbdZcdZddZedZfdZgdZhd Zid!Zjd"Zkd#Zld$ZmekelemgZn			dOd%eod&ed' d(eodB d)e9dB d*e9f
d+d,Zpe<d-eBd.eDd/eAd0e?d1e=d2eLd3eHd3eMd3eJd4eKd4e@d5e;d5iZqG d6d7 d7e'eo Zrd8e/d*esd9 fd:d;Zt		dPd<eod%eod=e9d8e/d&ed' d(eodB d*d>fd?d@ZudAeod&d'd*d>fdBdCZvdDeod=e9d*dEfdFdGZwdHdIdJeod*dfdKdLZxG dMdN dNeZydS )Q    )TYPE_CHECKINGAnyOptionalUnioncastN)Config)	BOTOCORE_SESSIONEXTERNAL_TABLEICEBERGLOCATIONMETADATA_LOCATIONPREVIOUS_METADATA_LOCATION
TABLE_TYPEMetastoreCatalogPropertiesUpdateSummary)CommitFailedExceptionNamespaceAlreadyExistsErrorNamespaceNotEmptyErrorNoSuchIcebergTableErrorNoSuchNamespaceErrorNoSuchPropertyExceptionNoSuchTableErrorTableAlreadyExistsError)AWS_ACCESS_KEY_IDAWS_PROFILE_NAME
AWS_REGIONAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN)UNPARTITIONED_PARTITION_SPECPartitionSpec)SchemaSchemaVisitorvisit)FromInputFile)CommitTableResponseTable)TableMetadata)UNSORTED_SORT_ORDER	SortOrder)TableRequirementTableUpdate)
EMPTY_DICT
Identifier
Properties)
BinaryTypeBooleanTypeDateTypeDecimalType
DoubleType	FixedType	FloatTypeIntegerTypeListTypeLongTypeMapTypeNestedFieldPrimitiveType
StringType
StructTypeTimestampTypeTimestamptzTypeTimeTypeUUIDType)get_first_property_valueproperty_as_bool)
GlueClient)ColumnTypeDefDatabaseInputTypeDefDatabaseTypeDefStorageDescriptorTypeDefTableInputTypeDefTableTypeDefzglue.idzglue.skip-archiveTzglue.endpointziceberg.field.idziceberg.field.optionalziceberg.field.currentzglue.profile-namezglue.regionzglue.access-key-idzglue.secret-access-keyzglue.session-tokenzglue.max-retrieszglue.retry-mode
   standardadaptivelegacymetadata_location
glue_tablerI   prev_metadata_locationmetadata_propertiesreturnc                 C   s\   |r| di ni }|tt t| i |r||t< |r,| D ]
\}}t|||< q!|S )N
Parameters)	getupdater   r
   upperr   r   itemsstr)rN   rO   rP   rQ   new_parameterskeyvalue r\   J/home/ubuntu/.local/lib/python3.10/site-packages/pyiceberg/catalog/glue.py_construct_parameters   s   r^   booleanintbigintfloatdoubledatestring	timestampbinaryc                   @   s   e Zd ZdededefddZdedee defdd	Zd
e	dedefddZ
dededefddZdedededefddZdedefddZdS )_IcebergSchemaToGlueTypeschemastruct_resultrR   c                 C   s   |S Nr\   )selfri   rj   r\   r\   r]   ri         z_IcebergSchemaToGlueType.schemastructfield_resultsc                 C   s   dd | dS )Nzstruct<,>)join)rl   rn   ro   r\   r\   r]   rn         z_IcebergSchemaToGlueType.structfieldfield_resultc                 C   s   |j  d| S )N:)name)rl   rt   ru   r\   r\   r]   rt      s   z_IcebergSchemaToGlueType.field	list_typeelement_resultc                 C   s   d| dS )Nzarray<rq   r\   )rl   rx   ry   r\   r\   r]   list   s   z_IcebergSchemaToGlueType.listmap_type
key_resultvalue_resultc                 C   s   d| d| dS )Nzmap<rp   rq   r\   )rl   r{   r|   r}   r\   r\   r]   map   rs   z_IcebergSchemaToGlueType.map	primitivec                 C   s@   t |trd|j d|j dS t| }tvrt|S t| S )Nzdecimal(rp   ))
isinstancer1   	precisionscaletypeGLUE_PRIMITIVE_TYPESrX   )rl   r   primitive_typer\   r\   r]   r      s
   
z"_IcebergSchemaToGlueType.primitiveN)__name__
__module____qualname__r    rX   ri   r<   rz   rn   r9   rt   r6   r8   r~   r:   r   r\   r\   r\   r]   rh      s    rh   metadatarD   c                    s   i  dt dtdd f fdd}| | j }r"|jD ]}||d q| jD ]}|j| jkr.q%|jD ]}||d q1q%t  S )Nrt   
is_currentrR   c                    st   | j  v rd S td| j t| jt tt| jtt| j	
 tt|
 id | j < | jr8| j | j  d< d S d S )NrD   )NameTyperS   Comment)rw   r   r"   
field_typerh   ICEBERG_FIELD_IDrX   field_idICEBERG_FIELD_OPTIONALoptionallowerICEBERG_FIELD_CURRENTdoc)rt   r   resultsr\   r]   _append_to_results   s   


z'_to_columns.<locals>._append_to_resultsTF)	r9   boolschema_by_idcurrent_schema_idcolumnsschemas	schema_idrz   values)r   r   current_schemart   ri   r\   r   r]   _to_columns   s   


r   
table_name
propertiesrH   c                 C   s<   | t t||||t||jdd}d|v r|d |d< |S )N)ColumnsLocation)r   	TableTyperS   StorageDescriptorDescription)r	   r^   r   location)r   rN   r   r   rO   rP   table_inputr\   r\   r]   _construct_table_input   s   	
r   to_table_namec                 C   s~   d| i}|d st d|d |d< d|v r|d |d< d|v r&|d |d< d|v r3td|d |d< d|v r=|d |d< |S )	Nr   r   z/Glue table type is missing, cannot rename tableOwnerrS   r   rG   r   )
ValueErrorr   )r   rO   rename_table_inputr\   r\   r]   _construct_rename_table_input   s   r   database_namerE   c                 C   sV   d| i}i }|  D ]\}}|dkr||d< q
|tkr ||d< q
|||< q
||d< |S )Nr   r   LocationUrirS   )rW   r   )r   r   database_input
parameterskvr\   r\   r]   _construct_database_input  s   


r   gluerC   glue_catalog_idc                    s:   | j j}dtttf dtddf fdd}|d| dS )a+  
    Register the Glue Catalog ID (AWS Account ID) as a parameter on all Glue client methods.

    It's more ergonomic to do this than to pass the CatalogId as a parameter to every client call since it's an optional
    parameter and boto3 does not support 'None' values for missing parameters.
    paramskwargsrR   Nc                    s   d| vr
 | d< d S d S )N	CatalogIdr\   )r   r   r   r\   r]   add_glue_catalog_id0  s   zG_register_glue_catalog_id_with_glue_client.<locals>.add_glue_catalog_idzprovide-client-params.glue)metaeventsdictrX   r   register)r   r   event_systemr   r\   r   r]   *_register_glue_catalog_id_with_glue_client'  s   "r   c                       s  e Zd ZU ded< dKdeded def fddZd	d
defddZ	dededdddfddZ
dededddeddf
ddZdededd
fddZdeeefdeeB deedf dedB dedededefdd ZdeeB d!edefd"d#Zd$ed%eed&f d'eed&f defd(d)ZdeeB defd*d+ZdeeB ddfd,d-Zd.eeB d/eeB defd0d1Zefd2eeB deddfd3d4Z d2eeB ddfd5d6Z!d2eeB de"e fd7d8Z#dLd2eeB de"e fd:d;Z$d2eeB defd<d=Z%defd2eeB d>e&e dB d'ede'fd?d@Z(d2eeB de"e fdAdBZ)deeB ddfdCdDZ*deeB de+fdEdFZ,e-d$d
de+fdGdHZ.dededefdIdJZ/  Z0S )MGlueCatalogrC   r   Nrw   clientr   c              
      s   t  j|fi | |dur|| _dS t|t}tjt|ttt|t	t
|tt|ttt|ttt|ttd}|jd|tt|tt|tv rM|ntddd| _|t }ret| j| dS dS )aD  Glue Catalog.

        You either need to provide a boto3 glue client, or one will be constructed from the properties.

        Args:
            name: Name to identify the catalog.
            client: An optional boto3 glue client.
            properties: Properties for glue client construction and configuration.
        N)profile_nameregion_namebotocore_sessionaws_access_key_idaws_secret_access_keyaws_session_tokenr   )max_attemptsmode)retries)endpoint_urlconfig)super__init__r   rA   GLUE_RETRY_MODEboto3SessionGLUE_PROFILE_NAMEr   GLUE_REGIONr   rT   r   GLUE_ACCESS_KEY_IDr   GLUE_SECRET_ACCESS_KEYr   GLUE_SESSION_TOKENr   r   GLUE_CATALOG_ENDPOINTr   GLUE_MAX_RETRIESMAX_RETRIESEXISTING_RETRY_MODESSTANDARD_RETRY_MODEGLUE_IDr   )rl   rw   r   r   retry_mode_prop_valuesessionr   	__class__r\   r]   r   :  s0   








zGlueCatalog.__init__rO   rI   rR   c           
   	   C   s  | d }d u rtd| d }d u rtd| d }d u r'td| t }d u r=tdt d| d	| | tkrStd
| dt d| d	| | t }d u ritdt d| d	| | j|d}|	|}t
|}	t||f|	|| |	j|| dS )NDatabaseNamez+Glue table is missing DatabaseName propertyr   z#Glue table is missing Name propertyrS   z)Glue table is missing Parameters propertyz	Property z$ missing, could not determine type: .zProperty table_type is z, expected z: zTable property z' is missing, cannot find metadata for: r   )
identifierr   rN   iocatalog)rT   r   r   r   r   r
   r   r   _load_file_io	new_inputr#   table_metadatar%   r   )
rl   rO   r   r   r   glue_table_typerN   r   filer   r\   r\   r]   _convert_glue_to_iceberga  s8   

z$GlueCatalog._convert_glue_to_icebergr   r   r   rH   c              
   C   s|   z| j j||d W d S  | j jjy& } ztd| d| d|d }~w | j jjy= } z	td| d|d }~ww )N)r   
TableInputzTable r    already exists	Database z does not exist)r   create_table
exceptionsAlreadyExistsExceptionr   EntityNotFoundExceptionr   )rl   r   r   r   er\   r\   r]   _create_glue_table  s   zGlueCatalog._create_glue_table
version_idc                 C   s   z| j j||t| jtt|d W d S  | j jjy0 } ztd| d| d| d|d }~w | j jj	yL } zt
d| d| d| |d }~ww )N)r   r   SkipArchive	VersionIdTable does not exist: r   z (Glue table version r   Cannot commit z: because Glue detected concurrent update to table version )r   update_tablerB   r   GLUE_SKIP_ARCHIVEGLUE_SKIP_ARCHIVE_DEFAULTr   r   r   ConcurrentModificationExceptionr   )rl   r   r   r   r   r   r\   r\   r]   _update_glue_table  s(   zGlueCatalog._update_glue_tablec              
   C   sP   z| j j||d}|d W S  | j jjy' } ztd| d| |d }~ww )Nr   r   r%   r   r   )r   	get_tabler   r   r   )rl   r   r   load_table_responser   r\   r\   r]   _get_glue_table  s   
zGlueCatalog._get_glue_tabler   ri   z	pa.Schemar   partition_spec
sort_orderc                 C   sf   | j ||||||d}| |\}}	| |j|j|j t|	|j||j}
| j||	|
d | j|dS )a{  
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            AlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        )r   ri   r   r
  r  r   r   r   r   r   )	_create_staged_table identifier_to_database_and_table_write_metadatar   r   rN   r   r   
load_table)rl   r   ri   r   r
  r  r   staged_tabler   r   r   r\   r\   r]   r     s   zGlueCatalog.create_tablerN   c           
      C   s\   |  |\}}t}| j|d}||}t|}t||||}	| j|||	d | j|dS )ai  Register a new table using existing metadata.

        Args:
            identifier (Union[str, Identifier]): Table identifier for the table
            metadata_location (str): The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        r   r  r  )	r  r+   r   r   r#   r   r   r   r  )
rl   r   rN   r   r   r   r   r   r   r   r\   r\   r]   register_table  s   

zGlueCatalog.register_tabletablerequirements.updatesc                 C   s*  |  }| |t\}}z| j||d}|d}| j|d}	W n ty/   d}d}d}	Y nw | |	|||}
|	rH|
j|	jkrHt|	j|	j	dS | j
|
j|
j|
j	d |	rz|sbtd| d| d	t||
j	|
j|
j||	j	d
}| j||||d nt||
j	|
j|
jd}| j|||d t|
j|
j	dS )a   Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        r   r   r   rO   N)r   rN   )r   r   metadata_pathr   r   z) because Glue table version id is missing)r   rN   r   r   rO   rP   )r   r   r   r   )r   rN   r   r   r  )rw   r  r   r	  rT   r   _update_and_stage_tabler   r$   rN   r  r   r   r   r   r  r   )rl   r  r  r  table_identifierr   r   current_glue_tableglue_table_version_idcurrent_tableupdated_staged_tableupdate_table_inputcreate_table_inputr\   r\   r]   commit_table  s`   
zGlueCatalog.commit_tablec                 C   s$   |  |t\}}| | j||dS )a  Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        r  )r  r   r   r	  )rl   r   r   r   r\   r\   r]   r  5  s   zGlueCatalog.load_tablec              
   C   s\   |  |t\}}z| jj||d W dS  | jjjy- } ztd| d| |d}~ww )zDrop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        r  r   r   N)r  r   r   delete_tabler   r   )rl   r   r   r   r   r\   r\   r]   
drop_tableH  s   	zGlueCatalog.drop_tablefrom_identifierto_identifierc                 C   s  |  |t\}}|  |\}}z
| jj||d}W n | jjjy3 } ztd| d| |d}~ww |d }	z| j|	d W n/ tyX } ztd| d| d|d}~w tyo } ztd| d| d	|d}~ww t	||	d
}
| j
|||
d z| | W nA ty } z5d| d| d}z| | |d| d| d7 }W n ty   |d| d| d7 }Y nw t||d}~ww | |S )a  Rename a fully classified table name.

        This method can only rename Iceberg tables in AWS Glue.

        Args:
            from_identifier: Existing table identifier.
            to_identifier: New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            ValueError: When from table identifier is invalid.
            NoSuchTableError: When a table with the name does not exist.
            NoSuchIcebergTableError: When from table is not a valid iceberg table.
            NoSuchPropertyException: When from table miss some required properties.
            NoSuchNamespaceError: When the destination namespace doesn't exist.
        r  r   r   Nr%   r  zFailed to rename table z( since it is missing required propertiesz& since it is not a valid iceberg table)r   rO   r  zFailed to drop old table z. zRolled back table creation for z'Failed to roll back table creation for z. Please clean up manually)r  r   r   r  r   r   r   r   r   r   r   r$  	Exceptionr   r  )rl   r%  r&  from_database_namefrom_table_nameto_database_namer   get_table_responser   rO   r   log_messager\   r\   r]   rename_tableW  sX   


zGlueCatalog.rename_table	namespacec              
   C   sV   |  |}z| jjt||d W dS  | jjjy* } z	td| d|d}~ww )aT  Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            AlreadyExistsError: If a namespace with the given name already exists.
        )DatabaseInputr   r   N)identifier_to_databaser   create_databaser   r   r   r   )rl   r.  r   r   r   r\   r\   r]   create_namespace  s   
zGlueCatalog.create_namespacec              
   C   s   |  |t}z| jj|d}|d }W n | jjjy* } ztd| |d}~ww t|dkrJ|d }| |rBtd| dtd| d| jj	|d	 dS )
a[  Drop a namespace.

        A Glue namespace can only be dropped if it is empty.

        Args:
            namespace: Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
            NamespaceNotEmptyError: If the namespace is not empty.
        r   	TableListDatabase does not exist: Nr   zCannot drop namespace z) because it still contains Iceberg tablesz- because it still contains non-Iceberg tablesr   )
r0  r   r   
get_tablesr   r   len_GlueCatalog__is_iceberg_tabler   delete_database)rl   r.  r   table_list_response
table_listr   first_tabler\   r\   r]   drop_namespace  s    

zGlueCatalog.drop_namespacec              
      s    |t g }d}z$	 |sjj dnjj |d}||d  |d}|s,nqW n jjjyE } ztd  |d}~ww  fdd	|D S )
an  List Iceberg tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
        NTr3  )r   	NextTokenr4  r?  r5  c                    s"   g | ]} |r |d  fqS r6  )r9  ).0r  r   rl   r\   r]   
<listcomp>  s   " z+GlueCatalog.list_tables.<locals>.<listcomp>)r0  r   r   r7  extendrT   r   r   )rl   r.  r<  
next_tokenr;  r   r\   rA  r]   list_tables  s(   
	zGlueCatalog.list_tablesr\   c                    s`   |rg S g }d}	 |s j  n j j|d}||d  |d}|s&nq	 fdd|D S )zList namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Returns:
            List[Identifier]: a List of namespace identifiers.
        NT)r?  DatabaseListr?  c                    s   g | ]	}  |d  qS r6  )identifier_to_tuple)r@  databaserl   r\   r]   rB    s    z/GlueCatalog.list_namespaces.<locals>.<listcomp>)r   get_databasesrC  rT   )rl   r.  database_listrD  databases_responser\   rI  r]   list_namespaces  s   
zGlueCatalog.list_namespacesc              
   C   s   |  |t}z	| jj|d}W n- | jjjy& } ztd| |d}~w | jjjy< } ztd| |d}~ww |d }t|di }d|v rS|d |d< d	|v r]|d	 |d	< |S )
a2  Get properties for a namespace.

        Args:
            namespace: Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
        r6  r5  NzInvalid input for namespace DatabaserS   r   r   r   )	r0  r   r   get_databaser   r   InvalidInputExceptionr   rT   )rl   r.  r   database_responser   rH  r   r\   r\   r]   load_namespace_properties  s"   z%GlueCatalog.load_namespace_propertiesremovalsc                 C   sF   | j |d}| j|||d\}}| |t}| jj|t||d |S )u  Remove provided property keys and updates properties for a namespace.

        Args:
            namespace: Namespace identifier.
            removals: Set of property keys that need to be removed. Optional Argument.
            updates: Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist， or identifier is invalid.
            ValueError: If removals and updates have overlapping keys.
        )r.  )current_propertiesrS  r  )r   r/  )rR  %_get_updated_props_and_update_summaryr0  r   r   update_databaser   )rl   r.  rS  r  rT  properties_update_summaryupdated_propertiesr   r\   r\   r]   update_namespace_properties  s   
z'GlueCatalog.update_namespace_propertiesc                 C      t rk   NotImplementedError)rl   r.  r\   r\   r]   
list_views,  rm   zGlueCatalog.list_viewsc                 C   rZ  rk   r[  rl   r   r\   r\   r]   	drop_view/  rm   zGlueCatalog.drop_viewc                 C   rZ  rk   r[  r^  r\   r\   r]   view_exists2  rm   zGlueCatalog.view_existsc                 C   s   |  di  td tkS )NrS    )rT   r   r   r
   )r  r\   r\   r]   __is_iceberg_table5  s   zGlueCatalog.__is_iceberg_tablec                 C   s   |  ||S )zIOverride the default warehouse location to follow Hive-style conventions.)"_get_hive_style_warehouse_location)rl   r   r   r\   r\   r]   _get_default_warehouse_location9  s   z+GlueCatalog._get_default_warehouse_locationrk   )r\   )1r   r   r   __annotations__rX   r   r   r   r%   r   r   r  r	  r   r'   r+   r,   r   r    r   r(   r-   r   r  tupler)   r*   r$   r"  r  r$  r-  r2  r>  rz   rE  rM  rR  setr   rY  r]  r_  r   r`  staticmethodr9  rd  __classcell__r\   r\   r   r]   r   7  sx   
  '$

,


O<

r   )NNN)NN)ztypingr   r   r   r   r   r   botocore.configr   pyiceberg.catalogr   r	   r
   r   r   r   r   r   r   pyiceberg.exceptionsr   r   r   r   r   r   r   r   pyiceberg.ior   r   r   r   r   pyiceberg.partitioningr   r   pyiceberg.schemar    r!   r"   pyiceberg.serializersr#   pyiceberg.tabler$   r%   pyiceberg.table.metadatar&   pyiceberg.table.sortingr'   r(   pyiceberg.table.updater)   r*   pyiceberg.typedefr+   r,   r-   pyiceberg.typesr.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   pyiceberg.utils.propertiesrA   rB   pyarrowpamypy_boto3_glue.clientrC   mypy_boto3_glue.type_defsrD   rE   rF   rG   rH   rI   r   r  r  r   r   r   r   r   r   r   r   r   r   r   r   r   ADAPTIVE_RETRY_MODELEGACY_RETRY_MODEr   rX   r^   r   rh   rz   r   r   r   r   r   r   r\   r\   r\   r]   <module>   s   ,(
T 

(
