o
    %ݫi(T                     @   s   d Z ddlmZ ddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ dd	lmZ G d
d dejZdZdZdZdZG dd deZG dd dejZdd ZG dd dejZG dd dejZeedZedg dZedddgZdS ) zAn implementation of Denoising Diffusion

https://arxiv.org/pdf/2006.11239.pdf

Certain parts adopted from / inspired by denoising-diffusion-pytorch
https://github.com/lucidrains/denoising-diffusion-pytorch

Authors
 * Artem Ploujnikov 2022
    )
namedtupleN)nn)
functional)tqdm)length_to_mask)
data_utils)unsqueeze_asc                       sH   e Zd ZdZd fdd	ZdddZdddZd	d
 ZdddZ  Z	S )Diffusera  A base diffusion implementation

    Arguments
    ---------
    model: nn.Module
        the underlying model
    timesteps: int
        the number of timesteps
    noise: callable|str
        the noise function/module to use

        The following predefined types of noise are provided
        "gaussian": Gaussian noise, applied to the whole sample
        "length_masked_gaussian": Gaussian noise applied only
            to the parts of the sample that is not padding
    Nc                    sF   t    || _|| _|d u rd}t|trt|  | _d S || _d S )Ngaussian)super__init__model	timesteps
isinstancestr_NOISE_FUNCTIONSnoise)selfr   r   r   	__class__ N/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/nnet/diffusion.pyr   *   s   


zDiffuser.__init__c                 C      t )a  Adds noise to a batch of data

        Arguments
        ---------
        x: torch.Tensor
            the original data sample
        timesteps: torch.Tensor
            a 1-D integer tensor of a length equal to the number of
            batches in x, where each entry corresponds to the timestep
            number for the batch. If omitted, timesteps will be randomly
            sampled
        NotImplementedErrorr   xr   r   r   r   distort5   s   zDiffuser.distortc                 K   sl   |du r
t || j}| j|fd|i|\}}|du r&| j||fi |}n| j|||fi |}|||fS )a  Creates a sample for the training loop with a
        corresponding target

        Arguments
        ---------
        x: torch.Tensor
            the original data sample
        timesteps: torch.Tensor
            a 1-D integer tensor of a length equal to the number of
            batches in x, where each entry corresponds to the timestep
            number for the batch. If omitted, timesteps will be randomly
            sampled
        condition: torch.Tensor
            the condition used for conditional generation
            Should be omitted during unconditional generation
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        pred: torch.Tensor
            the model output 0 predicted noise
        noise: torch.Tensor
            the noise being applied
        noisy_sample: torch.Tensor
            the sample with the noise applied
        Nr   )sample_timestepsr   r   r   )r   r   r   	conditionkwargsnoisy_sampler   predr   r   r   train_sampleD   s   
zDiffuser.train_samplec                 K   r   )a  Generates the number of samples indicated by the
        count parameter

        Arguments
        ---------
        shape: enumerable
            the shape of the sample to generate
        **kwargs: dict
            Arguments to forward to the underlying model.
        r   )r   shaper    r   r   r   samplek   s   zDiffuser.samplec                 C   s   |  ||S )z*Computes the forward pass, calls distort())r   r   r   r   r   forwardx   s   zDiffuser.forwardNNN)
__name__
__module____qualname____doc__r   r   r#   r%   r&   __classcell__r   r   r   r   r	      s    

'r	   g-C6?g{Gz?i  zDiffusion Samplingc                       sb   e Zd ZdZ							d fdd	Zdd Zddd	Ze d
d Z	e dd Z
  ZS )DenoisingDiffusiona  An implementation of a classic Denoising Diffusion Probabilistic Model (DDPM)

    Arguments
    ---------
    model: nn.Module
        the underlying model
    timesteps: int
        the number of timesteps
    noise: str|nn.Module
        the type of noise being used
        "gaussian" will produce standard Gaussian noise
    beta_start: float
        the value of the "beta" parameter at the beginning at the end of the process
        (see the paper)
    beta_end: float
        the value of the "beta" parameter at the end of the process
    sample_min: float
    sample_max: float
        Used to clip the output.
    show_progress: bool
        whether to show progress during inference

    Example
    -------
    >>> from speechbrain.nnet.unet import UNetModel
    >>> unet = UNetModel(
    ...     in_channels=1,
    ...     model_channels=16,
    ...     norm_num_groups=4,
    ...     out_channels=1,
    ...     num_res_blocks=1,
    ...     attention_resolutions=[]
    ... )
    >>> diff = DenoisingDiffusion(
    ...     model=unet,
    ...     timesteps=5
    ... )
    >>> x = torch.randn(4, 1, 64, 64)
    >>> pred, noise, noisy_sample = diff.train_sample(x)
    >>> pred.shape
    torch.Size([4, 1, 64, 64])
    >>> noise.shape
    torch.Size([4, 1, 64, 64])
    >>> noisy_sample.shape
    torch.Size([4, 1, 64, 64])
    >>> sample = diff.sample((2, 1, 64, 64))
    >>> sample.shape
    torch.Size([2, 1, 64, 64])
    NFc	                    s  |d u rt }t j|||d |d u s|d u r+t | }	|d u r#|	t }|d u r+|	t }|| _|| _|  \}
}| d|
 | d| | j	j
