o
    wiO                     @   s>  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZmZ d dlZd dlmZmZmZ d dlmZmZ d dlmZmZmZ d d	lmZmZmZ d d
lmZm Z m!Z! d dl"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z* d dl+m,Z, deee-f deee.f fddZ/G dd de0Z1i a2de
e3 fddZ4de3fddZ5dee3e6e3 f fddZ7de8ee.f fddZ9dede-dee.gfdd Z:e7d!dede8ee.f fd"d#Z;e7d$dede8ee.f fd%d&Z<e7d'dede8ee.f fd(d)Z=e7d*dede8ee.f fd+d,Z>e7d-gde8ee.f fd.d/Z?d0ee3ef d1e.de-fd2d3Z@d4e-fd5d6ZAe7d7d8ee6e ef de-de8ee.f fd9d:ZBe7d;d<gde8ee.f fd=d>ZCd?ed@e3dAeDde&fdBdCZEe7dDgde8ee.f fdEdFZFeGdGeGdHfdIe3de3fdJdKZHG dLdM dMZI	N	O	Pdnd?ed@e3dAeDdQee3 dRee3 dSe.de&fdTdUZJe7dVgde8ee.f fdWdXZKd0ee3ef d1e.de-fdYd3Z@d?edZe3defd[d\ZLe7d]d^gde8ee.f fd_d`ZMddadbdcddedee6eeNeDf  dfeeNdf dgee3eNf dhe.defdidjZOdkee3e-ejf defdldmZPdS )o    N)partialrepeat)Path)KeysViewMappingSequenceTupleUnion)CutSetFeatures	Recording)ArrayTemporalArray)CutMixedCut
PaddingCut)
DictConfig
ListConfig	OmegaConf)LazyNeMoIteratorLazyNeMoTarredIteratorexpand_sharded_filepaths)	AudioTurnLhotseTextAdapterLhotseTextPairAdapterNeMoMultimodalConversation&NeMoMultimodalConversationJsonlAdapter.NeMoMultimodalConversationShareGPTJsonlAdapterNeMoSFTJsonlAdapterTextTurn)get_full_pathconfigreturnc                    s   t  ts	t   ddurt \}}||fS t fdddD }|r< ddu r2tdt \}}||fS t \}}||fS )z
    Reads NeMo configuration and creates a CutSet either from Lhotse or NeMo manifests.

    Returns a tuple of ``CutSet`` and a boolean indicating whether the data is tarred (True) or not (False).
    	input_cfgNc                 3   s    | ]
}  |d u V  qd S N)get).0optr"    g/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/nemo/collections/common/data/lhotse/cutset.py	<genexpr><   s    z*read_cutset_from_config.<locals>.<genexpr>)	cuts_path	shar_pathmanifest_filepathzCYou must specify either: manifest_filepath, cuts_path, or shar_path)
isinstancer   r&   read_dataset_configallIncompleteConfigErrorread_nemo_manifestread_lhotse_manifest)r"   cuts	is_tarreduse_nemo_manifestr*   r)   r+   read_cutset_from_config/   s   
r9   c                   @   s   e Zd ZdZdS )r3   z Placeholder for an error raised.N)__name__
__module____qualname____doc__r*   r*   r*   r+   r3   G   s    r3   c                   C   s   t  S )z
    Return the names of all registered data type parsers.

    Example:

        >>> get_known_config_data_types()
        ["nemo", "nemo_tarred", "lhotse", ...]
    )KNOWN_DATA_CONFIG_TYPESkeysr*   r*   r*   r+   get_known_config_data_typesP   s   	r@   data_type_namec                 C   s   t |  S )a  
    Return the parsing function for a given data type name.
    Parsing function reads a dataloading config and returns a tuple
    of lhotse ``CutSet`` and boolean indicating whether we should use
    iterable dataset (True) or map dataset (False) mechanism ("is tarred").
    )r>   )rA   r*   r*   r+   get_parser_fn\   s   rB   namec                    s    fdd}|S )a?  
    Decorator used to register data type parser functions.
    Parsing function reads a dataloading config and returns a tuple
    of lhotse ``CutSet`` and boolean indicating whether we should use
    iterable dataset (True) or map dataset (False) mechanism ("is tarred").

    Example:

        >>> @data_type_parser("my_new_format")
        ... def my_new_format(config):
        ...     return CutSet(read_my_format(**config)), True
        ...
        ... fn = get_parser_fn("my_new_format")
        ... cuts, is_tarred = fn({"my_arg_0": ..., "my_arg_1": ..., ...})
    c                    s,   t  tr| t < | S  D ]}| t|< q| S r%   )r0   strr>   )fnnrC   r*   r+   
