o
    ei<                     @   sb  d Z ddlZddlZddlZddlmZmZmZ ddlm	Z	 ddl
mZmZ ddlmZ ddlmZmZ ddlmZmZmZ dd	lmZ z ddlZdd
lmZ dZeddd dkr`ejjZnej ZW n e!yo   dZY nw ee"Z#dd Z$dddZ%dd Z&dddZ'e(e&e	j) e	j)e	_*e&e	_)e+e	dre'e	_,eG dd deZ-eG dd dZ.dS )a8  PyTorch compatible DataLoaders

Essentially we extend PyTorch DataLoader by adding the ability to save the
data loading state, so that a checkpoint may be saved in the middle of an
epoch.

Example
-------
>>> import torch
>>> from speechbrain.utils.checkpoints import Checkpointer
>>> # An example "dataset" and its loader
>>> dataset = torch.randn(10, 1)
>>> dataloader = SaveableDataLoader(dataset, num_workers = 3)
>>> # Setup the checkpointer:
>>> tmpdir = getfixture('tmpdir')
>>> checkpointer = Checkpointer(tmpdir, {"dataloader": dataloader})
>>> # Iterate:
>>> for i, data_point in enumerate(dataloader):
...     # Here you would process the data:
...     rainfall_amount_prediction = data_point * 4.
...     # Now, imagine the experiment gets killed on the fifth batch:
...     if i == 4:
...         break
...     # Luckily, you had just saved a checkpoint:
...     if i == 3:
...         _ = checkpointer.save_checkpoint(end_of_epoch = False)
>>> # So when you restart the experiment:
>>> new_dataloader = SaveableDataLoader(dataset, num_workers = 3)
>>> new_checkpointer = Checkpointer(tmpdir, {"dataloader": new_dataloader})
>>> _ = new_checkpointer.recover_if_possible()
>>> # The dataloader fast-forwards to the position where we left off:
>>> assert next(iter(new_dataloader)) == dataset[4]

Authors:
  * Aku Rouhe 2020
    N)
DataLoaderDistributedSamplerIterableDataset)_BaseDataLoaderIter)BatchsizeGuesserPaddedBatch)DynamicItemDataset)DistributedSamplerWrapperReproducibleRandomSampler)mark_as_loadermark_as_saverregister_checkpoint_hooks)
get_logger)versionT
webdataset   z0.1.Fc                 C   s   | dd}| dd}| rWt|tsW| dd}|dur/t||||d}d|d< ||d< |S | ddu rGt|||d}d|d< ||d< |S t| dd|d	}||d< |S | rct|trctd
 |S )a  Prepare loader_kwargs for DDP when necessary.

    Arguments
    ---------
    distributed_launch : bool
        DDP flag
    rank : int
        node rank in DDP
    dataset : Dataset
        The dataset to make a DataLoader for.
    loader_kwargs : dict
        Keyword args to DataLoader, see PyTorch DataLoader for
        options.

    Returns
    -------
    loader_kwargs
        augmented keyword args to DataLoader
    samplerNshuffleF	drop_last)rankr   r   batch_sampler)r   r   )r   zDCannot automatically solve distributed sampling for IterableDataset.)get
isinstancer   r	   r   loggerwarning)distributed_launchr   datasetloader_kwargsr   r   r    r   [/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/speechbrain/dataio/dataloader.pydistributed_loader_specificsL   sD   
r    c                 K   s   d|vrt | trt|d< |ddr2|ddurtdtjdd}t| |d	}||d< |d= trAt | t	rAd
|vrAd|d
< t | t
rOt| fi |}nt| fi |}|dur`t||}|S )a'  Makes a basic DataLoader with SpeechBrain defaults.

    For DynamicItemDatasets (which return dicts), use
    PaddedBatch as the default collate_fn.

    Shuffling gets implemented by ReproducibleRandomSampler.

    If the Dataset is not an IterableDataset, the DataLoader
    is a SaveableDataLoader.

    If the Dataset is a webdataset.dataset.Composable, set default
    batch_size = None.

    Can also loop over the underlying dataloader continuously,
    and stop iterations at nominal epoch lengths.

    Arguments
    ---------
    dataset : Dataset
        The dataset to make a DataLoader for.
    looped_nominal_epoch : None, int
        If an integer is given, loop the underlying DataLoader infinitely and
        set a nominal epoch length in batches (or whatever the DataLoader
        yields).
    **loader_kwargs : dict
        Keyword args to DataLoader, see PyTorch DataLoader for
        options.

    Returns
    -------
    DataLoader
        If looped_nominal_epoch is None
    LoopedLoader
        If looped_nominal_epoch is not None
    
collate_fnr   Fr   Nz?Cannot specify both shuffle=True and a sampler in loader_kwargsSB_GLOBAL_SEEDi&l!)seed
batch_size)r   r   r   r   
ValueErrorosenvironr
   WDS_AVAILABLE	WDS_CLASSr   r   SaveableDataLoaderLoopedLoader)r   looped_nominal_epochr   r#   r   
dataloaderr   r   r   make_dataloader   s2   
%

r.   c              	   O   s   | j |g|R i | t|drN|jd urPt|jD ]-}zt| j W n tyB   d}d|j d 	 | d t| Y  nw |d | _	qd |_d S d S d S )N_speechbrain_recovery_skip_toz/Tried to fast-forward Sampler after checkpoint zrecovery by  z! indices. Ignoring this mismatch.   )
__old_init__hasattrr/   rangenext_sampler_iterStopIterationwarningswarn_num_yielded)selfloaderargskwargsiMSGr   r   r   
__new_init   s&   



