o
    }oi                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
mZmZmZmZmZ d dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZm Z  d dl!m"Z"m#Z# d dl$T d dl%m&Z& d dl%m'Z( d dl)m*Z*m+Z+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3 g dZ4d5g ddd e6 D  Z7dd Z8G dd dZ9de:de;de;fdd Z<	!			"dAd#ee:ee: f d$e=d%ee= d&ee; d'e;f
d(d)Z>	 d#ee:ee: f de:d*e=de;de;f
d+d,Z?G d-d. d.e"Z@G d/d0 d0e@ZAG d1d2 d2e@ZBe/d3d4G d5d6 d6e#ZCG d7d8 d8eCZDG d9d: d:eCZEG d;d< d<e#ZFG d=d> d>ZGG d?d@ d@eZHdS )B    N)Iterable)CallableDictr   ListOptionalTupleUnion)ChainDataset)tqdm)WaveformFeaturizer)ChannelSelectorType)available_formats)
tokenizers)collectionsparsers)DatasetIterableDataset)*)logging)
webdataset)DataStoreObjectdatastore_object_getis_datastore_cache_sharedis_datastore_path)
deprecated)webdataset_split_by_workers)is_global_rank_zero)AudioToCharDatasetAudioToBPEDatasetTarredAudioToCharDatasetTarredAudioToBPEDataset;)wavmp3flacopusc                 C   s   g | ]}|  qS  )lower).0fmtr&   r&   [/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/asr/data/audio_to_text.py
<listcomp>1       r+   c                 C   s  t t|  }t|dkr|\}}}}}nt|dkr#d}|\}}}}ntdd}|d du}|r7t| }|d du}	|	rEt| }
g g }}| D ]R}t|dkr\|\}}}}}n|\}}}}|r| }||k rzd|| f}tjj	||}|
| |	r| }||
k rd|
| f}tjjj	|||d}|
| qL|rt|}t|}nd\}}|	rt|}t|}nd}d}|du r||||fS tj|tjd}|||||fS )	ar  collate batch of audio sig, audio len, tokens, tokens len
    Args:
        batch (Optional[FloatTensor], Optional[LongTensor], LongTensor,
               LongTensor):  A tuple of tuples of signal, signal lengths,
               encoded tokens, and encoded tokens length.  This collate func
               assumes the signals are 1d torch tensors (i.e. mono audio).
          Nz$Expects 4 or 5 tensors in the batch!r   )value)NNdtype)listziplen
ValueErrormaxitemtorchnn
functionalpadappendstacktensorint32)batchpad_idpacked_batch_audio_lengthstokens_lengths
sample_idsmax_audio_len	has_audio
has_tokensmax_tokens_lenaudio_signaltokensbsigsig_lentokens_itokens_i_lenr;   r&   r&   r*   _speech_collate_fn4   sZ   




rR   c                   @   s   e Zd ZdZ								ddedeeef dee dee d	e	d
ee	 dee	 de	de
dee fddZde	deee	 e	f fddZdedeee	 e	f fddZdejjdeee	 e	f fddZdS )ASRManifestProcessora  
    Class that processes a manifest json file containing paths to audio files, transcripts, and durations (in seconds).
    Each new line is a different sample. Example below:
    {"audio_filepath": "/path/to/audio.wav", "text_filepath": "/path/to/audio.txt", "duration": 23.147}
    ...
    {"audio_filepath": "/path/to/audio.wav", "text": "the transcription", "offset": 301.75, "duration": 0.82, "utt":
    "utterance_id", "ctm_utt": "en_4156", "side": "A"}
    Args:
        manifest_filepath: Path to manifest json as described above. Can be comma-separated paths.
        parser: Str for a language specific preprocessor or a callable.
        max_duration: If audio exceeds this length, do not include in dataset.
        min_duration: If audio is less than this length, do not include in dataset.
        max_utts: Limit number of utterances.
        bos_id: Id of beginning of sequence symbol to append if not None.
        eos_id: Id of end of sequence symbol to append if not None.
        pad_id: Id of pad symbol. Defaults to 0.
    Nr   Fmanifest_filepathparsermax_durationmin_durationmax_uttsbos_ideos_idrA   index_by_file_idmanifest_parse_funcc              	   C   s6   || _ tj||||||	|
d| _|| _|| _|| _d S )N)manifests_filesrU   rW   rV   
max_numberr[   
parse_func)rU   r   ASRAudioText
collectionrZ   rY   rA   )selfrT   rU   rV   rW   rX   rY   rZ   rA   r[   r\   r&   r&   r*   __init__   s   

zASRManifestProcessor.__init__indexreturnc                 C   s   | j | }| |S N)ra   process_text_by_sample)rb   rd   sampler&   r&   r*   process_text_by_id   s   

z'ASRManifestProcessor.process_text_by_idfile_idc                 C   s$   | j j| d }| j | }| |S Nr   )ra   mappingrg   )rb   rj   manifest_idxrh   r&   r&   r*   process_text_by_file_id   s   

z,ASRManifestProcessor.process_text_by_file_idrh   c                 C   sV   |j t|j }}| jd ur| jg| }|d7 }| jd ur'|| jg }|d7 }||fS )N   )text_tokensr4   rY   rZ   )rb   rh   ttlr&   r&   r*   rg      s   

