o
    iB                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
 d dlmZ d dlmZmZ G dd deZG dd	 d	e jjZdS )
    N)chain)AnyDictList
NamedTupleTupleUnion)
end_detect)PartialScorerInterfaceScorerInterfacec                   @   sv   e Zd ZU dZejed< dZee	ejf ed< e
 Zeeee	ejf f ed< e
 Zeeef ed< de
fdd	Zd
S )
HypothesiszHypothesis data type.yseqr   scorescoresstatesreturnc                 C   s0   | j | j t| jdd | j D d S )z#Convert data to JSON-friendly dict.c                 S   s   i | ]	\}}|t |qS  )float).0kvr   r   U/home/ubuntu/.local/lib/python3.10/site-packages/funasr/models/e_paraformer/search.py
<dictcomp>       z%Hypothesis.asdict.<locals>.<dictcomp>)r   r   r   )_replacer   tolistr   r   r   items_asdictselfr   r   r   asdict   s   zHypothesis.asdictN)__name__
__module____qualname____doc__torchTensor__annotations__r   r   r   dictr   r   strr   r   r    r   r   r   r   r      s   
 
 r   c                       s  e Zd ZdZ			d9deeef deeef dededed	ed
e	e dedef fddZ
dejde	e fddZedejdedejfddZdedejdeeeejf eeef f fddZdedejdejdeeeejf eeef f fddZdejdejdeejejf fddZedeeef d eeejf d!ed"eeejf d#edeeejf fd$d%Zd&ed'ed#edefd(d)Zd*e	e dejd+ejde	e fd,d-Z	.	.d:dejd/ejd0ed1ede	e f
d2d3Zd4ed5ed0ed*e	e d6e	e de	e fd7d8Z  ZS );BeamSearchParazBeam search implementation.N      ?scorersweights	beam_size
vocab_sizesoseos
token_listpre_beam_ratiopre_beam_score_keyc
                    sR  t    || _t | _t | _t | _tj	 | _
| D ]E\}
}||
d}|dks0|du r1qt|tsBJ |
 dt| d|| j|
< t|trR|| j|
< n|| j|
< t|tjjrc|| j
|
< q|| _|| _|| _t|| | _|| _|| _|	dur|	dkr|	| jvrt|	 d| j |	| _| jduo| j| jk ot| jdk| _dS )aT  Initialize beam search.

        Args:
            scorers (dict[str, ScorerInterface]): Dict of decoder modules
                e.g., Decoder, CTCPrefixScorer, LM
                The scorer will be ignored if it is `None`
            weights (dict[str, float]): Dict of weights for each scorers
                The scorer will be ignored if its weight is 0
            beam_size (int): The number of hypotheses kept during search
            vocab_size (int): The number of vocabulary
            sos (int): Start of sequence id
            eos (int): End of sequence id
            token_list (list[str]): List of tokens for debug log
            pre_beam_score_key (str): key of scores to perform pre-beam search
            pre_beam_ratio (float): beam size in the pre-beam search
                will be `int(pre_beam_ratio * beam_size)`

        r   Nz (z$) does not implement ScorerInterfacefullz is not found in )super__init__r-   r(   r,   full_scorerspart_scorersr%   nn
ModuleDictnn_dictr   get
isinstancer   typer
   Moduler0   r1   r2   intpre_beam_sizer.   n_vocabKeyErrorr4   lendo_pre_beam)r   r,   r-   r.   r/   r0   r1   r2   r3   r4   r   r   w	__class__r   r   r7   &   sL   







zBeamSearchPara.__init__xr   c                 C   sX   t  }t  }| j D ]\}}||||< d||< qtd||tj| jg|jddgS )zGet an initial hypothesis data.

        Args:
            x (torch.Tensor): The encoder output feature

        Returns:
            Hypothesis: The initial hypothesis.

                device)r   r   r   r   )	r(   r,   r   
init_stater   r%   tensorr0   rM   )r   rJ   init_statesinit_scoresr   dr   r   r   init_hypp   s   

