o
    ߥi6                     @   s4  d dl Z d dlZd dlmZmZ d dlZd dlmZ d dlm  m	Z
 d dlmZ d dlmZmZ d dlmZmZ d dlmZmZ d dlmZ d dlmZ d	Zejejejd
G dd deZdd ZG dd dej Z!G dd dej"Z#G dd dZ$G dd dej Z%G dd dej Z&G dd dej Z'dS )    N)AnyDict)Models)MODELS
TorchModel)MossFormerModuleScaledSinuEmbedding)CumulativeLayerNormGlobalLayerNorm)Tensor)Tasks:0yE>)module_namec                       sR   e Zd ZdZdef fddZdedeeef fddZ	dd
dZ
dd Z  ZS )
MossFormerzqLibrary to support MossFormer speech separation.

        Args:
            model_dir (str): the model path.
    	model_dirc                    s   t  j|g|R i | t|d |d d| _t|d d|d |d |d d| _t|d |d t|d	 |d
 |d |d |d |d |d |d |d d| _|d | _	d S )Nkernel_sizeout_channels)r   r   in_channels   stridebias)r   r   r   r   r   
num_blocksd_modelattn_dropout
group_sizequery_key_dimexpansion_factorcausalnormnum_spks)r   r   )
super__init__EncoderencoderDecoderdecoderMossFormerMaskNetMossFormerMmask_netr   )selfr   argskwargs	__class__ a/home/ubuntu/.local/lib/python3.10/site-packages/modelscope/models/audio/separation/mossformer.pyr!   !   s0   	zMossFormer.__init__inputsreturnc                    s     |} |}t|g j }|| tj fddt jD dd}|d}|d}||krEt	|ddd|| f}|S |d d d |d d f }|S )Nc                    s    g | ]}  | d qS ))r%   	unsqueeze).0ir)   sep_hr.   r/   
<listcomp>?   s    z&MossFormer.forward.<locals>.<listcomp>r2   dimr   r   )
r#   r(   torchstackr   catrangesizeFpad)r)   r0   mix_west_mask
est_sourcet_origint_estr.   r6   r/   forward7   s"   



zMossFormer.forwardNc                 C   s   |s| j }|std}| jjtjtj|d|ddd | j	jtjtj|d|ddd | j
jtjtj|d|ddd d S )Ncpuzencoder.bin)map_locationT)strictzdecoder.binzmasknet.bin)r   r;   devicer#   load_state_dictloadospathjoinr%   r(   )r)   	load_pathrK   r.   r.   r/   load_check_pointN   s,   

zMossFormer.load_check_pointc                 C   s   t | j| j| jdS )N)r#   r%   masknet)dictr#   r%   r(   )r)   r.   r.   r/   as_dict`   s   zMossFormer.as_dict)NN)__name__
__module____qualname____doc__strr!   r   r   r   rG   rR   rU   __classcell__r.   r.   r,   r/   r      s    
r   c                 C   sL   | dkrt ||ddS | dkrt|ddS | dkr!tjd|ddS t|S )	z5Just a wrapper to select the normalization type.
    glnT)elementwise_affineclnlnr   r   eps)r
   r	   nn	GroupNormBatchNorm1d)r   r:   shaper.   r.   r/   select_norme   s   
rf   c                       sF   e Zd ZdZ			ddededef fdd	Zd
ejfddZ  Z	S )r"   ab  Convolutional Encoder Layer.

    Args:
        kernel_size: Length of filters.
        in_channels: Number of  input channels.
        out_channels: Number of output channels.

    Examples:

    >>> x = torch.randn(2, 1000)
    >>> encoder = Encoder(kernel_size=4, out_channels=64)
    >>> h = encoder(x)
    >>> h.shape # torch.Size([2, 64, 499])
       @   r   r   r   r   c                    s4   t t|   tj||||d ddd| _|| _d S )Nrg   r   F)r   r   r   r   groupsr   )r    r"   r!   rb   Conv1dconv1dr   )r)   r   r   r   r,   r.   r/   r!      s   
zEncoder.__init__xc                 C   s0   | j dkrtj|dd}| |}t|}|S )ay  Return the encoded output.

        Args:
            x: Input tensor with dimensionality [B, L].

        Returns:
            Encoded tensor with dimensionality [B, N, T_out].
            where B = Batchsize
                  L = Number of timepoints
                  N = Number of filters
                  T_out = Number of timepoints at the output of the encoder
        r   r9   )r   r;   r3   rk   r@   relur)   rl   r.   r.   r/   rG      s
   


