o
    %ݫiw@                     @   s   d Z ddlZddlmZ ddlm  mZ ddlZddl	m
Z
 dZG dd dejZG dd dejZG d	d
 d
ejjjZG dd dejZG dd dejjZG dd dejjjZG dd dejZdd ZG dd dejZG dd dejZdS )z6 Implementation of a popular speech separation model.
    N)overlap_and_addg:0yE>c                       (   e Zd ZdZ fddZdd Z  ZS )Encodera  This class learns the adaptive frontend for the ConvTasnet model.

    Arguments
    ---------
    L : int
        The filter kernel size. Needs to be an odd number.
    N : int
        Number of dimensions at the output of the adaptive front end.

    Example
    -------
    >>> inp = torch.rand(10, 100)
    >>> encoder = Encoder(11, 20)
    >>> h = encoder(inp)
    >>> h.shape
    torch.Size([10, 20, 20])
    c                    s,   t    tjjjd|||d dd| _d S )N      F)in_channelsout_channelskernel_sizestridebias)super__init__sbnnetCNNConv1dconv1d_UselfLN	__class__ X/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/conv_tasnet.pyr   !   s   
zEncoder.__init__c                 C   s$   t |d}| |}t|}|S )a  
        Arguments
        ---------
        mixture : torch.Tensor
            Tensor shape is [M, T]. M is batch size. T is #samples

        Returns
        -------
        mixture_w : torch.Tensor
            Tensor shape is [M, K, N], where K = (T-L)/(L/2)+1 = 2T/L-1
        )torch	unsqueezer   Frelu)r   mixtureconv_out	mixture_wr   r   r   forward-   s   

zEncoder.forward__name__
__module____qualname____doc__r   r#   __classcell__r   r   r   r   r      s    r   c                       r   )DecoderaF  This class implements the decoder for the ConvTasnet.

    The separated source embeddings are fed to the decoder to reconstruct
    the estimated sources in the time domain.

    Arguments
    ---------
    L : int
        Number of bases to use when reconstructing.
    N : int
        Input size

    Example
    -------
    >>> L, C, N = 8, 2, 8
    >>> mixture_w = torch.randn(10, 100, N)
    >>> est_mask = torch.randn(10, 100, C, N)
    >>> Decoder = Decoder(L, N)
    >>> mixture_hat = Decoder(mixture_w, est_mask)
    >>> mixture_hat.shape
    torch.Size([10, 404, 2])
    c                    s*   t    || _tjjj||dd| _d S )NF)
input_size	n_neuronsr   )r   r   r   r   r   linearLinearbasis_signalsr   r   r   r   r   W   s
   
zDecoder.__init__c                 C   sZ   t |ddd|dd| }|dddd}| |}t|| jd }|dddS )a)  
        Arguments
        ---------
        mixture_w : torch.Tensor
            Tensor shape is [M, K, N].
        est_mask : torch.Tensor
            Tensor shape is [M, K, C, N].

        Returns
        -------
        est_source : torch.Tensor
            Tensor shape is [M, T, C].
        r   r   r      )r   r   repeatsizepermuter/   r   r   )r   r"   est_masksource_w
est_sourcer   r   r   r#   b   s   
zDecoder.forwardr$   r   r   r   r   r*   ?   s    r*   c                       s    e Zd ZdZ fddZ  ZS )TemporalBlocksSequentialam  
    A wrapper for the temporal-block layer to replicate it

    Arguments
    ---------
    input_shape : tuple
        Expected shape of the input.
    H : int
        The number of intermediate channels.
    P : int
        The kernel size in the convolutions.
    R : int
        The number of times to replicate the multilayer Temporal Blocks.
    X : int
        The number of layers of Temporal Blocks with different dilations.
    norm_type : str
        The type of normalization, in ['gLN', 'cLN'].
    causal : bool
        To use causal or non-causal convolutions, in [True, False].

    Example
    -------
    >>> x = torch.randn(14, 100, 10)
    >>> H, P, R, X = 10, 5, 2, 3
    >>> TemporalBlocks = TemporalBlocksSequential(
    ...     x.shape, H, P, R, X, 'gLN', False
    ... )
    >>> y = TemporalBlocks(x)
    >>> y.shape
    torch.Size([14, 100, 10])
    c                    s^   t  j|d t|D ]!}t|D ]}	d|	 }
| jt||dd|
||d| d|	 d	 qqd S )Ninput_shaper   r   sametemporalblock__r   r	   r
   paddingdilation	norm_typecausal
