o
    %ݫiE                     @   s   d Z ddlZddlmZ ddlm  mZ ddlmZ ee	Z
G dd dejZG dd dejZG dd	 d	ejZG d
d dejZG dd dejZG dd dejZdS )zLibrary implementing pooling.

Authors
 * Titouan Parcollet 2020
 * Mirco Ravanelli 2020
 * Nauman Dawalatabad 2020
 * Jianyuan Zhong 2020
 * Sarthak Yadav 2022
 * Ha Nguyen 2023
    N)
get_loggerc                       s6   e Zd ZdZ						d fdd	Zd	d
 Z  ZS )	Pooling1da  This function implements 1d pooling of the input tensor.

    Arguments
    ---------
    pool_type : str
        It is the type of pooling function to use ('avg','max').
    kernel_size : int
        It is the kernel size that defines the pooling dimension.
        For instance, kernel size=3 applies a 1D Pooling with a size=3.
    input_dims : int
        The count of dimensions expected in the input.
    pool_axis : int
        The axis where the pooling is applied.
    ceil_mode : bool
        When True, will use ceil instead of floor to compute the output shape.
    padding : int
        It is the number of padding elements to apply.
    dilation : int
        Controls the dilation factor of pooling.
    stride : int
        It is the stride size.

    Example
    -------
    >>> pool = Pooling1d('max',3)
    >>> inputs = torch.rand(10, 12, 40)
    >>> output=pool(inputs)
    >>> output.shape
    torch.Size([10, 4, 40])
          Fr   Nc	           	         s   t    || _|d u r|}|dkr>|dkr#tjj||||d| _d S |dkr:tjjd|fd|fd|f|d| _d S td|dkrr|dkrTtjj	|||||d	| _d S |dkrntjj
d|fd|fd|fd|f|d	| _d S tdtd
)Navgr   stridepadding	ceil_mode   r   r   zinput_dims must be 3 or 4max)r   r	   dilationr
   z pool_type must be 'avg' or 'max')super__init__	pool_axistorchnn	AvgPool1d
pool_layer	AvgPool2d
ValueError	MaxPool1d	MaxPool2d)	self	pool_typekernel_size
input_dimsr   r
   r	   r   r   	__class__ L/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/nnet/pooling.pyr   5   sN   
zPooling1d.__init__c                 C   s*   | d| j}| |}| d| j}|S )zPerforms 1d pooling to the input tensor.

        Arguments
        ---------
        x : torch.Tensor
            It represents a tensor for a mini-batch.

        Returns
        -------
        x : torch.Tensor
            The pooled outputs.
        )	transposer   r   r   xr   r   r    forwardo   s   
zPooling1d.forward)r   r   Fr   r   N__name__
__module____qualname____doc__r   r%   __classcell__r   r   r   r    r      s    #:r   c                       s4   e Zd ZdZ					d fdd	Zd	d
 Z  ZS )	Pooling2da  This function implements 2d pooling of the input tensor.

    Arguments
    ---------
    pool_type : str
        It is the type of pooling function to use ('avg','max').
    kernel_size : int
        It is the kernel size that defines the pooling dimension.
        For instance, kernel size=3,3 performs a 2D Pooling with a 3x3 kernel.
    pool_axis : tuple
        It is a list containing the axis that will be considered
        during pooling.
    ceil_mode : bool
        When True, will use ceil instead of floor to compute the output shape.
    padding : int
        It is the number of padding elements to apply.
    dilation : int
        Controls the dilation factor of pooling.
    stride : int
        It is the stride size.

    Example
    -------
    >>> pool = Pooling2d('max',(5,3))
    >>> inputs = torch.rand(10, 15, 12)
    >>> output=pool(inputs)
    >>> output.shape
    torch.Size([10, 3, 4])
    r      Fr   r   Nc                    s   t    || _|| _|| _|| _|| _|| _|d u r|| _n|| _| jdkr8t	j
j| j| j| j| jd| _d S t	j
j| j| j| j| jd| _d S )Nr   r   )r   r   r   r   r   r
   r	   r   r   r   r   r   r   r   )r   r   r   r   r
   r	   r   r   r   r   r    r      s.   


zPooling2d.__init__c                 C   s   | d dd| jd d| jd | jd | jd }| |}| | jd  | jd d| jd d| jd dd}|S )zPerforms 2d pooling to the input tensor.

        Arguments
        ---------
        x : torch.Tensor
            It represents a tensor for a mini-batch.

        Returns
        -------
        x : torch.Tensor
            The pooled outputs.
        r!   r   r   )	unsqueezer"   r   squeezer   r#   r   r   r    r%      s    


	zPooling2d.forward)r-   Fr   r   Nr&   r   r   r   r    r,      s    "&r,   c                       s6   e Zd ZdZd fdd	ZdddZdd	d
