o
    biy                     @   s  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZ d dlmZm Z  d dl!m"Z" d d	l#m$Z$ d d
l%m&Z& d dl'm(Z( d dl)m*Z*m+Z+ d dl,m-Z- d dl.m/Z/m0Z0m1Z1m2Z2 d dl3m4Z4m5Z5 d dl6m7Z7 erd dl8Z8d dl9m:Z: e ;e<Z=dZ>dZ?dZ@dZAdZBdZCdZDdZEdZFeddG dd dZGG dd  d ZHd!e
eH d"e
d# fd$d%ZId&d' ZJG d(d) d)e$ZKd!e
eH d*eLd+e0d"e	d, fd-d.ZMd/d0 ZNd1eHd"eGfd2d3ZOd4e
eG d"ePfd5d6ZQd4e
eG d"eRfd7d8ZSd9d: ZTdd;d"e
eG fd<d=ZUd>eeVe/f d?d,d"d,fd@dAZWd+e0dBdCdDdEd"dCfdFdGZXdHe
eV fdIdJZY	"	CdRdKdLZZdMe
eV dNdOd+ee0 d"ee
eV e
eV f fdPdQZ[dS )S    N)	dataclass)
TYPE_CHECKINGAnyCallableDictIteratorListLiteralOptionalTupleUnion)ProgressBar)cached_remote_fn)RetryingPyFileSystem_check_pyarrow_version_is_local_schemecall_with_retryiterate_with_retry)BlockBlockAccessor)DataContext)
Datasource)ReadTask)FileShuffleConfig)DefaultFileMetadataProvider_handle_read_os_error)ParquetMetadataProvider)PartitionDataTypePartitioningPathPartitionFilterPathPartitionParser)_has_file_extension_resolve_paths_and_filesystem)log_once)ParquetFileFragmentg      ?i'           g{Gz?   
   i   T)frozenc                   @   s&   e Zd ZU ee ed< ee ed< dS )_SampleInfoactual_bytes_per_rowestimated_bytes_per_rowN)__name__
__module____qualname__r
   int__annotations__ r3   r3   d/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/parquet_datasource.pyr+   b   s   
 r+   c                   @   s    e Zd Zd	ddZd
ddZdS )SerializedFragmentfragr$   c                 C   s    t |j|j|j|jf| _d S N)cloudpickledumpsformatpath
filesystempartition_expression_data)selfr6   r3   r3   r4   __init__k   s   
zSerializedFragment.__init__returnc                 C   s*   dd l }t| j\}}}}||||S Nr   )
pyarrow.fsr8   loadsr>   make_fragment)r?   pyarrowfile_formatr;   r<   r=   r3   r3   r4   deserializep   s
   zSerializedFragment.deserializeN)r6   r$   )rA   r$   )r.   r/   r0   r@   rH   r3   r3   r3   r4   r5   j   s    
r5   serialized_fragmentsrA   z$pyarrow._dataset.ParquetFileFragmentc                 C   s   dd | D S )Nc                 S   s   g | ]}|  qS r3   )rH   .0pr3   r3   r4   
<listcomp>       z*_deserialize_fragments.<locals>.<listcomp>r3   rI   r3   r3   r4   _deserialize_fragments|   s   rP   c                 C   sL   ddl }t| j| jD ]\}}t||jr#t||jr#td| dqdS )aX  Check for the legacy tensor extension type and raise an error if found.

    Ray Data uses an extension type to represent tensors in Arrow tables. Previously,
    the extension type extended `PyExtensionType`. However, this base type can expose
    users to arbitrary code execution. To prevent this, we don't load the type by
    default.
    r   Nz,Ray Data couldn't infer the type of column 'aN  '. This might mean you're trying to read data written with an older version of Ray. Reading data written with older versions of Ray might expose you to arbitrary code execution. To try reading the data anyway, set `RAY_DATA_AUTOLOAD_PYEXTENSIONTYPE=1` on *all* nodes.To learn more, see https://github.com/ray-project/ray/issues/41314.)rF   zipnamestypes
