o
    iB                     @   s  U d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m	Z	 ddl
mZ ddlmZmZ ddlmZ ddlZddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZBmCZCmDZDmEZEmFZFmGZGmHZHmIZImJZJmKZKmLZLmMZMmNZN ddlOmPZP ddlQmRZR ddlSmTZTmUZUmVZV eWeXZYerddlZm[Z[ G dd dej\Z]G dd de]Z^de^iZ_e`eaebe] f ecd< dePdefddZddePdefddZedePdefdd ZfdePdefd!d"ZgdePdefd#d$Zhededeeeeeeegegefefehd%
ZiG d&d' d'eKZjG d(d) d)eMZkG d*d+ d+eJZldS ),zbFileIO implementation for reading and writing table files that uses fsspec compatible filesystems.    N)Callable)copy)	lru_cache)TYPE_CHECKINGAny)urlparse)AbstractFileSystemLocalFileSystem)	HTTPError)TOKENURI)AUTH_MANAGER)	SignError)0ADLS_ACCOUNT_HOSTADLS_ACCOUNT_KEYADLS_ACCOUNT_NAME	ADLS_ANONADLS_CLIENT_IDADLS_CLIENT_SECRETADLS_CONNECTION_STRINGADLS_CREDENTIALADLS_SAS_TOKENADLS_TENANT_ID
ADLS_TOKENAWS_ACCESS_KEY_IDAWS_PROFILE_NAME
AWS_REGIONAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN
GCS_ACCESSGCS_CACHE_TIMEOUTGCS_CONSISTENCYGCS_DEFAULT_LOCATIONGCS_PROJECT_IDGCS_REQUESTER_PAYSGCS_SERVICE_HOSTGCS_SESSION_KWARGS	GCS_TOKENGCS_VERSION_AWAREHF_ENDPOINTHF_TOKENS3_ACCESS_KEY_IDS3_ANONYMOUSS3_CONNECT_TIMEOUTS3_ENDPOINTS3_FORCE_VIRTUAL_ADDRESSINGS3_PROFILE_NAMES3_PROXY_URI	S3_REGIONS3_REQUEST_TIMEOUTS3_SECRET_ACCESS_KEYS3_SESSION_TOKEN	S3_SIGNERS3_SIGNER_ENDPOINTS3_SIGNER_ENDPOINT_DEFAULTS3_SIGNER_URIFileIO	InputFileInputStream
OutputFileOutputStream)
Properties)	strtobool)get_first_property_valueget_header_propertiesproperty_as_bool)
AWSRequestc                   @   sH   e Zd ZU dZeed< deddfddZejddd	e	ddfd
dZ
dS )S3RequestSignerz+Abstract base class for S3 request signers.
propertiesreturnNc                 C   
   || _ d S NrG   selfrG    rN   Q/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/pyiceberg/io/fsspec.py__init__k      
zS3RequestSigner.__init__requestrE   _c                 K   s   d S rJ   rN   )rM   rR   rS   rN   rN   rO   __call__n   s   zS3RequestSigner.__call__)__name__
__module____qualname____doc__r@   __annotations__rP   abcabstractmethodr   rT   rN   rN   rN   rO   rF   f   s   
 rF   c                       sL   e Zd ZU dZejed< deddf fddZdd	d
e	ddfddZ
  ZS )S3V4RestSignerzQAn S3 request signer that uses an external REST signing service to sign requests._sessionrG   rH   Nc                    s   t  | t | _d S rJ   )superrP   requestsSessionr]   rL   	__class__rN   rO   rP   x   s   zS3V4RestSigner.__init__rR   rE   rS   c              
   K   sF  | j t| j t d}| j tt}i }d }| j t }r%| }n| j t	 }r2d| }|r8||d< |
t| j  |j|jd |jdd |j D d}	| jj| d|  ||	d}
z
|
  |
 }W n ty } ztd	|
j d
|	 |d }~ww |d  D ]\}}|j|d| q|d |_d S )N/zBearer Authorizationclient_regionc                 S   s   i | ]\}}||gqS rN   rN   ).0keyvalrN   rN   rO   
<dictcomp>   s    z+S3V4RestSigner.__call__.<locals>.<dictcomp>)methodregionuriheaders)rm   jsonzFailed to sign request z: rm   z, rl   )rG   getr:   r   rstripr8   r9   r   auth_headerr   updaterC   rj   contexturlrm   itemsr]   poststripraise_for_statusrn   r   r   status_code
add_headerjoin)rM   rR   rS   
signer_urlsigner_endpointsigner_headersrq   auth_managertokensigner_bodyresponseresponse_jsonerg   valuerN   rN   rO   rT   |   s6   

 zS3V4RestSigner.__call__)rU   rV   rW   rX   r_   r`   rY   r@   rP   r   rT   __classcell__rN   rN   ra   rO   r\   s   s
   
 
r\   SIGNERSrS   rH   c                 C   s
   t ddS )NT)