dd}| d| t|}td| }| d| | d	| tj|d d
 ddd}|d|  d|  }| d| | d|  |t| d|  }d| t|
 d|  }| d| | d| d|  }| d| d| d  }| d| || _|| _|| _d S )N)r   r   alphasbetasr   )dimalphas_cumprod      ?signal_coefficientsnoise_coefficients)   r   )valueposterior_varianceposterior_log_varianceposterior_mean_weight_startposterior_mean_weight_stepsample_pred_model_coefficientr7   sample_pred_noise_coefficient)DDPM_REF_TIMESTEPSr   r   DDPM_DEFAULT_BETA_STARTDDPM_DEFAULT_BETA_END
beta_startbeta_endcompute_coefficientsregister_bufferr/   cumprodtorchsqrtFpadlog
sample_min
sample_maxshow_progress)r   r   r   r   rB   rC   rL   rM   rN   scaler/   r0   r2   r4   r5   alphas_cumprod_prevr9   r;   r<   r=   r>   r   r   r   r      sd   

zDenoisingDiffusion.__init__c                 C   s$   t | j| j| j}d| }||fS )z2Computes diffusion coefficients (alphas and betas)r3   )rG   linspacerB   rC   r   )r   r0   r/   r   r   r   rD      s   z'DenoisingDiffusion.compute_coefficientsc                 K   sf   |du r
t || j}|du r| j|fi |}| j| }| j| }t||| t|||  }||fS )a  Adds noise to the sample, in a forward diffusion process,

        Arguments
        ---------
        x: torch.Tensor
            a data sample of 2 or more dimensions, with the
            first dimension representing the batch
        noise: torch.Tensor
            the noise to add
        timesteps: torch.Tensor
            a 1-D integer tensor of a length equal to the number of
            batches in x, where each entry corresponds to the timestep
            number for the batch. If omitted, timesteps will be randomly
            sampled
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        result: torch.Tensor
            a tensor of the same dimension as x
        N)r   r   r   r4   r5   r   )r   r   r   r   r    r4   r5   r!   r   r   r   r      s   

zDenoisingDiffusion.distortc                 K   s~   |  tj|d| jji}tt| j}| jrt	|t
| jd}|D ]}tj|d tj| jjd| }| j||fi |}q!|S )as  Generates the number of samples indicated by the
        count parameter

        Arguments
        ---------
        shape: enumerable
            the shape of the sample to generate
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        result: torch.Tensor
            the generated sample(s)
        device)desctotalr   )dtyperR   )r   rG   zerosr/   rR   reversedranger   rN   r   DESC_SAMPLINGoneslongsample_step)r   r$   r    r%   stepstimestep_numbertimestepr   r   r   r%     s   zDenoisingDiffusion.samplec                 K   s   | j ||fi |}| |}t| j| || t| j| ||  }t| j| |}t| j| |}|| ||  }	t| j| |}
|	d|
  |  }| j	dusW| j
dur`|j| j	| j
d |S )a  Processes a single timestep for the sampling
        process

        Arguments
        ---------
        sample: torch.Tensor
            the sample for the following timestep
        timestep: int
            the timestep number
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        predicted_sample: torch.Tensor
            the predicted sample (denoised by one step`)
        g      ?N)minmax)r   r   r   r=   r>   r;   r<   r:   exprL   rM   clip_)r   r%   r_   r    	model_outr   sample_startweight_startweight_stepmeanlog_variancepredicted_sampler   r   r   r\   >  s4   




