o
    bi"D                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlmZmZm	Z	m
Z
 d dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZ d d	lmZmZ e eZd
ZdZ dej!de"fddZ#dej$de"fddZ%eG dd dZ&eddG dd deZ'eG dd deZ(dS )    N)	dataclass)IntEnum)AnyDictIterableOptional)TaskContext)_check_import)BlockBlockAccessor)DatasinkWriteReturnType)DeveloperAPI	PublicAPI&   
   schemareturnc                 C   sf   t | dkrdS | D ]}t|jr|j  S q
| D ]}t|js-t|js-|j  S q| d jS )Nr   ztuple())lenpatis_timestamptypename	is_stringis_large_string)r   f r   e/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/clickhouse_datasink.py#_pick_best_arrow_field_for_order_by   s   


r   fieldc                 C   s   | j }t|r|jpt}|jpt}d| d| dS t|r"dS t|r)dS t	|r0dS t
|r7dS t|r>dS t|rEdS t|rLd	S t|rSd
S t|rZdS t|radS t|rhdS t|rodS t|rvdS dS )zAConvert a PyArrow field to an appropriate ClickHouse column type.zDecimal(z, )UInt8Int8Int16Int32Int64UInt16UInt32UInt64Float32Float64zDateTime64(3)String)r   r   
is_decimal	precisionDEFAULT_DECIMAL_PRECISIONscaleDEFAULT_DECIMAL_SCALE
is_booleanis_int8is_int16is_int32is_int64is_uint8	is_uint16	is_uint32	is_uint64
is_float16
is_float32
is_float64r   )r   tr-   r/   r   r   r   _arrow_to_clickhouse_type*   s@   















r>   c                   @   s^   e Zd ZU dZdZeed< dZee ed< dZ	ee ed< dZ
ee ed< dZee ed< dS )	ClickHouseTableSettingsa  
    Additional table creation instructions for ClickHouse.

    Attributes:
        engine: The engine definition for the created table. Defaults
            to "MergeTree()".
        order_by: The ORDER BY clause for the table.
        partition_by: The PARTITION BY clause for the table.
        primary_key: The PRIMARY KEY clause for the table.
        settings: Additional SETTINGS clause for the table
            (comma-separated or any valid string).
    zMergeTree()engineNorder_bypartition_byprimary_keysettings)__name__
__module____qualname____doc__r@   str__annotations__rA   r   rB   rC   rD   r   r   r   r   r?   N   s   
 r?   alpha)	stabilityc                   @   s   e Zd ZdZdZdZdZdS )SinkModea8  
    Enum of possible modes for sinking data

    Attributes:
        CREATE: Create a new table; fail if that table already exists.
        APPEND: Use an existing table if present, otherwise create one; then append data.
        OVERWRITE: Drop the table if it already exists, then re-create it and write.
             N)rE   rF   rG   rH   CREATEAPPEND	OVERWRITEr   r   r   r   rM   d   s
    
