o
    `۷iHC                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d d	lmZ er^d dlZe e!Z"eG d
d dZ#eG dd de#Z$eG dd de$Z%de&dee'e	e' f de'fddZ(de	e' ddde
e de
e	e'  de	ee'e)f  f
ddZ*de	e' ddde
e de
e	e'  deee'e)f  f
ddZ+	d8de	e' ddde
e de,deee'e)f  f
d d!Z-	d8de	e' ddde,deee'e)f  fd"d#Z.	d8de	e' d$e'dd%de,deee'e)f  f
d&d'Z/	d8de	e' ddde,deee'e)f  fd(d)Z0ed*Z1ed+Z2d,e	e1 d-ee	e1 ge	e2 f d.e)dee2 fd/d0Z3	d8d1e'ddd2e,de	ee'e)f  fd3d4Z4		d9d1e'ddd5e
e	e'  d2e,de	ee'e)f  f
d6d7Z5dS ):    N)TYPE_CHECKINGCallableIteratorListOptionalTupleTypeVarUnion)ProgressBar)cached_remote_fn)RetryingPyFileSystem)BlockMetadata)PartitioningPathPartitionFilter)_has_file_extension)DeveloperAPIc                   @   s<   e Zd ZdZdee defddZdee defddZdS )	FileMetadataProviderzAbstract callable that provides metadata for the files of a single dataset block.

    Current subclasses:
        - :class:`BaseFileMetadataProvider`
    pathsreturnc                 K      t )az  Resolves and returns block metadata for files in the given paths.

        All file paths provided should belong to a single dataset block.

        Args:
            paths: The file paths for a single dataset block.
            **kwargs: Additional kwargs used to determine block metadata.

        Returns:
            BlockMetadata aggregated across the given paths.
        NotImplementedErrorselfr   kwargs r   \/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/datasource/file_meta_provider.py_get_block_metadata*   s   z(FileMetadataProvider._get_block_metadatac                 K   s   | j |fi |S N)r   r   r   r   r   __call__<   s   zFileMetadataProvider.__call__N)	__name__
__module____qualname____doc__r   strr   r   r   r   r   r   r   r   "   s    
r   c                   @   sv   e Zd ZdZdee dee deee  defddZ				ddee d
ed dee
 dedeeeef  f
ddZdS )BaseFileMetadataProvideram  Abstract callable that provides metadata for
    :class:`~ray.data.datasource.file_based_datasource.FileBasedDatasource`
    implementations that reuse the base :meth:`~ray.data.Datasource.prepare_read`
    method.

    Also supports file and file size discovery in input directory paths.

    Current subclasses:
        - :class:`DefaultFileMetadataProvider`
    r   rows_per_file
file_sizesr   c                C   r   )a  Resolves and returns block metadata for files of a single dataset block.

        Args:
            paths: The file paths for a single dataset block. These
                paths will always be a subset of those previously returned from
                :meth:`.expand_paths`.
            rows_per_file: The fixed number of rows per input file, or None.
            file_sizes: Optional file size per input file previously returned
                from :meth:`.expand_paths`, where `file_sizes[i]` holds the size of
                the file at `paths[i]`.

        Returns:
            BlockMetadata aggregated across the given file paths.
        r   )r   r   r&   r'   r   r   r   r   Q   s   z,BaseFileMetadataProvider._get_block_metadataNF
filesystemr   partitioningignore_missing_pathsc                 C   r   )a  Expands all paths into concrete file paths by walking directories.

        Also returns a sidecar of file sizes.

        The input paths must be normalized for compatibility with the input
        filesystem prior to invocation.

        Args:
            paths: A list of file and/or directory paths compatible with the
                given filesystem.
            filesystem: The filesystem implementation that should be used for
                expanding all paths and reading their files.
            ignore_missing_paths: If True, ignores any file paths in ``paths`` that
                are not found. Defaults to False.

        Returns:
            An iterator of `(file_path, file_size)` pairs. None may be returned for the
            file size if it is either unknown or will be fetched later by
            `_get_block_metadata()`, but the length of
            both lists must be equal.
        r   r   r   r(   r)   r*   r   r   r   expand_pathsh   s   z%BaseFileMetadataProvider.expand_pathsNFr    r!   r"   r#   r   r$   r   intr   r   r   boolr   r   r,   r   r   r   r   r%   D   s0    

r%   c                   @   sr   e Zd ZdZdee dee deee  defddZ				ddee d
ddee
 dedeeeef  f
ddZdS )DefaultFileMetadataProvidera,  Default metadata provider for
    :class:`~ray.data.datasource.file_based_datasource.FileBasedDatasource`
    implementations that reuse the base `prepare_read` method.

    Calculates block size in bytes as the sum of its constituent file sizes,
    and assumes a fixed number of rows per file.
    r   r&   r'   r   c                C   s>   |d u rd }nt || }t|d |v rd ntt||d dS )N)num_rows
size_bytesinput_files
exec_stats)lenr   r/   sum)r   r   r&   r'   r2   r   r   r   r      s   z/DefaultFileMetadataProvider._get_block_metadataNFr(   r   r)   r*   c                 c   s    t ||||E d H  d S r   )_expand_pathsr+   r   r   r   r,      s   z(DefaultFileMetadataProvider.expand_pathsr-   r.   r   r   r   r   r1      s0    

r1   errorr   r   c                 C   s>   d}t |t| rt|trd| d}td| d| )Nz^(?:(.*)AWS Error \[code \d+\]: No response body\.(.*))|(?:(.*)AWS Error UNKNOWN \(HTTP status 400\) during HeadObject operation: No response body\.(.*))|(?:(.*)AWS Error ACCESS_DENIED during HeadObject operation: No response body\.(.*))$"z Failing to read AWS S3 file(s): ak  . Please check that file exists and has properly configured access. You can also run AWS CLI command to get more detailed error message (e.g., aws s3 ls <file-name>). See https://awscli.amazonaws.com/v2/documentation/api/latest/reference/s3/index.html and https://docs.ray.io/en/latest/data/creating-datasets.html#reading-from-remote-storage for more information.)rematchr$   
isinstanceOSError)r9   r   aws_error_patternr   r   r   _handle_read_os_error   s   

r@   r(   r   partition_filterfile_extensionsc                C   s   t t| |||dS )N)rA   rB   )list_list_files_internal)r   r(   rA   rB   r   r   r   _list_files   s   rE   c                c   sF    t  }|| |D ]\}}|r||rt||sq
||fV  q
d S r   )r1   r,   applyr   )r   r(   rA   rB   default_meta_providerpath	file_sizer   r   r   rD      s   rD   Fr)   r*   c           	      #   s    ddl m} ddlm} ddlm}m} t||}t|tr&t|	 |}t
| |k s.|r9t| ||E dH  dS tj|  | se|durN ||jksYt fdd| D ret|  ||E dH  dS t| ||E dH  dS )z/Get the file sizes for all provided file paths.r   )LocalFileSystem))FILE_SIZE_FETCH_PARALLELIZATION_THRESHOLD)_is_http_url_unwrap_protocolNc                 3   s$    | ]}t t|j kV  qd S r   )r$   pathlibPathparent.0rH   common_pathr   r   	<genexpr>$  s   " z _expand_paths.<locals>.<genexpr>)
pyarrow.fsrJ   )ray.data.datasource.file_based_datasourcerK   ray.data.datasource.path_utilrL   rM   r=   r   unwrapr6   _get_file_infos_serialosrH   
commonpathbase_dirall"_get_file_infos_common_path_prefix_get_file_infos_parallel)	r   r(   r)   r*   rJ   rK   rL   rM   is_localr   rS   r   r8      s*   


r8   c                 c   s"    | D ]}t |||E d H  qd S r   _get_file_infos)r   r(   r*   rH   r   r   r   rZ   /  s   rZ   rT   zpyarrow.fs.FileSystemc                 c   s    dd | D }t |||D ]\}}||v r|||< qd}| D ]}|| d u r4td| d d} nq|rBt| ||E d H  d S | D ]	}||| fV  qDd S )Nc                 S   s   i | ]}|d qS r   r   rQ   r   r   r   
<dictcomp>>  s    z6_get_file_infos_common_path_prefix.<locals>.<dictcomp>FzFinding path zX not have file size metadata. Fall back to get files metadata in parallel for all paths.T)rc   loggerdebugr`   )r   rT   r(   r*   path_to_sizerH   rI   have_missing_pathr   r   r   r_   8  s,   
r_   c                 #   st    ddl m}m m} tdt|  d |dtt dtt	tt
f  f fdd}t| ||E d H  d S )	Nr   )PATHS_PER_FILE_SIZE_FETCH_TASK#_unwrap_s3_serialization_workaround!_wrap_s3_serialization_workaroundz
Expanding z path(s). This may be a HIGH LATENCY operation on some cloud storage services. Moving all the paths to a common parent directory will lead to faster metadata fetching.r   r   c                    s(    t tj fdd| D S )Nc                 3   s    | ]	}t | V  qd S r   rb   rQ   )fsr*   r   r   rU   w  s    
zH_get_file_infos_parallel.<locals>._file_infos_fetcher.<locals>.<genexpr>)rC   	itertoolschainfrom_iterable)r   rj   r(   r*   )rl   r   _file_infos_fetchert  s   z5_get_file_infos_parallel.<locals>._file_infos_fetcher)rW   ri   rj   rk   re   warningr6   r   r$   r   r/   _fetch_metadata_parallel)r   r(   r*   ri   rk   rq   r   rp   r   r`   ^  s   	*r`   UriMetauris
fetch_funcdesired_uris_per_taskc           
      k   s    t |}|r|jdi |}tt| | d}td|dd}g }t| |D ]}t|dkr0q'||| q'|	|}	t