zEncoder.forward)rg   rh   r   )
rV   rW   rX   rY   intr!   r;   r   rG   r[   r.   r.   r,   r/   r"   s   s    r"   c                       s,   e Zd ZdZ fddZ fddZ  ZS )r$   a  A decoder layer that consists of ConvTranspose1d.

    Args:
        kernel_size: Length of filters.
        in_channels: Number of  input channels.
        out_channels: Number of output channels.

    Example
    ---------
    >>> x = torch.randn(2, 100, 1000)
    >>> decoder = Decoder(kernel_size=4, in_channels=100, out_channels=1)
    >>> h = decoder(x)
    >>> h.shape
    torch.Size([2, 1003])
    c                    s   t t| j|i | d S N)r    r$   r!   )r)   r*   r+   r,   r.   r/   r!      s   zDecoder.__init__c                    sr   |  dvrtd| jt |  dkr|nt|d}t|  dkr2tj|dd}|S t|}|S )zReturn the decoded output.

        Args:
            x: Input tensor with dimensionality [B, N, L].
            where, B = Batchsize,
                   N = number of filters
                   L = time points
        )rg      z{} accept 3/4D tensor as inputrq   r   r9   )	r:   RuntimeErrorformatrV   r    rG   r;   r3   squeezern   r,   r.   r/   rG      s   
$
zDecoder.forward)rV   rW   rX   rY   r!   rG   r[   r.   r.   r,   r/   r$      s    r$   c                   @   s    e Zd ZdZdd Zdd ZdS )IdentityBlockzThis block is used when we want to have identity transformation within the Dual_path block.

    Example
    -------
    >>> x = torch.randn(10, 100)
    >>> IB = IdentityBlock()
    >>> xhat = IB(x)
    c                 K   s   d S rp   r.   )r)   r+   r.   r.   r/   _init__      zIdentityBlock._init__c                 C   s   |S rp   r.   rn   r.   r.   r/   __call__   rw   zIdentityBlock.__call__N)rV   rW   rX   rY   rv   rx   r.   r.   r.   r/   ru      s    	ru   c                       s>   e Zd ZdZ						d fdd		Zd
