o
    Si                     @   sR   d dl Z d dlZd dlmZ d dlmZmZ d dlmZ G dd dej	j
jZdS )    N)CutSet)get_rankget_world_size)
CutSamplerc                       s|   e Zd ZdZ		ddejjjdede	de	ddf
 fd	d
Z
deddfddZdd ZdefddZdeddfddZ  ZS )IterableDatasetWrappera  
    A wrapper that creates an iterable-style dataset out of a map-style dataset and a
    :class:`~lhotse.dataset.sampling.base.CutSampler`.

    It's intended use is for training with cuts (:class:`~lhotse.cut.Cut`) that contain
    binary data instead of paths/urls/keys pointing to an external storage.
    Currently, we support this kind of dataloading using WebDataset library
    (to export a :class:`~lhotse.CutSet` into WebDataset tarball format,
    see :func:`lhotse.dataset.webdataset.export_to_webdataset`).

    .. caution: With an iterable-style dataset, the sampler replicas exist in the
        dataloading worker subprocesses. That means unless you take extra steps to avoid
        data duplication, it may happen that each worker returns exactly the same data.
        This problem is avoided with WebDataset by using sharding -- we let WebDataset
        subset the shards visible in each subprocess (and each node in multi-GPU DDP training).

    .. note: If you are going to use this class with ``persistent_workers=True`` option of
        ``torch.utils.data.DataLoader``, set ``auto_increment_epoch=True`` argument.
        It will ensure that each epoch is shuffled differently than the previous one.

    Example usage::

        >>> from lhotse import CutSet
        >>> from lhotse.dataset import K2SpeechRecognitionDataset, DynamicCutSampler
        >>> # Preparing data -- WebDataset takes care of sharding and de-duplicating data
        >>> cuts = CutSet.from_webdataset(
        ...     "data/shard-{000000..000321}.tar",
        ...     shuffle_shards=True,
        ...     split_on_workers=True,
        ...     split_on_nodes=True,
        ... )
        >>> dataset = K2SpeechRecognitionDataset()
        >>> sampler = DynamicCutSampler(cuts, max_duration=200, shuffle=True)
        >>> # Creating terable dataset wrapper
        >>> iter_dset = IterableDatasetWrapper(dataset, sampler)
        >>> dloader = torch.utils.data.DataLoader(iter_dset, batch_size=None, num_workers=2)
        >>> # Training loop
        >>> for epoch in range(10):
        ...     dloader.dataset.set_epoch(epoch)
        ...     for batch in dloader:
        ...         pass  # training step
    Fdatasetsamplerauto_increment_epochreset_on_iterreturnNc                    sn   t    || _|| _|| _|| _d| _d | _| jj}| jj	}|dks'|dkr5t
d| d| d d S d S )Nr      z8We detected you're trying to use a CutSampler with rank z and world_size a   inside an IterableDatasetWrapper. Setting rank != 0 and world_size != 1 in Lhotse's CutSampler is inteded for map-style datasets, when the sampler exists in the main training loop. Make sure these settings are intentional or pass rank=0 and world_size=1 to the sampler's constructor.
)super__init__r   r   r	   r
   epoch_sampler_iterrank
world_sizewarningswarn)selfr   r   r	   r
   r   ws	__class__ S/home/ubuntu/.local/lib/python3.10/site-packages/lhotse/dataset/iterable_dataset.pyr   6   s   
zIterableDatasetWrapper.__init__r   c                 C   s^   || _ | j| t| jdr)t| jjtr+| jjD ]}t|jdr(|j| qd S d S d S )Ncuts	set_epoch)r   r   r   hasattr
isinstancer   tupledata)r   r   csr   r   r   r   P   s   z IterableDatasetWrapper.set_epochc                 C   s    | j d u s| jrt| j| _ | S )N)r   r
   iterr   )r   r   r   r   __iter__[   s   zIterableDatasetWrapper.__iter__c                 C   sN   zt | j}| | | j| W S  ty&   | jr"| | jd  d | _ w )Nr   )nextr   _update_dataloading_infor   StopIterationr	   r   r   )r   sampledr   r   r   __next__`   s   

zIterableDatasetWrapper.__next__r   c                 C   s.   t  }t }|D ]}||jd< ||jd< qd S )Nr   r   )r   r   dataloading_info)r   r   r   r   cr   r   r   r%   k   s   
z/IterableDatasetWrapper._update_dataloading_info)FF)__name__
__module____qualname____doc__torchutilsr    Datasetr   boolr   intr   r#   dictr(   r   r%   __classcell__r   r   r   r   r   
   s&    /r   )r   r/   lhotser   lhotse.dataset.dataloadingr   r   lhotse.dataset.sampling.baser   r0   r    IterableDatasetr   r   r   r   r   <module>   s    