o
    i                      @   sd   d dl Z d dlmZmZmZ d dlmZ d dlmZ d dl	m
Z
 eddG dd	 d	e jjZdS )
    N)ListOptionalTuple)tables)SpecAug)
Hypothesisdecoder_classesrnnt_decoderc                       s  e Zd ZdZ								d2ded	ed
edededededededdf fddZ	d3de	j
de	j
deee	j
ee	j
 f  de	j
fddZde	j
dee	j
ee	j
 f dee	j
ee	j
ee	j
 f f fddZde	j
dee d ee	j
ee	j
 f dee	j
ee	j
ee	j
 f f fd!d"Zd#ee dee	j
ee	j
ee	j
 f f fd$d%Zd&e	jddfd'd(Zd)edee	j
ee	j f fd*d+Zdee	j
ee	j
 f d,edee	j
ee	j
 f fd-d.Zd/eee	j
ee	j
 f  dee	j
ee	j
 f fd0d1Z  ZS )4RNNTDecodera  RNN decoder module.

    Args:
        vocab_size: Vocabulary size.
        embed_size: Embedding size.
        hidden_size: Hidden size..
        rnn_type: Decoder layers type.
        num_layers: Number of decoder layers.
        dropout_rate: Dropout rate for decoder layers.
        embed_dropout_rate: Dropout rate for embedding layer.
        embed_pad: Embedding padding symbol ID.

       lstm           r   F
vocab_size
embed_sizehidden_sizernn_type
num_layersdropout_rateembed_dropout_rate	embed_paduse_embed_maskreturnNc
              	      s  t    |dvrtd| tjj|||d| _tjj|d| _|dkr*tjj	ntjj
}
tj|
||dddg| _td|D ]}|  j|
||dddg7  _qAtj fd	d
t|D | _|| _|| _|| _|| _t|  j| _i | _|	| _| jrtddddd| _dS dS )zConstruct a RNNDecoder object.)r   gruzNot supported: rnn_type=)padding_idxpr   r   T)batch_firstc                    s   g | ]	}t jj d qS )r   )torchnnDropout).0_r    Y/home/ubuntu/.local/lib/python3.10/site-packages/funasr/models/transducer/rnnt_decoder.py
<listcomp>;   s    z(RNNTDecoder.__init__.<locals>.<listcomp>      F)time_mask_width_rangenum_time_maskapply_freq_maskapply_time_warpN)super__init__
ValueErrorr   r   	Embeddingembedr    dropout_embedLSTMGRU
ModuleListrnnrangedropout_rnndlayersdtypeoutput_sizer   next
parametersdevicescore_cacher   r   _embed_mask)selfr   r   r   r   r   r   r   r   r   	rnn_classr"   	__class__r#   r%   r.      s6   
zRNNTDecoder.__init__labels
label_lensstatesc                 C   sX   |du r|  |d}| | |}| jr"| jr"| ||d }| ||\}}|S )a%  Encode source label sequences.

        Args:
            labels: Label ID sequences. (B, L)
            states: Decoder hidden states.
                      ((N, B, D_dec), (N, B, D_dec) or None) or None

        Returns:
            dec_out: Decoder output sequences. (B, U, D_dec)

        Nr   )
init_statesizer2   r1   r   trainingr@   rnn_forward)rA   rE   rF   rG   	dec_embeddec_outr$   r$   r%   forwardP   s   zRNNTDecoder.forwardxstatec                 C   s   |\}}|  |d\}}t| jD ]M}| jdkrB| j| ||||d  |||d  fd\}\|||d < |||d < n| j| ||||d  d\}|||d < | j| |}q|||ffS )at  Encode source label sequences.

        Args:
            x: RNN input sequences. (B, D_emb)
            state: Decoder hidden states. ((N, B, D_dec), (N, B, D_dec) or None)

        Returns:
            x: RNN output sequences. (B, D_dec)
            (h_next, c_next): Decoder hidden states.
                                (N, B, D_dec), (N, B, D_dec) or None)

        r   r   r   )hx)rH   rI   r7   r9   r:   r6   r8   )rA   rO   rP   h_prevc_prevh_nextc_nextlayerr$   r$   r%   rK   j   s   
 (.zRNNTDecoder.rnn_forwardlabellabel_sequence	dec_statec                 C   s^   d tt|}|| jv r| j| \}}n| |}| ||\}}||f| j|< |d |fS )a  One-step forward hypothesis.

        Args:
            label: Previous label. (1, 1)
            label_sequence: Current label sequence.
            dec_state: Previous decoder hidden states.
                         ((N, 1, D_dec), (N, 1, D_dec) or None)

        Returns:
            dec_out: Decoder output sequence. (1, D_dec)
            dec_state: Decoder hidden states.
                         ((N, 1, D_dec), (N, 1, D_dec) or None)

        r"   r   )joinmapstrr?   r1   rK   )rA   rW   rX   rY   
