o
    %ݫibV                     @   s   d Z ddlZddlmZ ddlmZ dddZdd	 ZG d
d deZG dd deZ	G dd dej
ZG dd dej
ZG dd dej
ZG dd deZG dd deZG dd dej
ZdS )zThis file implements the necessary classes and functions to implement Posthoc Interpretations via Quantization.

 Authors
 * Cem Subakan 2023
 * Francesco Paissan 2023
    N)Function   TRAINc                 C   s   t t d|d || | j}|d| jd d}| dd|| }||k}|dkrIt j|t 	|jd || jgdddk}|S t j|t 
|jd || jgdddk}|S )a  This class returns binary matrix that indicates the irrelevant regions in the VQ-dictionary given the labels array

    Arguments
    ---------
    labels : torch.Tensor
        1 dimensional tensor of size [B]
    K : int
        Number of keys in the dictionary
    num_classes : int
        Number of possible classes
    N_shared : int
        Number of shared keys
    stage : str
        "TRAIN" or else

    Returns
    -------
    irrelevant_regions : torch.Tensor

    Example
    -------
    >>> labels = torch.Tensor([1, 0, 2])
    >>> irrelevant_regions = get_irrelevant_regions(labels, 20, 3, 5)
    >>> print(irrelevant_regions.shape)
    torch.Size([3, 20])
    g      gRQ?r      r   dim)torchroundlinspacetodevice	unsqueezerepeatshapecatoneszeros)labelsKnum_classesN_sharedstageuniform_matlabels_expandedirrelevant_regions r   P/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/PIQ.pyget_irrelevant_regions   sB   		r   c                 C   s^   | j j}|ddkr-ztj| jj | jj	d W dS  t
y,   td| Y dS w dS )z;
    Applies Xavier initialization to network weights.
    Convr   zSkipping initialization of N)	__class____name__findnninitxavier_uniform_weightdatabiasfill_AttributeErrorprint)m	classnamer   r   r   weights_initP   s   r.   c                   @   s4   e Zd ZdZe					d	ddZedd ZdS )
VectorQuantizationa  This class defines the forward method for vector quantization. As VQ is not differentiable, it returns a RuntimeError in case `.grad()` is called. Refer to `VectorQuantizationStraightThrough` for a straight_through estimation of the gradient for the VQ operation.N
   Tc                 C   s  t  } |d}| }	|d|}
|dddd|	d |	d }|d}t||jd |||r5dndd}t j|d dd}t j|
d dd	d
}t j	|| |
|
 ddd}|rat j||< t j|dd\}}|j|	dd  }| | |W  d   S 1 sw   Y  dS )a[  
        Applies VQ to vectors `input` with `codebook` as VQ dictionary.

        Arguments
        ---------
        ctx : torch context
            The context object for storing info for backwards.
        inputs : torch.Tensor
            Hidden representations to quantize. Expected shape is `torch.Size([B, W, H, C])`.
        codebook : torch.Tensor
            VQ-dictionary for quantization. Expected shape of `torch.Size([K, C])` with K dictionary elements.
        labels : torch.Tensor
            Classification labels. Used to define irrelevant regions and divide the latent space based on predicted class. Shape should be `torch.Size([B])`.
        num_classes : int
            Number of possible classes
        activate_class_partitioning : bool
            `True` if latent space should be quantized for different classes.
        shared_keys : int
            Number of shared keys among classes.
        training : bool
            `True` if stage is TRAIN.

        Returns
        -------
        Codebook's indices for quantized representation : torch.Tensor

        Example
        -------
        >>> inputs = torch.ones(3, 14, 25, 256)
        >>> codebook = torch.randn(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> print(VectorQuantization.apply(inputs, codebook, labels).shape)
        torch.Size([3, 14, 25])
        r   r      r   r   VALID)r   r   r   T)r   keepdimg             ?)alphabetaN)r   no_gradsizeviewreshaper   r   r   sumaddmmtinfminmark_non_differentiable)ctxinputscodebookr   r   activate_class_partitioningshared_keystrainingembedding_sizeinputs_sizeinputs_flattenr   labels_flattenr   codebook_sqr
