o
    ½e¦ibV  ã                   @   sÂ   d Z ddlZddlmZ ddlmZ ddd„Zdd	„ ZG d
d„ deƒZG dd„ deƒZ	G dd„ dej
ƒZG dd„ dej
ƒZG dd„ dej
ƒZG dd„ deƒZG dd„ deƒZG dd„ dej
ƒZdS )z¨This file implements the necessary classes and functions to implement Posthoc Interpretations via Quantization.

 Authors
 * Cem Subakan 2023
 * Francesco Paissan 2023
é    N)ÚFunctioné   ÚTRAINc                 C   sÂ   t  t  d|d || ¡¡ | j¡}| d¡ | jd d¡}|  d¡ d|| ¡}||k}|dkrIt j|t  	|jd |¡ | j¡gdddk}|S t j|t  
|jd |¡ | j¡gdddk}|S )a¥  This class returns binary matrix that indicates the irrelevant regions in the VQ-dictionary given the labels array

    Arguments
    ---------
    labels : torch.Tensor
        1 dimensional tensor of size [B]
    K : int
        Number of keys in the dictionary
    num_classes : int
        Number of possible classes
    N_shared : int
        Number of shared keys
    stage : str
        "TRAIN" or else

    Returns
    -------
    irrelevant_regions : torch.Tensor

    Example
    -------
    >>> labels = torch.Tensor([1, 0, 2])
    >>> irrelevant_regions = get_irrelevant_regions(labels, 20, 3, 5)
    >>> print(irrelevant_regions.shape)
    torch.Size([3, 20])
    g      à¿gR¸…ëQà?r   é   r   ©Údim)ÚtorchÚroundÚlinspaceÚtoÚdeviceÚ	unsqueezeÚrepeatÚshapeÚcatÚonesÚzeros)ÚlabelsÚKÚnum_classesÚN_sharedÚstageÚuniform_matÚlabels_expandedÚirrelevant_regions© r   úZ/home/ubuntu/transcripts/venv/lib/python3.10/site-packages/speechbrain/lobes/models/PIQ.pyÚget_irrelevant_regions   sB   ÿþÿþù	÷ÿõÿþù	÷ÿr   c                 C   s^   | j j}| d¡dkr-ztj | jj¡ | jj 	d¡ W dS  t
y,   td|ƒ Y dS w dS )z;
    Applies Xavier initialization to network weights.
    ÚConvéÿÿÿÿr   zSkipping initialization of N)Ú	__class__Ú__name__ÚfindÚnnÚinitÚxavier_uniform_ÚweightÚdataÚbiasÚfill_ÚAttributeErrorÚprint)ÚmÚ	classnamer   r   r   Úweights_initP   s   ÿür.   c                   @   s4   e Zd ZdZe					d	dd„ƒZedd„ ƒZdS )
ÚVectorQuantizationa  This class defines the forward method for vector quantization. As VQ is not differentiable, it returns a RuntimeError in case `.grad()` is called. Refer to `VectorQuantizationStraightThrough` for a straight_through estimation of the gradient for the VQ operation.Né
   Tc                 C   s  t  ¡ } | d¡}| ¡ }	| d|¡}
| ddd¡ d|	d |	d ¡}| d¡}t||jd |||r5dndd}t j|d dd}t j|
d dd	d
}t j	|| |
| 
¡ ddd}|rat j||< t j|dd\}}|j|	dd… Ž }|  |¡ |W  d  ƒ S 1 s„w   Y  dS )a[  
        Applies VQ to vectors `input` with `codebook` as VQ dictionary.

        Arguments
        ---------
        ctx : torch context
            The context object for storing info for backwards.
        inputs : torch.Tensor
            Hidden representations to quantize. Expected shape is `torch.Size([B, W, H, C])`.
        codebook : torch.Tensor
            VQ-dictionary for quantization. Expected shape of `torch.Size([K, C])` with K dictionary elements.
        labels : torch.Tensor
            Classification labels. Used to define irrelevant regions and divide the latent space based on predicted class. Shape should be `torch.Size([B])`.
        num_classes : int
            Number of possible classes
        activate_class_partitioning : bool
            `True` if latent space should be quantized for different classes.
        shared_keys : int
            Number of shared keys among classes.
        training : bool
            `True` if stage is TRAIN.

        Returns
        -------
        Codebook's indices for quantized representation : torch.Tensor

        Example
        -------
        >>> inputs = torch.ones(3, 14, 25, 256)
        >>> codebook = torch.randn(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> print(VectorQuantization.apply(inputs, codebook, labels).shape)
        torch.Size([3, 14, 25])
        r   r   é   r   r   ÚVALID)r   r   r   T)r   Úkeepdimg       Àç      ð?)ÚalphaÚbetaN)r   Úno_gradÚsizeÚviewÚreshaper   r   r   ÚsumÚaddmmÚtÚinfÚminÚmark_non_differentiable)ÚctxÚinputsÚcodebookr   r   Úactivate_class_partitioningÚshared_keysÚtrainingÚembedding_sizeÚinputs_sizeÚinputs_flattenr   Úlabels_flattenr   Úcodebook_sqrÚ