auto_mkdirr	   )rS   rN   rN   rO   _file   rQ   r   rG   c                 C   s  ddl m} | tt| ttt| ttt| t	t
t| ttd}i }i }| t }rStd| t| }rL|| }||d< ddlm} ||d< ntd| | t }ra||d	|d
< | t }	rnt|	|d< | t }
r{t|
|d< | t }rddi|d< | t }rt|}nd}|||d}t| tt }r||d< |di |}| D ]\}}|jjj j!|dd |jjj j"||dd q|S )Nr   )S3FileSystem)endpoint_urlaws_access_key_idaws_secret_access_keyaws_session_tokenregion_namezLoading signer %szbefore-sign.s3)UNSIGNEDsignature_versionzSigner not available: )httphttpsproxiesconnect_timeoutread_timeoutaddressing_stylevirtuals3F)anonclient_kwargsconfig_kwargsprofilei  )	unique_idrN   )#s3fsr   ro   r/   rB   r,   r   r5   r   r6   r   r3   r   r7   loggerinfor   botocorer   
ValueErrorr2   r.   floatr4   r0   r-   rA   r1   r   ru   r   metaevents
unregisterregister_last)rG   r   r   r   register_eventssigner
signer_clsr   	proxy_urir   request_timeout_force_virtual_addressings3_anonymousr   s3_fs_kwargsprofile_namefs
event_nameevent_functionrN   rN   rO   _s3   sN   





r   c                 C   st   ddl m} || t| td| t| td| tt| t	dt
| td| t| tt| tdd
S )Nr   )GCSFileSystemfull_controlnoneFz{})
projectaccessr   consistencycache_timeoutrequester_payssession_kwargsr   default_locationversion_aware)gcsfsr   ro   r$   r    r(   r"   r!   rD   r%   rn   loadsr'   r&   r#   r)   )rG   r   rN   rN   rO   _gs   s   



r   c                    s   ddl m} ddlm  ddlm} dd |  D  D ]\}}t| vr.|dd | t< t	| vr6|| t	< qG  fdd	d	|}| 
t }rM||}n| 
t}|| 
t|| 
t| 
t| 
t	| 
t| 
t| 
t| 
t| 
td

S )Nr   )AzureBlobFileSystemAccessToken)AsyncTokenCredentialc                 S   s4   i | ]\}}| t d r|t d d|qS ). )
startswithr   replace)rf   rg   r   rN   rN   rO   ri      s
    z_adls.<locals>.<dictcomp>r   c                       s<   e Zd ZdZdeddfddZdeded f fd	d
ZdS )z$_adls.<locals>.StaticTokenCredentiali  token_stringrH   Nc                 S   rI   rJ   )_token)rM   r   rN   rN   rO   rP   	  rQ   z-_adls.<locals>.StaticTokenCredential.__init__scopeskwargsc                    s(   dd l }t|  | j } | j|S )Nr   )timeint_DEFAULT_EXPIRY_SECONDSr   )rM   r   r   r   
expires_onr   rN   rO   	get_token  s   z._adls.<locals>.StaticTokenCredential.get_token)rU   rV   rW   r   strrP   r   r   rN   r   rN   rO   StaticTokenCredential  s    r   )
connection_string
credentialaccount_nameaccount_key	sas_token	tenant_id	client_idclient_secretaccount_hostr   )adlfsr   azure.core.credentialsr   azure.core.credentials_asyncr   ru   r   splitr   ro   r   r   r   r   r   r   r   r   r   )rG   r   r   rg   r   r   r   r   rN   r   rO   _adls   s:   


r   c                 C   s$   ddl m} || t| tdS )Nr   )HfFileSystem)endpointr   )huggingface_hubr   ro   r*   r+   )rG   r   rN   rN   rO   _hf&  s
   r   )
r   filer   s3as3nabfsabfssgsgcshfc                       sZ   e Zd ZdZdedef fddZdefddZde	fd	d
Z
dde	defddZ  ZS )FsspecInputFilezAn input file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    locationr   c                       || _ t j|d d S N)r   _fsr^   rP   rM   r   r   ra   rN   rO   rP   E     zFsspecInputFile.__init__rH   c                 C   B   | j | j}|d }r|S |d }r|S td| j z.Return the total length of the file, in bytes.SizesizezCannot retrieve object info: r   r   r   ro   RuntimeErrorrM   object_infor   rN   rN   rO   __len__I     zFsspecInputFile.__len__c                 C      | j | jS z"Check whether the location exists.r   lexistsr   rM   rN   rN   rO   existsR     zFsspecInputFile.existsTseekablec              
   C   sR   z	| j | jdW S  ty( } z|jr||ttjttj| j|d}~ww )aH  Create an input stream for reading the contents of the file.

        Args:
            seekable: If the stream should support seek, or if it is consumed sequential.

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileNotFoundError: If the file does not exist.
        rbN)	r   openr   FileNotFoundErrorfilenameerrnoENOENTosstrerror)rM   r
  r   rN   rN   rO   r  V  s   &zFsspecInputFile.open)T)rU   rV   rW   rX   r   r   rP   r   r  boolr  r=   r  r   rN   rN   ra   rO   r   =  s    	r   c                       sh   e Zd ZdZdedef fddZdefddZde	fd	d
