o
    <iB                     @   s  d dl mZmZ d dlmZmZmZmZmZm	Z	m
Z
 d dlmZmZmZmZ d dlmZmZmZmZmZ d dlmZmZmZmZmZ d dlmZmZmZm Z m!Z!m"Z"m#Z# d dl$m%Z% d dl&m'Z'm(Z( d d	l)m*Z* d d
l+m,Z, d dl-m.Z.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6m7Z7 d dl8m9Z9m:Z: d dl;m<Z<m=Z=m>Z> d dl?m@Z@ erd dlAZBdZCdZDdZEG dd deeZFG dd deFZGG dd deFZHG dd deZIdS )    )TYPE_CHECKINGUnion)Stringcreate_enginedeleteinsertselectunionupdate)IntegrityErrorNoResultFoundOperationalErrorProgrammingError)DeclarativeBaseMappedMappedAsDataclassSessionmapped_column)METADATA_LOCATIONURICatalogMetastoreCatalogPropertiesUpdateSummary)CommitFailedExceptionNamespaceAlreadyExistsErrorNamespaceNotEmptyErrorNoSuchNamespaceErrorNoSuchPropertyExceptionNoSuchTableErrorTableAlreadyExistsError)load_file_io)UNPARTITIONED_PARTITION_SPECPartitionSpec)Schema)FromInputFile)CommitTableResponseTableTableProperties)load_location_provider)new_table_metadata)UNSORTED_SORT_ORDER	SortOrder)TableRequirementTableUpdate)
EMPTY_DICT
Identifier
Properties)	strtoboolNfalsetruec                   @   s   e Zd ZdS )SqlCatalogBaseTableN)__name__
__module____qualname__ r8   r8   R/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/pyiceberg/catalog/sql.pyr4   P   s    r4   c                   @   s   e Zd ZU dZeeddddZee e	d< eeddddZ
ee e	d< eeddddZee e	d< eed	dd
ZeedB  e	d< eed	dd
ZeedB  e	d< dS )IcebergTablesiceberg_tables   FTnullableprimary_keycatalog_nametable_namespace
table_name  r>   Nmetadata_locationprevious_metadata_location)r5   r6   r7   __tablename__r   r   r@   r   str__annotations__rA   rB   rE   rF   r8   r8   r8   r9   r:   T   s   
  $r:   c                   @   s   e Zd ZU dZddiZeeddddZee	 e
d< eeddddZee	 e
d	< eeddddZee	 e
d
< eedddZee	 e
d< dS )IcebergNamespacePropertiesiceberg_namespace_propertiesexistsr3   r<   FTr=   r@   	namespaceproperty_keyrC   rD   property_valueN)r5   r6   r7   rG   NAMESPACE_MINIMAL_PROPERTIESr   r   r@   r   rH   rI   rM   rN   rO   r8   r8   r8   r9   rJ   ^   s   
  rJ   c                       s:  e Zd ZdZdedef fddZdCdd	ZdCd
