o
    `۷iW                     @   sj  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlZd dlmZmZmZmZmZmZmZ d dlmZmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z%m&Z& d d	l'm(Z(m)Z)m*Z* d d
l+m,Z,m-Z- d dl.m/Z/ erd dl0Z1d dl2Z2e3e4Z5dZ6dZ7e/eG dd dZ8e/G dd de"Z9ded dee:ef ded fddZ;dddee:ef ddfddZ<dddee:ef ddfddZ=d6d!d"Z>ded# fd$d%Z?G d&d' d'Z@d(eg ee:ef f dee:ef fd)d*ZAd+eed, e8df ddfd-d.ZBed/ZCd0ee: d1eeC d2eed, e8df d3eDdeee: eeC f f
d4d5ZEdS )7    N)	dataclass)TYPE_CHECKINGAnyCallableDictIterableIteratorListLiteralOptionalTupleTypeVarUnion)RetryingContextManagerRetryingPyFileSystem_check_pyarrow_version_is_local_schemeinfer_compressioniterate_with_retrymake_async_gen)BlockBlockAccessor)DataContext)
DatasourceReadTask)BaseFileMetadataProviderDefaultFileMetadataProvider)PartitioningPathPartitionFilterPathPartitionParser)_has_file_extension_resolve_paths_and_filesystem)DeveloperAPI   c                   @   sN   e Zd ZU dZdZee ed< dZe	ed< dd Z
dd	ed
ee fddZdS )FileShuffleConfiga\  Configuration for file shuffling.

    This configuration object controls how files are shuffled while reading file-based
    datasets. The random seed behavior is determined by the combination of ``seed``
    and ``reseed_after_execution``:

    - If ``seed`` is None, the random seed is always None (non-deterministic shuffling).
    - If ``seed`` is not None and ``reseed_after_execution`` is False, the random seed is
      constantly ``seed`` across executions.
    - If ``seed`` is not None and ``reseed_after_execution`` is True, the random seed is
      different for each execution.

    .. note::
        Even if you provided a seed, you might still observe a non-deterministic row
        order. This is because tasks are executed in parallel and their completion
        order might vary. If you need to preserve the order of rows, set
        ``DataContext.get_current().execution_options.preserve_order``.

    Args:
        seed: An optional integer seed for the file shuffler. If None, shuffling is
            non-deterministic. If provided, shuffling is deterministic based on this
            seed and the ``reseed_after_execution`` setting.
        reseed_after_execution: If True, the random seed considers both ``seed`` and
            ``execution_idx``, resulting in different shuffling orders across executions.
            If False, the random seed is constantly ``seed``, resulting in the same
            shuffling order across executions. Only takes effect when ``seed`` is not None.
            Defaults to True.

    Example:
        >>> import ray
        >>> from ray.data import FileShuffleConfig
        >>> # Fixed seed - same shuffle across executions
        >>> shuffle = FileShuffleConfig(seed=42, reseed_after_execution=False)
        >>> ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea", shuffle=shuffle)
        >>>
        >>> # Seed with reseed_after_execution - different shuffle per execution
        >>> shuffle = FileShuffleConfig(seed=42, reseed_after_execution=True)
        >>> ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea", shuffle=shuffle)
    NseedTreseed_after_executionc                 C   s&   | j durt| j tstddS dS )z2Ensure that the seed is either None or an integer.Nz Seed must be an integer or None.)r%   
isinstanceint
ValueErrorself r,   _/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py__post_init__n   s   zFileShuffleConfig.__post_init__r   execution_idxreturnc                 C   s,   | j d u rd S | jrt| j |fd S | j S )Nl        )r%   r&   hash)r+   r/   r,   r,   r-   get_seeds   s
   
zFileShuffleConfig.get_seed)r   )__name__
__module____qualname____doc__r%   r   r(   __annotations__r&   boolr.   r2   r,   r,   r,   r-   r$   @   s   
 (r$   c                       s  e Zd ZU dZdZdZeeee	e f  e
d< dZddde ddddddd
deee	e f ded	 d
eeedf  deeeef  dededededeeed ef  dedee	e  f fddZede	e fddZde	e fddZde	e fddZdee fddZ		d;d ed!ee d"ed# de	e fd$d%Zd&ed'eeef dee fd(d)Z d'eeef dee fd*d+Z!	,	-		.		/d<d0d1Z"dd.d&edd-fd2d3Z#d4d5 Z$d6d-d&ede%e& fd7d8Z'edefd9d:Z(  Z)S )=FileBasedDatasourcezFile-based datasource for reading files.

    Don't use this class directly. Instead, subclass it and implement `_read_stream()`.
    FN_FILE_EXTENSIONSr   )

filesystemschemaopen_stream_argsmeta_providerpartition_filterpartitioningignore_missing_pathsshuffleinclude_pathsfile_extensionspathsr;   pyarrow.fs.FileSystemr<   zpyarrow.lib.Schemar=   r>   r?   r@   rA   rB   filesrC   rD   c       
      	      s  t    t  t| | _| jstjjj rt	d|| _
t | _|| _|| _|| _|| _|| _|
| _t|| _t||\}| _tj| j| jjd| _ttt|j|| j||d \}}|rlt|dkrlt	d| jd urt t||| |}fdd|D }t|dkrt	d d urt t|| fd	d|D }fd
d|D }t|dkrt	d  dt!|	 |	| _"t|| _#t|| _$d S )NzBecause you're using Ray Client, read tasks scheduled on the Ray cluster can't access your local files. To fix this issue, store files in cloud storage or a distributed filesystem like NFS.)retryable_errors)rA   r   zRNone of the provided paths exist. The 'ignore_missing_paths' field is set to True.c                       g | ]} | qS r,   r,   .0ppath_to_sizer,   r-   
<listcomp>       z0FileBasedDatasource.__init__.<locals>.<listcomp>z`No input files found to read. Please double check that 'partition_filter' field is set properly.c                    s   g | ]	}t | r|qS r,   )r    rJ   )rD   r,   r-   rO      s    c                    rI   r,   r,   rJ   rM   r,   r-   rO      rP   zANo input files found to read with the following file extensions: zC. Please double check that 'file_extensions' field is set properly.)%super__init__r   r   _supports_distributed_readsrayutilclientis_connectedr)   _schemar   get_current_data_context_open_stream_args_meta_provider_partition_filter_partitioning_ignore_missing_paths_include_pathsput_source_paths_refr!   _filesystemr   wrapretried_io_errorsmaplistzipexpand_pathslendict_validate_shuffle_arg_shuffle
_paths_ref_file_sizes_ref)r+   rE   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   
file_sizes	__class__)rD   rN   r-   rR      sr   




