o
    %ݫim                     @   s   d Z ddlZddlmZ ddlmZmZ ddlZddlm	Z	 ddl
m	  mZ ddlZddlmZ ddlmZmZmZmZ ddlmZ ddlmZ dd	lmZ eG d
d dZeG dd dZG dd de	jZG dd de	jZ G dd de	jZ!G dd de	jZ"G dd de	jZ#dS )zConformer implementation.

Authors
-------
* Jianyuan Zhong 2020
* Samuele Cornell 2021
* Sylvain de Langen 2023
* Shucong Zhang 2024
    N)	dataclass)ListOptional)Swish)MultiheadAttentionPositionalwiseFeedForwardRelPosMHAXLRoPEMHA)HyperMixing)	LayerNorm)DynChunkTrainConfigc                   @   sB   e Zd ZU dZeed< 	 dZeej	 ed< 	 dZ
eej	 ed< dS )%ConformerEncoderLayerStreamingContexta  Streaming metadata and state for a `ConformerEncoderLayer`.

    The multi-head attention and Dynamic Chunk Convolution require to save some
    left context that gets inserted as left padding.

    See :class:`.ConvolutionModule` documentation for further details.
    mha_left_context_sizeNmha_left_contextdcconv_left_context)__name__
__module____qualname____doc__int__annotations__r   r   torchTensorr    r   r   b/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/transformer/Conformer.pyr       s   
 r   c                   @   s(   e Zd ZU dZeed< 	 ee ed< dS ) ConformerEncoderStreamingContextz6Streaming metadata and state for a `ConformerEncoder`.dynchunktrain_configlayersN)r   r   r   r   r   r   r   r   r   r   r   r   r   ?   s   
 r   c                       sV   e Zd ZdZddedddf fdd	Z				dd
ejdeej dee	 fddZ
  ZS )ConvolutionModuleaj  This is an implementation of convolution module in Conformer.

    Arguments
    ---------
    input_size : int
        The expected size of the input embedding dimension.
    kernel_size: int, optional
        Kernel size of non-bottleneck convolutional layer.
    bias: bool, optional
        Whether to use bias in the non-bottleneck conv layer.
    activation: torch.nn.Module
         Activation function used after non-bottleneck conv layer.
    dropout: float, optional
         Dropout rate.
    causal: bool, optional
         Whether the convolution should be causal or not.
    dilation: int, optional
         Dilation factor for the non bottleneck conv layer.

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> net = ConvolutionModule(512, 3)
    >>> output = net(x)
    >>> output.shape
    torch.Size([8, 60, 512])
       T        F   c              
      s   t    || _|| _|| _| jr|d d|d   | _n|d d|d   d | _t|| _t	tj
|d| dd|dtjdd| _tj
|||d| j|||d| _t	t|| tj|||dt|| _d S )Nr!      )kernel_sizestridebiasdim)r#   r$   paddingdilationgroupsr%   )r%   )super__init__r#   causalr)   r(   nnr   
layer_norm
SequentialConv1dGLU
bottleneckconvLinearDropout
after_conv)self
input_sizer#   r%   
activationdropoutr-   r)   	__class__r   r   r,   i   s<   