dZdCddZde	de
fddZdeeefdeeB deedf dedB dededede
fddZdeeB dede
fddZdeeB de
fddZdeeB ddfddZd eeB d!eeB de
fd"d#Zd$e
d%eed&f d'eed&f defd(d)ZdeeB defd*d+Z efd,eeB deddfd-d.Z!d,eeB ddfd/d0Z"d,eeB de#e fd1d2Z$dDd,eeB de#e fd4d5Z%d,eeB defd6d7Z&defd,eeB d8e'e dB d'ede(fd9d:Z)d,eeB de#e fd;d<Z*deeB defd=d>Z+deeB ddfd?d@Z,dCdAdBZ-  Z.S )E
SqlCataloga  Implementation of a SQL based catalog.

    In the `JDBCCatalog` implementation, a `Namespace` is composed of a list of strings separated by dots: `'ns1.ns2.ns3'`.
    And you can have as many levels as you want, but you need at least one.  The `SqlCatalog` honors the same convention.

    In the `JDBCCatalog` implementation, a `TableIdentifier` is composed of an optional `Namespace` and a table name.
    When a `Namespace` is present, the full name will be `'ns1.ns2.ns3.table'`.
    A valid `TableIdentifier` could be `'name'` (no namespace).
    The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`.
    name
propertiesc                    s   t  j|fi | | jt }stdt| jdt }|dkr)t	|nd}t	| jdt
}t	| jdt}t|||d| _|rM|   d S d S )NzSQL connection URI is requiredechodebugpool_pre_pinginit_catalog_tables)rT   rV   )super__init__rS   getr   r   rH   DEFAULT_ECHO_VALUElowerr1   DEFAULT_POOL_PRE_PING_VALUEDEFAULT_INIT_CATALOG_TABLESr   engine_ensure_tables_exist)selfrR   rS   uri_propecho_strrT   rV   rW   	__class__r8   r9   rY   u   s   zSqlCatalog.__init__returnNc                 C   s   t | j5}ttfD ]'}td|}z|| W q
 ttfy1   | 	  Y  W d    d S w W d    d S 1 s=w   Y  d S N   )
r   r_   r:   rJ   r   select_fromscalarr   r   create_tables)ra   sessiontablestmtr8   r8   r9   r`      s   
"zSqlCatalog._ensure_tables_existc                 C      t j| j d S N)r4   metadata
create_allr_   ra   r8   r8   r9   rk         zSqlCatalog.create_tablesc                 C   ro   rp   )r4   rq   drop_allr_   rs   r8   r8   r9   destroy_tables   rt   zSqlCatalog.destroy_tables	orm_tablec                 C   s   |j  }stdt d|j }stdtj d|j }s)tdtj dt| j|d}||}t	
|}tt||f ||| |j|| dS )NzTable property z is missingrS   location)
identifierrq   rE   iocatalog)rE   r   r   rA   r:   rB   r    rS   	new_inputr$   table_metadatar&   r   identifier_to_tuple_load_file_io)ra   rw   rE   rA   rB   r{   filerq   r8   r8   r9   _convert_orm_to_iceberg   s    




z"SqlCatalog._convert_orm_to_icebergrz   schemaz	pa.Schemary   partition_spec
sort_orderc                 C   s0  |  |t|tjtj}t|}t|}| 	|s$t
d| t|}	| ||	|}t||d}
|
 }t|||||d}t| j|d}| ||| t| j4}z|t| j|	||dd |  W n ty } ztd|	 d| d	|d}~ww W d   n1 sw   Y  | j|d
S )a{  
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            AlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        Namespace does not exist: )table_locationtable_properties)ry   r   r   r   rS   rx   Nr@   rA   rB   rE   rF   Table . already existsrz   )_convert_schema_if_neededintrZ   r'   FORMAT_VERSIONDEFAULT_FORMAT_VERSIONr   namespace_fromtable_name_fromnamespace_existsr   namespace_to_string_resolve_table_locationr(    new_table_metadata_file_locationr)   r    rS   _write_metadatar   r_   addr:   rR   commitr   r   
load_table)ra   rz   r   ry   r   r   rS   namespace_identifierrB   rM   location_providerrE   rq   r{   rl   er8   r8   r9   create_table   sH   




	zSqlCatalog.create_tablerE   c                 C   s   t |}t |}t |}| |std| t| j4}z|t	| j
|||dd |  W n tyL } ztd| d| d|d}~ww W d   n1 sWw   Y  | j|dS )a  Register a new table using existing metadata.

        Args:
            identifier (Union[str, Identifier]): Table identifier for the table
            metadata_location (str): The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
            NoSuchNamespaceError: If namespace does not exist
        r   Nr   r   r   r   r   )r   r   r   r   r   r   r   r_   r   r:   rR   r   r   r   r   )ra   rz   rE   namespace_tuplerM   rB   rl   r   r8   r8   r9   register_table   s0   



	zSqlCatalog.register_tablec                 C   s   t |}t |}t |}t| j }tttj	| j
