o
    %ݫiFd                  
   @   s"  d Z ddlmZ ddlmZmZ ddlZddlmZ ddlm	Z	 ddl
mZmZmZmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ eeZeG dd dZ		ddejdedee deej fddZ					ddedee fddZG dd deZ G dd dej!Z"dS )zTransformer for ASR in the SpeechBrain style.

Authors
* Jianyuan Zhong 2020
* Titouan Parcollet 2024
* Luca Della Libera 2024
* Shucong Zhang 2024
    )	dataclass)AnyOptionalN)nn)length_to_mask)NormalizedEmbeddingTransformerInterfaceget_key_padding_maskget_lookahead_mask)Swish)
ModuleList)Linear)DynChunkTrainConfig)
get_loggerc                   @   s$   e Zd ZU dZeed< 	 eed< dS )TransformerASRStreamingContextz=Streaming metadata and state for a `TransformerASR` instance.dynchunktrain_configencoder_contextN)__name__
__module____qualname____doc__r   __annotations__r    r   r   g/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/transformer/TransformerASR.pyr       s   
 r   Fsrccausalr   returnc           
      C   s   |r|du sJ t | S |du rdS | d}|j}|| }tj|| jd}tj|||d  || jd|d| }|d |dddf k}| sd|j}	|||	d  8 }||d |dddf k 7 }|S )a2  Prepare the source transformer mask that restricts which frames can
    attend to which frames depending on causal or other simple restricted
    attention methods.

    Arguments
    ---------
    src: torch.Tensor
        The source tensor to build a mask from. The contents of the tensor are
        not actually used currently; only its shape and other metadata (e.g.
        device).
    causal: bool
        Whether strict causality shall be used. Frames will not be able to
        attend to any future frame.
    dynchunktrain_config: DynChunkTrainConfig, optional
        Dynamic Chunk Training configuration. This implements a simple form of
        chunkwise attention. Incompatible with `causal`.

    Returns
    -------
    torch.Tensor
        A boolean mask Tensor of shape (timesteps, timesteps).
    N   )device   )	r
   size
chunk_sizetorcharanger   repeat_interleaveis_infinite_left_contextleft_context_size)
r   r   r   	timestepsr!   
num_chunkstimestep_idxmask_idxsrc_masknum_left_chunksr   r   r   make_transformer_src_mask/   s(   
r-   c                 C   sn   d}|durt || jd  }t|  }t| ||d}|dur-t||d}	t|}
nd}	d}
||	||
fS )a  This function generates masks for training the transformer model,
    opinionated for an ASR context with encoding masks and, optionally, decoding
    masks (if specifying `tgt`).

    Arguments
    ---------
    src : torch.Tensor
        The sequence to the encoder (required).
    tgt : torch.Tensor
        The sequence to the decoder.
    wav_len : torch.Tensor
        The lengths of the inputs.
    pad_idx : int
        The index for <pad> token (default=0).
    causal: bool
        Whether strict causality shall be used. See `make_asr_src_mask`
    dynchunktrain_config: DynChunkTrainConfig, optional
        Dynamic Chunk Training configuration. See `make_asr_src_mask`

    Returns
    -------
    src_key_padding_mask : torch.Tensor
        Key padding mask for ignoring padding
    tgt_key_padding_mask : torch.Tensor
        Key padding mask for ignoring padding
    src_mask : torch.Tensor
        Mask for ignoring invalid (e.g. future) timesteps
    tgt_mask : torch.Tensor
        Mask for ignoring invalid (e.g. future) timesteps
    Nr   )r   r   )pad_idx)r"   roundshaper   boolr-   r	   r
   )r   tgtwav_lenr.   r   r   src_key_padding_maskabs_lenr+   tgt_key_padding_masktgt_maskr   r   r   make_transformer_src_tgt_masksj   s   &