rA   c                 O   s&   |st | j| _d| _|j| _d S d S Nr   )iter_index_samplerr6   r:   _IterableDataset_len_called)r;   r<   
first_iterr=   r>   r   r   r   __new_reset   s
   rG   _resetc                       sD   e Zd ZdZ fddZ fddZedd Zedd	 Z	  Z
S )
r*   a  A saveable version of the PyTorch DataLoader.

    See `torch.utils.data.DataLoader` for usage. This class should work exactly
    like the PyTorch basic DataLoader, but this can be checkpointed with
    SpeechBrain's Checkpointer.

    Note
    ----
    1. The saveability is implemented via some unfortunately slightly magical
    means.
    2. The data loader cannot recover after entering __iter__. Normally this is
    not a problem, as recovery should happen before training begins.  However,
    just before evaluation, it is also typical to recover the checkpoint at
    which performance was the best. Thus, if a checkpoint is loaded after
    entering __iter__, we just assume it is for this reason. A warning is
    logged, but that is all.
    c                    s8   t  j|i | t| jtrtd d | _d | _d S )NzkSaveableDataLoader cannot save the position in an IterableDataset. Save the position on the dataset itself.)	super__init__r   r   r   r   r   r/   _speechbrain_iterator)r;   r=   r>   	__class__r   r   rJ     s   
zSaveableDataLoader.__init__c                    s   t   }|| _|S N)rI   __iter__rK   )r;   iteratorrL   r   r   rO   &  s   
zSaveableDataLoader.__iter__c                 C   sp   t | jtrtd | jd u rd }n| jj}t|ddd}|t	| W d    d S 1 s1w   Y  d S )NzWarning again: a checkpoint was requested on SaveableDataLoader, but the dataset is an IterableDataset. Cannot save the position in an IterableDataset. Not raising an error; assuming that you know what you're doing.wutf-8encoding)
r   r   r   r   r   rK   r:   openwritestr)r;   pathto_savefor   r   r   _speechbrain_save1  s   
"z$SaveableDataLoader._speechbrain_savec                 C   s   | j d urtd d S |rd S t|dd!}| }|td kr*	 W d    d S t|| _W d    d S 1 s:w   Y  d S )NzSaveableDataLoader was requested to load a checkpoint, but the DataLoader has already been iterated. The DataLoader file will be ignored. This is normal in evaluation, when a checkpoint is loaded just to retrieve the best model.rR   rS   )rK   r   debugrU   readrW   intr/   )r;   rX   end_of_epochfisavedr   r   r   _speechbrain_loadA  s   
"z$SaveableDataLoader._speechbrain_load)__name__
__module____qualname____doc__rJ   rO   r   r[   r   rb   __classcell__r   r   rL   r   r*     s    

r*   c                   @   sL   e Zd ZdZdddZdd Zdd Zd	d
 Zedd Z	e
dddZdS )r+   a  Loops an underlying iterable indefinitely, with nominal epoch lengths

    This is useful for working with IterableDatasets, and particularly
    webdataset-style loading. We recommend using ``.repeat()`` on the
    webdataset IterableDataset instance, so that the underlying dataloader
    naturally continues for ever.

    Arguments
    ---------
    loader : iterable
        A DataLoader or other iterable that is looped repeatedly.
    epoch_length : int
        The length of the nominal epoch. After this many steps, raises
        StopIteration
    batchsize_fn : callable
        Function for determining batch size, default ``BatchsizeGuesser``
    Nc                 C   s<   || _ d | _|| _d| _d| _d| _|d u rt | _d S d S rB   )r<   rP   epoch_lengthsteptotal_stepstotal_samplesr   batchsize_fn)r;   r<   rh   rl   r   r   r   rJ   m  s   zLoopedLoader.__init__c                 C   s   | j d u rt| j| _ | S rN   )rP   rC   r<   r;   r   r   r   rO   w  s   
zLoopedLoader.__iter__c                 C   s   | j | jk r<|  j d7  _ |  jd7  _zt| j}W n ty/   t| j| _t| j}Y nw |  j| 	|7  _|S d| _ t)Nr1   r   )
ri   rh   rj   r5   rP   r7   rC   r<   rk   rl   )r;   batchr   r   r   __next__|  s   zLoopedLoader.__next__c                 C   s   | j S rN   )rh   rm   r   r   r   __len__  s   zLoopedLoader.__len__c                 C   s^   t |ddd}t| j|d t| j|d t| j|d W d   dS 1 s(w   Y  dS )zSaves the needed information.rQ   rR   rS   )fileN)rU   printri   rj   rk   )r;   rX   rZ   r   r   r   save  s
   "zLoopedLoader.saveTc                 C   s   t |ddL}t|  | _t|  | _t|  | _|s:| jdkrB| jdkrJ| j| _W d   dS W d   dS W d   dS W d   dS 1 sUw   Y  dS )zLoads the needed information.rR   rS   r   N)rU   r^   readlinestripri   rj   rk   rh   )r;   rX   r_   r`   r   r   r   load  s   
"zLoopedLoader.loadrN   )T)rc   rd   re   rf   rJ   rO   ro   rp   r   rs   r   rv   r   r   r   r   r+   Y  s    


r+   rN   )F)/rf   	functoolsr&   r8   torch.utils.datar   r   r   torch.utils.data.dataloaderr   speechbrain.dataio.batchr   r   speechbrain.dataio.datasetr   speechbrain.dataio.samplerr	   r
   speechbrain.utils.checkpointsr   r   r   speechbrain.utils.loggerr   r   wdsimportlib_metadatar   r(   r   
Composabler)   DataPipelineImportErrorrc   r   r    r.   rA   rG   update_wrapperrJ   r2   r3   rH   r*   r+   r   r   r   r   <module>   sH    %

BR

P