o
    }oiz                     @   sR  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZ d dlZd dlmZmZmZ d dlmZmZ d dlmZmZmZ d d	lmZmZmZ d d
lmZmZm Z  d dl!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* deee+f deee,f fddZ-G dd de.Z/i a0de	e1 fddZ2de1fddZ3dee1e4e1 f fddZ5de6ee,f fddZ7dede+dee,gfdd Z8e5d!dede6ee,f fd"d#Z9e5d$dede6ee,f fd%d&Z:e5d'dede6ee,f fd(d)Z;e5d*dede6ee,f fd+d,Z<d-e+fd.d/Z=e5d0d1ee4e ef de+de6ee,f fd2d3Z>e5d4d5gde6ee,f fd6d7Z?e5d8gde6ee,f fd9d:Z@d;ee1ef d<e,de+fd=d>ZAd?ed@e1defdAdBZBe5dCdDgde6ee,f fdEdFZCddGdHdIdJedKe4eeDeEf  dLeeDdf dMee1eDf dNe,defdOdPZFdQee1e+ejf defdRdSZGdS )T    N)partialrepeat)Path)KeysViewMappingSequenceTupleUnion)CutSetFeatures	Recording)ArrayTemporalArray)CutMixedCut
PaddingCut)
DictConfig
ListConfig	OmegaConf)LazyNeMoIteratorLazyNeMoTarredIteratorexpand_sharded_filepaths)	AudioTurnLhotseTextAdapterLhotseTextPairAdapterNeMoMultimodalConversation&NeMoMultimodalConversationJsonlAdapterNeMoSFTJsonlAdapterTextTurn)get_full_pathconfigreturnc                    s   t  ts	t   ddurt \}}||fS t fdddD }|r< ddu r2tdt \}}||fS t \}}||fS )z
    Reads NeMo configuration and creates a CutSet either from Lhotse or NeMo manifests.

    Returns a tuple of ``CutSet`` and a boolean indicating whether the data is tarred (True) or not (False).
    	input_cfgNc                 3   s    | ]
}  |d u V  qd S N)get).0optr!    ^/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/common/data/lhotse/cutset.py	<genexpr>:   s    z*read_cutset_from_config.<locals>.<genexpr>)	cuts_path	shar_pathmanifest_filepathzCYou must specify either: manifest_filepath, cuts_path, or shar_path)
isinstancer   r%   read_dataset_configallIncompleteConfigErrorread_nemo_manifestread_lhotse_manifest)r!   cuts	is_tarreduse_nemo_manifestr)   r(   r*   read_cutset_from_config-   s   
r8   c                   @   s   e Zd ZdZdS )r2   z Placeholder for an error raised.N)__name__
__module____qualname____doc__r)   r)   r)   r*   r2   E   s    r2   c                   C   s   t  S )z
    Return the names of all registered data type parsers.

    Example:

        >>> get_known_config_data_types()
        ["nemo", "nemo_tarred", "lhotse", ...]
    )KNOWN_DATA_CONFIG_TYPESkeysr)   r)   r)   r*   get_known_config_data_typesN   s   	r?   data_type_namec                 C   s   t |  S )a  
    Return the parsing function for a given data type name.
    Parsing function reads a dataloading config and returns a tuple
    of lhotse ``CutSet`` and boolean indicating whether we should use
    iterable dataset (True) or map dataset (False) mechanism ("is tarred").
    )r=   )r@   r)   r)   r*   get_parser_fnZ   s   rA   namec                    s    fdd}|S )a?  
    Decorator used to register data type parser functions.
    Parsing function reads a dataloading config and returns a tuple
    of lhotse ``CutSet`` and boolean indicating whether we should use
    iterable dataset (True) or map dataset (False) mechanism ("is tarred").

    Example:

        >>> @data_type_parser("my_new_format")
        ... def my_new_format(config):
        ...     return CutSet(read_my_format(**config)), True
        ...
        ... fn = get_parser_fn("my_new_format")
        ... cuts, is_tarred = fn({"my_arg_0": ..., "my_arg_1": ..., ...})
    c                    s,   t  tr| t < | S  D ]}| t|< q| S r$   )r/   strr=   )fnnrB   r)   r*   
