o
    wiI                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	m
Z
mZ d dlZd dlZd dlmZmZ d dlmZ d dlmZmZmZmZmZmZmZmZ d dlmZ d d	lm Z m!Z!m"Z" d d
l#m$Z$ d dl%m&Z&m'Z' d dl(m)Z)m*Z* d dl+m,Z,m-Z-m.Z. d dl/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z? eG dd dZ@deAde)deAfddZB	dAdeeCe)f deDdeDdejEjFjGdejEjFjHf
ddZI	dAde)deDdeDdejEjFjGdejEjFjHf
dd ZJ	dAd!e)deDdeDdejEjFjGdejEjFjHf
d"d#ZKdAdeLe eAf fd$d%ZMd&edeLee!f fd'd(ZNd)d* ZOdee)eCf de)fd+d,ZPd-d. ZQd/eRe;B fd0d1ZSd&ed2eTdefd3d4ZUd&edefd5d6ZVdeWfd7d8ZXd9eAfd:d;ZYd<d= ZZd>eDeRB deWfd?d@Z[dS )B    N)	dataclass)partial)AnyOptionalSequenceUnion)CutSetRecordingSet)Cut)CutConcatenateDynamicBucketingSamplerDynamicCutSamplerIterableDatasetWrapperReverbWithImpulseResponseRoundRobinSampler
ZipSamplermake_worker_init_fn)resolve_seed)
CutSamplerSamplingConstraintTimeConstraint)LazyFlattener)fastcopyfix_random_seed)
DictConfig	OmegaConf)IncompleteConfigErrorguess_parse_cutsetread_cutset_from_config)BucketingFilterDurationFilter FixedBucketBatchSizeConstraint2D*MultimodalFixedBucketBatchSizeConstraint2DMultimodalSamplingConstraintTokenCountFilterTokenPerSecondFilterTokenPerTokenFilter)apply_prompt_format_fn)PromptFormatter)TokenizerWrapper)loggingc                   @   s  e Zd ZU dZdZeed< dZeed< dZeed< dZ	e
dB ed< dZeed< dZeed	< dZeed
< dZedB ed< dZedB ed< dZedB ed< dZeed< dZee dB ed< dZeed< dZeed< dZeed< dZeed< dZeed< dZeed< dZedB ed< dZeed< dZee
B ed< dZ edB ed< dZ!eed< dZ"eed< d Z#e
ed!< dZ$e%e
ef dB ed"< dZ&eed#< dZ'e
dB ed$< dZ(eed%< dZ)e
dB ed&< dZ*edB ed'< dZ+edB ed(< dZ,edB ed)< d*Z-edB ed+< ed,Z.edB ed-< d*Z/eed.< ed,Z0eed/< dZ1edB ed0< dZ2edB ed1< dZ3eed2< d*Z4eed3< ed,Z5eed4< dZ6eed5< d6Z7eed7< d8Z8ee
B ed9< d8Z9eed:< dZ:eed;< dZ;ee
B dB ed<< dZ<edB ed=< d>Z=e>eef ed?< d@Z?eedA< dZ@eedB< dZAeedC< dDZBeedE< dFZCeedG< dZDeedH< dIZEeFe edJ< dZGeFe edK< dLZHe
edM< dZIeFe edN< dZJeFe edO< dZKeedP< dZLeedQ< dZMe
dB edR< d@ZNeedS< dZOeFe edT< dUZPe
edV< dWZQe
edX< dYZRe
edZ< dZSeed[< dZTeed\< dZUeed]< dZVeed^< dS )_LhotseDataLoadingConfiga  
    Structured config used for OmegaConf schema validation.
    It's also a single source of truth for reading default option values.
    The options not supported anymore but present, e.g., in old configs,
    will be emitted in a DeprecationWarning and ignored.
    N	input_cfgmanifest_filepathtarred_audio_filepaths	cuts_path	shar_pathFskip_missing_manifest_entriestarred_random_access
