o
    %ݫi#7                     @   s   d Z ddlmZ ddlZddlmZ ddlmZ ddlmZ ddl	m
Z
 G dd	 d	ejZG d
d deZedg dZedg dZG dd deZdS )zoAutoencoder implementation. Can be used for Latent Diffusion or in isolation

Authors
 * Artem Ploujnikov 2022
    )
namedtupleN)nn)clean_padding)
GlobalNorm)trim_asc                   @   s*   e Zd ZdZd	ddZdd Zdd ZdS )
Autoencodera  A standard interface for autoencoders

    Example
    -------
    >>> import torch
    >>> from torch import nn
    >>> from speechbrain.nnet.linear import Linear
    >>> class SimpleAutoencoder(Autoencoder):
    ...    def __init__(self):
    ...        super().__init__()
    ...        self.enc = Linear(n_neurons=16, input_size=128)
    ...        self.dec = Linear(n_neurons=128, input_size=16)
    ...    def encode(self, x, length=None):
    ...        return self.enc(x)
    ...    def decode(self, x, length=None):
    ...        return self.dec(x)
    >>> autoencoder = SimpleAutoencoder()
    >>> x = torch.randn(4, 10, 128)
    >>> x_enc = autoencoder.encode(x)
    >>> x_enc.shape
    torch.Size([4, 10, 16])
    >>> x_enc_fw = autoencoder(x)
    >>> x_enc_fw.shape
    torch.Size([4, 10, 16])
    >>> x_rec = autoencoder.decode(x_enc)
    >>> x_rec.shape
    torch.Size([4, 10, 128])
    Nc                 C      t )a  Converts a sample from an original space (e.g. pixel or waveform) to a latent
        space

        Arguments
        ---------
        x: torch.Tensor
            the original data representation
        length: torch.Tensor
            a tensor of relative lengths
        NotImplementedErrorselfxlength r   Q/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/nnet/autoencoders.pyencode/   s   zAutoencoder.encodec                 C   r   )zDecodes the sample from a latent representation

        Arguments
        ---------
        latent: torch.Tensor
            the latent representation
        r	   r   latentr   r   r   decode<   s   zAutoencoder.decodec                 C   
   |  |S )zPerforms the forward pass

        Arguments
        ---------
        x: torch.Tensor
            the input tensor

        Returns
        -------
        result: torch.Tensor
            the result
        )r   )r   r   r   r   r   forwardF      
zAutoencoder.forwardN)__name__
__module____qualname____doc__r   r   r   r   r   r   r   r      s
    

r   c                       sV   e Zd ZdZ							d fdd	Zddd	Zd
d Zdd Z	dddZ  Z	S )VariationalAutoencodera	  A Variational Autoencoder (VAE) implementation.

    Paper reference: https://arxiv.org/abs/1312.6114

    Arguments
    ---------
    encoder: torch.Module
        the encoder network
    decoder: torch.Module
        the decoder network
    mean: torch.Module
        the module that computes the mean
    log_var: torch.Module
        the module that computes the log variance
    len_dim: None
        the length dimension
    latent_padding: function
        the function to use when padding the latent variable
    mask_latent: bool
        where to apply the length mask to the latent representation
    mask_out: bool
        whether to apply the length mask to the output
    out_mask_value: float
        the mask value used for the output
    latent_mask_value: float
        the mask value used for the latent representation
    latent_stochastic: bool
        if true, the "latent" parameter of VariationalAutoencoderOutput
        will be the latent space sample
        if false, it will be the mean

    Example
    -------
    The example below shows a very simple implementation of
    VAE, not suitable for actual experiments:

    >>> import torch
    >>> from torch import nn
    >>> from speechbrain.nnet.linear import Linear
    >>> vae_enc = Linear(n_neurons=16, input_size=128)
    >>> vae_dec = Linear(n_neurons=128, input_size=16)
    >>> vae_mean = Linear(n_neurons=16, input_size=16)
    >>> vae_log_var = Linear(n_neurons=16, input_size=16)
    >>> vae = VariationalAutoencoder(
    ...     encoder=vae_enc,
    ...     decoder=vae_dec,
    ...     mean=vae_mean,
    ...     log_var=vae_log_var,
    ... )
    >>> x = torch.randn(4, 10, 128)

    `train_sample` encodes a single batch and then reconstructs
    it

    >>> vae_out = vae.train_sample(x)
    >>> vae_out.rec.shape
    torch.Size([4, 10, 128])
    >>> vae_out.latent.shape
    torch.Size([4, 10, 16])
    >>> vae_out.mean.shape
    torch.Size([4, 10, 16])
    >>> vae_out.log_var.shape
    torch.Size([4, 10, 16])
    >>> vae_out.latent_sample.shape
    torch.Size([4, 10, 16])

    .encode() will return the mean corresponding
    to the sample provided

    >>> x_enc = vae.encode(x)
    >>> x_enc.shape
    torch.Size([4, 10, 16])

    .reparameterize() performs the reparameterization
    trick

    >>> x_enc = vae.encoder(x)
    >>> mean = vae.mean(x_enc)
    >>> log_var = vae.log_var(x_enc)
    >>> x_repar = vae.reparameterize(mean, log_var)
    >>> x_repar.shape
    torch.Size([4, 10, 16])

       NT        c                    sP   t    || _|| _|| _|| _|| _|| _|| _|| _	|	| _
