o
    bi-                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
 d dlmZmZ d dlmZmZ eZeZe eZdefddZdd	d
ZdddZedeg ef dee fddZG dd deZdS )    N)contextmanager)AnyCallableIterableIteratorListOptional)BlockBlockMetadata)
DatasourceReadTaskreturnc                    sB   dd l }|   dd | jD } fddt|D }|j|S )Nr   c                 S   s   g | ]}|d  qS )r    ).0column_descriptionr   r   `/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/sql_datasource.py
<listcomp>       z$_cursor_to_block.<locals>.<listcomp>c                    s$   i | ]\ }| fd dD qS )c                    s   g | ]}|  qS r   r   )r   rowir   r   r      r   z/_cursor_to_block.<locals>.<dictcomp>.<listcomp>r   )r   columnrowsr   r   
<dictcomp>   s   $ z$_cursor_to_block.<locals>.<dictcomp>)pyarrowfetchalldescription	enumerateTablefrom_pydict)cursorpacolumnspydictr   r   r   _cursor_to_block   s
   r%   c                 C   (   dD ]}t | |std|dqd S )N)closecommitr!   zBYour `connection_factory` created a `Connection` object without a  method, but this method is required by the Python DB API2 specification. Check that your database connector is DB API2-compliant. To learn more, read https://peps.python.org/pep-0249/.hasattr
ValueError)
connectionattrr   r   r   %_check_connection_is_dbapi2_compliant   s   
r/   c                 C   r&   )N)executeexecutemanyfetchoner   r   z<Your database connector created a `Cursor` object without a r)   r*   )r!   r.   r   r   r   !_check_cursor_is_dbapi2_compliant%   s   
r3   connection_factoryc                 c   s    |  }t | zKz| }t| |V  |  W n2 tyL   z|  W   tyK } zt|ts:|jj	dkrA	 W Y d }~ W Y d }~ d }~ww w W |
  d S |
  w )NNotSupportedError)r/   r!   r3   r(   	Exceptionrollback
isinstanceAttributeError	__class____name__r'   )r4   r-   r!   er   r   r   _connect2   s2   
	
r=   c                   @   s   e Zd ZdZ	ddedeg ef dedeee  fddZ	d	ee
 fd
dZde
d	efddZde
d	ee fddZd	e
fddZde
de
fddZdS )SQLDatasource2   Nsqlr4   shard_hash_fn
shard_keysc                 C   s`   || _ |rt|dkrdd| d| _n|r%t|dkr%|d  | _nd | _|| _|| _d S )N   zCONCAT(,)r   )r@   lenjoinrB   rA   r4   )selfr@   r4   rA   rB   r   r   r   __init__P   s   
zSQLDatasource.__init__r   c                 C   s   d S Nr   rH   r   r   r   estimate_inmemory_data_sizea   s   z)SQLDatasource.estimate_inmemory_data_sizeparallelismc              
   C   s   |dks	| j du rdS | j}d| j d| d| j  d| d	}zt| j}|| W d   W d	S 1 s7w   Y  W d	S  ty\ } ztd
t	| d W Y d}~dS d}~ww )zCheck if database supports sharding with MOD/ABS/CONCAT operations.

        Returns:
            bool: True if sharding is supported, False otherwise.
        rC   NFzSELECT COUNT(1) FROM () as T WHERE MOD(ABS(()), z) = 0Tz$Database does not support sharding: .)
rB   rA   r@   r=   r4   r0   r6   loggerinfostr)rH   rM   hash_fnqueryr!   r<   r   r   r   supports_shardingd   s.   

zSQLDatasource.supports_shardingc                    s   dt t f fdd}  }|dkrg S t|t| j }|| }|| } |s?t	d t
d d d d }t||gS g }t|D ]"}|}	||k rQ|	d7 }	 ||}
t
|	d d d d}|t|
| qE|S )Nr   c                     sD   t  j} |  j t| gW  d   S 1 sw   Y  dS )z?Read all data in a single block when sharding is not supported.N)r=   r4   r0   r@   r%   )r!   rK   r   r   fallback_read_fn}   s   $z6SQLDatasource.get_read_tasks.<locals>.fallback_read_fnr   zMSharding is not supported. Falling back to reading all data in a single task.rC   )num_rows
size_bytesinput_files
exec_stats)r   r	   _get_num_rowsminmathceilMIN_ROWS_PER_READ_TASKrW   rR   rS   r
   r   range_create_parallel_read_fnappend)rH   rM   rX   num_rows_totalnum_rows_per_blocknum_blocks_with_extra_rowmetadatatasksr   rY   read_fnr   rK   r   get_read_tasks|   s:   
zSQLDatasource.get_read_tasksc                 C   sN   t | j}|d| j d | d W  d    S 1 s w   Y  d S )NzSELECT COUNT(*) FROM (z) as Tr   )r=   r4   r0   r@   r2   )rH   r!   r   r   r   r]      s   
$zSQLDatasource._get_num_rowstask_idc              
      sH   j }dj d| dj d| d| 
 dtt f fdd}|S )	NzSELECT * FROM (rN   rO   rP   z) = r   c                     sF   t j} |   t| }|gW  d    S 1 sw   Y  d S rJ   )r=   r4   r0   r%   )r!   blockrV   rH   r   r   rj      s
   
$z7SQLDatasource._create_parallel_read_fn.<locals>.read_fn)rA   r@   rB   r   r	   )rH   rl   rM   rU   rj   r   rn   r   rc      s   
z&SQLDatasource._create_parallel_read_fnrJ   )r;   
__module____qualname__ra   rT   r   
Connectionr   r   rI   intrL   boolrW   r   rk   r]   rc   r   r   r   r   r>   M   s"    


,r>   )r   N)loggingr_   
contextlibr   typingr   r   r   r   r   r   ray.data.blockr	   r
   ray.data.datasource.datasourcer   r   rq   Cursor	getLoggerr;   rR   r%   r/   r3   r=   r>   r   r   r   r   <module>   s     


 