rM   c                   @   s  e Zd ZdZdZdZdZdZdZe	j
dddddfded	ed
e	deej deeeef  deeeef  dee dee ddfddZdd ZdejdefddZdefddZdee fddZedefddZd%ddZdefdd Zd!ee d"ede fd#d$Z!dS )&ClickHouseDatasinku%	  ClickHouse Ray Datasink.

    A Ray Datasink for writing data into ClickHouse, with support for distributed
    writes and mode-based table management (create, append, or overwrite).

    Args:
        table: Fully qualified table identifier (e.g., "default.my_table").
        dsn: A string in DSN (Data Source Name) HTTP format
            (e.g., "clickhouse+http://username:password@host:8123/default").
            For more information, see `ClickHouse Connection String doc
            <https://clickhouse.com/docs/en/integrations/sql-clients/cli#connection_string>`_.
        mode: One of SinkMode.CREATE, SinkMode.APPEND,
            or SinkMode.OVERWRITE.
            - **CREATE**: Create a new table; fail if that table already exists.
              Requires a user-supplied schema if the table doesn’t already exist.
            - **APPEND**: Use an existing table if present, otherwise create one.
              If the table does not exist, the user must supply a schema. Data
              is then appended to the table.
            - **OVERWRITE**: Drop the table if it exists, then re-create it.
              **Always requires** a user-supplied schema to define the new table.
        schema: An optional PyArrow schema object that, if provided, will
            override any inferred schema for table creation.
            - If you are creating a new table (CREATE or APPEND when the table
              doesn’t exist) or overwriting an existing table, you **must**
              provide a schema.
            - If you’re appending to an already-existing table, the schema is
              not strictly required unless you want to cast data or enforce
              column types. If omitted, the existing table definition is used.
        client_settings: Optional ClickHouse server settings to be used with the
            session/every request.
        client_kwargs: Additional keyword arguments to pass to the
            ClickHouse client.
        table_settings: An optional dataclass with additional table creation
            instructions (e.g., engine, order_by, partition_by, primary_key, settings).
        max_insert_block_rows: If you have extremely large blocks, specifying
            a limit here will chunk the insert into multiple smaller insert calls.
            Defaults to None (no chunking).
    
ClickHousez
        CREATE TABLE IF NOT EXISTS {table_name} (
            {columns}
        )
        ENGINE = {engine}
        ORDER BY {order_by}
        {additional_props}
    z!DROP TABLE IF EXISTS {table_name}zEXISTS {table_name}zSHOW CREATE TABLE {table_name}Ntabledsnmoder   client_settingsclient_kwargstable_settingsmax_insert_block_rowsr   c	           	      C   sH   || _ || _|| _|| _|pi | _|pi | _|pt | _|| _d| _	d S )NF)
_table_dsn_mode_schema_client_settings_client_kwargsr?   _table_settings_max_insert_block_rows_table_dropped)	selfrV   rW   rX   r   rY   rZ   r[   r\   r   r   r   __init__   s   


zClickHouseDatasink.__init__c                 C   s2   t | ddd dd l}|jd| j| jd| jS )Nclickhouse_connectclickhouse-connectmodulepackager   )rW   rD   r   )r	   rh   
get_clientr^   ra   rb   )rf   rh   r   r   r   _init_client   s   zClickHouseDatasink._init_clientc           
      C   s   | j j}| j jd ur| j j}nt|}g }| j jd ur%|d| j j  | j jd ur6|d| j j d | j jd urF|d| j j  d}|rQdd| }g }|D ]}t	|}|d|j
 d|  qUd	|}	| jj| j|	|||d
S )NzPARTITION BY zPRIMARY KEY (r    z	SETTINGS  
`z` z,
    )
table_namecolumnsr@   rA   additional_props)rc   r@   rA   r   rB   appendrC   rD   joinr>   r   _CREATE_TABLE_TEMPLATEformatr]   )
rf   r   r@   rA   additional_clausesrt   columns_defr   ch_typecolumns_strr   r   r   _generate_create_table_sql   s<   

z-ClickHouseDatasink._generate_create_table_sqlc           
      C   s8  z|| | jj| jd}|d u rW dS dD ]=}t| ddd ddlm} t||rRztt	|| W   W S  t
t|fyQ } ztd	|| W Y d }~qd }~ww qd
D ]$}t	||d }|ry|d }t|ttfrr|rp|d nd}t|  W S qUW dS  ty }	 ztd| j d|	  W Y d }	~	dS d }	~	ww )Nrr   F)scalar
first_itemfirst_valuerh   ri   rj   r   )Errorz(Helper %s failed: %s; will try fallbacks)result_rowsrowsdatazCould not verify if table z	 exists: )query_CHECK_TABLE_EXISTS_TEMPLATErx   r]   r	   $clickhouse_connect.driver.exceptionsr   hasattrboolgetattr	TypeError
ValueErrorloggerdebug
isinstancelisttuple	Exceptionwarning)
rf   clientresulthelperCHErrorexcattrr   firster   r   r   _table_exists   sF   
	z ClickHouseDatasink._table_existsc              
   C   s   t d| j  z&| jj| jd}||}t|}d}t||}|r-|	d
 W S W d S  tyN } zt d| j d|  W Y d }~d S d }~ww )Nz6Retrieving ORDER BY clause from SHOW CREATE TABLE for r~   z)(?is)\border\s+by\s+(.*?)(?=\bengine\b|$)rN   z)Could not retrieve SHOW CREATE TABLE for : )r   r   r]   _SHOW_CREATE_TABLE_TEMPLATErx   commandrI   researchgroupstripr   r   )rf   r   show_create_sqlr   ddl_strpatternmatchr   r   r   r   _get_existing_order_by  s*   