|
| _|| _d S r   )super__init__encoderdecodermeanlog_varlen_dimlatent_paddingmask_latentmask_outout_mask_valuelatent_mask_valuelatent_stochastic)r   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   	__class__r   r   r!      s   

zVariationalAutoencoder.__init__c                 C   s   |  |}| |S )a  Converts a sample from an original space (e.g. pixel or waveform) to a latent
        space

        Arguments
        ---------
        x: torch.Tensor
            the original data representation
        length: torch.Tensor
            the length of the corresponding input samples (optional)

        Returns
        -------
        latent: torch.Tensor
            the latent representation
        )r"   r$   )r   r   r   encoder_outr   r   r   r      s   

zVariationalAutoencoder.encodec                 C   r   zDecodes the sample from a latent representation

        Arguments
        ---------
        latent: torch.Tensor
            the latent representation

        Returns
        -------
        result: torch.Tensor
            the decoded sample
        r#   r   r   r   r   r      r   zVariationalAutoencoder.decodec                 C   s    t |}||t d|   S )a  Applies the VAE reparameterization trick to get a latent space
        single latent space sample for decoding

        Arguments
        ---------
        mean: torch.Tensor
            the latent representation mean
        log_var: torch.Tensor
            the logarithm of the latent representation variance

        Returns
        -------
        sample: torch.Tensor
            a latent space sample
        g      ?)torch
randn_likeexp)r   r$   r%   epsilonr   r   r   reparameterize   s   
z%VariationalAutoencoder.reparameterizec                 C   s   |du r| j }|du r| j}| |}| |}| |}| ||}| jdur2| j||d\}}	n|}	| jrC|durCt||	| j	|}| 
|}
t|
|}
| jr\|dur\t|
|| j	|}
| jrb|}n	| j||d\}}	t|
|||||	S )a  Provides a data sample for training the autoencoder

        Arguments
        ---------
        x: torch.Tensor
            the source data (in the sample space)
        length: None
            the length (optional). If provided, latents and
            outputs will be masked
        out_mask_value: float
            the mask value used for the output
        latent_mask_value: float
            the mask value used for the latent tensor


        Returns
        -------
        result: VariationalAutoencoderOutput
            a named tuple with the following values
            rec: torch.Tensor
                the reconstruction
            latent: torch.Tensor
                the latent space sample
            mean: torch.Tensor
                the mean of the latent representation
            log_var: torch.Tensor
                the logarithm of the variance of the latent representation

        Nr   )r*   r+   r"   r$   r%   r6   r'   r(   r   r&   r   r   r)   r,   VariationalAutoencoderOutput)r   r   r   r*   r+   r/   r$   r%   latent_samplelatent_lengthx_recr   r   r   r   train_sample   s6    






