o
    wiG                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlZddlm	Z	m
Z
mZmZmZ ddlmZmZ ddlmZ ddlmZ dd	lmZmZ G d
d dZdd ZG dd deeZG dd deeZG dd deeZdS )    N)SimpleNamespace)urlparse   )
autodecodecachefilters
shardlistsutils)pipelinefilterreraise_exception)DataPipeline)
DataLoader)group_by_keystar_file_expanderc                   @   s   e Zd ZejdfddZdd Zd/ddZdd	 Zd0ddZ	dd Z
efddZd
d
d
dedddZefddZdd Zdd ZedddZdd Zd d! Zd1d#d$Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd
S )2FluidInterfaceTc                 C   s   |  tj|||dS )a  Create batches of the given size.

        This method forwards to the filters.batched function.

        Args:
            batchsize (int): Target batch size.
            collation_fn (callable, optional): Function to collate samples into a batch.
                Defaults to filters.default_collation_fn.
            partial (bool, optional): Whether to return partial batches. Defaults to True.

        Returns:
            FluidInterface: Updated pipeline with batched filter.
        )collation_fnpartialcomposer   batched)self	batchsizer   r    r   N/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/webdataset/compat.pyr      s   zFluidInterface.batchedc                 C      |  t S )zTurn batched data back into unbatched data.

        This method forwards to the filters.unbatched function.

        Returns:
            FluidInterface: Updated pipeline with unbatched filter.
        )r   r   	unbatchedr   r   r   r   r   !      zFluidInterface.unbatchedc                 C   s   |  tj|ddS )az  Create lists of samples without collation.

        This method forwards to the filters.batched function with collation_fn set to None.

        Args:
            batchsize (int): Target list size.
            partial (bool, optional): Whether to return partial lists. Defaults to True.

        Returns:
            FluidInterface: Updated pipeline with listed filter.
        N)r   r   r   )r   r   r   r   r   r   listed+      zFluidInterface.listedc                 C   r   )zTurn listed data back into individual samples.

        This method forwards to the filters.unlisted function.

        Returns:
            FluidInterface: Updated pipeline with unlisted filter.
        )r   r   unlistedr   r   r   r   r    9   r   zFluidInterface.unlistedNc                 C      |  t|S )a7  Log keys of samples passing through the pipeline.

        This method forwards to the filters.log_keys function.

        Args:
            logfile (str, optional): Path to the log file. If None, logging is disabled.

        Returns:
            FluidInterface: Updated pipeline with log_keys filter.
        )r   r   log_keys)r   logfiler   r   r   r"   C      zFluidInterface.log_keysc                 K   s$   |dk r| S |  tj|fi |S )ab  Shuffle the data in the stream.

        This method forwards to the filters.shuffle function if size > 0.

        Args:
            size (int): Buffer size for shuffling.
            **kw: Additional keyword arguments for filters.shuffle.

        Returns:
            FluidInterface: Updated pipeline with shuffle filter, or self if size < 1.
        r   )r   r   shuffle)r   sizekwr   r   r   r%   P   s   zFluidInterface.shufflec                 C   s   |  tj||dS )ai  Apply a function to each sample in the stream.

        This method forwards to the filters.map function.

        Args:
            f (callable): Function to apply to each sample.
            handler (callable, optional): Exception handler. Defaults to reraise_exception.

        Returns:
            FluidInterface: Updated pipeline with map filter.
        handler)r   r   map)r   fr)   r   r   r   r*   a   r   zFluidInterface.mapF)prepostonlyr   r)   c          	      G   s0   dd |D }t j|||||d}| j||dS )a  Decode data based on the decoding functions given as arguments.

        This method creates a decoder using autodecode.Decoder and applies it using filters.map.

        Args:
            *args: Decoding functions or strings representing image handlers.
            pre (callable, optional): Pre-processing function.
            post (callable, optional): Post-processing function.
            only (list, optional): List of keys to decode.
            partial (bool, optional): Whether to allow partial decoding. Defaults to False.
            handler (callable, optional): Exception handler. Defaults to reraise_exception.

        Returns:
            FluidInterface: Updated pipeline with decode filter.
        c                 S   s$   g | ]}t |trt|n|qS r   )
