o
    ˳i"F                     @   s  U d Z ddlZddlZddlZddlZddlZddlZddlmZ ddl	m	Z	 ddl
mZ ddlmZmZ ddlmZmZ ddlZddlmZ dd	lmZ dd
lmZ ddlmZmZ ddlmZ ddlmZ ddlm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z=m>Z>m?Z?m@Z@mAZAmBZBmCZCmDZDmEZEmFZFmGZGmHZHmIZImJZJmKZKmLZLmMZMmNZNmOZO ddlPmQZQ ddlRmSZS ddlTmUZUmVZVmWZW eXeYZZerddl[m\Z\ G dd dej]Z^G dd de^Z_de_iZ`eaebece^ f edd< deQdefddZedeQdefddZfdeQdefdd Zgd0deQd!ebdB defd"d#ZhdeQdefd$d%Zieeeeefefefehehegegeid&
Zjeaebed'ef f edd(< ekh d)ZlG d*d+ d+eLZmG d,d- d-eNZnG d.d/ d/eKZodS )1zbFileIO implementation for reading and writing table files that uses fsspec compatible filesystems.    N)Callable)copy)	lru_cache)TYPE_CHECKINGAny)ParseResulturlparse)AbstractFileSystemLocalFileSystem)	HTTPError)TOKENURI)AUTH_MANAGER)	SignError)0ADLS_ACCOUNT_HOSTADLS_ACCOUNT_KEYADLS_ACCOUNT_NAME	ADLS_ANONADLS_CLIENT_IDADLS_CLIENT_SECRETADLS_CONNECTION_STRINGADLS_CREDENTIALADLS_SAS_TOKENADLS_TENANT_ID
ADLS_TOKENAWS_ACCESS_KEY_IDAWS_PROFILE_NAME
AWS_REGIONAWS_SECRET_ACCESS_KEYAWS_SESSION_TOKEN
GCS_ACCESSGCS_CACHE_TIMEOUTGCS_CONSISTENCYGCS_DEFAULT_LOCATIONGCS_PROJECT_IDGCS_REQUESTER_PAYSGCS_SERVICE_HOSTGCS_SESSION_KWARGS	GCS_TOKENGCS_VERSION_AWAREHF_ENDPOINTHF_TOKENS3_ACCESS_KEY_IDS3_ANONYMOUSS3_CONNECT_TIMEOUTS3_ENDPOINTS3_FORCE_VIRTUAL_ADDRESSINGS3_PROFILE_NAMES3_PROXY_URI	S3_REGIONS3_REQUEST_TIMEOUTS3_SECRET_ACCESS_KEYS3_SESSION_TOKEN	S3_SIGNERS3_SIGNER_ENDPOINTS3_SIGNER_ENDPOINT_DEFAULTS3_SIGNER_URIFileIO	InputFileInputStream
OutputFileOutputStream)
Properties)	strtobool)get_first_property_valueget_header_propertiesproperty_as_bool)
AWSRequestc                   @   sH   e Zd ZU dZeed< deddfddZejddd	e	ddfd
dZ
dS )S3RequestSignerz+Abstract base class for S3 request signers.
propertiesreturnNc                 C   
   || _ d S NrH   selfrH    rO   G/home/ubuntu/.local/lib/python3.10/site-packages/pyiceberg/io/fsspec.py__init__k      
zS3RequestSigner.__init__requestrF   _c                 K   s   d S rK   rO   )rN   rS   rT   rO   rO   rP   __call__n   s   zS3RequestSigner.__call__)__name__
__module____qualname____doc__rA   __annotations__rQ   abcabstractmethodr   rU   rO   rO   rO   rP   rG   f   s   
 rG   c                       sL   e Zd ZU dZejed< deddf fddZdd	d
e	ddfddZ
  ZS )S3V4RestSignerzQAn S3 request signer that uses an external REST signing service to sign requests._sessionrH   rI   Nc                    s   t  | t | _d S rK   )superrQ   requestsSessionr^   rM   	__class__rO   rP   rQ   x   s   zS3V4RestSigner.__init__rS   rF   rT   c              
   K   sF  | j t| j t d}| j tt}i }d }| j t }r%| }n| j t	 }r2d| }|r8||d< |
t| j  |j|jd |jdd |j D d}	| jj| d|  ||	d}
z
|
  |
 }W n ty } ztd	|
j d
|	 |d }~ww |d  D ]\}}|j|d| q|d |_d S )N/zBearer Authorizationclient_regionc                 S   s   i | ]\}}||gqS rO   rO   ).0keyvalrO   rO   rP   
<dictcomp>   s    z+S3V4RestSigner.__call__.<locals>.<dictcomp>)methodregionuriheaders)rn   jsonzFailed to sign request z: rn   z, rm   )rH   getr;   r   rstripr9   r:   r   auth_headerr   updaterD   rk   contexturlrn   itemsr^   poststripraise_for_statusro   r   r   status_code
add_headerjoin)rN   rS   rT   
signer_urlsigner_endpointsigner_headersrr   auth_managertokensigner_bodyresponseresponse_jsonerh   valuerO   rO   rP   rU   |   s6   

 zS3V4RestSigner.__call__)rV   rW   rX   rY   r`   ra   rZ   rA   rQ   r   rU   __classcell__rO   rO   rb   rP   r]   s   s
   
 