r8   c                       s  e Zd ZdZddddddejddd	d
deejddddejdddfde	e
 de	e de	e de	ej de	ej de	e de	e
 de	e de	e
 de	ej de	e f fddZd-dd Ze d.d!d"Z			d/d#e	e fd$d%Zd&efd'd(Zi fd#efd)d*Zd+d, Z  ZS )0TransformerASRa  This is an implementation of transformer model for ASR.

    The architecture is based on the paper "Attention Is All You Need":
    https://arxiv.org/pdf/1706.03762.pdf

    Arguments
    ---------
    tgt_vocab: int
        Size of vocabulary.
    input_size: int
        Input feature size.
    d_model : int, optional
        Embedding dimension size.
        (default=512).
    nhead : int, optional
        The number of heads in the multi-head attention models (default=8).
    num_encoder_layers : int, optional
        The number of sub-encoder-layers in the encoder (default=6).
    num_decoder_layers : int, optional
        The number of sub-decoder-layers in the decoder (default=6).
    d_ffn : int, optional
        The dimension of the feedforward network model (default=2048).
    dropout : int, optional
        The dropout value (default=0.1).
    activation : torch.nn.Module, optional
        The activation function of FFN layers.
        Recommended: relu or gelu (default=relu).
    positional_encoding: str, optional
        Type of positional encoding used. e.g. 'fixed_abs_sine' for fixed absolute positional encodings.
    normalize_before: bool, optional
        Whether normalization should be applied before or after MHA or FFN in Transformer layers.
        Defaults to True as this was shown to lead to better performance and training stability.
    kernel_size: int, optional
        Kernel size in convolutional layers when Conformer is used.
    bias: bool, optional
        Whether to use bias in Conformer convolutional layers.
    encoder_module: str, optional
        Choose between Conformer and Transformer for the encoder. The decoder is fixed to be a Transformer.
    conformer_activation: torch.nn.Module, optional
        Activation module used after Conformer convolutional layers. E.g. Swish, ReLU etc. it has to be a torch Module.
    branchformer_activation: torch.nn.Module, optional
        Activation module used within the Branchformer Encoder. E.g. Swish, ReLU etc. it has to be a torch Module.
    attention_type: str, optional
        Type of attention layer used in all Transformer or Conformer layers.
        e.g. regularMHA or RelPosMHA.
    max_length: int, optional
        Max length for the target and source sequence in input.
        Used for positional encodings.
    causal: bool, optional
        Whether the encoder should be causal or not (the decoder is always causal).
        If causal the Conformer convolutional layer is causal.
    csgu_linear_units: int, optional
        Number of neurons in the hidden linear units of the CSGU Module.
        -> Branchformer
    gate_activation: torch.nn.Module, optional
        Activation function used at the gate of the CSGU module.
        -> Branchformer
    use_linear_after_conv: bool, optional
        If True, will apply a linear transformation of size input_size//2.
        -> Branchformer
    output_hidden_states: bool, optional
        Whether the model should output the hidden states as a list of tensor.
    layerdrop_prob: float
        The probability to drop an entire layer.

    Example
    -------
    >>> src = torch.rand([8, 120, 512])
    >>> tgt = torch.randint(0, 720, [8, 120])
    >>> net = TransformerASR(
    ...     720, 512, 512, 8, 1, 1, 1024, activation=torch.nn.GELU
    ... )
    >>> enc_out, dec_out = net.forward(src, tgt)
    >>> enc_out.shape
    torch.Size([8, 120, 512])
    >>> dec_out.shape
    torch.Size([8, 120, 512])
    i         i   g?fixed_abs_sineF   Ttransformer
regularMHAi	  Ni   g        kernel_sizebiasencoder_moduleconformer_activationbranchformer_activationattention_type
max_lengthr   csgu_linear_unitsgate_activationuse_linear_after_convc                    s   |d u rt d d}t jdi d|d|d|d|d|d|d	|	d
|
d|d|d|d|d|d|d|d|d|d|d|d|d|d| tt||dddtj|| _	|dkrptt
||| _|   d S )Na  `causal` not specified for `TransformerASR`, assuming `True` for compatibility. We strongly recommend that you explicitly set this. If you are using a model or recipe defined before v1.0, it might now be BROKEN! If so, please see https://github.com/speechbrain/speechbrain/issues/2604Td_modelnheadnum_encoder_layersnum_decoder_layersd_ffndropout
activationpositional_encodingnormalize_beforer@   rA   rB   rC   rD   rE   rF   r   rG   rH   rI   output_hidden_stateslayerdrop_probF)
input_size	n_neuronsrA   combine_dimsr   r   )loggerwarningsuper__init__r   r   r"   r   Dropoutcustom_src_moduler   custom_tgt_module_init_params)self	tgt_vocabrU   rJ   rK   rL   rM   rN   rO   rP   rQ   rR   r@   rA   rB   rC   rD   rE   rF   r   rG   rH   rI   rS   rT   	__class__r   r   r[      s   	


