o
    %ݫi+B                     @   s  d Z ddlZddlZddlmZ ddlmZ ddlmZm	Z	m
Z
mZmZ ddlZddlmZ ddlmZ dd	lmZmZmZ eeZe
eee
e f  Zd
eeef de	eef ddfddZ	d-dede
e fddZdede
e fddZdedeeef fddZde
e de	eef fddZde
e
e  dedede
e
e  fddZ 		 	d.ded!e	eef d"e	eef d#ed$e!d%e"dej#fd&d'Z$	d/ded!e	eef d"e	eef d%e"dej#f
d(d)Z%d0d+d,Z&dS )1ao   This module contains functions to prepare the lexicon and the language model
for k2 training. It is based on the script `prepare_lang.sh` from k2/icefall (work
of Fangjun Kuang). The original script is under Apache 2.0 license.
This script is modified to work with SpeechBrain.

Modified by:
  * Pierre Champion 2023
  * Zeyu Zhao 2023
  * Georgios Karakasidis 2023
    N)defaultdict)Path)AnyDictListTupleUnion)
get_logger   )k2)EPSread_lexiconwrite_lexiconfilenamesym2idreturnc                 C   s\   t | ddd}| D ]\}}|| d| d qW d   dS 1 s'w   Y  dS )a9  
    Write a symbol to ID mapping to a file.

    NOTE: No need to implement `read_mapping` as it can be done through
      :func:`k2.SymbolTable.from_file`.

    Arguments
    ---------
    filename: str
        Filename to save the mapping.
    sym2id: Dict[str, int]
        A dict mapping symbols to IDs.
    wzutf-8)encoding 
N)openitemswrite)r   r   fsymi r   [/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/k2_integration/prepare_lang.pywrite_mapping    s
   "r   SILFlexiconc                 C   sV   t  }|r
|| | D ]\}}||vsJ | d| || qtt|}|S )a0  
    Get tokens from a lexicon.

    Arguments
    ---------
    lexicon: Lexicon
        It is the return value of :func:`read_lexicon`.
    sil_token: str
        The optional silence token between words. It should not appear in the lexicon,
        otherwise it will cause an error.
    manually_add_sil_to_tokens: bool
        If true, add `sil_token` to the tokens. This is useful when the lexicon
        does not contain `sil_token` but it is needed in the tokens.

    Returns
    -------
    sorted_ans: List[str]
        A list of unique tokens.
    z5 should not appear in the lexicon but it is found in )setaddupdatesortedlist)r    	sil_tokenmanually_add_sil_to_tokensans_tokens
sorted_ansr   r   r   
get_tokens3   s   

r,   c                 C   s.   t  }| D ]	\}}|| qtt|}|S )z
    Get words from a lexicon.

    Arguments
    ---------
    lexicon: Lexicon
        It is the return value of :func:`read_lexicon`.

    Returns
    -------
    sorted_ans:
        Return a list of unique words.
    )r!   r"   r$   r%   )r    r(   wordr)   r+   r   r   r   	get_wordsU   s
   r.   c                 C   s.  t t}| D ]\}}|d|  d7  < qt t}| D ]\}}| }|  |r7d|d|< |  |s*qg }d}|d }t t}| D ]L\}	}d|}
|
dksUJ ||
 dkri||
 dkri||	|f qF||
 }|dkrt|}n|d7 }||kr~|}|||
< |
d| 7 }
||	|
 f qF||fS )a
  
    It adds pseudo-token disambiguation symbols #1, #2 and so on
    at the ends of tokens to ensure that all pronunciations are different,
    and that none is a prefix of another.

    See also add_lex_disambig.pl from kaldi.

    Arguments
    ---------
    lexicon: Lexicon
        It is returned by :func:`read_lexicon`.

    Returns
    -------
    ans:
        The output lexicon with disambiguation symbols
    max_disambig:
        The ID of the max disambiguation symbol that appears
        in the lexicon
    r   r
    r   z #)r   intjoincopypopappendsplit)r    countr)   r*   issubseqr(   first_allowed_disambigmax_disambiglast_used_disambig_symbol_ofr-   tokenseqcur_disambigr   r   r   add_disambig_symbolsj   s@   	
r=   symbolsc                 C   s   dd t | D S )z
    Generate ID maps, i.e., map a symbol to a unique ID.

    Arguments
    ---------
    symbols: List[str]
        A list of unique symbols.

    Returns
    -------
    A dict containing the mapping between symbols and IDs.
    c                 S   s   i | ]\}}||qS r   r   ).0r   r   r   r   r   
