o
    birJ                     @   s  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZmZmZ d dlZd dlZd dlmZmZmZmZmZmZ d dlmZmZ d dlmZ d dlmZm Z  d dl!m"Z"m#Z# d d	l$m%Z%m&Z&m'Z' d d
l(m)Z)m*Z* d dl+m,Z, er~d dl-Z.d dl/Z/e0e1Z2dZ3dZ4e,eG dd dZ5e,G dd deZ6ded dee7ef ded fddZ8dddee7ef ddfddZ9dddee7ef ddfddZ:d/d!d"Z;ded# fd$d%Z<G d&d' d'Z=d(eg ee7ef f dee7ef fd)d*Z>d+eed, e5df ddfd-d.Z?dS )0    N)	dataclass)
TYPE_CHECKINGAnyCallableDictIterableIteratorListLiteralOptionalUnion)RetryingContextManagerRetryingPyFileSystem_check_pyarrow_version_is_local_schemeiterate_with_retrymake_async_gen)BlockBlockAccessor)DataContext)
DatasourceReadTask)BaseFileMetadataProviderDefaultFileMetadataProvider)PartitioningPathPartitionFilterPathPartitionParser)_has_file_extension_resolve_paths_and_filesystem)DeveloperAPI   c                   @   s*   e Zd ZU dZdZee ed< dd ZdS )FileShuffleConfiga^  Configuration for file shuffling.

    This configuration object controls how files are shuffled while reading file-based
    datasets.

    .. note::
        Even if you provided a seed, you might still observe a non-deterministic row
        order. This is because tasks are executed in parallel and their completion
        order might vary. If you need to preserve the order of rows, set
        `DataContext.get_current().execution_options.preserve_order`.

    Args:
        seed: An optional integer seed for the file shuffler. If provided, Ray Data
            shuffles files deterministically based on this seed.

    Example:
        >>> import ray
        >>> from ray.data import FileShuffleConfig
        >>> shuffle = FileShuffleConfig(seed=42)
        >>> ds = ray.data.read_images("s3://anonymous@ray-example-data/batoidea", shuffle=shuffle)
    Nseedc                 C   s&   | j durt| j tstddS dS )z2Ensure that the seed is either None or an integer.Nz Seed must be an integer or None.)r"   
isinstanceint
ValueErrorself r(   ]/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/datasource/file_based_datasource.py__post_init__X   s   zFileShuffleConfig.__post_init__)	__name__
__module____qualname____doc__r"   r   r$   __annotations__r*   r(   r(   r(   r)   r!   =   s   
 r!   c                   @   s\  e Zd ZU dZdZdZeeee	e f  e
d< dZddde ddddddd
deee	e f ded	 d
eeedf  deeeef  dededededeeed ef  dedee	e  fddZde	e fddZde	e fddZdee fddZdede	e fdd Zdd!d"edd#fd$d%Zd&d' Zd(d#d"ede e! fd)d*Z"e#defd+d,Z$dS )-FileBasedDatasourcezFile-based datasource for reading files.

    Don't use this class directly. Instead, subclass it and implement `_read_stream()`.
    FN_FILE_EXTENSIONSr   )

filesystemschemaopen_stream_argsmeta_providerpartition_filterpartitioningignore_missing_pathsshuffleinclude_pathsfile_extensionspathsr2   pyarrow.fs.FileSystemr3   zpyarrow.lib.Schemar4   r5   r6   r7   r8   r9   filesr:   r;   c       
      	      s  t   t| | _| jstjjj rtd|| _t	
 | _|| _|| _|| _|| _|| _|