zConvolutionModule.__init__Nxmaskr   c              	   C   s  |dur| j rJ d| jdksJ d|j}|jd }|jd | dkr/||jd |  }nd}| |}|dd}| |}tj|| j	|fdd}|j
d|| j	 |d}tj|d| j	fdd}|dd}|jddd	}tj|| jj| jj| jjd| jj| jjd
}|dd}| |}tj|d|dfd}tj|ddd	}|dkr|ddd| ddf }n-| |}|dd}| |}| |}| j r|dd| j	 f }|dd}| |}|dur||d |S )a  Applies the convolution to an input tensor `x`.

        Arguments
        ---------
        x: torch.Tensor
            Input tensor to the convolution module.
        mask: torch.Tensor, optional
            Mask to be applied over the output of the convolution using
            `masked_fill_`, if specified.
        dynchunktrain_config: DynChunkTrainConfig, optional
            If specified, makes the module support Dynamic Chunk Convolution
            (DCConv) as implemented by
            `Dynamic Chunk Convolution for Unified Streaming and Non-Streaming Conformer ASR <https://www.amazon.science/publications/dynamic-chunk-convolution-for-unified-streaming-and-non-streaming-conformer-asr>`_.
            This allows masking future frames while preserving better accuracy
            than a fully causal convolution, at a small speed cost.
            This should only be used for training (or, if you know what you're
            doing, for masked evaluation at inference time), as the forward
            streaming function should be used at inference time.

        Returns
        -------
        out: torch.Tensor
            The output tensor.
        Nz5Chunked convolution not supported with causal paddingr!   z:Current DynChunkTrain logic does not support dilation != 1r   r"   )value)sizestep)	start_dimend_dim)weightr%   r$   r(   r)   r*   )r'   sizes.r    )r-   r)   
chunk_sizeshaper/   	transposer3   Fpadr(   unfoldflattenconv1dr4   rE   r%   r$   r*   r7   r   	unflattenmasked_fill_)r8   r>   r?   r   rH   
batch_sizefinal_right_paddingoutr   r   r   forward   s`   








zConvolutionModule.forward)NN)r   r   r   r   r   r,   r   r   r   r   rU   __classcell__r   r   r<   r   r   K   s$     9r   c                
       s   e Zd ZdZdddeddddf fdd		Z				dd
eej deej dejdee	 fddZ
	ddedejfddZdefddZ  ZS )ConformerEncoderLayera  This is an implementation of Conformer encoder layer.

    Arguments
    ---------
    d_model : int
        The expected size of the input embedding.
    d_ffn : int
        Hidden size of self-attention Feed Forward layer.
    nhead : int
        Number of attention heads.
    kernel_size : int, optional
        Kernel size of convolution model.
    kdim : int, optional
        Dimension of the key.
    vdim : int, optional
        Dimension of the value.
    activation: torch.nn.Module
         Activation function used in each Conformer layer.
    bias : bool, optional
        Whether  convolution module.
    dropout : int, optional
        Dropout for the encoder.
    causal : bool, optional
        Whether the convolutions should be causal or not.
    attention_type : str, optional
        type of attention layer, e.g. regularMHA for regular MultiHeadAttention.

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> pos_embs = torch.rand((1, 2*60-1, 512))
    >>> net = ConformerEncoderLayer(d_ffn=512, nhead=8, d_model=512, kernel_size=3)
    >>> output = net(x, pos_embs=pos_embs)
    >>> output[0].shape
    torch.Size([8, 60, 512])
    r   NTr    Fr   c              	      s  t    |dkrt|||	||d| _n)|dkr"t|||	|
d| _n|dkr1t||d|dd| _n|dkr=t|||	d	| _t|||||	|
d
| _t	
t	|t|||	|dt	|	| _t	
t	|t|||	|dt	|	| _t|| _t|| _t	|	| _d S )N
regularMHAnheadd_modelr;   kdimvdimr   	num_heads	embed_dimr;   mask_pos_futurehypermixingF)input_output_dimhypernet_sizetiedr_   fix_tm_hidden_sizer	   )r_   r`   r;   r-   d_ffnr9   r;   r:   )r+   r,   r   	mha_layerr   r
   r	   r   convolution_moduler.   r0   r   r   r6   ffn_module1ffn_module2norm1norm2dropr8   r[   ri   rZ   r#   r\   r]   r:   r%   r;   r-   attention_typer<   r   r   r,   t  sp   





zConformerEncoderLayer.__init__src_masksrc_key_padding_maskpos_embsr   c           	      C   s   d}|dur| d}|d| |  }|}| |}| j||||||d\}}|| }|| j|||d }| |d| |  }||fS )a  
        Arguments
        ----------
        src : torch.Tensor
            The sequence to the encoder layer.
        src_mask : torch.Tensor, optional
            The mask for the src sequence.
        src_key_padding_mask : torch.Tensor, optional
            The mask for the src keys per batch.
        pos_embs: torch.Tensor, torch.nn.Module, optional
            Module or tensor containing the input sequence positional embeddings
        dynchunktrain_config: Optional[DynChunkTrainConfig]
            Dynamic Chunk Training configuration object for streaming,
            specifically involved here to apply Dynamic Chunk Convolution to
            the convolution module.
        NrF         ?	attn_maskkey_padding_maskru   r   )	unsqueezerl   rn   rj   rk   ro   rm   )	r8   r>   rs   rt   ru   r   	conv_maskskip	self_attnr   r   r   rU     s(   


zConformerEncoderLayer.forwardcontextc                 C   s*  |j d }|d| |  }|jdurtj|j|fdd}|jdkr0|d|j dddf |_|}| |}| j|||dd|d\}}|| }|d| dddf }|jdurctj|j|fdd}|d| j	j
 dddf |_|| 	| }|d| dddf }| |d| |  }||fS )	a  Conformer layer streaming forward (typically for
        DynamicChunkTraining-trained models), which is to be used at inference
        time. Relies on a mutable context object as initialized by
        `make_streaming_context` that should be used across chunks.
        Invoked by `ConformerEncoder.forward_streaming`.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor for this layer. Batching is supported as long as you
            keep the context consistent.
        context : ConformerEncoderStreamingContext
            Mutable streaming context; the same object should be passed across
            calls.
        pos_embs : torch.Tensor, optional
            Positional embeddings, if used.

        Returns
        -------
        x : torch.Tensor
            Output tensor.
        self_attn : list
            List of self attention values.
        rv   Nr!   r&   r   .rw   )rI   rl   r   r   catr   rn   rj   r   rk   r(   ro   rm   )r8   r>   r   ru   orig_lenr}   r~   r   r   r   forward_streaming  s:   

	



