o
    bi                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZmZ d dlm	Z
 d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ e eZd	Zd
ZG dd ded ZdS )    N)IterableOptional)bigquery_datasource)TaskContext)cached_remote_fn)_check_import)BlockBlockAccessor)Datasink
      c                   @   sX   e Zd Zedfdedededee ddf
dd	Zdd
dZ	de
e deddfddZdS )BigQueryDatasinkT
project_iddatasetmax_retry_cntoverwrite_tablereturnNc                 C   sF   t | ddd t | ddd t | ddd || _|| _|| _|| _d S )Nzgoogle.cloudbigquery)modulepackagebigquery_storagezgoogle.api_core
exceptions)r   r   r   r   r   )selfr   r   r   r    r   c/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/bigquery_datasink.py__init__   s   
zBigQueryDatasink.__init__c                 C   s   ddl m} | jd u s| jd u rtdtj| jd}| jddd }z|| W n |j	yI   |j
| j d| dd td	|  Y nw | jrhtd
| j d  |j| j d| j dd d S td| j d  d S )Nr   r   z(project_id and dataset are required argsr   .      )timeoutzCreated dataset zAttempting to delete table z9 if it already exists since kwarg overwrite_table = True.T)not_found_okzThe write will append to table z: if it already exists since kwarg overwrite_table = False.)google.api_corer   r   r   
ValueErrorr   _create_clientsplitget_datasetNotFoundcreate_datasetloggerinfor   delete_table)r   r   client
dataset_idr   r   r   on_write_start)   s.   
 
zBigQueryDatasink.on_write_startblocksctxc                    sD   dt dtdtdd ffdd t  t fdd|D  d S )	Nblockr   r   r   c                    s  ddl m} ddlm} t|  } tj|d}|j	dd}|j
j|_|jj|_t }tj|dt  d}tj| |d	d
 d}	|	 jkrt|d}
|j|
||d}W d    n1 sbw   Y  z	t|  W n? |j|j fy } z*|	d7 }	|	 jkrW Y d }~n%tdd|	 d  t!"| t#$t% W Y d }~nd }~ww |	 jksJ|	 jkrtd j dd  t&d|	 d d W d    d S 1 sw   Y  d S )Nr   r   )r   r   T)
autodetectblock_z.parquetSNAPPY)compressionrb)
job_configr   z5A block write encountered a rate limit exceeded error z  time(s). Sleeping to try again.z	Maximum (z) retry count exceeded. Rayz; will attempt to retry the block write via fault tolerance.zWrite failed due to z5 repeated API rate limit exceeded responses. Considerz9 specifiying the max_retry_cnt kwarg with a higher value.)'r#   r   google.cloudr   r	   	for_blockto_arrowr   r%   LoadJobConfigSourceFormatPARQUETsource_formatWriteDispositionWRITE_APPENDwrite_dispositiontempfileTemporaryDirectoryospathjoinuuiduuid4pqwrite_tabler   openload_table_from_filer*   r+   result	ForbiddenTooManyRequestsloggingdebugtimesleepRATE_LIMIT_EXCEEDED_SLEEP_TIMERuntimeError)r2   r   r   r   r   r-   r8   temp_dirfp	retry_cntsource_filejobe)r   r   r   _write_single_blockJ   sb   








"z3BigQueryDatasink.write.<locals>._write_single_blockc                    s   g | ]}  |jjqS r   )remoter   r   ).0r2   r^   r   r   r   
<listcomp>}   s    z*BigQueryDatasink.write.<locals>.<listcomp>)r   strr   rayget)r   r0   r1   r   ra   r   writeE   s   /zBigQueryDatasink.write)r   N)__name__
__module____qualname__DEFAULT_MAX_RETRY_CNTrc   intr   boolr   r/   r   r   r   rf   r   r   r   r   r      s,    

r   )rR   rF   rD   rT   rI   typingr   r   pyarrow.parquetparquetrK   rd   ray.data._internal.datasourcer   'ray.data._internal.execution.interfacesr   ray.data._internal.remote_fnr   ray.data._internal.utilr   ray.data.blockr   r	   ray.data.datasource.datasinkr
   	getLoggerrg   r*   rj   rV   r   r   r   r   r   <module>   s$    
