o
    ei#W                     @   s<   d Z ddlZddlmZ G dd dejZG dd dZdS )zDTools for homograph disambiguation
Authors
 * Artem Ploujnikov 2021
    N)nnc                       sd   e Zd ZdZd fdd	Zedd Zejdd Zedd	 Zejd
d	 Z		dddZ	  Z
S )SubsequenceLossa	  
    A loss function for a specific word in the output, used in
    the homograph disambiguation task
    The approach is as follows:
    1. Arrange only the target words from the original batch into a
    single tensor
    2. Find the word index of each target word
    3. Compute the beginnings and endings of words in the predicted
    sequences. The assumption is that the model has been trained well
    enough to identify word boundaries with a simple argmax without
    having to perform a beam search.
    Important! This loss can be used for fine-tuning only
    The model is expected to be able to already be able
    to correctly predict word boundaries

    Arguments
    ---------
    seq_cost: callable
        the loss to be used on the extracted subsequences
    word_separator: int
        the index of the "space" character (in phonemes)
    word_separator_base: str
        the index of word separators used in unprocessed
        targets (if different, used with tokenizations)

    Example
    -------
    >>> import torch
    >>> from speechbrain.lobes.models.g2p.homograph import SubsequenceLoss
    >>> from speechbrain.nnet.losses import nll_loss
    >>> loss = SubsequenceLoss(
    ...     seq_cost=nll_loss
    ... )
    >>> phns = torch.Tensor(
    ...     [[1, 2, 0, 1, 3, 0, 2, 1, 0],
    ...      [2, 1, 3, 0, 1, 2, 0, 3, 2]]
    ... )
    >>> phn_lens = torch.IntTensor([8, 9])
    >>> subsequence_phn_start = torch.IntTensor([3, 4])
    >>> subsequence_phn_end = torch.IntTensor([5, 7])
    >>> p_seq = torch.Tensor([
    ...     [[0., 1., 0., 0.],
    ...      [0., 0., 1., 0.],
    ...      [1., 0., 0., 0.],
    ...      [0., 1., 0., 0.],
    ...      [0., 0., 0., 1.],
    ...      [1., 0., 0., 0.],
    ...      [0., 0., 1., 0.],
    ...      [0., 1., 0., 0.],
    ...      [1., 0., 0., 0.]],
    ...     [[0., 0., 1., 0.],
    ...      [0., 1., 0., 0.],
    ...      [0., 0., 0., 1.],
    ...      [1., 0., 0., 0.],
    ...      [0., 1., 0., 0.],
    ...      [0., 0., 1., 0.],
    ...      [1., 0., 0., 0.],
    ...      [0., 0., 0., 1.],
    ...      [0., 0., 1., 0.]]
    ... ])
    >>> loss_value = loss(
    ...    phns,
    ...    phn_lens,
    ...    p_seq,
    ...    subsequence_phn_start,
    ...    subsequence_phn_end
    ... )
    >>> loss_value
    tensor(-0.8000)
    r   c                    s    t    || _t||| _d S N)super__init__seq_costSubsequenceExtractor_subsequence_extractor)selfr   word_separatorword_separator_base	__class__ d/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/speechbrain/lobes/models/g2p/homograph.pyr   R   s
   

