o
    %ݫi6                     @   s   d Z ddlZddlZddlmZ ddlm  mZ ddlm	Z	 ddl
mZ dddZddd	ZG d
d dejZG dd dejZG dd dejZG dd dejZG dd dejjZdS )zNResNet PreActivated for speaker verification

Authors
 * Mickael Rouvier 2022
    N)Linear)BatchNorm1d   c                 C   s   t j| |d|dddS )z#2D convolution with kernel_size = 3   r   F)kernel_sizestridepaddingbiasnnConv2d	in_planes
out_planesr    r   S/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/ResNet.pyconv3x3   s   r   c                 C   s   t j| |d|ddS )z#2D convolution with kernel_size = 1r   Fr   r   r	   r
   r   r   r   r   conv1x1   s   
r   c                       s0   e Zd ZdZdejf fdd	Zdd Z  ZS )SEBlocka  An implementation of Squeeze-and-Excitation Block.

    Arguments
    ---------
    channels : int
        The number of channels.
    reduction : int
        The reduction factor of channels.
    activation : Callable
        The function to apply between layers.

    Example
    -------
    >>> inp_tensor = torch.rand([1, 64, 80, 40])
    >>> se_layer = SEBlock(64)
    >>> out_tensor = se_layer(inp_tensor)
    >>> out_tensor.shape
    torch.Size([1, 64, 80, 40])
    r   c                    sN   t t|   td| _tt||| | t|| |t | _	d S )Nr   )
superr   __init__r   AdaptiveAvgPool2davg_pool
Sequentialr   Sigmoidfc)selfchannels	reduction
activation	__class__r   r   r   ;   s   
zSEBlock.__init__c                 C   s@   |  \}}}}| |||}| |||dd}|| S )^Intermediate step. Processes the input tensor x
        and returns an output tensor.
        r   )sizer   viewr   )r   xbc_yr   r   r   forwardG   s   zSEBlock.forward	__name__
__module____qualname____doc__r   ReLUr   r+   __classcell__r   r   r!   r   r   &   s    r   c                       s2   e Zd ZdZddejf fdd	Zdd Z  ZS )
BasicBlocka  An implementation of ResNet Block.

    Arguments
    ---------
    in_channels : int
        Number of input channels.
    out_channels : int
        The number of output channels.
    stride : int
        Factor that reduce the spatial dimensionality
    downsample : torch function
        A function for downsample the identity of block when stride != 1
    activation : torch class
        A class for constructing the activation layers.

    Example
    -------
    >>> inp_tensor = torch.rand([1, 64, 80, 40])
    >>> layer = BasicBlock(64, 64, stride=1)
    >>> out_tensor = layer(inp_tensor)
    >>> out_tensor.shape
    torch.Size([1, 64, 80, 40])
    r   Nc                    sp   t t|   | | _t|| _t|||| _t|| _	t||| _
t|| _t||| _|| _|| _d S N)r   r3   r   r    r   BatchNorm2dbn1r   conv1bn2conv2bn3r   conv3
downsampler   )r   in_channelsout_channelsr   r<   r    r!   r   r   r   j   s   
zBasicBlock.__init__c                 C   s~   |}|  |}| |}| |}| |}| |}| |}| |}| |}| |}| jdur9| |}||7 }|S r#   N)r6   r    r7   r8   r9   r:   r;   r<   r   r&   residualoutr   r   r   r+      s   










zBasicBlock.forwardr,   r   r   r!   r   r3   Q   s    r3   c                       s4   e Zd ZdZdddejf fdd	Zdd Z  ZS )SEBasicBlocka  An implementation of Squeeze-and-Excitation ResNet Block.

    Arguments
    ---------
    in_channels : int
        Number of input channels.
    out_channels : int
        The number of output channels.
    reduction : int
        The reduction factor of channels.
    stride : int
        Factor that reduce the spatial dimensionality
    downsample : torch function
        A function for downsample the identity of block when stride != 1
    activation : torch class
        A class for constructing the activation layers.

    Example
    -------
    >>> inp_tensor = torch.rand([1, 64, 80, 40])
    >>> layer = SEBasicBlock(64, 64, stride=1)
    >>> out_tensor = layer(inp_tensor)
    >>> out_tensor.shape
    torch.Size([1, 64, 80, 40])
    r   Nc                    s|   t t|   | | _t|| _t|||| _t|| _	t||| _
t|| _t||| _|| _|| _t||| _d S r4   )r   rC   r   r    r   r5   r6   r   r7   r8   r9   r:   r   r;   r<   r   r   se)r   r=   r>   r   r   r<   r    r!   r   r   r      s   	zSEBasicBlock.__init__c                 C   s   |}|  |}| |}| |}| |}| |}| |}| |}| |}| |}| |}| jdur>| |}||7 }|S r?   )	r6   r    r7   r8   r9   r:   r;   rD   r<   r@   r   r   r   r+      s   











zSEBasicBlock.forwardr,   r   r   r!   r   rC      s    rC   c                       s^   e Zd ZdZddejjg dg dg ddf fdd		ZdddZdddZ	dddZ
  ZS )ResNeta  An implementation of ResNet

    Arguments
    ---------
    input_size : int
        Expected size of the input dimension.
    device : str
        Device used, e.g., "cpu" or "cuda".
    activation : torch class
        A class for constructing the activation layers.
    channels : list of ints
        List of number of channels used per stage.
    block_sizes : list of ints
        List of number of groups created per stage.
    strides : list of ints
        List of stride per stage.
    lin_neurons : int
        Number of neurons in linear layers.

    Example
    -------
    >>> input_feats = torch.rand([2, 400, 80])
    >>> compute_embedding = ResNet(lin_neurons=256)
    >>> outputs = compute_embedding(input_feats)
    >>> outputs.shape
    torch.Size([2, 256])
    P   cpu)   rH      rI   )r         r   )r      rL   rL   rI   c           
   
      s  t    t|dksJ t|dksJ t|dksJ t||d |d  |d  |d   }tjd|d ddddd| _t|d | _	| | _