r]   SIGNERSrT   rI   c                 C   s
   t ddS )NT)
auto_mkdirr
   )rT   rO   rO   rP   _file   rR   r   rH   c                 C   s  ddl m} | tt| ttt| ttt| t	t
t| ttd}i }i }| t }rStd| t| }rL|| }||d< ddlm} ||d< ntd| | t }ra||d	|d
< | t }	rnt|	|d< | t }
r{t|
|d< | t }rddi|d< | t }rt|}nd}|||d}t| tt }r||d< |di |}| D ]\}}|jjj j!|dd |jjj j"||dd q|S )Nr   )S3FileSystem)endpoint_urlaws_access_key_idaws_secret_access_keyaws_session_tokenregion_namezLoading signer %szbefore-sign.s3)UNSIGNEDsignature_versionzSigner not available: )httphttpsproxiesconnect_timeoutread_timeoutaddressing_stylevirtuals3F)anonclient_kwargsconfig_kwargsprofilei  )	unique_idrO   )#s3fsr   rp   r0   rC   r-   r   r6   r   r7   r    r4   r   r8   loggerinfor   botocorer   
ValueErrorr3   r/   floatr5   r1   r.   rB   r2   r   rv   r   metaevents
unregisterregister_last)rH   r   r   r   register_eventssigner
signer_clsr   	proxy_urir   request_timeout_force_virtual_addressings3_anonymousr   s3_fs_kwargsprofile_namefs
event_nameevent_functionrO   rO   rP   _s3   sN   





r   c                 C   st   ddl m} || t| td| t| td| tt| t	dt
| td| t| tt| tdd
S )Nr   )GCSFileSystemfull_controlnoneFz{})
projectaccessr   consistencycache_timeoutrequester_payssession_kwargsr   default_locationversion_aware)gcsfsr   rp   r%   r!   r)   r#   r"   rE   r&   ro   loadsr(   r'   r$   r*   )rH   r   rO   rO   rP   _gs   s   



r   hostnamec           	         s  ddl m} ddlm  ddlm} dd |  D  D ]\}}t| vr.|dd | t< t	| vr6|| t	< q|rFt| vrF|dd | t< G  fdd	d	|}| 
t }r\||}n| 
t}|| 
t|| 
t| 
t| 
t	| 
t| 
t| 
t| 
t| 
td

S )Nr   )AzureBlobFileSystemAccessToken)AsyncTokenCredentialc                 S   s4   i | ]\}}| t d r|t d d|qS ). )
startswithr   replace)rg   rh   r   rO   rO   rP   rj      s
    z_adls.<locals>.<dictcomp>r   c                       s<   e Zd ZdZdeddfddZdeded f fd	d
