o
    bi9                     @   s   d dl Z d dlZd dlmZmZmZmZmZmZm	Z	 d dl
mZ d dlmZmZmZ d dlmZmZ d dlmZ e eZdedefd	d
ZeG dd deZdS )    N)AnyCallableDictIterableListOptionalTuple)_check_import)BlockBlockAccessorBlockMetadata)
DatasourceReadTask)DeveloperAPI
filter_strreturnc                 C   s^   d}d}| D ]&}|r|dkr|sd}|dko| }q|dkr#d}d}q|dkr* dS d}qdS )NF'\T; )r   	in_stringescape_nextcr   r   g/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/clickhouse_datasource.py_is_filter_string_safe   s   r   c                   @   sh  e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
Z					d3dededeee  dee deeee ef  deeeef  deeeef  fddZdd Zdd ZdefddZdededefddZd edeg ee f fd!d"Zd#d$ Zdee fd%d&Zdee fd'd(Z d)edee fd*d+Z!d edefd,d-Z"dee fd.d/Z#d0edee$ fd1d2Z%dS )4ClickHouseDatasourcea  
    A Ray datasource for reading from ClickHouse.

    Args:
        table: Fully qualified table or view identifier (e.g.,
            "default.table_name").
        dsn: A string in DSN (Data Source Name) HTTP format (e.g.,
            "clickhouse+http://username:password@host:8124/default").
            For more information, see `ClickHouse Connection String doc
            <https://clickhouse.com/docs/en/integrations/sql-clients/cli#connection_string>`_.
        columns: Optional List of columns to select from the data source.
            If no columns are specified, all columns will be selected by default.
        filter: Optional SQL filter string that will be used in the
            WHERE statement (e.g., "label = 2 AND text IS NOT NULL").
            The filter must be valid for use in a ClickHouse SQL WHERE clause.
            Note: Parallel reads are not currently supported when a filter is set.
            Specifying a filter forces the parallelism to 1 to ensure deterministic
            and consistent results. For more information, see
            `ClickHouse SQL WHERE Clause doc
            <https://clickhouse.com/docs/en/sql-reference/statements/select/where>`_.
        order_by: Optional Tuple containing a list of columns to order by
            and a boolean indicating the order. Note: order_by is required to
            support parallelism.
        client_settings: Optional ClickHouse server settings to be used with the
            session/every request. For more information, see
            `ClickHouse Client Settings doc
            <https://clickhouse.com/docs/en/integrations/python#settings-argument>`_.
        client_kwargs: Optional Additional keyword arguments to pass to the
            ClickHouse client. For more information,
            see `ClickHouse Core Settings doc
            <https://clickhouse.com/docs/en/integrations/python#additional-options>`_.
    d   2   z#SELECT {select_clause} FROM {table}z3EXPLAIN SELECT 1 FROM {table} WHERE {filter_clause}z2SELECT SUM(byteSize(*)) AS estimate FROM ({query})z*SELECT COUNT(*) AS estimate FROM ({query})z{query} LIMIT {limit_row_count}zT
        {query}
        FETCH FIRST {fetch_row_count} {fetch_row_or_rows} ONLY
    z
        {query}
        OFFSET {offset_row_count} {offset_row_or_rows}
        FETCH NEXT {fetch_row_count} {fetch_row_or_rows} ONLY
    Ntabledsncolumnsfilterorder_byclient_settingsclient_kwargsc                 C   s@   || _ || _|| _|| _|| _|pi | _|pi | _|  | _d S N)	_table_dsn_columns_filter	_order_by_client_settings_client_kwargs_generate_query_query)selfr   r   r    r!   r"   r#   r$   r   r   r   __init__Y   s   


zClickHouseDatasource.__init__c                 C   s:   t | ddd dd l}|jd| j| jpi d| jpi S )Nclickhouse_connectzclickhouse-connect)modulepackager   )r   settingsr   )r	   r1   
get_clientr'   r+   r,   )r/   r1   r   r   r   _init_clientl   s   z!ClickHouseDatasource._init_clientc              
   C   s   | j sd S t| j std| j  |  }z0z| jj| j| j d}|| W n ty? } ztd| j  d| d }~ww W |	  d S |	  w )Nz9Invalid characters outside of string literals in filter: )r   filter_clausezInvalid filter expression: z	. Error: )
r)   r   
ValueErrorr6   _EXPLAIN_FILTERS_QUERYformatr&   query	Exceptionclose)r/   client
test_queryer   r   r   _validate_filterv   s.   
z%ClickHouseDatasource._validate_filterr   c                 C   s   | j j| jrd| jnd| jd}| jr!|   |d| j 7 }| jrW| j\}}|r-dnd}t|dkrB|d|d	  | 7 }|S t|dkrWd|}|d
| d| 7 }|S )Nz, *)select_clauser   z WHERE z DESC    z
 ORDER BY r   z ORDER BY ())	_BASE_QUERYr:   r(   joinr&   r)   rA   r*   len)r/   r;   r    desc	directioncolumns_clauser   r   r   r-      s"   