z'ConformerEncoderLayer.forward_streamingr   c                 C   s
   t |dS )aU  Creates a blank streaming context for this encoding layer.

        Arguments
        ---------
        mha_left_context_size : int
            How many left frames should be saved and used as left context to the
            current chunk when streaming

        Returns
        -------
        ConformerEncoderLayerStreamingContext
        r   )r   )r8   r   r   r   r   make_streaming_contextJ  s   z,ConformerEncoderLayer.make_streaming_contextNNNNN)r   r   r   r   r   r,   r   r   r   r   rU   r   r   r   r   rV   r   r   r<   r   rW   M  s>    +P
6
WrW   c                
       s   e Zd ZdZdddeddddddf
 fdd		Z				dd
eej deej deej dee	 fddZ
	ddejdedeej fddZde	fddZ  ZS )ConformerEncodera  This class implements the Conformer encoder.

    Arguments
    ---------
    num_layers : int
        Number of layers.
    d_model : int
        Embedding dimension size.
    d_ffn : int
        Hidden size of self-attention Feed Forward layer.
    nhead : int
        Number of attention heads.
    kernel_size : int, optional
        Kernel size of convolution model.
    kdim : int, optional
        Dimension of the key.
    vdim : int, optional
        Dimension of the value.
    activation: torch.nn.Module
         Activation function used in each Confomer layer.
    bias : bool, optional
        Whether  convolution module.
    dropout : int, optional
        Dropout for the encoder.
    causal: bool, optional
        Whether the convolutions should be causal or not.
    attention_type: str, optional
        type of attention layer, e.g. regulaMHA for regular MultiHeadAttention.
    output_hidden_states: bool, optional
        Whether the model should output the hidden states as a list of tensor.
    layerdrop_prob: float
        The probability to drop an entire layer.

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> pos_emb = torch.rand((1, 2*60-1, 512))
    >>> net = ConformerEncoder(1, 512, 512, 8)
    >>> output, _ = net(x, pos_embs=pos_emb)
    >>> output.shape
    torch.Size([8, 60, 512])

    >>> import torch
    >>> from speechbrain.lobes.models.transformer.Conformer import ConformerEncoder
    >>> x = torch.rand((8, 60, 512)); pos_emb = torch.rand((1, 2*60-1, 512));
    >>> net = ConformerEncoder(4, 512, 512, 8, output_hidden_states=True)
    >>> output, _, hs = net(x, pos_embs=pos_emb)
    >>> hs[0].shape
    torch.Size([8, 60, 512])

    r   NTr    Fr   c                    sb   t    tj 	