<dictcomp>       z#generate_id_map.<locals>.<dictcomp>)	enumerate)r>   r   r   r   generate_id_map   s   rC   arcsdisambig_tokendisambig_wordc                 C   sZ   t  }| D ]}|\}}}}}	|dkr|| qg }
|D ]}|
||||dg q| |
 S )a  
    Adds self-loops to states of an FST to propagate disambiguation symbols
    through it. They are added on each state with non-epsilon output symbols
    on at least one arc out of the state.

    See also fstaddselfloops.pl from Kaldi. One difference is that
    Kaldi uses OpenFst style FSTs and it has multiple final states.
    This function uses k2 style FSTs and it does not need to add self-loops
    to the final state.

    The input label of a self-loop is `disambig_token`, while the output
    label is `disambig_word`.

    Arguments
    ---------
    arcs: List[List[Any]]
        A list-of-list. The sublist contains
        `[src_state, dest_state, label, aux_label, score]`
    disambig_token: int
        It is the token ID of the symbol `#0`.
    disambig_word: int
        It is the word ID of the symbol `#0`.

    Returns
    -------
    Return new `arcs` containing self-loops.
    r   )r!   r"   r4   )rD   rE   rF   states_needs_self_loopsarcsrcdstilabelolabelscorer(   sr   r   r   add_self_loops   s   
rO         ?token2idword2idr&   sil_probneed_self_loopsc              	      s  |dkr|dk s
J t |}t d| }d}d}	d}
d}g } t dks(J |t dks0J d} | }|||	|||g |||
|||g ||
|	||dg | D ]k\}}t|dksgJ | d|	}|| } fdd	|D }tt|d D ]}|dkr|n|}||||| |dg |}|d7 }q~t|d }|dkr|n|}|||	|| ||g |||
|| ||g qV|rӈ d
 }|d
 }t|||d}|}||	|dddg ||g t|dd d}dd	 |D }dd	 |D }d|}t	j
j|dd}|S )aR  
    Convert a lexicon to an FST (in k2 format) with optional silence at the
    beginning and end of each word.

    Arguments
    ---------
    lexicon: Lexicon
        The input lexicon. See also :func:`read_lexicon`
    token2id: Dict[str, int]
        A dict mapping tokens to IDs.
    word2id: Dict[str, int]
        A dict mapping words to IDs.
    sil_token: str
        The silence token.
    sil_prob: float
        The probability for adding a silence at the beginning and end
        of the word.
    need_self_loops: bool
        If True, add self-loop to states with non-epsilon output symbols
        on at least one arc out of the state. The input label for this
        self loop is `token2id["#0"]` and the output label is `word2id["#0"]`.

    Returns
    -------
    fsa: k2.Fsa
        An FSA representing the given lexicon.
    g        g      ?r   r
          has no pronunciationsc                       g | ]} | qS r   r   r?   r   rQ   r   r   
<listcomp>+      z"lexicon_to_fst.<locals>.<listcomp>#0rE   rF   c                 S      | d S Nr   r   rH   r   r   r   <lambda>I      z lexicon_to_fst.<locals>.<lambda>keyc                 S      g | ]	}d d |D qS )c                 S      g | ]}t |qS r   strrY   r   r   r   r[   J  r\   z-lexicon_to_fst.<locals>.<listcomp>.<listcomp>r   r?   rH   r   r   r   r[   J      c                 S      g | ]}d  |qS r   r1   rk   r   r   r   r[   K  rA   r   Facceptor)mathlogr   r4   lenrangerO   r$   r1   r   Fsafrom_str)r    rQ   rR   r&   rS   rT   	sil_scoreno_sil_scorestart_state
loop_state	sil_state
next_staterD   epssil_token_idr-   r*   	cur_stater   r   rE   rF   final_statefsar   rZ   r   lexicon_to_fst   s\   #