z+ASRManifestProcessor.process_text_by_sample)NNr   NNr   FN)__name__
__module____qualname____doc__strr   r   r   floatintboolrc   r   r   ri   rn   r   r`   OUTPUT_TYPErg   r&   r&   r&   r*   rS   q   sD    
	

&rS   shard_strategy
world_sizeglobal_rankc           
      C   s>  ddg}||vrt d| t| tr:g d}|D ]}|| v r&| |d} qg d}|D ]}|| v r9| |d} q-t| trHttj| dd	} |d
kr|dkrtd t| | dkrkt	dt|  d| d t| | | }|t| |  }	| ||	 } td|||	 | S |dkrtd | S t d| | S )Nscatter	replicatez `shard_strategy` must be one of )([<_OP_{))]>_CL_}F)escapero   zDAll tarred dataset shards will be scattered evenly across all nodes.r   z$Number of shards in tarred dataset (z5) is not divisible by number of distributed workers (z).z@Partitioning tarred dataset: process (%d) taking shards [%d, %d)z>All tarred dataset shards will be replicated across all nodes.z.Invalid shard strategy ! Allowed values are : )
r5   
isinstancerw   replacer2   braceexpandr   infor4   warning)
sharded_filepathsr|   r}   r~   valid_shard_strategiesbrace_keys_openbkeybrace_keys_close	begin_idxend_idxr&   r&   r*   expand_sharded_filepaths   sJ   


	
r   F   manifest_filepathscache_audioshared_cachenum_workersmax_num_workersc           	      C   s  t | tr
| d} tdd | D }|dkrdd }tj rntj rnt	d |du r1t
 }|r<t	d	 t }nt	d
 ttjdd}|dk}|rbtd|rWdnd || |||d t	d tj  dS t rtd || |||d dS tddS )a=  Cache manifests and audio from an object store.
    It is assumed that remote manifests are using relative paths.

    Args:
        manifest_filepaths: list of paths to manifest files (list of strings or a string with `,` as separator)
        cache_audio: If True, audio from manifest will also be cached
        shared_cache: Optional, True if cache is shared across all nodes
        num_workers: Optional, number of workers to be used for download
        max_num_workers: max number of workers to be used for download, used when setting num_workers automatically
    ,c                 S   s   g | ]}t |qS r&   )r   )r(   fr&   r&   r*   r+      r,   z-cache_datastore_manifests.<locals>.<listcomp>r   c              	   S   s  |du r
t  d }t||}| D ]}t|rtd| t| }tdt| |rtd| t j	
|}g }t|d#}|D ]}	t|	}
t j	||
d }|t|d qCW d   n1 sfw   Y  |dur|dkrtd	| tj|d
}tt|t|t|d}W d   n1 sw   Y  ntd g }t|D ]}|| du qt|stdtd qtd| qdS )z1Cache manifests and audio data from object store.Nro   zCache manifest file: %szCached at: %sz"Cache audio from manifest file: %sraudio_filepath)
store_pathz+Using multiprocessing with num_workers: %d.)	processes)totalzUsing a single process.z&Some files not downloaded successfullyzCaching completez#Manifest is not on a data store: %s)os	cpu_countminr   r   r   r   getrw   pathdirnameopenjsonloadsjoinr<   debugmultiprocessingPoolr2   r
   imapr   r4   allRuntimeError)r   r   r   r   manifest_filecached_manifest_filemanifest_diraudio_objectsr   liner7   r   presultaudio_objectr&   r&   r*   
cache_data   sJ   



z-cache_datastore_manifests.<locals>.cache_dataz5Distributed environment is available and initialized.Nz<Cache is shared among nodes, cache data on global rank zero.z?Cache is not shared among nodes, cache data on local rank zero.
LOCAL_RANKzCache data from %s rank 0globallocal)r   r   r   r   zReached barrierzTorch distributed is not initialized and caching may be prone to data race conditions. Now caching data from global rank 0. If there are other ranks and they pass this before rank 0, errors might result.zTorch distributed is not initialized and caching on nodes other than global rank zero is disabled to avoid race condition between different ranks. To ensure distributed environment is initialized, please update data config to use `defer_setup = True`.)r   rw   splitsumr8   distributedis_availableis_initializedr   r   r   r   ry   r   environr   r   barrierr   r   )	r   r   r   r   r   num_datastore_manifestsr   is_rank_zero
local_rankr&   r&   r*   cache_datastore_manifests   sN   

1




r   shard_manifestsc                 C   sH   |r"t j std | S t j std | S t| |||d} | S )NzFNot running in torch.distributed mode. Manifest sharding not availableztManifest sharding was requested but torch.distributed is not initialized Did you intend to set the defer_setup flag?r   r|   r}   r~   )r8   r   r   r   r   r   r   )r   r|   r   r~   r}   r&   r&   r*   shard_manifests_if_neededk  s    


r   c                    @   s   e Zd ZdZedeeeef  fddZ													d$ded	e
eef d
ededddee dee dededee dee dededee dee fddZdd Zdd Zdd Zd d! Zd"d# ZdS )%_AudioTextDataseta1  
    Dataset that loads tensors via a json file containing paths to audio files, transcripts, and durations (in seconds).
    Each new line is a different sample. Example below:
    {"audio_filepath": "/path/to/audio.wav", "text_filepath": "/path/to/audio.txt", "duration": 23.147}
    ...
    {"audio_filepath": "/path/to/audio.wav", "text": "the transcription", "offset": 301.75, "duration": 0.82, "utt":
    "utterance_id", "ctm_utt": "en_4156", "side": "A"}
    Args:
        manifest_filepath: Path to manifest json as described above. Can be comma-separated paths.
        parser: Str for a language specific preprocessor or a callable.
        sample_rate (int): Sample rate to resample loaded audio to
        int_values (bool): If true, load samples as 32-bit integers. Defauts to False.
        augmentor (nemo.collections.asr.parts.perturb.AudioAugmentor): An AudioAugmentor object used to augment loaded
            audio
        max_duration: If audio exceeds this length, do not include in dataset
        min_duration: If audio is less than this length, do not include in dataset
        max_utts: Limit number of utterances
        trim: whether or not to trim silence. Defaults to False
        bos_id: Id of beginning of sequence symbol to append if not None
        eos_id: Id of end of sequence symbol to append if not None
        pad_id: Id of pad symbol. Defaults to 0
        return_sample_id (bool): whether to return the sample_id as a part of each sample
        channel_selector (int | Iterable[int] | str): select a single channel or a subset of channels from multi-channel audio. If set to `'average'`, it performs averaging across channels. Disabled if set to `None`. Defaults to `None`. Uses zero-based indexing.
        manifest_parse_func: Optional function to parse manifest entries. Defaults to None.
    re   c              	   C   H   t dt t tdt t dt t tdt t tdt dddS z+Returns definitions of module output ports.)BTr   T)optional)rK   a_sig_lengthtranscriptstranscript_length	sample_id
NeuralTypeAudioSignaltupleLengthsType
LabelsTyperb   r&   r&   r*   output_types     

z_AudioTextDataset.output_typesFNr   rT   rU   sample_rate
int_values	augmentor1nemo.collections.asr.parts.perturb.AudioAugmentorrV   rW   rX   trimrY   rZ   rA   return_sample_idchannel_selectorr\   c                 C   sd   t |tkr|d}t|dd t||||||
|||d	| _t|||d| _|	| _|| _	|| _
d S )Nr   T)r   r   )	rT   rU   rV   rW   rX   rY   rZ   rA   r\   r   r   r   )typerw   r   r   rS   manifest_processorr   
featurizerr   r   r   )rb   rT   rU   r   r   r   rV   rW   rX   r   rY   rZ   rA   r   r   r\   r&   r&   r*   rc     s$   

z_AudioTextDataset.__init__c                 C      | j j| S rf   r   ra   rb   r   r&   r&   r*   get_manifest_sample     z%_AudioTextDataset.get_manifest_samplec                    s&   t |tr fdd|D S  |S )Nc                    s   g | ]}  |qS r&   )_process_sample)r(   _indexr   r&   r*   r+     s    z1_AudioTextDataset.__getitem__.<locals>.<listcomp>)r   IterableABCr   )rb   rd   r&   r   r*   __getitem__  s   

z_AudioTextDataset.__getitem__c           
      C   s   | j j| }|j}|d u rd}| jj|j||j| j|j| j	d}|t
|jd  }}| j j|d\}}| jrL||t
| t
| |f}	|	S ||t
| t
| f}	|	S )Nr   )offsetdurationr   orig_srr   rh   )r   ra   r   r   process
audio_filer   r   r   r   r8   r>   shapelongrg   r   )
rb   rd   rh   r   featuresr   flrq   rr   outputr&   r&   r*   r     s&   " z!_AudioTextDataset._process_samplec                 C   s   t | jjS rf   )r4   r   ra   r   r&   r&   r*   __len__  r   z_AudioTextDataset.__len__c                 C   s   t || jjdS )N)rA   )rR   r   rA   rb   r@   r&   r&   r*   _collate_fn  s   z_AudioTextDataset._collate_fn)FNNNr   FNNr   FNN)rs   rt   ru   rv   propertyr   r   rw   r   r   r   r   ry   rz   r   rc   r   r   r   r  r  r&   r&   r&   r*   r     sh    
	

(r   c                (       s   e Zd ZdZedeeeef  fddZ															
			d!dede
eee f dededddee dee dedededededee dee dede
eef dedee dee f& fdd Z  ZS )"r   a  
    Dataset that loads tensors via a json file containing paths to audio
    files, transcripts, and durations (in seconds). Each new line is a
    different sample. Example below:
    {"audio_filepath": "/path/to/audio.wav", "text_filepath":
    "/path/to/audio.txt", "duration": 23.147}
    ...
    {"audio_filepath": "/path/to/audio.wav", "text": "the
    transcription", "offset": 301.75, "duration": 0.82, "utt":
    "utterance_id", "ctm_utt": "en_4156", "side": "A"}

    Args:
        manifest_filepath: Path to manifest json as described above. Can
            be comma-separated paths.
        labels: String containing all the possible characters to map to
        sample_rate (int): Sample rate to resample loaded audio to
        int_values (bool): If true, load samples as 32-bit integers. Defauts to False.
        augmentor (nemo.collections.asr.parts.perturb.AudioAugmentor): An AudioAugmentor
            object used to augment loaded audio
        max_duration: If audio exceeds this length, do not include in dataset
        min_duration: If audio is less than this length, do not include
            in dataset
        max_utts: Limit number of utterances
        blank_index: blank character index, default = -1
        unk_index: unk_character index, default = -1
        normalize: whether to normalize transcript text (default): True
        bos_id: Id of beginning of sequence symbol to append if not None
        eos_id: Id of end of sequence symbol to append if not None
        return_sample_id (bool): whether to return the sample_id as a part of each sample
        channel_selector (int | Iterable[int] | str): select a single channel or a subset of channels from multi-channel audio. If set to `'average'`, it performs averaging across channels. Disabled if set to `None`. Defaults to `None`. Uses zero-based indexing.
        manifest_parse_func: Optional function to parse manifest entries. Defaults to None.
    re   c              	   C   r   r   r   r   r&   r&   r*   r   "  r   zAudioToCharDataset.output_typesFNr   TenrT   labelsr   r   r   r   rV   rW   rX   blank_index	unk_index	normalizer   rY   rZ   rA   rU   r   r   r\   c                    sH   || _ tj|||
|	|d}t j|||||||||||||||d d S )Nr  nameunk_idblank_iddo_normalize)rT   rU   r   r   r   rV   rW   rX   r   rY   rZ   rA   r   r   r\   r  r   make_parsersuperrc   )rb   rT   r  r   r   r   rV   rW   rX   r	  r
  r  r   rY   rZ   rA   rU   r   r   r\   	__class__r&   r*   rc   -  s*   

zAudioToCharDataset.__init__)FNNNr   r  r  TFNNr   r  FNN)rs   rt   ru   rv   r  r   r   rw   r   r   r   r   ry   rz   rx   r   r   rc   __classcell__r&   r&   r  r*   r      sv    !	

r   c                       s   e Zd ZdZedeeeef  fddZ											dd	ed
dde
dedddee
 dee
 de
dedededee dee f fddZ  ZS )r   a}  
    Dataset that loads tensors via a json file containing paths to audio
    files, transcripts, and durations (in seconds). Each new line is a
    different sample. Example below:
    {"audio_filepath": "/path/to/audio.wav", "text_filepath":
    "/path/to/audio.txt", "duration": 23.147}
    ...
    {"audio_filepath": "/path/to/audio.wav", "text": "the
    transcription", "offset": 301.75, "duration": 0.82, "utt":
    "utterance_id", "ctm_utt": "en_4156", "side": "A"}

    In practice, the dataset and manifest used for character encoding and byte pair encoding
    are exactly the same. The only difference lies in how the dataset tokenizes the text in
    the manifest.

    Args:
        manifest_filepath: Path to manifest json as described above. Can
            be comma-separated paths.
        tokenizer: A subclass of the Tokenizer wrapper found in the common collection,
            nemo.collections.common.tokenizers.TokenizerSpec. ASR Models support a subset of
            all available tokenizers.
        sample_rate (int): Sample rate to resample loaded audio to
        int_values (bool): If true, load samples as 32-bit integers. Defauts to False.
        augmentor (nemo.collections.asr.parts.perturb.AudioAugmentor): An AudioAugmentor
            object used to augment loaded audio
        max_duration: If audio exceeds this length, do not include in dataset
        min_duration: If audio is less than this length, do not include
            in dataset
        max_utts: Limit number of utterances
        trim: Whether to trim silence segments
        use_start_end_token: Boolean which dictates whether to add [BOS] and [EOS]
            tokens to beginning and ending of speech respectively.
        return_sample_id (bool): whether to return the sample_id as a part of each sample
        channel_selector (int | Iterable[int] | str): select a single channel or a subset of channels from multi-channel audio. If set to `'average'`, it performs averaging across channels. Disabled if set to `None`. Defaults to `None`. Uses zero-based indexing.
        manifest_parse_func: Optional function to parse manifest entries. Defaults to None.
    re   c              	   C   r   r   r   r   r&   r&   r*   r     r   zAudioToBPEDataset.output_typesFNr   TrT   	tokenizer0nemo.collections.common.tokenizers.TokenizerSpecr   r   r   r   rV   rW   rX   r   use_start_end_tokenr   r   r\   c                    s   |
rt |dr|jdkr|j}nd }|
r"t |dr"|jdkr"|j}nd }t |dr2|jdkr2|j}nd}G dd d}t j|||||||||||||	|||d d S )NrY   r   rZ   rA   c                   @      e Zd Zdd Zdd ZdS )z4AudioToBPEDataset.__init__.<locals>.TokenizerWrapperc                 S   &   t |tjjrd| _nd| _|| _d S NTFr   r   aggregate_tokenizerAggregateTokenizeris_aggregate
_tokenizerrb   r  r&   r&   r*   rc        
z=AudioToBPEDataset.__init__.<locals>.TokenizerWrapper.__init__c                 W   V   t |d tr#| jr#g }|d D ]}|| j|d |d  q|S | jj| }|S Nr   rw   langr   r   r   extendr!  text_to_idsrb   argsrq   spanr&   r&   r*   __call__     z=AudioToBPEDataset.__init__.<locals>.TokenizerWrapper.__call__Nrs   rt   ru   rc   r-  r&   r&   r&   r*   TokenizerWrapper      r0  )rT   rU   r   r   r   rV   rW   rX   rY   rZ   rA   r   r   r   r\   hasattrrY   rZ   rA   r  rc   )rb   rT   r  r   r   r   rV   rW   rX   r   r  r   r   r\   rY   rZ   rA   r0  r  r&   r*   rc     s6   
zAudioToBPEDataset.__init__)
FNNNr   FTFNN)rs   rt   ru   rv   r  r   r   rw   r   r   ry   rz   r   r   rc   r  r&   r&   r  r*   r   \  sR    %	
r   zlWebdataset support will be removed in v2.1.0 versions, please use LhotseSpeechToTextBpeDataset class instead)explanationc                (   @   s   e Zd ZdZ															d,deeee f deded	ed
e	de
d dede
e de
e de	de
e de
e dedede	dedede	de
e f&ddZdd Zdd Zd d! Zd"d# Zd$d% Zd&d' Zd(d) Zd*d+ ZdS )-_TarredAudioToTextDataseta  
    A similar Dataset to the AudioToCharDataset/AudioToBPEDataset, but which loads tarred audio files.

    Accepts a single comma-separated JSON manifest file (in the same style as for the AudioToCharDataset/AudioToBPEDataset),
    as well as the path(s) to the tarball(s) containing the wav files. Each line of the manifest should
    contain the information for one audio file, including at least the transcript and name of the audio
    file within the tarball.

    Valid formats for the audio_tar_filepaths argument include:
    (1) a single string that can be brace-expanded, e.g. 'path/to/audio.tar' or 'path/to/audio_{1..100}.tar.gz', or
    (2) a list of file paths that will not be brace-expanded, e.g. ['audio_1.tar', 'audio_2.tar', ...].

    Note: For brace expansion in (1), there may be cases where `{x..y}` syntax cannot be used due to shell interference.
    This occurs most commonly inside SLURM scripts. Therefore we provide a few equivalent replacements.
    Supported opening braces - { <=> (, [, < and the special tag _OP_.
    Supported closing braces - } <=> ), ], > and the special tag _CL_.
    For SLURM based tasks, we suggest the use of the special tags for ease of use.

    See the WebDataset documentation for more information about accepted data and input formats.

    If using multiple workers the number of shards should be divisible by world_size to ensure an
    even split among workers. If it is not divisible, logging will give a warning but training will proceed.
    In addition, if using mutiprocessing, each shard MUST HAVE THE SAME NUMBER OF ENTRIES after filtering
    is applied. We currently do not check for this, but your program may hang if the shards are uneven!

    Notice that a few arguments are different from the AudioToCharDataset; for example, shuffle (bool) has been
    replaced by shuffle_n (int).

    Additionally, please note that the len() of this DataLayer is assumed to be the length of the manifest
    after filtering. An incorrect manifest length may lead to some DataLoader issues down the line.

    Args:
        audio_tar_filepaths: Either a list of audio tarball filepaths, or a
            string (can be brace-expandable).
        manifest_filepath (str): Path to the manifest.
        parser (callable): A callable which is used to pre-process the text output.
        sample_rate (int): Sample rate to resample loaded audio to
        int_values (bool): If true, load samples as 32-bit integers. Defauts to False.
        augmentor (nemo.collections.asr.parts.perturb.AudioAugmentor): An AudioAugmentor
            object used to augment loaded audio
        shuffle_n (int): How many samples to look ahead and load to be shuffled.
            See WebDataset documentation for more details.
            Defaults to 0.
        min_duration (float): Dataset parameter.
            All training files which have a duration less than min_duration
            are dropped. Note: Duration is read from the manifest JSON.
            Defaults to 0.1.
        max_duration (float): Dataset parameter.
            All training files which have a duration more than max_duration
            are dropped. Note: Duration is read from the manifest JSON.
            Defaults to None.
        blank_index (int): Blank character index, defaults to -1.
        unk_index (int): Unknown character index, defaults to -1.
        normalize (bool): Dataset parameter.
            Whether to use automatic text cleaning.
            It is highly recommended to manually clean text for best results.
            Defaults to True.
        trim (bool): Whether to use trim silence from beginning and end
            of audio signal using librosa.effects.trim().
            Defaults to False.
        bos_id (id): Dataset parameter.
            Beginning of string symbol id used for seq2seq models.
            Defaults to None.
        eos_id (id): Dataset parameter.
            End of string symbol id used for seq2seq models.
            Defaults to None.
        pad_id (id): Token used to pad when collating samples in batches.
            If this is None, pads using 0s.
            Defaults to None.
        shard_strategy (str): Tarred dataset shard distribution strategy chosen as a str value during ddp.
            -   `scatter`: The default shard strategy applied by WebDataset, where each node gets
                a unique set of shards, which are permanently pre-allocated and never changed at runtime.
            -   `replicate`: Optional shard strategy, where each node gets all of the set of shards
                available in the tarred dataset, which are permanently pre-allocated and never changed at runtime.
                The benefit of replication is that it allows each node to sample data points from the entire
                dataset independently of other nodes, and reduces dependence on value of `shuffle_n`.

                .. warning::
                    Replicated strategy allows every node to sample the entire set of available tarfiles,
                    and therefore more than one node may sample the same tarfile, and even sample the same
                    data points! As such, there is no assured guarantee that all samples in the dataset will be
                    sampled at least once during 1 epoch. Scattered strategy, on the other hand, on specific
                    occasions (when the number of shards is not divisible with ``world_size``), will not sample
                    the entire dataset. For these reasons it is not advisable to use tarred datasets as validation
                    or test datasets.
        shard_manifests (bool): Whether or not to try / shard manifests. Defaults to False.
        global_rank (int): Worker rank, used for partitioning shards. Defaults to 0.
        world_size (int): Total number of processes, used for partitioning shards. Defaults to 0.
        return_sample_id (bool): whether to return the sample_id as a part of each sample
        manifest_parse_func: Optional function to parse manifest entries. Defaults to None.
    FNr   r   audio_tar_filepathsrT   rU   r   r   r   r   	shuffle_nrW   rV   r   rY   rZ   rA   r|   r   r~   r}   r   r\   c                 C   s   || _ t|||||d}t|d t|||	|d|||d|d
| _|  | _t|||d| _|
| _	|| _
|| _|| _|| _t||||d}ttj|dtt|t tjtd	d
tdd| j| jt| j	| _d S )N)r   r|   r   r}   r~   )r   r   T)
rT   rU   rV   rW   rX   rY   rZ   rA   r[   r\   r   r   )urls__key__)audiokeyr:  r;  )r   r   r   rS   r   _compute_lenr4   r   r   r   rZ   rY   rA   r   r   wdsDataPipelineSimpleShardListr   shuffletarfile_to_samplesrenameVALID_FILE_FORMATSto_tuple_filter_loop_offsetsmap_build_sample_dataset)rb   r6  rT   rU   r   r   r   r7  rW   rV   r   rY   rZ   rA   r|   r   r~   r}   r   r\   r&   r&   r*   rc   1  sZ   
	




z"_TarredAudioToTextDataset.__init__c                       G  fddd}|| j jS )a  This function is used to remove samples that have been filtered out by ASRAudioText already.
        Otherwise, we would get a KeyError as _build_sample attempts to find the manifest entry for a sample
        that was filtered out (e.g. for duration).
        Note that if using multi-GPU training, filtering may lead to an imbalance in samples in each shard,
        which may make your code hang as one process will finish before the other.
        c                       (   e Zd Z fddZdd Zdd ZdS )z<_TarredAudioToTextDataset._filter.<locals>.TarredAudioFilterc                    s    | _ || _d S rf   )iteratorra   rb   ra   rL  r&   r*   rc     s   
zE_TarredAudioToTextDataset._filter.<locals>.TarredAudioFilter.__init__c                 S      | S rf   r&   r   r&   r&   r*   __iter__     zE_TarredAudioToTextDataset._filter.<locals>.TarredAudioFilter.__iter__c                 S   s>   	 t | j\}}tjtj|\}}|| jjv r||fS qrf   )nextrL  r   r   splitextbasenamera   rl   )rb   audio_bytesaudio_filenamerj   rC   r&   r&   r*   __next__  s   zE_TarredAudioToTextDataset._filter.<locals>.TarredAudioFilter.__next__Nrs   rt   ru   rc   rP  rW  r&   rN  r&   r*   TarredAudioFilter  s    rY  r   )rb   rL  rY  r&   rN  r*   rE    s   z!_TarredAudioToTextDataset._filterc                    rJ  )zYThis function is used to iterate through utterances with different offsets for each file.c                       rK  )zG_TarredAudioToTextDataset._loop_offsets.<locals>.TarredAudioLoopOffsetsc                    s"    | _ || _d | _d | _d| _d S rk   )rL  ra   
current_fncurrent_bytes	offset_idrM  rN  r&   r*   rc     s
   
zP_TarredAudioToTextDataset._loop_offsets.<locals>.TarredAudioLoopOffsets.__init__c                 S   rO  rf   r&   r   r&   r&   r*   rP    rQ  zP_TarredAudioToTextDataset._loop_offsets.<locals>.TarredAudioLoopOffsets.__iter__c                 S   s|   | j d u rt| j\| _| _ d| _n$| jj| j  }t|| jd kr/t| j\| _| _ d| _n|  jd7  _| j| j | jfS )Nr   ro   )rZ  rR  rL  r[  r\  ra   rl   r4   )rb   offset_listr&   r&   r*   rW    s   
zP_TarredAudioToTextDataset._loop_offsets.<locals>.TarredAudioLoopOffsets.__next__NrX  r&   rN  r&   r*   TarredAudioLoopOffsets  s    r^  r   )rb   rL  r^  r&   rN  r*   rF    s   z'_TarredAudioToTextDataset._loop_offsetsc                 C   s   t || jS rf   )rR   rA   r  r&   r&   r*   r    r   z%_TarredAudioToTextDataset._collate_fnc                 C   s>  |\}}}t jt j|\}}| jjj| | }| jj| }|j}	|	du r)d}	t	|}
| j
j|
|	|j| j|jd}|
  |t|jd  }}|jt|j}}| jj|d | jdurl| jg| }|d7 }| jdur{|| jg }|d7 }| jr||t| t| |fS ||t| t| fS )z\Builds the training sample by combining the data from the WebDataset with the manifest info.Nr   )r   r   r   r   r   ro   )r   r   rS  rT  r   ra   rl   r   ioBytesIOr   r   r   r   r   closer8   r>   r   r   rp   r4   rg   rY   rZ   r   )rb   tuprU  rV  r\  rj   rC   rm   manifest_entryr   audio_filestreamr   r   r   rq   rr   r&   r&   r*   rH    s8   



" z'_TarredAudioToTextDataset._build_samplec                 C   r   rf   r   r   r&   r&   r*   r     r   z-_TarredAudioToTextDataset.get_manifest_samplec                 C   s
   | j  S rf   )rI  rP  r   r&   r&   r*   rP    s   
z"_TarredAudioToTextDataset.__iter__c                 C   sn   | j r/tj r/tj r/tjt| jjtj	d
 }tj| | }td|  |S t| jj}|S )Nr0   z!Sharded manifests: Total length: )r   r8   r   r   r   r>   r4   r   ra   r?   cuda
all_reducery   r   r   )rb   my_lenr&   r&   r*   r<    s   z&_TarredAudioToTextDataset._compute_lenc                 C   s   | j S rf   )r4   r   r&   r&   r*   r    s   z!_TarredAudioToTextDataset.__len__)FNr   NNFNNr   r   Fr   r   FN)rs   rt   ru   rv   r   rw   r   r   ry   rz   r   rx   rc   rE  rF  r  rH  r   rP  r<  r  r&   r&   r&   r*   r5    s    b	

N-r5  c                0       s   e Zd ZdZ																			d#d	eeee f d
edee dedede	d dede	e
 de	e
 dedededede	e de	e de	e dedededededed e	e f. fd!d"Z  ZS )$r   a  
    A similar Dataset to the AudioToCharDataset, but which loads tarred audio files.

    Accepts a single comma-separated JSON manifest file (in the same style as for the AudioToCharDataset),
    as well as the path(s) to the tarball(s) containing the wav files. Each line of the manifest should
    contain the information for one audio file, including at least the transcript and name of the audio
    file within the tarball.

    Valid formats for the audio_tar_filepaths argument include:
    (1) a single string that can be brace-expanded, e.g. 'path/to/audio.tar' or 'path/to/audio_{1..100}.tar.gz', or
    (2) a list of file paths that will not be brace-expanded, e.g. ['audio_1.tar', 'audio_2.tar', ...].

    See the WebDataset documentation for more information about accepted data and input formats.

    If using multiple workers the number of shards should be divisible by world_size to ensure an
    even split among workers. If it is not divisible, logging will give a warning but training will proceed.
    In addition, if using mutiprocessing, each shard MUST HAVE THE SAME NUMBER OF ENTRIES after filtering
    is applied. We currently do not check for this, but your program may hang if the shards are uneven!

    Notice that a few arguments are different from the AudioToCharDataset; for example, shuffle (bool) has been
    replaced by shuffle_n (int).

    Additionally, please note that the len() of this DataLayer is assumed to be the length of the manifest
    after filtering. An incorrect manifest length may lead to some DataLoader issues down the line.

    Args:
        audio_tar_filepaths: Either a list of audio tarball filepaths, or a
            string (can be brace-expandable).
        manifest_filepath (str): Path to the manifest.
        labels (list): List of characters that can be output by the ASR model.
            For Jasper, this is the 28 character set {a-z '}. The CTC blank
            symbol is automatically added later for models using ctc.
        sample_rate (int): Sample rate to resample loaded audio to
        int_values (bool): If true, load samples as 32-bit integers. Defauts to False.
        augmentor (nemo.collections.asr.parts.perturb.AudioAugmentor): An AudioAugmentor
            object used to augment loaded audio
        shuffle_n (int): How many samples to look ahead and load to be shuffled.
            See WebDataset documentation for more details.
            Defaults to 0.
        min_duration (float): Dataset parameter.
            All training files which have a duration less than min_duration
            are dropped. Note: Duration is read from the manifest JSON.
            Defaults to 0.1.
        max_duration (float): Dataset parameter.
            All training files which have a duration more than max_duration
            are dropped. Note: Duration is read from the manifest JSON.
            Defaults to None.
        blank_index (int): Blank character index, defaults to -1.
        unk_index (int): Unknown character index, defaults to -1.
        normalize (bool): Dataset parameter.
            Whether to use automatic text cleaning.
            It is highly recommended to manually clean text for best results.
            Defaults to True.
        trim (bool): Whether to use trim silence from beginning and end
            of audio signal using librosa.effects.trim().
            Defaults to False.
        bos_id (id): Dataset parameter.
            Beginning of string symbol id used for seq2seq models.
            Defaults to None.
        eos_id (id): Dataset parameter.
            End of string symbol id used for seq2seq models.
            Defaults to None.
        pad_id (id): Token used to pad when collating samples in batches.
            If this is None, pads using 0s.
            Defaults to None.
        shard_strategy (str): Tarred dataset shard distribution strategy chosen as a str value during ddp.

            -   `scatter`: The default shard strategy applied by WebDataset, where each node gets
                a unique set of shards, which are permanently pre-allocated and never changed at runtime.
            -   `replicate`: Optional shard strategy, where each node gets all of the set of shards
                available in the tarred dataset, which are permanently pre-allocated and never changed at runtime.
                The benefit of replication is that it allows each node to sample data points from the entire
                dataset independently of other nodes, and reduces dependence on value of `shuffle_n`.

                .. warning::

                    Replicated strategy allows every node to sample the entire set of available tarfiles,
                    and therefore more than one node may sample the same tarfile, and even sample the same
                    data points! As such, there is no assured guarantee that all samples in the dataset will be
                    sampled at least once during 1 epoch. Scattered strategy, on the other hand, on specific
                    occasions (when the number of shards is not divisible with ``world_size``), will not sample
                    the entire dataset. For these reasons it is not advisable to use tarred datasets as validation
                    or test datasets.

        global_rank (int): Worker rank, used for partitioning shards. Defaults to 0.
        world_size (int): Total number of processes, used for partitioning shards. Defaults to 0.
        return_sample_id (bool): whether to return the sample_id as a part of each sample
        manifest_parse_func: Optional function to parse manifest entries. Defaults to None.
    FNr   r  Tr  r   r6  rT   r  r   r   r   r   r7  rW   rV   r	  r
  r  r   rY   rZ   rU   rA   r|   r   r~   r}   r   r\   c                    s   || _ tj||||
|d}t jdi d|d|d|d|d|d|d|d	|d
|	d|d|d|d|d|d|d|d|d|d| d S )Nr  r6  rT   rU   r   r   r   r7  rW   rV   r   rY   rZ   rA   r|   r   r~   r}   r   r\   r&   r  )rb   r6  rT   r  r   r   r   r7  rW   rV   r	  r
  r  r   rY   rZ   rU   rA   r|   r   r~   r}   r   r\   r  r&   r*   rc   V  sV   
	
z!TarredAudioToCharDataset.__init__)FNr   NNr  r  TFNNr  r   r   Fr   r   FNrs   rt   ru   rv   r   rw   r   ry   rz   r   rx   r   rc   r  r&   r&   r  r*   r     s    `	
r   c                $       s   e Zd ZdZ													ddeeee f ded	d
dedede	d dede	e
 de	e
 dededededededede	e f" fddZ  ZS )r    aS  
    A similar Dataset to the AudioToBPEDataset, but which loads tarred audio files.

    Accepts a single comma-separated JSON manifest file (in the same style as for the AudioToBPEDataset),
    as well as the path(s) to the tarball(s) containing the wav files. Each line of the manifest should
    contain the information for one audio file, including at least the transcript and name of the audio
    file within the tarball.

    Valid formats for the audio_tar_filepaths argument include:
    (1) a single string that can be brace-expanded, e.g. 'path/to/audio.tar' or 'path/to/audio_{1..100}.tar.gz', or
    (2) a list of file paths that will not be brace-expanded, e.g. ['audio_1.tar', 'audio_2.tar', ...].

    See the WebDataset documentation for more information about accepted data and input formats.

    If using multiple workers the number of shards should be divisible by world_size to ensure an
    even split among workers. If it is not divisible, logging will give a warning but training will proceed.
    In addition, if using mutiprocessing, each shard MUST HAVE THE SAME NUMBER OF ENTRIES after filtering
    is applied. We currently do not check for this, but your program may hang if the shards are uneven!

    Notice that a few arguments are different from the AudioToBPEDataset; for example, shuffle (bool) has been
    replaced by shuffle_n (int).

    Additionally, please note that the len() of this DataLayer is assumed to be the length of the manifest
    after filtering. An incorrect manifest length may lead to some DataLoader issues down the line.

    Args:
        audio_tar_filepaths: Either a list of audio tarball filepaths, or a
            string (can be brace-expandable).
        manifest_filepath (str): Path to the manifest.
        tokenizer (TokenizerSpec): Either a Word Piece Encoding tokenizer (BERT),
            or a Sentence Piece Encoding tokenizer (BPE). The CTC blank
            symbol is automatically added later for models using ctc.
        sample_rate (int): Sample rate to resample loaded audio to
        int_values (bool): If true, load samples as 32-bit integers. Defauts to False.
        augmentor (nemo.collections.asr.parts.perturb.AudioAugmentor): An AudioAugmentor
            object used to augment loaded audio
        shuffle_n (int): How many samples to look ahead and load to be shuffled.
            See WebDataset documentation for more details.
            Defaults to 0.
        min_duration (float): Dataset parameter.
            All training files which have a duration less than min_duration
            are dropped. Note: Duration is read from the manifest JSON.
            Defaults to 0.1.
        max_duration (float): Dataset parameter.
            All training files which have a duration more than max_duration
            are dropped. Note: Duration is read from the manifest JSON.
            Defaults to None.
        trim (bool): Whether to use trim silence from beginning and end
            of audio signal using librosa.effects.trim().
            Defaults to False.
        use_start_end_token: Boolean which dictates whether to add [BOS] and [EOS]
            tokens to beginning and ending of speech respectively.
        pad_id (id): Token used to pad when collating samples in batches.
            If this is None, pads using 0s.
            Defaults to None.
        shard_strategy (str): Tarred dataset shard distribution strategy chosen as a str value during ddp.

            -   `scatter`: The default shard strategy applied by WebDataset, where each node gets
                a unique set of shards, which are permanently pre-allocated and never changed at runtime.
            -   `replicate`: Optional shard strategy, where each node gets all of the set of shards
                available in the tarred dataset, which are permanently pre-allocated and never changed at runtime.
                The benefit of replication is that it allows each node to sample data points from the entire
                dataset independently of other nodes, and reduces dependence on value of `shuffle_n`.

                .. warning::

                    Replicated strategy allows every node to sample the entire set of available tarfiles,
                    and therefore more than one node may sample the same tarfile, and even sample the same
                    data points! As such, there is no assured guarantee that all samples in the dataset will be
                    sampled at least once during 1 epoch. Scattered strategy, on the other hand, on specific
                    occasions (when the number of shards is not divisible with ``world_size``), will not sample
                    the entire dataset. For these reasons it is not advisable to use tarred datasets as validation
                    or test datasets.

        global_rank (int): Worker rank, used for partitioning shards. Defaults to 0.
        world_size (int): Total number of processes, used for partitioning shards. Defaults to 0.
        return_sample_id (bool): whether to return the sample_id as a part of each sample
        manifest_parse_func: Optional function to parse manifest entries. Defaults to None.
    FNr   Tr   r6  rT   r  r  r   r   r   r   r7  rW   rV   r   r  r|   r   r~   r}   r   r\   c                    s   |rt |dr|jdkr|j}nd }|r"t |dr"|jdkr"|j}nd }t |dr2|jdkr2|j}nd}G dd d}t jdi d|d|d	||d