batch_sizebatch_durationquadratic_durationuse_bucketingbucket_batch_size   num_bucketsi'  num_cuts_for_bins_estimatebucket_duration_binsbucket_buffer_sizeTconcurrent_bucketingbucketing_2d_strict_modeshuffle_buffer_size	drop_lasttrng
shard_seedmax_open_streamscuda_expandable_segmentsmulti_configround_robinsampler_fusionsampler_weightspretokenizeprompt_formatuse_multimodal_samplingaudio_locator_tagtoken_equivalent_durationbatch_tokensquadratic_factormin_durationinfmax_durationmin_tpsmax_tps
min_tokens
max_tokensmeasure_total_lengthmin_tptmax_tptshufflei>  sample_rater   seednum_workers
pin_memorychannel_selector
noise_path)g      $@g      4@	noise_snrg      ?noise_mix_probperturb_speedconcatenate_samplesg?concatenate_gap_secondsg      ?concatenate_duration_factorconcatenate_merge_supervisionsg      9db_normtruncate_durationrandomtruncate_offset_typecut_into_windows_durationcut_into_windows_hopkeep_excessive_supervisionsrir_enabledrir_pathrir_probpad_min_durationrightpad_directiontext
text_fieldlang
lang_fieldmetadata_onlyforce_finiteforce_map_datasetforce_iterable_dataset)W__name__
__module____qualname____doc__r,   r   __annotations__r-   r.   r/   strr0   r1   boolr2   r3   intr4   floatr5   r6   r7   listr9   r:   r;   r<   r=   r>   r?   r@   rB   rC   rD   rE   rG   rH   dictrI   rJ   rK   rL   rM   rN   rO   rQ   rS   rT   rU   rV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   tuplerc   rd   re   rf   rg   rh   ri   r   rj   rl   rm   rn   ro   rp   rq   rr   rs   ru   rw   ry   rz   r{   r|   r}    r   r   k/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/nemo/collections/common/data/lhotse/dataloader.pyr+   >   s   
 	
r+   use_iterable_datasetconfigreturnc                 C   s*   |j r
|jr
J d| s|jo|j  } | S )zDDetermine whether to use iterable dataset for a given configuration.zKConflicting options: force_map_dataset=True and force_iterable_dataset=True)r|   r}   )r   r   r   r   r   determine_use_iterable_dataset   s   r   global_rank
world_sizedatasetc                 C   sV   t | ts
t| } t| ddd | ddr"t| ||||dS t| ||||dS )a  
    Set up a Lhotse training dataloder.

    Expects a typical NeMo dataset configuration format, with additional fields: "use_lhotse=True".
    Some fields in the original NeMo configuration may be ignored.

    The ``dataset`` parameter should be an instance of a Lhotse-compatible PyTorch Dataset class.
    It only needs to define the following method ``__getitem__(self, cuts: CutSet) -> Dict[str, torch.Tensor]``.
    This dataset is not expected to hold a reference to any actual data; it may be interpreted as a function
    mapping a Lhotse CutSet into a mini-batch of tensors.

    For an example, see: :class:`nemo.collections.asr.data.audio_to_text_lhotse.LhotseSpeechToTextBpeDataset`,
    which is constructed from just a tokenizer and essentially loads and collates audio and tokenizes the transcript.

    The ``tokenizer`` is used both for audio and text datasets for on-the-fly tokenization.
    This allows us to stratify the bucketing by the count of input/output tokens (depending on modality).
    If "prompt_format" is additionally provided in the config, we will also apply a prompt formatter.
    Note that ``tokenizer`` can be any tokenizer type (e.g. both SentencePiece and Aggregate tokenizers work).
    rD   T)enabledrE   F)top_level_configr   r   r   	tokenizerr   r   r   r   r   )
