o
    ˳iL                     @   s  d dl Z d dlmZmZmZ d dlmZ d dlmZm	Z	m
Z
mZ d dlmZ d dlmZmZ d dlmZmZ d dlmZ d d	lmZ d d
lmZmZmZ d dlmZmZmZm Z  d dl!m"Z" d dl#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+mZ d dl,m-Z- d dl.m/Z/m0Z0 d dl1m2Z2m3Z3m4Z4 d dl5m6Z6m7Z7 d dl8m9Z9m:Z: d dl;m<Z<m=Z=m>Z> d dl?m@Z@ erd dlAZBdZCdZDdZEdZFdZGdZHdZIdZJd ZKd!ZLd"ZMG d#d$ d$eZNdS )%    N)TYPE_CHECKINGAnyUnion)NotFound)ClientDatasetDatasetReferenceTableReference)Table)ExternalCatalogDatasetOptionsExternalCatalogTableOptions)	SerDeInfoStorageDescriptor)Conflict)service_account)WAREHOUSE_LOCATIONMetastoreCatalogPropertiesUpdateSummary)NamespaceAlreadyExistsErrorNoSuchNamespaceErrorNoSuchTableErrorTableAlreadyExistsError)load_file_io)UNPARTITIONED_PARTITION_SPECPartitionSpec)Schema)FromInputFile)CommitTableResponser
   )load_location_provider)TableMetadatanew_table_metadata)TOTAL_DATA_FILESTOTAL_FILE_SIZETOTAL_RECORDS)UNSORTED_SORT_ORDER	SortOrder)TableRequirementTableUpdate)
EMPTY_DICT
Identifier
Properties)Configzgcp.bigquery.project-idzgcp.bigquery.locationzgcp.bigquery.credential-filezgcp.bigquery.credentials-infometadata_locationprevious_metadata_location
table_typeICEBERGz+org.apache.iceberg.mr.hive.HiveIcebergSerDez1org.apache.iceberg.mr.hive.HiveIcebergInputFormatz2org.apache.iceberg.mr.hive.HiveIcebergOutputFormatc                       s  e Zd Zdedef fddZdeeefdeeB de	e
df d	edB d
edededefddZefdeeB deddfddZdeeB defddZdeeB ddfddZdedeedf deedf defddZdeeB deeB defddZdeeB ddfd d!ZdeeB dee fd"d#ZdLdeeB dee fd%d&ZdeeB d'edefd(d)ZdeeB dee fd*d+ZdeeB ddfd,d-ZdeeB de fd.d/Z!deeB defd0d1Z"defdeeB d2e#e dB dede$fd3d4Z%d5e&d6ed7e'de(fd8d9Z)d	ed:e*ee+f de,fd;d<Z-d=ed>e*ee+f d?e.de/fd@dAZ0deeB de(defdBdCZ1d6edDe&de*ee+f fdEdFZ2d	edB d?e.dedB fdGdHZ3dIedefdJdKZ4  Z5S )MBigQueryMetastoreCatalogname
propertiesc           
   
      s,  t  j|fi | | jt}| jt}| jt}| jt}|s+tdt t	 
ds5td|r=|r=tdd }|rHtj|}n>|rzt|}tj|}W n. tjyo }	 ztdt d|	 |	d }	~	w ty }	 ztdt d|	 |	d }	~	ww t|||d| _|| _|| _d S )	NzMissing property: zlegacy-current-snapshot-idzAlegacy-current-snapshot-id must be enabled to work with BigQuery.zWCannot specify both `gcp.bigquery.credentials-file` and `gcp.bigquery.credentials-info`zInvalid JSON string for z: z"Invalid credentials structure for )projectcredentialslocation)super__init__r2   getGCP_PROJECT_IDGCP_LOCATIONGCP_CREDENTIALS_FILEGCP_CREDENTIALS_INFO
ValueErrorr+   get_boolr   Credentialsfrom_service_account_filejsonloadsfrom_service_account_infoJSONDecodeError	TypeErrorr   clientr5   
project_id)
selfr1   r2   rG   r5   credentials_filecredentials_info_strgcp_credentialscredentials_info_dicte	__class__ X/home/ubuntu/.local/lib/python3.10/site-packages/pyiceberg/catalog/bigquery_metastore.pyr7   >   s@   

