o
    %ݫiX                     @   s   d Z ddlZddlZddlmZ ddlm  mZ ddl	m
Z
 ddlmZmZmZ ee jZG dd dejZG dd dejZG d	d
 d
ejZG dd dejZG dd dejZG dd dejZdS )zKLibrary for the Resource-Efficient Sepformer.

Authors
 * Cem Subakan 2022
    N)select_norm)PositionalEncodingTransformerEncoderget_lookahead_maskc                       s2   e Zd ZdZ				d
 fdd	Zdd	 Z  ZS )MemLSTMa  the Mem-LSTM of SkiM --

    Note: This is taken from the SkiM implementation in ESPNet toolkit and modified for compatibility with SpeechBrain.

    Arguments
    ---------
    hidden_size: int
        Dimension of the hidden state.
    dropout: float
        dropout ratio. Default is 0.
    bidirectional: bool
        Whether the LSTM layers are bidirectional.
        Default is False.
    mem_type: str
        'hc', 'h', 'c', or 'id'
        This controls whether the hidden (or cell) state of
        SegLSTM will be processed by MemLSTM.
        In 'id' mode, both the hidden and cell states will
        be identically returned.
    norm_type: str
        'gln', 'cln'
        This selects the type of normalization
        cln is for causal implementation

    Example
    -------
    >>> x = (torch.randn(1, 5, 64), torch.randn(1, 5, 64))
    >>> block = MemLSTM(64)
    >>> x = block(x, 5)
    >>> x[0].shape
    torch.Size([1, 5, 64])
            Fhcclnc              	      s   t    || _|| _t|d | | _|| _|dv s"J d| |dv r?t| j| jd| jd||d| _t	|| jdt
d| _|d	v r^t| j| jd| jd||d| _t	|| jdt
d| _d S d S )
N   )r   hcidz4only support 'hc', 'h', 'c' and 'id', current type: )r   r   LSTM)
input_sizehidden_channels
num_layersoutsizernn_typedropoutbidirectional   normdimshapeeps)r   r   )super__init__hidden_sizer   intr   mem_type
SBRNNBlockh_netr   EPSh_normc_netc_norm)selfr   r   r   r    	norm_type	__class__ X/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/resepformer.pyr   9   sF   





zMemLSTM.__init__c                 C   s  | j dkr|}n|\}}|j\}}}|| }	|dd |	||| }|dd |	||| }| j dkrb|| | |dddddd }|| | 	|dddddd }n=| j dkr|| | |dddddd }t
|}n| j dkrt
|}|| | 	|dddddd }||	| ||dd }||	| ||dd }||f}| jsg }
|D ]$}t
|}|dddd	ddf |ddddddf< |
| qt|
}|S )
a-  The forward function for the memory RNN

        Arguments
        ---------
        hc : tuple
            (h, c), tuple of hidden and cell states from SegLSTM
            shape of h and c: (d, B*S, H)
                where d is the number of directions
                      B is the batchsize
                      S is the number chunks
                      H is the latent dimensionality
        S : int
            S is the number of chunks

        Returns
        -------
        ret_val : torch.Tensor
            The output of memory RNN
        r   r
   r   r      r   r   N)r    r   	transpose
contiguousviewr$   r"   permuter&   r%   torch
zeros_liker   appendtuple)r'   r   Sret_valr   r   dBSHBcausal_ret_valxx_r+   r+   r,   forwardk   sF   





0zMemLSTM.forward)r   Fr   r	   __name__
__module____qualname____doc__r   r@   __classcell__r+   r+   r)   r,   r      s    $2r   c                       0   e Zd ZdZ			d	 fdd	Zdd Z  ZS )
SegLSTMaM  the Segment-LSTM of SkiM

    Note: This is taken from the SkiM implementation in ESPNet toolkit and modified for compatibility with SpeechBrain.

    Arguments
    ---------
    input_size: int,
        dimension of the input feature.
        The input should have shape (batch, seq_len, input_size).
    hidden_size: int,
        dimension of the hidden state.
    dropout: float,
        dropout ratio. Default is 0.
    bidirectional: bool,
        whether the LSTM layers are bidirectional.
        Default is False.
    norm_type: str
        One of gln, cln.
        This selects the type of normalization
        cln is for causal implementation.

    Example
    -------
    >>> x = torch.randn(3, 20, 64)
    >>> hc = None
    >>> seglstm = SegLSTM(64, 64)
    >>> y = seglstm(x, hc)
    >>> y[0].shape
    torch.Size([3, 20, 64])
    r   FcLNc                    sr   t    || _|| _t|d | _tj||dd|d| _tj	|d| _
