o
    7wiP                     @   s  U d Z ddlZddlZddlZddlmZ ddlmZmZm	Z	m
Z
 ddlZddlmZ ddlmZ eeZdZd	Zd
ZdZedZejed< G dd deZg ddfddZdedee	eee f  fddZde
eef dee	eee f  ddfddZdS )a+  Lexicon class and utilities. Provides functions to read/write
lexicon files and convert them to k2 ragged tensors. The Lexicon
class provides a way to convert a list of words to a ragged tensor
containing token IDs. It also stores the lexicon graph which can
be used by a graph compiler to decode sequences.

This code was adjusted, and therefore heavily inspired or taken from
from icefall's (https://github.com/k2-fsa/icefall) Lexicon class and
its utility functions.


Authors:
  * Pierre Champion 2023
  * Zeyu Zhao 2023
  * Georgios Karakasidis 2023
    N)Path)ListOptionalTupleUnion)
get_logger   )k2z<UNK>z<unk>z<eow>z<eps>z^#\d+$DISAMBIG_PATTERNc                	   @   s"  e Zd ZdZdefddZedee fddZ	ede
jfdd	Zd
e
jfddZde
jde
jfddZ			d%dee dee deee  fddZ	d&dee deeee   fddZ	d&dee deeeee    fddZ	d'dee dedefddZdd  Zd(d"efd#d$ZdS ))LexiconaE  
    Unit based lexicon. It is used to map a list of words to each word's
    sequence of tokens (characters). It also stores the lexicon graph which
    can be used by a graph compiler to decode sequences.

    Arguments
    ---------
    lang_dir: str
        Path to the lang directory. It is expected to contain the following
        files:
            - tokens.txt
            - words.txt
            - L.pt

    Example
    -------
    >>> from speechbrain.k2_integration import k2
    >>> from speechbrain.k2_integration.lexicon import Lexicon
    >>> from speechbrain.k2_integration.graph_compiler import CtcGraphCompiler
    >>> from speechbrain.k2_integration.prepare_lang import prepare_lang

    >>> # Create a small lexicon containing only two words and write it to a file.
    >>> lang_tmpdir = getfixture('tmpdir')
    >>> lexicon_sample = '''hello h e l l o\nworld w o r l d'''
    >>> lexicon_file = lang_tmpdir.join("lexicon.txt")
    >>> lexicon_file.write(lexicon_sample)
    >>> # Create a lang directory with the lexicon and L.pt, L_inv.pt, L_disambig.pt
    >>> prepare_lang(lang_tmpdir)
    >>> # Create a lexicon object
    >>> lexicon = Lexicon(lang_tmpdir)
    >>> # Make sure the lexicon was loaded correctly
    >>> assert isinstance(lexicon.token_table, k2.SymbolTable)
    >>> assert isinstance(lexicon.L, k2.Fsa)
    lang_dirc           	         s  t |  _}tj|d  _tj|d  _i  _t|d ddd:}|D ]/}|	 
 d }|	 
 dd  } fd	d
|D }| jvrOg  j|<  j| | q(W d    n1 sbw   Y  d  _|d  rtd| d tjt|d }n	t| d| |d  rtd| d tjt|d }ntd t| }t| |d  | _| _d S )Nz
tokens.txtz	words.txtlexicon.txtrutf-8encodingr   r   c                    s   g | ]} j | qS  )token_table).0tselfr   _/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/speechbrain/k2_integration/lexicon.py
<listcomp>Z   s    z$Lexicon.__init__.<locals>.<listcomp>zL.ptLoading compiled z/L.ptzM/L.pt does not exist. Please make sure you have successfully created L.pt in zLinv.ptz/Linv.ptzConverting L.pt to Linv.pt)r   r   r	   SymbolTable	from_filer   
word_tableword2tokenidsopenstripsplitappend_L_disambigexistsloggerinfoFsa	from_dicttorchloadRuntimeErrorarc_sortinvertsaveas_dictL_invL)	r   r   flinewordtokenstidsr1   r0   r   r   r   __init__N   s>   




zLexicon.__init__returnc                 C   sD   | j j}g }|D ]}t|r|tkr|| j |  q|  |S )zm
        Return a list of token IDs excluding those from
        disambiguation symbols and epsilon.
        )r   symbolsr
   matchEPSr"   sort)r   r9   anssr   r   r   r5   x   s   zLexicon.tokensc                 C   sh   | j du r1td| j d | jd  r&tjt	| jd | _ | j S t
