o
    %ݫiM3                     @   s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	m
Z
 ddlmZ ddlmZ G dd	 d	ejZG d
d dejZG dd dejZdS )a  Branchformer implementation.

Ref: "Branchformer: Parallel MLP-Attention Architectures
to Capture Local and Global Context for Speech Recognition and Understanding"

Source: Some parts of the code may be adapted from ESPNet.

Authors
* Titouan Parcollet 2023
    )OptionalN)ConvolutionalSpatialGatingUnit)MultiheadAttentionRelPosMHAXL)HyperMixing)	LayerNormc                       s:   e Zd ZdZddejejddf fdd	Zdd	 Z  Z	S )
ConvolutionBrancha  This is an implementation of the convolution branch in Branchformer.

    The default structure is:
    LN -> Channel Proj -> GeLU -> (CNN Spatial Gating) -> Channel Proj -> Dropout

    Arguments
    ---------
    input_size : int
        The expected size of the feature (channel) dimension.
    linear_units: int, optional
        Number of neurons in the hidden linear units.
    kernel_size: int, optional
        Kernel size of non-bottleneck convolutional layer.
    activation: torch.nn.Module, optional
         Activation function used after pre projection.
    gate_activation: torch.nn.Module, optional
         Activation function used at the gate of the CSGU module.
    dropout: float, optional
         Dropout rate.
    use_linear_after_conv: bool, optional
        If True, will apply a linear transformation of size input_size//2

    Example
    -------
    >>> x = torch.rand((8, 60, 512))
    >>> net = ConvolutionBranch(512, 1024)
    >>> output = net(x)
    >>> output.shape
    torch.Size([8, 60, 512])
                  Fc                    sJ   t    t||| _t|d || _| | _t|||||d| _d S )N   )
input_sizekernel_sizedropoutuse_linear_after_conv
activation)	super__init__nnLinearpre_channel_projpost_channel_projr   r   csgu)selfr   linear_unitsr   r   gate_activationr   r   	__class__ e/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/transformer/Branchformer.pyr   7   s   

zConvolutionBranch.__init__c                 C   s(   |  | |}| |}| |}|S )zT
        Arguments
        ----------
        x: torch.Tensor -> (B, T, D)

        )r   r   r   r   )r   xr   r   r   forwardN   s   

zConvolutionBranch.forward)
__name__
__module____qualname____doc__r   GELUIdentityr   r!   __classcell__r   r   r   r   r      s    "r   c                	       sh   e Zd ZdZdddejdddejdf	 fdd		Z			dd
ee	j
 dee	j
 dee	j
 fddZ  ZS )BranchformerEncoderLayera  This is an implementation of Branchformer encoder layer.

    Arguments
    ---------
    d_model : int
        The expected size of the input embedding.
    nhead : int
        Number of attention heads.
    kernel_size : int, optional
        Kernel size of convolution model.
    kdim : int, optional
        Dimension of the key.
    vdim : int, optional
        Dimension of the value.
    activation: torch.nn.Module
         Activation function used in each Conformer layer.
    dropout : int, optional
        Dropout for the encoder.
    attention_type: str, optional
        type of attention layer, e.g. regularMHA for regular MultiHeadAttention.
    csgu_linear_units: int, optional
        Number of neurons in the hidden linear units of the CSGU Module.
    gate_activation: torch.nn.Module, optional
         Activation function used at the gate of the CSGU module.
    use_linear_after_conv: bool, optional
        If True, will apply a linear transformation of size input_size//2

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> pos_embs = torch.rand((1, 2*60-1, 512))
    >>> net = BranchformerEncoderLayer(nhead=8, d_model=512, kernel_size=3)
    >>> output = net(x, pos_embs=pos_embs)
    >>> output[0].shape
    torch.Size([8, 60, 512])
    r
   Nr   r   r	   Fc              	      s   t    |dkrt|||||d| _n|dkr"t|||dd| _n|dkr2t||d d|dd| _t|||	||
||d	| _tj	
|d
 || _t|| _t|| _t	|| _d S )N
regularMHA)nheadd_modelr   kdimvdimr   F)	num_heads	embed_dimr   mask_pos_futurehypermixing   )input_output_dimhypernet_sizetiedr/   fix_tm_hidden_size)r   r   r   r   r   r   r   r   )r   r   r   	mha_layerr   r   r   convolution_branchtorchr   r   
merge_projr   	norm_mhsa	norm_convDropoutr   )r   r,   r+   r   r-   r.   r   r   attention_typecsgu_linear_unitsr   r   r   r   r   r      sJ   





z!BranchformerEncoderLayer.__init__src_masksrc_key_padding_maskpos_embsc              	   C   s~   |}|}|  |}| j||||||d\}}| |}| |}| |}| |}|| | tj||gdd }||fS )a  
        Arguments
        ----------
        x : torch.Tensor
            The sequence to the encoder layer.
        src_mask : torch.Tensor, optional
            The mask for the src sequence.
        src_key_padding_mask : torch.Tensor, optional
            The mask for the src keys per batch.
        pos_embs: torch.Tensor, torch.nn.Module, optional
            Module or tensor containing the input sequence positional embeddings
        )	attn_maskkey_padding_maskrC   )dim)r<   r8   r   r=   r9   r;   r:   cat)r   r    rA   rB   rC   x1x2	self_attnr   r   r   r!      s"   





