o
    iB                     @   s   d dl mZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlm	Z	 d dl
Z
d d	lmZ d d
lmZ d dlmZ G dd deZG dd de
jjZdS )    )chainN)Any)Dict)List)
NamedTuple)Tuple)Union)
end_detect)PartialScorerInterface)ScorerInterfacec                   @   sv   e Zd ZU dZejed< dZee	ejf ed< e
 Zeeee	ejf f ed< e
 Zeeef ed< de
fdd	Zd
S )
HypothesiszHypothesis data type.yseqr   scorescoresstatesreturnc                 C   s0   | j | j t| jdd | j D d S )z#Convert data to JSON-friendly dict.c                 S   s   i | ]	\}}|t |qS  )float).0kvr   r   T/home/ubuntu/.local/lib/python3.10/site-packages/funasr/models/transformer/search.py
<dictcomp>   s    z%Hypothesis.asdict.<locals>.<dictcomp>)r   r   r   )_replacer   tolistr   r   r   items_asdictselfr   r   r   asdict   s   zHypothesis.asdictN)__name__
__module____qualname____doc__torchTensor__annotations__r   r   r   dictr   r   strr   r   r   r   r   r   r   r      s   
 
 r   c                       s   e Zd ZdZ			d7deeef deeef dededed	ed
e	e dedef fddZ
dejde	e fddZedejdedejfddZdedejdeeeejf eeef f fddZdedejdejdeeeejf eeef f fddZdejdejdeejejf fddZedeeef d eeejf d!ed"eeejf d#edeeejf fd$d%Zd&ed'ed#edefd(d)Zd*e	e dejde	e fd+d,Z	-d8dejd.ed/ede	e fd0d1Zd2ed3ed.ed*e	e d4e	e de	e fd5d6Z  ZS )9
BeamSearchzBeam search implementation.N      ?scorersweights	beam_size
vocab_sizesoseos
token_listpre_beam_ratiopre_beam_score_keyc
                    sR  t    || _t | _t | _t | _tj	 | _
| D ]E\}
}||
d}|dks0|du r1qt|tsBJ |
 dt| d|| j|
< t|trR|| j|
< n|| j|
< t|tjjrc|| j
|
< q|| _|| _|| _t|| | _|| _|| _|	dur|	dkr|	| jvrt|	 d| j |	| _| jduo| j| jk ot| jdk| _dS )aT  Initialize beam search.

        Args:
            scorers (dict[str, ScorerInterface]): Dict of decoder modules
                e.g., Decoder, CTCPrefixScorer, LM
                The scorer will be ignored if it is `None`
            weights (dict[str, float]): Dict of weights for each scorers
                The scorer will be ignored if its weight is 0
            beam_size (int): The number of hypotheses kept during search
            vocab_size (int): The number of vocabulary
            sos (int): Start of sequence id
            eos (int): End of sequence id
            token_list (list[str]): List of tokens for debug log
            pre_beam_score_key (str): key of scores to perform pre-beam search
            pre_beam_ratio (float): beam size in the pre-beam search
                will be `int(pre_beam_ratio * beam_size)`

        r   Nz (z$) does not implement ScorerInterfacefullz is not found in )super__init__r,   r'   r+   full_scorerspart_scorersr$   nn
ModuleDictnn_dictr   get
isinstancer   typer
   Moduler/   r0   r1   intpre_beam_sizer-   n_vocabKeyErrorr3   lendo_pre_beam)r   r+   r,   r-   r.   r/   r0   r1   r2   r3   r   r   w	__class__r   r   r6   %   sL   







zBeamSearch.__init__xr   c                 C   sX   t  }t  }| j D ]\}}||||< d||< qtd||tj| jg|jddgS )zGet an initial hypothesis data.

        Args:
            x (torch.Tensor): The encoder output feature

        Returns:
            Hypothesis: The initial hypothesis.

                device)r   r   r   r   )	r'   r+   r   
init_stater   r$   tensorr/   rL   )r   rI   init_statesinit_scoresr   dr   r   r   init_hypo   s   

