o
    bi1                     @   s   d dl Z d dlmZmZ d dlmZ d dlmZmZ d dl	m
Z
mZ e eZdefddZd	d
 Zdd ZdefddZdd ZG dd de
ZdS )    N)ListOptional)_check_import)BlockBlockMetadata)
DatasourceReadTaskreturnc                  C   s   dd l } d| j S )Nr   zray/)ray__version__)r
    r   e/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/bigquery_datasource.py_create_user_agent   s   r   c                  C      ddl m}  | t dS Nr   
ClientInfo)
user_agent)google.api_core.client_infor   r   r   r   r   r   _create_client_info      r   c                  C   r   r   )$google.api_core.gapic_v1.client_infor   r   r   r   r   r   _create_client_info_gapic   r   r   
project_idc                 C   s   ddl m} |j| t dS )Nr   )bigquery)projectclient_info)google.cloudr   Clientr   )r   r   r   r   r   _create_client!   s
   r   c                  C   s   ddl m}  | jt dS )Nr   bigquery_storage)r   )r   r!   BigQueryReadClientr   r    r   r   r   _create_read_client*   s   r#   c                   @   sn   e Zd Z		ddedee dee fddZdedee fd	d
Z	dee fddZ
dededdfddZdS )BigQueryDatasourceNr   datasetqueryc                 C   s\   t | ddd t | ddd t | ddd || _|| _|| _|d ur*|d ur,tdd S d S )Nzgoogle.cloudr   )modulepackager!   zgoogle.api_core
exceptionszNQuery and dataset kwargs cannot both be provided (must be mutually exclusive).)r   _project_id_dataset_query
ValueError)selfr   r%   r&   r   r   r   __init__3   s   zBigQueryDatasource.__init__parallelismr	   c                    sd  ddl m} dtfdd | jr4t| jd}|| j}|  t|j	}|
dd }|
dd	 }n| | j| j | j
dd }| j
dd
 }t }d| j d| d| }	|d	krad }|jj|	|jjjd}
|jd| j |
|d}g }tdtt|j  t|j|k rtd |jD ]}td d d d d}t|f fdd	|}|| q|S )Nr   r    r	   c                 S   s   t  }|| j}| S N)r#   	read_rowsnameto_arrow)streamclientreaderr   r   r   _read_single_partitionJ   s   zABigQueryDatasource.get_read_tasks.<locals>._read_single_partitionr   .   z	projects/z
/datasets/z/tables/)tabledata_format)parentread_sessionmax_stream_countzCreated streams: zThe number of streams created by the BigQuery Storage Read API is less than the requested parallelism due to the size of the dataset.)num_rows
size_bytesinput_files
exec_statsc                    s
    | gS r1   r   )r5   r8   r   r   <lambda>~   s   
 z3BigQueryDatasource.get_read_tasks.<locals>.<lambda>)r   r!   r   r,   r   r*   r&   resultstrdestinationsplit_validate_dataset_table_existr+   r#   typesReadSession
DataFormatARROWcreate_read_sessionloggerinfolenstreamsr   r   append)r.   r0   r!   query_client	query_jobrK   
dataset_idtable_id
bqs_clientr>   requested_sessionrA   
read_tasksr5   metadata	read_taskr   rG   r   get_read_tasksG   sX   


z!BigQueryDatasource.get_read_tasksc                 C   s   d S r1   r   )r.   r   r   r   estimate_inmemory_data_size   s   z.BigQueryDatasource.estimate_inmemory_data_sizec                 C   s   ddl m} t|d}|dd }z|| W n |jy(   td|w z|| W d S  |jy@   td|w )Nr   )r)   r9   r:   z6Dataset {} is not found. Please ensure that it exists.z4Table {} is not found. Please ensure that it exists.)	google.api_corer)   r   rL   get_datasetNotFoundr-   format	get_table)r.   r   r%   r)   r6   rZ   r   r   r   rM      s&   
z0BigQueryDatasource._validate_dataset_table_exist)NN)__name__
__module____qualname__rJ   r   r/   intr   r   ra   rb   rM   r   r   r   r   r$   2   s    
>r$   )loggingtypingr   r   ray.data._internal.utilr   ray.data.blockr   r   ray.data.datasource.datasourcer   r   	getLoggerrh   rS   rJ   r   r   r   r   r#   r$   r   r   r   r   <module>   s    
	