_decoratoru   s   

z$data_type_parser.<locals>._decoratorr)   )rB   rG   r)   rF   r*   data_type_parserd   s   	rH   c                 C   s   |  dd|  dd|  dd|  dd|  d	d|  d
d|  dd|  dd|  dd|  dd|  ddd}| j}t|ttfrIt|}t||d\}}||fS )a	  
    Input configuration format examples.
    Example 1. Combine two datasets with equal weights and attach custom metadata in ``tags`` to each cut::
        input_cfg:
          - type: nemo_tarred
            manifest_filepath: /path/to/manifest__OP_0..512_CL_.json
            tarred_audio_filepath: /path/to/tarred_audio/audio__OP_0..512_CL_.tar
            weight: 0.5
            tags:
              lang: en
              some_metadata: some_value
          - type: nemo_tarred
            manifest_filepath: /path/to/manifest__OP_0..512_CL_.json
            tarred_audio_filepath: /path/to/tarred_audio/audio__OP_0..512_CL_.tar
            weight: 0.5
            tags:
              lang: pl
              some_metadata: some_value
    Example 2. Combine multiple (4) datasets, with 2 corresponding to different tasks (ASR, AST).
        There are two levels of weights: per task (outer) and per dataset (inner).
        The final weight is the product of outer and inner weight::
        input_cfg:
          - type: group
            weight: 0.7
            tags:
              task: asr
            input_cfg:
              - type: nemo_tarred
                manifest_filepath: /path/to/asr1/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/tarred_audio/asr1/audio__OP_0..512_CL_.tar
                weight: 0.6
                tags:
                  lang: en
                  some_metadata: some_value
              - type: nemo_tarred
                manifest_filepath: /path/to/asr2/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/asr2/tarred_audio/audio__OP_0..512_CL_.tar
                weight: 0.4
                tags:
                  lang: pl
                  some_metadata: some_value
          - type: group
            weight: 0.3
            tags:
              task: ast
            input_cfg:
              - type: nemo_tarred
                manifest_filepath: /path/to/ast1/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/ast1/tarred_audio/audio__OP_0..512_CL_.tar
                weight: 0.2
                tags:
                  src_lang: en
                  tgt_lang: pl
              - type: nemo_tarred
                manifest_filepath: /path/to/ast2/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/ast2/tarred_audio/audio__OP_0..512_CL_.tar
                weight: 0.8
                tags:
                  src_lang: pl
                  tgt_lang: en
    shuffleF
shard_seedtrng
text_fieldtext
lang_fieldlangmetadata_onlyforce_finitemax_open_streamsNtoken_equivalent_durationskip_missing_manifest_entriesforce_map_datasetforce_iterable_dataset)rI   rJ   rL   rN   rP   rQ   rR   rS   rT   rU   rV   propagate_attrs)r%   r#   r/   rC   r   r   loadparse_and_combine_datasets)r!   rX   r#   r5   r6   r)   r)   r*   r0      s"   
?










r0   grp_cfgrX   c                 C   s   | j t v sJ d| j | j dkrt| j }|| \}}n	t| j|d\}}| d }dur<|jtt|ddd}||fS )zEParse a group configuration, potentially combining multiple datasets.z7Unknown item type in dataset config list: grp_cfg.type=grouprW   tagsN)r]   )apply_fn)	typer?   rA   rZ   r#   r%   mapr   attach_tags)r[   rX   	parser_fnr5   r6   
extra_tagsr)   r)   r*   parse_group   s   


