o
    bi1                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZmZmZmZ d dlmZ d dlmZ d dlmZ erDd dlZdefdd	Zd
eee	f fddZ	d.dee
ee
 f d
eee	f de
fddZdedeeef fddZ				d/de	deeeeef  deeeeef  dedef
ddZeddfdeeee	f  dedeeeef  defdd Z d0d
eee	f d"eeeef  fd#d$Z!d%d&iZ"d0d
eee	f d"eeeef  fd'd(Z#d)efd*d+Z$G d,d- d-eZ%dS )1    N)partial)TYPE_CHECKINGAnyCallableDictListOptionalUnion)iterate_with_retry)BlockAccessor)FileBasedDatasourcepathc                 C   s(   t d| }|s
dS |d|dfS )zSplit off all file extensions.

    Returns base, allext.

    Args:
        path: path with extensions

    Returns:
        str: path with all extensions removed
    z^((?:.*/|)[^.]+)[.]([^/]*)$)NN      )rematchgroup)r   r    r   g/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/webdataset_datasource.py_base_plus_ext   s   r   samplec                 C   s4   | duot | tott|  dko| dd S )zUCheck whether a sample is valid.

    Args:
        sample: sample to be checked
    Nr   __bad__F)
isinstancedictlenlistkeysget)r   r   r   r   _valid_sample%   s   r   fdefaultc                 C   sN   | du r|S t | ts| g} | D ]}|dur t|s t||d}||}q|S )a  Apply a list of functions to a sample.

    Args:
        f: function or list of functions
        sample: sample to be modified
        default: default function to be applied to all keys.
            Defaults to None.

    Returns:
        modified sample
    Nformat)r   r   callabler   )r   r   r    gr   r   r   _apply_list3   s   

r%   suffixsuffixesc                 C   sl   |du rdS t |r|| S |D ]#}d|v sd|v r&td|  |r% dS q| |ks0d|  |kr3 dS qdS )aC  Check whether a suffix is valid.

    Suffixes can be either None (=accept everything), a callable,
    or a list of patterns. If the pattern contains */? it is treated
    as a glob pattern, otherwise it is treated as a literal.

    Args:
        suffix: suffix to be checked
        suffixes: list of valid suffixes
    NT*?.F)r#   fnmatch)r&   r'   patternr   r   r   _check_suffixL   s   r-   Ffileobj
fileselect
filerenameverbose_openmetac           
      c   s    |pi }t j| dd}|rtd|  |D ]0}|j}| r$|du r%q|| }t||}t|t	s8J t
||s>qt||d}	|	V  q|rStd|  dS dS )a  Iterate over tar file, yielding filename, content pairs for the given tar stream.

    Args:
        fileobj: file object
        fileselect: patterns or function selecting
            files to be selected
        meta: metadata to be added to each sample
    zr|*)r.   modezstart N)fnamedatazdone )tarfileopenprintnameisregextractfilereadr%   r   strr-   r   )
r.   r/   r0   r1   r2   streamtarinfor4   r5   resultr   r   r   _tar_file_iteratord   s&   

rA   r5   r   c           
   	   c   s   |pi }d}| D ]h}t |tsJ |d |d }}||\}}	|du r&q	|du s0||d krKt|r<|| |V  t|d}d|v rK|d |d< |	|v rdt| d|	 d|  d	|d   |du smt|	|rq|||	< q	t|r|| |V  dS dS )
a7  Return function over iterator that groups key, value pairs into samples.

    Args:
        data: iterator over key, value pairs
        keys: function that returns key, suffix for a given key
        suffixes: list of suffixes to be included in the sample
        meta: metadata to be added to each sample
    Nr4   r5   __key__)rB   __url__z": duplicate file name in tar file  z	, tar is )r   r   r   update
ValueErrorr   r-   )
r5   r   r'   r2   current_sample
filesampler4   valueprefixr&   r   r   r   _group_by_keys   s:   



rK   Tr"   c           
      C   sh  t | } |  D ]\}}|dd }|drq|dv r%|d| |< q|dv r3t|d| |< q|dv r^dd	l}dd	l}|d
krO|j	t
|| |< q||j	t
|| |< q|dkrjt|| |< q|dkr}dd	l}|t
|| |< q|dkrdd	l}|j|dd| |< q|dv rdd	l}|t
|| |< q|dv rdd	l}	|	|| |< q| S )aT  A default decoder for webdataset.

    This handles common file extensions: .txt, .cls, .cls2,
        .jpg, .png, .json, .npy, .mp, .pt, .pth, .pickle, .pkl.
    These are the most common extensions used in webdataset.
    For other extensions, users can provide their own decoder.

    Args:
        sample: sample, modified in place
    r*   __)txttextutf-8clscls2)jpgpngppmpgmpbmpnmr   NPILjsonnpympF)rawptpthpicklepkl)r   itemssplit