z)ClickHouseDatasink._get_existing_order_byc                 C   s   dS )NTr   rf   r   r   r   supports_distributed_writes+  s   z.ClickHouseDatasink.supports_distributed_writesc           
   
   C   sT  d }zz|   }| |}| jtjkp| jtjtjfv o| }|rD| jd u rD| jtjkr6td| j	 dtd| j	 d| jj
 d| jtjkr|rg| jjd u rg| |}|d urg|| j_td|  | jj| j	d}td|  || d	| _| | j}|| nm| jtjkr|rd| j	 d
}t| t|| | j}|| nI| jtjkr|r| |}| jj}|d ur|r||krtd| j	 d| d| dn|r|| j_td| j	 d|  n| | j}|| W n ty }	 ztd| j	 d|	  |	d }	~	ww W |r|  d S d S |r)|  w w )NzOverwriting table u#    requires a user‑provided schema.zTable z does not exist in mode='zG' and no schema was provided. Cannot create the table without a schema.z-Reusing old ORDER BY from overwritten table: r~   zMode=OVERWRITE => TzK already exists in mode='CREATE'. Use mode='APPEND' or 'OVERWRITE' instead.z+Conflict with order_by. The existing table z has ORDER BY z", but the user specified ORDER BY z?. Please drop or overwrite the table, or use the same ordering.z$Reusing existing ORDER BY for table r   z,Could not complete on_write_start for table )rn   r   r_   rM   rS   rQ   rR   r`   r   r]   r   rc   rA   r   r   info_DROP_TABLE_TEMPLATErx   r   re   r}   errorr   close)
rf   r   table_existsschema_requiredexisting_order_bydrop_sql
create_sqlmsguser_order_byr   r   r   r   on_write_start/  s   









z!ClickHouseDatasink.on_write_startc                 C   s   | j S )N)NAMEr   r   r   r   get_name  s   zClickHouseDatasink.get_nameblocksctxc              
   C   s(  |   }d}zze|D ]`}t| }|j}| jd ur#|j| jdd}| jd ur,| j}n|dkr2|nd}tt	d||}	|	
| t	t|	d D ] }
|	|
 }|	|
d  }|||| }|| j| ||j7 }qIq
W n ty } ztd| j d|  |d }~ww W |  |gS |  w )Nr   T)saferN   z"Failed to write block(s) to table r   )rn   r   	for_blockto_arrownum_rowsr`   castrd   r   rangeru   r   sliceinsert_arrowr]   r   r   r   r   )rf   r   r   r   total_insertedblockarrow_table	row_countmax_chunk_sizeoffsetsistartendchunkr   r   r   r   write  s<   



zClickHouseDatasink.write)r   N)"rE   rF   rG   rH   r   rw   r   r   r   rM   rQ   rI   r   pyarrowSchemar   r   r?   intrg   rn   r}   r   r   r   propertyr   r   r   r   r
   r   r   r   r   r   r   r   rT   y   sf    '	



$#
]rT   ))loggingr   dataclassesr   enumr   typingr   r   r   r   r   pyarrow.typestypesr   'ray.data._internal.execution.interfacesr   ray.data._internal.utilr	   ray.data.blockr
   r   ray.data.datasource.datasinkr   r   ray.util.annotationsr   r   	getLoggerrE   r   r.   r0   r   rI   r   Fieldr>   r?   rM   rT   r   r   r   r   <module>   s.    
$