isinstancer   r   create"maybe_set_cuda_expandable_segmentsget'get_lhotse_dataloader_from_multi_config(get_lhotse_dataloader_from_single_configr   r   r   r   !get_lhotse_dataloader_from_config   s   


r   c           	      C   s   t d t| } t| j| _t| j t| |||d\}}|r5tt||dt	||| jd| j
dkd}nt||d}tjjjd	i |d| j
| jd}|S )
a  
    Set up a Lhotse training dataloder.

    Expects a typical NeMo dataset configuration format, with additional fields: "use_lhotse=True".
    Some fields in the original NeMo configuration may be ignored.

    The ``dataset`` parameter should be an instance of a Lhotse-compatible PyTorch Dataset class.
    It only needs to define the following method ``__getitem__(self, cuts: CutSet) -> Dict[str, torch.Tensor]``.
    This dataset is not expected to hold a reference to any actual data; it may be interpreted as a function
    mapping a Lhotse CutSet into a mini-batch of tensors.

    For an example, see: :class:`nemo.collections.asr.data.audio_to_text_lhotse.LhotseSpeechToTextBpeDataset`,
    which is constructed from just a tokenizer and essentially loads and collates audio and tokenizes the transcript.

    The ``tokenizer`` is used when text-only datasets are included in dataloading.
    In these cases we will tokenize ``TextExample``s before sampling mini-batches so that
    we can account for their number of tokens.
    Note: this behaviour might eventually be extended to audio datasets too.

    Note that ``tokenizer`` can be any tokenizer type (e.g. both SentencePiece and Aggregate tokenizers work).
    z%We will be using a Lhotse DataLoader.r   r   r   r   r   samplerrankr   r]   r   r   worker_init_fnpersistent_workersNr3   r^   r_   r   )r*   info$make_structured_with_schema_warningsr   r]   r   get_lhotse_sampler_from_configr   r   r   r^   torchutilsdata
DataLoaderr_   )	r   r   r   r   r   r   r   dloader_kwargsdloaderr   r   r   r     s,   



	
r   r   c                    s  fdd}| }t |j dd  D }i g } | D ]B\}	}
zt|
}| D ]\}}|||< q-t||||d\}}W n tyX } ztd|	 d|
 |d}~ww |||	<  | q t fd	d
 dd D svJ dt }|jdkrt	|
  }nK|jdkrt|
  }n?|jdkrg g }}| D ]}|||  |jdur||j|  qt|t|dkr|nd|jd}ntd|j |rtt||dt|||jd|jdkd}nt||d}tjjjdi |d|j|jd}|S )a  
    Set up a Lhotse training dataloder.

    It works similarly to :func:`get_lhotse_dataloader_from_config`, except that
    you can provide multiple configs to set up different sampling, batching, and
    augmentation settings for every dataset and decide how to merge them.

    The expected format is that the ``configs`` is a dict of group name -> actual config.

    The first config is treated as a "main" config that determines the RNG, CUDA allocator,
    and sampler fusion settings.
    c                     sZ   t fdddD sJ dg d} tt td d< t fdd| D S )	z
        In multi-config setting, the top-level config defines several attributes that overwrite
        the ones present in sub-configs.
        c                 3   s    | ]}| v V  qd S Nr   .0kr   r   r   	<genexpr>d  s    zVget_lhotse_dataloader_from_multi_config.<locals>.gather_shared_opts.<locals>.<genexpr>)r]   rB   r[   zIn a multi-config setting (multi_config=True), the top-level namespace (typically train_ds)must define at least 'seed', 'shard_seed', and 'shuffle' keys that will be shared by all sub-configs.)
