o
    <i]                     @   s  d dl Z d dlmZ d dlmZmZmZmZ d dlZd dlm	Z	m
Z
mZmZmZmZmZ d dlmZmZmZmZmZmZmZmZmZ d dlmZmZmZmZmZ d dl m!Z!m"Z" d dl#m$Z$ d d	l%m&Z& d d
l'm(Z(m)Z)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0m1Z1 d dl2m3Z3m4Z4 d dl5m6Z6m7Z7m8Z8 d dl9m:Z: erd dl;Z<d dl=m>Z> dZ?dZ@dZAdZBdZCdZDdZEdZFdZGdZHdZIdZJdZKdZLd ZMd!ZNd"ZOd#ZPd$ZQG d%d& d&eZRd'eSd(eSd)e8d*eSd+eTeSef f
d,d-ZUd.eTeSef d/eSd0eSd+eTeSef fd1d2ZVd'eSd)e8d+eTeSef fd3d4ZWd5eTeSef d6e8d+eTeSef fd7d8ZXe@d9d:eAd9d:gZYe@d;d<eAd=d<gZZeFeAd;d<e@d=d<gd>d?id@gZ[dAeTeSeSf d+e8fdBdCZ\dDeTeSef d+eTeSeSf fdEdFZ]dGeSd+eSfdHdIZ^dGeSd+eSfdJdKZ_dS )L    N)time)TYPE_CHECKINGAnyOptionalUnion)BOTOCORE_SESSIONICEBERGMETADATA_LOCATIONPREVIOUS_METADATA_LOCATION
TABLE_TYPEMetastoreCatalogPropertiesUpdateSummary)	ConditionalCheckFailedExceptionGenericDynamoDbErrorNamespaceAlreadyExistsErrorNamespaceNotEmptyErrorNoSuchIcebergTableErrorNoSuchNamespaceErrorNoSuchPropertyExceptionNoSuchTableErrorTableAlreadyExistsError)AWS_ACCESS_KEY_ID
AWS_REGIONAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKENload_file_io)UNPARTITIONED_PARTITION_SPECPartitionSpec)Schema)FromInputFile)CommitTableResponseTableTableProperties)load_location_provider)new_table_metadata)UNSORTED_SORT_ORDER	SortOrder)TableRequirementTableUpdate)
EMPTY_DICT
Identifier
Properties)get_first_property_value)DynamoDBClientdynamodb
identifier	namespacev
updated_at
created_at	NAMESPACEznamespace-identifierPAY_PER_REQUESTz
table-nameicebergzp.ACTIVEItemzdynamodb.profile-namezdynamodb.regionzdynamodb.access-key-idzdynamodb.secret-access-keyzdynamodb.session-tokenc                       s  e Zd ZdPdeded def fddZdQd	d
ZdefddZde	e
efdeeB deedf dedB dedededefddZdeeB dedefddZdedeedf deedf defddZdeeB defddZdeeB ddfd d!Zd"eeB d#eeB defd$d%Zefd&eeB deddfd'd(Zd&eeB ddfd)d*Zd&eeB dee fd+d,Z dRd&eeB dee fd.d/Z!d&eeB defd0d1Z"defd&eeB d2e#e dB dede$fd3d4Z%d&eeB dee fd5d6Z&deeB ddfd7d8Z'deeB defd9d:Z(d;ed<ede)ee*f fd=d>Z+d;ede)ee*f fd?d@Z,d;ede)ee*f fdAdBZ-ded&ede)ee*f fdCdDZ.dEe)ee*f dFeddfdGdHZ/d&ededFeddfdIdJZ0dKe)ee*f defdLdMZ1d;ed<edefdNdOZ2  Z3S )SDynamoDbCatalogNnameclientr-   
propertiesc              
      s   t  j|fi | |dur|| _n'tj|tt|tt	|t
t|ttt|ttt|ttd}|t| _| jtt| _|   dS )zDynamodb catalog.

        Args:
            name: Name to identify the catalog.
            client: An optional boto3 dynamodb client.
            properties: Properties for dynamodb client construction and configuration.
        N)profile_nameregion_namebotocore_sessionaws_access_key_idaws_secret_access_keyaws_session_token)super__init__r.   boto3SessiongetDYNAMODB_PROFILE_NAMEr,   DYNAMODB_REGIONr   r   DYNAMODB_ACCESS_KEY_IDr   DYNAMODB_SECRET_ACCESS_KEYr   DYNAMODB_SESSION_TOKENr   r;   DYNAMODB_CLIENTr<   DYNAMODB_TABLE_NAMEDYNAMODB_TABLE_NAME_DEFAULTdynamodb_table_name&_ensure_catalog_table_exists_or_create)selfr:   r;   r<   session	__class__ W/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/pyiceberg/catalog/dynamodb.pyrD   _   s   



