import io
import logging
from dataclasses import dataclass
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Literal,
    Optional,
    Union,
)

import numpy as np

import ray
from ray.data._internal.util import (
    RetryingContextManager,
    RetryingPyFileSystem,
    _check_pyarrow_version,
    _is_local_scheme,
    infer_compression,
    iterate_with_retry,
    make_async_gen,
)
from ray.data.block import Block, BlockAccessor
from ray.data.context import DataContext
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.data.datasource.file_meta_provider import (
    BaseFileMetadataProvider,
    DefaultFileMetadataProvider,
)
from ray.data.datasource.partitioning import (
    Partitioning,
    PathPartitionFilter,
    PathPartitionParser,
)
from ray.data.datasource.path_util import (
    _has_file_extension,
    _resolve_paths_and_filesystem,
)
from ray.util.annotations import DeveloperAPI

if TYPE_CHECKING:
    import pandas as pd
    import pyarrow

logger = logging.getLogger(__name__)

# Fetch file sizes in parallel Ray tasks once the number of paths crosses this
# threshold, batching this many paths per fetch task.
FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD = 16
PATHS_PER_FILE_SIZE_FETCH_TASK = 16


@DeveloperAPI
@dataclass
class FileShuffleConfig:
    """Configuration for file shuffling.

    This configuration object controls how files are shuffled while reading file-based
    datasets.

    .. note::
        Even if you provide a seed, you might still observe a non-deterministic row
        order. This is because tasks are executed in parallel and their completion
        order might vary. If you need to preserve the order of rows, set
        `DataContext.get_current().execution_options.preserve_order`.

    Args:
        seed: An optional integer seed for the file shuffler. If provided, Ray Data
            shuffles files deterministically based on this seed.

    Example:
        >>> import ray
        >>> from ray.data import FileShuffleConfig
        >>> shuffle = FileShuffleConfig(seed=42)
        >>> ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea", shuffle=shuffle)
    Nseedc                 C   s&   | j durt| j tstddS dS )z2Ensure that the seed is either None or an integer.Nz Seed must be an integer or None.)r#   
isinstanceint
ValueErrorself r)   f/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py__post_init__Y   s   zFileShuffleConfig.__post_init__)	__name__
__module____qualname____doc__r#   r   r%   __annotations__r+   r)   r)   r)   r*   r"   >   s   
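

# Illustrative sketch (not part of the original module): demonstrates the eager
# seed validation that `FileShuffleConfig.__post_init__` performs. Wrapped in a
# helper function (hypothetical name) so importing this file has no side effects.
def _example_file_shuffle_config_validation() -> None:
    # An integer seed makes the shuffled file order deterministic for that seed.
    FileShuffleConfig(seed=42)
    # Anything other than an int (or None) is rejected immediately.
    try:
        FileShuffleConfig(seed="not-an-int")
    except ValueError:
        pass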


@DeveloperAPI
class FileBasedDatasource(Datasource):
    """File-based datasource for reading files.

    Don't use this class directly. Instead, subclass it and implement `_read_stream()`.
    """

    _WRITE_FILE_PER_ROW = False
    _FILE_EXTENSIONS: Optional[Union[str, List[str]]] = None
    # Number of threads used to read files concurrently within a single read
    # task. Subclasses may override this; zero means files are read serially.
    _NUM_THREADS_PER_TASK = 0

    def __init__(
        self,
        paths: Union[str, List[str]],
        *,
        filesystem: Optional["pyarrow.fs.FileSystem"] = None,
        schema: Optional[Union[type, "pyarrow.lib.Schema"]] = None,
        open_stream_args: Optional[Dict[str, Any]] = None,
        meta_provider: BaseFileMetadataProvider = DefaultFileMetadataProvider(),
        partition_filter: PathPartitionFilter = None,
        partitioning: Partitioning = None,
        ignore_missing_paths: bool = False,
        shuffle: Union[Literal["files"], FileShuffleConfig, None] = None,
        include_paths: bool = False,
        file_extensions: Optional[List[str]] = None,
    ):
        _check_pyarrow_version()
        super().__init__()

        self._supports_distributed_reads = not _is_local_scheme(paths)
        if not self._supports_distributed_reads and ray.util.client.ray.is_connected():
            raise ValueError(
                "Because you're using Ray Client, read tasks scheduled on the Ray "
                "cluster can't access your local files. To fix this issue, store "
                "files in cloud storage or a distributed filesystem like NFS."
            )

        self._schema = schema
        self._data_context = DataContext.get_current()
        self._open_stream_args = open_stream_args
        self._meta_provider = meta_provider
        self._partition_filter = partition_filter
        self._partitioning = partitioning
        self._ignore_missing_paths = ignore_missing_paths
        self._include_paths = include_paths

        paths, self._filesystem = _resolve_paths_and_filesystem(paths, filesystem)
        self._filesystem = RetryingPyFileSystem.wrap(
            self._filesystem, retryable_errors=self._data_context.retried_io_errors
        )
        paths, file_sizes = map(
            list,
            zip(
                *meta_provider.expand_paths(
                    paths,
                    self._filesystem,
                    partitioning,
                    ignore_missing_paths=ignore_missing_paths,
                )
            ),
        )

        if ignore_missing_paths and len(paths) == 0:
            raise ValueError(
                "None of the provided paths exist. "
                "The 'ignore_missing_paths' field is set to True."
            )

        if self._partition_filter is not None:
            # Use the partition filter to drop files that shouldn't be read.
            path_to_size = dict(zip(paths, file_sizes))
            paths = self._partition_filter(paths)
            file_sizes = [path_to_size[p] for p in paths]
            if len(paths) == 0:
                raise ValueError(
                    "No input files found to read. Please double check that "
                    "'partition_filter' field is set properly."
                )

        if file_extensions is not None:
            path_to_size = dict(zip(paths, file_sizes))
            paths = [p for p in paths if _has_file_extension(p, file_extensions)]
            file_sizes = [path_to_size[p] for p in paths]
            if len(paths) == 0:
                raise ValueError(
                    "No input files found to read with the following file extensions: "
                    f"{file_extensions}. Please double check that 'file_extensions' "
                    "field is set properly."
                )

        _validate_shuffle_arg(shuffle)
        self._file_metadata_shuffler = None
        if shuffle == "files":
            self._file_metadata_shuffler = np.random.default_rng()
        elif isinstance(shuffle, FileShuffleConfig):
            self._file_metadata_shuffler = np.random.default_rng(shuffle.seed)

        # Put the file metadata into the object store so it isn't duplicated in
        # every serialized read task.
        self._paths_ref = ray.put(paths)
        self._file_sizes_ref = ray.put(file_sizes)

    def _paths(self) -> List[str]:
        return ray.get(self._paths_ref)

    def _file_sizes(self) -> List[Optional[float]]:
        return ray.get(self._file_sizes_ref)

    def estimate_inmemory_data_size(self) -> Optional[int]:
        total_size = 0
        for sz in self._file_sizes():
            if sz is not None:
                total_size += sz
        return total_size

    def get_read_tasks(
        self, parallelism: int, per_task_row_limit: Optional[int] = None
    ) -> List[ReadTask]:
        open_stream_args = self._open_stream_args
        partitioning = self._partitioning

        paths = self._paths()
        file_sizes = self._file_sizes()
        if self._file_metadata_shuffler is not None:
            files_metadata = list(zip(paths, file_sizes))
            shuffled_files_metadata = [
                files_metadata[i]
                for i in self._file_metadata_shuffler.permutation(len(files_metadata))
            ]
            paths, file_sizes = list(map(list, zip(*shuffled_files_metadata)))

        if open_stream_args is None:
            open_stream_args = {}

        filesystem = _wrap_s3_serialization_workaround(self._filesystem)

        def read_files(read_paths: Iterable[str]) -> Iterable[Block]:
            fs = _unwrap_s3_serialization_workaround(filesystem)
            for read_path in read_paths:
                partitions: Dict[str, str] = {}
                if partitioning is not None:
                    parse = PathPartitionParser(partitioning)
                    partitions = parse(read_path)

                with RetryingContextManager(
                    self._open_input_source(fs, read_path, **open_stream_args),
                    context=self._data_context,
                ) as f:
                    for block in iterate_with_retry(
                        lambda: self._read_stream(f, read_path),
                        description="read stream iteratively",
                        match=self._data_context.retried_io_errors,
                    ):
                        if partitions:
                            block = _add_partitions(block, partitions)
                        if self._include_paths:
                            block_accessor = BlockAccessor.for_block(block)
                            block = block_accessor.fill_column("path", read_path)
                        yield block

        def create_read_task_fn(read_paths, num_threads):
            def read_task_fn():
                nonlocal num_threads, read_paths

                if (
                    not self._data_context.execution_options.preserve_order
                    and num_threads > 0
                ):
                    num_threads = min(num_threads, len(read_paths))
                    logger.debug(
                        f"Reading {len(read_paths)} files with {num_threads} threads."
                    )
                    yield from make_async_gen(
                        iter(read_paths),
                        read_files,
                        num_workers=num_threads,
                        preserve_ordering=True,
                    )
                else:
                    logger.debug(f"Reading {len(read_paths)} files.")
                    yield from read_files(read_paths)

            return read_task_fn

        read_tasks = []
        split_paths = np.array_split(paths, parallelism)
        split_file_sizes = np.array_split(file_sizes, parallelism)

        for read_paths, file_sizes in zip(split_paths, split_file_sizes):
            if len(read_paths) == 0:
                continue

            meta = self._meta_provider(
                read_paths,
                rows_per_file=self._rows_per_file(),
                file_sizes=file_sizes,
            )
            read_task_fn = create_read_task_fn(read_paths, self._NUM_THREADS_PER_TASK)
            read_task = ReadTask(
                read_task_fn, meta, per_task_row_limit=per_task_row_limit
            )
            read_tasks.append(read_task)

        return read_tasks

    def resolve_compression(
        self, path: str, open_args: Dict[str, Any]
    ) -> Optional[str]:
        """Resolves the compression format for a stream.

        Args:
            path: The file path to resolve compression for.
            open_args: kwargs passed to
                `pyarrow.fs.FileSystem.open_input_stream <https://arrow.apache.org/docs/python/generated/pyarrow.fs.FileSystem.html#pyarrow.fs.FileSystem.open_input_stream>`_
                when opening input files to read.

        Returns:
            The compression format (e.g., "gzip", "snappy", "bz2") or None if
            no compression is detected or specified.
        """
        compression = open_args.get("compression", None)
        if compression is None:
            compression = infer_compression(path)
        return compression

    def _resolve_buffer_size(self, open_args: Dict[str, Any]) -> Optional[int]:
        buffer_size = open_args.pop("buffer_size", None)
        if buffer_size is None:
            buffer_size = self._data_context.streaming_read_buffer_size
        return buffer_size

    def _file_to_snappy_stream(
        self, file: "pyarrow.NativeFile", filesystem
    ) -> "pyarrow.PythonFile":
        import pyarrow as pa
        import snappy
        from pyarrow.fs import HadoopFileSystem

        # Decompress the Snappy file into an in-memory buffer, since Arrow
        # doesn't support streaming Snappy decompression.
        stream = io.BytesIO()
        if isinstance(filesystem, HadoopFileSystem):
            snappy.hadoop_snappy.stream_decompress(src=file, dst=stream)
        else:
            snappy.stream_decompress(src=file, dst=stream)
        stream.seek(0)
        return pa.PythonFile(stream, mode="r")

    def _open_input_source(
        self,
        filesystem: "pyarrow.fs.FileSystem",
        path: str,
        **open_args,
    ) -> "pyarrow.NativeFile":
        """Opens a source path for reading and returns the associated Arrow NativeFile.

        The default implementation opens the source path as a sequential input stream,
        using self._data_context.streaming_read_buffer_size as the buffer size if none
        is given by the caller.

        Implementations that do not support streaming reads (e.g. that require random
        access) should override this method.
        """
        compression = self.resolve_compression(path, open_args)
        buffer_size = self._resolve_buffer_size(open_args)
        if compression == "snappy":
            # Open the raw stream and decompress it ourselves.
            open_args["compression"] = None
            file = filesystem.open_input_stream(
                path, buffer_size=buffer_size, **open_args
            )
            return self._file_to_snappy_stream(file, filesystem)
        open_args["compression"] = compression
        return filesystem.open_input_stream(path, buffer_size=buffer_size, **open_args)

    def _rows_per_file(self) -> Optional[int]:
        """Returns the number of rows per file, or None if unknown."""
        return None

    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        """Streaming read a single file.

        This method should be implemented by subclasses.
        """
        raise NotImplementedError(
            "Subclasses of FileBasedDatasource must implement _read_stream()."
        )

    @property
    def supports_distributed_reads(self) -> bool:
        return self._supports_distributed_reads
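

# Illustrative sketch (not part of the original module): a minimal subclass that
# reads plain-text files by implementing `_read_stream()`. The class name and the
# "text" column are hypothetical; real subclasses (CSV, JSON, images, ...) follow
# the same pattern with format-specific parsing.
class _ExampleTextDatasource(FileBasedDatasource):
    _FILE_EXTENSIONS = ["txt"]

    def _read_stream(self, f: "pyarrow.NativeFile", path: str) -> Iterator[Block]:
        import pyarrow as pa

        # Read the whole stream and emit one block with a single "text" column,
        # one row per line of the file.
        text = f.readall().decode("utf-8")
        yield pa.table({"text": text.splitlines()})


# Usage sketch (bucket path is made up):
#   ds = ray.data.read_datasource(_ExampleTextDatasource("s3://my-bucket/logs/"))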


def _add_partitions(
    data: Union["pyarrow.Table", "pd.DataFrame"], partitions: Dict[str, Any]
) -> Union["pyarrow.Table", "pd.DataFrame"]:
    import pandas as pd
    import pyarrow as pa

    assert isinstance(data, (pa.Table, pd.DataFrame))
    if isinstance(data, pa.Table):
        return _add_partitions_to_table(data, partitions)
    if isinstance(data, pd.DataFrame):
        return _add_partitions_to_dataframe(data, partitions)


def _add_partitions_to_table(
    table: "pyarrow.Table", partitions: Dict[str, Any]
) -> "pyarrow.Table":
    import pyarrow as pa
    import pyarrow.compute as pc

    column_names = set(table.column_names)
    for field, value in partitions.items():
        column = pa.array([value] * len(table))
        if field in column_names:
            # The partition column already exists in the data; make sure the
            # in-data values match the value parsed from the path.
            column_type = table.schema.field(field).type
            column = column.cast(column_type)
            values_are_equal = pc.all(pc.equal(column, table[field]))
            values_are_equal = values_are_equal.as_py()
            if not values_are_equal:
                raise ValueError(
                    f"Partition column {field} exists in table data, but partition "
                    f"value '{value}' is different from in-data values: "
                    f"{table[field].unique().to_pylist()}."
                )
            i = table.schema.get_field_index(field)
            table = table.set_column(i, field, column)
        else:
            table = table.append_column(field, column)
    return table


def _add_partitions_to_dataframe(
    df: "pd.DataFrame", partitions: Dict[str, Any]
) -> "pd.DataFrame":
    import pandas as pd

    for field, value in partitions.items():
        column = pd.Series(data=[value] * len(df), name=field)

        if field in df:
            column = column.astype(df[field].dtype)
            # Ignore missing values when checking that in-data values match the
            # partition value parsed from the path.
            mask = df[field].notna()
            if not df[field][mask].equals(column[mask]):
                raise ValueError(
                    f"Partition column {field} exists in table data, but partition "
                    f"value '{value}' is different from in-data values: "
                    f"{list(df[field].unique())}."
                )

        df[field] = column
    return df
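

# Illustrative sketch (not part of the original module): the simple case handled
# by `_add_partitions_to_table`, where a partition key parsed from the path is
# absent from the data and is appended as a constant column. Values are made up.
def _example_append_partition_column() -> "pyarrow.Table":
    import pyarrow as pa

    table = pa.table({"value": [1, 2, 3]})
    partitions = {"year": "2024"}

    for field, value in partitions.items():
        if field not in table.column_names:
            # Missing partition keys become constant columns on the block.
            table = table.append_column(field, pa.array([value] * len(table)))

    return table  # columns: ['value', 'year']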


def _wrap_s3_serialization_workaround(filesystem: "pyarrow.fs.FileSystem"):
    import pyarrow as pa
    import pyarrow.fs

    base_fs = filesystem
    if isinstance(filesystem, RetryingPyFileSystem):
        base_fs = filesystem.unwrap()
    if isinstance(base_fs, pa.fs.S3FileSystem):
        return _S3FileSystemWrapper(filesystem)
    return filesystem


def _unwrap_s3_serialization_workaround(
    filesystem: Union["pyarrow.fs.FileSystem", "_S3FileSystemWrapper"],
):
    if isinstance(filesystem, _S3FileSystemWrapper):
        filesystem = filesystem.unwrap()
    return filesystem


class _S3FileSystemWrapper:
    """pyarrow.fs.S3FileSystem wrapper that can be deserialized safely.

    Importing pyarrow.fs during reconstruction triggers the pyarrow
    S3 subsystem initialization.

    NOTE: This is only needed for pyarrow<14.0.0 and should be removed
        once the minimum supported pyarrow version exceeds that.
        See https://github.com/apache/arrow/pull/38375 for context.
    """

    def __init__(self, fs: "pyarrow.fs.S3FileSystem"):
        self._fs = fs

    def unwrap(self):
        return self._fs

    @classmethod
    def _reconstruct(cls, fs_reconstruct, fs_args):
        # Import pyarrow.fs on-demand so the S3 subsystem is initialized in the
        # process that deserializes the wrapper.
        import pyarrow.fs  # noqa: F401

        return cls(fs_reconstruct(*fs_args))

    def __reduce__(self):
        return _S3FileSystemWrapper._reconstruct, self._fs.__reduce__()
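

# Illustrative sketch (not part of the original module): round-trips a wrapped
# filesystem through pickle to show that `__reduce__` defers the pyarrow.fs
# import to deserialization time. Assumes pyarrow was built with S3 support.
def _example_s3_wrapper_roundtrip() -> None:
    import pickle

    import pyarrow.fs as pafs

    wrapped = _wrap_s3_serialization_workaround(pafs.S3FileSystem(anonymous=True))
    restored = _unwrap_s3_serialization_workaround(pickle.loads(pickle.dumps(wrapped)))
    assert isinstance(restored, pafs.S3FileSystem)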


def _resolve_kwargs(
    kwargs_fn: Callable[[], Dict[str, Any]], **kwargs
) -> Dict[str, Any]:
    if kwargs_fn:
        kwarg_overrides = kwargs_fn()
        kwargs.update(kwarg_overrides)
    return kwargs


def _validate_shuffle_arg(
    shuffle: Union[Literal["files"], FileShuffleConfig, None],
) -> None:
    if (
        shuffle is not None
        and shuffle != "files"
        and not isinstance(shuffle, FileShuffleConfig)
    ):
        raise ValueError(
            f"Invalid value for 'shuffle': {shuffle}. "
            "Valid values are None, 'files', `FileShuffleConfig`."
        )