zFileBasedDatasource.__init__r0   c                 C      t | jS N)rT   getrb   r*   r,   r,   r-   _source_paths   s   z!FileBasedDatasource._source_pathsc                 C   rs   rt   )rT   ru   rn   r*   r,   r,   r-   _paths      zFileBasedDatasource._pathsc                 C   rs   rt   )rT   ru   ro   r*   r,   r,   r-   _file_sizes   rx   zFileBasedDatasource._file_sizesc                 C   s&   d}|   D ]
}|d ur||7 }q|S Nr   )ry   )r+   
total_sizeszr,   r,   r-   estimate_inmemory_data_size   s   z/FileBasedDatasource.estimate_inmemory_data_sizeparallelismper_task_row_limitdata_contextr   c                    s  dd l }jj } }|d ur|jnd}t||j|\}}tj	 d u r0i dt
t dt
t f fddfdd}t|t|}g }	|||}
|||}t|
|D ]'\}}t|dkrnqcj| |d}||j}t|||d	}|	| qc|	S )
Nr   
read_pathsr0   c              	   3   s    t }| D ]Yi }d urt}|}tj|fi jd2 t fdddjjdD ]}|r?t||}jrMt	
|}|d}|V  q6W d    n1 s[w   Y  qd S )N)contextc                      s     S rt   )_read_streamr,   )f	read_pathr+   r,   r-   <lambda>#  s    zHFileBasedDatasource.get_read_tasks.<locals>.read_files.<locals>.<lambda>zread stream iteratively)descriptionmatchpath)#_unwrap_s3_serialization_workaroundr   r   _open_input_sourcerZ   r   re   _add_partitionsr`   r   	for_blockfill_column)r   fs
partitionsparseblockblock_accessor)r;   r=   r@   r+   )r   r   r-   
read_files  s6   