isinstanceUnknownExtensionTypePyExtensionTypeRuntimeError)schemapanametyper3   r3   r4   check_for_legacy_tensor_type   s   
r\   c                   @   s  e Zd ZdZdgZdddddde deddddddeee	e f de
e	e  d	e
eeef  d
e
eeef  de
eegef  de
d de
eedf  dedede
e deed df dede
e	e  fddZde
e fddZdede	e fddZdd  Zedefd!d"ZdS )#ParquetDatasourceaj  Parquet datasource, for reading and writing Parquet files.

    The primary difference from ParquetBulkDatasource is that this uses
    PyArrow's `ParquetDataset` abstraction for dataset reads, and thus offers
    automatic Arrow dataset schema inference and row count collection at the
    cost of some potential performance and/or compatibility penalties.
    parquetNhiveF)columnsdataset_kwargsto_batch_kwargs
_block_udfr<   rX   meta_providerpartition_filterpartitioningshuffleinclude_pathsfile_extensionspathsr`   ra   rb   rc   r<   zpyarrow.fs.FileSystemrX   zpyarrow.lib.Schemard   re   rf   rg   filesrh   ri   c             
      s  t   t| | _| jstjjj rtdd | _| js.ddl	m
} |t  dd| _|| _t||\}| _tj| jt jd}|	d usL d urt }ttt||| \}}t|}|	d urh|	|} d uru fdd|D }t|t| }|rtd	t| d
 |d u ri }d|v rtdd |d< t|||}|}t||||
|}d\}}|d urt ||j!d |
\}}z%i }t"|d< | jr| j|d< nt j#|d< |j$|j!fi |pg | _%W n t&y } zt'|| W Y d }~nd }~ww |d u ri }dd |j!D | _(dd |j!D | _)|| _*|| _+|| _,|| _-|| _.|| _/|| _0d | _1|| _2|
| _3|dkr?t4j56 | _1nt7|t8rMt4j56|j9| _1t:| j(||| j/| jd}t;|| _<t=|| _> d u r| j)D ]}t?|| j@stAdrtB| j@  d S qkd S d S )NzBecause you're using Ray Client, read tasks scheduled on the Ray cluster can't access your local files. To fix this issue, store files in cloud storage or a distributed filesystem like NFS.r   )NodeAffinitySchedulingStrategyF)soft)retryable_errorsc                    s   g | ]	}t | r|qS r3   )r!   )rK   r;   ri   r3   r4   rM      s
    
z.ParquetDatasource.__init__.<locals>.<listcomp>zFiltered out z pathsrf   zuThe 'partitioning' parameter isn't supported in 'dataset_kwargs'. Use the top-level 'partitioning' parameter instead.)NNnum_cpusscheduling_strategyc                 S   s   g | ]}t |qS r3   )r5   rJ   r3   r3   r4   rM   '  rN   c                 S   s   g | ]}|j qS r3   )r;   rJ   r3   r3   r4   rM   (  s    rk   )to_batches_kwargsr`   rX   local_scheduling+read_parquet_file_extensions_future_warning)Cr   r   _supports_distributed_readsrayutilclientis_connected
ValueError_local_schedulingray.util.scheduling_strategiesrl   get_runtime_contextget_node_id_unresolved_pathsr"   _filesystemr   wrapr   get_currentretried_io_errorsr   maplistrQ   expand_pathssetloggerinfolenget_parquet_dataset_infer_schema!_infer_data_and_partition_columns	fragmentsNUM_CPUS_FOR_META_FETCH_TASKrq   prefetch_file_metadata	_metadataOSErrorr   _pq_fragments	_pq_paths_meta_providerrc   _to_batches_kwargs_data_columns_partition_columns_read_schema_inferred_schema_file_metadata_shuffler_include_paths_partitioningnprandomdefault_rngrT   r   seedsample_fragmentsestimate_files_encoding_ratio_encoding_ratio%estimate_default_read_batch_size_rows_default_read_batch_size_rowsr!   _FUTURE_FILE_EXTENSIONSr#   #emit_file_extensions_future_warning)r?   rj   r`   ra   rb   rc   r<   rX   rd   re   rf   rg   rh   ri   rl   default_meta_providerexpanded_paths_filtered_pathspq_dsread_schemainferred_schemadata_columnspartition_columnsprefetch_remote_argsesample_infosr;   r3   ro   r4   r@      s   
	
	