Z  ZS )StatisticsPoolinga  This class implements a statistic pooling layer.

    It returns the mean and/or std of input tensor.

    Arguments
    ---------
    return_mean : bool
         If True, the average pooling will be returned.
    return_std : bool
         If True, the standard deviation will be returned.

    Example
    -------
    >>> inp_tensor = torch.rand([5, 100, 50])
    >>> sp_layer = StatisticsPooling()
    >>> out_tensor = sp_layer(inp_tensor)
    >>> out_tensor.shape
    torch.Size([5, 1, 100])
    Tc                    s8   t    d| _|| _|| _| js| jstdd S d S )Ngh㈵>zZboth of statistics are equal to False 
consider enabling mean and/or std statistic pooling)r   r   epsreturn_mean
return_stdr   )r   r4   r5   r   r   r    r     s   
zStatisticsPooling.__init__Nc           	      C   s`  |du r| j r|jdd}| jr|jdd}nTg }g }t|jd D ]8}tt|| |jd  }| j rF|	tj||d|df dd | jrZ|	tj||d|df dd q"| j rct
|}| jrkt
|}| j r~| j| |jd}|}||7 }| jr|| j }| j r| jrtj||fdd}|d}|S | j r|d}|S | jr|d}|S )at  Calculates mean and std for a batch (input tensor).

        Arguments
        ---------
        x : torch.Tensor
            It represents a tensor for a mini-batch.
        lengths : torch.Tensor
            The lengths of the samples in the input.

        Returns
        -------
        pooled_stats : torch.Tensor
            The mean and std for the input.
        Nr   dimr   .device)r4   meanr5   stdrangeshapeintr   roundappendstack_get_gauss_noisesizer9   r3   catr0   )	r   r$   lengthsr:   r;   snt_idactual_sizegnoisepooled_statsr   r   r    r%   !  sJ   "





zStatisticsPooling.forwardcpuc                 C   s@   t j||d}|t |8 }|t | }| jd| d  }|S )au  Returns a tensor of epsilon Gaussian noise.

        Arguments
        ---------
        shape_of_tensor : torch.Tensor
            It represents the size of tensor for generating Gaussian noise.
        device : str
            Device on which to perform computations.

        Returns
        -------
        gnoise : torch.Tensor
            The Gaussian noise.
        r8   i	   )r   randnminr   r3   )r   shape_of_tensorr9   rH   r   r   r    rB   Z  s
   z"StatisticsPooling._get_gauss_noise)TT)N)rJ   )r'   r(   r)   r*   r   r%   rB   r+   r   r   r   r    r2      s
    
9r2   c                       (   e Zd ZdZ fddZdd Z  ZS )AdaptivePoola6  This class implements the adaptive average pooling.

    Arguments
    ---------
    output_size : int
        The size of the output.

    Example
    -------
    >>> pool = AdaptivePool(1)
    >>> inp = torch.randn([8, 120, 40])
    >>> output = pool(inp)
    >>> output.shape
    torch.Size([8, 1, 40])
    c                    s   t    t|tpt|tpt|t}|sJ dt|ts$t|tr.t|dks.J dt|tr;t|| _	d S t
|| _	d S )Nz&output size must be int, list or tupler.   z-len of output size must not be greater than 2)r   r   
isinstancer>   tuplelistlenr   AdaptiveAvgPool1dpoolAdaptiveAvgPool2d)r   output_size	conditionr   r   r    r     s   


zAdaptivePool.__init__c                 C   sX   |j dkr| |ddddddS |j dkr*| |ddddddddS dS )a  Performs adaptive pooling to the input tensor.

        Arguments
        ---------
        x : torch.Tensor
            It represents a tensor for a mini-batch.

        Returns
        -------
        x : torch.Tensor
            The pooled outputs.
        r   r   r.   r   r   N)ndimrV   permuter#   r   r   r    r%     s
   

"zAdaptivePool.forwardr&   r   r   r   r    rP   q  s    rP   c                       sF   e Zd ZdZ						d fdd		Zd
d Zdd Zdd Z  ZS )GaussianLowpassPoolinga  
    This class implements a learnable Gaussian lowpass pooling from

    Neil Zeghidour, Olivier Teboul, F{'e}lix de Chaumont Quitry & Marco Tagliasacchi, "LEAF: A LEARNABLE FRONTEND
    FOR AUDIO CLASSIFICATION", in Proc. of ICLR 2021 (https://arxiv.org/abs/2101.08596)

    Arguments
    ---------
    in_channels : int
        The number of input channels.
    kernel_size: int
        Kernel size of the gaussian lowpass filters.
    stride : int
        Stride factor of the convolutional filters. When the stride factor > 1,
        a decimation in time is performed.
    initialization_constant : float
        The constant used for initialization, default 0.4
    padding : str
        (same, valid). If "valid", no padding is performed.
        If "same" and stride is 1, output shape is the same as the input shape.
    padding_mode : str
        This flag specifies the type of padding. See torch.nn documentation
        for more information.
    bias : bool
        If True, the additive bias b is adopted.
    skip_transpose : bool
        If False, uses batch x time x channel convention of speechbrain.
        If True, uses batch x channel x time convention.

    Example
    -------
    >>> inp_tensor = torch.rand([10, 8000, 40])
    >>> low_pass_pooling = GaussianLowpassPooling(
    ...     40, kernel_size=401, stride=160,
    ... )
    >>> # parameters corresponding to a window of 25 ms and stride 10 ms at 16000 kHz
    >>> out_tensor = low_pass_pooling(inp_tensor)
    >>> out_tensor.shape
    torch.Size([10, 50, 40])
    r   皙?sameconstantTFc	           	         sr   t    || _|| _|| _|| _|| _|| _t	t
dd|df| | _|r4t
j	t
|| _d S d | _d S Nr   )r   r   r   r   r	   padding_modein_channelsskip_transposer   	Parameterr   onesweights_bias)	r   rb   r   r   initialization_constantr	   ra   biasrc   r   r   r    r     s   

zGaussianLowpassPooling.__init__c                 C   sz   | j }tj|d| dd}tjd||j|jd}t|d|ddf}|d|d   }|d |d  }td|| d  S )	Ng       @g      ?)rM   r   r   )dtyper9   r   g      r.   )r   r   clamparangerj   r9   reshapeexp)r   sigmafilter_sizet	numeratordenominatorr   r   r    _get_impulse_responses  s   z-GaussianLowpassPooling._get_impulse_responsesc                 C   s   | j s	|dd}| | j}|d| j| j}|ddd}| jdkr,| 	|| j}n| jdkr2nt
d| j tj||| j| jd| jd}| j sP|dd}|S )	zPerforms GaussianLowpass Pooling.

        Arguments
        ---------
        x : torch.Tensor
            3D tensor in input [batch,time,channels].

        Returns
        -------
        outputs : torch.Tensor
            The pooled outputs.
        r   r!   r.   r   r^   validz'Padding must be 'same' or 'valid'. Got )ri   r   r	   groups)rc   r"   rt   rf   rm   r   rb   r[   r	   _manage_paddingr   Fconv1drg   r   )r   r$   kerneloutputsr   r   r    r%     s.   

zGaussianLowpassPooling.forwardc                 C   s(   dd }||}t j||| jdd}|S )Nc                 S   s@   | f}ddl m} ddlm} ||dd |ddd D }|S )zGet number of elements to pad.r   )reduce)__add__c                 S   s0   g | ]}|d  |d |d     d |d  fqS )r.   r   r   ).0kr   r   r    
<listcomp>,  s    "zUGaussianLowpassPooling._manage_padding.<locals>.get_padding_value.<locals>.<listcomp>Nr!   )	functoolsr|   operatorr}   )r   kernel_sizesr|   r}   conv_paddingr   r   r    get_padding_value$  s   zAGaussianLowpassPooling._manage_padding.<locals>.get_padding_valuer   )modevalue)rx   padra   )r   r$   r   r   	pad_valuer   r   r    rw      s   z&GaussianLowpassPooling._manage_padding)r   r]   r^   r_   TF)	r'   r(   r)   r*   r   rt   r%   rw   r+   r   r   r   r    r\     s    -	(r\   c                       rO   )AttentionPoolingaP  This function implements a self-attention pooling (https://arxiv.org/abs/2008.01077).

    Arguments
    ---------
    input_dim: int
        The dimension of the input torch.Tensor

    Example
    -------
    >>> inp_tensor = torch.rand([4, 40])
    >>> pool = AttentionPooling(input_dim=40)
    >>> out_tensor = pool(inp_tensor)
    c                    s$   t    || _tj|d| _d S r`   )r   r   	input_dimr   r   Linearattn_pooling_w)r   r   r   r   r    r   G  s   
zAttentionPooling.__init__c                 C   sB   |  |d }tjjj|ddd}tj|| dd}|S )zReturns the output the adapter.

        Arguments
        ---------
        x : torch.Tensor
            Input tensor.

        Returns
        -------
        out : torch.Tensor
            The pooled outputs.
        r!   r6   r   )	r   r1   floatr   r   
functionalsoftmaxr0   sum)r   r$   outr   r   r    r%   O  s   zAttentionPooling.forwardr&   r   r   r   r    r   8  s    r   )r*   r   torch.nnr   torch.nn.functionalr   rx   speechbrain.utils.loggerr   r'   loggerModuler   r,   r2   rP   r\   r   r   r   r   r    <module>   s    swr9 