layer_name)r   r   rangeappendTemporalBlock)r   r9   HPRXr@   rA   rxr?   r   r   r   r      s"   z!TemporalBlocksSequential.__init__r%   r&   r'   r(   r   r)   r   r   r   r   r7   }   s     r7   c                       s0   e Zd ZdZ			d	 fdd	Zdd Z  ZS )
MaskNetu{  
    Arguments
    ---------
    N : int
        Number of filters in autoencoder.
    B : int
        Number of channels in bottleneck 1 × 1-conv block.
    H : int
        Number of channels in convolutional blocks.
    P : int
        Kernel size in convolutional blocks.
    X : int
        Number of convolutional blocks in each repeat.
    R : int
        Number of repeats.
    C : int
        Number of speakers.
    norm_type : str
        One of BN, gLN, cLN.
    causal : bool
        Causal or non-causal.
    mask_nonlinear : str
        Use which non-linear function to generate mask, in ['softmax', 'relu'].

    Example
    -------
    >>> N, B, H, P, X, R, C = 11, 12, 2, 5, 3, 1, 2
    >>> MaskNet = MaskNet(N, B, H, P, X, R, C)
    >>> mixture_w = torch.randn(10, 11, 100)
    >>> est_mask = MaskNet(mixture_w)
    >>> est_mask.shape
    torch.Size([2, 10, 11, 100])
    gLNFr   c                    sx   t    || _|
| _t|| _tjjj	||ddd| _
d d |f}t|||||||	| _tjjj	||| ddd| _d S )Nr   F)r   r   r	   r   )r   r   Cmask_nonlinearChannelwiseLayerNorm
layer_normr   r   r   r   bottleneck_conv1x1r7   temporal_conv_netmask_conv1x1)r   r   BrF   rG   rI   rH   rO   r@   rA   rP   in_shaper   r   r   r      s"   


zMaskNet.__init__c                 C   s   | ddd}| \}}}| |}| |}| |}| |}| ||| j|}| dddd}| j	dkrCt
j|dd}|S | j	dkrOt
|}|S td)	a  Keep this API same with TasNet.

        Arguments
        ---------
        mixture_w : torch.Tensor
            Tensor shape is [M, K, N], M is batch size.

        Returns
        -------
        est_mask : torch.Tensor
            Tensor shape is [M, K, C, N].
        r   r   r   r0   softmax)dimr   z$Unsupported mask non-linear function)r3   r2   rR   rS   rT   rU   
contiguousreshaperO   rP   r   rX   r   
ValueError)r   r"   MKr   yscorer4   r   r   r   r#      s"   







zMaskNet.forward)rN   Fr   r$   r   r   r   r   rM      s    +*rM   c                       s.   e Zd ZdZ		d fdd	Zdd Z  ZS )	rE   a  The conv1d compound layers used in Masknet.

    Arguments
    ---------
    input_shape : tuple
        The expected shape of the input.
    out_channels : int
        The number of intermediate channels.
    kernel_size : int
        The kernel size in the convolutions.
    stride : int
        Convolution stride in convolutional layers.
    padding : str
        The type of padding in the convolutional layers,
        (same, valid, causal). If "valid", no padding is performed.
    dilation : int
        Amount of dilation in convolutional layers.
    norm_type : str
        The type of normalization, in ['gLN', 'cLN'].
    causal : bool
        To use causal or non-causal convolutions, in [True, False].

    Example
    -------
    >>> x = torch.randn(14, 100, 10)
    >>> TemporalBlock = TemporalBlock(x.shape, 10, 11, 1, 'same', 1)
    >>> y = TemporalBlock(x)
    >>> y.shape
    torch.Size([14, 100, 10])
    rN   Fc	                    s   t    |\}	}
}tjjj|d| _| jjtjjj	|dddd | jjt
 dd | jjt||dd | jjt|||||||d	d
	 d S )Nr8   r   Fconvr   r	   r   rB   actrB   normDSconvr=   )r   r   r   r   
containers
SequentiallayersrD   r   r   nnPReLUchoose_normDepthwiseSeparableConv)r   r9   r   r	   r
   r>   r?   r@   rA   r]   r^   rV   r   r   r   r   B  s2   



zTemporalBlock.__init__c                 C   s   |}|  |}|| S )z
        Arguments
        ---------
        x : torch.Tensor
            Tensor shape is [M, K, B].

        Returns
        -------
        x : torch.Tensor
            Tensor shape is [M, K, B].
        )ri   )r   rK   residualr   r   r   r#   l  s   
zTemporalBlock.forwardrN   Fr$   r   r   r   r   rE   "  s    '*rE   c                       s&   e Zd ZdZ		d fdd	Z  ZS )rm   a  Building block for the Temporal Blocks of Masknet in ConvTasNet.

    Arguments
    ---------
    input_shape : tuple
        Expected shape of the input.
    out_channels : int
        Number of output channels.
    kernel_size : int
        The kernel size in the convolutions.
    stride : int
        Convolution stride in convolutional layers.
    padding : str
        The type of padding in the convolutional layers,
        (same, valid, causal). If "valid", no padding is performed.
    dilation : int
        Amount of dilation in convolutional layers.
    norm_type : str
        The type of normalization, in ['gLN', 'cLN'].
    causal : bool
        To use causal or non-causal convolutions, in [True, False].

    Example
    -------
    >>> x = torch.randn(14, 100, 10)
    >>> DSconv = DepthwiseSeparableConv(x.shape, 10, 11, 1, 'same', 1)
    >>> y = DSconv(x)
    >>> y.shape
    torch.Size([14, 100, 10])

    rN   Fc	                    s   t  j|d |\}	}
}|r||d  }d}d}nd}| jtjjj||||||dd|d
 |r8| jt|d	d
 | jt	 dd
 | jt