zDynamoDbCatalog.__init__returnc              
   C   sh   |   rd S z| jj| jttttd W d S  | jjj	| jjj
| jjjfy3 } zt|j|d }~ww )N)	TableNameAttributeDefinitions	KeySchemaGlobalSecondaryIndexesBillingMode)_dynamodb_table_existsr.   create_tablerP   $CREATE_CATALOG_ATTRIBUTE_DEFINITIONSCREATE_CATALOG_KEY_SCHEMA'CREATE_CATALOG_GLOBAL_SECONDARY_INDEXESDYNAMODB_PAY_PER_REQUEST
exceptionsResourceInUseExceptionLimitExceededExceptionInternalServerErrorr   message)rR   erV   rV   rW   rQ   x   s$   z6DynamoDbCatalog._ensure_catalog_table_exists_or_createc              
   C   s   z
| j j| jd}W n! | j jjy   Y dS  | j jjy+ } zt|j|d }~ww |d d tkr?td| j dt dS )N)rY   Fr!   TableStatuszDynamoDB table for catalog z is not T)	r.   describe_tablerP   rd   ResourceNotFoundExceptionrg   r   rh   r7   )rR   responseri   rV   rV   rW   r^      s   z&DynamoDbCatalog._dynamodb_table_existsr/   schemaz	pa.Schemalocationpartition_spec
sort_orderc              
   C   s   |  |t|tjtj}| |\}}| |||}t||d}	|		 }
t
|||||d}t| j|
d}| |||
 | j|d z| jt||||
ddt dd W n tyn } ztd	| d
| d|d}~ww | j|dS )a{  
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            AlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        )table_locationtable_properties)ro   rn   rp   rq   r<   r<   ro   database_name)rv   
table_namer<   metadata_locationattribute_not_exists()itemcondition_expressionTable . already existsN)r/   )_convert_schema_if_neededintrG   r"   FORMAT_VERSIONDEFAULT_FORMAT_VERSION identifier_to_database_and_table_resolve_table_locationr#    new_table_metadata_file_locationr$   r   r<   _write_metadata_ensure_namespace_exists_put_dynamo_item_get_create_table_itemDYNAMODB_COL_IDENTIFIERr   r   
load_table)rR   r/   rn   ro   rp   rq   r<   rv   rw   providerrx   metadataiori   rV   rV   rW   r_      s4   


zDynamoDbCatalog.create_tablerx   c                 C      t )ai  Register a new table using existing metadata.

        Args:
            identifier (Union[str, Identifier]): Table identifier for the table
            metadata_location (str): The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        NotImplementedError)rR   r/   rx   rV   rV   rW   register_table   s   zDynamoDbCatalog.register_tabletablerequirements.updatesc                 C   r   )a   Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        r   )rR   r   r   r   rV   rV   rW   commit_table   s   zDynamoDbCatalog.commit_tablec                 C   s*   |  |t\}}| j||d}| j|dS )a   
        Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        rv   rw   dynamo_table_item)r   r   _get_iceberg_table_item+_convert_dynamo_table_item_to_iceberg_table)rR   r/   rv   rw   r   rV   rV   rW   r      s   zDynamoDbCatalog.load_tablec              
   C   sh   |  |t\}}z| j|| d| dt dd W dS  ty3 } ztd| d| |d}~ww )zDrop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        r   attribute_exists(rz   r0   r/   r}   Table does not exist: N)r   r   _delete_dynamo_itemr   r   )rR   r/   rv   rw   ri   rV   rV   rW   
drop_table	  s   	
zDynamoDbCatalog.drop_tablefrom_identifierto_identifierc           
      C   s  |  |t\}}|  |\}}| j||d}z| j|d W n/ ty6 } ztd| d| d|d}~w tyM } ztd| d| d|d}~ww | j|d | j|d z| jt|||d	d