| j d| j | j S )zl
        Return the lexicon FSA (with disambiguation symbols).
        Needed for HLG construction.
        Nr   z/L_disambig.ptzL_disambig.ptz_/L_disambig.pt does not exist. Please make sure you have successfully created L_disambig.pt in )r#   r%   r&   r   r$   r	   r'   r(   r)   r*   r+   r   r   r   r   
L_disambig   s   
zLexicon.L_disambigGc                 C   s   d|j |j | jd k< dS )z
        Remove the disambiguation symbols of a G graph

        Arguments
        ---------
        G: k2.Fsa
            The G graph to be modified
        r   #0N)labelsr   )r   r@   r   r   r   #remove_G_rescoring_disambig_symbols   s   	z+Lexicon.remove_G_rescoring_disambig_symbolsLGc                 C   sd   | j d }| jd }td |j }d|||k< ||_t|jtj	s&J d|jj
|jj
|k< |S )a  
        Remove the disambiguation symbols of an LG graph
        Needed for HLG construction.

        Arguments
        ---------
        LG: k2.Fsa
            The LG graph to be modified

        Returns
        -------
        LG: k2.Fsa
            The modified LG graph
        rA   z%Removing disambiguation symbols on LGr   )r   r   r%   debugrB   clone
isinstance
aux_labelsr	   RaggedTensorvalues)r   rD   first_token_disambig_idfirst_word_disambig_idrB   r   r   r   remove_LG_disambig_symbols   s   



z"Lexicon.remove_LG_disambig_symbolsFNTtextssil_token_idc                    s\   | j ||dd}|r, dusJ dtt|D ]} fdd|| D dd ||< q|S )a.  
        Convert a list of texts into word IDs.

        This method performs the mapping of each word in the input texts to its corresponding ID.
        The result is a list of lists, where each inner list contains the word IDs for a sentence.
        If the `add_sil_token_as_separator` flag is True, a silence token is inserted between words,
        and the `sil_token_id` parameter specifies the ID for the silence token.
        If a word is not found in the vocabulary, a warning is logged if `log_unknown_warning` is True.

        Arguments
        ---------
        texts: List[str]
            A list of strings where each string represents a sentence.
            Each sentence is composed of space-separated words.

        add_sil_token_as_separator: bool
            Flag indicating whether to add a silence token as a separator between words.

        sil_token_id: Optional[int]
            The ID of the silence token. If not provided, the separator is not added.

        log_unknown_warning: bool
            Flag indicating whether to log a warning for unknown words.

        Returns
        -------
        word_ids: List[List[int]]
            A list of lists where each inner list represents the word IDs for a sentence.
            The word IDs are obtained based on the vocabulary mapping.
        r   _mapperNz7sil_token_id=None while add_sil_token_as_separator=Truec                    s   g | ]}| fD ]}|qqS r   r   )r   itemxrO   r   r   r      s
    z-Lexicon.texts_to_word_ids.<locals>.<listcomp>)_texts_to_idsrangelen)r   rN   add_sil_token_as_separatorrO   log_unknown_warningword_idsir   rT   r   texts_to_word_ids   s   %