t|| j || _t||dtd| _d S )Nr
   T)batch_firstr   )pr   r   )r   r   r   r   r   num_directionnnr   lstmDropoutr   Linearprojr   r#   r   )r'   r   r   r   r   r(   r)   r+   r,   r      s    
zSegLSTM.__init__c                 C   s   |j \}}}|du r&| j}t||| j|j}t||| j|j}n|\}}| |||f\}	\}}| |	}	| 	|	
 d|	j d |j }	| |	dddddd}
||
 }	|	||ffS )a  The forward function of the Segment LSTM

        Arguments
        ---------
        input : torch.Tensor
            shape [B*S, T, H]
            where B is the batchsize
                  S is the number of chunks
                  T is the chunks size
                  H is the latent dimensionality
        hc : tuple
            tuple of hidden and cell states from SegLSTM
            shape of h and c: (d, B*S, H)
                where d is the number of directions
                      B is the batchsize
                      S is the number chunks
                      H is the latent dimensionality

        Returns
        -------
        output: torch.Tensor
            Output of Segment LSTM
        (h, c): tuple
            Same as hc input
        Nr.   r-   r   r
   )r   rL   r3   zerosr   todevicerN   r   rQ   r0   r1   r   r2   )r'   inputr   r<   Tr;   r9   r   r   outputoutput_normr+   r+   r,   r@      s   
zSegLSTM.forward)r   FrI   rA   r+   r+   r)   r,   rH      s    #rH   c                       rG   )
r!   a  RNNBlock with output layer.

    Arguments
    ---------
    input_size : int
        Dimensionality of the input features.
    hidden_channels : int
        Dimensionality of the latent layer of the rnn.
    num_layers : int
        Number of the rnn layers.
    outsize : int
        Number of dimensions at the output of the linear layer
    rnn_type : str
        Type of the the rnn cell.
    dropout : float
        Dropout rate
    bidirectional : bool
        If True, bidirectional.

    Example
    -------
    >>> x = torch.randn(10, 100, 64)
    >>> rnn = SBRNNBlock(64, 100, 1, 128, bidirectional=True)
    >>> x = rnn(x)
    >>> x.shape
    torch.Size([10, 100, 128])
    r   r   Tc           	         sF   t    tt||||||d| _|rd| n|}t||| _d S )N)r   r   r   r   r-   )r   r   getattrSBRNNmdlrM   rP   out)	r'   r   r   r   r   r   r   r   rnn_outsizer)   r+   r,   r   0  s   

zSBRNNBlock.__init__c                 C   s   |  |d }| |}|S )aJ  Returns the transformed output.

        Arguments
        ---------
        x : torch.Tensor
            [B, L, N]
            where, B = Batchsize,
                   N = number of filters
                   L = time points

        Returns
        -------
        out : torch.Tensor
            The transformed output.
        r   )r[   r\   )r'   r>   rnn_outr\   r+   r+   r,   r@   F  s   
zSBRNNBlock.forward)r   r   TrA   r+   r+   r)   r,   r!     s    "r!   c                       sD   e Zd ZdZ														d fd
d	Zdd Z  ZS )SBTransformerBlock_wnormandskipa  A wrapper for the SpeechBrain implementation of the transformer encoder.

    Arguments
    ---------
    num_layers : int
        Number of layers.
    d_model : int
        Dimensionality of the representation.
    nhead : int
        Number of attention heads.
    d_ffn : int
        Dimensionality of positional feed forward.
    input_shape : tuple
        Shape of input.
    kdim : int
        Dimension of the key (Optional).
    vdim : int
        Dimension of the value (Optional).
    dropout : float
        Dropout rate.
    activation : str
        Activation function.
    use_positional_encoding : bool
        If true we use a positional encoding.
    norm_before : bool
        Use normalization before transformations.
    attention_type : str
        Type of attention, default "regularMHA"
    causal : bool
        Whether to mask future information, default False
    use_norm : bool
        Whether to include norm in the block.
    use_skip : bool
        Whether to add skip connections in the block.
    norm_type : str
        One of "cln", "gln"

    Example
    -------
    >>> x = torch.randn(10, 100, 64)
    >>> block = SBTransformerBlock_wnormandskip(1, 64, 8)
    >>> x = block(x)
    >>> x.shape
    torch.Size([10, 100, 64])
       N皙?reluF