zParquetDatasource.__init__rA   c                 C   s$   d}| j D ]}||j7 }q|| j S rB   )r   total_byte_sizer   )r?   
total_sizefile_metadatar3   r3   r4   estimate_inmemory_data_sizeL  s   

z-ParquetDatasource.estimate_inmemory_data_sizeparallelismc                    s  | j }t|t| jk r|d gt| jt|  7 }| jd urEtt| j| j|fdd| jtD }tttt| \}}}n
| j| j|}}}g }tt	
||t	
||t	
||D ]a\}}}	t|dkroqc| j|t||	d}
| jdd urd |
_|
jd urt|
j| j |
_| j| j| j| j| j| j| j| jf\ |t|f fdd	|
| jd qc|S )	Nc                       g | ]} | qS r3   r3   )rK   i)files_metadatar3   r4   rM   _      z4ParquetDatasource.get_read_tasks.<locals>.<listcomp>r   )num_fragmentsprefetched_metadatafilterc              
      s   t  | 	S r7   )read_fragments)f)	block_udfr   default_read_batch_size_rowsrh   r   rf   r   rr   r3   r4   <lambda>  s    z2ParquetDatasource.get_read_tasks.<locals>.<lambda>rX   )r   r   r   r   r   rQ   r   permutationr   r   array_splitr   r   getnum_rows
size_bytesr1   r   rc   r   r   r   r   r   r   appendr   r   )r?   r   pq_metadatashuffled_files_metadatapq_fragmentspq_paths
read_tasksr   rj   metadatametar3   )	r   r   r   r   rh   r   rf   r   rr   r4   get_read_tasksR  st   






z ParquetDatasource.get_read_tasksc                 C   s   dS )zuReturn a human-readable name for this datasource.

        This will be used as the names of the read tasks.
        Parquetr3   r?   r3   r3   r4   get_name  s   zParquetDatasource.get_namec                 C   s   | j S r7   )ru   r   r3   r3   r4   supports_distributed_reads  s   z,ParquetDatasource.supports_distributed_reads)r.   r/   r0   __doc__r   r   r   r   strr   r
   r   r   r   r   r[   r   r	   boolr@   r1   r   r   r   r   propertyr   r3   r3   r3   r4   r]      s`    
	


 (Yr]   rh   rf   zpyarrow.Tablec	                 #   s:   ddl m}	 t|}
t|
dksJ dd l}tdt|
 d ddd| |
D ]ji }|d urAt|}|j	}d urPfdd	|
 D } fd
d}tjj }t|d|jdD ]0}|jj|gd}|rt|dj	}|rt||}|jdkr| d ur| |V  qi|V  qiq0d S )Nr   )ArrowTensorTypezReading z parquet fragmentsuse_threadsF
batch_sizec                    s   i | ]\}}| v r||qS r3   r3   )rK   
field_namevalue)r   r3   r4   
<dictcomp>  s
    z"read_fragments.<locals>.<dictcomp>c                      s   j d dS )N)r   r`   rX   r   r3   )
to_batchesr3   )r   r   fragmentrX   rr   r   r3   r4   get_batch_iterable  s   z*read_fragments.<locals>.get_batch_iterablez
load batch)matchr   r;   )$ray.data.extensions.tensor_extensionr   !_deserialize_fragments_with_retryr   rF   r   debugpopr    r;   itemsrv   datar   r   r   r   Tablefrom_batchesr   	for_blockfill_column_add_partitions_to_tabler   )r   rr   r   r   r   rX   rI   rh   rf   r   r   rY   
partitionsparser   ctxbatchtabler3   )r   r   r   r   rX   rr   r   r4   r     sJ   