ZdS )z$_adls.<locals>.StaticTokenCredentiali  token_stringrI   Nc                 S   rJ   rK   )_token)rN   r   rO   rO   rP   rQ     rR   z-_adls.<locals>.StaticTokenCredential.__init__scopeskwargsc                    s(   dd l }t|  | j } | j|S )Nr   )timeint_DEFAULT_EXPIRY_SECONDSr   )rN   r   r   r   
expires_onr   rO   rP   	get_token  s   z._adls.<locals>.StaticTokenCredential.get_token)rV   rW   rX   r   strrQ   r   r   rO   r   rO   rP   StaticTokenCredential
  s    r   )
connection_string
credentialaccount_nameaccount_key	sas_token	tenant_id	client_idclient_secretaccount_hostr   )adlfsr   azure.core.credentialsr   azure.core.credentials_asyncr   rv   r   splitr   rp   r   r   r   r   r   r   r   r   r   )	rH   r   r   r   rh   r   r   r   r   rO   r   rP   _adls   s>   


r   c                 C   s$   ddl m} || t| tdS )Nr   )HfFileSystem)endpointr   )huggingface_hubr   rp   r+   r,   )rH   r   rO   rO   rP   _hf*  s
   r   )
r   filer   s3as3nabfsabfssgsgcshf.SCHEME_TO_FS>   r   wasbr   wasbsc                       sZ   e Zd ZdZdedef fddZdefddZde	fd	d
Z
dde	defddZ  ZS )FsspecInputFilezAn input file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    locationr   c                       || _ t j|d d S N)r   _fsr_   rQ   rN   r   r   rb   rO   rP   rQ   K     zFsspecInputFile.__init__rI   c                 C   B   | j | j}|d }r|S |d }r|S td| j z.Return the total length of the file, in bytes.SizesizezCannot retrieve object info: r   r   r   rp   RuntimeErrorrN   object_infor  rO   rO   rP   __len__O     zFsspecInputFile.__len__c                 C      | j | jS z"Check whether the location exists.r   lexistsr   rN   rO   rO   rP   existsX     zFsspecInputFile.existsTseekablec              
   C   sR   z	| j | jdW S  ty( } z|jr||ttjttj| j|d}~ww )aH  Create an input stream for reading the contents of the file.

        Args:
            seekable: If the stream should support seek, or if it is consumed sequential.

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileNotFoundError: If the file does not exist.
        rbN)	r   openr   FileNotFoundErrorfilenameerrnoENOENTosstrerror)rN   r  r   rO   rO   rP   r  \  s   &zFsspecInputFile.open)T)rV   rW   rX   rY   r   r	   rQ   r   r  boolr  r>   r  r   rO   rO   rb   rP   r   C  s    	r   c                       sh   e Zd ZdZdedef fddZdefddZde	fd	d
Z
dde	defddZdefddZ  ZS )FsspecOutputFilezAn output file implementation for the FsspecFileIO.

    Args:
        location (str): A URI to a file location.
        fs (AbstractFileSystem): An fsspec filesystem instance.
    r   r   c                    r   r   r   r   rb   rO   rP   rQ   w  r   zFsspecOutputFile.__init__rI   c                 C   r   r   r  r  rO   rO   rP   r  {  r  zFsspecOutputFile.__len__c                 C   r  r	  r
  r  rO   rO   rP   r    r  zFsspecOutputFile.existsF	overwritec                 C   s,   |s|   rtd| j | j| jdS )a/  Create an output stream for reading the contents of the file.

        Args:
            overwrite (bool): Whether to overwrite the file if it already exists.

        Returns:
            OpenFile: An fsspec compliant file-like object.

        Raises:
            FileExistsError: If the file already exists at the location and overwrite is set to False.

        Note:
            If overwrite is set to False, a check is first performed to verify that the file does not exist.
            This is not thread-safe and a possibility does exist that the file can be created by a concurrent
            process after the existence check yet before the output stream is created. In such a case, the default
            behavior will truncate the contents of the existing file when opening the output stream.
        z)Cannot create file, file already exists: wb)r  FileExistsErrorr   r   r  )rN   r  rO   rO   rP   create  s   zFsspecOutputFile.createc                 C   s   t | j| jdS )zAReturn a new FsspecInputFile for the location at `self.location`.r   r   )r   r   r   r  rO   rO   rP   to_input_file  s   zFsspecOutputFile.to_input_file)F)rV   rW   rX   rY   r   r	   rQ   r   r  r  r  r@   r  r   r  r   rO   rO   rb   rP   r  o  s    	r  c                       s   e Zd ZdZdef fddZdedefddZdede	fd	d