|d|d|d|d|d|	d|
d|d|d|d|d|d|d|d|d| d S )NrY   r   rZ   rA   c                   @   r  )z:TarredAudioToBPEDataset.__init__.<locals>.TokenizerWrapperc                 S   r  r  r  r"  r&   r&   r*   rc     r#  zCTarredAudioToBPEDataset.__init__.<locals>.TokenizerWrapper.__init__c                 W   r$  r%  r'  r*  r&   r&   r*   r-  	  r.  zCTarredAudioToBPEDataset.__init__.<locals>.TokenizerWrapper.__call__Nr/  r&   r&   r&   r*   r0    r1  r0  r6  rT   rU   r   r   r   r7  rW   rV   r   r|   r   r~   r}   r   r\   r&   r2  )rb   r6  rT   r  r   r   r   r7  rW   rV   r   r  r|   r   r~   r}   r   r\   rY   rZ   rA   r0  r  r&   r*   rc     sb   	
z TarredAudioToBPEDataset.__init__)FNr   NNFTr   Fr   r   FNrh  r&   r&   r  r*   r      sd    V	
r    c                       sB   e Zd ZdZdedef fddZdd Zdd	 Zd
d Z	  Z
S )BucketingDatasetz
    A Dataset which wraps another IterableDataset and adopts it for bucketing
    Args:
        dataset (IterableDataset): The IterableDataset to get wrapped
        bucketing_batch_size (int): Number of samples to build a batch
    datasetbucketing_batch_sizec                    s   || _ || _t   d S rf   )wrapped_datasetrk  r  rc   )rb   rj  rk  r  r&   r*   rc   2  s   zBucketingDataset.__init__c                 C   s   t |d | jjS rk   )rR   rl  rA   r  r&   r&   r*   r  ;  s   zBucketingDataset._collate_fnc                 C   s   t | jj| jd S )N)