zDenoisingDiffusion.sample_step)NNNNNNFr(   )r)   r*   r+   r,   r   rD   r   rG   no_gradr%   r\   r-   r   r   r   r   r.      s     5@
#
r.   c                       sN   e Zd ZdZ		d fdd	Zdd Zdd	 Zd
d Zdd Zdd Z	  Z
S )LatentDiffusiona
  A latent diffusion wrapper. Latent diffusion is denoising diffusion
    applied to a latent space instead of the original data space

    Arguments
    ---------
    autoencoder: speechbrain.nnet.autoencoders.Autoencoder
        An autoencoder converting the original space to a latent space
    diffusion: speechbrain.nnet.diffusion.Diffuser
        A diffusion wrapper
    latent_downsample_factor: int
        The factor that latent space dimensions need to be divisible
        by. This is useful if the underlying model for the diffusion
        wrapper is based on a UNet-like architecture where the inputs
        are progressively downsampled and upsampled by factors of two
    latent_pad_dim: int|list[int]
        the dimension(s) along which the latent space will be
        padded

    Example
    -------
    >>> import torch
    >>> from torch import nn
    >>> from speechbrain.nnet.CNN import Conv2d
    >>> from speechbrain.nnet.autoencoders import NormalizingAutoencoder
    >>> from speechbrain.nnet.unet import UNetModel

    Set up a simple autoencoder (a real autoencoder would be a
    deep neural network)

    >>> ae_enc = Conv2d(
    ...     kernel_size=3,
    ...     stride=4,
    ...     in_channels=1,
    ...     out_channels=1,
    ...     skip_transpose=True,
    ... )
    >>> ae_dec = nn.ConvTranspose2d(
    ...     kernel_size=3,
    ...     stride=4,
    ...     in_channels=1,
    ...     out_channels=1,
    ...     output_padding=1
    ... )
    >>> ae = NormalizingAutoencoder(
    ...     encoder=ae_enc,
    ...     decoder=ae_dec,
    ... )

    Construct a diffusion model with a UNet architecture

    >>> unet = UNetModel(
    ...     in_channels=1,
    ...     model_channels=16,
    ...     norm_num_groups=4,
    ...     out_channels=1,
    ...     num_res_blocks=1,
    ...     attention_resolutions=[]
    ... )
    >>> diff = DenoisingDiffusion(
    ...     model=unet,
    ...     timesteps=5
    ... )
    >>> latent_diff = LatentDiffusion(
    ...     autoencoder=ae,
    ...     diffusion=diff,
    ...     latent_downsample_factor=4,
    ...     latent_pad_dim=2
    ... )
    >>> x = torch.randn(4, 1, 64, 64)
    >>> latent_sample = latent_diff.train_sample_latent(x)
    >>> diff_sample, ae_sample = latent_sample
    >>> pred, noise, noisy_sample = diff_sample
    >>> pred.shape
    torch.Size([4, 1, 16, 16])
    >>> noise.shape
    torch.Size([4, 1, 16, 16])
    >>> noisy_sample.shape
    torch.Size([4, 1, 16, 16])
    >>> ae_sample.latent.shape
    torch.Size([4, 1, 16, 16])

    Create a few samples (the shape given should be the shape
    of the latent space)

    >>> sample = latent_diff.sample((2, 1, 16, 16))
    >>> sample.shape
    torch.Size([2, 1, 64, 64])
    Nr7   c                    s6   t    || _|| _|| _t|tr|g}|| _d S r'   )r   r   autoencoder	diffusionlatent_downsample_factorr   intlatent_pad_dim)r   rm   rn   ro   rq   r   r   r   r     s   


zLatentDiffusion.__init__c                 K   s*   | j |}| |}| jj|fi |S )a  Creates a sample for the training loop with a
        corresponding target

        Arguments
        ---------
        x: torch.Tensor
            the original data sample
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        pred: torch.Tensor
            the model output 0 predicted noise
        noise: torch.Tensor
            the noise being applied
        noisy_sample
            the sample with the noise applied
        )rm   encode_pad_latentrn   r#   )r   r   r    latentr   r   r   r#     s   
zLatentDiffusion.train_samplec                 C   s:   | j dur| j dkr| jD ]}tj|| j |d\}}q|S )a  Pads the latent space to the desired dimension

        Arguments
        ---------
        latent: torch.Tensor
            the latent representation

        Returns
        -------
        result: torch.Tensor
            the latent representation, with padding
        Nr7   )factorlen_dim)ro   rq   r   pad_divisible)r   rt   r1   _r   r   r   rs     s   