regularMHATglnc                    s   t    |
| _|	dkrtj}	n|	dkrtj}	ntd|| _t|||||||||	|||d| _	|| _
|| _|rAt||dtd| _|
rLt|dd| _d S d S )	Nrb   geluzunknown activation)r   nheadd_ffninput_shaped_modelkdimvdimr   
activationnormalize_beforecausalattention_typer   r   i )r   max_len)r   r   use_positional_encodingrM   ReLUGELU
ValueErrorrn   r   r[   use_normuse_skipr   r#   r   r   pos_enc)r'   r   ri   rf   rg   rh   rj   rk   r   rl   rq   norm_beforero   rn   ru   rv   r(   r)   r+   r,   r     sB   
z(SBTransformerBlock_wnormandskip.__init__c                 C   s   | j rt|nd}| jr| |}| j|| |dd }n	| j||dd }| jr8| |dddddd}| jr?|| }|S )aX  Returns the transformed output.

        Arguments
        ---------
        x : torch.Tensor
            Tensor shape [B, L, N],
            where, B = Batchsize,
                   L = time points
                   N = number of filters

        Returns
        -------
        out : torch.Tensor
            The transformed output.
        N)src_maskr   r-   r
   )	rn   r   rq   rw   r[   ru   r   r2   rv   )r'   r>   ry   rw   r\   r+   r+   r,   r@     s   
z'SBTransformerBlock_wnormandskip.forward)r`   NNNra   rb   FFrc   FTTrd   rA   r+   r+   r)   r,   r_   [  s"    3;r_   c                       sB   e Zd ZdZ								d fd	d
	Zdd Zdd Z  ZS )#ResourceEfficientSeparationPipelineaq  Resource Efficient Separation Pipeline Used for RE-SepFormer and SkiM

    Note: This implementation is a generalization of the ESPNET implementation of SkiM

    Arguments
    ---------
    input_size: int
        Dimension of the input feature.
        Input shape should be (batch, length, input_size)
    hidden_size: int
        Dimension of the hidden state.
    output_size: int
        Dimension of the output size.
    dropout: float
        Dropout ratio. Default is 0.
    num_blocks: int
        Number of basic SkiM blocks
    segment_size: int
        Segmentation size for splitting long features
    bidirectional: bool
        Whether the RNN layers are bidirectional.
    mem_type: str
        'hc', 'h', 'c', 'id' or None.
        This controls whether the hidden (or cell) state of SegLSTM
        will be processed by MemLSTM.
        In 'id' mode, both the hidden and cell states will
        be identically returned.
        When mem_type is None, the MemLSTM will be removed.
    norm_type: str
        One of gln or cln
        cln is for causal implementation.
    seg_model: class
        The model that processes the within segment elements
    mem_model: class
        The memory model that ensures continuity between the segments

    Example
    -------
    >>> x = torch.randn(10, 100, 64)
    >>> seg_mdl = SBTransformerBlock_wnormandskip(1, 64, 8)
    >>> mem_mdl = SBTransformerBlock_wnormandskip(1, 64, 8)
    >>> resepf_pipeline = ResourceEfficientSeparationPipeline(64, 64, 128, seg_model=seg_mdl, mem_model=mem_mdl)
    >>> out = resepf_pipeline.forward(x)
    >>> out.shape
    torch.Size([10, 100, 128])
    r   r-      Tavrd   Nc                    s   t    || _|| _|| _|| _|| _|| _|| _|	| _	|dv s(J d| t
g | _t|D ]}| jt|
 q2| jd ur[t
g | _t|d D ]}| jt| qOt
t
 t
||d| _d S )Nr   r   r   r   r|   Nz@only support 'hc', 'h', 'c', 'id', 'av' and None, current type: r
   )r   r   r   output_sizer   segment_sizer   
num_blocksr    r(   rM   
ModuleList	seg_modelranger5   copydeepcopy	mem_model
SequentialPReLUConv1d	output_fc)r'   r   r   r~   r   r   r   r   r    r(   r   r   ir)   r+   r,   r     s,   

	

z,ResourceEfficientSeparationPipeline.__init__c                 C   s  |j \}}}| j|d\}}||d| j|}|j \}}}}|| jks%J ||| ||}| jdkrDtj|j d d|j d |jd}	nd}	t	| j
D ]T}
t| jd j}|dkrc| j|
 ||	 }n|d	krr| j|
 ||	\}}	ntd
|
| j
d k r| jdkr|dd}	| j|
 |	ddd}	qK| j|
 |	|}	qK|||| |ddd|ddf }| |dddd}|S )a  The forward function of the ResourceEfficientSeparationPipeline

        This takes in a tensor of size [B, (S*K), D]

        Arguments
        ---------
        input : torch.Tensor
                Tensor shape [B, (S*K), D],
                where, B = Batchsize,
                       S = Number of chunks
                       K = Chunksize
                       D = number of features

        Returns
        -------
        output : torch.Tensor
            The separated tensor.
        )rU   r.   r|   r   r
   )rT   Nr_   rH   zUnsupported segment model classr-   )r   _padfeaturer1   r   reshaper    r3   rR   rT   r   r   typer   rB   rt   mean	unsqueezer   r2   r   r/   )r'   rU   r<   rV   Drestr7   KrW   r   r   seg_model_typer+   r+   r,   r@   C  s6   