r   c              	      s  d}d}g } t  dksJ |t  dksJ d}| D ]_\}}	t|	dks+J | d|}
|| } fdd|	D }	tt|	d D ]}|dkrJ|n|}||
||	| |dg |}
|d7 }qBt|	d }|dkrk|n|}||
||	| |dg q|r d }|d }t|||d}|}|||dddg ||g t|d	d
 d}dd |D }dd |D }d|}tjj	|dd}|S )au  
    Convert a lexicon to an FST (in k2 format).

    Arguments
    ---------
    lexicon: Lexicon
        The input lexicon. See also :func:`read_lexicon`
    token2id: Dict[str, int]
        A dict mapping tokens to IDs.
    word2id: Dict[str, int]
        A dict mapping words to IDs.
    need_self_loops: bool
        If True, add self-loop to states with non-epsilon output symbols
        on at least one arc out of the state. The input label for this
        self loop is `token2id["#0"]` and the output label is `word2id["#0"]`.

    Returns
    -------
    fsa: k2.Fsa
        An FSA representing the given lexicon.
    r   r
   rW   c                    rX   r   r   rY   rZ   r   r   r[   |  r\   z)lexicon_to_fst_no_sil.<locals>.<listcomp>r]   r^   r_   c                 S   r`   ra   r   rb   r   r   r   rc     rd   z'lexicon_to_fst_no_sil.<locals>.<lambda>re   c                 S   rg   )c                 S   rh   r   ri   rY   r   r   r   r[     r\   z4lexicon_to_fst_no_sil.<locals>.<listcomp>.<listcomp>r   rk   r   r   r   r[     rl   c                 S   rm   rn   ro   rk   r   r   r   r[     rA   r   Frp   )
r   rt   ru   r4   rO   r$   r1   r   rv   rw   )r    rQ   rR   rT   r{   r}   rD   r~   r-   piecesr   r   r   rE   rF   r   r   r   rZ   r   lexicon_to_fst_no_silR  sH   

r   Tc              	   C   s  t | }|d }|r'|d  r'|d  j| jk r'td| d dS dD ]-}||  rVtj|d dd	 td
||  d| d|  t	|| |d |  q)t
t|}|dkrit||dd}nt|dd}t|}	t|\}
}t|d D ]}d| }||vsJ |d|  qt|vsJ tg| }t|	vsJ d|	vsJ d|	vsJ d|	vsJ tg|	 g d }	t|}t|	}td| d t|d | t|d | t|d |
 |dkrt|||||d}nt|||d}|dkrt|
||||dd}nt|
||dd}t| }td | d t| |d!  t| |d"  t| |d  dS )#aE  
    This function takes as input a lexicon file "$lang_dir/lexicon.txt"
    consisting of words and tokens (i.e., phones) and does the following:

    1. Add disambiguation symbols to the lexicon and generate lexicon_disambig.txt

    2. Generate tokens.txt, the token table mapping a token to a unique integer.

    3. Generate words.txt, the word table mapping a word to a unique integer.

    4. Generate L.pt, in k2 format. It can be loaded by

            d = torch.load("L.pt")
            lexicon = k2.Fsa.from_dict(d)

    5. Generate L_disambig.pt, in k2 format.


    Arguments
    ---------
    lang_dir: str
        The directory to store the output files and read the input file lexicon.txt.
    sil_token: str
        The silence token. Default is "SIL".
    sil_prob: float
        The probability for adding a silence at the beginning and end of the word.
        Default is 0.5.
    cache: bool
        Whether or not to load/cache from/to the .pt format.

    Returns
    -------
    None

    Example
    -------
    >>> from speechbrain.k2_integration.prepare_lang import prepare_lang

    >>> # Create a small lexicon containing only two words and write it to a file.
    >>> lang_tmpdir = getfixture('tmpdir')
    >>> lexicon_sample = '''hello h e l l o\nworld w o r l d'''
    >>> lexicon_file = lang_tmpdir.join("lexicon.txt")
    >>> lexicon_file.write(lexicon_sample)

    >>> prepare_lang(lang_tmpdir)
    >>> for expected_file in ["tokens.txt", "words.txt", "L.pt", "L_disambig.pt", "Linv.pt" ]:
    ...     assert os.path.exists(os.path.join(lang_tmpdir, expected_file))
    zlexicon.txtLinv.ptzSkipping lang preparation of 'zA'. Set 'caching: False' in the yaml if this is not what you want.N)L.ptL_disambig.pt
tokens.txt	words.txtr   lexicon_disambig.txtbackupT)exist_okzBacking up z to z/backup/r   )r&   r'   F)r'   r
   #r]   <s></s>)r]   r   r   z7Saving tokens.txt, words.txt, lexicon_disambig.txt to ''r   r   r   )rQ   rR   r&   rS   )rQ   rR   )rQ   rR   r&   rS   rT   )rQ   rR   rT   z(Saving L.pt, Linv.pt, L_disambig.pt to 'r   r   )r   existsstatst_mtimeloggerwarningosmakedirsdebugrenamer   rj   r,   r.   r=   ru   r4   r   rC   infor   r   r   r   r   arc_sortinverttorchsaveas_dict)lang_dirr&   rS   cacheout_dirlexicon_filenamer   r    r*   wordslexicon_disambigr9   r   disambigrQ   rR   L
L_disambigL_invr   r   r   prepare_lang  s   2

 



	r   )r   F)r   rP   F)F)r   rP   T)'__doc__rr   r   collectionsr   pathlibr   typingr   r   r   r   r   r   speechbrain.utils.loggerr	   r/   r   r    r   r   r   __name__r   rj   Lexiconr0   r   r,   r.   r=   rC   rO   floatboolrv   r   r   r   r   r   r   r   <module>   s|   &
"G


/


j


N