o
    i.                     @   s   d dl Z d dlmZ d dlm  mZ zd dlmZ W n   Y d dlm	Z	m
Z
mZ d dlmZ d dlmZ dd ZG dd	 d	ejZG d
d dejZG dd dejZG dd dejZG dd dejZdS )    N)RotaryEmbedding)GlobalLayerNormCumulativeLayerNorm	ScaleNorm)ScaledSinuEmbeddingFLASH_ShareA_FFConvMc                 C   sL   | dkrt ||ddS | dkrt|ddS | dkr!tjd|ddS t|S )	z0Just a wrapper to select the normalization type.glnT)elementwise_affineclnln   g:0yE>eps)r   r   nn	GroupNormBatchNorm1d)normdimshape r   _/home/ubuntu/.local/lib/python3.10/site-packages/funasr/models/mossformer/mossformer_encoder.pyselect_norm   s   
r   c                       s<   e Zd Zdddddddd fd	d

ZddddZ  ZS )MossformerBlock            @F皙?	scalenormT)
group_sizequery_key_dimexpansion_factorcausalattn_dropout	norm_typeshift_tokensc       	   
         s~   t    |dv sJ d|dkrtn|dkrtj| _ttddt f	ddt	|D | _
d S )	N)r   	layernormz/norm_type must be one of scalenorm or layernormr   r&       r   c                    s&   g | ]}t  d 	qS ))	r   r   r    r!   r"   dropoutrotary_pos_emb
norm_klassr%   r   ).0_	r#   r"   r   r!   r   r+   r    r*   r%   r   r   
<listcomp><   s    z,MossformerBlock.__init__.<locals>.<listcomp>)super__init__r   r   	LayerNormr   r   min
ModuleListrangelayers)
selfr   depthr   r    r!   r"   r#   r$   r%   	__class__r.   r   r1      s   


zMossformerBlock.__init__Nmaskc                C   s(   d}| j D ]}|||d}|d }q|S )Nr   r;   r   )r6   )r7   xr<   iiflashr   r   r   forwardL   s
   

zMossformerBlock.forward)__name__
__module____qualname__r1   r@   __classcell__r   r   r9   r   r      s    -r   c                       s6   e Zd ZdZ						d fdd	Zd	d
 Z  ZS )MossFormer_MaskNeta  The MossFormer module for computing output masks.

    Arguments
    ---------
    in_channels : int
        Number of channels at the output of the encoder.
    out_channels : int
        Number of channels that would be inputted to the intra and inter blocks.
    num_blocks : int
        Number of layers of Dual Computation Block.
    norm : str
        Normalization type.
    num_spks : int
        Number of sources (speakers).
    skip_around_intra : bool
        Skip connection around intra.
    use_global_pos_enc : bool
        Global positional encodings.
    max_length : int
        Maximum sequence length.

    Example
    ---------
    >>> mossformer_block = MossFormerM(1, 64, 8)
    >>> mossformer_masknet = MossFormer_MaskNet(64, 64, intra_block, num_spks=2)
    >>> x = torch.randn(10, 64, 2000)
    >>> x = mossformer_masknet(x)
    >>> x.shape
    torch.Size([2, 10, 64, 2000])
       r      T N  c	           	         s   t t|   || _|| _t||d| _tj||ddd| _	|| _
| j
r)t|| _t||||d| _tj||| dd| _tj||ddd| _t | _t | _tt||dt | _tt||dt | _d S )N   r   F)bias)skip_around_intra)kernel_size)r0   rE   r1   num_spks
num_blocksr   r   r   Conv1dconv1d_encoderuse_global_pos_encr   pos_encComputation_Blockmdl
conv1d_outconv1_decoderPReLUpreluReLU
activation
SequentialTanhoutputSigmoidoutput_gate)	r7   in_channelsout_channelsrN   r   rM   rK   rQ   
max_lengthr9   r   r   r1   t   s(   


 zMossFormer_MaskNet.__init__c           	      C   s   |  |}| |}| jr$|}|dd}| |}|dd}|| }| |}| |}| |}|j\}}}|	|| j
 d|}| || | }| |}|j\}}}|	|| j