isinstancestrr   ImageHandler).0xr   r   r   
<listcomp>   s   $ z)FluidInterface.decode.<locals>.<listcomp>)r,   r-   r.   r   r(   )r   Decoderr*   )	r   r,   r-   r.   r   r)   argshandlersdecoderr   r   r   decodeo   s   zFluidInterface.decodec                 K   s   |  tjdd|i|S )a}  Map the entries in a dict sample with individual functions.

        This method forwards to the filters.map_dict function.

        Args:
            handler (callable, optional): Exception handler. Defaults to reraise_exception.
            **kw: Mapping of keys to functions to apply.

        Returns:
            FluidInterface: Updated pipeline with map_dict filter.
        r)   Nr   )r   r   map_dict)r   r)   r'   r   r   r   r:      s   zFluidInterface.map_dictc                 K   s   |  tj|fi |S )ab  Select samples based on a predicate.

        This method forwards to the filters.select function.

        Args:
            predicate (callable): Function that returns True for samples to keep.
            **kw: Additional keyword arguments for filters.select.

        Returns:
            FluidInterface: Updated pipeline with select filter.
        )r   r   select)r   	predicater'   r   r   r   r;      s   zFluidInterface.selectc                 O      |  tj|i |S )aC  Convert dict samples to tuples.

        This method forwards to the filters.to_tuple function.

        Args:
            *args: Keys to extract from the dict.
            **kw: Additional keyword arguments for filters.to_tuple.

        Returns:
            FluidInterface: Updated pipeline with to_tuple filter.
        )r   r   to_tupler   r6   r'   r   r   r   r>         zFluidInterface.to_tupler(   c                G   s   |  tj|d|iS )a  Map the entries of a tuple with individual functions.

        This method forwards to the filters.map_tuple function.

        Args:
            *args: Functions to apply to each element of the tuple.
            handler (callable, optional): Exception handler. Defaults to reraise_exception.

        Returns:
            FluidInterface: Updated pipeline with map_tuple filter.
        r)   )r   r   	map_tuple)r   r)   r6   r   r   r   rA      r@   zFluidInterface.map_tuplec                 G   s   |  tj| S )zSlice the data stream.

        This method forwards to the filters.slice function.

        Args:
            *args: Arguments for slicing (start, stop, step).

        Returns:
            FluidInterface: Updated pipeline with slice filter.
        )r   r   slicer   r6   r   r   r   rB      r$   zFluidInterface.slicec                 K   s   |  tjdi |S )a  Rename samples based on keyword arguments.

        This method forwards to the filters.rename function.

        Args:
            **kw: Mapping of old names to new names.

        Returns:
            FluidInterface: Updated pipeline with rename filter.
        Nr   )r   r   rename)r   r'   r   r   r   rD      s   zFluidInterface.rename      ?c                 C   r!   )a%  Randomly subsample a stream of data.

        This method forwards to the filters.rsample function.

        Args:
            p (float, optional): Probability of keeping each sample. Defaults to 0.5.

        Returns:
            FluidInterface: Updated pipeline with rsample filter.
        )r   r   rsample)r   pr   r   r   rF      r$   zFluidInterface.rsamplec                 O   r=   )aZ  Rename keys in samples based on patterns.

        This method forwards to the filters.rename_keys function.

        Args:
            *args: Positional arguments for filters.rename_keys.
            **kw: Keyword arguments for filters.rename_keys.

        Returns:
            FluidInterface: Updated pipeline with rename_keys filter.
        )r   r   rename_keysr?   r   r   r   rH      r@   zFluidInterface.rename_keysc                 O   r=   )aQ  Extract specific keys from samples.

        This method forwards to the filters.extract_keys function.

        Args:
            *args: Keys or patterns to extract.
            **kw: Additional keyword arguments for filters.extract_keys.

        Returns:
            FluidInterface: Updated pipeline with extract_keys filter.
        )r   r   extract_keysr?   r   r   r   rI      r@   zFluidInterface.extract_keysc                 O   r=   )aF  Decode data based on file extensions.

        This method forwards to the filters.xdecode function.

        Args:
            *args: Positional arguments for filters.xdecode.
            **kw: Keyword arguments for filters.xdecode.

        Returns:
            FluidInterface: Updated pipeline with xdecode filter.
        )r   r   xdecoder?   r   r   r   rJ     r@   zFluidInterface.xdecodec                 C   r   )zCache samples in memory.

        This method forwards to the filters.Cached class.

        Returns:
            FluidInterface: Updated pipeline with memory caching.
        )r   r   Cachedr   r   r   r   mcached  r   zFluidInterface.mcachedc                 O   r=   )a>  Cache samples using LMDB.

        This method forwards to the filters.LMDBCached class.

        Args:
            *args: Positional arguments for filters.LMDBCached.
            **kw: Keyword arguments for filters.LMDBCached.

        Returns:
            FluidInterface: Updated pipeline with LMDB caching.
        )r   r   