inputs_sqr	distances_indices_flattenindicesr   r   r   forward`   s>   
-


	

$zVectorQuantization.forwardc                 C   s   t d)z;Handles error in case grad() is called on the VQ operation.zTrying to call `.grad()` on graph containing `VectorQuantization`. The function `VectorQuantization` is not differentiable. Use `VectorQuantizationStraightThrough` if you want a straight-through estimator of the gradient.)RuntimeError)rA   grad_outputr   r   r   backward   s   zVectorQuantization.backwardNr0   Tr0   Tr!   
__module____qualname____doc__staticmethodrQ   rT   r   r   r   r   r/   ]   s    Sr/   c                   @   s@   e Zd ZdZe					d	ddZe					d
ddZdS )!VectorQuantizationStraightThroughzThis class defines the forward method for vector quantization. As VQ is not differentiable, it approximates the gradient of the VQ as in https://arxiv.org/abs/1711.00937.Nr0   Tc              	   C   sX   t |||||||}|d}	| |	| | |	 tj|d|	d}
|
|}||	fS )a3  
        Applies VQ to vectors `input` with `codebook` as VQ dictionary and estimates gradients with a
        Straight-Through (id) approximation of the quantization steps.

        Arguments
        ---------
        ctx : torch context
            The context object for storing info for backwards.
        inputs : torch.Tensor
            Hidden representations to quantize. Expected shape is `torch.Size([B, W, H, C])`.
        codebook : torch.Tensor
            VQ-dictionary for quantization. Expected shape of `torch.Size([K, C])` with K dictionary elements.
        labels : torch.Tensor
            Classification labels. Used to define irrelevant regions and divide the latent space based on predicted class. Shape should be `torch.Size([B])`.
        num_classes : int
            Number of possible classes
        activate_class_partitioning : bool
            `True` if latent space should be quantized for different classes.
        shared_keys : int
            Number of shared keys among classes.
        training : bool
            `True` if stage is TRAIN.

        Returns
        -------
        Quantized representation and codebook's indices for quantized representation : tuple

        Example
        -------
        >>> inputs = torch.ones(3, 14, 25, 256)
        >>> codebook = torch.randn(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> quant, quant_ind = VectorQuantizationStraightThrough.apply(inputs, codebook, labels)
        >>> print(quant.shape, quant_ind.shape)
        torch.Size([3, 14, 25, 256]) torch.Size([1050])
        r   r   r   index)r/   applyr9   save_for_backwardr@   r   index_selectview_as)rA   rB   rC   r   r   rD   rE   rF   rP   rO   codes_flattencodesr   r   r   rQ      s"   /
	

z)VectorQuantizationStraightThrough.forwardc                 C   sr   d\}}	| j d r| }| j d r0| j\}
}|d}| d|}t|}	|	d|
| ||	dddddfS )zz
        Estimates gradient assuming vector quantization as identity function. (https://arxiv.org/abs/1711.00937)
        )NNr   r   r   N)	needs_input_gradclonesaved_tensorsr8   
contiguousr9   r   
zeros_like
index_add_)rA   rS   grad_indicesr   r   rD   rE   rF   grad_inputsgrad_codebookrP   rC   rG   grad_output_flattenr   r   r   rT     s   




z*VectorQuantizationStraightThrough.backwardrU   )NNTr0   TrV   r   r   r   r   r[      s     Br[   c                       s*   e Zd ZdZd fdd	Zdd Z  ZS )Conv2dEncoder_v2ay  
    This class implements a convolutional encoder to extract classification embeddings from logspectra.

    Arguments
    ---------
    dim : int
        Number of channels of the extracted embeddings.

    Example
    -------
    >>> inputs = torch.ones(3, 431, 513)
    >>> model = Conv2dEncoder_v2()
    >>> print(model(inputs).shape)
    torch.Size([3, 256, 26, 32])
       c                    s   t    td|ddd| _t|| _t||ddd| _t|| _t||ddd| _	t|| _
t||ddd| _t|| _t|| _t | _d S )Nr      r1   )super__init__r#   Conv2dconv1BatchNorm2dbn1conv2bn2conv3bn3conv4bn4ResBlockAudioresblockReLUnonlselfr   r    r   r   rr   7  s   

zConv2dEncoder_v2.__init__c                 C   s   | d}| |}| |}| |}| |}| |}| |}| |}| |}| |}| |}| 	|}| |}| 
|}|S )z
        Computes forward pass.

        Arguments
        ---------
        x : torch.Tensor
            Log-power spectrogram. Expected shape `torch.Size([B, T, F])`.

        Returns
        -------
        Embeddings : torch.Tensor
        r   )r   rt   rv   r   rw   rx   ry   rz   r{   r|   r~   )r   xh1h2h3h4r   r   r   rQ   E  s   













zConv2dEncoder_v2.forward)ro   r!   rW   rX   rY   rr   rQ   __classcell__r   r   r   r   rn   &  s    rn   c                       s(   e Zd ZdZ fddZdd Z  ZS )r}   aQ  This class implements a residual block.

    Arguments
    ---------
    dim : int
        Input channels of the tensor to process. Matches output channels of the residual block.

    Example
    -------
    >>> res = ResBlockAudio(128)
    >>> x = torch.randn(2, 128, 16, 16)
    >>> print(x.shape)
    torch.Size([2, 128, 16, 16])
    c              
      sL   t    tt||dddt|tdt||dt|| _d S )N   r   T)rq   rr   r#   
Sequentialrs   ru   r   blockr   r   r   r   rr   x  s   

zResBlockAudio.__init__c                 C   s   ||  | S )zForward step.

        Arguments
        ---------
        x : torch.Tensor
            Tensor to process. Expected shape is `torch.Size([B, C, H, W])`.

        Returns
        -------
        Residual block output : torch.Tensor
        )r   )r   r   r   r   r   rQ     s   zResBlockAudio.forwardr   r   r   r   r   r}   h  s    
r}   c                       s8   e Zd ZdZ							d fdd	Zd	d
 Z  ZS )VectorQuantizedPSI_Audioa  
    This class reconstructs log-power spectrograms from classifier's representations.

    Arguments
    ---------
    dim : int
        Dimensionality of VQ vectors.
    K : int
        Number of elements of VQ dictionary.
    numclasses : int
        Number of possible classes
    activate_class_partitioning : bool
        `True` if latent space should be quantized for different classes.
    shared_keys : int
        Number of shared keys among classes.
    use_adapter : bool
        `True` to learn an adapter for classifier's representations.
    adapter_reduce_dim : bool
        `True` if adapter should compress representations.

    Example
    -------
    >>> psi = VectorQuantizedPSI_Audio(dim=256, K=1024)
    >>> x = torch.randn(2, 256, 16, 16)
    >>> labels = torch.Tensor([0, 2])
    >>> logspectra, hcat, z_q_x = psi(x, labels)
    >>> print(logspectra.shape, hcat.shape, z_q_x.shape)
    torch.Size([2, 1, 257, 257]) torch.Size([2, 256, 8, 8]) torch.Size([2, 256, 8, 8])
          2   Tr   c                    s  t    t|||||d| _|| _|| _|r2t|| _|r2t	||ddd| _
t||ddd| _tt||dddtdt|t||dddt t|t||dddt t|t||dddt t|t|dddd| _| t d S )N)
numclassesrD   rE   rp   r1   r1   r   r   T   )rq   rr   VQEmbeddingrC   use_adapteradapter_reduce_dimr}   adapterr#   rs   downConvTranspose2dupr   r   ru   decoderr^   r.   )r   r   r   r   rD   rE   r   r   r   r   r   rr     s>   


z!VectorQuantizedPSI_Audio.__init__c                 C   sj   | j r	| |}n|}| jr"| |}| j||\}}| |}n	| j||\}}| |}|||fS )a  
        Forward step. Reconstructs log-power based on provided label's keys in VQ dictionary.

        Arguments
        ---------
        hs : torch.Tensor
            Classifier's representations.
        labels : torch.Tensor
            Predicted labels for classifier's representations.

        Returns
        -------
        Reconstructed log-power spectrogram, reduced classifier's representations and quantized classifier's representations. : tuple
        )r   r   r   r   rC   straight_throughr   r   )r   hsr   hcatz_q_x_stz_q_xx_tilder   r   r   rQ     s   


z VectorQuantizedPSI_Audio.forward)r   r   r   Tr   TTr   r   r   r   r   r     s     ,r   c                       "   e Zd ZdZd fdd	Z  ZS ) VectorQuantizedPSIFocalNet_Audioal  
    This class reconstructs log-power spectrograms from a FocalNet classifier's representations.

    Arguments
    ---------
    dim : int
        Dimensionality of VQ vectors.
    **kwargs : dict
        See documentation of `VectorQuantizedPSI_Audio`.

    Example
    -------
    >>> psi = VectorQuantizedPSIFocalNet_Audio(dim=256, K=1024)
    >>> x = torch.randn(2, 256, 16, 16)
    >>> labels = torch.Tensor([0, 2])
    >>> logspectra, hcat, z_q_x = psi(x, labels)
    >>> print(logspectra.shape, hcat.shape, z_q_x.shape)
    torch.Size([2, 1, 495, 593]) torch.Size([2, 256, 8, 8]) torch.Size([2, 256, 8, 8])
       c                       t  jd	d|i| tt||dddt t|t||dddt t|t||dddt t|t||dddt t|t|dddd| _| t	 d S 
Nr   r   )rp   r   r   )rp   r   r   )rp   r1   )r0      r   
rq   rr   r#   r   r   r   ru   r   r^   r.   r   r   kwargsr   r   r   rr     "   z)VectorQuantizedPSIFocalNet_Audio.__init__)r   r!   rW   rX   rY   rr   r   r   r   r   r   r         r   c                       r   )VectorQuantizedPSIViT_Audioab  
    This class reconstructs log-power spectrograms from a ViT classifier's representations.

    Arguments
    ---------
    dim : int
        Dimensionality of VQ vectors.
    **kwargs : dict
        See documentation of `VectorQuantizedPSI_Audio`.

    Example
    -------
    >>> psi = VectorQuantizedPSIViT_Audio(dim=256, K=1024)
    >>> x = torch.randn(2, 256, 16, 16)
    >>> labels = torch.Tensor([0, 2])
    >>> logspectra, hcat, z_q_x = psi(x, labels)
    >>> print(logspectra.shape, hcat.shape, z_q_x.shape)
    torch.Size([2, 1, 495, 593]) torch.Size([2, 256, 8, 8]) torch.Size([2, 256, 8, 8])
       c                    r   r   r   r   r   r   r   rr   9  r   z$VectorQuantizedPSIViT_Audio.__init__)r   r   r   r   r   r   r   $  r   r   c                       s<   e Zd ZdZ			d fdd	Zddd	Zdd
dZ  ZS )r   a
  
    Implements VQ Dictionary. Wraps `VectorQuantization` and `VectorQuantizationStraightThrough`. For more details refer to the specific class.

    Arguments
    ---------
    K : int
        Number of elements of VQ dictionary.
    D : int
        Dimensionality of VQ vectors.
    numclasses : int
        Number of possible classes
    activate_class_partitioning : bool
        `True` if latent space should be quantized for different classes.
    shared_keys : int
        Number of shared keys among classes.

    r   Tr   c                    sH   t    t||| _| jjjd| d|  || _|| _	|| _
d S )Ng      r4   )rq   rr   r#   	Embedding	embeddingr&   r'   uniform_r   rD   rE   )r   r   Dr   rD   rE   r   r   r   rr   `  s   