| _|| _t||\}| _tj| j| jjd| _ttt|j|| j||d \}}|rdt|dkrdtd| jd urtt||| |}fdd|D }t|dkrtd d urtt|| fd	d|D }fd
d|D }t|dkrtd  dt|	 d | _|	dkrt j!" | _nt#|	t$rt j!"|	j%| _t&|| _'t&|| _(d S )NzBecause you're using Ray Client, read tasks scheduled on the Ray cluster can't access your local files. To fix this issue, store files in cloud storage or a distributed filesystem like NFS.)retryable_errors)r8   r   zRNone of the provided paths exist. The 'ignore_missing_paths' field is set to True.c                       g | ]} | qS r(   r(   .0ppath_to_sizer(   r)   
<listcomp>       z0FileBasedDatasource.__init__.<locals>.<listcomp>z`No input files found to read. Please double check that 'partition_filter' field is set properly.c                    s   g | ]	}t | r|qS r(   )r   rA   )r;   r(   r)   rF      s    c                    r@   r(   r(   rA   rD   r(   r)   rF      rG   zANo input files found to read with the following file extensions: zC. Please double check that 'file_extensions' field is set properly.r>   ))r   r   _supports_distributed_readsrayutilclientis_connectedr%   _schemar   get_current_data_context_open_stream_args_meta_provider_partition_filter_partitioning_ignore_missing_paths_include_paths_unresolved_pathsr   _filesystemr   wrapretried_io_errorsmaplistzipexpand_pathslendict_validate_shuffle_arg_file_metadata_shufflernprandomdefault_rngr#   r!   r"   put
_paths_ref_file_sizes_ref)r'   r<   r2   r3   r4   r5   r6   r7   r8   r9   r:   r;   
file_sizesr(   )r;   rE   r)   __init__n   sx   




zFileBasedDatasource.__init__returnc                 C      t | jS N)rI   getrf   r&   r(   r(   r)   _paths      zFileBasedDatasource._pathsc                 C   rk   rl   )rI   rm   rg   r&   r(   r(   r)   _file_sizes   ro   zFileBasedDatasource._file_sizesc                 C   s&   d}|   D ]
}|d ur||7 }q|S Nr   )rp   )r'   
total_sizeszr(   r(   r)   estimate_inmemory_data_size   s   z/FileBasedDatasource.estimate_inmemory_data_sizeparallelismc                    s<  dd l }jj } }jd ur8tt||  fddjt	 D }tt
tt| \}}tjd u rCi dtt dtt ffddfdd	}t|t	|}g }|||}|||}	t||	D ]%\}
}t	|
dkrqvj|
 |d
}||
j}t||}|| qv|S )Nr   c                    r@   r(   r(   )rB   i)files_metadatar(   r)   rF      s    z6FileBasedDatasource.get_read_tasks.<locals>.<listcomp>
read_pathsrj   c              	   3   s    t }| D ]Yi }d urt}|}tj|fi jd2 t fdddjjdD ]}|r?t||}jrMt	
|}|d}|V  q6W d    n1 s[w   Y  qd S )N)contextc                      s     S rl   )_read_streamr(   )f	read_pathr'   r(   r)   <lambda>   s    zHFileBasedDatasource.get_read_tasks.<locals>.read_files.<locals>.<lambda>zread stream iteratively)descriptionmatchpath)#_unwrap_s3_serialization_workaroundr   r   _open_input_sourcerO   r   rY   _add_partitionsrU   r   	for_blockfill_column)rx   fs
partitionsparseblockblock_accessor)r2   r4   r7   r'   )r{   r|   r)   
read_files   s6   


z6FileBasedDatasource.get_read_tasks.<locals>.read_filesc                    s    fdd}|S )Nc                   3   s    j jjrd  dkr3t k rt tdt d  d tt ddE d H  d S tdt d E d H  d S )Nr   zReading z files with z	 threads.T)num_workerspreserve_orderingz files.)rO   execution_optionspreserve_orderr^   loggerdebugr   iterr(   )num_threadsr   rx   r'   r(   r)   read_task_fn  s"   
zUFileBasedDatasource.get_read_tasks.<locals>.create_read_task_fn.<locals>.read_task_fnr(   )rx   r   r   )r   r'   )r   rx   r)   create_read_task_fn
  s   z?FileBasedDatasource.get_read_tasks.<locals>.create_read_task_fn)rows_per_filerh   )numpyrP   rS   rn   rp   ra   r[   r\   permutationr^   rZ   !_wrap_s3_serialization_workaroundrW   r   strr   minarray_splitrQ   _rows_per_file_NUM_THREADS_PER_TASKr   append)r'   ru   rb   r<   rh   shuffled_files_metadatar   
read_taskssplit_pathssplit_file_sizesrx   metar   	read_taskr(   )rw   r2   r4   r7   r   r'   r)   get_read_tasks   sH   



z"FileBasedDatasource.get_read_tasksr   r   zpyarrow.NativeFilec              	   K   s2  ddl }ddlm} |dd}|du rBz	|j|j}W n$ ttfyA   ddl	}|
|j}|r=|dd dkr=d}nd}Y nw |dd}	|	du rP| jj}	|dkrYd|d< n||d< |j|fd|	i|}
|dkrddl}t }t| |r|jj|
|d n|j|
|d |d |j|d	d
}
|
S )a  Opens a source path for reading and returns the associated Arrow NativeFile.

        The default implementation opens the source path as a sequential input stream,
        using self._data_context.streaming_read_buffer_size as the buffer size if none
        is given by the caller.

        Implementations that do not support streaming reads (e.g. that require random
        access) should override this method.
        r   N)HadoopFileSystemcompression   snappybuffer_size)srcdstr)mode)pyarrow
pyarrow.fsr   rm   Codecdetectnamer%   	TypeErrorpathlibPathsuffixpoprO   streaming_read_buffer_sizeopen_input_streamr   ioBytesIOr#   unwraphadoop_snappystream_decompressseek
PythonFile)r'   r2   r   	open_argspar   r   r   r   r   filer   streamr(   r(   r)   r   @  s<   

z&FileBasedDatasource._open_input_sourcec                 C   s   dS )z8Returns the number of rows per file, or None if unknown.Nr(   r&   r(   r(   r)   r     s   z"FileBasedDatasource._rows_per_filer{   c                 C   s   t d)z`Streaming read a single file.

        This method should be implemented by subclasses.
        z@Subclasses of FileBasedDatasource must implement _read_stream().)NotImplementedError)r'   r{   r   r(   r(   r)   rz     s   z FileBasedDatasource._read_streamc                 C      | j S rl   )rH   r&   r(   r(   r)   supports_distributed_reads  s   z.FileBasedDatasource.supports_distributed_reads)%r+   r,   r-   r.   _WRITE_FILE_PER_ROWr1   r   r   r   r	   r/   r   r   typer   r   r   r   r   boolr
   r!   ri   rn   floatrp   r$   rt   r   r   r   r   r   r   rz   propertyr   r(   r(   r(   r)   r0   ^   sj   
 	


\i
?	r0   data)pyarrow.Tablepd.DataFramer   rj   c                 C   sV   dd l }dd l}t| |j|jfsJ t| |jrt| |S t| |jr)t| |S d S rq   )pandasr   r#   Table	DataFrame_add_partitions_to_table_add_partitions_to_dataframe)r   r   pdr   r(   r(   r)   r     s   