fddt|D | _tdd| _|| _	| _
|| _d S )Nc                    *   g | ]}t 	
 d qS )ri   rZ   r[   r\   r]   r;   r:   r#   r%   r-   rr   )rW   ).0ir:   rr   r%   r-   ri   r[   r;   r\   r#   rZ   r]   r   r   
<listcomp>       z-ConformerEncoder.__init__.<locals>.<listcomp>ư>eps)r+   r,   r   r.   
ModuleListranger   r   normlayerdrop_probrr   output_hidden_states)r8   
num_layersr[   ri   rZ   r#   r\   r]   r:   r%   r;   r-   rr   r   r   r<   r   r   r,     s   

zConformerEncoder.__init__rs   rt   ru   r   c                 C   s   | j dkr|du rtd| j  d|}| jdkr!tt| j}g }| jr)|g}	t| jD ]+\}
}| j	rA| jdksA||
 | jkrY||||||d\}}|
| | jrY|	
| q.| |}| jrg|||	fS ||fS )a  
        Arguments
        ---------
        src : torch.Tensor
            The sequence to the encoder layer.
        src_mask : torch.Tensor, optional
            The mask for the src sequence.
        src_key_padding_mask : torch.Tensor, optional
            The mask for the src keys per batch.
        pos_embs: torch.Tensor, torch.nn.Module,
            Module or tensor containing the input sequence positional embeddings
            If custom pos_embs are given it needs to have the shape (1, 2*S-1, E)
            where S is the sequence length, and E is the embedding dimension.
        dynchunktrain_config: Optional[DynChunkTrainConfig]
            Dynamic Chunk Training configuration object for streaming,
            specifically involved here to apply Dynamic Chunk Convolution to the
            convolution module.

        Returns
        -------
        output : torch.Tensor
            The output of the Conformer.
        attention_lst : list
            The attention values.
        hidden_state_lst : list, optional
            The output of the hidden layers of the encoder.
            Only works if output_hidden_states is set to true.
        r   N/The chosen attention type for the Conformer is B. For this attention type, the positional embeddings are mandatoryr    )rs   rt   ru   r   )rr   
ValueErrorr   r   randlenr   r   	enumeratetrainingappendr   )r8   srcrs   rt   ru   r   output
keep_probsattention_lsthidden_state_lstr   	enc_layer	attentionr   r   r   rU     s>   
$






zConformerEncoder.forwardr   r   c           	      C   s   | j dks
| j dkr|du rtd| j  d|}g }t| jD ]\}}|j|||j| d\}}|| q | |}||fS )aM  Conformer streaming forward (typically for
        DynamicChunkTraining-trained models), which is to be used at inference
        time. Relies on a mutable context object as initialized by
        `make_streaming_context` that should be used across chunks.

        Arguments
        ---------
        src : torch.Tensor
            Input tensor. Batching is supported as long as you keep the context
            consistent.
        context : ConformerEncoderStreamingContext
            Mutable streaming context; the same object should be passed across
            calls.
        pos_embs : torch.Tensor, optional
            Positional embeddings, if used.

        Returns
        -------
        output : torch.Tensor
            The output of the streaming conformer.
        attention_lst : list
            The attention values.
        r   r	   Nr   r   )ru   r   )rr   r   r   r   r   r   r   )	r8   r   r   ru   r   r   r   r   r   r   r   r   r     s   