ktj|ktj|k}||}W d   n1 s7w   Y  |rC| |S td| d| )a  Load the table's metadata and return the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except NoSuchTableError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier (str | Identifier): Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        NTable does not exist: r   )r   r   r   r   r   r_   r   r:   wherer@   rR   rA   rB   rj   r   r   )ra   rz   r   rM   rB   rl   rn   resultr8   r8   r9   r     s   




zSqlCatalog.load_tablec           	      C   s  t |}t |}t |}t| jq}| jjjr@|t	t
t
j| jkt
j|kt
j|k}|jdk r?td| d| n9z!|t
jt
dt
j| jkt
j|kt
j|k }|	| W n tyx } ztd| d| |d}~ww |  W d   dS 1 sw   Y  dS )zDrop a table.

        Args:
            identifier (str | Identifier): Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
        rh   r   r   ofN)r   r   r   r   r   r_   dialectsupports_sane_rowcountexecuter   r:   r   r@   rR   rA   rB   rowcountr   querywith_for_updatefilteroner   r   )	ra   rz   r   rM   rB   rl   restblr   r8   r8   r9   
drop_table/  sB   
	







"zSqlCatalog.drop_tablefrom_identifierto_identifierc                 C   s  t |}t |}t |}t |}t |}t |}| |s*td| t| j}	zl| jjj	r`t
ttj| jktj|ktj|kj||d}
|	|
}|jdk r_td| n7z"|	tjtdtj| jktj|ktj|k }||_||_W n ty } ztd| |d}~ww |	  W n ty } ztd| d| d	|d}~ww W d   n1 sw   Y  | |S )
a  Rename a fully classified table name.

        Args:
            from_identifier (str | Identifier): Existing table identifier.
            to_identifier (str | Identifier): New table identifier.

        Returns:
            Table: the updated table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist.
            TableAlreadyExistsError: If a table with the new name already exist.
            NoSuchNamespaceError: If the target namespace does not exist.
        r   )rA   rB   rh   r   r   Nr   r   r   )r   r   r   r   r   r   r   r_   r   r   r
   r:   r   r@   rR   rA   rB   valuesr   r   r   r   r   r   r   r   r   r   r   r   )ra   r   r   from_namespace_tuplefrom_namespacefrom_table_nameto_namespace_tupleto_namespaceto_table_namerl   rn   r   r   r   r8   r8   r9   rename_tableW  sb   









	




"zSqlCatalog.rename_tablerm   requirements.updatesc                 C   s.  |  }t|}t|}t|}z| |}W n ty%   d}Y nw | ||  ||}	|r@|	j|jkr@t	|j|j
dS | j|	j|	j|	j
d t| j}
|r| jjjrtttj| j ktj|ktj|ktj
|j
kj|	j
|j
d}|
|}|jdk rtd| d| nAz)|
tjtdtj| j ktj|ktj|ktj
|j
k }|	j
|_
|j
|_W n t y } ztd| d| |d}~ww |
!  n-z|
"t| j |||	j
dd	 |
!  W n t#y } zt$d
| d| d|d}~ww W d   n	1 s
w   Y  t	|	j|	j
dS )a   Commit updates to a table.

        Args:
            table (Table): The table to be updated.
            requirements: (Tuple[TableRequirement, ...]): Table requirements.
            updates: (Tuple[TableUpdate, ...]): Table updates.

        Returns:
            CommitTableResponse: The updated metadata.

        Raises:
            NoSuchTableError: If a table with the given identifier does not exist.
            CommitFailedException: Requirement not met, or a conflict with a concurrent commit.
        N)rq   rE   )rq   r{   metadata_path)rE   rF   rh   z+Table has been updated by another process: r   r   r   r   r   )%rR   r   r   r   r   r   r   _update_and_stage_tablerq   r%   rE   r   r{   r   r_   r   r   r
   r:   r   r@   rA   rB   r   r   r   r   r   r   r   r   rF   r   r   r   r   r   )ra   rm   r   r   table_identifierr   rM   rB   current_tableupdated_staged_tablerl   rn   r   r   r   r8   r8   r9   commit_table  s   










	6zSqlCatalog.commit_tablec              	   C   s  t |}t |t}|ddddddd }t| ja}tt	tj