zLatentDiffusion._pad_latentc           	      K   s^   | d}| d}| d}| jj||||d}| |j}| jj|fi |}t||dS )a  Returns a train sample with autoencoder output - can be used to jointly
        training the diffusion model and the autoencoder

        Arguments
        ---------
        x: torch.Tensor
            the original data sample
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        LatentDiffusionTrainSample
            Training sample.
        lengthout_mask_valuelatent_mask_value)ry   rz   r{   )rn   rm   )getrm   r#   rs   rt   rn   LatentDiffusionTrainSample)	r   r   r    ry   rz   r{   autoencoder_outrt   diffusion_train_sampler   r   r   train_sample_latent  s   


z#LatentDiffusion.train_sample_latentc                 C   s   | j |}| j|S )a\  Adds noise to the sample, in a forward diffusion process,

        Arguments
        ---------
        x: torch.Tensor
            a data sample of 2 or more dimensions, with the
            first dimension representing the batch

        Returns
        -------
        result: torch.Tensor
            a tensor of the same dimension as x
        )rm   rr   rn   r   )r   r   rt   r   r   r   r   &  s   zLatentDiffusion.distortc                 C   s"   | j |}| |}| j|S )zObtains a sample out of the diffusion model

        Arguments
        ---------
        shape: torch.Tensor

        Returns
        -------
        sample: torch.Tensor
            the sample of the specified shape
        )rn   r%   rs   rm   decode)r   r$   rt   r   r   r   r%   8  s   
zLatentDiffusion.sample)Nr7   )r)   r*   r+   r,   r   r#   rs   r   r   r%   r-   r   r   r   r   rl   k  s    ] rl   c                 C   s   t j|| df| jdS )a'  Returns a random sample of timesteps as a 1-D tensor
    (one dimension only)

    Arguments
    ---------
    x: torch.Tensor
        a tensor of samples of any dimension
    num_timesteps: int
        the total number of timesteps

    Returns
    -------
    Random sample of timestamps.
    r   )rR   )rG   randintsizerR   )r   num_timestepsr   r   r   r   J  s   r   c                   @   s   e Zd ZdZdd ZdS )GaussianNoisezAdds ordinary Gaussian noisec                 K   s
   t |S )zForward pass

        Arguments
        ---------
        sample: the original sample
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        Noise in shape of sample.
        )rG   
randn_like)r   r%   r    r   r   r   r&   _  s   
zGaussianNoise.forwardN)r)   r*   r+   r,   r&   r   r   r   r   r   \  s    r   c                       s4   e Zd ZdZd
 fdd	ZdddZdd	 Z  ZS )LengthMaskedGaussianNoisezGaussian noise applied to padded samples. No
    noise is added to positions that are part of padding

    Arguments
    ---------
    length_dim: int
        The time dimension for which lengths apply.
    r7   c                    s   t    || _d S r'   )r   r   
length_dim)r   r   r   r   r   r   y  s   

z"LengthMaskedGaussianNoise.__init__Nc                 K   sX   t |}|dur*|| j}t|| | }| ||}||}|| d |S )a  Creates Gaussian noise. If a tensor of lengths is
        provided, no noise is added to the padding positions.

        Arguments
        ---------
        sample: torch.Tensor
            a batch of data
        length: torch.Tensor
            relative lengths
        **kwargs: dict
            Arguments to forward to the underlying model.

        Returns
        -------
        Gaussian noise in shape of sample.
        Ng        )	rG   r   r   r   r   bool_compute_mask_shapeviewmasked_fill_)r   r%   ry   r    r   max_lenmask
mask_shaper   r   r   r&   }  s   

z!LengthMaskedGaussianNoise.forwardc                 C   s0   |j d fd| jd   |f d| d   S )Nr   r7   r7      )r$   r   r1   )r   r   r   r   r   r   r     s   
z-LengthMaskedGaussianNoise._compute_mask_shaper   r'   )r)   r*   r+   r,   r   r&   r   r-   r   r   r   r   r   o  s
    	
r   )r
   length_masked_gaussianDiffusionTrainSample)r"   r   r!   r}   rn   rm   )r,   collectionsr   rG   r   torch.nnr   rI   	tqdm.autor   speechbrain.dataio.dataior   speechbrain.utilsr   speechbrain.utils.data_utilsr   Moduler	   r@   rA   r?   rY   r.   rl   r   r   r   r   r   r}   r   r   r   r   <module>   s<    e i `2