wrapped_dsrk  )BucketingIteratorrl  rI  rk  rP  r   r&   r&   r*   rP  >  s
   
zBucketingDataset.__iter__c                 C   s   t tt| jt| j S rf   )ry   mathceilr4   rl  rx   rk  r   r&   r&   r*   r  C  s   zBucketingDataset.__len__)rs   rt   ru   rv   r   ry   rc   r  rP  r  r  r&   r&   r  r*   ri  *  s    	ri  c                   @   s$   e Zd Zdd Zdd Zdd ZdS )rn  c                 C   s   || _ d | _|| _d S rf   )rm  wrapped_iterrk  )rb   rm  rk  r&   r&   r*   rc   H  s   
zBucketingIterator.__init__c                 C   s   t | j| _| S rf   )iterrm  rq  r   r&   r&   r*   rP  M  s   zBucketingIterator.__iter__c              	   C   sV   g }t | jD ]}zt| j}W n
 ty   Y  nw || qt|dkr)t|S rk   )rangerk  rR  rq  StopIterationr<   r4   )rb   batchesidxrh   r&   r&   r*   rW  Q  s   zBucketingIterator.__next__NrX  r&   r&   r&   r*   rn  G  s    rn  c                       s4   e Zd Zd	dee ddf fddZdd Z  ZS )
