o
    i(%                     @   s  d dl Z d dlmZmZ d dlmZmZmZmZ d dl	Z	er*d dl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlmZ ddlm Z  dee dede!e	j" fddZ#G dd dZ$dgZ%dS )    N)IterableSized)TYPE_CHECKINGAnyNoReturnUnion)	DataFrame   Catalog   )	SparkConf)SparkContext)PySparkTypeErrorContributionsAcceptedError)RuntimeConfigDataFrameReaderDataStreamReader)
StructTypeUDFRegistrationdataschemareturnc                    sN   ddl m  g }| D ]} fddt|dd |D ddD }|| q
|S )Nr   Valuec                    s   g | ]
\}} ||j qS  )duckdb_type).0xdtyper   r   Y/home/ubuntu/.local/lib/python3.10/site-packages/duckdb/experimental/spark/sql/session.py
<listcomp>)   s    z,_combine_data_and_schema.<locals>.<listcomp>c                 S   s   g | ]}|j qS r   )dataType)r!   yr   r   r$   r%   )   s    F)strict)duckdbr   zipappend)r   r   new_datarownew_rowr   r   r$   _combine_data_and_schema$   s   &r/   c                   @   s  e Zd ZdeddfddZdeee df defdd	Z	ddd
e
e dB de
e dB defddZ			d<dedee f dee
e B dB dedB dedef
ddZd=ddZ			d>dededB dededB ddf
ddZdededefd d!Zd?d"d#Zd$edefd%d&Zd=d'd(Zed@d*d+Zedefd,d-Zedefd.d/Zedefd0d1Zedefd2d3Z ede!fd4d5Z"ede#fd6d7Z$edefd8d9Z%G d:d; d;Z&e& Z'dS )ASparkSessioncontextr   Nc                 C   s   |j | _|| _t| j| _d S N)
connectionconn_contextr   _confselfr1   r   r   r$   __init__/   s   zSparkSession.__init__r   PandasDataFramec                 C   s   zdd l }d}W n ty   d}Y nw |r7t||jr7dt  }| j|| t| jd| d| S dt	t
 dd fd	d
}t|t	sKt	|}|| dtdtfdd}||}dtdt	t	 fdd}||}	| jj||	d}
t|
| S )Nr   TFpyspark_pandas_df_zselect * from ""tuplesr   c                 S   sx   t | dkrd S t | d }t| dd  D ]#\}}t |}||kr#qtdd| d|d  t|t|ddd S )Nr	   r   LENGTH_SHOULD_BE_THE_SAMEr   )arg1arg2arg1_lengtharg2_lengtherror_classmessage_parameters)len	enumerater   str)r=   expected_lengthiitemactual_lengthr   r   r$   verify_tuple_integrity@   s    z>SparkSession._create_dataframe.<locals>.verify_tuple_integrityc                    sT   dt dtdtfdd t| d  fddt| D }d	|}d
| d}|S )Nr-   start_param_idxr   c                    s4   t | } fddt|D }dd| d }|S )Nc                    s   g | ]	}d |   qS )$r   )r!   r"   rN   r   r$   r%   Y   s    zjSparkSession._create_dataframe.<locals>.construct_query.<locals>.construct_values_list.<locals>.<listcomp>(, ))rF   rangejoin)r-   rN   parameter_count
parametersr   rP   r$   construct_values_listW   s   zVSparkSession._create_dataframe.<locals>.construct_query.<locals>.construct_values_listr   c                    s"   g | ]\}} |d |  qS )r	   r   )r!   rJ   r"   rX   row_sizer   r$   r%   ^   s   " zKSparkSession._create_dataframe.<locals>.construct_query.<locals>.<listcomp>rR   z'
                select * from (values z)
            )r   intrH   rF   rG   rU   )r=   values_listqueryr   rY   r$   construct_queryV   s   