||}| |}|dd}|S )a  Returns the output tensor.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor of dimension [B, N, S].

        Returns
        -------
        out : torch.Tensor
            Output tensor of dimension [spks, B, N, S]
            where, spks = Number of speakers
               B = Batchsize,
               N = number of filters
               S = the number of time frames
        r   r   )r   rP   rQ   	transposerR   rT   rX   rU   r   viewrM   r]   r_   rV   rZ   )	r7   r=   baseembBr-   SNLr   r   r   r@      s(   







zMossFormer_MaskNet.forward)rF   r   rG   TTrH   rA   rB   rC   __doc__r1   r@   rD   r   r   r9   r   rE   T   s    #$rE   c                       s*   e Zd ZdZd	 fdd	Zdd Z  ZS )
MossFormerEncodera  Convolutional Encoder Layer.

    Arguments
    ---------
    kernel_size : int
        Length of filters.
    in_channels : int
        Number of  input channels.
    out_channels : int
        Number of output channels.

    Example
    -------
    >>> x = torch.randn(2, 1000)
    >>> encoder = Encoder(kernel_size=4, out_channels=64)
    >>> h = encoder(x)
    >>> h.shape
    torch.Size([2, 64, 499])
    rG   @   r   c                    s4   t t|   tj||||d ddd| _|| _d S )NrG   r   F)r`   ra   rL   stridegroupsrJ   )r0   rn   r1   r   rO   conv1dr`   )r7   rL   ra   r`   r9   r   r   r1      s   
zMossFormerEncoder.__init__c                 C   s0   | j dkrtj|dd}| |}t|}|S )a  Return the encoded output.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor with dimensionality [B, L].
        Return
        ------
        x : torch.Tensor
            Encoded tensor with dimensionality [B, N, T_out].

        where B = Batchsize
              L = Number of timepoints
              N = Number of filters
              T_out = Number of timepoints at the output of the encoder
        r   r(   )r`   torch	unsqueezerr   Frelu)r7   r=   r   r   r   r@      s
   


zMossFormerEncoder.forward)rG   ro   r   rl   r   r   r9   r   rn      s    rn   c                       s6   e Zd ZdZ						d fdd		Zd
d Z  ZS )MossFormerMa  This class implements the transformer encoder.

    Arguments
    ---------
    num_blocks : int
        Number of mossformer blocks to include.
    d_model : int
        The dimension of the input embedding.
    attn_dropout : float
        Dropout for the self-attention (Optional).
    group_size: int
        the chunk size
    query_key_dim: int
        the attention vector dimension
    expansion_factor: int
        the expansion factor for the linear projection in conv module
    causal: bool
        true for causal / false for non causal

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512))
    >>> net = TransformerEncoder_MossFormerM(num_blocks=8, d_model=512)
    >>> output, _ = net(x)
    >>> output.shape
    torch.Size([8, 60, 512])
    NFr   r   r   r   c              	      s6   t    t|||||||d| _tj|dd| _d S )N)r   r8   r   r    r!   r"   r#   gư>r   )r0   r1   r   mossformerMr   r2   r   )r7   rN   d_modelr"   r   r    r!   r#   r9   r   r   r1   3  s   

	zMossFormerM.__init__c                 C   s   |  |}| |}|S )a  
        Arguments
        ----------
        src : torch.Tensor
            Tensor shape [B, L, N],
            where, B = Batchsize,
                   L = time points
                   N = number of filters
            The sequence to the encoder layer (required).
        src_mask : tensor
            The mask for the src sequence (optional).
        src_key_padding_mask : tensor
            The mask for the src keys per batch (optional).
        )rx   r   )r7   srcr]   r   r   r   r@   J  s   

zMossFormerM.forward)NFr   r   r   r   rl   r   r   r9   r   rw     s     rw   c                       s.   e Zd ZdZ		d fdd	Zdd Z  ZS )	rS   a  Computation block for dual-path processing.

    Arguments
    ---------
     out_channels : int
        Dimensionality of inter/intra model.
     norm : str
        Normalization type.
     skip_around_intra : bool
        Skip connection around the intra layer.

    Example
    ---------
        >>> comp_block = Computation_Block(64)
        >>> x = torch.randn(10, 64, 100)
        >>> x = comp_block(x)
        >>> x.shape
        torch.Size([10, 64, 100])
    r   Tc                    sF   t t|   t||d| _|| _|| _|d ur!t||d| _d S d S )N)rN   ry   rI   )	r0   rS   r1   rw   	intra_mdlrK   r   r   
intra_norm)r7   rN   ra   r   rK   r9   r   r   r1   w  s   zComputation_Block.__init__c                 C   sd   |j \}}}|ddd }| |}|ddd }| jdur'| |}| jr.|| }|}|S )ao  Returns the output tensor.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor of dimension [B, N, S].


        Return
        ---------
        out: torch.Tensor
            Output tensor of dimension [B, N, S].
            where, B = Batchsize,
               N = number of filters
               S = sequence time index
        r   rG   r   N)r   permute
contiguousr{   r   r|   rK   )r7   r=   rh   rj   ri   intraoutr   r   r   r@     s   


zComputation_Block.forward)r   Trl   r   r   r9   r   rS   b  s    rS   )rs   torch.nnr   torch.nn.functional
functionalru   rotary_embedding_torchr   $funasr.models.transformer.layer_normr   r   r   #funasr.models.transformer.embeddingr   #funasr.models.mossformer.mossformerr   r   Moduler   rE   rn   rw   rS   r   r   r   r   <module>   s"    6 <M