rd   txtc                 C   s8   t t| j| j| j| jd}| dds| }|dfS )z-Read paths to text files and create a CutSet.pathslanguageshuffle_shardsrJ   rQ   FT)r   r   rg   rh   rI   rJ   r%   r   r!   r5   r)   r)   r*   read_txt_paths   s   rk   txt_pairc                 C   sX   t t| j| j| d| d| d| d| j| jd}| dds(| }|dfS )	z?Read paths to source and target text files and create a CutSet.source_languagetarget_languagequestions_pathquestions_language)source_pathstarget_pathsrm   rn   ro   rp   ri   rJ   rQ   FT)r   r   rq   rr   r%   rI   rJ   r   rj   r)   r)   r*   read_txt_pair_paths   s   rs   nemo_sft_jsonlc                 C   s<   t t| j| d| j| jd}| dds| }|dfS )z7Read paths to Nemo SFT JSONL files and create a CutSet.rh   rf   rQ   FT)r   r   rg   r%   rI   rJ   r   rj   r)   r)   r*   read_nemo_sft_jsonl  s   ru   multimodal_conversationc              	   C   sH   t t| j| d| j| d| j| jd}| dds | }|dfS )zFRead paths to multimodal conversation JSONL files and create a CutSet.tarred_audio_filepathsrS   )r.   rw   audio_locator_tagrS   ri   rJ   rQ   FT)r   r   r.   r%   rx   rI   rJ   r   rj   r)   r)   r*   "read_multimodal_conversation_jsonl  s   
ry   r]   c                 C   s"   |  D ]
\}}t| || q| S )z'Attach extra tags to a cut dynamically.)itemssetattr)cutr]   keyvalr)   r)   r*   ra   /  s   ra   r\   config_listc                    sh  g }g }g  t | dksJ d| D ]=}| }| D ]\}}||vr)|||< q|| ||< qt||\}}	||  |	 |d }
durO||
 qt fdd D }|sz|d se|d rvtd	|d  d
|d  d nt	dt |dkst |t |ksJ dt |dkrt
||r|nd|d |d |d p|d d}n|\}| d fS )zPParse a list of dataset configurations, potentially combining multiple datasets.r   z#Empty group in dataset config list.weightNc                 3   s    | ]	}| d  kV  qdS )r   Nr)   )r&   ttarred_statusr)   r*   r+   T  s    z-parse_and_combine_datasets.<locals>.<genexpr>rU   rV   z]Not all datasets in the group have the same tarred status, using provided force_map_dataset (z) and force_iterable_dataset (z') to determine the final tarred status.zyMixing tarred and non-tarred datasets is not supported when neither force_map_dataset nor force_iterable_dataset is True.z\Missing dataset weight. When weighting datasets, every dataset must have a specified weight.   rR   rJ   rQ   rP   weightsrR   seedrQ   )lencopyrz   rd   appendr%   r1   loggingwarning
ValueErrormux)r   rX   r5   r   itemnext_propagate_attrskv	item_cutsitem_is_tarredwall_same_tarred_statusr)   r   r*   rZ   6  sX   





rZ   lhotselhotse_sharc                 C   s  |  ddu}|r/|  dd}|  dd}|  dd}|  ddur(td	 t| jttfrWtd
| j d t	j
di t| j|d|d}|sS|sS| }||fS t| jtrtd g }g }| jD ]e}t|ttfr|}	t	j
di t|	|d|d}
t|
}n0t|trt|dkrt|d ttfsJ d| d|\}	}t	j
di t|	|d|d}
td|	d| ||
 || qit|||  dd||d}||fS t| jtr!dd | j D }d| j v sJ d| j|rd|d i}t	j