| jktj|ktjj|ddB d	}|| }|rM	 W d    d
S tt	tj
| jktj|ktjj|ddB d	}|| }|ry	 W d    d
S W d    dS 1 sw   Y  dS )N!z!!_z!_%z!%z.%)escaperh   TF)r   r   r   r   replacer   r_   r   r:   r   r@   rR   rA   likelimitr   allrJ   rM   )ra   rz   r   rM   namespace_starts_withrl   rn   r   r8   r8   r9   r     sF   
 
	
	
zSqlCatalog.namespace_existsrM   c              
   C   s   |  |rtd| d|stj}|r|ntj}t| j&}| D ]\}}|t| jt	
|t||d q#|  W d   dS 1 sGw   Y  dS )aG  Create a namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier.
            properties (Properties): A string dictionary of properties for the given namespace.

        Raises:
            NamespaceAlreadyExistsError: If a namespace with the given name already exists.
        
Namespace r   )r@   rM   rN   rO   N)r   r   rJ   rP   r   r_   itemsr   rR   r   r   r   r   )ra   rM   rS   create_propertiesrl   keyvaluer8   r8   r9   create_namespace  s"   



"zSqlCatalog.create_namespacec                 C   s   |  |std| t|}| | }r%td| dt| dt| j}|	t
ttj| jktj|k |  W d   dS 1 sLw   Y  dS )a  Drop a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            NamespaceNotEmptyError: If the namespace is not empty.
        r   r   z is not empty. z tables exist.N)r   r   r   r   list_tablesr   lenr   r_   r   r   rJ   r   r@   rR   rM   r   )ra   rM   namespace_strtablesrl   r8   r8   r9   drop_namespace.  s   




"zSqlCatalog.drop_namespacec                 C   s   |r|  |std| t|}tttj| jktj	|k}t
| j}||}dd |D W  d   S 1 s>w   Y  dS )aH  List tables under the given namespace in the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: list of table identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        r   c                 S   s    g | ]}t |j|jf qS r8   )r   r   rA   rB   ).0rm   r8   r8   r9   
<listcomp>[  s     z*SqlCatalog.list_tables.<locals>.<listcomp>N)r   r   r   r   r   r:   r   r@   rR   rA   r   r_   scalars)ra   rM   rn   rl   r   r8   r8   r9   r   H  s   

$zSqlCatalog.list_tablesr8   c                    s  |r|  |std| ttjtj| jk}ttj	tj| jk}|rBt
|td }|tj|}|tj	|}t||}t| j+}t
| t d t fdddd || D D }|W  d   S 1 szw   Y  dS )a|  List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

        Args:
            namespace (str | Identifier): Namespace identifier to search.

        Returns:
            List[Identifier]: a List of namespace identifiers.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        r   r   rh   c                    s8   h | ]}t |kr|d d   kr|d  qS rg   )r   r   nsr   sub_namespaces_level_lengthr8   r9   	<setcomp>{  s    
z-SqlCatalog.list_namespaces.<locals>.<setcomp>c                 S   s   h | ]}t |qS r8   )r   r   r   r8   r8   r9   r   }      N)r   r   r   r:   rA   r   r@   rR   rJ   rM   r   r   r   r	   r   r_   r   r   listr   r   )ra   rM   
table_stmtnamespace_stmtnamespace_likern   rl   
namespacesr8   r   r9   list_namespaces]  s,   

$zSqlCatalog.list_namespacesc                 C   s   t |}| |std| dtttj| jktj	|k}t
| j}||}dd |D W  d   S 1 s=w   Y  dS )a+  Get properties for a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.

        Returns:
            Properties: Properties for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
        r    does not existsc                 S   s   i | ]}|j |jqS r8   )rN   rO   )r   propsr8   r8   r9   