r   c                    s   t  fdddtdS )Nc                      s   t  S r7   )rP   r3   r   r3   r4   r     s    z3_deserialize_fragments_with_retry.<locals>.<lambda>zdeserialize fragments)descriptionmax_attempts)r   FILE_READING_RETRYr   r3   r   r4   r     s
   
r   file_fragmentc                 C   s   t |gd }|jdgd}tt|jjtd}| dd  |jd|||d| }zt	|}W n t
y@   td d d}Y |S w |jdkrj|j}	d}
t|	jD ]
}|
|	|j7 }
qPt|j|j |
|	j d}|S td d d}|S )Nr   )row_group_idsr'   r   )r`   rX   r   )r,   r-   r3   )r   subsetmaxminr   r   (PARQUET_ENCODING_RATIO_ESTIMATE_NUM_ROWSr   r   nextStopIterationr+   rangenum_row_groups	row_groupr   nbytes)rr   r`   rX   r   r   r   batchesr   sample_datar   r   idxr3   r3   r4   _sample_fragment  sF   

r
  r   c                 C   sP   t  jstS dtdtfdd}ttt	|| }t
d| d t|tS )zReturn an estimate of the Parquet files encoding ratio.

    To avoid OOMs, it is safer to return an over-estimate than an underestimate.
    sample_inforA   c                 S   s$   | j d u s
| jd u rtS | j | j S r7   )r,   r-   +PARQUET_ENCODING_RATIO_ESTIMATE_LOWER_BOUND)r  r3   r3   r4   compute_encoding_ratioE  s
   


z=estimate_files_encoding_ratio.<locals>.compute_encoding_ratioz2Estimated Parquet encoding ratio from sampling is .)r   r   decoding_size_estimation'PARQUET_ENCODING_RATIO_ESTIMATE_DEFAULTr+   floatr   meanr   r   r   r   r   r  )r   r  ratior3   r3   r4   r   =  s   

r   c                 C   s&   dt dtfdd}ttt|| S )Nr  rA   c                 S   s.   | j stS t jd }tdtt|| j  S )Nr)   r'   )r,   PARQUET_READER_ROW_BATCH_SIZEr   r   target_max_block_sizer   r   )r  'max_parquet_reader_row_batch_size_bytesr3   r3   r4   compute_batch_size_rowsV  s   zFestimate_default_read_batch_size_rows.<locals>.compute_batch_size_rows)r+   r1   r   r  r   r   )r   r  r3   r3   r4   r   U  s   r   c              
   C   sr   dd l m} t| dkr| d } z|j| fi |d|i}W |S  ty8 } zt||  W Y d }~|S d }~ww )Nr   r'   r<   )pyarrow.parquetr^   r   ParquetDatasetr   r   )rj   r<   ra   pqdatasetr   r3   r3   r4   r   k  s"   
r   )rs   c             	      s   t  }t|t }tt|}tt|}tt|||} fddtd|d |	t
 D }	tt}