Z
deeB eB ddfddZdddefddZddededB defddZddededB defddZdeeef fddZdeeef ddfddZ  ZS )FsspecFileIOz)A FileIO implementation that uses fsspec.rH   c                    s&   t t| _t | _t j|d d S )NrL   )dictr   _scheme_to_fs	threadinglocal_thread_localsr_   rQ   rM   rb   rO   rP   rQ     s   

zFsspecFileIO.__init__r   rI   c                 C      t |}| |}t||dS )a
  Get an FsspecInputFile instance to read bytes from the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecInputFile: An FsspecInputFile instance for the given location.
        r  )r   _get_fs_from_urir   rN   r   rm   r   rO   rO   rP   	new_input     	
zFsspecFileIO.new_inputc                 C   r&  )a  Get an FsspecOutputFile instance to write bytes to the file at the given location.

        Args:
            location (str): A URI or a path to a local file.

        Returns:
            FsspecOutputFile: An FsspecOutputFile instance for the given location.
        r  )r   r'  r  r(  rO   rO   rP   
new_output  r*  zFsspecFileIO.new_outputNc                 C   s:   t |ttfr|j}n|}t|}| |}|| dS )a9  Delete the file at the given location.

        Args:
            location (Union[str, InputFile, OutputFile]): The URI to the file--if an InputFile instance or an
                OutputFile instance is provided, the location attribute for that instance is used as the location
                to delete.
        N)
isinstancer=   r?   r   r   r'  rm)rN   r   str_locationrm   r   rO   rO   rP   delete  s   
zFsspecFileIO.deleterm   r   c                 C   s&   |j tv r| |j |jS | |j S )zOGet a filesystem from a parsed URI, using hostname for ADLS account resolution.)scheme_ADLS_SCHEMESget_fsr   )rN   rm   rO   rO   rP   r'    s   
zFsspecFileIO._get_fs_from_urir0  r   c                 C   s(   t | jdst| j| j_| j||S )z:Get a filesystem for a specific scheme, cached per thread.get_fs_cached)hasattrr%  r   _get_fsr3  rN   r0  r   rO   rO   rP   r2    s   zFsspecFileIO.get_fsc                 C   s<   || j vrtd| |tv rt| j|S | j | | jS )z'Get a filesystem for a specific scheme.z%No registered filesystem for scheme: )r"  r   r1  r   rH   r6  rO   rO   rP   r5    s
   
zFsspecFileIO._get_fsc                 C   s   t | j}|d= |S )zBCreate a dictionary of the FsSpecFileIO fields used when pickling.r%  )r   __dict__)rN   fileio_copyrO   rO   rP   __getstate__  s   
zFsspecFileIO.__getstate__statec                 C   s   || _ t | _dS )z3Deserialize the state into a FsSpecFileIO instance.N)r7  r#  r$  r%  )rN   r:  rO   rO   rP   __setstate__  s   zFsspecFileIO.__setstate__rK   )rV   rW   rX   rY   rA   rQ   r   r   r)  r  r+  r=   r?   r/  r	   r'  r2  r5  r!  r   r9  r;  r   rO   rO   rb   rP   r     s    
"r   rK   )prY   r[   r  ro   loggingr  r#  collections.abcr   r   	functoolsr   typingr   r   urllib.parser   r   r`   fsspecr	   fsspec.implementations.localr   r   pyiceberg.catalogr   r   pyiceberg.catalog.rest.authr   pyiceberg.exceptionsr   pyiceberg.ior   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   pyiceberg.typedefrA   pyiceberg.typesrB   pyiceberg.utils.propertiesrC   rD   rE   	getLoggerrV   r   botocore.awsrequestrF   ABCrG   r]   r   r!  r   typerZ   r   r   r   r   r   r   	frozensetr1  r   r  r   rO   rO   rO   rP   <module>   sb   2
.=3
,4