| j|d |d |d |d d| _| j|d |d |d |d d| _| j|d |d |d |d d| _| j|d |d |d |d d| _tjd| |d	  | _ttj|d	 | d
ddt td
tjd
|d	 | ddtjdd| _td| |d	  || _tj|| _|  D ]*}	t|	tjrtjj|	j ddd qt|	tjr
tj!|	j d tj!|	j"d qd S )NrJ   r   r   rL   r   F)r	   )r   rH   )r   dimfan_outrelu)modenonlinearity)#r   r   lenmathceilr   r   r7   r5   r6   activation1_make_layer_selayer1layer2_make_layerlayer3layer4torchr   
norm_statsr   Conv1dr1   Softmax	attentionr   fc_embed
norm_embedmodules
isinstanceinitkaiming_normal_weight	constant_r	   )
r   
input_sizedevicer    r   block_sizesstrideslin_neurons	input_outmr!   r   r   r     sT   

"
zResNet.__init__r   c              	   C   s|   d}|dks
||krt t j||d|ddt |}g }|t||d|| td|D ]}|t||d q-t j| S )a  Construct the squeeze-and-excitation block layer.

        Arguments
        ---------
        in_channels : int
            Number of input channels.
        out_channels : int
            The number of output channels.
        block_num: int
            Number of ResNet blocks for the network.
        stride : int
            Factor that reduce the spatial dimensionality. Default is 1

        Returns
        -------
        se_block : nn.Sequential
            Squeeze-and-excitation block
        Nr   Fr   )r   r   r   r5   appendrC   ranger   r=   r>   	block_numr   r<   layersir   r   r   rX   C  s&   
zResNet._make_layer_sec              	   C   sx   d}|dks
||krt t j||d|ddt |}g }|t|||| td|D ]
}|t|| q,t j| S )a  
        Construct the ResNet block layer.

        Arguments
        ---------
        in_channels : int
            Number of input channels.
        out_channels : int
            The number of output channels.
        block_num: int
            Number of ResNet blocks for the network.
        stride : int
            Factor that reduce the spatial dimensionality. Default is 1

        Returns
        -------
        block : nn.Sequential
            ResNet block
        Nr   Fr   )r   r   r   r5   rr   r3   rs   rt   r   r   r   r[   l  s"   
zResNet._make_layerNc                 C   s   | d}| |}| |}| |}| |}| |}| |}| |}|dd}|	dd}| 
|}tj|| dd}ttj|d | dd|d  jdd}tj||gdd}| |}| |}| |}|S )aM  Returns the embedding vector.

        Arguments
        ---------
        x : torch.Tensor
            Tensor of shape (batch, time, channel).
        lengths : torch.Tensor
            Corresponding relative lengths of the inputs.

        Returns
        -------
        x : torch.Tensor
            The embedding vector.
        r   rL   r   rN   gh㈵>)min)	unsqueezer7   r6   rW   rY   rZ   r\   r]   	transposeflattenrb   r^   sumsqrtclampcatr_   rc   rd   )r   r&   lengthswmusgr   r   r   r+     s$   








,


zResNet.forwardr   r4   )r-   r.   r/   r0   r^   r   r1   r   rX   r[   r+   r2   r   r   r!   r   rE      s    
;
)(rE   c                       s2   e Zd ZdZ				d
 fdd	Zdd	 Z  ZS )
Classifiera  This class implements the cosine similarity on the top of features.

    Arguments
    ---------
    input_size : int
        Expected size of the inputs.
    device : str
        Device used, e.g., "cpu" or "cuda".
    lin_blocks : int
        Number of linear layers.
    lin_neurons : int
        Number of neurons in linear layers.
    out_neurons : int
        Number of classes.

    Example
    -------
    >>> classify = Classifier(input_size=2, lin_neurons=2, out_neurons=2)
    >>> outputs = torch.tensor([ [1., -1.], [-9., 1.], [0.9, 0.1], [0.1, 0.9] ])
    >>> outputs = outputs.unsqueeze(1)
    >>> cos = classify(outputs)
    >>> (cos < -1.0).long().sum()
    tensor(0)
    >>> (cos > 1.0).long().sum()
    tensor(0)
    rG   r   rI     c                    sn   t    t | _t|D ]}| jt|dt||dg |}qt	t
j|||d| _tj| j d S )N)rk   )rk   	n_neurons)rl   )r   r   r   
ModuleListblocksrs   extend_BatchNorm1dr   	Parameterr^   FloatTensorri   rg   xavier_uniform_)r   rk   rl   
lin_blocksro   out_neuronsblock_indexr!   r   r   r     s   


zClassifier.__init__c                 C   s>   | j D ]}||}qtt|dt| j}|dS )zReturns the output probabilities over speakers.

        Arguments
        ---------
        x : torch.Tensor
            Torch tensor.

        Returns
        -------
        x : torch.Tensor
            Output probabilities over speakers.
        r   )r   Flinear	normalizesqueezeri   ry   )r   r&   layerr   r   r   r+     s   

 
zClassifier.forward)rG   r   rI   r   )r-   r.   r/   r0   r   r+   r2   r   r   r!   r   r     s    r   r   )r0   rU   r^   torch.nnr   torch.nn.functional
functionalr   speechbrain.nnet.linearr   speechbrain.nnet.normalizationr   r   r   r   Moduler   r3   rC   rE   r   r   r   r   r   <module>   s    

+IQ T