j|	E dH  dS )z0Fetch file metadata in parallel using Ray tasks.   zMetadata Fetch Progresstask)totalunitr   Nr   )r   optionsmaxr6   r
   nparray_splitappendremotefetch_until_completerm   rn   ro   )
rv   rw   rx   ray_remote_argsremote_fetch_funcparallelismmetadata_fetch_barfetch_tasks	uri_chunkresultsr   r   r   rs     s   
rs   rH   ignore_missing_pathc           	   
   C   s   ddl m} g }z|| }W n ty& } zt||  W Y d}~nd}~ww |j|jkr@t| |D ]\}}|||f q2|S |j|j	krP|| |j
f |S |j|jkr[|r[	 |S t| )z>Get the file info for all files at or under the provided path.r   )FileTypeN)rV   r   get_file_infor>   r@   type	Directory_expand_directoryr   FilesizeNotFoundFileNotFoundError)	rH   r(   r   r   
file_infos	file_infoe	file_pathrI   r   r   r   rc     s&   rc   exclude_prefixesc                    s   |du rddg}ddl m} || d|d}||}|j}g }|D ]+}	|	js'q!|	j}
|
|s0q!|
t|d  t fdd	|D rDq!|	|
|	j
f q!t|S )
a  
    Expand the provided directory path to a list of file paths.

    Args:
        path: The directory path to expand.
        filesystem: The filesystem implementation that should be used for
            reading these files.
        exclude_prefixes: The file relative path prefixes that should be
            excluded from the returned file set. Default excluded prefixes are
            "." and "_".

    Returns:
        An iterator of (file_path, file_size) tuples.
    N._r   )FileSelectorT)	recursiveallow_not_foundc                 3   s    | ]}  |V  qd S r   )
startswith)rR   prefixrelativer   r   rU     s    z$_expand_directory.<locals>.<genexpr>)rV   r   r   r]   is_filerH   r   r6   anyr   r   sorted)rH   r(   r   r   r   selectorfiles	base_pathoutfile_r   r   r   r   r     s$   

r   )Fr-   )6rm   loggingr[   rN   r;   typingr   r   r   r   r   r   r   r	   numpyr   (ray.data._internal.progress.progress_barr
   ray.data._internal.remote_fnr   ray.data._internal.utilr   ray.data.blockr    ray.data.datasource.partitioningr   r   rX   r   ray.util.annotationsr   pyarrow	getLoggerr    re   r   r%   r1   r>   r$   r@   r/   rE   rD   r0   r8   rZ   r_   r`   rt   ru   rs   rc   r   r   r   r   r   <module>   s    (
!B"%#




:

)
#