startswithdecodeintnumpy	PIL.ImageImager7   ioBytesIOasarrayr[   loadsloadmsgpackunpackbtorchrc   )
r   r"   keyrI   	extensionnprZ   rr   rt   rc   r   r   r   _default_decoder   s>   
rx   rT   jpegc                 C   s  t | } |  D ]\}}|dd }|drq|dv r%|d| |< q|dv r3t|d| |< q|dv rldd	l}dd	l}t||j	rK|j
|}t||j
j
sTJ t }|j|t| |d
 | | |< q|dkr{t|d| |< q|dkrdd	l}t }||| | | |< q|dkrdd	l}||| |< q|dv rdd	l}	t }|	|| | | |< q|dv rdd	l}
t }|
|| | | |< q| S )aQ  A default encoder for webdataset.

    This handles common file extensions: .txt, .cls, .cls2, .jpg,
        .png, .json, .npy, .mp, .pt, .pth, .pickle, .pkl
    These are the most common extensions used in webdataset.
    For other extensions, users can provide their own encoder.

    Args:
        sample (Dict[str, Any]): sample
    r*   rL   rM   )rN   rP   rQ   )rT   ry   rU   rV   rW   rX   rY   r   Nr!   r[   r\   r]   r_   rb   )r   re   rf   rg   encoder=   rj   rk   r   ndarrayrl   	fromarrayrm   rn   saveextension_to_formatr   lowergetvaluer[   dumpsrr   rt   rc   dump)r   r"   ru   rI   rv   rw   rZ   r>   rr   rt   rc   r   r   r   _default_encoder   sT   
r   blockc                 C   s   | j ddS )zMake a block iterable.

    This is a placeholder for dealing with more complex blocks.

    Args:
        block: Ray Dataset block

    Returns:
        Iterable[Dict[str,Any]]: Iterable of samples
    F)public_row_format)	iter_rows)r   r   r   r   _make_iterable  s   r   c                       s   e Zd ZdZdgZ						ddeeee f deee	ee
ef  deee	e
ef  d	eee	e
ef  d
eee	e
ef  de	de	f fddZdddefddZ  ZS )WebDatasetDatasourcezJA Datasource for WebDataset datasets (tar format with naming conventions).tarTNFpathsdecoderr/   r0   r'   r1   expand_jsonc           	         s<   t  j|fi | || _|| _|| _|| _|| _|| _d S N)super__init__r   r/   r0   r'   r1   r   )	selfr   r   r/   r0   r'   r1   r   file_based_datasource_kwargs	__class__r   r   r   1  s   
zWebDatasetDatasource.__init__r>   zpyarrow.NativeFiler   c                 #   s*   ddl } fdd}t|d jjd}t|t|d jd}|D ]p} jdur1t j|t	d	} j
rt|d
 trFt|d
 d}n't|d
 trUt|d
 }nt|d
 tra|d
 }ntdt|d
  d| D ]\}	}
|	|vr}g ||	< ||	 |
 qq|dd | D V  q"dS )a  Read and decode samples from a stream.

        Note that fileselect selects files during reading, while suffixes
        selects files during the grouping step.

        Args:
            stream: File descriptor to read from.
            path: Path to the data.
            decoder: decoder or list of decoders to be applied to samples
            fileselect: Predicate for skipping files in tar decoder.
                Defaults to lambda_:False.
            suffixes: List of suffixes to be extracted. Defaults to None.
            verbose_open: Print message when opening files. Defaults to False.

        Yields:
            List[Dict[str, Any]]: List of sample (list of length 1).
        r   Nc                      s   t  j j jdS )N)r/   r0   r1   )rA   r/   r0   r1   r   r   r>   r   r   get_tar_file_iteratorZ  s   z@WebDatasetDatasource._read_stream.<locals>.get_tar_file_iteratorziterate tar file)r   )rC   )r2   r'   )r    r[   rP   zUnsupported data type z for samplec                 S   s2   i | ]\}}|t |trt|d kr|n|gqS )r   )r   r   r   ).0kvr   r   r   
<dictcomp>}  s     z5WebDatasetDatasource._read_stream.<locals>.<dictcomp>)pandasr
   _data_contextretried_io_errorsrK   r   r'   r   r%   rx   r   r   bytesr[   rp   rh   r=   	TypeErrortypere   append	DataFrame)r   r>   r   pdr   filessamplesr   parsed_jsonr   r   r   r   r   _read_streamE  s@   	

z!WebDatasetDatasource._read_stream)TNNNFF)__name__
__module____qualname____doc___FILE_EXTENSIONSr	   r=   r   r   boolr#   r   r   r   __classcell__r   r   r   r   r   ,  s2    r   r   )NNFN)T)&r+   rm   r[   r   r6   	functoolsr   typingr   r   r   r   r   r   r	   ray.data._internal.utilr
   ray.data.blockr   )ray.data.datasource.file_based_datasourcer   pyarrowr=   r   r   r%   r   r#   r-   r   r   rA   rK   rx   r~   r   r   r   r   r   r   r   <module>   sn   $


$
()1(;