zLexicon.texts_to_word_idsc                 C   s   | j ||ddS )a  
        Convert a list of text sentences into token IDs.

        Parameters
        ----------
        texts: List[str]
            A list of strings, where each string represents a sentence.
            Each sentence consists of space-separated words.
            Example:
                ['hello world', 'tokenization with lexicon']

        log_unknown_warning: bool
            Flag indicating whether to log warnings for out-of-vocabulary tokens.
            If True, warnings will be logged when encountering unknown tokens.

        Returns
        -------
        token_ids: List[List[List[int]]]
            A list containing token IDs for each sentence in the input.
            The structure of the list is as follows:
            [
                [  # For the first sentence
                    [token_id_1, token_id_2, ..., token_id_n],
                    [token_id_1, token_id_2, ..., token_id_m],
                    ...
                ],
                [  # For the second sentence
                    [token_id_1, token_id_2, ..., token_id_p],
                    [token_id_1, token_id_2, ..., token_id_q],
                    ...
                ],
                ...
            ]
            Each innermost list represents the token IDs for a word in the sentence.
        r   rP   rV   r   rN   rZ   r   r   r   texts_to_token_ids   s   (zLexicon.texts_to_token_idsc                 C   s   | j ||dddS )a  
        Convert a list of input texts to token IDs with multiple pronunciation variants.

        This method converts input texts into token IDs, considering multiple pronunciation variants.
        The resulting structure allows for handling various pronunciations of words within the given texts.

        Arguments
        ---------
        texts: List[str]
            A list of strings, where each string represents a sentence for an utterance.
            Each sentence consists of space-separated words.

        log_unknown_warning: bool
            Indicates whether to log warnings for out-of-vocabulary (OOV) tokens.
            If set to True, warnings will be logged for OOV tokens during the conversion.

        Returns
        -------
        token_ids: List[List[List[List[int]]]]
            A nested list structure containing token IDs for each utterance. The structure is as follows:
            - Outer List: Represents different utterances.
            - Middle List: Represents different pronunciation variants for each utterance.
            - Inner List: Represents the sequence of token IDs for each pronunciation variant.
            - Innermost List: Represents the token IDs for each word in the sequence.
        r   T)rQ   _multiple_pronunciationr^   r_   r   r   r   .texts_to_token_ids_with_multiple_pronunciation!  s   z6Lexicon.texts_to_token_ids_with_multiple_pronunciationrZ   rQ   c              	   C   s   | j t }|dkr| jt g}t| |}g }|D ]B}g }	| }
t|
D ]0\}}||v rA|| }t|tr;|s;|d }|		| q$|		| |rTt
d| d| d q$|	|	 q|S )a  
        Convert a list of texts to a list of IDs, which can be either word IDs or
        a list of token IDs.

        Arguments
        ---------
        texts: List[str]
            A list of strings where each string consists of space-separated words.
            Example:
                ['hello world', 'tokenization with lexicon']

        log_unknown_warning: bool
            Log a warning if a word is not found in the token-to-IDs mapping.

        _mapper: str
            The mapper to use, either "word_table" (e.g., "TEST" -> 176838) or
            "word2tokenids" (e.g., "TEST" -> [23, 8, 22, 23]).

        _multiple_pronunciation: bool
            Allow returning all pronunciations of a word from the lexicon.
            If False, only return the first pronunciation.

        Returns
        -------
        ids_list: List[List[int] or int]
            Returns a list-of-list of word IDs or a list of token IDs.
        r   r   zCannot find word z in the mapper zG. Replacing it with OOV token. Note that it is fine if you are testing.)r   UNKr   UNK_tgetattrr!   	enumeraterG   listr"   r%   warning)r   rN   rZ   rQ   ra   oov_token_ididsids_listtextr[   wordsr\   r4   idwordr   r   r   rV   F  s0   
"

zLexicon._texts_to_idsc                 C   s<   t | j| _t | j| _| jdurt | j| _dS dS )z@
        Sort L, L_inv, L_disambig arcs of every state.
        N)r	   r,   r1   r0   r#   r   r   r   r   r,     s
   
zLexicon.arc_sortcpudevicec                 C   s<   | j || _ | j|| _| jdur| j|| _dS dS )z
        Device to move L, L_inv and L_disambig to

        Arguments
        ---------
        device: str
            The device
        N)r1   tor0   r#   )r   rp   r   r   r   rq     s
   	
z
Lexicon.to)FNT)T)F)ro   )__name__
__module____qualname____doc__r   r7   propertyr   intr5   r	   r'   r?   rC   rM   strr   r]   r`   rb   boolrV   r,   rq   r   r   r   r   r   *   sV    #
*"

5
/
*
?	r   wrdTc              	   C   s  t  }t|dkrP|D ]D}t|ddd3}t|}|D ]#}	|	|  }
|
D ]}||vr>|r8t|tg ||< q&t|||< q&qW d   n1 sJw   Y  q|D ];}t|dd+}|D ] }|  d }||vr}|rwt|tg ||< q]t|||< q]W d   n1 sw   Y  qRt	j
| dd tt	j| dd	dd*}t d
t d}|D ]}||d
 d