ejfddZ  ZS )r'   a  This class implements the transformer encoder.

    Args:
    num_blocks : int
        Number of mossformer blocks to include.
    d_model : int
        The dimension of the input embedding.
    attn_dropout : float
        Dropout for the self-attention (Optional).
    group_size: int
        the chunk size
    query_key_dim: int
        the attention vector dimension
    expansion_factor: int
        the expansion factor for the linear projection in conv module
    causal: bool
        true for causal / false for non causal

    Example
    -------
    >>> import torch
    >>> x = torch.rand((8, 60, 512)) #B, S, N
    >>> net = MossFormerM(num_blocks=8, d_model=512)
    >>> output, _ = net(x)
    >>> output.shape
    torch.Size([8, 60, 512])
    N皙?            @Fc           	   	      sB   t    t|||||||d| _dd l}|jjj|dd| _d S )N)r:   depthr   r   r   r   r   r   gư>r`   )	r    r!   r   mossformerMspeechbrainnnetnormalization	LayerNormr   )	r)   r   r   r   r   r   r   r   sbr,   r.   r/   r!     s   
zMossFormerM.__init__srcc                 C   s   |  |}| |}|S )z
        Args:
            src: Tensor shape [B, S, N],
            where, B = Batchsize,
                   S = time points
                   N = number of filters
            The sequence to the encoder layer (required).
        )r~   r   )r)   r   outputr.   r.   r/   rG     s   
	
zMossFormerM.forward)Nry   rz   r{   r|   F	rV   rW   rX   rY   r!   r;   r   rG   r[   r.   r.   r,   r/   r'      s    r'   c                       s6   e Zd ZdZ		d	 fdd	ZdejfddZ  ZS )
ComputeAttentionaR  Computation block for dual-path processing.

    Args:
    att_mdl : torch.nn.module
        Model to process within the chunks.
     out_channels : int
        Dimensionality of attention model.
     norm : str
        Normalization type.
     skip_connection : bool
        Skip connection around the attention module.

    Example
    ---------
        >>> att_block = MossFormerM(num_blocks=8, d_model=512)
        >>> comp_att = ComputeAttention(att_block, 512)
        >>> x = torch.randn(10, 64, 512)
        >>> x = comp_att(x)
        >>> x.shape
        torch.Size([10, 64, 512])
    r_   Tc                    s>   t t|   || _|| _|| _|d urt||d| _d S d S )Nrq   )r    r   r!   att_mdlskip_connectionr   rf   att_norm)r)   r   r   r   r   r,   r.   r/   r!   <  s   zComputeAttention.__init__rl   c                 C   sX   | ddd }| |}| ddd }| jdur!| |}| jr(|| }|}|S )a  Returns the output tensor.

        Args:
            x: Input tensor of dimension [B, S, N].

        Returns:
            out: Output tensor of dimension [B, S, N].
            where, B = Batchsize,
               N = number of filters
               S = time points
        r   rg   r   N)permute
contiguousr   r   r   r   )r)   rl   att_outoutr.   r.   r/   rG   M  s   


zComputeAttention.forward)r_   Tr   r.   r.   r,   r/   r   %  s    r   c                       s:   e Zd ZdZ				d
 fdd	Zdejfdd	Z  ZS )r&   a  The dual path model which is the basis for dualpathrnn, sepformer, dptnet.

    Args:
    in_channels : int
        Number of channels at the output of the encoder.
    out_channels : int
        Number of channels that would be inputted to the intra and inter blocks.
    att_model : torch.nn.module
        Attention model to process the input sequence.
    norm : str
        Normalization type.
    num_spks : int
        Number of sources (speakers).
    skip_connection : bool
        Skip connection around attention module.
    use_global_pos_enc : bool
        Global positional encodings.

    Example
    ---------
    >>> mossformer_block = MossFormerM(num_blocks=8, d_model=512)
    >>> mossformer_masknet = MossFormerMaskNet(64, 64, att_model, num_spks=2)
    >>> x = torch.randn(10, 64, 2000)
    >>> x = mossformer_masknet(x)
    >>> x.shape
    torch.Size([2, 10, 64, 2000])
    r_   rg   Tc                    s   t t|   || _t||d| _tj||ddd| _|| _	| j	r&t
|| _tt||||d| _tj||| dd| _tj||ddd| _t | _t | _tt||dt | _tt||dt | _d S )Nrq   r   F)r   )r   )r   )r    r&   r!   r   rf   r   rb   rj   conv1d_encoderuse_global_pos_encr   pos_enccopydeepcopyr   mdl
conv1d_outconv1_decoderPReLUpreluReLU
activation
SequentialTanhr   Sigmoidoutput_gate)r)   r   r   	att_modelr   r   r   r   r,   r.   r/   r!     s>   





zMossFormerMaskNet.__init__rl   c           	      C   s   |  |}| |}| jr$|}|dd}| |}|dd}|| }| |}| |}| |}|j\}}}|	|| j
 d|}| || | }| |}|j\}}}|	|| j
||}| |}|dd}|S )aQ  Returns the output tensor.

        Args:
            x: Input tensor of dimension [B, N, S].

        Returns:
            out: Output tensor of dimension [spks, B, N, S]
            where, spks = Number of speakers
               B = Batchsize,
               N = number of filters
               S = the number of time frames
        r   r2   r   )r   r   r   	transposer   r   r   r   re   viewr   r   r   r   r   )	r)   rl   baseembb_snLr.   r.   r/   rG     s(   







zMossFormerMaskNet.forward)r_   rg   TTr   r.   r.   r,   r/   r&   k  s    !(r&   )(r   rN   typingr   r   r;   torch.nnrb   torch.nn.functional
functionalr@   modelscope.metainfor   modelscope.modelsr   r   3modelscope.models.audio.separation.mossformer_blockr   r   9modelscope.models.audio.separation.mossformer_conv_moduler	   r
   modelscope.models.baser   modelscope.utils.constantr   EPSregister_modulespeech_separation(speech_mossformer_separation_temporal_8kr   rf   Moduler"   ConvTranspose1dr$   ru   r'   r   r&   r.   r.   r.   r/   <module>   s2   K6*AF