_decoratorw   s   

z$data_type_parser.<locals>._decoratorr*   )rC   rH   r*   rG   r+   data_type_parserf   s   	rI   c                 C   s   |  dd|  dd|  dd|  dd|  d	d|  d
d|  dd|  dd|  dd|  dd|  dd|  ddd}t| j|d\}}||fS )a	  
    Input configuration format examples.
    Example 1. Combine two datasets with equal weights and attach custom metadata in ``tags`` to each cut::
        input_cfg:
          - type: nemo_tarred
            manifest_filepath: /path/to/manifest__OP_0..512_CL_.json
            tarred_audio_filepath: /path/to/tarred_audio/audio__OP_0..512_CL_.tar
            weight: 0.5
            tags:
              lang: en
              some_metadata: some_value
          - type: nemo_tarred
            manifest_filepath: /path/to/manifest__OP_0..512_CL_.json
            tarred_audio_filepath: /path/to/tarred_audio/audio__OP_0..512_CL_.tar
            weight: 0.5
            tags:
              lang: pl
              some_metadata: some_value
    Example 2. Combine multiple (4) datasets, with 2 corresponding to different tasks (ASR, AST).
        There are two levels of weights: per task (outer) and per dataset (inner).
        The final weight is the product of outer and inner weight::
        input_cfg:
          - type: group
            weight: 0.7
            tags:
              task: asr
            input_cfg:
              - type: nemo_tarred
                manifest_filepath: /path/to/asr1/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/tarred_audio/asr1/audio__OP_0..512_CL_.tar
                weight: 0.6
                tags:
                  lang: en
                  some_metadata: some_value
              - type: nemo_tarred
                manifest_filepath: /path/to/asr2/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/asr2/tarred_audio/audio__OP_0..512_CL_.tar
                weight: 0.4
                tags:
                  lang: pl
                  some_metadata: some_value
          - type: group
            weight: 0.3
            tags:
              task: ast
            input_cfg:
              - type: nemo_tarred
                manifest_filepath: /path/to/ast1/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/ast1/tarred_audio/audio__OP_0..512_CL_.tar
                weight: 0.2
                tags:
                  src_lang: en
                  tgt_lang: pl
              - type: nemo_tarred
                manifest_filepath: /path/to/ast2/manifest__OP_0..512_CL_.json
                tarred_audio_filepath: /path/to/ast2/tarred_audio/audio__OP_0..512_CL_.tar
                weight: 0.8
                tags:
                  src_lang: pl
                  tgt_lang: en
    shuffleF
shard_seedtrng
text_fieldtext
lang_fieldlangmetadata_onlyforce_finitemax_open_streamsNaudio_locator_tagtoken_equivalent_durationskip_missing_manifest_entriesforce_map_datasetforce_iterable_dataset)rJ   rK   rM   rO   rQ   rR   rS   rT   rU   rV   rW   rX   propagate_attrs)r&   parse_and_combine_datasetsr$   )r"   rZ   r6   r7   r*   r*   r+   r1      s   
?










r1   grp_cfgrZ   c                 C   s   | j t v sJ d| j | j dkrt| j }|| \}}n	t| j|d\}}| d }dur<|jtt|ddd}||fS )zEParse a group configuration, potentially combining multiple datasets.z7Unknown item type in dataset config list: grp_cfg.type=grouprY   tagsNr^   apply_fn)	typer@   rB   r[   r$   r&   mapr   attach_tags)r\   rZ   	parser_fnr6   r7   
extra_tagsr*   r*   r+   parse_group   s   