str_labelsrM   rL   r$   r$   r%   score   s   

zRNNTDecoder.scorehypsc                 C   sV   t jdd |D | jd}| |}| dd |D }| ||\}}|d|fS )zOne-step forward hypotheses.

        Args:
            hyps: Hypotheses.

        Returns:
            dec_out: Decoder output sequences. (B, D_dec)
            states: Decoder hidden states. ((N, B, D_dec), (N, B, D_dec) or None)

        c                 S   s   g | ]}|j d  gqS ))yseqr!   hr$   r$   r%   r&      s    z+RNNTDecoder.batch_score.<locals>.<listcomp>r>   c                 S   s   g | ]}|j qS r$   )rY   rb   r$   r$   r%   r&      s    r   )r   
LongTensorr>   r1   create_batch_statesrK   squeeze)rA   r_   rE   rL   rG   rM   r$   r$   r%   batch_score   s
   
zRNNTDecoder.batch_scorer>   c                 C   s
   || _ dS )zNSet GPU device to use.

        Args:
            device: Device ID.

        Nrd   )rA   r>   r$   r$   r%   
set_device   s   
zRNNTDecoder.set_device
batch_sizec                 C   sJ   t j| j|| j| jd}| jdkr!t j| j|| j| jd}||fS |dfS )zInitialize decoder states.

        Args:
            batch_size: Batch size.

        Returns:
            : Initial decoder hidden states. ((N, B, D_dec), (N, B, D_dec) or None)

        rd   r   N)r   zerosr9   r;   r>   r:   )rA   rj   h_nc_nr$   r$   r%   rH      s   

zRNNTDecoder.init_stateidxc                 C   sT   |d dd||d ddf | j dkr'|d dd||d ddf fS dfS )a-  Get specified ID state from decoder hidden states.

        Args:
            states: Decoder hidden states. ((N, B, D_dec), (N, B, D_dec) or None)
            idx: State ID to extract.

        Returns:
            : Decoder hidden state for given ID. ((N, 1, D_dec), (N, 1, D_dec) or None)

        r   Nr   r   )r:   )rA   rG   rn   r$   r$   r%   select_state   s
    *zRNNTDecoder.select_state
new_statesc                 C   s@   t jdd |D dd| jdkrt jdd |D ddfS dfS )zCreate decoder hidden states.

        Args:
            new_states: Decoder hidden states. [N x ((1, D_dec), (1, D_dec) or None)]

        Returns:
            states: Decoder hidden states. ((N, B, D_dec), (N, B, D_dec) or None)

        c                 S      g | ]}|d  qS )r   r$   r!   sr$   r$   r%   r&         z3RNNTDecoder.create_batch_states.<locals>.<listcomp>r   )dimr   c                 S   rq   )r   r$   rr   r$   r$   r%   r&     rt   N)r   catr:   )rA   rp   r$   r$   r%   rf      s
    zRNNTDecoder.create_batch_states)r   r   r   r   r   r   r   F)N)__name__
__module____qualname____doc__intr\   floatboolr.   r   Tensorr   r   rN   rK   r   r^   r   rh   r>   ri   tensorrH   ro   rf   __classcell__r$   r$   rC   r%   r
      s    	
6

 
 
"	
r
   )r   typingr   r   r   funasr.registerr   funasr.models.specaug.specaugr   /funasr.models.transducer.beam_search_transducerr   registerr   Moduler
   r$   r$   r$   r%   <module>   s   
