o
    %ݫiG                     @   s  d Z ddlZddlmZ ddlm  mZ ddlmZ ddl	m
Z ddlmZ ddlmZ G dd deZ
G d	d
 d
eZG dd dejZG dd dejjZG dd dejZG dd dejZG dd dejZG dd dejjZG dd dejjZdS )zQA popular speaker recognition and diarization model.

Authors
 * Hwidong Na 2020
    N)length_to_mask)Conv1d)Linear)BatchNorm1dc                           e Zd ZdZ fddZ  ZS )r   z=1D convolution. Skip transpose is used to improve efficiency.c                       t  j|ddi| d S Nskip_transposeTsuper__init__selfargskwargs	__class__ W/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/ECAPA_TDNN.pyr         zConv1d.__init____name__
__module____qualname____doc__r   __classcell__r   r   r   r   r          r   c                       r   )r   zE1D batch normalization. Skip transpose is used to improve efficiency.c                    r   r   r
   r   r   r   r   r      r   zBatchNorm1d.__init__r   r   r   r   r   r      r   r   c                       s2   e Zd ZdZejddf fdd	Zdd Z  ZS )	TDNNBlocka  An implementation of TDNN.

    Arguments
    ---------
    in_channels : int
        Number of input channels.
    out_channels : int
        The number of output channels.
    kernel_size : int
        The kernel size of the TDNN blocks.
    dilation : int
        The dilation of the TDNN block.
    activation : torch class
        A class for constructing the activation layers.
    groups : int
        The groups size of the TDNN blocks.
    dropout : float
        Rate of channel dropout during training.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> layer = TDNNBlock(64, 64, kernel_size=3, dilation=1)
    >>> out_tensor = layer(inp_tensor).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 120, 64])
               c                    sD   t    t|||||d| _| | _t|d| _tj|d| _	d S )N)in_channelsout_channelskernel_sizedilationgroups
input_size)p)
r   r   r   conv
activationr   normnn	Dropout1ddropout)r   r    r!   r"   r#   r)   r$   r-   r   r   r   r   =   s   

zTDNNBlock.__init__c              	   C   s   |  | | | |S :Processes the input tensor x and returns an output tensor.)r-   r*   r)   r(   )r   xr   r   r   forwardS   s   zTDNNBlock.forward)	r   r   r   r   r+   ReLUr   r1   r   r   r   r   r   r       s    "r   c                       2   e Zd ZdZ				d
 fdd	Zdd	 Z  ZS )Res2NetBlocka  An implementation of Res2NetBlock w/ dilation.

    Arguments
    ---------
    in_channels : int
        The number of channels expected in the input.
    out_channels : int
        The number of output channels.
    scale : int
        The scale of the Res2Net block.
    kernel_size: int
        The kernel size of the Res2Net block.
    dilation : int
        The dilation of the Res2Net block.
    dropout : float
        Rate of channel dropout during training.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> layer = Res2NetBlock(64, 64, scale=4, dilation=3)
    >>> out_tensor = layer(inp_tensor).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 120, 64])
          r   r   c                    sn   t    || dksJ || dksJ || || t fddt|d D | _|| _d S )Nr   c              	      s   g | ]}t  d qS ))r"   r#   r-   )r   ).0ir#   r-   hidden_channel
in_channelr"   r   r   
<listcomp>   s    z)Res2NetBlock.__init__.<locals>.<listcomp>r   )r   r   r+   
ModuleListrangeblocksscale)r   r    r!   r@   r"   r#   r-   r   r9   r   r   s   s   
	

zRes2NetBlock.__init__c                 C   s   g }t tj|| jddD ])\}}|dkr|}n|dkr&| j|d  |}n| j|d  || }|| qtj|dd}|S )r/   r   dimr   )	enumeratetorchchunkr@   r?   appendcat)r   r0   yr8   x_iy_ir   r   r   r1      s   zRes2NetBlock.forward)r5   r6   r   r   r   r   r   r   r   r1   r   r   r   r   r   r4   X   s    r4   c                       s*   e Zd ZdZ fddZdddZ  ZS )SEBlocka3  An implementation of squeeze-and-excitation block.

    Arguments
    ---------
    in_channels : int
        The number of input channels.
    se_channels : int
        The number of output channels after squeeze.
    out_channels : int
        The number of output channels.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> se_layer = SEBlock(64, 16, 64)
    >>> lengths = torch.rand((8,))
    >>> out_tensor = se_layer(inp_tensor, lengths).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 120, 64])
    c                    sJ   t    t||dd| _tjjdd| _t||dd| _tj	 | _
d S )Nr   r    r!   r"   T)inplace)r   r   r   conv1rD   r+   r2   reluconv2Sigmoidsigmoid)r   r    se_channelsr!   r   r   r   r      s   
zSEBlock.__init__Nc                 C   s   |j d }|dur+t|| ||jd}|d}|jddd}|| jddd| }n|jddd}| | |}| | 	|}|| S )r/   Nmax_lendevicer      TrB   keepdim)
shaper   rX   	unsqueezesummeanrP   rO   rS   rQ   )r   r0   lengthsLmasktotalsr   r   r   r1      s   