rg   txtc                 C   s8   t t| j| j| j| jd}| dds| }|dfS )z-Read paths to text files and create a CutSet.pathslanguageshuffle_shardsrK   rR   FT)r   r   rj   rk   rJ   rK   r&   r   r"   r6   r*   r*   r+   read_txt_paths   s   rn   txt_pairc                 C   sX   t t| j| j| d| d| d| d| j| jd}| dds(| }|dfS )	z?Read paths to source and target text files and create a CutSet.source_languagetarget_languagequestions_pathquestions_language)source_pathstarget_pathsrp   rq   rr   rs   rl   rK   rR   FT)r   r   rt   ru   r&   rJ   rK   r   rm   r*   r*   r+   read_txt_pair_paths   s   rv   nemo_sft_jsonlc                 C   s<   t t| j| d| j| jd}| dds| }|dfS )z7Read paths to Nemo SFT JSONL files and create a CutSet.rk   ri   rR   FT)r   r   rj   r&   rJ   rK   r   rm   r*   r*   r+   read_nemo_sft_jsonl  s   rx   multimodal_conversationc                 C   sX   t t| j| d| j| d| j| j| di dd}| dds(| }|dfS )	zFRead paths to multimodal conversation JSONL files and create a CutSet.tarred_audio_filepathsrU   r^   system_prompt)r/   rz   rT   rU   rl   rK   r{   rR   FT)r   r   r/   r&   rT   rJ   rK   r   rm   r*   r*   r+   "read_multimodal_conversation_jsonl  s   r|   	share_gptc              
   C   sL   t t| j| d| j| j| d| j| jd}| dds"| }|dfS )zURead paths to ShareGPT JSONL files and create a CutSet of NeMoMultimodalConversation.rz   rU   )r/   rz   rT   audio_placeholdersrU   rl   rK   rR   FT)	r   r   r/   r&   rT   r~   rJ   rK   r   rm   r*   r*   r+   read_share_gpt_as_conversation/  s   r   pathonly_metadatac                 C   *   |rt dtt| didS t | dS Nr6   zcuts.*)fields)in_dirdictsortedr   globr   r   r*   r*   r+   _resolve_shar_inputsB     
r   r^   c                 C   s"   |  D ]
\}}t| || q| S )z'Attach extra tags to a cut dynamically.)itemssetattr)cutr^   keyvalr*   r*   r+   rd   I  s   rd   r]   config_listc                    s  g }g }g  t | ttfrt| } t| dksJ d| D ]=}| }| D ]\}}||vr5|||< q(|| ||< q(t||\}}	|	|  	|	 |
d }
dur[|	|
 qt fdd D }|s|d sq|d rtd	|d  d
|d  d ntdt|dkst|t|ksJ dt|dkrt||r|nd|d |d |d p|d d}n|\}| d fS )zPParse a list of dataset configurations, potentially combining multiple datasets.r   z#Empty group in dataset config list.weightNc                 3   s    | ]	}| d  kV  qdS )r   Nr*   r'   ttarred_statusr*   r+   r,   r  s    z-parse_and_combine_datasets.<locals>.<genexpr>rW   rX   z]Not all datasets in the group have the same tarred status, using provided force_map_dataset (z) and force_iterable_dataset (z') to determine the final tarred status.zyMixing tarred and non-tarred datasets is not supported when neither force_map_dataset nor force_iterable_dataset is True.z\Missing dataset weight. When weighting datasets, every dataset must have a specified weight.   rS   rK   rR   rQ   weightsrS   seedrR   )r0   rD   r   r   loadlencopyr   rg   appendr&   r2   loggingwarning
ValueErrormux)r   rZ   r6   r   itemnext_propagate_attrskv	item_cutsitem_is_tarredwall_same_tarred_statusr*   r   r+   r[   P  s\   






r[   lhotselhotse_sharc                 C   s  |  ddu}|r/|  dd}|  dd}|  dd}|  ddur(td	 t| jttfrWtd
| j d t	j
di t| j|d|d}|sS|sS| }||fS t| jtrtd g }g }| jD ]e}t|ttfr|}	t	j
di t|	|d|d}
t|
}n0t|trt|dkrt|d ttfsJ d| d|\}	}t	j
di t|	|d|d}
td|	d| ||
 || qit|||  dd||d}||fS t| jtr!dd | j D }d| j v sJ d| j|rd|d i}t	j