z"ConformerEncoder.forward_streamingc                    s   t   fdd| jD dS )a,  Creates a blank streaming context for the encoder.

        Arguments
        ---------
        dynchunktrain_config: Optional[DynChunkTrainConfig]
            Dynamic Chunk Training configuration object for streaming

        Returns
        -------
        ConformerEncoderStreamingContext
        c                    s   g | ]
}|j   d qS )r   )r   left_context_size_frames)r   layerrz   r   r   r   G  s    z;ConformerEncoder.make_streaming_context.<locals>.<listcomp>)r   r   )r   r   )r8   r   r   rz   r   r   9  s   
z'ConformerEncoder.make_streaming_contextr   r   )r   r   r   r   r   r,   r   r   r   r   rU   r   r   r   rV   r   r   r<   r   r   \  sF    ;-
O
2r   c                       sF   e Zd ZdZddeddddf fdd	Z						d
dd	Z  ZS )ConformerDecoderLayera  This is an implementation of Conformer encoder layer.

    Arguments
    ---------
    d_model : int
        The expected size of the input embedding.
    d_ffn : int
        Hidden size of self-attention Feed Forward layer.
    nhead : int
        Number of attention heads.
    kernel_size : int, optional
        Kernel size of convolution model.
    kdim : int, optional
        Dimension of the key.
    vdim : int, optional
        Dimension of the value.
    activation : torch.nn.Module, optional
         Activation function used in each Conformer layer.
    bias : bool, optional
        Whether  convolution module.
    dropout : int, optional
        Dropout for the encoder.
    causal : bool, optional
        Whether the convolutions should be causal or not.
    attention_type : str, optional
        type of attention layer, e.g. regularMHA for regular MultiHeadAttention.

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> pos_embs = torch.rand((1, 2*60-1, 512))
    >>> net = ConformerEncoderLayer(d_ffn=512, nhead=8, d_model=512, kernel_size=3)
    >>> output = net(x, pos_embs=pos_embs)
    >>> output[0].shape
    torch.Size([8, 60, 512])
    NTr    r   c              	      s   t    |
std |dkrt|||	||d| _n|dkr(t|||	|
d| _t|||||	|
d| _t	
t	|t|||	|dt	|	| _t	
t	|t|||	|dt	|	| _t|| _t|| _t	|	| _d S )NzWDecoder is not causal, in most applications it should be causal, you have been warned !rX   rY   r   r^   rg   rh   )r+   r,   warningswarnr   rj   r   r   rk   r.   r0   r   r   r6   rl   rm   rn   ro   rp   rq   r<   r   r   r,   w  s\   



zConformerDecoderLayer.__init__c	                 C   sr   |d|  |  }|}	| |}
| j|
|||||d\}
}|
|	 }
|
| |
 }
| |
d| |
  }