inputs_sqrÚ	distancesÚ_Úindices_flattenÚindicesr   r   r   Úforward`   s>   
-
ÿ

ûû	

$ÛzVectorQuantization.forwardc                 C   s   t dƒ‚)z;Handles error in case grad() is called on the VQ operation.zÝTrying to call `.grad()` on graph containing `VectorQuantization`. The function `VectorQuantization` is not differentiable. Use `VectorQuantizationStraightThrough` if you want a straight-through estimator of the gradient.)ÚRuntimeError)rA   Úgrad_outputr   r   r   Úbackward´   s   ÿzVectorQuantization.backward©Nr0   Tr0   T©r!   Ú
__module__Ú__qualname__Ú__doc__ÚstaticmethodrQ   rT   r   r   r   r   r/   ]   s    øSr/   c                   @   s@   e Zd ZdZe					d	dd„ƒZe					d
dd„ƒZdS )Ú!VectorQuantizationStraightThroughzªThis class defines the forward method for vector quantization. As VQ is not differentiable, it approximates the gradient of the VQ as in https://arxiv.org/abs/1711.00937.Nr0   Tc              	   C   sX   t  |||||||¡}| d¡}	|  |	|¡ |  |	¡ tj|d|	d}
|
 |¡}||	fS )a3  
        Applies VQ to vectors `input` with `codebook` as VQ dictionary and estimates gradients with a
        Straight-Through (id) approximation of the quantization steps.

        Arguments
        ---------
        ctx : torch context
            The context object for storing info for backwards.
        inputs : torch.Tensor
            Hidden representations to quantize. Expected shape is `torch.Size([B, W, H, C])`.
        codebook : torch.Tensor
            VQ-dictionary for quantization. Expected shape of `torch.Size([K, C])` with K dictionary elements.
        labels : torch.Tensor
            Classification labels. Used to define irrelevant regions and divide the latent space based on predicted class. Shape should be `torch.Size([B])`.
        num_classes : int
            Number of possible classes
        activate_class_partitioning : bool
            `True` if latent space should be quantized for different classes.
        shared_keys : int
            Number of shared keys among classes.
        training : bool
            `True` if stage is TRAIN.

        Returns
        -------
        Quantized representation and codebook's indices for quantized representation : tuple

        Example
        -------
        >>> inputs = torch.ones(3, 14, 25, 256)
        >>> codebook = torch.randn(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> quant, quant_ind = VectorQuantizationStraightThrough.apply(inputs, codebook, labels)
        >>> print(quant.shape, quant_ind.shape)
        torch.Size([3, 14, 25, 256]) torch.Size([1050])
        r   r   ©r   Úindex)r/   Úapplyr9   Úsave_for_backwardr@   r   Úindex_selectÚview_as)rA   rB   rC   r   r   rD   rE   rF   rP   rO   Úcodes_flattenÚcodesr   r   r   rQ   Â   s"   /ù
	
ÿ
z)VectorQuantizationStraightThrough.forwardc                 C   sr   d\}}	| j d r| ¡ }| j d r0| j\}
}| d¡}| ¡  d|¡}t |¡}	|	 d|
|¡ ||	dddddfS )zz
        Estimates gradient assuming vector quantization as identity function. (https://arxiv.org/abs/1711.00937)
        )NNr   r   r   N)	Úneeds_input_gradÚcloneÚsaved_tensorsr8   Ú
contiguousr9   r   Ú
zeros_likeÚ
index_add_)rA   rS   Úgrad_indicesr   r   rD   rE   rF   Úgrad_inputsÚgrad_codebookrP   rC   rG   Úgrad_output_flattenr   r   r   rT     s   



ÿ
z*VectorQuantizationStraightThrough.backwardrU   )NNTr0   TrV   r   r   r   r   r[   ¿   s     øBør[   c                       s*   e Zd ZdZd‡ fdd„	Zdd„ Z‡  ZS )ÚConv2dEncoder_v2ay  
    This class implements a convolutional encoder to extract classification embeddings from logspectra.

    Arguments
    ---------
    dim : int
        Number of channels of the extracted embeddings.

    Example
    -------
    >>> inputs = torch.ones(3, 431, 513)
    >>> model = Conv2dEncoder_v2()
    >>> print(model(inputs).shape)
    torch.Size([3, 256, 26, 32])
    é   c                    s¢   t ƒ  ¡  t d|ddd¡| _t |¡| _t ||ddd¡| _t |¡| _t ||ddd¡| _	t |¡| _
t ||ddd¡| _t |¡| _t|ƒ| _t ¡ | _d S )Nr   é   r1   )ÚsuperÚ__init__r#   ÚConv2dÚconv1ÚBatchNorm2dÚbn1Úconv2Úbn2Úconv3Úbn3Úconv4Úbn4ÚResBlockAudioÚresblockÚReLUÚnonl©Úselfr   ©r    r   r   rr   7  s   

zConv2dEncoder_v2.__init__c                 C   s   |  d¡}|  |¡}|  |¡}|  |¡}|  |¡}|  |¡}|  |¡}|  |¡}|  |¡}|  |¡}|  |¡}|  	|¡}|  |¡}|  
|¡}|S )zô
        Computes forward pass.

        Arguments
        ---------
        x : torch.Tensor
            Log-power spectrogram. Expected shape `torch.Size([B, T, F])`.

        Returns
        -------
        Embeddings : torch.Tensor
        r   )r   rt   rv   r€   rw   rx   ry   rz   r{   r|   r~   )r‚   ÚxÚh1Úh2Úh3Úh4r   r   r   rQ   E  s   













zConv2dEncoder_v2.forward)ro   ©r!   rW   rX   rY   rr   rQ   Ú__classcell__r   r   rƒ   r   rn   &  s    rn   c                       s(   e Zd ZdZ‡ fdd„Zdd„ Z‡  ZS )r}   aQ  This class implements a residual block.

    Arguments
    ---------
    dim : int
        Input channels of the tensor to process. Matches output channels of the residual block.

    Example
    -------
    >>> res = ResBlockAudio(128)
    >>> x = torch.randn(2, 128, 16, 16)
    >>> print(x.shape)
    torch.Size([2, 128, 16, 16])
    c              
      sL   t ƒ  ¡  t t ||ddd¡t |¡t d¡t ||d¡t |¡¡| _d S )Né   r   T)rq   rr   r#   Ú
Sequentialrs   ru   r   Úblockr   rƒ   r   r   rr   x  s   

ûzResBlockAudio.__init__c                 C   s   ||   |¡ S )zïForward step.

        Arguments
        ---------
        x : torch.Tensor
            Tensor to process. Expected shape is `torch.Size([B, C, H, W])`.

        Returns
        -------
        Residual block output : torch.Tensor
        )r   )r‚   r„   r   r   r   rQ   ‚  s   zResBlockAudio.forwardr‰   r   r   rƒ   r   r}   h  s    
r}   c                       s8   e Zd ZdZ							d‡ fdd„	Zd	d
„ Z‡  ZS )ÚVectorQuantizedPSI_Audioaæ  
    This class reconstructs log-power spectrograms from classifier's representations.

    Arguments
    ---------
    dim : int
        Dimensionality of VQ vectors.
    K : int
        Number of elements of VQ dictionary.
    numclasses : int
        Number of possible classes
    activate_class_partitioning : bool
        `True` if latent space should be quantized for different classes.
    shared_keys : int
        Number of shared keys among classes.
    use_adapter : bool
        `True` to learn an adapter for classifier's representations.
    adapter_reduce_dim : bool
        `True` if adapter should compress representations.

    Example
    -------
    >>> psi = VectorQuantizedPSI_Audio(dim=256, K=1024)
    >>> x = torch.randn(2, 256, 16, 16)
    >>> labels = torch.Tensor([0, 2])
    >>> logspectra, hcat, z_q_x = psi(x, labels)
    >>> print(logspectra.shape, hcat.shape, z_q_x.shape)
    torch.Size([2, 1, 257, 257]) torch.Size([2, 256, 8, 8]) torch.Size([2, 256, 8, 8])
    é€   é   é2   Tr   c                    s  t ƒ  ¡  t|||||d| _|| _|| _|r2t|ƒ| _|r2t 	||ddd¡| _
t ||ddd¡| _t t ||ddd¡t d¡t |¡t ||ddd¡t ¡ t |¡t ||ddd¡t ¡ t |¡t ||ddd¡t ¡ t |¡t |dddd¡¡| _|  t¡ d S )N)Ú
numclassesrD   rE   rp   ©r1   r1   r   r‹   Té   )rq   rr   ÚVQEmbeddingrC   Úuse_adapterÚadapter_reduce_dimr}   Úadapterr#   rs   ÚdownÚConvTranspose2dÚuprŒ   r   ru   Údecoderr^   r.   )r‚   r   r   r’   rD   rE   r–   r—   rƒ   r   r   rr   °  s>   

û
óz!VectorQuantizedPSI_Audio.__init__c                 C   sj   | j r	|  |¡}n|}| jr"|  |¡}| j ||¡\}}|  |¡}n	| j ||¡\}}|  |¡}|||fS )aÔ  
        Forward step. Reconstructs log-power based on provided label's keys in VQ dictionary.

        Arguments
        ---------
        hs : torch.Tensor
            Classifier's representations.
        labels : torch.Tensor
            Predicted labels for classifier's representations.

        Returns
        -------
        Reconstructed log-power spectrogram, reduced classifier's representations and quantized classifier's representations. : tuple
        )r–   r˜   r—   r™   rC   Ústraight_throughr›   rœ   )r‚   Úhsr   ÚhcatÚz_q_x_stÚz_q_xÚx_tilder   r   r   rQ   Ü  s   


z VectorQuantizedPSI_Audio.forward)r   r   r‘   Tr   TTr‰   r   r   rƒ   r   rŽ   ‘  s     ø,rŽ   c                       ó"   e Zd ZdZd‡ fdd„	Z‡  ZS )Ú VectorQuantizedPSIFocalNet_Audioal  
    This class reconstructs log-power spectrograms from a FocalNet classifier's representations.

    Arguments
    ---------
    dim : int
        Dimensionality of VQ vectors.
    **kwargs : dict
        See documentation of `VectorQuantizedPSI_Audio`.

    Example
    -------
    >>> psi = VectorQuantizedPSIFocalNet_Audio(dim=256, K=1024)
    >>> x = torch.randn(2, 256, 16, 16)
    >>> labels = torch.Tensor([0, 2])
    >>> logspectra, hcat, z_q_x = psi(x, labels)
    >>> print(logspectra.shape, hcat.shape, z_q_x.shape)
    torch.Size([2, 1, 495, 593]) torch.Size([2, 256, 8, 8]) torch.Size([2, 256, 8, 8])
    é   c                    ó¶   t ƒ jd	d|i|¤Ž t t ||ddd¡t ¡ t |¡t ||ddd¡t ¡ t |¡t ||ddd¡t ¡ t |¡t ||ddd¡t ¡ t |¡t |dddd¡¡| _|  t	¡ d S ©
Nr   r‹   )rp   r   r   )rp   r   r“   )rp   r1   )r0   é   r   ©
rq   rr   r#   rŒ   rš   r   ru   rœ   r^   r.   ©r‚   r   Úkwargsrƒ   r   r   rr     ó"   óz)VectorQuantizedPSIFocalNet_Audio.__init__)r¥   ©r!   rW   rX   rY   rr   rŠ   r   r   rƒ   r   r¤   û  ó    r¤   c                       r£   )ÚVectorQuantizedPSIViT_Audioab  
    This class reconstructs log-power spectrograms from a ViT classifier's representations.

    Arguments
    ---------
    dim : int
        Dimensionality of VQ vectors.
    **kwargs : dict
        See documentation of `VectorQuantizedPSI_Audio`.

    Example
    -------
    >>> psi = VectorQuantizedPSIViT_Audio(dim=256, K=1024)
    >>> x = torch.randn(2, 256, 16, 16)
    >>> labels = torch.Tensor([0, 2])
    >>> logspectra, hcat, z_q_x = psi(x, labels)
    >>> print(logspectra.shape, hcat.shape, z_q_x.shape)
    torch.Size([2, 1, 495, 593]) torch.Size([2, 256, 8, 8]) torch.Size([2, 256, 8, 8])
    é   c                    r¦   r§   r©   rª   rƒ   r   r   rr   9  r¬   z$VectorQuantizedPSIViT_Audio.__init__)r°   r­   r   r   rƒ   r   r¯   $  r®   r¯   c                       s<   e Zd ZdZ			d‡ fdd„	Zddd	„Zdd
d„Z‡  ZS )r•   a
  
    Implements VQ Dictionary. Wraps `VectorQuantization` and `VectorQuantizationStraightThrough`. For more details refer to the specific class.

    Arguments
    ---------
    K : int
        Number of elements of VQ dictionary.
    D : int
        Dimensionality of VQ vectors.
    numclasses : int
        Number of possible classes
    activate_class_partitioning : bool
        `True` if latent space should be quantized for different classes.
    shared_keys : int
        Number of shared keys among classes.

    r‘   Tr   c                    sH   t ƒ  ¡  t ||¡| _| jjj d| d| ¡ || _|| _	|| _
d S )Ng      ð¿r4   )rq   rr   r#   Ú	EmbeddingÚ	embeddingr&   r'   Úuniform_r’   rD   rE   )r‚   r   ÚDr’   rD   rE   rƒ   r   r   rr   `  s   