|d|d}|s|s| }||fS tdt| jd| j| j}	t	|	tt|	d}||fS )z8Read paths to Lhotse manifest files and create a CutSet.r.   NrK   rL   rQ   FrR   r-   zMNote: lhotse.cuts_path will be ignored because lhotse.shar_path was provided.zEInitializing Lhotse Shar CutSet (tarred) from a single data source: ''T)rl   r   zInitializing Lhotse Shar CutSet (tarred) from multiple data sources with a weighted multiplexer. We found the following sources and weights:    r   zSupported inputs types for config.shar_path are: str | list[str] | list[tuple[str, number]] where str is a path and number is a mixing weight (it may exceed 1.0). We got: 'z- path= weight=rS   r   c                 S   s   i | ]	\}}|t |qS r*   )r   r'   r   r   r*   r*   r+   
<dictcomp>  s    z(read_lhotse_manifest.<locals>.<dictcomp>r6   zInvalid value for key 'shar_path': a dict was provided, but didn't specify key 'cuts' pointing to the manifests. We got the following: config.shar_path=)r   rl   r   zUnexpected value for key 'shar_path'. We support string, list of strings, list of tuples[string,float], and dict[string,list[string]], but got: type(config.shar_path)=z config.shar_path=manifest_pathr*   )r&   warningswarnr0   r.   rD   r   r   infor   	from_sharr   r   r   r   intfloatr   r   r   r   r?   RuntimeErrorrb   r-   	from_filerc   r   resolve_relative_paths)r"   r7   rK   rQ   rR   r6   cutsetsr   r   r   csr   r   r*   r*   r+   r5     s   


<


*


r5   r   rT   rU   c                 C   s   t | tr| S t| d|| jd jdt| jd jddg}t| dr,t| jddg| }t| dr;t| jddg| }t| j	||| j
d	S )
a?  
    Converts a lhotse Cut into a two-turn NeMoMultimodalConversation, where the user turn contains cut's audio,
    and assistant turn contains text response from ``cut.supervisions[0].text``.

    If ``cut`` has a custom field ``context``, it's pre-pended as an extra user text turn before the user's audio turn.
    userr   r   rolerT   rN   	assistantvaluer   contextr{   systemidturnsrU   custom)r0   r   r   supervisionsrN   r    hasattrr   r{   r   r   )r   rT   rU   r   r*   r*   r+   cut_to_conversation  s   
	