zBeamSearch.init_hypxsc                 C   s$   t j|g| j| jd}t | |fS )zAppend new token to prefix tokens.

        Args:
            xs (torch.Tensor): The prefix token
            x (int): The new token to append

        Returns:
            torch.Tensor: New tensor contains: xs + [x] with xs.dtype and xs.device

        dtyperL   )r$   rN   rU   rL   cat)rS   rI   r   r   r   append_token   s   zBeamSearch.append_tokenhypc                 C   sJ   t  }t  }| j D ]\}}||j|j| |\||< ||< q||fS )a  Score new hypothesis by `self.full_scorers`.

        Args:
            hyp (Hypothesis): Hypothesis with prefix tokens to score
            x (torch.Tensor): Corresponding input feature

        Returns:
            Tuple[Dict[str, torch.Tensor], Dict[str, Any]]: Tuple of
                score dict of `hyp` that has string keys of `self.full_scorers`
                and tensor score values of shape: `(self.n_vocab,)`,
                and state dict that has string keys
                and state values of `self.full_scorers`

        )r'   r7   r   r   r   r   )r   rX   rI   r   r   r   rQ   r   r   r   
score_full   s
   $zBeamSearch.score_fullidsc                 C   sL   t  }t  }| j D ]\}}||j||j| |\||< ||< q||fS )aa  Score new hypothesis by `self.part_scorers`.

        Args:
            hyp (Hypothesis): Hypothesis with prefix tokens to score
            ids (torch.Tensor): 1D tensor of new partial tokens to score
            x (torch.Tensor): Corresponding input feature

        Returns:
            Tuple[Dict[str, torch.Tensor], Dict[str, Any]]: Tuple of
                score dict of `hyp` that has string keys of `self.part_scorers`
                and tensor score values of shape: `(len(ids),)`,
                and state dict that has string keys
                and state values of `self.part_scorers`

        )r'   r8   r   score_partialr   r   )r   rX   rZ   rI   r   r   r   rQ   r   r   r   r[      s
   &zBeamSearch.score_partialweighted_scoresc                 C   sz   | d| dkr|| jd }||fS || }td |dd< |||< || jd }|| | jd }||fS )a  Compute topk full token ids and partial token ids.

        Args:
            weighted_scores (torch.Tensor): The weighted sum scores for each tokens.
            Its shape is `(self.n_vocab,)`.
            ids (torch.Tensor): The partial token ids to compute topk

        Returns:
            Tuple[torch.Tensor, torch.Tensor]:
                The topk full token ids and partial token ids.
                Their shapes are `(self.beam_size,)`

        r      infN)sizetopkr-   r   )r   r\   rZ   top_idstmp	local_idsr   r   r   beam   s   zBeamSearch.beamprev_scoresnext_full_scoresfull_idxnext_part_scorespart_idxc                 C   sV   t  }| D ]\}}| | ||  ||< q| D ]\}}| | ||  ||< q|S )a  Merge scores for new hypothesis.

        Args:
            prev_scores (Dict[str, float]):
                The previous hypothesis scores by `self.scorers`
            next_full_scores (Dict[str, torch.Tensor]): scores by `self.full_scorers`
            full_idx (int): The next token id for `next_full_scores`
            next_part_scores (Dict[str, torch.Tensor]):
                scores of partial tokens by `self.part_scorers`
            part_idx (int): The new token id for `next_part_scores`

        Returns:
            Dict[str, torch.Tensor]: The new score dict.
                Its keys are names of `self.full_scorers` and `self.part_scorers`.
                Its values are scalar tensors by the scorers.

        )r'   r   )re   rf   rg   rh   ri   
new_scoresr   r   r   r   r   merge_scores   s   zBeamSearch.merge_scoresr   part_statesc                 C   sL   t  }| D ]\}}|||< q| j D ]\}}||| |||< q|S )a  Merge states for new hypothesis.

        Args:
            states: states of `self.full_scorers`
            part_states: states of `self.part_scorers`
            part_idx (int): The new token id for `part_scores`

        Returns:
            Dict[str, torch.Tensor]: The new score dict.
                Its keys are names of `self.full_scorers` and `self.part_scorers`.
                Its values are states of the scorers.

        )r'   r   r8   select_state)r   r   rl   ri   
new_statesr   r   rQ   r   r   r   merge_states  s   
zBeamSearch.merge_statesrunning_hypsc                 C   s`  g }t j| j|jd}|D ]}t j| j|j|jd}| ||\}}| jD ]}	|| j|	 ||	  7 }q%| j	rK| j
dkr=|n|| j
 }
t |
| jd }| |||\}}| jD ]}	||  | j|	 ||	  7  < qW||j7 }t| || D ]#\}}|t|| | |j|| |j||||| |||d qvt|dd dd	d
tt|| j }q|S )a"  Search new tokens for running hypotheses and encoded speech x.

        Args:
            running_hyps (List[Hypothesis]): Running hypotheses on beam
            x (torch.Tensor): Encoded speech feature (T, D)

        Returns:
            List[Hypotheses]: Best sorted hypotheses

        rK   rT   r4   r]   )r   r   r   r   c                 S      | j S Nr   rI   r   r   r   <lambda>E      z#BeamSearch.search.<locals>.<lambda>TkeyreverseN)r$   arangerB   rL   zerosrU   rY   r7   r,   rE   r3   r`   rA   r[   r8   r   ziprd   appendr   rW   r   rk   r   ro   sortedminrD   r-   )r   rp   rI   	best_hypspart_idsrX   r\   r   r   r   pre_beam_scorespart_scoresrl   jpart_jr   r   r   search  s<   


 