LMDBCachedr?   r   r   r   lmdb_cached  r@   zFluidInterface.lmdb_cached)TN)rE   )__name__
__module____qualname__r   default_collation_fnr   r   r   r    r"   r%   r   r*   r9   r:   r;   r>   rA   rB   rD   rF   rH   rI   rJ   rL   rN   r   r   r   r   r      s4    





r   c                 c   s2    d}| D ]	}|V  |d7 }q|dkrt ddS )zCheck if the dataset is empty and yield samples.

    Args:
        source: An iterable source of samples.

    Yields:
        The samples from the source.

    Raises:
        ValueError: If no samples are found in the dataset.
    r   r   zNo samples found in dataset; perhaps you have fewer shards than workers.
Turn off using empty_check=False in the WebDataset constructor.N)
ValueError)sourcecountsampler   r   r   check_empty-  s   
rX   c                       sh   e Zd ZdZeddddddejdejej	dddddf fdd	Z
dd	 Zd
d Zdd Zdd Z  ZS )
WebDatasetaZ  Create a WebDataset pipeline for efficient data loading.

    This class sets up a data pipeline for loading and processing WebDataset-format data.
    It handles URL generation, shard shuffling, caching, and sample grouping.

    Args:
        urls: The source URLs or specifications for the dataset.
        handler: Function to handle exceptions. Defaults to reraise_exception.
        mode: The mode of operation. Defaults to None.
        resampled: Whether to use resampled mode. Defaults to False.
        repeat: Whether to repeat the dataset. Defaults to False.
        shardshuffle: The number of shards to shuffle, or None. Defaults to None.
        cache_size: The size of the cache in bytes. Defaults to -1 (unlimited).
        cache_dir: The directory to use for caching. Defaults to None.
        url_to_name: Function to convert URLs to cache names. Defaults to pipe_cleaner.
        detshuffle: Whether to use deterministic shuffling. Defaults to False.
        nodesplitter: Function to split data by node. Defaults to single_node_only.
        workersplitter: Function to split data by worker. Defaults to split_by_worker.
        select_files: Function to select files from tar archives. Defaults to None.
        rename_files: Function to rename files from tar archives. Defaults to None.
        empty_check: Whether to check for empty datasets. Defaults to True.
        verbose: Whether to print verbose output. Defaults to False.
        seed: Random seed for shuffling. Defaults to None.

    Raises:
        ValueError: If the cache directory does not exist or if the URL type is not supported.
    NFTc                    s  t    |r	d}|dkr|dvrtd n	|d u r td |du r+td d}tdi t }|d u rBtjdt	