r   lhotse_as_conversationc                 C   sT   t | \}}| d }d ur|jtt|dd d}|tt| j| jd}||fS )Nr^   r_   r`   )rT   rU   )r9   r&   rc   r   rd   r   rT   rU   )r"   r6   r7   rf   r*   r*   r+   read_lhotse_as_conversation	  s   r   z	<\|\d+\|>z\s+rN   c                 C   s   | d| } | d|  S )z
    Strips timestamp tokens from text, e.g. turns:
      '<|0|> Hey <|3|> <|3|> how <|5|> <|7|> are <|8|> <|8|> <|10|> you? <|12|>'
      into:
      'Hey how are you?'
      )substrip)rN   _TIMESTAMP_PATTERN_SPACE_PATTERNr*   r*   r+   _strip_timestamps  s   
r   c                   @   s   e Zd ZdS )FailedConversionN)r:   r;   r<   r*   r*   r*   r+   r   (  s    r   r   Userr   	AssistantagentAgentTinput_rolesoutput_rolesstrip_timestamp_tokensc              	   C   s<  | j dd}g }d}|D ]o}	t|	jdksJ d| j |	jd j}
|	jd j}|r/t|}t|	jdkrH|	jd j||d  jd jksHJ |
|v rX|t|	d||d n|
|v rf|t	|dd	 nt
d
|
 d| j  t   S |d7 }qt| drtdd |D rt	| jdd	g| }t| j||| jdS )a(  
    Converts a lhotse Cut representing multi-turn speech-to-speech conversation (with multiple supervision segments)
    into a multi-turn NeMoMultimodalConversation, where the user has AudioTurns and assistant responds in TextTurns.

    Args:
        cut: lhotse Cut to convert.
        audio_locator_tag: special token indicating audio will be inserted in this location in the token sequence.
        token_equivalent_duration: how much speech duration is counted as one token.
        input_roles: when supervision.speaker is set to one of these values, we consider it user's turn.
        output_roles: when supervision.speaker is set to one of these values, we consider it assistant's turn.
        strip_timestamp_tokens: strips tokens like <|0|>, <|1|>, etc indicating timestamps from the text.
    F)keep_overlappingr   r   z<Expected at least one supervision per turn, got none in cut r   r   r   r   z	Speaker 'z+' not found in user or agent roles for cut r{   c                 s   s    | ]}|j d kV  qdS )r   N)r   r   r*   r*   r+   r,   X  s    z*s2s_cut_to_conversation.<locals>.<genexpr>r   r   )trim_to_supervisionsr   r   r   speakerrN   r   r   r   r    r   r   r   r   r2   r{   r   r   )r   rT   rU   r   r   r   	turn_cutsr   idxper_turn_cutturn_speaker	turn_textr*   r*   r+   s2s_cut_to_conversation,  s8   
$

r   s2s_as_conversationc                 C   s\   t | \}}|tt| j| j| dddg| dg d| dddd	d
 }||fS )Nr   r   r   r   r   r   T)rT   rU   r   r   r   c                 S   s   t | t S r%   )r0   r   )exr*   r*   r+   <lambda>o  s    z*read_s2s_as_conversation.<locals>.<lambda>)r9   rc   r   r   rT   rU   r&   filter)r"   r6   r7   r*   r*   r+   read_s2s_as_conversationc  s   

	
r   c                 C   r   r   r   r   r*   r*   r+   r   s  r   r   c                    s   t | tr| S t | tr| jD ]	}t|j |_q| S  fdd} fdd| jr0|| j | jr8| j	 | j
dur]| j
 D ]\}}t |trP|| qBt |tttfr\| qB| S )z;Resolve relative paths in a Cut object to their full paths.c                    s*   | j D ]}|jdkrt|j d|_qd S )Nfilemanifest_file)sourcesrb   r!   source)r   audio_sourcer   r*   r+   resolve_recording  s
   

z1resolve_relative_paths.<locals>.resolve_recordingc                    s   t | tr| j| _d S | jdv r0tttt| j| j  d}t|j	| _t|j
| _d S | jdv r?t| j d| _d S d S )N)numpy_fileslilcom_filesr   )kaldiiochunked_lilcom_hdf5lilcom_chunkylilcom_hdf5
numpy_hdf5)r0   r   arraystorage_typer   r!   rD   storage_pathstorage_keyparentrC   )r   abspathr   resolve_arrayr*   r+   r    s   


z-resolve_relative_paths.<locals>.resolve_arrayN)r0   r   r   tracksr   r   has_recording	recordinghas_featuresfeaturesr   r   r   r   r   r   )r   r   trackr   r   r   r*   r  r+   r   z  s(   







r   nemonemo_tarredc              	   C   s  i }dD ]}|| v r|dkr| | |d< q| | ||< q|  dd}|  dd}d|i}|  ddu}t| jttfrztd	| d
| j d |ri|sitt| jf| j	|  ddd|}|se|
 }||fS tt| jfi ||}||fS td| d g }g }	|r| j	nt
d}
t| j|
D ]\}}|rt|tttfr|\}|d }n|}|r|std|||  ddd|}n
t|fi ||}t|tst|dkrt|}n t|trt|dkrt|d ttfsJ d| d|d }td|d| |  ddur$| D ]}|t| |	| qq|t| |	| qt||	|  d|  dd|pA|d}||fS )z.Read NeMo manifest and return a Lhotse CutSet.)rM   rO   rJ   rK   extra_fieldsrJ   rl   rQ   FrR   rz   NzNInitializing Lhotse CutSet from a single NeMo manifest
            (is_tarred=z): 'r   rV   )	tar_pathsrV   zNInitializing Lhotse CutSet from multiple NeMo manifest
            (is_tarred=z_) sources with a weighted multiplexer.
            We found the following sources and weights: r%   r   )r   r  rV   r   r   zSupported inputs types for config.manifest_filepath are: str | list[list[str]] | list[tuple[str, number]] where str is a path and number is a mixing weight (it may exceed 1.0). We got: 'z- manifest_path=r   rS   rK   rL   r   r*   )r&   r0   r/   rD   r   r   r   r   r   rz   r   r   ziplisttupler   r   r   r   r   	to_shardsr   r   )r"   common_kwargsr   rQ   rR   notar_kwargsr7   r6   r   r   r  manifest_infotar_pathr   	nemo_iterr   subiterr*   r*   r+   r4     s   
NL



r4   rL   F)rS   r   rR   r   r   rS   r   rR   c                 G   sl   |dur|r
J dt j|| ||d}|S |sdd |D }t|dkr+|d }|S t j|| |d}|S )	a  
    Helper function to call the right multiplexing method flavour in lhotse.
    The result is always an infinitely iterable ``CutSet``, but depending on whether ``max_open_streams`` is set,
    it will select a more appropriate multiplexing strategy.
    NzJmax_open_streams and metadata_only/force_finite options are not compatible)r   r   rS   c                 S   s   g | ]}|  qS r*   r   )r'   r   r*   r*   r+   
<listcomp>0  s    zmux.<locals>.<listcomp>r   r   )r   r   )r   infinite_muxr   r   )r   rS   r   rR   r   r6   r*   r*   r+   r     s   	r   inpc              
   C   s  ddl m} t| ttjfr;z|tdd |  D }t	|\}}|W S  t
y: } z	td|  d|d}~ww t| trz| drQ|td	|  g}n!| d
s[| drg|td|  g}n|td|  g}t	|\}}|S tdt|  d)a
  
    Utility function that supports opening a CutSet from:
    * a string path to YAML input spec (see :func:`read_dataset_config` for details)
    * a string path to Lhotse non-tarred JSONL manifest
    * a string path to NeMo non-tarred JSON manifest
    * a dictionary specifying inputs with keys available in
        :class:`nemo.collections.common.data.lhotse.dataloader.LhotseDataLoadingConfig`

    It's intended to be used in a generic context where we are not sure which way the user will specify the inputs.
    r   )$make_structured_with_schema_warningsc                 S   s   g | ]\}}| d | qS )=r*   r   r*   r*   r+   r  H  s    z&guess_parse_cutset.<locals>.<listcomp>z)Couldn't open CutSet based on dict input z1 (is it compatible with LhotseDataLoadingConfig?)Nz.yamlz
input_cfg=z.jsonlz	.jsonl.gzz
cuts_path=zmanifest_filepath=zUnsupported input type: z (expected a dict or a string)).nemo.collections.common.data.lhotse.dataloaderr  r0   r   	omegaconfr   r   from_dotlistr   r9   	Exceptionr   rD   endswithrb   )r  r  r"   r6   _er*   r*   r+   guess_parse_cutset9  s.   


r&  )r   r   T)Qr   rer   	functoolsr   	itertoolsr   pathlibr   typingr   r   r   r	   r
   r   r   r   r   r   lhotse.arrayr   r   
lhotse.cutr   r   r   r   r   r   1nemo.collections.common.data.lhotse.nemo_adaptersr   r   r   1nemo.collections.common.data.lhotse.text_adaptersr   r   r   r   r   r   r   r    4nemo.collections.common.parts.preprocessing.manifestr!   r   boolr9   r   r3   r>   rD   r@   rB   r  rI   r  r1   rg   rn   rv   rx   r|   r   r   rd   r[   r5   r   r   r   compiler   r   r   r   r   r4   r   r   r&  r*   r*   r*   r+   <module>   s   (
"
P

@\


7
1v


"