z!BigQueryMetastoreCatalog.__init__N
identifierschemaz	pa.Schemar5   partition_spec
sort_orderreturnc              
   C   s   |  |}| |\}}t| j|d}	| |||}t||d}
|
 }t|||||d}t| j	|d}| 
||| t| j|d}	z| ||t|	|d}| j| W n tyj } z	td| d|d}~ww | j|d	S )
a{  
        Create an Iceberg table.

        Args:
            identifier: Table identifier.
            schema: Table's schema.
            location: Location for the table. Optional Argument.
            partition_spec: PartitionSpec for the table.
            sort_order: SortOrder for the table.
            properties: Table properties that can be a string based dictionary.

        Returns:
            Table: the created table instance.

        Raises:
            AlreadyExistsError: If a table with the name already exists.
            ValueError: If the identifier is invalid, or no path is given to store metadata.

        r3   
dataset_id)table_locationtable_properties)r5   rS   rT   rU   r2   r2   r5   dataset_reftable_idTable  already existsNrR   )_convert_schema_if_needed identifier_to_database_and_tabler   rG   _resolve_table_locationr    new_table_metadata_file_locationr    r   r2   _write_metadata_make_new_tabler	   rF   create_tabler   r   
load_table)rH   rR   rS   r5   rT   rU   r2   dataset_name
table_namer]   providerr,   metadataiotablerM   rP   rP   rQ   rh   e   s,   

z%BigQueryMetastoreCatalog.create_table	namespacec              
   C   sn   |  |}z t| j|d}t|d}| | ||||_| j| W dS  t	y6 } zt
d|d}~ww )aT  Create a namespace in the catalog.

        Args:
            namespace: Namespace identifier.
            properties: A string dictionary of properties for the given namespace.

        Raises:
            ValueError: If the identifier is invalid.
            AlreadyExistsError: If a namespace with the given name already exists.
        rW   r]   z(Namespace {database_name} already existsN)identifier_to_databaser   rG   r   (_create_external_catalog_dataset_options+_get_default_warehouse_location_for_dataset external_catalog_dataset_optionsrF   create_datasetr   r   )rH   rp   r2   database_namer]   datasetrM   rP   rP   rQ   create_namespace   s   


z)BigQueryMetastoreCatalog.create_namespacec              
   C   s   |  |t\}}|  |t\}}ztt| j|d|d}| j|}| ||W S  ty? } ztd| d| |d}~ww )a   
        Load the table's metadata and returns the table instance.

        You can also use this method to check for table existence using 'try catalog.table() except TableNotFoundError'.
        Note: This method doesn't scan data stored in the table.

        Args:
            identifier: Table identifier.

        Returns:
            Table: the table instance with its metadata.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        rW   r\   Table does not exist: .N)	rc   r   r	   r   rG   rF   	get_table(_convert_bigquery_table_to_iceberg_tabler   )rH   rR   rw   rk   rj   	table_refro   rM   rP   rP   rQ   ri      s   z#BigQueryMetastoreCatalog.load_tablec              
   C   sh   |  |t\}}ztt| j|d|d}| j| W dS  ty3 } ztd| d| |d}~ww )zDrop a table.

        Args:
            identifier: Table identifier.

        Raises:
            NoSuchTableError: If a table with the name does not exist, or the identifier is invalid.
        rW   r\   rz   r{   N)rc   r   r	   r   rG   rF   delete_table)rH   rR   rj   rk   r~   rM   rP   rP   rQ   