zBeamSearchPara.init_hypxsc                 C   s$   t j|g| j| jd}t | |fS )zAppend new token to prefix tokens.

        Args:
            xs (torch.Tensor): The prefix token
            x (int): The new token to append

        Returns:
            torch.Tensor: New tensor contains: xs + [x] with xs.dtype and xs.device

        dtyperM   )r%   rO   rV   rM   cat)rT   rJ   r   r   r   append_token   s   zBeamSearchPara.append_tokenhypc                 C   sJ   t  }t  }| j D ]\}}||j|j| |\||< ||< q||fS )a  Score new hypothesis by `self.full_scorers`.

        Args:
            hyp (Hypothesis): Hypothesis with prefix tokens to score
            x (torch.Tensor): Corresponding input feature

        Returns:
            Tuple[Dict[str, torch.Tensor], Dict[str, Any]]: Tuple of
                score dict of `hyp` that has string keys of `self.full_scorers`
                and tensor score values of shape: `(self.n_vocab,)`,
                and state dict that has string keys
                and state values of `self.full_scorers`

        )r(   r8   r   r   r   r   )r   rY   rJ   r   r   r   rR   r   r   r   
score_full   s
   $zBeamSearchPara.score_fullidsc                 C   sL   t  }t  }| j D ]\}}||j||j| |\||< ||< q||fS )aa  Score new hypothesis by `self.part_scorers`.

        Args:
            hyp (Hypothesis): Hypothesis with prefix tokens to score
            ids (torch.Tensor): 1D tensor of new partial tokens to score
            x (torch.Tensor): Corresponding input feature

        Returns:
            Tuple[Dict[str, torch.Tensor], Dict[str, Any]]: Tuple of
                score dict of `hyp` that has string keys of `self.part_scorers`
                and tensor score values of shape: `(len(ids),)`,
                and state dict that has string keys
                and state values of `self.part_scorers`

        )r(   r9   r   score_partialr   r   )r   rY   r[   rJ   r   r   r   rR   r   r   r   r\      s
   &zBeamSearchPara.score_partialweighted_scoresc                 C   sz   | d| dkr|| jd }||fS || }td |dd< |||< || jd }|| | jd }||fS )a  Compute topk full token ids and partial token ids.

        Args:
            weighted_scores (torch.Tensor): The weighted sum scores for each tokens.
            Its shape is `(self.n_vocab,)`.
            ids (torch.Tensor): The partial token ids to compute topk

        Returns:
            Tuple[torch.Tensor, torch.Tensor]:
                The topk full token ids and partial token ids.
                Their shapes are `(self.beam_size,)`

        r      infN)sizetopkr.   r   )r   r]   r[   top_idstmp	local_idsr   r   r   beam   s   zBeamSearchPara.beamprev_scoresnext_full_scoresfull_idxnext_part_scorespart_idxc                 C   sV   t  }| D ]\}}| | ||  ||< q| D ]\}}| | ||  ||< q|S )a  Merge scores for new hypothesis.

        Args:
            prev_scores (Dict[str, float]):
                The previous hypothesis scores by `self.scorers`
            next_full_scores (Dict[str, torch.Tensor]): scores by `self.full_scorers`
            full_idx (int): The next token id for `next_full_scores`
            next_part_scores (Dict[str, torch.Tensor]):
                scores of partial tokens by `self.part_scorers`
            part_idx (int): The new token id for `next_part_scores`

        Returns:
            Dict[str, torch.Tensor]: The new score dict.
                Its keys are names of `self.full_scorers` and `self.part_scorers`.
                Its values are scalar tensors by the scorers.

        )r(   r   )rf   rg   rh   ri   rj   
new_scoresr   r   r   r   r   merge_scores   s   zBeamSearchPara.merge_scoresr   part_statesc                 C   sL   t  }| D ]\}}|||< q| j D ]\}}||| |||< q|S )a  Merge states for new hypothesis.

        Args:
            states: states of `self.full_scorers`
            part_states: states of `self.part_scorers`
            part_idx (int): The new token id for `part_scores`

        Returns:
            Dict[str, torch.Tensor]: The new score dict.
                Its keys are names of `self.full_scorers` and `self.part_scorers`.
                Its values are states of the scorers.

        )r(   r   r9   select_state)r   r   rm   rj   
new_statesr   r   rR   r   r   r   merge_states  s   
zBeamSearchPara.merge_statesrunning_hypsam_scorec                 C   sh  g }t j| j|jd}|D ]}t j| j|j|jd}||7 }| ||\}}	| jD ]}
|| j|
 ||
  7 }q)| j	rO| j
dkrA|n|| j
 }t || jd }| |||\}}| jD ]}
||  | j|
 ||
  7  < q[||j7 }t| || D ]#\}}|t|| | |j|| |j||||| |	||d qzt|dd dd	d