|
||fS )am  
        Arguments
        ---------
        tgt: torch.Tensor
            The sequence to the decoder layer.
        memory: torch.Tensor
            The sequence from the last layer of the encoder.
        tgt_mask: torch.Tensor, optional, optional
            The mask for the tgt sequence.
        memory_mask: torch.Tensor, optional
            The mask for the memory sequence.
        tgt_key_padding_mask: torch.Tensor, optional
            The mask for the tgt keys per batch.
        memory_key_padding_mask: torch.Tensor, optional
            The mask for the memory keys per batch.
        pos_embs_tgt: torch.Tensor, torch.nn.Module, optional
            Module or tensor containing the target sequence positional embeddings for each attention layer.
        pos_embs_src: torch.Tensor, torch.nn.Module, optional
            Module or tensor containing the source sequence positional embeddings for each attention layer.

        Returns
        -------
        x: torch.Tensor
            The output tensor
        self_attn : torch.Tensor
        self_attn : torch.Tensor
            The self attention tensor
        rv   rw   )rl   rn   rj   rk   ro   rm   )r8   tgtmemorytgt_maskmemory_masktgt_key_padding_maskmemory_key_padding_maskpos_embs_tgtpos_embs_srcr}   r>   r~   r   r   r   rU     s   (


zConformerDecoderLayer.forwardNNNNNNr   r   r   r   r   r,   rU   rV   r   r   r<   r   r   P  s"    ,Hr   c                       sH   e Zd ZdZdddeddddf fdd	Z						dd	d
Z  ZS )ConformerDecodera  This class implements the Transformer decoder.

    Arguments
    ---------
    num_layers: int
        Number of layers.
    nhead: int
        Number of attention heads.
    d_ffn: int
        Hidden size of self-attention Feed Forward layer.
    d_model: int
        Embedding dimension size.
    kdim: int, optional
        Dimension for key.
    vdim: int, optional
        Dimension for value.
    dropout: float, optional
        Dropout rate.
    activation: torch.nn.Module, optional
        Activation function used after non-bottleneck conv layer.
    kernel_size : int, optional
        Kernel size of convolutional layer.
    bias : bool, optional
        Whether  convolution module.
    causal: bool, optional
        Whether the convolutions should be causal or not.
    attention_type: str, optional
        type of attention layer, e.g. regularMHA for regular MultiHeadAttention.


    Example
    -------
    >>> src = torch.rand((8, 60, 512))
    >>> tgt = torch.rand((8, 60, 512))
    >>> net = ConformerDecoder(1, 8, 1024, 512, attention_type="regularMHA")
    >>> output, _, _ = net(tgt, src)
    >>> output.shape
    torch.Size([8, 60, 512])
    Nr       Tr   c                    sV   t    tj 	
fddt|D | _tjj	j
dd| _d S )Nc                    r   r   )r   )r   _r   r   r   r   1  r   z-ConformerDecoder.__init__.<locals>.<listcomp>r   r   )r+   r,   r   r.   r   r   r   sbnnetnormalizationr   r   )r8   r   rZ   ri   r[   r\   r]   r;   r:   r#   r%   r-   rr   r<   r   r   r,      s   
zConformerDecoder.__init__c	                 C   s`   |}	g g }
}| j D ]}||	|||||||d\}	}}|
| || q
| |	}	|	|
|fS )a  
        Arguments
        ---------
        tgt: torch.Tensor
            The sequence to the decoder layer.
        memory: torch.Tensor
            The sequence from the last layer of the encoder.
        tgt_mask: torch.Tensor, optional, optional
            The mask for the tgt sequence.
        memory_mask: torch.Tensor, optional
            The mask for the memory sequence.
        tgt_key_padding_mask : torch.Tensor, optional
            The mask for the tgt keys per batch.
        memory_key_padding_mask : torch.Tensor, optional
            The mask for the memory keys per batch.
        pos_embs_tgt: torch.Tensor, torch.nn.Module, optional
            Module or tensor containing the target sequence positional embeddings for each attention layer.
        pos_embs_src: torch.Tensor, torch.nn.Module, optional
            Module or tensor containing the source sequence positional embeddings for each attention layer.

        Returns
        -------
        output: torch.Tensor
            Conformer decoder output.
        self_attns : list
            Location of self attentions.
        multihead_attns : list
            Location of multihead attentions.
        )r   r   r   r   r   r   )r   r   r   )r8   r   r   r   r   r   r   r   r   r   
self_attnsmultihead_attns	dec_layerr~   multihead_attnr   r   r   rU   D  s"   (





zConformerDecoder.forwardr   r   r   r   r<   r   r     s$    .(r   )$r   r   dataclassesr   typingr   r   r   torch.nnr.   torch.nn.functional
functionalrK   speechbrainr   speechbrain.nnet.activationsr   speechbrain.nnet.attentionr   r   r   r	   speechbrain.nnet.hypermixingr
   speechbrain.nnet.normalizationr   (speechbrain.utils.dynamic_chunk_trainingr   r   r   Moduler   rW   r   r   r   r   r   r   r   <module>   s8    
     u (