z6FileBasedDatasource.get_read_tasks.<locals>.read_filesc                    s    fdd}|S )Nc                   3   s    j jjrd  dkr0t t tdt d  d tt ddE d H  d S tdt d E d H  d S )Nr   zReading z files with z	 threads.T)num_workerspreserve_orderingz files.)	rZ   execution_optionspreserve_orderminrj   loggerdebugr   iterr,   )num_threadsr   r   r+   r,   r-   read_task_fn/  s    
zUFileBasedDatasource.get_read_tasks.<locals>.create_read_task_fn.<locals>.read_task_fnr,   )r   r   r   )r   r+   )r   r   r-   create_read_task_fn.  s   z?FileBasedDatasource.get_read_tasks.<locals>.create_read_task_fn)rows_per_filerp   )r   )numpyr[   r^   rw   ry   _execution_idx_shuffle_file_metadatarm   !_wrap_s3_serialization_workaroundrc   r   strr   r   rj   array_splitrh   r\   _rows_per_file_NUM_THREADS_PER_TASKr   append)r+   r~   r   r   nprE   rp   r/   r   
read_taskssplit_pathssplit_file_sizesr   metar   	read_taskr,   )r;   r=   r@   r   r+   r-   get_read_tasks   sH   

z"FileBasedDatasource.get_read_tasksr   	open_argsc                 C   s    | dd}|du rt|}|S )a  Resolves the compression format for a stream.

        Args:
            path: The file path to resolve compression for.
            open_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.open_input_stream>`_
                when opening input files to read.

        Returns:
            The compression format (e.g., "gzip", "snappy", "bz2") or None if
            no compression is detected or specified.
        compressionN)ru   r   )r+   r   r   r   r,   r,   r-   resolve_compressione  s   z'FileBasedDatasource.resolve_compressionc                 C   s    | dd }|d u r| jj}|S )Nbuffer_size)poprZ   streaming_read_buffer_size)r+   r   r   r,   r,   r-   _resolve_buffer_sizey  s   z(FileBasedDatasource._resolve_buffer_sizefilepyarrow.NativeFiler   pyarrow.PythonFilec                 C   sj   dd l }dd l}ddlm} t }t| |r"|jj	||d n|j	||d |
d |j|ddS )Nr   )HadoopFileSystem)srcdstr)mode)pyarrowsnappy
pyarrow.fsr   ioBytesIOr'   unwraphadoop_snappystream_decompressseek
PythonFile)r+   r   r;   par   r   streamr,   r,   r-   _file_to_snappy_stream  s   
z*FileBasedDatasource._file_to_snappy_streamc                 K   sf   |  ||}| |}|dkr$d|d< |j|fd|i|}| ||S ||d< |j|fd|i|S )a  Opens a source path for reading and returns the associated Arrow NativeFile.

        The default implementation opens the source path as a sequential input stream,
        using self._data_context.streaming_read_buffer_size as the buffer size if none
        is given by the caller.

        Implementations that do not support streaming reads (e.g. that require random
        access) should override this method.
        r   Nr   r   )r   r   open_input_streamr   )r+   r;   r   r   r   r   r   r,   r,   r-   r     s   
z&FileBasedDatasource._open_input_sourcec                 C   s   dS )z8Returns the number of rows per file, or None if unknown.Nr,   r*   r,   r,   r-   r     s   z"FileBasedDatasource._rows_per_filer   c                 C   s   t d)z`Streaming read a single file.

        This method should be implemented by subclasses.
        z@Subclasses of FileBasedDatasource must implement _read_stream().)NotImplementedError)r+   r   r   r,   r,   r-   r     s   z FileBasedDatasource._read_streamc                 C      | j S rt   )rS   r*   r,   r,   r-   supports_distributed_reads  s   z.FileBasedDatasource.supports_distributed_reads)NN)r   r   r;   r   r0   r   )*r3   r4   r5   r6   _WRITE_FILE_PER_ROWr:   r   r   r   r	   r7   r   r   typer   r   r   r   r   r8   r
   r$   rR   propertyrv   rw   floatry   r(   r}   r   r   r   r   r   r   r   r   r   r   r   __classcell__r,   r,   rq   r-   r9   }   s   
 	

[

l



 	r9   data)pyarrow.Tablepd.DataFramer   r0   c                 C   sV   dd l }dd l}t| |j|jfsJ t| |jrt| |S t| |jr)t| |S d S rz   )pandasr   r'   Table	DataFrame_add_partitions_to_table_add_partitions_to_dataframe)r   r   pdr   r,   r,   r-   r     s   