Z
dde	defddZdefddZ  ZS )FsspecOutputFilezAn output file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    r   r   c                    r   r   r   r   ra   rN   rO   rP   q  r   zFsspecOutputFile.__init__rH   c                 C   r   r   r   r   rN   rN   rO   r  u  r  zFsspecOutputFile.__len__c                 C   r  r  r  r  rN   rN   rO   r  ~  r	  zFsspecOutputFile.existsF	overwritec                 C   s,   |s|   rtd| j | j| jdS )a/  Create an output stream for reading the contents of the file.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileExistsError: If the file already exists at the location and overwrite is set to False.

        Note:
            If overwrite is set to False, a check is first performed to verify that the file does not exist.
            This is not thread-safe and a possibility does exist that the file can be created by a concurrent
            process after the existence check yet before the output stream is created. In such a case, the default
            behavior will truncate the contents of the existing file when opening the output stream.
        z)Cannot create file, file already exists: wb)r  FileExistsErrorr   r   r  )rM   r  rN   rN   rO   create  s   zFsspecOutputFile.createc                 C   s   t | j| jdS )zAReturn a new FsspecInputFile for the location at `self.location`.r   r   )r   r   r   r  rN   rN   rO   to_input_file  s   zFsspecOutputFile.to_input_file)F)rU   rV   rW   rX   r   r   rP   r   r  r  r  r?   r  r   r  r   rN   rN   ra   rO   r  i  s    	r  c                       s   e Zd ZdZdef fddZdedefddZdede	fd	d
Z
deeB eB ddfddZdedefddZdedefddZdeeef fddZdeeef ddfddZ  ZS )FsspecFileIOz)A FileIO implementation that uses fsspec.rG   c                    s.   i | _ | j t t | _t j|d d S )NrK   )_scheme_to_fsrr   SCHEME_TO_FS	threadinglocal_thread_localsr^   rP   rL   ra   rN   rO   rP     s   
zFsspecFileIO.__init__r   rH   c                 C       t |}| |j}t||dS )a
  Get an FsspecInputFile instance to read bytes from the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecInputFile: An FsspecInputFile instance for the given location.
        r  )r   get_fsschemer   rM   r   rl   r   rN   rN   rO   	new_input     	zFsspecFileIO.new_inputc                 C   r!  )a  Get an FsspecOutputFile instance to write bytes to the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecOutputFile: An FsspecOutputFile instance for the given location.
        r  )r   r"  r#  r  r$  rN   rN   rO   
new_output  r&  zFsspecFileIO.new_outputNc                 C   s<   t |ttfr|j}n|}t|}| |j}|| dS )a9  Delete the file at the given location.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an
                OutputFile instance is provided, the location attribute for that instance is used as the location
                to delete.
        N)
isinstancer<   r>   r   r   r"  r#  rm)rM   r   str_locationrl   r   rN   rN   rO   delete  s   zFsspecFileIO.deleter#  c                 C   s&   t | jdst| j| j_| j|S )z:Get a filesystem for a specific scheme, cached per thread.get_fs_cached)hasattrr   r   _get_fsr,  rM   r#  rN   rN   rO   r"    s   zFsspecFileIO.get_fsc                 C   s(   || j vrtd| | j | | jS )z'Get a filesystem for a specific scheme.z%No registered filesystem for scheme: )r  r   rG   r/  rN   rN   rO   r.    s   
zFsspecFileIO._get_fsc                 C   s   t | j}|d= |S )zBCreate a dictionary of the FsSpecFileIO fields used when pickling.r   )r   __dict__)rM   fileio_copyrN   rN   rO   __getstate__  s   
zFsspecFileIO.__getstate__statec                 C   s   || _ t | _dS )z3Deserialize the state into a FsSpecFileIO instance.N)r0  r  r  r   )rM   r3  rN   rN   rO   __setstate__  s   zFsspecFileIO.__setstate__)rU   rV   rW   rX   r@   rP   r   r   r%  r  r'  r<   r>   r+  r   r"  r.  dictr   r2  r4  r   rN   rN   ra   rO   r    s    "r  )mrX   rZ   r  rn   loggingr  r  collections.abcr   r   	functoolsr   typingr   r   urllib.parser   r_   fsspecr   fsspec.implementations.localr
   r   pyiceberg.catalogr   r   pyiceberg.catalog.rest.authr   pyiceberg.exceptionsr   pyiceberg.ior   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   pyiceberg.typedefr@   pyiceberg.typesrA   pyiceberg.utils.propertiesrB   rC   rD   	getLoggerrU   r   botocore.awsrequestrE   ABCrF   r\   r   r5  r   typerY   r   r   r   r   r   r  r   r  r  rN   rN   rN   rO   <module>   s`   2
.=/
,4