g }|p=t j}|	D ]}||
j|tgd|||| q@tdt |dd}||}|  |S )	Nc                    r   r3   r3   )rK   r	  rO   r3   r4   rM     r   z$sample_fragments.<locals>.<listcomp>r   r'   )rq   retry_exceptionszParquet Files Samplefile)unit)r   r1   .PARQUET_ENCODING_RATIO_ESTIMATE_SAMPLING_RATIOr   /PARQUET_ENCODING_RATIO_ESTIMATE_MIN_NUM_SAMPLES/PARQUET_ENCODING_RATIO_ESTIMATE_MAX_NUM_SAMPLESr   r   linspaceastypetolistr   r
  r   r   rq   r   optionsr   remoter   fetch_until_completeclose)rI   rr   r`   rX   rs   	num_filesnum_samplesmin_num_samplesmax_num_samplesfile_samplessample_fragmentfutures
schedulingsample
sample_barr   r3   rO   r4   r     s8   



r   r   r   c                 C   s<   |   D ]\}}|j|}|dkrt|||}q|S )N)r   rX   get_field_indexr   r   r   )r   r   r   r   field_indexr3   r3   r4   r     s   r   rX   pyarrow.Schemaparquet_datasetzpyarrow.dataset.Datasetc           	      C   s   ddl }t|jdkr|S | du r|S |jd j}t| }||}|D ]"}|| jv r3|| j| }n| }||jvrE|	|
||}q#|S )zReturn a new schema with partition fields added.

    This function infers the partition fields from the first file path in the dataset.
    r   N)rF   r   r   r;   r    field_typesfrom_numpy_dtypestringrR   r   field)	rf   rX   r7  rY   
first_pathr   r   r   
field_typer3   r3   r4   _add_partition_fields_to_schema  s    	

r>  future_file_extensionsc                 C   s   t d|  dt d S )NzLThe default `file_extensions` for `read_parquet` will change from `None` to z after Ray 2.43, and your dataset contains files that don't match the new `file_extensions`. To maintain backwards compatibility, set `file_extensions=None` explicitly.)warningswarnFutureWarning)r?  r3   r3   r4   r     s   r   c                    s   ddl }| |du r| j t| |  |r#| fdd|D  j |durG  }z||j j W n tyF   tjddd Y nw t	   S )zBInfer the schema of read data using the user-specified parameters.r   Nc                    s   g | ]}  |qS r3   )r;  rK   columnr   r3   r4   rM     s    z!_infer_schema.<locals>.<listcomp>zdFailed to infer schema of dataset by passing dummy table through UDF due to the following exception:T)exc_info)
rF   rX   r>  r   empty_tablewith_metadata	Exceptionr   r   r\   )r7  rX   r`   rf   rc   rY   dummy_tabler3   rE  r4   r     s4   

r   user_specified_columnsr   r$   c                    sF    fdd| D }|durt |}| jfdd| D }||fS )aV  Infer which columns are in the files and which columns are partition columns.

    This function uses the schema and path of the first file to infer what columns
    represent.

    Args:
        user_specified_columns: A list of column names that the user specified.
        fragment: The first fragment in the dataset.
        partitioning: The partitioning scheme used to partition the data.

    Returns:
        A tuple of lists of column names. The first list contains the columns that are
        in the file, and the second list contains the columns that are partition
        columns.
    c                    s   g | ]
}| j jv r|qS r3   )physical_schemarR   rC  )r   r3   r4   rM   &  s
    z5_infer_data_and_partition_columns.<locals>.<listcomp>Nc                    s   g | ]}| v r|qS r3   r3   rC  )r   r3   r4   rM   .  s    )r    r;   )rK  r   rf   r   r   r   r3   )r   r   r4   r     s   


r   )rA   r6  )\loggingr@  dataclassesr   typingr   r   r   r   r   r   r	   r
   r   r   numpyr   rv   ray.cloudpickler8   ray.data._internal.progress_barr   ray.data._internal.remote_fnr   ray.data._internal.utilr   r   r   r   r   ray.data.blockr   r   ray.data.contextr   ray.data.datasourcer   ray.data.datasource.datasourcer   )ray.data.datasource.file_based_datasourcer   &ray.data.datasource.file_meta_providerr   r   )ray.data.datasource.parquet_meta_providerr    ray.data.datasource.partitioningr   r   r   r    ray.data.datasource.path_utilr!   r"   ray.util.debugr#   rF   pyarrow.datasetr$   	getLoggerr.   r   r   r  r   r  r  r  r   r!  r   r+   r5   rP   r\   r]   r   r   r   r
  r  r   r1   r   r   r   r   r   r>  r   r   r   r3   r3   r3   r4   <module>   s    0

  	

H
/
3


#

&