r   tabler   c              	   C   s   dd l }dd lm} t| j}| D ]W\}}||gt|  }||v rd| j	|j
}||}|||| | }	|	 }	|	sVtd| d| d| |    d| j|}
| |
||} q| ||} q| S )Nr   Partition column , exists in table data, but partition value '$' is different from in-data values: .)r   pyarrow.computecomputesetcolumn_namesitemsarrayr^   r3   fieldr   castallequalas_pyr%   unique	to_pylistget_field_index
set_columnappend_column)r   r   r   pcr   r   valuecolumncolumn_typevalues_are_equalrv   r(   r(   r)   r     s,   

r   dfr   c              
   C   s   dd l }| D ]E\}}|j|gt|  |d}|| v rI|| | j}| |  }| | | || sItd| d| dt	| | 
  d|| |< q| S )Nr   )r   r   r   r   r   r   )r   r   Seriesr^   astypedtypenotnaequalsr%   r[   r   )r   r   r   r   r   r   maskr(   r(   r)   r     s    
r   r2   r=   c                 C   s@   dd l }dd l}| }t| tr|  }t||jjrt| S | S rq   )r   r   r#   r   r   r   S3FileSystem_S3FileSystemWrapper)r2   r   r   base_fsr(   r(   r)   r     s   
r   )r=   r   c                 C   s   t | tr	|  } | S rl   )r#   r   r   )r2   r(   r(   r)   r     s   
r   c                   @   s6   e Zd ZdZdddZdd Zedd	 Zd
d ZdS )r   ax  pyarrow.fs.S3FileSystem wrapper that can be deserialized safely.

    Importing pyarrow.fs during reconstruction triggers the pyarrow
    S3 subsystem initialization.

    NOTE: This is only needed for pyarrow<14.0.0 and should be removed
        once the minimum supported pyarrow version exceeds that.
        See https://github.com/apache/arrow/pull/38375 for context.
    r   r=   c                 C   s
   || _ d S rl   _fs)r'   r   r(   r(   r)   ri     s   
z_S3FileSystemWrapper.__init__c                 C   r   rl   r   r&   r(   r(   r)   r     s   z_S3FileSystemWrapper.unwrapc                 C   s   dd l }| || S rq   )r   )clsfs_reconstructfs_argsr   r(   r(   r)   _reconstruct  s   z!_S3FileSystemWrapper._reconstructc                 C   s   t j| j fS rl   )r   r  r   
__reduce__r&   r(   r(   r)   r    s   z_S3FileSystemWrapper.__reduce__N)r   r=   )	r+   r,   r-   r.   ri   r   classmethodr  r  r(   r(   r(   r)   r     s    


r   	kwargs_fnc                 K   s   | r
|  }| | |S rl   )update)r  kwargskwarg_overridesr(   r(   r)   _resolve_kwargs
  s   
r  r9   r>   c                 C   s6   | d u s| dkst | tstd|  dd S d S d S )Nr>   zInvalid value for 'shuffle': z6. Valid values are None, 'files', `FileShuffleConfig`.)r#   r!   r%   )r9   r(   r(   r)   r`     s
   
r`   )r2   r=   )@r   loggingdataclassesr   typingr   r   r   r   r   r   r	   r
   r   r   r   rb   rI   ray.data._internal.utilr   r   r   r   r   r   ray.data.blockr   r   ray.data.contextr   ray.data.datasource.datasourcer   r   &ray.data.datasource.file_meta_providerr   r    ray.data.datasource.partitioningr   r   r   ray.data.datasource.path_utilr   r   ray.util.annotationsr   r   r   r   	getLoggerr+   r   )FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLDPATHS_PER_FILE_SIZE_FETCH_TASKr!   r0   r   r   r   r   r   r   r   r  r`   r(   r(   r(   r)   <module>   s~    0 
  4



 





	