o
    oiRE                     @   s  d Z ddlZddlmZ ddlm  mZ ddlmZ ddl	m
Z ddlmZ ddlmZ G dd deZ
G d	d
 d
eZG dd dejZG dd dejjZG dd dejZG dd dejZG dd dejZG dd dejjZG dd dejjZdS )zQA popular speaker recognition and diarization model.

Authors
 * Hwidong Na 2020
    N)length_to_mask)Conv1d)Linear)BatchNorm1dc                           e Zd ZdZ fddZ  ZS )r   z=1D convolution. Skip transpose is used to improve efficiency.c                       t  j|ddi| d S Nskip_transposeTsuper__init__selfargskwargs	__class__ a/home/ubuntu/SoloSpeech/.venv/lib/python3.10/site-packages/speechbrain/lobes/models/ECAPA_TDNN.pyr         zConv1d.__init____name__
__module____qualname____doc__r   __classcell__r   r   r   r   r          r   c                       r   )r   zE1D batch normalization. Skip transpose is used to improve efficiency.c                    r   r   r
   r   r   r   r   r      r   zBatchNorm1d.__init__r   r   r   r   r   r      r   r   c                       s0   e Zd ZdZejdf fdd	Zdd Z  ZS )	TDNNBlocka  An implementation of TDNN.

    Arguments
    ---------
    in_channels : int
        Number of input channels.
    out_channels : int
        The number of output channels.
    kernel_size : int
        The kernel size of the TDNN blocks.
    dilation : int
        The dilation of the TDNN block.
    activation : torch class
        A class for constructing the activation layers.
    groups : int
        The groups size of the TDNN blocks.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> layer = TDNNBlock(64, 64, kernel_size=3, dilation=1)
    >>> out_tensor = layer(inp_tensor).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 120, 64])
       c                    s6   t    t|||||d| _| | _t|d| _d S )N)in_channelsout_channelskernel_sizedilationgroups
input_size)r   r   r   conv
activationr   norm)r   r   r    r!   r"   r'   r#   r   r   r   r   ;   s   
	zTDNNBlock.__init__c                 C   s   |  | | |S :Processes the input tensor x and returns an output tensor.)r(   r'   r&   )r   xr   r   r   forwardO   s   zTDNNBlock.forward)	r   r   r   r   nnReLUr   r,   r   r   r   r   r   r       s     r   c                       s,   e Zd ZdZ	d	 fdd	Zdd Z  ZS )
Res2NetBlocka  An implementation of Res2NetBlock w/ dilation.

    Arguments
    ---------
    in_channels : int
        The number of channels expected in the input.
    out_channels : int
        The number of output channels.
    scale : int
        The scale of the Res2Net block.
    kernel_size: int
        The kernel size of the Res2Net block.
    dilation : int
        The dilation of the Res2Net block.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> layer = Res2NetBlock(64, 64, scale=4, dilation=3)
    >>> out_tensor = layer(inp_tensor).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 120, 64])
          r   c                    sl   t    || dksJ || dksJ || || t fddt|d D | _|| _d S )Nr   c                    s   g | ]
}t  d qS ))r!   r"   )r   ).0ir"   hidden_channel
in_channelr!   r   r   
<listcomp>x   s    z)Res2NetBlock.__init__.<locals>.<listcomp>r   )r   r   r-   
ModuleListrangeblocksscale)r   r   r    r;   r!   r"   r   r4   r   r   m   s   


zRes2NetBlock.__init__c                 C   s   g }t tj|| jddD ])\}}|dkr|}n|dkr&| j|d  |}n| j|d  || }|| qtj|dd}|S )r*   r   dimr   )	enumeratetorchchunkr;   r:   appendcat)r   r+   yr3   x_iy_ir   r   r   r,      s   zRes2NetBlock.forward)r0   r1   r   r   r   r   r   r   r,   r   r   r   r   r   r/   T   s
    r/   c                       s*   e Zd ZdZ fddZdddZ  ZS )SEBlocka3  An implementation of squeeze-and-excitation block.

    Arguments
    ---------
    in_channels : int
        The number of input channels.
    se_channels : int
        The number of output channels after squeeze.
    out_channels : int
        The number of output channels.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> se_layer = SEBlock(64, 16, 64)
    >>> lengths = torch.rand((8,))
    >>> out_tensor = se_layer(inp_tensor, lengths).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 120, 64])
    c                    sJ   t    t||dd| _tjjdd| _t||dd| _tj	 | _
d S )Nr   r   r    r!   T)inplace)r   r   r   conv1r?   r-   r.   reluconv2Sigmoidsigmoid)r   r   se_channelsr    r   r   r   r      s   
zSEBlock.__init__Nc                 C   s   |j d }|dur+t|| ||jd}|d}|jddd}|| jddd| }n|jddd}| | |}| | 	|}|| S )r*   Nmax_lendevicer      Tr=   keepdim)
shaper   rS   	unsqueezesummeanrK   rJ   rN   rL   )r   r+   lengthsLmasktotalsr   r   r   r,      s   