d	d
n|| _| | | | |d urX| | |r_| | |jd ur|jrt| tj|j| jd n| tj|j| jd |d u s|d	krtj|d}ntj|||d}| | tt}| ||||d tt}| ||d |r| t d S d S )N	resampled)FNz>WebDataset(shardshuffle=...) is ignored for resampled datasetszIWebDataset(shardshuffle=...) is None; set explicitly to False or a numberTzDset WebDataset(shardshuffle=...) to a positive integer or 0 or Falsed   WDS_SEEDr   i@B )seedr(   )	cache_dir
cache_sizer)   )r)   select_filesrename_filesr   )super__init__warningswarnr   localsosenvirongetrandomrandintr^   update_cache_infocreate_url_iteratorappendshardshuffle
detshuffler   r%   r   StreamingOpen	FileCacher
   r   r   rX   )r   urlsr)   moder[   repeatrp   r`   r_   url_to_namerq   nodesplitterworkersplitterra   rb   empty_checkverboser^   r6   openerexpandergrouper	__class__r   r   rd   a  sB   


$





zWebDataset.__init__c                 C   sj   t tjd|j|_tjd|j|_|jdur1tj|j|_tj|js3t	d|j ddS dS )zUpdate cache information based on arguments and environment variables.

        Args:
            args: A SimpleNamespace object containing the arguments.

        Raises:
            ValueError: If the specified cache directory does not exist.
        WDS_CACHE_SIZE	WDS_CACHENzcache directory z does not exist)
intrh   ri   rj   r`   r_   path
expanduserexistsrT   rC   r   r   r   rm     s   	
zWebDataset.update_cache_infoc                 C   sZ  |j }t|tr<|ds|dr<t|j }t|}W d   n1 s'w   Y  d|v s2J | t	| dS t|j t
rTd|j v sIJ | t	|j  dS t|trb|drbtdt|tr|t|jdr|| tj||jd dS t|j tst|j r|jd	kr| t|j  dS | t|j  dS td
t|j  )ah  Create an appropriate URL iterator based on the input type.

        This method determines the type of URL input and creates the corresponding
        iterator for the dataset.

        Args:
            args: A SimpleNamespace object containing the arguments.

        Raises:
            ValueError: If the URL type is not supported or implemented.
        z.yamlz.ymlNdatasetsz.jsonunimplemented/)ru   r[   zcannot handle urls of type )rt   r/   r0   endswithopenyaml	safe_loadro   r   MultiShardSampledictrT   r   r   DirectoryShardListru   r	   is_iterableResampledShardListSimpleShardListtype)r   r6   rt   streamspecr   r   r   rn     s0   
zWebDataset.create_url_iteratorc                 C   s   | S )zsEnter the runtime context for the WebDataset.

        Returns:
            self: The WebDataset instance.
        r   r   r   r   r   	__enter__  s   zWebDataset.__enter__c                 G   s   |    dS )zExit the runtime context for the WebDataset.

        Args:
            *args: Exception type, value, and traceback if an exception occurred.
        N)closerC   r   r   r   __exit__  s   zWebDataset.__exit__)rP   rQ   rR   __doc__r   r   pipe_cleanerr   single_node_onlysplit_by_workerrd   rm   rn   r   r   __classcell__r   r   r   r   rY   D  s.    K/rY   c                           e Zd ZdZ fddZ  ZS )FluidWrapperz/Small fluid-interface wrapper for DataPipeline.c                    s   t    | | d S rO   )rc   rd   ro   )r   initialr   r   r   rd     s   
zFluidWrapper.__init__rP   rQ   rR   r   rd   r   r   r   r   r   r         r   c                       r   )	WebLoaderz5A wrapper for DataLoader that adds a fluid interface.c                    s   t  t|i | d S rO   )rc   rd   r   r?   r   r   r   rd     s   zWebLoader.__init__r   r   r   r   r   r     r   r   )rh   rk   re   typesr   urllib.parser   r    r   r   r   r   r	   r
   r   pipeliner   pytorchr   tariteratorsr   r   r   rX   rY   r   r   r   r   r   r   <module>   s&       9