||  d 7 }q|| W d   dS 1 sw   Y  dS )a	  
    Read extra_csv_files to generate a $lang_dir/lexicon.txt for k2 training.
    This usually includes the csv files of the training set and the dev set in the
    output_folder. During training, we need to make sure that the lexicon.txt contains
    all (or the majority of) the words in the training set and the dev set.

    NOTE: This assumes that the csv files contain the transcription in the last column.

    Also note that in each csv_file, the first line is the header, and the remaining
    lines are in the following format:

    ID, duration, wav, spk_id, wrd (transcription)

    We only need the transcription in this function.

    Writes out $lang_dir/lexicon.txt

    Note that the lexicon.txt is a text file with the following format:
    word1 phone1 phone2 phone3 ...
    word2 phone1 phone2 phone3 ...

    In this code, we simply use the characters in the word as the phones.
    You can use other phone sets, e.g., phonemes, BPEs, to train a better model.

    Arguments
    ---------
    lang_dir: str
        The directory to store the lexicon.txt
    vocab_files: List[str]
        A list of extra vocab files. For example, for librispeech this could be the
        librispeech-vocab.txt file.
    extra_csv_files: List[str]
        A list of csv file paths
    column_text_key: str
        The column name of the transcription in the csv file. By default, it is "wrd".
    add_word_boundary: bool
        whether to add word boundary symbols <eow> at the end of each line to the
        lexicon for every word.

    Example
    -------
    >>> from speechbrain.k2_integration.lexicon import prepare_char_lexicon
    >>> # Create some dummy csv files containing only the words `hello`, `world`.
    >>> # The first line is the header, and the remaining lines are in the following
    >>> # format:
    >>> # ID, duration, wav, spk_id, wrd (transcription)
    >>> csv_file = getfixture('tmpdir').join("train.csv")
    >>> # Data to be written to the CSV file.
    >>> import csv
    >>> data = [
    ...    ["ID", "duration", "wav", "spk_id", "wrd"],
    ...    [1, 1, 1, 1, "hello world"],
    ...    [2, 0.5, 1, 1, "hello"]
    ... ]
    >>> with open(csv_file, "w", newline="", encoding="utf-8")  as f:
    ...    writer = csv.writer(f)
    ...    writer.writerows(data)
    >>> extra_csv_files = [csv_file]
    >>> lang_dir = getfixture('tmpdir')
    >>> vocab_files = []
    >>> prepare_char_lexicon(lang_dir, vocab_files, extra_csv_files=extra_csv_files, add_word_boundary=False)
    r   r   r   r   NT)exist_okr   w 
)dictrX   r   csv
DictReaderr!   rg   EOWr    osmakedirspathjoinrc   rd   write)r   vocab_filesextra_csv_filescolumn_text_keyadd_word_boundarylexiconfiler2   
csv_readerrowrm   r4   r3   fcr   r   r   prepare_char_lexicon  sN   F
 "r   filenamer8   c              	   C   s   g }t | dddU}td}|D ]D}||d}t|dkr"qt|dk r3td| d	|  d
|d }|tkrHtd| d	|  t d|dd }|||f qW d   |S 1 saw   Y  |S )a  
    Read a lexicon from `filename`.

    Each line in the lexicon contains "word p1 p2 p3 ...".
    That is, the first field is a word and the remaining
    fields are tokens. Fields are separated by space(s).

    Arguments
    ---------
    filename: str
        Path to the lexicon.txt

    Returns
    -------
    ans:
        A list of tuples., e.g., [('w', ['p1', 'p2']), ('w1', ['p3, 'p4'])]
    r   r   r   z[ 	]+z 	
r      zFound bad line z in lexicon file z3Every line is expected to contain at least 2 fieldsz should not be a valid wordr   N)	r   recompiler!   r    rX   r+   r;   r"   )r   r=   r2   
whitespacer3   ar4   r5   r   r   r   read_lexicon  s2   

r   r   c              	   C   s^   t | ddd}|D ]\}}|| dd| d q
W d   dS 1 s(w   Y  dS )z
    Write a lexicon to a file.

    Arguments
    ---------
    filename: str
        Path to the lexicon file to be generated.
    lexicon: List[Tuple[str, List[str]]]
        It can be the return value of :func:`read_lexicon`.
    r|   r   r   r}   r~   N)r   r   r   )r   r   r2   r4   r5   r   r   r   write_lexicon2  s
   "r   ) ru   r   r   r   pathlibr   typingr   r   r   r   r)   speechbrain.utils.loggerr    r	   rr   r%   rc   rd   r   r;   r   r
   Pattern__annotations__objectr   r   rx   r   r   r   r   r   r   <module>   s@      x
"k*