z#VariationalAutoencoder.train_sample)r   NTTr   r   Tr   NNN)
r   r   r   r   r!   r   r   r6   r<   __classcell__r   r   r-   r   r   V   s    [
r   r8   )recr   r$   r%   r9   r:   AutoencoderOutput)r?   r   r:   c                       sN   e Zd ZdZ							d fdd	Zddd	Zd
d Z	dddZ  ZS )NormalizingAutoencoderan  A classical (non-variational) autoencoder that
    does not use reparameterization but instead uses
    an ordinary normalization technique to constrain
    the latent space

    Arguments
    ---------
    encoder: torch.nn.Module
        the encoder to be used
    decoder: torch.nn.Module
        the decoder to be used
    latent_padding: function
        Function to use when padding the latent tensor
    norm: torch.nn.Module
        the normalization module
    len_dim: int
        The time dimension, which the length applies to.
    mask_out: bool
        whether to apply the length mask to the output
    mask_latent: bool
        where to apply the length mask to the latent representation
    out_mask_value: float
        the mask value used for the output
    latent_mask_value: float
        the mask value used for the latent tensor

    Examples
    --------
    >>> import torch
    >>> from torch import nn
    >>> from speechbrain.nnet.linear import Linear
    >>> ae_enc = Linear(n_neurons=16, input_size=128)
    >>> ae_dec = Linear(n_neurons=128, input_size=16)
    >>> ae = NormalizingAutoencoder(
    ...     encoder=ae_enc,
    ...     decoder=ae_dec,
    ... )
    >>> x = torch.randn(4, 10, 128)
    >>> x_enc = ae.encode(x)
    >>> x_enc.shape
    torch.Size([4, 10, 16])
    >>> x_dec = ae.decode(x_enc)
    >>> x_dec.shape
    torch.Size([4, 10, 128])
    Nr   Tr   c
           
         sR   t    || _|| _|| _|d u rt }|| _|| _|| _|| _	|| _
|	| _d S r   )r    r!   r"   r#   r'   r   normr&   r)   r(   r*   r+   )
r   r"   r#   r'   rB   r&   r)   r(   r*   r+   r-   r   r   r!   w  s   

zNormalizingAutoencoder.__init__c                 C   s   |  |}| j||d}|S )a  Converts a sample from an original space (e.g. pixel or waveform) to a latent
        space

        Arguments
        ---------
        x: torch.Tensor
            the original data representation
        length: torch.Tensor
            The length of each sample in the input tensor.

        Returns
        -------
        latent: torch.Tensor
            the latent representation
        )lengths)r"   rB   r   r   r   r   r     s   
zNormalizingAutoencoder.encodec                 C   r   r0   r1   r   r   r   r   r     r   zNormalizingAutoencoder.decodec                 C   s   |du r| j }|du r| j}| j||d}| jdur$| j||d\}}n|}| jr5|dur5t||| j|}| |}t||}| j	rN|durNt||| j|}t
|||S )a  Provides a data sample for training the autoencoder

        Arguments
        ---------
        x: torch.Tensor
            the source data (in the sample space)
        length: torch.Tensor
            the length (optional). If provided, latents and
            outputs will be masked
        out_mask_value: float
            The value to use when masking the output.
        latent_mask_value: float
            The value to use when masking the latent tensor.

        Returns
        -------
        result: AutoencoderOutput
            a named tuple with the following values
            rec: torch.Tensor
                the reconstruction
            latent: torch.Tensor
                the latent space sample
        Nr7   )r*   r+   r   r'   r(   r   r&   r   r   r)   r@   )r   r   r   r*   r+   r   r:   r;   r   r   r   r<     s"   



z#NormalizingAutoencoder.train_sample)NNr   TTr   r   r   r=   )	r   r   r   r   r!   r   r   r<   r>   r   r   r-   r   rA   H  s    2
rA   )r   collectionsr   r2   r   speechbrain.dataio.dataior   speechbrain.processing.featuresr   speechbrain.utils.data_utilsr   Moduler   r   r8   r@   rA   r   r   r   r   <module>   s$    E i