zSEBlock.forwardNrF   r   r   r   r   rG      s    rG   c                       s,   e Zd ZdZd	 fdd	Zd
ddZ  ZS )AttentiveStatisticsPoolinga  This class implements an attentive statistic pooling layer for each channel.
    It returns the concatenated mean and std of the input tensor.

    Arguments
    ---------
    channels: int
        The number of input channels.
    attention_channels: int
        The number of attention channels.
    global_context: bool
        Whether to use global context.

    Example
    -------
    >>> inp_tensor = torch.rand([8, 120, 64]).transpose(1, 2)
    >>> asp_layer = AttentiveStatisticsPooling(64)
    >>> lengths = torch.rand((8,))
    >>> out_tensor = asp_layer(inp_tensor, lengths).transpose(1, 2)
    >>> out_tensor.shape
    torch.Size([8, 1, 128])
       Tc                    s^   t    d| _|| _|rt|d |dd| _nt||dd| _t | _t	||dd| _
d S )Ng-q=r1   r   rH   )r   r   epsglobal_contextr   tdnnr-   Tanhtanhr   r&   )r   channelsattention_channelsrd   r   r   r   r      s   

z#AttentiveStatisticsPooling.__init__Nc                 C   s(  |j d }d| jfdd}|du rtj|j d |jd}t|| ||jd}|d	}| jr_|jdd
d	 }|||| \}}|d
d	d	|}|d
d	d	|}tj|||gd	d}	n|}	| | | |	}	|	|dkt	d}	tj|	dd}	|||	\}}tj||fd	d}
|
d}
|
S )ae  Calculates mean and std for a batch (input tensor).

        Arguments
        ---------
        x : torch.Tensor
            Tensor of shape [N, C, L].
        lengths : torch.Tensor
            The corresponding relative lengths of the inputs.

        Returns
        -------
        pooled_stats : torch.Tensor
            mean and std of batch
        rP   rT   c                 S   s@   ||   |}t|| || d  ||}||fS )NrT   )rY   r?   sqrtrX   powclamp)r+   mr=   rc   rZ   stdr   r   r   _compute_statistics   s
   "z?AttentiveStatisticsPooling.forward.<locals>._compute_statisticsNr   rS   rQ   r   TrU   r<   z-inf)rW   rc   r?   onesrS   r   rX   rd   rY   floatrepeatrB   r&   rg   re   masked_fillFsoftmax)r   r+   r[   r\   ro   r]   r^   rZ   rn   attnpooled_statsr   r   r   r,      s(   


z"AttentiveStatisticsPooling.forward)rb   Tr`   rF   r   r   r   r   ra      s    ra   c                       s<   e Zd ZdZddddejjdf fdd	Zd
dd	Z  Z	S )SERes2NetBlocka  An implementation of building block in ECAPA-TDNN, i.e.,
    TDNN-Res2Net-TDNN-SEBlock.

    Arguments
    ---------
    in_channels: int
        Expected size of input channels.
    out_channels: int
        The number of output channels.
    res2net_scale: int
        The scale of the Res2Net block.
    se_channels : int
        The number of output channels after squeeze.
    kernel_size: int
        The kernel size of the TDNN blocks.
    dilation: int
        The dilation of the Res2Net block.
    activation : torch class
        A class for constructing the activation layers.
    groups: int
        Number of blocked connections from input channels to output channels.

    Example
    -------
    >>> x = torch.rand(8, 120, 64).transpose(1, 2)
    >>> conv = SERes2NetBlock(64, 64, res2net_scale=4)
    >>> out = conv(x).transpose(1, 2)
    >>> out.shape
    torch.Size([8, 120, 64])
    r0   rb   r   c	           	         s   t    || _t||dd||d| _t|||||| _t||dd||d| _t|||| _	d | _
||kr?t||dd| _
d S d S )Nr   )r!   r"   r'   r#   rH   )r   r   r    r   tdnn1r/   res2net_blocktdnn2rG   se_blockshortcutr   )	r   r   r    res2net_scalerO   r!   r"   r'   r#   r   r   r   r   F  s<   