"z BranchformerEncoderLayer.forward)NNNr"   r#   r$   r%   r   r&   r'   r   r   r:   Tensorr!   r(   r   r   r   r   r)   \   s,    *<r)   c                       sn   e Zd ZdZdddejdddejdddf fdd		Z				dd
ee	j
 dee	j
 dee	j
 fddZ  ZS )BranchformerEncodera  This class implements the Branchformer encoder.

    Arguments
    ---------
    num_layers : int
        Number of layers.
    d_model : int
        Embedding dimension size.
    nhead : int
        Number of attention heads.
    kernel_size : int, optional
        Kernel size of convolution model.
    kdim : int, optional
        Dimension of the key.
    vdim : int, optional
        Dimension of the value.
    activation: torch.nn.Module
         Activation function used in each Confomer layer.
    dropout : int, optional
        Dropout for the encoder.
    attention_type: str, optional
        type of attention layer, e.g. regularMHA for regular MultiHeadAttention.
    csgu_linear_units: int, optional
        Number of neurons in the hidden linear units of the CSGU Module.
    gate_activation: torch.nn.Module, optional
         Activation function used at the gate of the CSGU module.
    use_linear_after_conv: bool, optional
        If True, will apply a linear transformation of size input_size//2.
    output_hidden_states: bool, optional
        Whether the model should output the hidden states as a list of tensor.
    layerdrop_prob: float
        The probability to drop an entire layer.


    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> pos_emb = torch.rand((1, 2*60-1, 512))
    >>> net = BranchformerEncoder(1, 512, 8)
    >>> output, _ = net(x, pos_embs=pos_emb)
    >>> output.shape
    torch.Size([8, 60, 512])

    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> pos_emb = torch.rand((1, 2*60-1, 512))
    >>> net = BranchformerEncoder(1, 512, 8, output_hidden_states=True)
    >>> output, attn_list, hidden_list = net(x, pos_embs=pos_emb)
    >>> hidden_list[0].shape
    torch.Size([8, 60, 512])
    >>> len(hidden_list)
    2
    r
   Nr   r   r	   Fc                    sb   t    tj 	
fddt|D | _tdd| _|| _	| _
|| _d S )Nc                    s*   g | ]}t 
 	d qS ))r+   r,   r-   r.   r   r   r   r?   r@   r   r   )r)   ).0ir   r?   r@   r,   r   r   r-   r   r+   r   r.   r   r   
<listcomp>9  s     z0BranchformerEncoder.__init__.<locals>.<listcomp>gư>)eps)r   r   r:   r   
ModuleListrangelayersr   normlayerdrop_probr?   output_hidden_states)r   
num_layersr,   r+   r   r-   r.   r   r   r?   r@   r   r   rY   rX   r   rQ   r   r   %  s   

zBranchformerEncoder.__init__rA   rB   rC   c                 C   s   |du sJ d| j dkr|du rtd|}| jdkr$tt| j}g }| jr,|g}	t| jD ]*\}
}| j	rD| jdksD||
 | jkr[|||||d\}}|
| | jr[|	
| q1| |}| jri|||	fS ||fS )a  
        Arguments
        ---------
        src : torch.Tensor
            The sequence to the encoder layer.
        src_mask : torch.Tensor, optional
            The mask for the src sequence.
        src_key_padding_mask : torch.Tensor, optional
            The mask for the src keys per batch.
        pos_embs: torch.Tensor, torch.nn.Module,
            Module or tensor containing the input sequence positional embeddings
            If custom pos_embs are given it needs to have the shape (1, 2*S-1, E)
            where S is the sequence length, and E is the embedding dimension.
        dynchunktrain_config : None
            This configuration is unsupported for this encoder.

        Returns
        -------
        output : torch.Tensor
            The output of the Conformer.
        attention_lst : list
            The attention values.
        hidden_state_lst : list, optional
            The output of the hidden layers of the encoder.
            Only works if output_hidden_states is set to true.
        Nz3Dynamic Chunk Training unsupported for this encoderr   zThe chosen attention type for the Branchformer is RelPosMHAXL. For this attention type, the positional embeddings are mandatoryr   )rA   rB   rC   )r?   
ValueErrorrX   r:   randlenrV   rY   	enumeratetrainingappendrW   )r   srcrA   rB   rC   dynchunktrain_configoutput
keep_probsattention_lsthidden_state_lstrP   	enc_layer	attentionr   r   r   r!   O  sB   
#







zBranchformerEncoder.forward)NNNNrL   r   r   r   r   rN      s2    <-rN   )r%   typingr   r:   torch.nnr   $speechbrain.lobes.models.convolutionr   speechbrain.nnet.attentionr   r   speechbrain.nnet.hypermixingr   speechbrain.nnet.normalizationr   Moduler   r)   rN   r   r   r   r   <module>   s    E 