zSEBlock.forwardNrK   r   r   r   r   rL      s    rL   c                       s,   e Zd ZdZd	 fdd	Zd
ddZ  ZS )AttentiveStatisticsPoolinga  This class implements an attentive statistic pooling layer for each channel.
    It returns the concatenated mean and std of the input tensor.

    Arguments
    ---------
    channels: int
        The number of input channels.
    attention_channels: int
        The number of attention channels.
    global_context: bool
        Whether to use global context.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> asp_layer = AttentiveStatisticsPooling(64)
    >>> lengths = torch.rand((8,))
    >>> out_tensor = asp_layer(inp_tensor, lengths).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 1, 128])
       Tc                    s^   t    d| _|| _|rt|d |dd| _nt||dd| _t | _t	||dd| _
d S )Ng-q=r6   r   rM   )r   r   epsglobal_contextr   tdnnr+   Tanhtanhr   r(   )r   channelsattention_channelsri   r   r   r   r      s   

z#AttentiveStatisticsPooling.__init__Nc                 C   s(  |j d }d| jfdd}|du rtj|j d |jd}t|| ||jd}|d	}| jr_|jdd
d	 }|||| \}}|d
d	d	|}|d
d	d	|}tj|||gd	d}	n|}	| | | |	}	|	|dkt	d}	tj|	dd}	|||	\}}tj||fd	d}
|
d}
|
S )ae  Calculates mean and std for a batch (input tensor).

        Arguments
        ---------
        x : torch.Tensor
            Tensor of shape [N, C, L].
        lengths : torch.Tensor
            The corresponding relative lengths of the inputs.

        Returns
        -------
        pooled_stats : torch.Tensor
            mean and std of batch
        rU   rY   c                 S   s@   ||   |}t|| || d  ||}||fS )NrY   )r^   rD   sqrtr]   powclamp)r0   mrB   rh   r_   stdr   r   r   _compute_statistics	  s
   "z?AttentiveStatisticsPooling.forward.<locals>._compute_statisticsNr   rX   rV   r   TrZ   rA   z-inf)r\   rh   rD   onesrX   r   r]   ri   r^   floatrepeatrG   r(   rl   rj   masked_fillFsoftmax)r   r0   r`   ra   rt   rb   rc   r_   rs   attnpooled_statsr   r   r   r1      s(   


z"AttentiveStatisticsPooling.forward)rg   Tre   rK   r   r   r   r   rf      s    rf   c                       s>   e Zd ZdZddddejjddf fdd	Zdd	d
Z  Z	S )SERes2NetBlocka  An implementation of building block in ECAPA-TDNN, i.e.,
    TDNN-Res2Net-TDNN-SEBlock.

    Arguments
    ---------
    in_channels: int
        Expected size of input channels.
    out_channels: int
        The number of output channels.
    res2net_scale: int
        The scale of the Res2Net block.
    se_channels : int
        The number of output channels after squeeze.
    kernel_size: int
        The kernel size of the TDNN blocks.
    dilation: int
        The dilation of the Res2Net block.
    activation : torch class
        A class for constructing the activation layers.
    groups: int
        Number of blocked connections from input channels to output channels.
    dropout: float
        Rate of channel dropout during training.

    Example
    -------
    >>> x = torch.rand(8, 120, 64).transpose(1, 2)
    >>> conv = SERes2NetBlock(64, 64, res2net_scale=4)
    >>> out = conv(x).transpose(1, 2)
    >>> out.shape
    torch.Size([8, 120, 64])
    r5   rg   r   r   c
           
   	      s   t    || _t||dd|||	d| _t|||||| _t||dd|||	d| _t|||| _	d | _
||krAt||dd| _
d S d S )Nr   )r"   r#   r)   r$   r-   rM   )r   r   r!   r   tdnn1r4   res2net_blocktdnn2rL   se_blockshortcutr   )
r   r    r!   res2net_scalerT   r"   r#   r)   r$   r-   r   r   r   r   U  s@   
	
	zSERes2NetBlock.__init__Nc                 C   sF   |}| j r
