from typing import TYPE_CHECKING, List, Optional

import ray.cloudpickle as cloudpickle
from ray.data._internal.util import call_with_retry
from ray.data.block import BlockMetadata
from ray.data.datasource.file_meta_provider import (
    FileMetadataProvider,
    _fetch_metadata_parallel,
)
from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
    import pyarrow

    from ray.data._internal.datasource.parquet_datasource import SerializedFragment


# Number of fragments whose metadata is fetched per remote task.
FRAGMENTS_PER_META_FETCH = 6
# Fragment count above which metadata fetching is parallelized with Ray tasks.
PARALLELIZE_META_FETCH_THRESHOLD = 24
# Application-level error messages that metadata fetching tasks retry on.
RETRY_EXCEPTIONS_FOR_META_FETCH_TASK = ["AWS Error ACCESS_DENIED", "Timeout"]
# Maximum number of attempts for a metadata fetching task.
RETRY_MAX_ATTEMPTS_FOR_META_FETCH_TASK = 32
# Maximum retry back-off interval, in seconds, for a failed metadata fetching task.
RETRY_MAX_BACKOFF_S_FOR_META_FETCH_TASK = 64


class _ParquetFileFragmentMetaData:
    """Class to store metadata of a Parquet file fragment. This includes
    all attributes from `pyarrow.parquet.FileMetaData` except for `schema`,
    which is stored in `self.schema_pickled` as a pickled object from
    `cloudpickle.loads()`, used in deduplicating schemas across multiple fragments."""

    def __init__(self, fragment_metadata: "pyarrow.parquet.FileMetaData"):
        self.created_by = fragment_metadata.created_by
        self.format_version = fragment_metadata.format_version
        self.num_columns = fragment_metadata.num_columns
        self.num_row_groups = fragment_metadata.num_row_groups
        self.num_rows = fragment_metadata.num_rows
        self.serialized_size = fragment_metadata.serialized_size
        # The pickled schema is set separately via `set_schema_pickled()`, so that
        # identical schemas can be shared across fragments.
        self.schema_pickled = None

        # Aggregate the total byte size of the fragment from its row groups.
        self.total_byte_size = 0
        for row_group_idx in range(fragment_metadata.num_row_groups):
            row_group_metadata = fragment_metadata.row_group(row_group_idx)
            self.total_byte_size += row_group_metadata.total_byte_size

    def set_schema_pickled(self, schema_pickled: bytes):
        """Note: to get the underlying schema, use
        `cloudpickle.loads(self.schema_pickled)`."""
        self.schema_pickled = schema_pickled
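

# Example (illustrative): the stripped metadata keeps the fragment's schema only as
# pickled bytes, which round-trip through `ray.cloudpickle`:
#
#     meta = _ParquetFileFragmentMetaData(file_metadata)
#     meta.set_schema_pickled(cloudpickle.dumps(file_metadata.schema.to_arrow_schema()))
#     schema = cloudpickle.loads(meta.schema_pickled)  # pyarrow.Schema
#
# Here `file_metadata` stands for a `pyarrow.parquet.FileMetaData` instance obtained
# elsewhere, e.g. from the `metadata` property of a `pyarrow.dataset.ParquetFileFragment`.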


@DeveloperAPI
class ParquetMetadataProvider(FileMetadataProvider):
    """Provides block metadata for Arrow Parquet file fragments."""

    def _get_block_metadata(
        self,
        paths: List[str],
        *,
        num_fragments: int,
        prefetched_metadata: Optional[List[_ParquetFileFragmentMetaData]],
    ) -> BlockMetadata:
        """Resolves and returns block metadata for files of a single dataset block.

        Args:
            paths: The file paths for a single dataset block.
            num_fragments: The number of Parquet file fragments derived from the input
                file paths.
            prefetched_metadata: Metadata previously returned from
                `prefetch_file_metadata()` for each file fragment, where
                `prefetched_metadata[i]` contains the metadata for `fragments[i]`.

        Returns:
            BlockMetadata aggregated across the given file paths.
        """
        if (
            prefetched_metadata is not None
            and len(prefetched_metadata) == num_fragments
            and all(m is not None for m in prefetched_metadata)
        ):
            # Fragment metadata is available: aggregate it across all fragments.
            block_metadata = BlockMetadata(
                num_rows=sum(m.num_rows for m in prefetched_metadata),
                size_bytes=sum(m.total_byte_size for m in prefetched_metadata),
                input_files=paths,
                exec_stats=None,
            )
        else:
            # Fragment metadata is missing or incomplete: leave the row count and
            # byte size unknown.
            block_metadata = BlockMetadata(
                num_rows=None,
                size_bytes=None,
                input_files=paths,
                exec_stats=None,
            )
        return block_metadata
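
    # Example (illustrative): two prefetched fragments with 100 and 50 rows and
    # 1_000_000 and 500_000 bytes of row-group data aggregate to a block with
    # `num_rows=150` and `size_bytes=1_500_000`; if any fragment's metadata is
    # missing, both fields fall back to `None`.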

    def prefetch_file_metadata(
        self,
        fragments: List["pyarrow.dataset.ParquetFileFragment"],
        **ray_remote_args,
    ) -> Optional[List[_ParquetFileFragmentMetaData]]:
        """Pre-fetches file metadata for all Parquet file fragments in a single batch.

        Subsets of the metadata returned will be provided as input to subsequent calls
        to ``_get_block_metadata`` together with their corresponding Parquet file
        fragments.

        Args:
            fragments: The Parquet file fragments to fetch metadata for.

        Returns:
            Metadata resolved for each input file fragment, or `None`. Metadata
            must be returned in the same order as all input file fragments, such
            that `metadata[i]` always contains the metadata for `fragments[i]`.
        """
        from ray.data._internal.datasource.parquet_datasource import (
            SerializedFragment,
        )

        if len(fragments) > PARALLELIZE_META_FETCH_THRESHOLD:
            # Wrap the fragments in a serialization workaround and fetch their
            # metadata in parallel with Ray tasks, FRAGMENTS_PER_META_FETCH
            # fragments per task.
            fragments = [SerializedFragment(fragment) for fragment in fragments]

            def fetch_func(fragments):
                return _fetch_metadata_serialization_wrapper(
                    fragments,
                    retry_match=RETRY_EXCEPTIONS_FOR_META_FETCH_TASK,
                    retry_max_attempts=RETRY_MAX_ATTEMPTS_FOR_META_FETCH_TASK,
                    retry_max_interval=RETRY_MAX_BACKOFF_S_FOR_META_FETCH_TASK,
                )

            raw_metadata = list(
                _fetch_metadata_parallel(
                    fragments, fetch_func, FRAGMENTS_PER_META_FETCH, **ray_remote_args
                )
            )
        else:
            raw_metadata = _fetch_metadata(fragments)

        return _dedupe_metadata(raw_metadata)


def _fetch_metadata_serialization_wrapper(
    fragments: List["SerializedFragment"],
    retry_match: Optional[List[str]],
    retry_max_attempts: int,
    retry_max_interval: int,
) -> List["pyarrow.parquet.FileMetaData"]:
    from ray.data._internal.datasource.parquet_datasource import (
        _deserialize_fragments_with_retry,
    )

    # Deserialize the wrapped fragments, then fetch their metadata with retries on
    # the configured transient errors.
    deserialized_fragments = _deserialize_fragments_with_retry(fragments)
    try:
        metadata = call_with_retry(
            lambda: _fetch_metadata(deserialized_fragments),
            description="fetch metadata",
            match=retry_match,
            max_attempts=retry_max_attempts,
            max_backoff_s=retry_max_interval,
        )
    except OSError as e:
        raise RuntimeError(
            f"""Exceeded maximum number of attempts ({retry_max_attempts}) to retry metadata fetching task. Metadata fetching tasks can fail due to transient errors like rate limiting.

To increase the maximum number of attempts, configure `RETRY_MAX_ATTEMPTS_FOR_META_FETCH_TASK`. For example:
```
ray.data._internal.datasource.parquet_datasource.RETRY_MAX_ATTEMPTS_FOR_META_FETCH_TASK = 64
```
To increase the maximum retry backoff interval, configure `RETRY_MAX_BACKOFF_S_FOR_META_FETCH_TASK`. For example:
```
ray.data._internal.datasource.parquet_datasource.RETRY_MAX_BACKOFF_S_FOR_META_FETCH_TASK = 128
```
If the error continues to occur, you can also try decreasing the concurrency of metadata fetching tasks by setting `NUM_CPUS_FOR_META_FETCH_TASK` to a larger value. For example:
```
ray.data._internal.datasource.parquet_datasource.NUM_CPUS_FOR_META_FETCH_TASK = 4.
```
To change which exceptions to retry on, set `RETRY_EXCEPTIONS_FOR_META_FETCH_TASK` to a list of error messages. For example:
```
ray.data._internal.datasource.parquet_datasource.RETRY_EXCEPTIONS_FOR_META_FETCH_TASK = ["AWS Error ACCESS_DENIED", "Timeout"]
```"""
        ) from e
    return metadata


def _fetch_metadata(
    fragments: List["pyarrow.dataset.ParquetFileFragment"],
) -> List["pyarrow.parquet.FileMetaData"]:
    # Collect metadata for each fragment; stop early if a fragment doesn't expose it.
    fragment_metadata = []
    for f in fragments:
        try:
            fragment_metadata.append(f.metadata)
        except AttributeError:
            break
    return fragment_metadata


def _dedupe_metadata(
    raw_metadatas: List["pyarrow.parquet.FileMetaData"],
) -> List[_ParquetFileFragmentMetaData]:
    """For datasets with a large number of columns, the FileMetaData
    (in particular the schema) can be very large. We can reduce the
    memory usage by only keeping unique schema objects across all
    file fragments. This method deduplicates the schemas and returns
    a list of `_ParquetFileFragmentMetaData` objects."""
    schema_to_id = {}  # schema_ser -> schema_id
    id_to_schema = {}  # schema_id -> schema_ser
    stripped_metadatas = []
    for fragment_metadata in raw_metadatas:
        stripped_md = _ParquetFileFragmentMetaData(fragment_metadata)

        # Share one pickled schema object across all fragments with the same schema.
        schema_ser = cloudpickle.dumps(fragment_metadata.schema.to_arrow_schema())
        if schema_ser not in schema_to_id:
            schema_id = len(schema_to_id)
            schema_to_id[schema_ser] = schema_id
            id_to_schema[schema_id] = schema_ser
            stripped_md.set_schema_pickled(schema_ser)
        else:
            schema_id = schema_to_id.get(schema_ser)
            existing_schema_ser = id_to_schema[schema_id]
            stripped_md.set_schema_pickled(existing_schema_ser)
        stripped_metadatas.append(stripped_md)
    return stripped_metadatas