zTransformerASR.__init__r   c              
   C   sl  |j dkr|j\}}}}||||| }t|||| j|d\}	}
}}| |}| jdks2| jdkr5d}n| jdkr@| |}n| jdkrN|| | }d}| j	|||	|d}|du r]|S | j
rf|\}}}n|\}}| |}| jdksy| jdkr|| | }d}d}n| jdks| jdkr|| | }d}d}| j||d||
|	||d	\}}}| j
r|||fS ||fS )
a  
        Arguments
        ---------
        src : torch.Tensor
            The sequence to the encoder.
        tgt : torch.Tensor
            The sequence to the decoder.
        wav_len: torch.Tensor, optional
            Torch Tensor of shape (batch, ) containing the relative length to padded length for each example.
        pad_idx : int, optional
            The index for <pad> token (default=0).

        Returns
        -------
        encoder_out : torch.Tensor
            The output of the encoder.
        decoder_out : torch.Tensor
            The output of the decoder
        hidden_state_lst : list, optional
            The output of the hidden layers of the encoder.
            Only works if output_hidden_states is set to true.
           )r   r.   hypermixingRoPEMHANRelPosMHAXLr<   )r   r+   r4   pos_embs)r2   memorymemory_maskr7   r6   memory_key_padding_maskpos_embs_tgtpos_embs_src)ndimr0   reshaper8   r   r]   rE   rQ   positional_encoding_typeencoderrS   r^   positional_encoding_decoderdecoder)r`   r   r2   r3   r.   bztch1ch2r4   r6   r+   r7   pos_embs_encoderoutputsencoder_out_hidden_statespos_embs_targetdecoder_outr   r   r   forwardF  sn   











zTransformerASR.forwardc                 C   s   t |}d}|durdt|  }| |}| jdks!| jdkr-|| | }d}d}n| jdks7| jdkrB|| | }d}d}| j||||||d\}}	}
||
d fS )	a  This method implements a decoding step for the transformer model.

        Arguments
        ---------
        tgt : torch.Tensor
            The sequence to the decoder.
        encoder_out : torch.Tensor
            Hidden output of the encoder.
        enc_len : torch.LongTensor
            The actual length of encoder states.

        Returns
        -------
        prediction
        Nr   rg   rf   r<   re   )r7   rk   rl   rm   )	r
   r   r1   r^   rE   rr   rp   rQ   rs   )r`   r2   rz   enc_lenr7   r4   rx   r}   
prediction
self_attnsmultihead_attnsr   r   r   decode  s0   