RandomizedChainDatasetr   datasetsre   Nc                    s&   t t| t| tj|| _d S rf   )r  rw  rc   r2   nprandomRandomStaternd_gen)rb   rx  rnd_seedr  r&   r*   rc   _  s   zRandomizedChainDataset.__init__c                 c   sj    | j t| j}|D ]&}| j| }t|tsJ dt|D ]\}}|V  |t|d kr1 nq qd S )Nz*ChainDataset only supports IterableDatasetro   )r|  permutationr4   rx  r   r   	enumerate)rb   shuffled_orderdataset_idxdrv  xr&   r&   r*   rP  c  s   
zRandomizedChainDataset.__iter__)r   )rs   rt   ru   r   r   rc   rP  r  r&   r&   r  r*   rw  ^  s    rw  )FNNr   )Ir_  r   ro  r   r   collections.abcr   r   typingr   r   r   r   r   r   r   numpyry  r8   torch.utils.datar	   r
   1nemo.collections.asr.parts.preprocessing.featuresr   0nemo.collections.asr.parts.preprocessing.segmentr   r   valid_sf_formatsnemo.collections.commonr   +nemo.collections.common.parts.preprocessingr   r   nemo.core.classesr   r   nemo.core.neural_types
nemo.utilsr   r   r=  nemo.utils.data_utilsr   r   r   r   nemo.utils.decoratorsr   nemo.utils.distributedr   nemo.utils.get_rankr   __all__r   keysrC  rR   rS   rw   ry   r   rz   r   r   r   r   r   r5  r   r    ri  rn  rw  r&   r&   r&   r*   <module>   s   $ =F2
x
x\u  )  