zBeamSearch.searchrJ   maxlenratiominlenratioc              
      s^  |dkr
|j d }n|dk rdt| }ntdt||d }t||d }tdt|j d   tdt|  tdt|   |}g }t|D ]I}t	dt|   
||}	 ||||	|}|dkrtd	d
 |D |rtd|   nt|dkrtd  nt	dt|  qSt|dd dd}
t|
dkrtd |dk rg S  ||td|d S |
d }	|	j D ]\}}t|dd j| dd| j|  dd|  qtd|	jd td|	jt|	j d tdt|
   jdur-tdd fd d
|	jdd D  d!  |
S )"aW  Perform beam search.

        Args:
            x (torch.Tensor): Encoded speech feature (T, D)
            maxlenratio (float): Input length ratio to obtain max output length.
                If maxlenratio=0.0 (default), it uses a end-detect function
                to automatically find maximum hypothesis lengths
                If maxlenratio<0.0, its absolute value is interpreted
                as a constant max output length.
            minlenratio (float): Input length ratio to obtain min output length.

        Returns:
            list[Hypothesis]: N-best decoding results

        r   r]   zdecoder input length: zmax output length: zmin output length: z	position rJ   c                 S   s   g | ]}|  qS r   )r   r   hr   r   r   
<listcomp>q  s    z&BeamSearch.forward.<locals>.<listcomp>zend detected at zno hypothesis. Finish decoding.zremained hypotheses: c                 S   rq   rr   rs   rt   r   r   r   ru   z  rv   z$BeamSearch.forward.<locals>.<lambda>Trw   zOthere is no N-best results, perform recognition again with smaller minlenratio.g?z6.2fz * 3z = z for ztotal log probability: z.2fznormalized log probability: z"total number of ended hypotheses: Nbest hypo:  c                       g | ]} j | qS r   r1   r   rI   r   r   r   r         
)shaper@   maxr_   logginginfor(   rR   rangedebugr   post_processr	   rD   r~   warningforwardr   r   r,   r   r   r1   join)r   rI   r   r   maxlenminlenrp   
ended_hypsibest
nbest_hypsr   r   r   r   r   r   J  sV   

8(zBeamSearch.forwardr   r   r   c              	      s  t dt|   jdur't dd fdd|d jdd D   ||d kr;t d	  fd
d|D }g }|D ]D}|jd  jkr~t j	
  j
 D ]#\}}	|	|j| }
|j|  |
7  < |j|j j| |
  d}qT|| q?|| q?|S )a   Perform post-processing of beam search iterations.

        Args:
            i (int): The length of hypothesis tokens.
            maxlen (int): The maximum length of tokens in beam search.
            maxlenratio (int): The maximum length ratio in beam search.
            running_hyps (List[Hypothesis]): The running hypotheses in beam search.
            ended_hyps (List[Hypothesis]): The ended hypotheses in beam search.

        Returns:
            List[Hypothesis]: The new running hypotheses.

        z"the number of running hypotheses: Nr   r   c                    r   r   r   r   r   r   r   r     r   z+BeamSearch.post_process.<locals>.<listcomp>r   r]   z-adding <eos> in the last position in the loopc                    s$   g | ]}|j  |j jd qS ))r   )r   rW   r   r0   r   r   r   r   r     s    r   rs   )r   r   rD   r1   r   r   r   r0   r   r7   r   r8   final_scorer   r   r   r   r,   r}   )r   r   r   r   rp   r   remained_hypsrX   r   rQ   sr   r   r   r     s(   
(

zBeamSearch.post_process)Nr*   N)rJ   rJ   )r    r!   r"   r#   r   r(   r   r   r@   r   r6   r$   r%   r   rR   staticmethodrW   r   r   rY   r[   rd   rk   ro   r   r   r   __classcell__r   r   rG   r   r)   "   s    


	
J



 4
Ir)   )	itertoolsr   r   typingr   r   r   r   r   r   r$   funasr.metrics.commonr	   2funasr.models.transformer.scorers.scorer_interfacer
   r   r   r9   r?   r)   r   r   r   r   <module>   s    