(z+ResourceEfficientSeparationPipeline.forwardc                 C   sD   |j \}}}| j|| j  }|dkrtjj|ddd|f}||fS )az  
        Arguments
        ---------
        input : Tensor of size [B, T, D]
                    where B is Batchsize
                          T is the chunk length
                          D is the feature dimensionality

        Returns
        -------
        input : torch.Tensor
            Padded input
        rest : torch.Tensor
            Amount of padding
        r   )r   r   r3   rM   
functionalpad)r'   rU   r<   rV   r   r   r+   r+   r,   r   |  s
   z/ResourceEfficientSeparationPipeline._padfeature)r   r-   r{   Tr|   rd   NN)rB   rC   rD   rE   r   r@   r   rF   r+   r+   r)   r,   rz     s    4.9rz   c                       sl   e Zd ZdZ										
	
ddededededededededef fddZde	j
fddZ  ZS )ResourceEfficientSeparatoraw  Resource Efficient Source Separator
    This is the class that implements RE-SepFormer

    Arguments
    ---------
    input_dim: int
        Input feature dimension
    causal: bool
        Whether the system is causal.
    num_spk: int
        Number of target speakers.
    nonlinear: class
        the nonlinear function for mask estimation,
        select from 'relu', 'tanh', 'sigmoid'
    layer: int
        number of blocks. Default is 2 for RE-SepFormer.
    unit: int
        Dimensionality of the hidden state.
    segment_size: int
        Chunk size for splitting long features
    dropout: float
        dropout ratio. Default is 0.
    mem_type: str
        'hc', 'h', 'c', 'id', 'av'  or None.
        This controls whether a memory representation will be used to ensure continuity between segments.
        In 'av' mode, the summary state is is calculated by simply averaging over the time dimension of each segment
        In 'id' mode, both the hidden and cell states
        will be identically returned.
        When mem_type is None, the memory model will be removed.
    seg_model: class
        The model that processes the within segment elements
    mem_model: class
        The memory model that ensures continuity between the segments

    Example
    -------
    >>> x = torch.randn(10, 64, 100)
    >>> seg_mdl = SBTransformerBlock_wnormandskip(1, 64, 8)
    >>> mem_mdl = SBTransformerBlock_wnormandskip(1, 64, 8)
    >>> resepformer = ResourceEfficientSeparator(64, num_spk=3, mem_type='av', seg_model=seg_mdl, mem_model=mem_mdl)
    >>> out = resepformer.forward(x)
    >>> out.shape
    torch.Size([3, 10, 64, 100])
    Tr-   rb   r      r{   r   r   N	input_dimrn   num_spk	nonlinearlayerunitr   r   r    c                    s   t    || _|| _|	dvrtd|	t|||| ||| |r$dnd||	|
|d| _|dvr8td|tj	
 tj	 tj	 d| | _d S )Nr}   zNot supporting mem_type={}r	   rd   )r   r   r~   r   r   r   r(   r   r    r   r   )sigmoidrb   tanhzNot supporting nonlinear={})r   r   r   r   rt   formatrz   modelr3   rM   Sigmoidrr   Tanhr   )r'   r   rn   r   r   r   r   r   r   r    r   r   r)   r+   r,   r     s4   


z#ResourceEfficientSeparator.__init__inptc                 C   s`   | ddd}|j\}}}| |}||||| j}| |jdd}tdd |D }|S )zForward

        Arguments
        ---------
        inpt : torch.Tensor
            Encoded feature [B, T, N]

        Returns
        -------
        mask_tensor : torch.Tensor
        r   r-   r
   r   )r   c                 S   s   g | ]	}| d ddqS )r   r-   r
   )r2   ).0mr+   r+   r,   
<listcomp>  s    z6ResourceEfficientSeparator.forward.<locals>.<listcomp>)	r2   r   r   r   r   r   unbindr3   stack)r'   r   r<   rV   N	processedmasksmask_tensorr+   r+   r,   r@     s   
z"ResourceEfficientSeparator.forward)
Tr-   rb   r   r   r{   r   r   NN)rB   rC   rD   rE   r   boolstrfloatr   r3   Tensorr@   rF   r+   r+   r)   r,   r     s@    0	
.r   )rE   r   r3   torch.nnrM   speechbrain.nnet.RNNnnetRNNrZ   "speechbrain.lobes.models.dual_pathr   0speechbrain.lobes.models.transformer.Transformerr   r   r   finfoget_default_dtyper   r#   Moduler   rH   r!   r_   rz   r   r+   r+   r+   r,   <module>   s"     jH  0