drop_table   s   	z#BigQueryMetastoreCatalog.drop_tablero   requirements.updatesc                 C      t NNotImplementedError)rH   ro   r   r   rP   rP   rQ   commit_table      z%BigQueryMetastoreCatalog.commit_tablefrom_identifierto_identifierc                 C   r   r   r   )rH   r   r   rP   rP   rQ   rename_table      z%BigQueryMetastoreCatalog.rename_tablec              
   C   s`   |  |}zt| j|d}t|d}| j| W d S  ty/ } z	td| d|d }~ww )NrW   rq   
Namespace z does not exist.)rr   r   rG   r   rF   delete_datasetr   r   )rH   rp   rw   r]   rx   rM   rP   rP   rQ   drop_namespace   s   

z'BigQueryMetastoreCatalog.drop_namespacec                 C   sl   |  |}g }zt| j|d}| jj|d}|D ]
}|||jf qW |S  ty5   td| dd w )NrW   )rx   Namespace (dataset) '' not found.)	rr   r   rG   rF   list_tablesappendr^   r   r   )rH   rp   rw   iceberg_tablesr]   bq_tables_iteratorbq_table_list_itemrP   rP   rQ   r      s   
z$BigQueryMetastoreCatalog.list_tablesrP   c                 C   s.   |rt d| dd | j }dd |D S )Nr   r   c                 S   s   g | ]}|j fqS rP   )rX   ).0rx   rP   rP   rQ   
<listcomp>  s    z<BigQueryMetastoreCatalog.list_namespaces.<locals>.<listcomp>)r   rF   list_datasets)rH   rp   datasets_iteratorrP   rP   rQ   list_namespaces  s   
z(BigQueryMetastoreCatalog.list_namespacesr,   c              
   C   s   |  |\}}t| j|d}| j|d}||}t|}z| ||t||d}	| j	
|	 W n tyF }
 z	td| d|
d}
~
ww | j|dS )ai  Register a new table using existing metadata.

        Args:
            identifier (Union[str, Identifier]): Table identifier for the table
            metadata_location (str): The location to the metadata

        Returns:
            Table: The newly registered table

        Raises:
            TableAlreadyExistsError: If the table already exists
        rW   )r5   r\   r_   r`   Nra   )rc   r   rG   _load_file_io	new_inputr   table_metadatarg   r	   rF   rh   r   r   ri   )rH   rR   r,   rj   rk   r]   rn   filerm   ro   rM   rP   rP   rQ   register_table  s   

z'BigQueryMetastoreCatalog.register_tablec                 C   r   r   r   )rH   rp   rP   rP   rQ   
list_views.  r   z#BigQueryMetastoreCatalog.list_viewsc                 C   r   r   r   rH   rR   rP   rP   rQ   	drop_view1  r   z"BigQueryMetastoreCatalog.drop_viewc                 C   r   r   r   r   rP   rP   rQ   view_exists4  r   z$BigQueryMetastoreCatalog.view_existsc              
   C   sn   |  |}z| jt| j|d}|r|jr|j W S W i S W i S  ty6 } z	td| d|d }~ww )NrW   r   z
 not found)	rr   rF   get_datasetr   rG   ru   to_api_reprr   r   )rH   rp   rj   rx   rM   rP   rP   rQ   load_namespace_properties7  s   

