o
    bi                     @   sT   d dl Z d dlZd dlmZmZmZmZ d dlZd dlZdddZ	G dd dZ
dS )    N)AnyCallableDictOptional
read_deltaread_parquet)deltaparquetc                   @   s   e Zd ZdZdddddddededed	ee d
ee dedee dee fddZdefddZ	dd Z
dd ZdefddZd
ededef fddZdd ZdS )UnityCatalogConnectora  
    Load a Unity Catalog table or files into a Ray Dataset, handling cloud credentials automatically.

    Currently only supports Databricks-managed Unity Catalog

    Supported formats: delta, parquet.
    Supports AWS, Azure, and GCP with automatic credential handoff.
    Nr   READ)regiondata_format	operationray_init_kwargsreader_kwargsbase_urltokentable_full_namer   r   r   r   r   c          	      C   sN   | d| _|| _|| _|r| nd | _|| _|| _|pi | _|p#i | _	d S )N/)
rstripr   r   r   lowerr   r   r   r   r   )	selfr   r   r   r   r   r   r   r    r   _/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/uc_datasource.py__init__   s   
zUnityCatalogConnector.__init__returnc                 C   sT   | j  d| j }dd| j i}tj||d}|  | }|| _|d | _|S )Nz/api/2.1/unity-catalog/tables/AuthorizationBearer )headerstable_id)	r   r   r   requestsgetraise_for_statusjson_table_info	_table_id)r   urlr   respdatar   r   r   _get_table_info.   s   
z%UnityCatalogConnector._get_table_infoc                 C   s^   | j  d}dd| j d}| j| jd}tj|||d}|  | | _| jd | _	d S )Nz2/api/2.1/unity-catalog/temporary-table-credentialszapplication/jsonr   )zContent-Typer   )r   r   )r#   r   r&   )
r   r   r%   r   r    postr"   r#   _creds_response
_table_url)r   r&   r   payloadr'   r   r   r   
_get_creds8   s   

z UnityCatalogConnector._get_credsc                 C   s  i }| j }d|v r-|d }|d |d< |d |d< |d |d< | jr,| j|d< | j|d	< n@d
|v r8|d
 |d< n5d|v ri|d }tjdddd}||  |  |j|d< W d    n1 scw   Y  ntd|	 D ]	\}}|t
j|< qqd|i| _d S )Naws_temp_credentialsaccess_key_idAWS_ACCESS_KEY_IDsecret_access_keyAWS_SECRET_ACCESS_KEYsession_tokenAWS_SESSION_TOKEN
AWS_REGIONAWS_DEFAULT_REGIONazuresasuriAZURE_STORAGE_SAS_TOKENgcp_service_accountgcp_sa_z.jsonT)prefixsuffixdeleteGOOGLE_APPLICATION_CREDENTIALSz9No known credential type found in Databricks UC response.env_vars)r+   r   tempfileNamedTemporaryFilewriteencodeflushname
ValueErroritemsosenviron_runtime_env)r   r@   credsawsgcp_json	temp_filekvr   r   r   _set_envD   s<   

zUnityCatalogConnector._set_envc                 C   s   | j r| j S | jp|  }d|v r|d r|d  }|S |dp't| dd }|r>tj|d 	dd }|t
v r>|S td)Ndata_source_formatstorage_locationr,   . z0Could not infer data format from table metadata.)r   r$   r)   r   r!   getattrrI   pathsplitextreplace_FILE_FORMAT_TO_RAY_READERrG   )r   infofmtstorage_locextr   r   r   _infer_data_formatc   s   z(UnityCatalogConnector._infer_data_format.c                 C   s8   |  }|tv rttjt| d }|r|S td| )NzUnsupported data format: )r   r\   rX   rayr(   rG   )r   r   r^   reader_funcr   r   r   _get_ray_readert   s   z%UnityCatalogConnector._get_ray_readerc                 C   sf   |    |   |   |  }| |}t s%tjdd| ji| j	 | j
}||fi | j}|S )Nruntime_envr   )r)   r.   rR   ra   rd   rb   is_initializedinitrK   r   r,   r   )r   r   readerr&   dsr   r   r   read|   s   
zUnityCatalogConnector.read)__name__
__module____qualname____doc__strr   r   r   dictr)   r.   rR   ra   r   r   rd   rj   r   r   r   r   r
      s<    	


r
   )rI   rA   typingr   r   r   r   r    rb   r\   r
   r   r   r   r   <module>   s    