z$ClickHouseDatasource._generate_querylimit_row_countoffset_row_countc                 C   s`   |dkr| j j| j||dkrddS ddS | jj| j||dkr"dnd||dkr,ddS ddS )Nr   rE   ROWSROWr;   fetch_row_countfetch_row_or_rows)r;   rN   offset_row_or_rowsrR   rS   )_FIRST_BLOCK_QUERYr:   r.   _NEXT_BLOCK_QUERY)r/   rM   rN   r   r   r   _build_block_query   s"   

z'ClickHouseDatasource._build_block_queryr;   c                    s   dt t f fdd}|S )Nr   c                      s     gS r%   )_execute_block_queryr   r;   r/   r   r   read_fn      z5ClickHouseDatasource._create_read_fn.<locals>.read_fn)r   r
   )r/   r;   rZ   r   rY   r   _create_read_fn   s   z$ClickHouseDatasource._create_read_fnc                 C   sz   | j d ur| jj| j| j| jdkrdndd}n
| jj| j| jd}t| |}t	
| |  }| }||fS )NrE   rO   rP   rQ   )r;   rM   )r*   rU   r:   r.   NUM_SAMPLE_ROWS_SAMPLE_BLOCK_QUERYr   	for_blockrX   mathceil
size_bytesnum_rowsschema)r/   r;   sample_block_accessorestimated_size_bytes_per_rowsample_block_schemar   r   r   _get_sampled_estimates   s$   
z+ClickHouseDatasource._get_sampled_estimatesc                 C      |  | jS r%   )_execute_estimate_query_COUNT_ESTIMATE_QUERYr/   r   r   r   _get_estimate_count   r[   z(ClickHouseDatasource._get_estimate_countc                 C   ri   r%   )rj   _SIZE_ESTIMATE_QUERYrl   r   r   r   _get_estimate_size   r[   z'ClickHouseDatasource._get_estimate_sizeestimate_queryc              
   C   s   |   }zWz.|j| jd}||}|r2t|jdkr2|jd d }|d ur*t|nd W W |  S W n tyM } zt	
d|  W Y d }~nd }~ww W |  d S W |  d S |  w )N)r;   r   z"Failed to execute estimate query: )r6   r:   r.   r;   rI   result_rowsintr=   r<   loggerwarning)r/   rp   r>   r;   resultestimater@   r   r   r   rj      s(   


z,ClickHouseDatasource._execute_estimate_queryc              
   C   s   dd l }|  }z9z%||}t|}W d    n1 sw   Y  |j|W W |  S  tyA } ztd| d }~ww |  w )Nr   zFailed to execute block query: )	pyarrowr6   query_arrow_streamlistTablefrom_batchesr=   r<   RuntimeError)r/   r;   par>   streamrecord_batchesr@   r   r   r   rX      s   


z)ClickHouseDatasource._execute_block_queryc                 C   s   |   S )z
        Estimate the in-memory data size for the query.

        Returns:
            Estimated in-memory data size in bytes, or
            None if the estimation cannot be performed.
        )ro   rl   r   r   r   estimate_inmemory_data_size   s   z0ClickHouseDatasource.estimate_inmemory_data_sizeparallelismc           
         s    }|dks|du rg S t|t|j }jdur)|dkr)td d}jdu r9|dkr9td d}|| }|| }	 \ dt
dt
dtd	tf fd
d}|dkrc||ddgS g }d}t|D ]}|}	||k rw|	d7 }	|||	|d ||	7 }qk|S )a  
        Create read tasks for the ClickHouse query.

        Args:
            parallelism: The desired number of partitions to read the data into.
                - If ``order_by`` is not set, parallelism will be forced to 1.
                - If ``filter`` is set, parallelism will also be forced to 1
                  to ensure deterministic results.

        Returns:
            A list of read tasks to be executed.
        r   NrE   zwClickHouse datasource does not currently support parallel reads when a filter is set; falling back to parallelism of 1.zyClickHouse datasource requires dataset to be explicitly ordered to support parallelism; falling back to parallelism of 1.
block_rowsoffset_rowsparallelizedr   c                    s<   |r	 | |}nj}t|t|  |  d d ddS )N)rc   rb   input_files
exec_stats)rd   )rW   r.   r   r\   r   )r   r   r   r;   rf   rg   r/   r   r   _get_read_task6  s   z;ClickHouseDatasource.get_read_tasks.<locals>._get_read_taskFT)rm   minr`   ra   MIN_ROWS_PER_READ_TASKr)   rs   rt   r*   rh   rr   boolr   rangeappend)
r/   r   num_rows_totalnum_rows_per_blocknum_blocks_with_extra_rowr   
read_tasksoffsetithis_block_sizer   r   r   get_read_tasks  sT   
z#ClickHouseDatasource.get_read_tasks)NNNNN)&__name__
__module____qualname____doc__r]   r   rG   r9   rn   rk   r^   rU   rV   strr   r   r   r   r   r   r0   r6   rA   r-   rr   rW   r   r   r
   r\   rh   rm   ro   rj   rX   r   r   r   r   r   r   r   r   %   s^    !




	
r   )loggingr`   typingr   r   r   r   r   r   r   ray.data._internal.utilr	   ray.data.blockr
   r   r   ray.data.datasource.datasourcer   r   ray.util.annotationsr   	getLoggerr   rs   r   r   r   r   r   r   r   r   <module>   s    $