r]   rB   r^   r_   r[   rG   rH   rE   rz   r{   r]   c                    s   i | ]}| | | qS r   )r   r   )defaultsr   r   r   
<dictcomp>w  s    zWget_lhotse_dataloader_from_multi_config.<locals>.gather_shared_opts.<locals>.<dictcomp>)allr   
structuredr+   r   r   )overwriting_optsr   )r   r   gather_shared_opts_  s   
zCget_lhotse_dataloader_from_multi_config.<locals>.gather_shared_optsc                 S   s(   i | ]\}}t |tr|d vr||qS ))rH   )r   r   )r   namecr   r   r   r   |  s    z;get_lhotse_dataloader_from_multi_config.<locals>.<dictcomp>r   zoCannot create a sampler for one of the sub-configs in a multi_config setup.The problematic config is under key=z! and has the following contents: Nc                 3   s    | ]	}| d  kV  qdS )r   Nr   )r   st)source_use_iterable_datasetr   r   r     s    z:get_lhotse_dataloader_from_multi_config.<locals>.<genexpr>   zWhen using multiple input_cfg sources ensure they are all tarred or non-tarred (can't mix). You can provide force_iterable_dataset=True to each namespace to fix.ziprF   randomized_round_robinr   T)	randomizer]   z%Unsupported sampler fusion strategy: r   r   r   r   r   )r   r]   itemsr   r   r   appendr   rG   r   valuesr   keysrH   lenRuntimeErrorr   r   r   r^   r   r   r   r   r_   )r   r   r   r   r   r   shared_optsconfigssource_samplersr   r   expanded_configr   vster   r   	_samplers_weightskeyr   r   r   )r   r   r   r   K  s   


 






	
r   c           
      C   sv  t | \}}t|| }| jdur!td| j |tt| jd}|jtt| j	ddd}t
t|jtdd}| jrC|dusCJ d|durx| jrx|sQtd | jdurd|jtt|| jddd}nt|tsmt|}|jtt|d	dd}| jdurt| j}|j|t| j| j| jd
d}| jrt
||d|d}| jdur|j| j| j | j!d}| j"dur|j#| j"| j$| j!d}| j%dur|j&| j%| j'd
d}|(t)| j*| j+}|(t,| j-| j.| j/d}|dur| jr|(t0| j1| j2}|(t3| j4| j5}t6| }t7||| \}}| j8rHtd| j9 d| j: d t;||| j<| j=| j>| j| j?t6| | j@| jA| jB|r=dn||rCdn|d}	n(td| j9 d| j: d tC||| j<| j=| j>| j|rfdn||rldn|d}	| jDr|	tE| jF| jGd}	| jHdur|	ttI| jHd}	| jJr|	tK}	| jLr|	tM| jNdurtOP| jNnd| jQtRS| jTd}	|	|fS )z-Create a CutSampler from a dataloader config.NzUsing channel selector %s.)r`   )sampling_rate)apply_fnzYou must pass a tokenizer to `get_lhotse_dataloader_from_config` in order toread text-only datasets (enabled via use_multimodal_dataloading)a  You are using a non-tarred dataset and requested tokenization during data sampling (pretokenize=True). This will cause the tokenization to happen in the main (GPU) process,possibly impacting the training speed if your tokenizer is very large.If the impact is noticable, set pretokenize=False in dataloader config.(note: that will disable token-per-second filtering and 2D bucketing features))r   rJ   )r   T)cutssnrmix_probr]   random_mix_offsetg?g?)rS   offset_typero   )durationhopro   )r   	directionpreserve_id)rX   z>Creating a Lhotse DynamicBucketingSampler (max_batch_duration=z max_batch_size=)r   r   )
constraintr[   r@   r?   r]   r9   duration_binsr:   buffer_size
concurrentr   r   zPCreating a Lhotse DynamicCutSampler (bucketing is disabled, (max_batch_duration=)r   r[   r@   r?   r]   r   r   )gapduration_factor)ri   )rir_recordingsprandgen)Ur   r   r`   r*   r   mapr   _select_channelresampler\   r   r   _flatten_alt_textrK   rI   warningrJ   tokenize_with_promptr   r)   tokenizera   r   mixr   rb   rc   rB   rd   muxrj   truncaterl   ro   rm   cut_into_windowsrn   rs   padru   filterr    rQ   rS   r$   rV   rW   rX   r%   rT   rU   r&   rY   rZ   determine_bucket_duration_binsdetermine_sampling_constraintr6   r4   r3   r   r[   r@   r?   r9   r:   r<   r=   r   re   r   rf   rg   ri   _normalize_loudnessrh   _merge_supervisionsrp   r   rq   r	   	from_filerr   rk   Randomr]   )
r   r   r   r   r   r   noiser;   r   r   r   r   r   r     s   