zVQEmbedding.__init__Nc                 C   s*   |  dddd¡ ¡ }t || jj|¡}|S )aá  
        Wraps VectorQuantization. Computes VQ-dictionary indices for input quantization. Note that this forward step is not differentiable.

        Arguments
        ---------
        z_e_x : torch.Tensor
            Input tensor to be quantized.
        labels : torch.Tensor
            Predicted class for input representations (used for latent space quantization).

        Returns
        -------
        Codebook's indices for quantized representation : torch.Tensor

        Example
        -------
        >>> inputs = torch.ones(3, 256, 14, 25)
        >>> codebook = VQEmbedding(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> print(codebook(inputs, labels).shape)
        torch.Size([3, 14, 25])
        r   r1   r‹   r   )Úpermuterg   r/   r^   r²   r&   )r‚   Úz_e_xr   Úz_e_x_Úlatentsr   r   r   rQ   q  s
   
ÿzVQEmbedding.forwardc           
   	   C   sŒ   |  dddd¡ ¡ }t || jj ¡ || j| j| j	| j
¡\}}|  dddd¡ ¡ }tj| jjd|d}| |¡}|  dddd¡ ¡ }	||	fS )a/  
        Implements the vector quantization with straight through approximation of the gradient.

        Arguments
        ---------
        z_e_x : torch.Tensor
            Input tensor to be quantized.
        labels : torch.Tensor
            Predicted class for input representations (used for latent space quantization).

        Returns
        -------
        Straight through quantized representation and quantized representation : tuple

        Example
        -------
        >>> inputs = torch.ones(3, 256, 14, 25)
        >>> codebook = VQEmbedding(1024, 256)
        >>> labels = torch.Tensor([1, 0, 2])
        >>> quant, quant_ind = codebook.straight_through(inputs, labels)
        >>> print(quant.shape, quant_ind.shape)
        torch.Size([3, 256, 14, 25]) torch.Size([3, 256, 14, 25])

        r   r1   r‹   r   r\   )rµ   rg   r[   r^   r²   r&   Údetachr’   rD   rE   rF   r   r`   ra   )
r‚   r¶   r   r·   Úz_q_x_rP   r¡   Úz_q_x_bar_flattenÚ
z_q_x_bar_Ú	z_q_x_barr   r   r   r   Ž  s"   
ù	
ÿ
zVQEmbedding.straight_through)r‘   Tr   )N)r!   rW   rX   rY   rr   rQ   r   rŠ   r   r   rƒ   r   r•   M  s    ú
r•   )r   r   )rY   r   Útorch.nnr#   Útorch.autogradr   r   r.   r/   r[   ÚModulern   r}   rŽ   r¤   r¯   r•   r   r   r   r   Ú<module>   s    
CbgB)j))