zSERes2NetBlock.__init__Nc                 C   sF   |}| j r
|  |}| |}| |}| |}| ||}|| S r)   )r~   rz   r{   r|   r}   )r   r+   r[   residualr   r   r   r,   p  s   



zSERes2NetBlock.forwardr`   
r   r   r   r   r?   r-   r.   r   r,   r   r   r   r   r   ry   &  s    #*ry   c                       sV   e Zd ZdZddejjg dg dg ddddd	g d
f fdd	ZdddZ  Z	S )
ECAPA_TDNNa=  An implementation of the speaker embedding model in a paper.
    "ECAPA-TDNN: Emphasized Channel Attention, Propagation and Aggregation in
    TDNN Based Speaker Verification" (https://arxiv.org/abs/2005.07143).

    Arguments
    ---------
    input_size : int
        Expected size of the input dimension.
    device : str
        Device used, e.g., "cpu" or "cuda".
    lin_neurons : int
        Number of neurons in linear layers.
    activation : torch class
        A class for constructing the activation layers.
    channels : list of ints
        Output channels for TDNN/SERes2Net layer.
    kernel_sizes : list of ints
        List of kernel sizes for each layer.
    dilations : list of ints
        List of dilations for kernels in each layer.
    attention_channels: int
        The number of attention channels.
    res2net_scale : int
        The scale of the Res2Net block.
    se_channels : int
        The number of output channels after squeeze.
    global_context: bool
        Whether to use global context.
    groups : list of ints
        List of groups for kernels in each layer.

    Example
    -------
    >>> input_feats = torch.rand([5, 120, 80])
    >>> compute_embedding = ECAPA_TDNN(80, lin_neurons=192)
    >>> outputs = compute_embedding(input_feats)
    >>> outputs.shape
    torch.Size([5, 1, 192])
    cpu   )   r   r   r   i   )   r1   r1   r1   r   )r   rT   r1      r   rb   r0   T)r   r   r   r   r   c                    s6  t    t|t|ksJ t|t|ksJ || _t | _| jt||d |d |d ||d  t	dt|d D ]}| jt
||d  || |	|
|| || ||| d q?t|d t|d  |d |d |d ||d d| _t|d ||d| _t|d d d	| _t|d d |dd
| _d S )Nr   r   )r   rO   r!   r"   r'   r#   rT   rP   )r#   )ri   rd   r$   rH   )r   r   lenrh   r-   r8   r:   rA   r   r9   ry   mfara   aspr   asp_bnr   fc)r   r%   rS   lin_neuronsr'   rh   kernel_sizes	dilationsri   r   rO   rd   r#   r3   r   r   r   r     s^   




zECAPA_TDNN.__init__Nc              	   C   s   | dd}g }| jD ]}z|||d}W n ty"   ||}Y nw || qtj|dd dd}| |}| j||d}| |}| 	|}| dd}|S )aE  Returns the embedding vector.

        Arguments
        ---------
        x : torch.Tensor
            Tensor of shape (batch, time, channel).
        lengths : torch.Tensor
            Corresponding relative lengths of inputs.

        Returns
        -------
        x : torch.Tensor
            Embedding vector.
        r   rT   )r[   Nr<   )
	transposer:   	TypeErrorrA   r?   rB   r   r   r   r   )r   r+   r[   xllayerr   r   r   r,     s    



zECAPA_TDNN.forwardr`   r   r   r   r   r   r   ~  s    +Ir   c                       s2   e Zd ZdZ				d
 fdd	Zdd	 Z  ZS )
Classifiera  This class implements the cosine similarity on the top of features.

    Arguments
    ---------
    input_size : int
        Expected size of input dimension.
    device : str
        Device used, e.g., "cpu" or "cuda".
    lin_blocks : int
        Number of linear layers.
    lin_neurons : int
        Number of neurons in linear layers.
    out_neurons : int
        Number of classes.

    Example
    -------
    >>> classify = Classifier(input_size=2, lin_neurons=2, out_neurons=2)
    >>> outputs = torch.tensor([ [1., -1.], [-9., 1.], [0.9, 0.1], [0.1, 0.9] ])
    >>> outputs = outputs.unsqueeze(1)
    >>> cos = classify(outputs)
    >>> (cos < -1.0).long().sum()
    tensor(0)
    >>> (cos > 1.0).long().sum()
    tensor(0)
    r   r   r     c                    sn   t    t | _t|D ]}| jt|dt||dg |}qt	t
j|||d| _tj| j d S )Nr$   )r%   	n_neuronsrp   )r   r   r-   r8   r:   r9   extend_BatchNorm1dr   	Parameterr?   FloatTensorweightinitxavier_uniform_)r   r%   rS   
lin_blocksr   out_neuronsblock_indexr   r   r   r   5  s   


zClassifier.__init__c                 C   s>   | j D ]}||}qtt|dt| j}|dS )zReturns the output probabilities over speakers.

        Arguments
        ---------
        x : torch.Tensor
            Torch tensor.

        Returns
        -------
        out : torch.Tensor
            Output probabilities over speakers.
        r   )r:   ru   linear	normalizesqueezer   rX   )r   r+   r   r   r   r   r,   O  s   

 
zClassifier.forward)r   r   r   r   rF   r   r   r   r   r     s    r   )r   r?   torch.nnr-   torch.nn.functional
functionalru   speechbrain.dataio.dataior   speechbrain.nnet.CNNr   _Conv1dspeechbrain.nnet.linearr   speechbrain.nnet.normalizationr   r   Moduler   r/   rG   ra   ry   r   r   r   r   r   r   <module>   s$    4?3`X 