tt|| j }q|S )a"  Search new tokens for running hypotheses and encoded speech x.

        Args:
            running_hyps (List[Hypothesis]): Running hypotheses on beam
            x (torch.Tensor): Encoded speech feature (T, D)

        Returns:
            List[Hypotheses]: Best sorted hypotheses

        rL   rU   r5   r^   )r   r   r   r   c                 S      | j S Nr   rJ   r   r   r   <lambda>I      z'BeamSearchPara.search.<locals>.<lambda>TkeyreverseN)r%   arangerC   rM   zerosrV   rZ   r8   r-   rF   r4   ra   rB   r\   r9   r   zipre   appendr   rX   r   rl   r   rp   sortedminrE   r.   )r   rq   rJ   rr   	best_hypspart_idsrY   r]   r   r   r   pre_beam_scorespart_scoresrm   jpart_jr   r   r   search  s>   


 

zBeamSearchPara.searchrK   	am_scoresmaxlenratiominlenratioc              
      s  |j d }tdt|j d   tdt|   |}g }t|D ]L}tdt|   |||| }	 ||||	|}|dkrZt	dd |D |rZtd|   nt
|dkrgtd	  ntd
t
|  q%t|dd dd}
t
|
dkrtd |dk rg S  ||td|d S |
d }	|	j D ]\}}t|dd j| dd| j|  dd|  qtd|	jd td|	jt
|	j d tdt
|
   jdurtdd fdd|	jdd D  d   |
S )!aW  Perform beam search.

        Args:
            x (torch.Tensor): Encoded speech feature (T, D)
            maxlenratio (float): Input length ratio to obtain max output length.
                If maxlenratio=0.0 (default), it uses a end-detect function
                to automatically find maximum hypothesis lengths
                If maxlenratio<0.0, its absolute value is interpreted
                as a constant max output length.
            minlenratio (float): Input length ratio to obtain min output length.

        Returns:
            list[Hypothesis]: N-best decoding results

        r   zdecoder input length: zmax output length: z	position rK   c                 S   s   g | ]}|  qS r   )r    r   hr   r   r   
<listcomp>r  s    z*BeamSearchPara.forward.<locals>.<listcomp>zend detected at zno hypothesis. Finish decoding.zremained hypotheses: c                 S   rs   rt   ru   rv   r   r   r   rw   {  rx   z(BeamSearchPara.forward.<locals>.<lambda>Try   zOthere is no N-best results, perform recognition again with smaller minlenratio.g?z6.2fz * 3z = z for ztotal log probability: z.2fznormalized log probability: z"total number of ended hypotheses: Nbest hypo:  c                       g | ]	} j |  qS r   r2   itemr   rJ   r   r   r   r     r   r^   
)shapelogginginfor)   rS   rangedebugr   post_processr	   rE   r   warningforwardmaxr   r   r-   r   r   r2   join)r   rJ   r   r   r   maxlenrq   
ended_hypsibest
nbest_hypsr   r   r   r   r   r   N  sJ   


8(zBeamSearchPara.forwardr   r   r   c              	      s  t dt|   jdur't dd fdd|d jdd D   ||d kr;t d	  fd
d|D }g }|D ]D}|jd  jkr~t j	
  j
 D ]#\}}	|	|j| }
|j|  |
7  < |j|j j| |
  d}qT|| q?|| q?|S )a   Perform post-processing of beam search iterations.

        Args:
            i (int): The length of hypothesis tokens.
            maxlen (int): The maximum length of tokens in beam search.
            maxlenratio (int): The maximum length ratio in beam search.
            running_hyps (List[Hypothesis]): The running hypotheses in beam search.
            ended_hyps (List[Hypothesis]): The ended hypotheses in beam search.

        Returns:
            List[Hypothesis]: The new running hypotheses.

        z"the number of running hypotheses: Nr   r   c                    r   r   r   r   r   r   r   r     r   z/BeamSearchPara.post_process.<locals>.<listcomp>r   r^   z-adding <eos> in the last position in the loopc                    s$   g | ]}|j  |j jd qS ))r   )r   rX   r   r1   r   r   r   r   r     s    r   ru   )r   r   rE   r2   r   r   r   r1   r   r8   r   r9   final_scorer   r   r   r   r-   r   )r   r   r   r   rq   r   remained_hypsrY   r   rR   sr   r   r   r     s,   
$

zBeamSearchPara.post_process)Nr+   N)rK   rK   )r!   r"   r#   r$   r   r)   r   r   rA   r   r7   r%   r&   r   rS   staticmethodrX   r   r   rZ   r\   re   rl   rp   r   r   r   __classcell__r   r   rH   r   r*   #   s    


	
J




:
Fr*   )r%   r   	itertoolsr   typingr   r   r   r   r   r   funasr.metrics.commonr	   2funasr.models.transformer.scorers.scorer_interfacer
   r   r   r:   r@   r*   r   r   r   r   <module>   s    