o
    biE.                     @   s6  d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZ d dlmZ erTd dlZdZdZejdej dej!dej"diZ#dZ$h dZ%dZ&e 'e(Z)de
e* de
e* de
e* de+e
e* e
e* e
e* f fddZ,G dd deZ-dS )    N)Path)TYPE_CHECKINGAnyCallableDictIterableListOptional)TaskContext)WRITE_UUID_KWARG_NAME)SaveMode)call_with_retry)BlockBlockAccessor)_resolve_kwargs)_FileDatasink)FilenameProvider
       overwrite_or_ignoredelete_matchingerrorparquet>   pathbuffermetadatai   row_group_sizemin_rows_per_filemax_rows_per_filereturnc                 C   s   | du r|du r|du rdS | du r0|||}}}|dur+|du r+|t kr+||}}|||fS | durA|du s<|du rA| | |fS t|t| |}|||fS )z
    Configure `min_rows_per_group`, `max_rows_per_group`, `max_rows_per_file` parameters of Pyarrow's `write_dataset` API based on Ray Data's configuration

    Returns
    -------
    (min_rows_per_group, max_rows_per_group, max_rows_per_file)
    N)NNN) ARROW_DEFAULT_MAX_ROWS_PER_GROUPmaxmin)r   r   r   min_rows_per_groupmax_rows_per_groupclamped_group_size r&   b/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/parquet_datasink.pychoose_row_group_limits*   s.   





r(   c                       s&  e Zd Zddddddddddejddedeee  deeg e	ee
f f  dee	ee
f  dee d	ee d
ed dedee	ee
f  dee dee def fddZdee deddfddZdededefddZded deddded e	ee
f ddfd!d"Zedee fd#d$Z  ZS )%ParquetDatasinkNT)partition_colsarrow_parquet_args_fnarrow_parquet_argsr   r   
filesystemtry_create_diropen_stream_argsfilename_providerdataset_uuidmoder   r*   r+   r,   r   r   r-   zpyarrow.fs.FileSystemr.   r/   r0   r1   r2   c             
      s   |d u rdd }|d u ri }|| _ || _|| _|| _|| _| jd ur1| jd ur1| j| jks1J d|	d urQtt|	 }|rFt	
d| d|	v rQ|	d | jd< t j||||	|
|t|d d S )Nc                   S   s   i S Nr&   r&   r&   r&   r'   <lambda>z   s    z*ParquetDatasink.__init__.<locals>.<lambda>zAmin_rows_per_file must be less than or equal to max_rows_per_filezopen_stream_args contains unsupported arguments: %s. These arguments are not supported by ParquetDatasink. They will be ignored.compression)r-   r.   r/   r0   r1   file_formatr2   )r+   r,   r   r   r*   UNSUPPORTED_OPEN_STREAM_ARGSintersectionsetkeysloggerwarningsuper__init__FILE_FORMAT)selfr   r*   r+   r,   r   r   r-   r.   r/   r0   r1   r2   intersecting_keys	__class__r&   r'   r>   i   sD   

zParquetDatasink.__init__blocksctxr   c                    s   dd l t  tdd  D rd S dd  D  j d jt jdtj	fi j
dd  fdd}td	 d
j d t|d dj djjttd d S )Nr   c                 s   s"    | ]}t | d kV  qdS )r   Nr   	for_blocknum_rows.0blockr&   r&   r'   	<genexpr>   s     z(ParquetDatasink.write.<locals>.<genexpr>c                 S   s"   g | ]}t | d kr|qS )r   rF   rI   r&   r&   r'   
<listcomp>   s    z)ParquetDatasink.write.<locals>.<listcomp>schemac                     sL   dd  D } d u r dd | D }n}| |jt  d S )Nc                 S   s   g | ]	}t | qS r&   )r   rG   to_arrowrI   r&   r&   r'   rM      s    zGParquetDatasink.write.<locals>.write_blocks_to_path.<locals>.<listcomp>c                 S   s   g | ]}|j qS r&   )rN   )rJ   tabler&   r&   r'   rM      s    )unify_schemas_write_parquet_fileskwargsr   )tablesoutput_schemarD   rE   filenamepar@   user_schemawrite_kwargsr&   r'   write_blocks_to_path   s   z3ParquetDatasink.write.<locals>.write_blocks_to_pathzWriting z	 file to .zwrite 'z' to '')descriptionmatchmax_attemptsmax_backoff_s)pyarrowlistallr0   get_filename_for_blockrS   r   task_idxr   r+   r,   popr;   debugr   r   _data_contextretried_io_errorsWRITE_FILE_MAX_ATTEMPTS$WRITE_FILE_RETRY_MAX_BACKOFF_SECONDS)r@   rD   rE   r[   r&   rV   r'   write   s2   
zParquetDatasink.writerW   
write_uuidc                 C   s   ||vr| j tjkrtd| d| dd|v r*t|vr&| dt }|S |}|S t|vr7| dt }|S td t|}|j}d|vsKJ d|j	}| d	| }|S )
NzWrite UUID 'z"' missing from filename template 'z'. This could result in files being overwritten.Modify your FileNameProvider implementation to include the `write_uuid` into the filename template or change your write mode to SaveMode.OVERWRITE. z{i}r\   z-{i}.a  FilenameProvider have to provide proper filename template including '{{i}}' macro to ensure unique filenames when writing multiple files. Appending '{{i}}' macro to the end of the file. For more details on the expected filename template checkout PyArrow's `write_to_dataset` APIz!Filename should not contain a dotz-{i})
r2   r   APPEND
ValueErrorr?   r;   r<   r   stemsuffix)r@   rW   rn   basename_templatefilename_pathrq   rr   r&   r&   r'   _get_basename_template   s,   z&ParquetDatasink._get_basename_templaterT   zpyarrow.TablerU   zpyarrow.SchemarZ   c                 C   s   dd l m} t|D ]\}}|r|j|s||}|||< q
|dd }	t| j	d}
t
|	| j| jd\}}}| ||}|j|| j||| j| jt|
dd|||| jdi |d d S )	Nr   r   r   )r   r   hiveT)database_dirrN   rs   r-   partitioningformatexisting_data_behaviorpartitioning_flavoruse_threadsr#   r$   r   file_optionsr&   )pyarrow.datasetdataset	enumeraterN   equalscastrg   EXISTING_DATA_BEHAVIOR_MAPgetr2   r(   r   r   ru   write_datasetr   r-   r*   r?   ParquetFileFormatmake_write_options)r@   rT   rW   rU   rn   rZ   dsidxrP   r   r{   r#   r$   r   rs   r&   r&   r'   rR      sF   


z$ParquetDatasink._write_parquet_filesc                 C   s   | j S r3   )r   )r@   r&   r&   r'   min_rows_per_write*  s   z"ParquetDatasink.min_rows_per_write)__name__
__module____qualname__r   ro   strr	   r   r   r   r   intboolr   r>   r   r   r
   rm   ru   rR   propertyr   __classcell__r&   r&   rB   r'   r)   h   sx    
	
:
1#

3r)   ).loggingpathlibr   typingr   r   r   r   r   r   r	   'ray.data._internal.execution.interfacesr
   (ray.data._internal.planner.plan_write_opr   ray.data._internal.savemoder   ray.data._internal.utilr   ray.data.blockr   r   )ray.data.datasource.file_based_datasourcer   !ray.data.datasource.file_datasinkr   %ray.data.datasource.filename_providerr   rb   rk   rl   ro   	OVERWRITEIGNOREERRORr   r?   r7   r    	getLoggerr   r;   r   tupler(   r)   r&   r&   r&   r'   <module>   sD    $

>