<dictcomp>  r   z8SqlCatalog.load_namespace_properties.<locals>.<dictcomp>N)r   r   r   r   r   rJ   r   r@   rR   rM   r   r_   r   )ra   rM   r   rn   rl   r   r8   r8   r9   load_namespace_properties  s   


$z$SqlCatalog.load_namespace_propertiesremovalsc           
   
      s   t | |std  dj|d}j|||dd }tj_}|rCtt	
t	jjkt	j kt	j|}|| |rztt	
t	jjkt	j kt	jt| }||  fdd| D }tt	|}	||	 |  W d   |S 1 sw   Y  |S )	a  Remove provided property keys and update properties for a namespace.

        Args:
            namespace (str | Identifier): Namespace identifier.
            removals (Set[str]): Set of property keys that need to be removed. Optional Argument.
            updates (Properties): Properties to be updated for the given namespace.

        Raises:
            NoSuchNamespaceError: If a namespace with the given name does not exist.
            ValueError: If removals and updates have overlapping keys.
        r   r   )rM   )current_propertiesr   r   r   c              
      s.   g | ]\}}t jjt j t j|t j|iqS r8   )rJ   r@   rR   rM   rN   rO   )r   rN   rO   r   ra   r8   r9   r     s    z:SqlCatalog.update_namespace_properties.<locals>.<listcomp>N)r   r   r   r   r   %_get_updated_props_and_update_summaryr   r_   r   rJ   r   r@   rR   rM   rN   in_r   setkeysr   r   r   r   )
ra   rM   r   r   r   properties_update_summaryrl   delete_stmtinsert_stmt_valuesinsert_stmtr8   r   r9   update_namespace_properties  sD   






	


z&SqlCatalog.update_namespace_propertiesc                 C      t rp   NotImplementedError)ra   rM   r8   r8   r9   
list_views     zSqlCatalog.list_viewsc                 C   r   rp   r   ra   rz   r8   r8   r9   view_exists  r   zSqlCatalog.view_existsc                 C   r   rp   r   r   r8   r8   r9   	drop_view  r   zSqlCatalog.drop_viewc                 C   s   t | dr| j  dS dS )ar  Close the catalog and release database connections.

        This method closes the SQLAlchemy engine and disposes of all connection pools.
        This ensures that any cached connections are properly closed, which is especially
        important for blobfuse scenarios where file handles need to be closed for
        data to be flushed to persistent storage.
        r_   N)hasattrr_   disposers   r8   r8   r9   close  s   
zSqlCatalog.close)rf   N)r8   )/r5   r6   r7   __doc__rH   rY   r`   rk   rv   r:   r&   r   r!   r*   r.   r/   r   r#   r"   r+   r0   r   r   r   r   r   tupler,   r-   r%   r   boolr   r   r   r   r   r   r   r   r   r   r   r   r   r  __classcell__r8   r8   rd   r9   rQ   i   sv    




A%(;


` )

8rQ   )Jtypingr   r   
sqlalchemyr   r   r   r   r   r	   r
   sqlalchemy.excr   r   r   r   sqlalchemy.ormr   r   r   r   r   pyiceberg.catalogr   r   r   r   r   pyiceberg.exceptionsr   r   r   r   r   r   r   pyiceberg.ior    pyiceberg.partitioningr!   r"   pyiceberg.schemar#   pyiceberg.serializersr$   pyiceberg.tabler%   r&   r'   pyiceberg.table.locationsr(   pyiceberg.table.metadatar)   pyiceberg.table.sortingr*   r+   pyiceberg.table.updater,   r-   pyiceberg.typedefr.   r/   r0   pyiceberg.typesr1   pyarrowpar[   r]   r^   r4   r:   rJ   rQ   r8   r8   r8   r9   <module>   s4   $	$	