zVQEmbedding.__init__Nc                 C   s*   | dddd }t|| jj|}|S )a  
        Wraps VectorQuantization. Computes VQ-dictionary indices for input quantization. Note that this forward step is not differentiable.

        Arguments
        ---------
        z_e_x : torch.Tensor
            Input tensor to be quantized.
        labels : torch.Tensor
            Predicted class for input representations (used for latent space quantization).

        Returns
        -------
        Codebook's indices for quantized representation : torch.Tensor

        Example
        -------
        >>> inputs = torch.ones(3, 256, 14, 25)
        >>> codebook = VQEmbedding(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> print(codebook(inputs, labels).shape)
        torch.Size([3, 14, 25])
        r   r1   r   r   )permuterg   r/   r^   r   r&   )r   z_e_xr   z_e_x_latentsr   r   r   rQ   q  s
   
zVQEmbedding.forwardc           
   	   C   s   | dddd }t|| jj || j| j| j	| j
\}}| dddd }tj| jjd|d}||}| dddd }	||	fS )a/  
        Implements the vector quantization with straight through approximation of the gradient.

        Arguments
        ---------
        z_e_x : torch.Tensor
            Input tensor to be quantized.
        labels : torch.Tensor
            Predicted class for input representations (used for latent space quantization).

        Returns
        -------
        Straight through quantized representation and quantized representation : tuple

        Example
        -------
        >>> inputs = torch.ones(3, 256, 14, 25)
        >>> codebook = VQEmbedding(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> quant, quant_ind = codebook.straight_through(inputs, labels)
        >>> print(quant.shape, quant_ind.shape)
        torch.Size([3, 256, 14, 25]) torch.Size([3, 256, 14, 25])

        r   r1   r   r   r\   )r   rg   r[   r^   r   r&   detachr   rD   rE   rF   r   r`   ra   )
r   r   r   r   z_q_x_rP   r   z_q_x_bar_flatten
z_q_x_bar_	z_q_x_barr   r   r   r     s"   
	

zVQEmbedding.straight_through)r   Tr   )N)r!   rW   rX   rY   rr   rQ   r   r   r   r   r   r   r   M  s    
r   )r   r   )rY   r   torch.nnr#   torch.autogradr   r   r.   r/   r[   Modulern   r}   r   r   r   r   r   r   r   r   <module>   s    
CbgB)j))