t	 dd W n t
y } ztd| d| d|d}~ww z| | W nE ttfy } z7d| d| d}	z| | |	d| d| d7 }	W n ttfy   |	d| d| d7 }	Y nw t|	|d}~ww | |S )a  Rename a fully classified table name.

        This method can only rename Iceberg tables in AWS Glue.

        Args:
            from_identifier: Existing table identifier.
            to_identifier: New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            ValueError: When from table identifier is invalid.
            NoSuchTableError: When a table with the name does not exist.
            NoSuchIcebergTableError: When from table is not a valid iceberg table.
            NoSuchPropertyException: When from table miss some required properties.
            NoSuchNamespaceError: When the destination namespace doesn't exist.
        r   r   zFailed to rename table r   z( since it is missing required propertiesNz& since it is not a valid iceberg tableru   )from_dynamo_table_itemto_database_nameto_table_namery   rz   r{   r~   r   zFailed to drop old table z. zRolled back table creation for z'Failed to roll back table creation for z. Please clean up manually)r   r   r   r   r   r   r   r   _get_rename_table_itemr   r   r   r   r   
ValueErrorr   )
rR   r   r   from_database_namefrom_table_namer   r   from_table_itemri   log_messagerV   rV   rW   rename_table  sb   




zDynamoDbCatalog.rename_tabler0   c              
   C   sZ   |  |}z| jt||ddt dd W dS  ty, } z	td| d|d}~ww )aT  Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            AlreadyExistsError: If a namespace with the given name already exists.
        )rv   r<   ry   rz   r{   	Database r   N)identifier_to_databaser   _get_create_database_itemDYNAMODB_COL_NAMESPACEr   r   )rR   r0   r<   rv   ri   rV   rV   rW   create_namespace_  s   


z DynamoDbCatalog.create_namespacec              
   C   s|   |  |t}| j|d}t|dkrtd| dz| j|tdt dd W d	S  ty= } ztd| |d	}~ww )
a[  Drop a namespace.

        A Glue namespace can only be dropped if it is empty.

        Args:
            namespace: Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
            NamespaceNotEmptyError: If the namespace is not empty.
        )r0   r   r   z is not emptyr   rz   r   zDatabase does not exist: N)	r   r   list_tableslenr   r   DYNAMODB_NAMESPACEr   r   )rR   r0   rv   table_identifiersri   rV   rV   rW   drop_namespacet  s   
zDynamoDbCatalog.drop_namespacec                 C   s   |  |t}| jd}z|j| jtt ddd|iid}W n" | jjj	| jjj
| jjj| jjjfyA } zt|j|d}~ww g }|D ]}|d D ]}t|}	|	t }
|
tkr[qL|| |
 qLqF|S )zList Iceberg tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.
        queryz = :namespace z
:namespaceS)rY   	IndexNameKeyConditionExpressionExpressionAttributeValuesNItems)r   r   r.   get_paginatorpaginaterP   DYNAMODB_NAMESPACE_GSIr   rd   &ProvisionedThroughputExceededExceptionRequestLimitExceededrg   rl   r   rh   $_convert_dynamo_item_to_regular_dictr   r   appendidentifier_to_tuple)rR   r0   rv   	paginatorpage_iteratorri   r   pager|   _dictidentifier_colrV   rV   rW   r     s>   	

zDynamoDbCatalog.list_tablesrV   c           
      C   s   |rg S | j d}z|j| jdt dddtiid}W n" | j jj| j jj| j jj	| j jj
fy? } zt|j|d}~ww g }|D ]}|d D ]}t|}|t }	|| |	 qJqD|S )	zList top-level namespaces from the catalog.

        We do not support hierarchical namespace.

        Returns:
            List[Identifier]: a List of namespace identifiers.
        r   Tz = :identifierz:identifierr   )rY   ConsistentReadr   r   Nr   )r.   r   r   rP   r   r   rd   r   r   rg   rl   r   rh   r   r   r   r   )
rR   r0   r   r   ri   database_identifiersr   r|   r   namespace_colrV   rV   rW   list_namespaces  s<   	

zDynamoDbCatalog.list_namespacesc                 C   s*   |  |t}| j|d}t|}t|dS )a;  
        Get properties for a namespace.

        Args:
            namespace: Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist, or identifier is invalid.
        ru   namespace_dict)r   r   _get_iceberg_namespace_itemr   _get_namespace_properties)rR   r0   rv   namespace_itemr   rV   rV   rW   load_namespace_properties  s   
z)DynamoDbCatalog.load_namespace_propertiesremovalsc              
   C   s   |  |t}| j|d}t|}t|d}| j|||d\}}	z| jt||	ddt dd W |S  t	yF }
 z	td| d	|