||dd
 | jtjjj|dddd d S )Nr8   r   rA   r:   r   Fconv_0)	r   r	   r
   r>   r?   groupsr   rB   default_paddingchomprd   rc   conv_1rb   )r   r   rD   r   r   r   r   Chomp1drj   rk   rl   )r   r9   r   r	   r
   r>   r?   r@   rA   	batchsizetimer   
paddingvalrr   r   r   r   r     s<   

zDepthwiseSeparableConv.__init__ro   rL   r   r   r   r   rm   }  s
    (rm   c                       r   )ru   a  This class cuts out a portion of the signal from the end.

    It is written as a class to be able to incorporate it inside a sequential
    wrapper.

    Arguments
    ---------
    chomp_size : int
        The size of the portion to discard (in samples).

    Example
    -------
    >>> x = torch.randn(10, 110, 5)
    >>> chomp = Chomp1d(10)
    >>> x_chomped = chomp(x)
    >>> x_chomped.shape
    torch.Size([10, 100, 5])
    c                    s   t    || _d S )N)r   r   
chomp_size)r   ry   r   r   r   r     s   

zChomp1d.__init__c                 C   s"   |ddd| j  ddf  S )z
        Arguments
        ---------
        x : torch.Tensor
            Tensor shape is [M, Kpad, H].

        Returns
        -------
        x : torch.Tensor
            Tensor shape is [M, K, H].
        N)ry   rZ   )r   rK   r   r   r   r#     s   "zChomp1d.forwardr$   r   r   r   r   ru     s    ru   c                 C   s*   | dkrt |S | dkrt|S t|S )aY  This function returns the chosen normalization type.

    Arguments
    ---------
    norm_type : str
        One of ['gLN', 'cLN', 'batchnorm'].
    channel_size : int
        Number of channels.

    Returns
    -------
    Constructed layer of the chosen type

    Example
    -------
    >>> choose_norm('gLN', 10)
    GlobalLayerNorm()
    rN   cLN)GlobalLayerNormrQ   rj   BatchNorm1d)r@   channel_sizer   r   r   rl     s
   
rl   c                       0   e Zd ZdZ fddZdd Zdd Z  ZS )rQ   ae  Channel-wise Layer Normalization (cLN).

    Arguments
    ---------
    channel_size : int
        Number of channels in the normalization dimension (the third dimension).

    Example
    -------
    >>> x = torch.randn(2, 3, 3)
    >>> norm_func = ChannelwiseLayerNorm(3)
    >>> x_normalized = norm_func(x)
    >>> x.shape
    torch.Size([2, 3, 3])
    c                    B   t    ttdd|| _ttdd|| _|   d S Nr   	r   r   rj   	Parameterr   Tensorgammabetareset_parametersr   r}   r   r   r   r   &     
zChannelwiseLayerNorm.__init__c                 C      | j jd | jj  dS zResets the parameters.r   Nr   datafill_r   zero_r   r   r   r   r   ,     z%ChannelwiseLayerNorm.reset_parametersc                 C   sJ   t j|ddd}t j|dddd}| j||  t |t d | j }|S )z
        Args:
            y: [M, K, N], M is batch size, N is channel size, K is length
        Returns:
            cLN_y: [M, K, N]
        r   TrY   keepdimF)rY   r   unbiased      ?)r   meanvarr   powEPSr   )r   r_   r   r   cLN_yr   r   r   r#   1  s   $zChannelwiseLayerNorm.forwardr%   r&   r'   r(   r   r   r#   r)   r   r   r   r   rQ     
    rQ   c                       r~   )r{   a<  Global Layer Normalization (gLN).

    Arguments
    ---------
    channel_size : int
        Number of channels in the third dimension.

    Example
    -------
    >>> x = torch.randn(2, 3, 3)
    >>> norm_func = GlobalLayerNorm(3)
    >>> x_normalized = norm_func(x)
    >>> x.shape
    torch.Size([2, 3, 3])
    c                    r   r   r   r   r   r   r   r   O  r   zGlobalLayerNorm.__init__c                 C   r   r   r   r   r   r   r   r   U  r   z GlobalLayerNorm.reset_parametersc                 C   sd   |j dddj ddd}t|| dj dddj ddd}| j||  t|t d | j }|S )a   
        Arguments
        ---------
        y : torch.Tensor
            Tensor shape [M, K, N]. M is batch size, N is channel size, and K is length.

        Returns
        -------
        gLN_y : torch.Tensor
            Tensor shape [M, K. N]
        r   Tr   r   r   )r   r   r   r   r   r   )r   r_   r   r   gLN_yr   r   r   r#   Z  s   $zGlobalLayerNorm.forwardr   r   r   r   r   r{   >  r   r{   )r(   r   torch.nnrj   torch.nn.functional
functionalr   speechbrainr   (speechbrain.processing.signal_processingr   r   Moduler   r*   r   rg   rh   r7   rM   rE   rm   ru   rl   rQ   r{   r   r   r   r   <module>   s"    1>3r[U')