z2BigQueryMetastoreCatalog.load_namespace_propertiesremovalsc                 C   r   r   r   )rH   rp   r   r   rP   rP   rQ   update_namespace_propertiesC  r   z4BigQueryMetastoreCatalog.update_namespace_propertiesrm   metadata_file_locationr~   c                 C   s*   t |}| |j| j||d}||_|S )zjTo make the table queryable from Hive, the user would likely be setting the HIVE_ENGINE_ENABLED parameter.)r   r   )BQTable&_create_external_catalog_table_optionsr5   _create_table_parametersexternal_catalog_table_options)rH   rm   r   r~   ro   external_config_optionsrP   rP   rQ   rg   H  s   z(BigQueryMetastoreCatalog._make_new_table
parametersc                 C   s   t t|ttttdd|dS )N)serialization_library)location_uriinput_formatoutput_format
serde_info)storage_descriptorr   )r   r   HIVE_FILE_INPUT_FORMATHIVE_FILE_OUTPUT_FORMATr   HIVE_SERIALIZATION_LIBRARY)rH   r5   r   rP   rP   rQ   r   [  s   z?BigQueryMetastoreCatalog._create_external_catalog_table_optionsdefault_storage_locationmetadataParametersr]   c                 C   s   t | |j|dS )N)default_storage_location_urir   )r   rt   rX   )rH   r   r   r]   rP   rP   rQ   rs   g  s   
zABigQueryMetastoreCatalog._create_external_catalog_dataset_optionsc           	      C   sp   |  |t\}}d}|jr|jjr|jjt }t| j|d}||}t	|}t
||f||| |j|| dS )N r[   )rR   rm   r,   rn   catalog)rc   r   r   r   METADATA_LOCATION_PROPr   r2   r   r   r   r
   r   )	rH   rR   ro   rj   rk   r,   rn   r   rm   rP   rP   rQ   r}   o  s   

zABigQueryMetastoreCatalog._convert_bigquery_table_to_iceberg_tabler   c                 C   s   |j }|jrt|j|d< ||t< t|t< d|d< | }|rH|j}|rH|t	r0|t	|d< |t
r<|t
|d< |trH|t|d< |S )NuuidTEXTERNALnumFilesnumRows	totalSize)r2   
table_uuidstrr   ICEBERG_TABLE_TYPE_VALUETABLE_TYPE_PROPcurrent_snapshotsummaryr8   r!   r#   r"   )rH   r   r   r   snapshotr   rP   rP   rQ   r     s"   


z1BigQueryMetastoreCatalog._create_table_parametersc                 C   s.   |r|S | j |}|r|jr|jjS td)Nz'Could not find default storage location)rF   r   ru   r   r=   )rH   r5   r]   rx   rP   rP   rQ   _default_storage_location  s   
z2BigQueryMetastoreCatalog._default_storage_locationrw   c                 C   s2   | j t }r|d}| d| dS td)N/z.dbzGNo default path is set, please specify a location when creating a table)r2   r8   r   rstripr=   )rH   rw   warehouse_pathrP   rP   rQ   rt     s   
zDBigQueryMetastoreCatalog._get_default_warehouse_location_for_dataset)rP   )6__name__
__module____qualname__r   r7   r   r$   r(   r)   r   r   r   r%   r*   r
   rh   ry   ri   r   tupler&   r'   r   r   r   r   listr   r   r   r   r   boolr   r   setr   r   r   r	   r   rg   dictr   r   r   r   r   rs   r}   r   r   rt   __classcell__rP   rP   rN   rQ   r0   =   s    +

8








	r0   )OrA   typingr   r   r   google.api_core.exceptionsr   google.cloud.bigqueryr   r   r   r	   r
   r   %google.cloud.bigquery.external_configr   r   google.cloud.bigquery.schemar   r   google.cloud.exceptionsr   google.oauth2r   pyiceberg.catalogr   r   r   pyiceberg.exceptionsr   r   r   r   pyiceberg.ior   pyiceberg.partitioningr   r   pyiceberg.schemar   pyiceberg.serializersr   pyiceberg.tabler   pyiceberg.table.locationsr   pyiceberg.table.metadatar   r    pyiceberg.table.snapshotsr!   r"   r#   pyiceberg.table.sortingr$   r%   pyiceberg.table.updater&   r'   pyiceberg.typedefr(   r)   r*   pyiceberg.utils.configr+   pyarrowpar9   r:   r;   r<   r   PREVIOUS_METADATA_LOCATION_PROPr   r   r   r   r   r0   rP   rP   rP   rQ   <module>   sJ   