o
    <iR                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZ dedee de
fdd	Zdedee defd
dZdejdejdee dejfddZdS )    N)Table)compute)AlwaysFalseBooleanExpressionEqualToInOrdf	join_colsreturnc                    s|   |    g }t dkrt d |d  S  fdd| D }t|dkr0t S t|dkr:|d S t| S )N   r   c                    s(   g | ] t tj fd dD qS )c                    s   g | ]	}t | | qS  )r   ).0colrowr   X/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/pyiceberg/table/upsert_util.py
<listcomp>(   s    z2create_match_filter.<locals>.<listcomp>.<listcomp>)	functoolsreduceoperatorand_)r   r
   r   r   r   '   s    z'create_match_filter.<locals>.<listcomp>)selectgroup_by	aggregatelenr   	to_pylistr   r   )r	   r
   unique_keysfiltersr   r   r   create_match_filter!   s   
r    c                 C   s4   t | ||g dfgtddkdkS )zFCheck for duplicate rows in a PyArrow table based on the join columns.	count_allr   r   )r   r   r   r   filterpcfield)r	   r
   r   r   r   has_duplicate_rows3   s   4r%   source_tabletarget_tablec                 C   sp  t | j}t |}t|| }t||rtdt|dkr#| j S d}d}||v s/||v r:t| d| dd| |j	|
|ttt| }|	|
|ttt|}	|j|	t|dd	}
g }t|
|  |
|  d
dD ]2\}}| |d}||d}|D ]}||d  }||d  }||kr||  nqqy|r| |S | j S )a	  
    Return a table with rows that need to be updated in the target table based on the join columns.

    The table is joined on the identifier columns, and then checked if there are any updated rows.
    Those are selected and everything is renamed correctly.
    z0Target table has duplicate rows, aborting upsertr   __source_index__target_indexz and zH are reserved for joining DataFrames, and cannot be used as column namesNinner)keys	join_typeT)strictr   )setcolumn_nameslistr%   
ValueErrorr   schemaempty_tablecastr   append_columnpaarrayrangejoinzipr   slicecolumnas_pyappendtake)r&   r'   r
   all_columnsjoin_cols_setnon_key_colsSOURCE_INDEX_COLUMN_NAMETARGET_INDEX_COLUMN_NAMEsource_indextarget_indexmatching_indicesto_update_indices
source_idx
target_idx
source_row
target_rowkey
source_val
target_valr   r   r   get_rows_to_update8   sP   



	 




rP   )r   r   pyarrowr6   r   pyarrow_tabler   r#   pyiceberg.expressionsr   r   r   r   r   r0   strr    boolr%   rP   r   r   r   r   <module>   s   	(