|d|d}|s|s| }||fS tdt| jd| j| j}	t	|	tt|	d}||fS )z8Read paths to Lhotse manifest files and create a CutSet.r-   NrJ   rK   rP   FrQ   r,   zMNote: lhotse.cuts_path will be ignored because lhotse.shar_path was provided.zEInitializing Lhotse Shar CutSet (tarred) from a single data source: ''T)ri   r   zInitializing Lhotse Shar CutSet (tarred) from multiple data sources with a weighted multiplexer. We found the following sources and weights:    r   zSupported inputs types for config.shar_path are: str | list[str] | list[tuple[str, number]] where str is a path and number is a mixing weight (it may exceed 1.0). We got: 'z- path= weight=rR   r   c                 S   s   i | ]	\}}|t |qS r)   )r   r&   r   r   r)   r)   r*   
<dictcomp>  s    z(read_lhotse_manifest.<locals>.<dictcomp>r5   zInvalid value for key 'shar_path': a dict was provided, but didn't specify key 'cuts' pointing to the manifests. We got the following: config.shar_path=)fieldsri   r   zUnexpected value for key 'shar_path'. We support string, list of strings, list of tuples[string,float], and dict[string,list[string]], but got: type(config.shar_path)=z config.shar_path=manifest_pathr)   )r%   warningswarnr/   r-   rC   r   r   infor   	from_shar_resolve_shar_inputsr   r   r   intfloatr   r   r   rz   r>   RuntimeErrorr_   r,   	from_filer`   r   resolve_relative_paths)r!   r6   rJ   rP   rQ   r5   cutsetsr   r   pathcsr   r   r)   r)   r*   r4   s  s   


<


*


r4   lhotse_as_conversationc                    s4   dt dtf fdd}t \}}||}||fS )Nr|   r"   c                    sX   t | d jdt| jd jddg}t| dr!t| jddg| }t| j| j	| j
dS )Nuser)r|   rolerx   r   	assistant)valuer   context)idturnsrS   custom)r   rx   r   supervisionsrM   hasattrr   r   r   rS   r   )r|   r   r(   r)   r*   cut_to_conversation  s   
z8read_lhotse_as_conversation.<locals>.cut_to_conversation)r   r   r8   r`   )r!   r   r5   r6   r)   r(   r*   read_lhotse_as_conversation  s   
r   r   only_metadatac                 C   s*   |rt dtt| didS t | dS )Nr5   zcuts.*)r   )in_dir)dictsortedr   glob)r   r   r)   r)   r*   r     s   
r   r|   r   c                    s   t | tr| S t | tr| jD ]	}t|j |_q| S  fdd} fdd| jr0|| j | jr8| j	 | j
dur]| j
 D ]\}}t |trP|| qBt |tttfr\| qB| S )z;Resolve relative paths in a Cut object to their full paths.c                    s*   | j D ]}|jdkrt|j d|_qd S )Nfilemanifest_file)sourcesr_   r    source)r   audio_sourcer   r)   r*   resolve_recording  s
   

z1resolve_relative_paths.<locals>.resolve_recordingc                    s   t | tr| j| _d S | jdv r0tttt| j| j  d}t|j	| _t|j
| _d S | jdv r?t| j d| _d S d S )N)numpy_fileslilcom_filesr   )kaldiiochunked_lilcom_hdf5lilcom_chunkylilcom_hdf5
numpy_hdf5)r/   r   arraystorage_typer   r    rC   storage_pathstorage_keyparentrB   )r   abspathr   resolve_arrayr)   r*   r     s   


z-resolve_relative_paths.<locals>.resolve_arrayN)r/   r   r   tracksr   r|   has_recording	recordinghas_featuresfeaturesr   rz   r   r   r   r   )r|   r   trackr   r}   r   r)   r   r*   r     s(   







r   nemonemo_tarredc              	   C   s  i }dD ]}|| v r|dkr| | |d< q| | ||< q|  dd}|  dd}d|i}|  ddu}t| jttfrztd	| d
| j d |ri|sitt| jf| j	|  ddd|}|se|
 }||fS tt| jfi ||}||fS td| d g }g }	|r| j	nt
d}
t| j|
D ]\}}|rt|tttfr|\}|d }n|}|r|std|||  ddd|}n
t|fi ||}t|tst|dkrt|}n t|trt|dkrt|d ttfsJ d| d|d }td|d| |  ddur$| D ]}|t| |	| qq|t| |	| qt||	|  d|  dd|pA|d}||fS )z.Read NeMo manifest and return a Lhotse CutSet.)rL   rN   rI   rJ   extra_fieldsrI   ri   rP   FrQ   rw   NzOInitializing Lhotse CutSet from a single NeMo manifest 
            (is_tarred=z): 'r   rT   )	tar_pathsrT   zOInitializing Lhotse CutSet from multiple NeMo manifest 
            (is_tarred=z_) sources with a weighted multiplexer.
            We found the following sources and weights: r$   r   )r   r   rT   r   r   zSupported inputs types for config.manifest_filepath are: str | list[list[str]] | list[tuple[str, number]] where str is a path and number is a mixing weight (it may exceed 1.0). We got: 'z- manifest_path=r   rR   rJ   rK   r   r)   )r%   r/   r.   rC   r   r   r   r   r   rw   r   r   ziplisttupler   r   r   r   r   	to_shardsr   r   )r!   common_kwargsr}   rP   rQ   notar_kwargsr6   r5   r   r   r   manifest_infotar_pathr   	nemo_iterr   subiterr)   r)   r*   r3     s   
NL



r3   rK   F)rR   r   rQ   r   r   rR   r   rQ   c                 G   sl   |dur|r
J dt j|| ||d}|S |sdd |D }t|dkr+|d }|S t j|| |d}|S )	a  
    Helper function to call the right multiplexing method flavour in lhotse.
    The result is always an infinitely iterable ``CutSet``, but depending on whether ``max_open_streams`` is set,
    it will select a more appropriate multiplexing strategy.
    NzJmax_open_streams and metadata_only/force_finite options are not compatible)r   r   rR   c                 S   s   g | ]}|  qS r)   r   )r&   r   r)   r)   r*   
<listcomp>  s    zmux.<locals>.<listcomp>r   r   )r   r   )r   infinite_muxr   r   )r   rR   r   rQ   r   r5   r)   r)   r*   r     s   	r   inpc              
   C   s  ddl m} t| ttjfr;z|tdd |  D }t	|\}}|W S  t
y: } z	td|  d|d}~ww t| trz| drQ|td	|  g}n!| d
s[| drg|td|  g}n|td|  g}t	|\}}|S tdt|  d)a
  
    Utility function that supports opening a CutSet from:
    * a string path to YAML input spec (see :func:`read_dataset_config` for details)
    * a string path to Lhotse non-tarred JSONL manifest
    * a string path to NeMo non-tarred JSON manifest
    * a dictionary specifying inputs with keys available in
        :class:`nemo.collections.common.data.lhotse.dataloader.LhotseDataLoadingConfig`

    It's intended to be used in a generic context where we are not sure which way the user will specify the inputs.
    r   )$make_structured_with_schema_warningsc                 S   s   g | ]\}}| d | qS )=r)   r   r)   r)   r*   r     s    z&guess_parse_cutset.<locals>.<listcomp>z)Couldn't open CutSet based on dict input z1 (is it compatible with LhotseDataLoadingConfig?)Nz.yamlz
input_cfg=z.jsonlz	.jsonl.gzz
cuts_path=zmanifest_filepath=zUnsupported input type: z (expected a dict or a string)).nemo.collections.common.data.lhotse.dataloaderr   r/   r   	omegaconfr   r   from_dotlistrz   r8   	Exceptionr   rC   endswithr_   )r   r   r!   r5   _er)   r)   r*   guess_parse_cutset  s.   


r   )Hr   r   	functoolsr   	itertoolsr   pathlibr   typingr   r   r   r	   r
   r   r   r   r   r   lhotse.arrayr   r   
lhotse.cutr   r   r   r   r   r   1nemo.collections.common.data.lhotse.nemo_adaptersr   r   r   1nemo.collections.common.data.lhotse.text_adaptersr   r   r   r   r   r   r   4nemo.collections.common.parts.preprocessing.manifestr    r   boolr8   r   r2   r=   rC   r?   rA   r   rH   r   r0   rd   rk   rs   ru   ry   ra   rZ   r4   r   r   r   r3   r   r   r   r   r)   r)   r)   r*   <module>   s   $	"
S

<\
1v


"