r   tabler   c              	   C   s   dd l }dd lm} t| j}| D ]W\}}||gt|  }||v rd| j	|j
}||}|||| | }	|	 }	|	sVtd| d| d| |    d| j|}
| |
||} q| ||} q| S )Nr   Partition column , exists in table data, but partition value '$' is different from in-data values: .)r   pyarrow.computecomputesetcolumn_namesitemsarrayrj   r<   fieldr   castallequalas_pyr)   unique	to_pylistget_field_index
set_columnappend_column)r   r   r   pcr   r   valuecolumncolumn_typevalues_are_equalir,   r,   r-   r     s,   

r   dfr   c              
   C   s   dd l }| D ]E\}}|j|gt|  |d}|| v rI|| | j}| |  }| | | || sItd| d| dt	| | 
  d|| |< q| S )Nr   )r   namer   r   r   r   )r   r   Seriesrj   astypedtypenotnaequalsr)   rg   r   )r   r   r   r   r   r   maskr,   r,   r-   r     s    
r   r;   rF   c                 C   s@   dd l }dd l}| }t| tr|  }t||jjrt| S | S rz   )r   r   r'   r   r   r   S3FileSystem_S3FileSystemWrapper)r;   r   r   base_fsr,   r,   r-   r     s   
r   )rF   r  c                 C   s   t | tr	|  } | S rt   )r'   r  r   )r;   r,   r,   r-   r     s   
r   c                   @   s6   e Zd ZdZdddZdd Zedd	 Zd
d ZdS )r  ax  pyarrow.fs.S3FileSystem wrapper that can be deserialized safely.

    Importing pyarrow.fs during reconstruction triggers the pyarrow
    S3 subsystem initialization.

    NOTE: This is only needed for pyarrow<14.0.0 and should be removed
        once the minimum supported pyarrow version exceeds that.
        See https://github.com/apache/arrow/pull/38375 for context.
    r   rF   c                 C   s
   || _ d S rt   _fs)r+   r   r,   r,   r-   rR   *  s   
z_S3FileSystemWrapper.__init__c                 C   r   rt   r  r*   r,   r,   r-   r   -  s   z_S3FileSystemWrapper.unwrapc                 C   s   dd l }| || S rz   )r   )clsfs_reconstructfs_argsr   r,   r,   r-   _reconstruct0  s   z!_S3FileSystemWrapper._reconstructc                 C   s   t j| j fS rt   )r  r
  r  
__reduce__r*   r,   r,   r-   r  8  s   z_S3FileSystemWrapper.__reduce__N)r   rF   )	r3   r4   r5   r6   rR   r   classmethodr
  r  r,   r,   r,   r-   r    s    


r  	kwargs_fnc                 K   s   | r
|  }| | |S rt   )update)r  kwargskwarg_overridesr,   r,   r-   _resolve_kwargs<  s   
r  rB   rG   c                 C   s6   | d u s| dkst | tstd|  dd S d S d S )NrG   zInvalid value for 'shuffle': z6. Valid values are None, 'files', `FileShuffleConfig`.)r'   r$   r)   )rB   r,   r,   r-   rl   E  s
   
rl   FileMetadatarE   file_metadatashufflerr/   c                 C   s   |du r| |fS t | t |ksJ dt |  dt | dt | dkr)| |fS |dkr0d}nt|ts7J ||}tj|}tt| |}|	| tt
tt| S )z?Shuffle file paths and sizes together using the given shuffler.Nz2Number of paths and file metadata must match. Got z paths and z file metadata.r   rG   )rj   r'   r$   r2   r   randomdefault_rngrg   rh   rB   rf   )rE   r  r  r/   r%   file_metadata_shufflerfiles_metadatar,   r,   r-   r   T  s&   

r   )r;   rF   )Fr   loggingdataclassesr   typingr   r   r   r   r   r   r	   r
   r   r   r   r   r   r   rT   ray.data._internal.utilr   r   r   r   r   r   r   ray.data.blockr   r   ray.data.contextr   ray.data.datasource.datasourcer   r   &ray.data.datasource.file_meta_providerr   r    ray.data.datasource.partitioningr   r   r   ray.data.datasource.path_utilr    r!   ray.util.annotationsr"   r   r   r   	getLoggerr3   r   )FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLDPATHS_PER_FILE_SIZE_FETCH_TASKr$   r9   r   r   r   r   r   r   r  r  rl   r  r(   r   r,   r,   r,   r-   <module>   s    8$	
;  G



 





	