d
}
~
ww )u  
        Remove or update provided property keys for a namespace.

        Args:
            namespace: Namespace identifier
            removals: Set of property keys that need to be removed. Optional Argument.
            updates: Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist， or identifier is invalid.
            ValueError: If removals and updates have overlapping keys.
        ru   r   )current_propertiesr   r   )r   updated_propertiesr   rz   r{   r   z does not existN)
r   r   r   r   r   %_get_updated_props_and_update_summaryr   _get_update_database_itemr   r   )rR   r0   r   r   rv   r   r   r   properties_update_summaryr   ri   rV   rV   rW   update_namespace_properties  s(   



z+DynamoDbCatalog.update_namespace_propertiesc                 C   r   Nr   )rR   r0   rV   rV   rW   
list_views     zDynamoDbCatalog.list_viewsc                 C   r   r   r   rR   r/   rV   rV   rW   	drop_view  r   zDynamoDbCatalog.drop_viewc                 C   r   r   r   r   rV   rV   rW   view_exists"  r   zDynamoDbCatalog.view_existsrv   rw   c              
   C   sJ   z| j | d| |dW S  ty$ } ztd| d| |d }~ww )Nr   r/   r0   r   )_get_dynamo_itemr   r   )rR   rv   rw   ri   rV   rV   rW   r   %  s   z'DynamoDbCatalog._get_iceberg_table_itemc              
   C   s:   z| j t|dW S  ty } ztd| |d }~ww )Nr   zNamespace does not exist: )r   r   r   r   )rR   rv   ri   rV   rV   rW   r   +  s   z+DynamoDbCatalog._get_iceberg_namespace_itemc                 C   s
   |  |S r   )r   )rR   rv   rV   rV   rW   r   1     
z(DynamoDbCatalog._ensure_namespace_existsc              
   C   s   z%| j j| jdtd|itd|iid}t|v r|t W S td| d|  | j jjy? } ztd| d| |d }~w | j jj	| j jj
| j jjfy\ } zt|j|d }~ww )NTr   )rY   r   KeyzItem not found. identifier: z - namespace: )r.   get_itemrP   r   r   ITEMr   rd   rl   r   r   rg   r   rh   )rR   r/   r0   rm   ri   rV   rV   rW   r   4  s4   
z DynamoDbCatalog._get_dynamo_itemr|   r}   c                 C   s   z| j j| j||d W d S  | j jjy' } ztd| d| |d }~w | j jj| j jj| j jj| j jj| j jj	| j jj
fyP } zt|j|d }~ww )N)rY   r8   ConditionExpression#Condition expression check failed:  - )r.   put_itemrP   rd   r   r   r   rg   rl   (ItemCollectionSizeLimitExceededExceptionTransactionConflictExceptionr   rh   )rR   r|   r}   ri   rV   rV   rW   r   O  s    z DynamoDbCatalog._put_dynamo_itemc                 C   s   z| j j| jtd|itd|ii|d W d S  | j jjy/ } ztd| d| |d }~w | j jj| j jj| j jj	| j jj
| j jj| j jjfyX } zt|j|d }~ww )Nr   )rY   r   r   r   r   )r.   delete_itemrP   r   r   rd   r   r   r   rg   rl   r   r   r   rh   )rR   r0   r/   r}   ri   rV   rV   rW   r   ^  s<   z#DynamoDbCatalog._delete_dynamo_itemr   c              	   C   s   t |}dd ttfD tttg D ]}|| vr$td| d| q|tt }|t }|tt }| 	|t
\}}| tkrStd| dt d| d| t| j|d	}	|	|}
t|
}t||f||| |j|| d
S )Nc                 S   s   g | ]}t |qS rV   )_add_property_prefix).0proprV   rV   rW   
<listcomp>}  s    zODynamoDbCatalog._convert_dynamo_table_item_to_iceberg_table.<locals>.<listcomp>zIceberg required property z is missing: zProperty table_type is z, expected z: r   rt   )r/   r   rx   r   catalog)r   r   r	   r   r   DYNAMODB_COL_CREATED_ATkeysr   r   r   r   lowerr   r   r   r<   	new_inputr   table_metadatar!   _load_file_io)rR   r   
table_dictr   
table_typer/   rx   rv   rw   r   filer   rV   rV   rW   r   z  s6   