|  |}| |}| |}| |}| ||}|| S r.   )r   r   r   r   r   )r   r0   r`   residualr   r   r   r1     s   



zSERes2NetBlock.forwardre   
r   r   r   r   rD   r+   r2   r   r1   r   r   r   r   r   r~   3  s    %-r~   c                       sX   e Zd ZdZddejjg dg dg ddddd	g d
df fdd	ZdddZ  Z	S )
ECAPA_TDNNa  An implementation of the speaker embedding model in a paper.
    "ECAPA-TDNN: Emphasized Channel Attention, Propagation and Aggregation in
    TDNN Based Speaker Verification" (https://arxiv.org/abs/2005.07143).

    Arguments
    ---------
    input_size : int
        Expected size of the input dimension.
    device : str
        Device used, e.g., "cpu" or "cuda".
    lin_neurons : int
        Number of neurons in linear layers.
    activation : torch class
        A class for constructing the activation layers.
    channels : list of ints
        Output channels for TDNN/SERes2Net layer.
    kernel_sizes : list of ints
        List of kernel sizes for each layer.
    dilations : list of ints
        List of dilations for kernels in each layer.
    attention_channels: int
        The number of attention channels.
    res2net_scale : int
        The scale of the Res2Net block.
    se_channels : int
        The number of output channels after squeeze.
    global_context: bool
        Whether to use global context.
    groups : list of ints
        List of groups for kernels in each layer.
    dropout : float
        Rate of channel dropout during training.

    Example
    -------
    >>> input_feats = torch.rand([5, 120, 80])
    >>> compute_embedding = ECAPA_TDNN(80, lin_neurons=192)
    >>> outputs = compute_embedding(input_feats)
    >>> outputs.shape
    torch.Size([5, 1, 192])
    cpu   )   r   r   r   i   )   r6   r6   r6   r   )r   rY   r6      r   rg   r5   T)r   r   r   r   r   r   c                    s<  t    t|t|ksJ t|t|ksJ || _t | _| jt||d |d |d ||d | t	dt|d D ]}| jt
||d  || |	|
|| || ||| |d	 q@t|d t|d  |d |d |d ||d |d| _t|d ||d| _t|d d d	| _t|d d |dd
| _d S )Nr   r   )r   rT   r"   r#   r)   r$   r-   rY   rU   )r$   r-   )rn   ri   r%   rM   )r   r   lenrm   r+   r=   r?   rF   r   r>   r~   mfarf   aspr   asp_bnr   fc)r   r&   rX   lin_neuronsr)   rm   kernel_sizes	dilationsrn   r   rT   ri   r$   r-   r8   r   r   r   r     sd   



zECAPA_TDNN.__init__Nc              	   C   s   | dd}g }| jD ]}z|||d}W n ty"   ||}Y nw || qtj|dd dd}| |}| j||d}| |}| 	|}| dd}|S )aE  Returns the embedding vector.

        Arguments
        ---------
        x : torch.Tensor
            Tensor of shape (batch, time, channel).
        lengths : torch.Tensor
            Corresponding relative lengths of inputs.

        Returns
        -------
        x : torch.Tensor
            Embedding vector.
        r   rY   )r`   NrA   )
	transposer?   	TypeErrorrF   rD   rG   r   r   r   r   )r   r0   r`   xllayerr   r   r   r1     s    



zECAPA_TDNN.forwardre   r   r   r   r   r   r     s     -Mr   c                       r3   )
Classifiera  This class implements the cosine similarity on the top of features.

    Arguments
    ---------
    input_size : int
        Expected size of input dimension.
    device : str
        Device used, e.g., "cpu" or "cuda".
    lin_blocks : int
        Number of linear layers.
    lin_neurons : int
        Number of neurons in linear layers.
    out_neurons : int
        Number of classes.

    Example
    -------
    >>> classify = Classifier(input_size=2, lin_neurons=2, out_neurons=2)
    >>> outputs = torch.tensor([ [1., -1.], [-9., 1.], [0.9, 0.1], [0.1, 0.9] ])
    >>> outputs = outputs.unsqueeze(1)
    >>> cos = classify(outputs)
    >>> (cos < -1.0).long().sum()
    tensor(0)
    >>> (cos > 1.0).long().sum()
    tensor(0)
    r   r   r     c                    sn   t    t | _t|D ]}| jt|dt||dg |}qt	t
j|||d| _tj| j d S )Nr%   )r&   	n_neuronsru   )r   r   r+   r=   r?   r>   extend_BatchNorm1dr   	ParameterrD   FloatTensorweightinitxavier_uniform_)r   r&   rX   
lin_blocksr   out_neuronsblock_indexr   r   r   r   M  s   


zClassifier.__init__c                 C   s>   | j D ]}||}qtt|dt| j}|dS )zReturns the output probabilities over speakers.

        Arguments
        ---------
        x : torch.Tensor
            Torch tensor.

        Returns
        -------
        out : torch.Tensor
            Output probabilities over speakers.
        r   )r?   rz   linear	normalizesqueezer   r]   )r   r0   r   r   r   r   r1   g  s   

 
zClassifier.forward)r   r   r   r   rK   r   r   r   r   r   1  s    r   )r   rD   torch.nnr+   torch.nn.functional
functionalrz   speechbrain.dataio.dataior   speechbrain.nnet.CNNr   _Conv1dspeechbrain.nnet.linearr   speechbrain.nnet.normalizationr   r   Moduler   r4   rL   rf   r~   r   r   r   r   r   r   <module>   s$    8H3`] "