o
    wi                     @   s^   d dl Z d dlZd dlZd dlmZ ddlmZmZ ddlm	Z	 dd Z
G dd	 d	ee	ZdS )
    N)islice   )
DataLoaderIterableDataset)PipelineStagec                 C   s0   dd }t | jjd | jtfd|i}|| _| S )zAdd a length method to the given object.

    Args:
        obj: The object to which the length method will be added.

    Returns:
        The modified object with a new length method.
    c                 S   s   | j S )N)sizeself r
   P/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/webdataset/pipeline.pylength   s   z!add_length_method.<locals>.length_Length__len__)type	__class____name__r   )objr   Combinedr
   r
   r   add_length_method
   s   

r   c                       s~   e Zd ZdZ fddZdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd ZdddZdddZdddZ  ZS )DataPipelinezA pipeline starting with an IterableDataset and a series of filters.

    Args:
        *args: Variable length argument list of pipeline stages.
        **kwargs: Arbitrary keyword arguments.
    c                    s^   t    g | _d| _d| _d| _|D ]}|d u rqt|tr&| j| q| j	| qd S )Nr   )
super__init__pipeliner   repetitionsnsamples
isinstancelistextendappend)r	   argskwargsargr   r
   r   r   (   s   

zDataPipeline.__init__c                 C   s&   | j D ]}t|dr|  q| ` dS )z)Close the pipeline and release resources.closeN)r   hasattrr$   )r	   stepr
   r
   r   r$   6   s
   

zDataPipeline.closec                 O   sv   t |ttfrt|dkrt|S t |tr|j|i |S t |tr't|S t|r4||i |}|S t	| d)a  Apply a pipeline stage, possibly to the output of a previous stage.

        Args:
            f: The pipeline stage to invoke.
            *args: Variable length argument list.
            **kwargs: Arbitrary keyword arguments.

        Returns:
            The result of invoking the pipeline stage.

        Raises:
            ValueError: If the pipeline stage is not valid.
        r   z: not a valid pipeline stage)
r   r   r   leniterr   runr   callable
ValueError)r	   fr    r!   resultr
   r
   r   invoke=   s   

zDataPipeline.invokec                 C   s4   |  | jd }| jdd D ]}|  ||}q|S )zCreate an iterator through one epoch in the pipeline.

        Returns:
            An iterator for one epoch of the pipeline.
        r   r   N)r.   r   )r	   sourcer&   r
   r
   r   	iterator1V   s   zDataPipeline.iterator1c                 c   sD    t | jD ]}d}|  D ]	}|V  |d7 }q|dkr dS qdS )zCreate an iterator through the entire dataset, using the given number of repetitions.

        Yields:
            Samples from the dataset.
        r   r   N)ranger   r0   )r	   _countsampler
   r
   r   iteratora   s   
zDataPipeline.iteratorc                 C   s4   | j dkr| jdkrt|  | jS |  S |  S )zCreate an iterator through the pipeline, repeating and slicing as requested.

        Returns:
            An iterator through the pipeline.
        r   r   )r   r   r   r5   r   r
   r
   r   __iter__p   s
   

zDataPipeline.__iter__c                 C   s
   | j | S )zReturn pipeline stage i.

        Args:
            i: The index of the pipeline stage to return.

        Returns:
            The pipeline stage at index i.
        )r   )r	   ir
   r
   r   stage~   s   
	zDataPipeline.stagec                 C   s   | j | dS )zsAppend a pipeline stage (modifies the object).

        Args:
            f: The pipeline stage to append.
        N)r   r   )r	   r,   r
   r
   r   r      s   zDataPipeline.appendc                 G   s0   t  | }t  |j|_|D ]}|| q|S )zAppend pipeline stages to a copy of the pipeline and return the copy.

        Args:
            *args: Variable length argument list of pipeline stages to append.

        Returns:
            A new DataPipeline object with the appended stages.
        )copyr   r   )r	   r    r-   r"   r
   r
   r   compose   s
   
	zDataPipeline.composeFc                 C   s   |st d || _t| S )a  Add a __len__ method returning the desired value.

        This does not change the actual number of samples in an epoch.
        PyTorch IterableDataset should not have a __len__ method.
        This is provided only as a workaround for some broken training environments
        that require a __len__ method.

        Args:
            n: The length value to set.
            silent: If True, suppress the warning message.

        Returns:
            The modified DataPipeline object with a __len__ method.
        z.with_length() only sets the value of __len__ for compatibility with some training environments. It does not change the number of samples in an epoch.)warningswarnr   r   )r	   nsilentr
   r
   r   with_length   s   zDataPipeline.with_lengthr   c                 C   s   t j| _t||| _| S )a  Change the epoch to return the given number of samples/batches.

        Args:
            nsamples: The number of samples per epoch.
            nbatches: The number of batches per epoch.

        Returns:
            The modified DataPipeline object.
        )sysmaxsizer   maxr   )r	   r   nbatchesr
   r
   r   
with_epoch   s   
zDataPipeline.with_epochc                 C   s*   |dkr|| _ || _| S tj| _ || _| S )a8  Repeat iterating through the dataset for the given number of epochs up to the given number of samples.

        Args:
            nepochs: The number of epochs to repeat.
            nbatches: The number of batches to limit per repetition.

        Returns:
            The modified DataPipeline object.
        r   )r   r   r@   rA   )r	   nepochsrC   r
   r
   r   repeat   s   
zDataPipeline.repeat)F)r   r   )r   
__module____qualname____doc__r   r$   r.   r0   r5   r6   r8   r   r:   r?   rD   rF   __classcell__r
   r
   r#   r   r       s    

r   )r9   r@   r;   	itertoolsr   pytorchr   r   utilsr   r   r   r
   r
   r
   r   <module>   s    