z7SparkSession._create_dataframe.<locals>.construct_queryc                 S   s    g }| D ]	}| t| q|S r2   )extendlist)r=   rW   r-   r   r   r$   construct_parametersh   s   z<SparkSession._create_dataframe.<locals>.construct_parameters)params)pandasImportError
isinstancer   uuiduuid1r4   registersqlr`   tupler   rH   )r8   r   rc   
has_pandasunique_namerM   r^   r]   ra   rW   relr   r   r$   _create_dataframe4   s(   

zSparkSession._create_dataframetypesnamesc                 C   s*   |  |}|r|j| }|r|j| }|S r2   )rn   _cast_typestoDF)r8   r   ro   rp   dfr   r   r$   _createDataFrameFromPandass   s   


z'SparkSession._createDataFrameFromPandasTr   samplingRatioverifySchemac                 C   s  |rt |st d }d }t|trtdddid|r)t|tr'| \}}n|}zdd l}d}W n ty<   d}Y nw |rLt||jrL| |||S d}	|s^|r^d}	t	dd	 |D g}|rjt|trjt
||}| |}
|	r~|
j}|d
}t|| }
|r|
j| }
|r|
j| }
|
S )NSHOULD_NOT_DATAFRAMEarg_namer   rC   r   TFc                 s   s    | ]}d V  qd S r2   r   )r!   _r   r   r$   	<genexpr>   s    z/SparkSession.createDataFrame.<locals>.<genexpr>z1=0)NotImplementedErrorre   r   r   r   extract_types_and_namesrc   rd   rt   rj   r/   rn   relationfilterrq   rr   )r8   r   r   ru   rv   ro   rp   rc   rk   is_emptyrs   rm   r   r   r$   createDataFrame   sN   







zSparkSession.createDataFramec                 C   s
   t | jS r2   )r0   r5   r8   r   r   r$   
newSession   s   
zSparkSession.newSessionr	   startendstepnumPartitionsr   c                 C   s4   |rt |d u r|}d}t| jjd|||gd| S )Nr   rT   )rW   )r   r   r4   table_function)r8   r   r   r   r   r   r   r$   rT      s   zSparkSession.rangesqlQuerykwargsc                 K   s   |rt | j|}t|| S r2   )r{   r4   ri   r   )r8   r   r   r}   r   r   r$   ri      s   
zSparkSession.sqlc                 C   s   | j   d S r2   )r5   stopr   r   r   r$   r      s   zSparkSession.stop	tableNamec                 C   s   | j |}t|| S r2   )r4   tabler   )r8   r   r}   r   r   r$   r      s   
zSparkSession.tablec                 C      | S r2   r   r   r   r   r$   getActiveSession      zSparkSession.getActiveSessionr   c                 C   s&   t | dsddlm} || | _| jS )N_catalogr   r
   )hasattr%duckdb.experimental.spark.sql.catalogr   r   )r8   r   r   r   r$   catalog   s   

zSparkSession.catalogc                 C      | j S r2   )r6   r   r   r   r$   conf      zSparkSession.confc                 C      t | S r2   r   r   r   r   r$   read      zSparkSession.readc                 C   r   r2   r   r   r   r   r$   
readStream   r   zSparkSession.readStreamc                 C   r   r2   )r5   r   r   r   r$   sparkContext   r   zSparkSession.sparkContextc                 C   s   t r2   r   r   r   r   r$   streams      zSparkSession.streamsc                 C   r   r2   r   r   r   r   r$   udf   r   zSparkSession.udfc                 C   s   dS )Nz1.0.0r   r   r   r   r$   version  r   zSparkSession.versionc                	   @   s   e Zd ZdddZdedd fddZdedd fdd	Zd
edd fddZdddZ			ddedB de	dB de
dB dd fddZdddZdS )SparkSession.Builderr   Nc                 C   s   d S r2   r   r   r   r   r$   r9     r   zSparkSession.Builder.__init__namec                 C   r   r2   r   r8   r   r   r   r$   master
  r   zSparkSession.Builder.masterc                 C   r   r2   r   r   r   r   r$   appName  r   zSparkSession.Builder.appNameurlc                 C   r   r2   r   )r8   r   r   r   r$   remote  r   zSparkSession.Builder.remoter0   c                 C   s   t d}t|S )N__ignored__)r   r0   r7   r   r   r$   getOrCreate  s   z SparkSession.Builder.getOrCreatekeyvaluer   c                 C   r   r2   r   )r8   r   r   r   r   r   r$   config  s   zSparkSession.Builder.configc                 C   r   r2   r   r   r   r   r$   enableHiveSupport"  r   z&SparkSession.Builder.enableHiveSupportr   Nr   r0   )NNN)r   r   )__name__
__module____qualname__r9   rH   r   r   r   r   r   r   r   r   r   r   r   r$   Builder  s&    


r   )NNTr   )Nr	   Nr   )r   r   )(r   r   r   r   r9   r   r   r   r   rn   r`   rH   rt   r   floatboolr   r   r[   rT   ri   r   r   r   propertyr   r   r   r   r   r   r   r   r   r   r   r   r   r   builderr   r   r   r$   r0   .   s~    ?




?



 r0   )&rf   collections.abcr   r   typingr   r   r   r   r)   pandas.core.framer   r:   r   r   r   r   r1   r   errorsr   	exceptionr   r   	dataframe
readwriterr   	streamingr   ro   r   r   r   r`   r   r/   r0   __all__r   r   r   r$   <module>   s*     
 
|