zTransformerASR.decoder   c                 C   s   |  dkr|j\}}}}||||| }t|d||| j|d\}	}
}}
| |}| jdks4| jdkr7d}n| jdkrB| |}n| jdkrP|| | }d}| j	|||	||d}| j
rf|\}}
}||fS |\}}
|S )	a   
        Encoder forward pass

        Arguments
        ---------
        src : torch.Tensor
            The sequence to the encoder.
        wav_len : torch.Tensor, optional
            Torch Tensor of shape (batch, ) containing the relative length to padded length for each example.
        pad_idx : int
            The index used for padding.
        dynchunktrain_config : DynChunkTrainConfig
            Dynamic chunking config.

        Returns
        -------
        encoder_out : torch.Tensor
        rd   N)r.   r   r   re   rf   rg   r<   )r   r+   r4   rh   r   )dimr0   ro   r8   r   r]   rE   rQ   rp   rq   rS   )r`   r   r3   r.   r   rt   ru   rv   rw   r4   r{   r+   pos_embs_sourcery   rz   r|   r   r   r   encode  sJ   






zTransformerASR.encodecontextc                 C   s   |  dkr|j\}}}}||||| }|jjd j}|du r$|}nt|j}	|	d  |jd 7  < tj|	d	|}| 
|}| jdkrM| |}
n| jdkrUd}
n| jdkrc|| | }d}
| jj||
|jd	\}}|S )
ap  
        Streaming encoder forward pass

        Arguments
        ---------
        src : torch.Tensor
            The sequence (chunk) to the encoder.
        context : TransformerASRStreamingContext
            Mutable reference to the streaming context. This holds the state
            needed to persist across chunk inferences and can be built using
            `make_streaming_context`. This will get mutated by this function.

        Returns
        -------
        Encoder output for this chunk.

        Example
        -------
        >>> import torch
        >>> from speechbrain.lobes.models.transformer.TransformerASR import TransformerASR
        >>> from speechbrain.utils.dynamic_chunk_training import DynChunkTrainConfig
        >>> net = TransformerASR(
        ...     tgt_vocab=100,
        ...     input_size=64,
        ...     d_model=64,
        ...     nhead=8,
        ...     num_encoder_layers=1,
        ...     num_decoder_layers=0,
        ...     d_ffn=128,
        ...     attention_type="RelPosMHAXL",
        ...     positional_encoding=None,
        ...     encoder_module="conformer",
        ...     normalize_before=True,
        ...     causal=False,
        ... )
        >>> ctx = net.make_streaming_context(DynChunkTrainConfig(16, 1))
        >>> src1 = torch.rand([8, 16, 64])
        >>> src2 = torch.rand([8, 16, 64])
        >>> out1 = net.encode_streaming(src1, ctx)
        >>> out1.shape
        torch.Size([8, 16, 64])
        >>> ctx.encoder_context.layers[0].mha_left_context.shape
        torch.Size([8, 16, 64])
        >>> out2 = net.encode_streaming(src2, ctx)
        >>> out2.shape
        torch.Size([8, 16, 64])
        >>> ctx.encoder_context.layers[0].mha_left_context.shape
        torch.Size([8, 16, 64])
        >>> combined_out = torch.concat((out1, out2), dim=1)
        >>> combined_out.shape
        torch.Size([8, 32, 64])
        rd   r   N)r    rg   rf   r<   )r   rh   r   )r   r0   ro   r   layersmha_left_contextlistr"   emptytor]   rE   rQ   rp   rq   forward_streaming)r`   r   r   rt   ru   rv   rw   known_left_contextpos_encoding_dummytarget_shaper   rz   r{   r   r   r   encode_streaming"  s*   6





zTransformerASR.encode_streamingc                 C   s   t || jj|fi |dS )a  Creates a blank streaming context for this transformer and its
        encoder.

        Arguments
        ---------
        dynchunktrain_config : DynChunkTrainConfig
            Runtime chunkwise attention configuration.
        encoder_kwargs : dict
            Parameters to be forward to the encoder's `make_streaming_context`.
            Metadata required for the encoder could differ depending on the
            encoder.

        Returns
        -------
        TransformerASRStreamingContext
        )r   r   )r   rq   make_streaming_context)r`   r   encoder_kwargsr   r   r   r     s   z%TransformerASR.make_streaming_contextc                 C   s,   |   D ]}| dkrtjj| qd S )Nr   )
parametersr   r"   r   initxavier_normal_)r`   pr   r   r   r_     s
   zTransformerASR._init_paramsNr   N)Nr   N)r   r   r   r   r   ReLUr   GELUIdentityr   intr1   strModuler[   r   r"   no_gradr   r   r   r   r   r   r_   __classcell__r   r   rb   r   r9      s|    S
Od3
G`
r9   c                       s:   e Zd ZdZ fddZdddZdd	 Zd
d Z  ZS )EncoderWrapperaR  This is a wrapper of any ASR transformer encoder. By default, the
    TransformerASR .forward() function encodes and decodes. With this wrapper
    the .forward() function becomes .encode() only.

    Important: The TransformerASR class must contain a .encode() function.

    Arguments
    ---------
    transformer : sb.lobes.models.TransformerInterface
        A Transformer instance that contains a .encode() function.
    *args : tuple
    **kwargs : dict
        Arguments to forward to parent class.

    Example
    -------
    >>> src = torch.rand([8, 120, 512])
    >>> tgt = torch.randint(0, 720, [8, 120])
    >>> net = TransformerASR(
    ...     720, 512, 512, 8, 1, 1, 1024, activation=torch.nn.GELU
    ... )
    >>> encoder = EncoderWrapper(net)
    >>> enc_out = encoder(src)
    >>> enc_out.shape
    torch.Size([8, 120, 512])
    c                    s&   t  j|i | || _| jj| _d S r   )rZ   r[   r>   r   )r`   r>   argskwargsrb   r   r   r[     s   zEncoderWrapper.__init__Nr   c                 K   s   | j j|||fi |}|S )z:Processes the input tensor x and returns an output tensor.)r>   r   )r`   xwav_lensr.   r   r   r   r   r     s   zEncoderWrapper.forwardc                 C   s   | j ||}|S )zdProcesses the input audio chunk tensor `x`, using and updating the
        mutable encoder `context`)r>   r   )r`   r   r   r   r   r   r     s   z EncoderWrapper.forward_streamingc                 O   s   | j j|i |S )zInitializes a streaming context. Forwards all arguments to the
        underlying transformer. See :meth:`speechbrain.lobes.models.transformer.TransformerASR.make_streaming_context`.
        )r>   r   )r`   r   r   r   r   r   r     s   z%EncoderWrapper.make_streaming_contextr   )	r   r   r   r   r[   r   r   r   r   r   r   rb   r   r     s    
r   )FN)NNr   FN)#r   dataclassesr   typingr   r   r"   r   speechbrain.dataio.dataior   0speechbrain.lobes.models.transformer.Transformerr   r   r	   r
   speechbrain.nnet.activationsr   speechbrain.nnet.containersr   speechbrain.nnet.linearr   (speechbrain.utils.dynamic_chunk_trainingr   speechbrain.utils.loggerr   r   rX   r   Tensorr1   r-   r8   r9   r   r   r   r   r   r   <module>   sR    	
=
=   ~