zSubsequenceLoss.__init__c                 C      | j jS z/
        The word separator being used
        r	   r   r
   r   r   r   r   Y      zSubsequenceLoss.word_separatorc                 C      || j _dS )z)
        Sets the word separator
        Nr   r
   valuer   r   r   r   `      c                 C   r   r   r	   r   r   r   r   r   r   g   r   z#SubsequenceLoss.word_separator_basec                 C   r   )z.
        Sets the base word separator
        Nr   r   r   r   r   r   n   r   Nc              	   C   s*   |  |||||||\}}	}
| ||	|
S )aR  
        Evaluates the subsequence loss

        Arguments
        ---------
        phns: torch.Tensor
            the phoneme tensor (batch x length)
        phn_lens: torch.Tensor
            the phoneme length tensor
        p_seq: torch.Tensor
            the output phoneme probability tensor
            (batch x length x phns)
        subsequence_phn_start: torch.Tensor
            the beginning of the target subsequence
            (i.e. the homograph)
        subsequence_phn_end: torch.Tensor
            the end of the target subsequence
            (i.e. the homograph)
        phns_base: torch.Tensor
            the phoneme tensor (not preprocessed)
        phn_lens_base: torch.Tensor
            the phoneme lengths (not preprocessed)

        Returns
        -------
        loss: torch.Tensor
            the loss tensor
        )r	   r   )r
   phnsphn_lensp_seqsubsequence_phn_startsubsequence_phn_end	phns_basephn_lens_basep_seq_subsequencephns_subsequencesubsequence_lengthsr   r   r   forwardu   s    *
zSubsequenceLoss.forward)r   r   NN)__name__
__module____qualname____doc__r   propertyr   setterr   r%   __classcell__r   r   r   r   r   
   s    G



r   c                   @   s|   e Zd ZdZdddZdd Z		ddd	Zd
d Zdd Zdd Z		dddZ
	dddZdd Z	dddZdd ZdS )r   a  
    A utility class to help extract subsequences out of a batch
    of sequences

    Arguments
    ---------
    word_separator: int
        the index of the word separator (used in p_seq)
    word_separator_base: int
        the index of word separators used in unprocessed
        targets (if different)

    Example
    -------
    >>> import torch
    >>> from speechbrain.lobes.models.g2p.homograph import SubsequenceExtractor
    >>> extractor = SubsequenceExtractor()
    >>> phns = torch.Tensor(
    ...     [[1, 2, 0, 1, 3, 0, 2, 1, 0],
    ...      [2, 1, 3, 0, 1, 2, 0, 3, 2]]
    ... )
    >>> phn_lens = torch.IntTensor([8, 9])
    >>> subsequence_phn_start = torch.IntTensor([3, 4])
    >>> subsequence_phn_end = torch.IntTensor([5, 7])
    >>> p_seq = torch.Tensor([
    ...     [[0., 1., 0., 0.],
    ...      [0., 0., 1., 0.],
    ...      [1., 0., 0., 0.],
    ...      [0., 1., 0., 0.],
    ...      [0., 0., 0., 1.],
    ...      [1., 0., 0., 0.],
    ...      [0., 0., 1., 0.],
    ...      [0., 1., 0., 0.],
    ...      [1., 0., 0., 0.]],
    ...     [[0., 0., 1., 0.],
    ...      [0., 1., 0., 0.],
    ...      [0., 0., 0., 1.],
    ...      [1., 0., 0., 0.],
    ...      [0., 1., 0., 0.],
    ...      [0., 0., 1., 0.],
    ...      [1., 0., 0., 0.],
    ...      [0., 0., 0., 1.],
    ...      [0., 0., 1., 0.]]
    ... ])
    >>> extractor.extract_seq(
    ...    phns,
    ...    phn_lens,
    ...    p_seq,
    ...    subsequence_phn_start,
    ...    subsequence_phn_end
    ... )
    (tensor([[[0., 1., 0., 0.],
             [0., 0., 0., 1.],
             [0., 0., 0., 0.]],
    <BLANKLINE>
            [[0., 1., 0., 0.],
             [0., 0., 1., 0.],
             [0., 0., 0., 0.]]]), tensor([[1., 3., 0.],
            [1., 2., 0.]]), tensor([0.6667, 1.0000]))
    r   Nc                 C   s   || _ |d u r	|}|| _d S r   )r   r   )r
   r   r   r   r   r   r      s   
zSubsequenceExtractor.__init__c                 O   s   | j |i |S r   )extract_seq)r
   argskwargsr   r   r   __call__   s   zSubsequenceExtractor.__call__c                 C   sn  d}|du r|du r|}|}n|du s|du rt dd}|d}	|d|  d}
|| }| }| ||}| ||}|d}tjj	|ddd|f}|d}tj
|d|jd|}tj
||jd|d|}| j|||| j|d	}|r| ||||
\}}n||k||| k @ }|| |j}d
|||dk< | ||||	}|||| fS )ag  
        Extracts the subsequence from the complete sequence

        Arguments
        ---------
        phns: torch.Tensor
            the phoneme tensor (batch x length)
        phn_lens: torch.Tensor
            the phoneme length tensor
        p_seq: torch.Tensor
            the output phoneme probability tensor
            (batch x length x phns)
        subsequence_phn_start: torch.Tensor
            the beginning of the target subsequence
            (i.e. the homograph)
        subsequence_phn_end: torch.Tensor
            the end of the target subsequence
            (i.e. the homograph)
        phns_base: torch.Tensor
            the phoneme tensor (not preprocessed)
        phn_base_lens: torch.Tensor
            the phoneme lengths (not preprocessed)

        Returns
        -------
        p_seq_subsequence: torch.Tensor
            the output subsequence (of probabilities)
        phns_subsequence: torch.Tensor
            the target subsequence
        subsequence_lengths: torch.Tensor
            subsequence lengths, expressed as a fraction
            of the tensor's last dimension

        FNzDphn_base and phn_lens_base, if provided, should be provided togetherT   r   device)r           )
ValueErrorsizelong	unsqueezemax_pad_subsequencetorchr   
functionalpadaranger5   	expand_asexpand_get_target_word_indexesr   _get_phns_subsequencereshapeshape_get_p_seq_subsequence)r
   r   r   r   r   r   r    phn_base_lenshas_base
p_seq_edge	phns_edger$   longest_subsequence	p_seq_padsubsequence_phn_start_unsqrange_phns_baserange_phns_subsequencetarget_word_indexesr#   matchr"   r   r   r   r.      sp   ,



z SubsequenceExtractor.extract_seqc                 C   s    |dkrt jj|d|f}|S )aa  Pads a subsequence to the length of the longest subsequence

        Arguments
        ---------
        sequence: torch.Tensor
            the sequence to be padded
        longest_subsequence: int
            the length of the longest subsequence

        Returns
        -------
        sequence: torch.Tensor
            The padded sequence
        r   )r=   r   r>   r?   )r
   sequencerL   r   r   r   r<   g  s
   z%SubsequenceExtractor._pad_subsequencec                 C   s   |  |||\}}|d}|d}tj|d|jdd|}	|	|k|	|| k @ }
||
 |d|}tj|d|jdd|}d|||| k< t|| t	|d}||fS )a  Extracts a subsequence

        Arguments
        ---------
        phns: torch.Tensor
            a tensor of phoneme indexes
        target_word_indexes: torch.Tensor
            a tensor of word indexes to extract, zero-based
            (e.g.) torch.IntTensor([2, 3])  means extracting
            the third word from the first sample and the
            fourth word from the second sample
        longest_subsequence: int
            the length of the longest subsequence
        edge: int
            the index of the "edge" of the sequence

        Returns
        -------
        phn_subsequence: torch.Tensor
            a tensor with only the target words
        subsequence_lengths: torch.Tensor
            the lengths of the extracted words
        r3   r2   r4   r   r6   )
_get_word_boundariesr:   r=   r@   r8   r5   rA   viewminimumtensor)r
   r   rQ   rL   edge
word_startword_endword_start_unsqword_end_unsq
phns_range	phn_matchr#   phns_subsequence_ranger$   r   r   r   rD   |  s:   



	
z*SubsequenceExtractor._get_phns_subsequencec                 C   s   |  |||\}}tj|d|jddd|}|dd}|dd}	||k||| k @ }
||
 |d||d}tj|d|jddd|}d|||	| k< |S )a:  Extracts a subsequence out of a tensor of probabilities

        Arguments
        ---------
        p_seq: torch.Tensor
            a tensor of phoneme probabilities
            (batch x sequence index x phoneme index)
        target_word_indexes: torch.Tensor
            a tensor of word indexes to extract, zero-based
            (e.g.) torch.IntTensor([2, 3])  means extracting
            the third word from the first sample and the
            fourth word from the second sample
        longest_subsequence: int
            the length of the longest subsequence
        edge: int
            the index of the "edge" of the sequence

        Returns
        -------
        p_seq_subsequence: torch.Tensor
            a probability tensor composed of the phoneme
            probabilities for target words only
        r2   r4   r   r3   r6   )rT   r=   r@   r8   r5   r:   rA   rU   )r
   r   rQ   rL   rX   rY   rZ   p_seq_ranger[   r\   r^   r"   p_seq_subsequence_ranger   r   r   rG     s8   


z+SubsequenceExtractor._get_p_seq_subsequencec           	      C   sL   |dur|| d|d  knd}||k ||k|B @ }|jdd}|S )a%  Computes the target word indexes

        Arguments
        ---------
        phns: torch.Tensor
            a phoneme batch tensor
        range_phns: torch.Tensor
            a range tensor over thephoneme sequence
        start: torch.Tensor
            the beginning of the subsequence
        word_separator: int
            the word separator being used
        phn_lens: torch.Tensor
            Lengths corresponding to input phns

        Returns
        -------
        word_indexes: torch.Tensor
            the word index tensor
        Nr3   r2   Fdim)r:   r8   r9   sum)	r
   r   
range_phnsstartr   r   end_of_sequenceword_boundariesword_indexesr   r   r   rC     s   
z-SubsequenceExtractor._get_target_word_indexesc                 C   s   |du r| j }| dkr|dn|}tj|d|jd|}||k||kB }|jdd}||	dk}	| 
|	|tj|}
| 
|	|tjd}|
|fS )a&  Determines the word boundaries for the specified
        word indexes within a sequence

        Arguments
        ---------
        seq: torch.Tensor
            a sequence (phonemes or graphemes)
        word_indexes: torch.Tensor
            the word indexes
        edge: int
            a tensor indicating the last position
        word_separator: int
            the word separator token

        Returns
        -------
        start: torch.Tensor
            word start indexes
        end: torch.Tensor
            word end indexes
        N   r3   r4   rb   r   )r   rc   argmaxr=   r@   r8   r5   rA   cumsumr:   _get_positionsminr;   )r
   seqri   rX   r   tokenswords_rangerh   wordsindex_matchrf   endr   r   r   rT     s   z)SubsequenceExtractor._get_word_boundariesc                 C   s2   t |||}||ddj}t |dkd|d S )a  A helper method to calculate start or end positions corresponding
        to specific words

        Arguments
        ---------
        index_match: torch.Tensor
            a mask where positions matching the word index are
            indicated as a 1 and the remaining positions are 0
        words_range: torch.Tensor
            a range tensor over the tokens
        aggregation: callable
            the aggregation to use (torch.min or torch.max)
        no_match_value: int
            the value to output if no match is found (this could
            happen when searching in model outputs rather than
            in source data)

        Returns
        -------
        Start or end positions of specific words.
        r3   rb   r   r2   )r=   wherevalues)r
   rs   rq   aggregationno_match_value	positionsr   r   r   rm   ?  s   z#SubsequenceExtractor._get_positionsFc           	         sn   t j|d|jd|} |||d|r jn j} fdd|D } fddt	|||D }|S )a  Extracts a subsequence from hypotheses (e.g. the result of a beam
        search) based on a reference sequence, which can be either a sequence of phonemes (the target during training)

        Arguments
        ---------
        ref_seq: torch.Tensor
            a reference sequence (e.g. phoneme targets)
        hyps: list
            a batch of hypotheses, a list of list of
            integer indices (usually of phonemes)
        subsequence_phn_start: torch.Tensor
            the index of the beginning of the subsequence to
        use_base: bool
            whether to use the raw (token) space for word separators

        Returns
        -------
        result: torch.Tensor
            The extracted subsequence.
        r2   r4   r3   c                    s.   g | ]}d g fddt |D  dg qS )r3   c                    s   g | ]\}}| j kr|qS r   )r   ).0idxphnr   r   r   
<listcomp>}  s
    
z@SubsequenceExtractor.extract_hyps.<locals>.<listcomp>.<listcomp>N)	enumerate)rz   	item_hypsr   r   r   r}   {  s    
z5SubsequenceExtractor.extract_hyps.<locals>.<listcomp>c                    s    g | ]\}}}  |||qS r   )_extract_hyp_word)rz   r   item_separator_indexes
word_indexr   r   r   r}     s    )
r=   r@   r8   r5   rA   rC   r:   r   r   zip)	r
   ref_seqhypsr   use_basere   rQ   separator_indexesresultr   r   r   extract_hyps[  s(   


z!SubsequenceExtractor.extract_hypsc                 C   sL   |t |k r"|| }|du rdS |d7 }||d  }||| }|S g }|S )a  Extracts a single word out of a hypothesis sequence

        Arguments
        ---------
        hyps: list
            a hypotheses list (or tensor)
        separator_indexes: torch.Tensor
            a tensor of word separators
        word_index: int
            the index of the word to eb retrieved

        Returns
        -------
        result: list|str
            the extracted word
        N r2   )len)r
   r   r   r   leftrightr   r   r   r   r     s   z&SubsequenceExtractor._extract_hyp_word)r   Nr&   r   )F)r'   r(   r)   r*   r   r1   r.   r<   rD   rG   rC   rT   rm   r   r   r   r   r   r   r      s$    
=

s::
#
.
4r   )r*   r=   r   Moduler   r   r   r   r   r   <module>   s     $