z;DynamoDbCatalog._convert_dynamo_table_item_to_iceberg_tablec                 C   s   |  ||S )zIOverride the default warehouse location to follow Hive-style conventions.)"_get_hive_style_warehouse_location)rR   rv   rw   rV   rV   rW   _get_default_warehouse_location  s   z/DynamoDbCatalog._get_default_warehouse_locationr   )rX   N)rV   )4__name__
__module____qualname__strr   rD   rQ   boolr^   r   r%   r)   r*   r   r   r   r&   r+   r!   r_   r   tupler'   r(   r    r   r   r   r   r   r   listr   r   r   setr   r   r   r   r   dictr   r   r   r   r   r   r   r   r   __classcell__rV   rV   rT   rW   r9   ^   s|     


;


B,*

% r9   rv   rw   r<   rx   rX   c                 C   s   t tt d }td|  d| itd| itdt t itd|it	d|ii}|
 D ]\}}d|i|t|< q,dt i|tt< d|i|tt< ddi|tt< |S )N  r   r   N )r  roundr   r   r   DYNAMODB_COL_VERSIONuuiduuid4r   DYNAMODB_COL_UPDATED_ATitemsr   r   upperr   r	   r
   )rv   rw   r<   rx   current_timestamp_msr   keyvalrV   rV   rW   r     s.   r   r   r   r   c                 C   s\   | }t tt d }| d| |t d< ||t d< t t |t d< ||t d< |S )Nr
  r   r   r  )	r  r  r   r   r   r  r  r  r  )r   r   r   r   r  rV   rV   rW   r     s   r   c                 C   sl   t tt d }tdtitd| itdt t it	d|it
d|ii}| D ]\}}d|i|t|< q'|S Nr
  r   r  )r  r  r   r   r   r   r  r  r  r   r  r  r   )rv   r<   r  r   r  r  rV   rV   rW   r     s(   r   r   r   c                 C   sl   t tt d }t| t t| t tdt t it| t t	d|ii}|
 D ]\}}d|i|t|< q'|S r  )r  r  r   r   r   r  r  r  r   r  r  r   )r   r   r  r   r  r  rV   rV   rW   r     s   r   r   )AttributeNameAttributeTypeHASH)r  KeyTypeRANGEProjectionType	KEYS_ONLY)r   r[   
Projectionr   c                 C   s   dd |   D S )Nc                 S   s$   i | ]\}}| trt||qS rV   )
startswithPROPERTY_KEY_PREFIX_remove_property_prefix)r   r  r  rV   rV   rW   
<dictcomp>"  s   $ z-_get_namespace_properties.<locals>.<dictcomp>)r  r   rV   rV   rW   r   !  s   r   dynamo_jsonc                 C   s   i }|   D ]>\}}t| }t|dkrtd| |d }|dvr)tdt| }t|dkr<td| |d }|||< q|S )a  Convert a dynamo json to a regular json.

    Example of a dynamo json:
    {
        "AlbumTitle": {
            "S": "Songs About Life",
        },
        "Artist": {
            "S": "Acme Band",
        },
        "SongTitle": {
            "S": "Happy Day",
        }
    }

    Converted to regular json:
    {
        "AlbumTitle": "Songs About Life",
        "Artist": "Acme Band",
        "SongTitle": "Happy Day"
    }

    Only "S" and "N" data types are supported since those are the only ones that Iceberg is utilizing.
       zExpecting only 1 key: r   )r   r  z&Only S and N data types are supported.zExpecting only 1 value: )r  r  r   r   r   values)r$  regular_jsoncolumn_nameval_dictr   	data_typer&  column_valuerV   rV   rW   r   %  s   
r   r   c                 C   s   t |  S r   )r!  r   rV   rV   rW   r   S  s   r   c                 C   s
   |  tS r   )lstripr!  r,  rV   rV   rW   r"  W  r   r"  )`r  r   typingr   r   r   r   rE   pyiceberg.catalogr   r   r	   r
   r   r   r   pyiceberg.exceptionsr   r   r   r   r   r   r   r   r   pyiceberg.ior   r   r   r   r   pyiceberg.partitioningr   r   pyiceberg.schemar   pyiceberg.serializersr   pyiceberg.tabler    r!   r"   pyiceberg.table.locationsr#   pyiceberg.table.metadatar$   pyiceberg.table.sortingr%   r&   pyiceberg.table.updater'   r(   pyiceberg.typedefr)   r*   r+   pyiceberg.utils.propertiesr,   pyarrowpamypy_boto3_dynamodb.clientr-   rM   r   r   r  r  r   r   r   rc   rN   rO   r!  r7   r   rH   rI   rJ   rK   rL   r9   r  r  r   r   r   r   r`   ra   rb   r   r   r   r"  rV   rV   rV   rW   <module>   s   $,	    &E*
&".