r   r   c                 C   s   |j rD|jdur2|dusJ dt||j|j|jt|jtr!|jnd|jd}| 	t
|} | |fS t|j|j|j|j|jd}| |fS |jduro|dusQJ dt||j|jt|jtr`|jndd}| 	t
|} | |fS t|j|j|jd}| |fS )a  
    Select an appropriate sampling strategy (constraint) for Lhotse samplers based on the configuration.
    Sampling constraint affects the batch size (static/dynamic) and bucketing behaviour (1D/2D).
    It is the appropriate customization point to introduce support of other modalities,
    as it defines a method for example sequence length measurement (audio duration, text tokens, etc.).

    Some constraints apply extra filter on ``cuts`` which is why we accept and return the ``CutSet``.

    Lhotse's default is :class:`TimeConstraint` for regular audio data, other available options are
    multimodal constraints (joint text + audio) and their 2D bucketing extensions.
    NzMCannot use bucket_batch_size option if bucket_duration_bins are not provided.)max_seq_len_bucketsbatch_sizesrM   	strict_2d	max_ratiorX   )rM   r3   rN   rO   rX   )r   r   r   r  )max_cutsrS   r5   )rK   r7   r"   rM   r>   r   rZ   r   rX   r   r   r#   r3   rN   rO   r!   rU   r   r4   r5   )r   r;   r   r   r   r   r   r   u  sR   



r   c                 C   s   | j durt| j }t|d trdd |D }|S | jr dS | jdurN| jtdk rN| jdur9| jdkr9| jnd}| j}t	
||| jd dd  S dS )	a  
    Returns appropriate bucket bins based on configuration.
    If user provided them explicitly, we just pass them along;
    otherwise, we try to create provisional bins when min/max duration is available.
    We might return None if it's impossible to determine the bins without computing data statistics,
    in which case it will be automatically done at the start of training (but may take a few minutes).
    Nr   c                 S   s   g | ]}t |qS r   )r   )r   itemr   r   r   
<listcomp>  s    z2determine_bucket_duration_bins.<locals>.<listcomp>rR   g        r   rP   )r;   r   to_containerr   r   rK   rS   r   rQ   nplinspacer9   tolist)r   ansbeginendr   r   r   r     s   
 r   c                 C   s   t t}t| tst| } tt | }tt |  }|| }|d |r6t	
dd|  t | t|} t || } | ddrRt	
d d| _| jrZt	
d | S )	z
    Checks the schema and fills missing default option values.
    Warns the user if any of the fields are not supported by the current schema
    but does not raise exceptions.
    
use_lhotsezCThe following configuration keys are ignored by Lhotse dataloader: ,r2   Fz^Option 'tarred_random_access' is deprecated and replaced with 'skip_missing_manifest_entries'.TzNote: skip_missing_manifest_entries is set to True. If any of your manifests and tar files are mismatched, the entire tar file will be skipped without warning. It's your responsibility to ensure data integrity with this setting.)r   r   r+   r   r   setr  r   discardr*   r   joinmasked_copyr   merger   r1   )r   defaultsupported_keysreceived_keysunsupported_keysr   r   r   r     s.   


r   c                 C   sn   t | tr| jD ]}|jdurt||j|j|_q| S t| dr.t	| j
r.| 
|} | S tdt|  )zCReturn the text in the example according to the provided tokenizer.Nr   zUnsupported type of example: )r   r
   supervisionsrv   r  asarraylanguagetokenshasattrcallabler   r   type)exampler   r   r   r   r   r     s   



r   rJ   c                 C   sD   t |trt||}t| |}| D ]
\}}t| || q| S )zCTokenize the example with the provided tokenizer and prompt format.)r   r   r(   resolver'   r   setattr)r  r   rJ   encodedr   valuer   r   r   r     s   

r   ri   c                 C   s   | j |ddS )NF)target	mix_first)normalize_loudness)r   ri   r   r   r   r     s   r   c                 C   s   |   S r   )merge_supervisions)r   r   r   r   r     s   r   c                    s   | g}t | tr| jd u s| jdd u r|S | jdd} | jd}| D ]"    |  fdd} d dd |_|	| q'|S )	Nalt_textwav)audio_formatc                    s   t |  d  d dS )Nrv   rx   )rv   r  )r   )r   r   r   r   <lambda>%  s    z#_flatten_alt_text.<locals>.<lambda>rv   rx   )rv   rx   )
r   r
   customr   move_to_memorypopr   copymap_supervisionsr   )cutr	  paired_texttext_instancer   r*  r   r     s   $r   r   c                 C   s|   | r:t j r<tjd }dur t|dkr d|vr td z
t jj	
d W dS  ty9   td Y dS w dS dS )a  
    Configures PyTorch memory allocator to expand existing allocated segments
    instead of re-allocating them when tensor shape grows.
    This can help speed up the training when sequence length and/or batch size change often,
    and makes GPU more robust towards OOM.

    See here for more details:
    pytorch.org/docs/stable/notes/cuda.html#optimizing-memory-usage-with-pytorch-cuda-alloc-conf
    PYTORCH_CUDA_ALLOC_CONFNr   zexpandable_segments:TruezYou have set PYTORCH_CUDA_ALLOC_CONF without expandable_segments:True option. We're setting that option anyway. To disable it, set cuda_expandable_segments=False in NeMo dataloader configuration.z~Failed to set expandable_segments:True for PyTorch CUDA allocator. You may get training speed improvements if you enable this )r   cudais_availableosenvironr   r   warningswarnmemory_set_allocator_settingsr   r*   r   )r   r"  r   r   r   r   +  s   

r   c                 C   sV   ddl m} t| tr| |S t| |r)| jD ]}t|dr&|j||_q| S | S )Nr   )NeMoMultimodalConversationr1  )1nemo.collections.common.data.lhotse.text_adaptersr=  r   r
   r   turnsr  r1  )r  r   r=  turnr   r   r   r   J  s   




r   r`   c                 C   sx   t |tr|}nt |tr || jv r| j| }ntd| d|| jkr0td| d| j | jdkr7| S | |S )NzChannel selector z not found in cut.customzChannel index z. is larger than the actual number of channels r   )r   r   r   r,  
ValueErrornum_channelswith_channels)r1  r`   channel_idxr   r   r   r   X  s   





r   r   )\r7  rk   r9  dataclassesr   	functoolsr   typingr   r   r   r   numpyr  r   lhotser   r	   
lhotse.cutr
   lhotse.datasetr   r   r   r   r   r   r   r   lhotse.dataset.dataloadingr   lhotse.dataset.sampling.baser   r   r   lhotse.lazyr   lhotse.utilsr   r   	omegaconfr   r   *nemo.collections.common.data.lhotse.cutsetr   r   r   ,nemo.collections.common.data.lhotse.samplingr   r    r!   r"   r#   r$   r%   r&   &nemo.collections.common.data.prompt_fnr'   nemo.collections.common.promptsr(   6nemo.collections.common.tokenizers.aggregate_tokenizerr)   
nemo.utilsr*   r+   r   r   r   r   r   r   Datasetr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   s   (
(
 

3
I
| /7#'