o
    %ݫiV                     @   s   d Z ddlmZ ddlZddlmZ ddlm  mZ ddl	m
Z
 ddlmZ ddlmZ ddlmZ ejZejZejjdd	 Zd
d ZG dd dejZG dd dejZG dd dejZG dd dejZG dd deZdS )z
Neural network modules for DIFFWAVE:
A VERSATILE DIFFUSION MODEL FOR AUDIO SYNTHESIS

For more details: https://arxiv.org/pdf/2009.09761.pdf

Authors
 * Yingzhi WANG 2022
    )sqrtN)
transforms)linear)Conv1d)DenoisingDiffusionc                 C   s   | t |  S )z'sigmoid linear unit activation function)torchsigmoid)x r
   U/home/ubuntu/.local/lib/python3.10/site-packages/speechbrain/lobes/models/DiffWave.pysilu.   s   r   c                 C   sp   t j| |||||||||	|
d|j}|t|dd}dttj|dd d }t|d d dd}|S )	as  calculates MelSpectrogram for a raw audio signal
    and preprocesses it for diffwave training

    Arguments
    ---------
    sample_rate : int
        Sample rate of audio signal.
    hop_length : int
        Length of hop between STFT windows.
    win_length : int
        Window size.
    n_fft : int
        Size of FFT.
    n_mels : int
        Number of mel filterbanks.
    f_min : float
        Minimum frequency.
    f_max : float
        Maximum frequency.
    power : float
        Exponent for the magnitude spectrogram.
    normalized : bool
        Whether to normalize by magnitude after stft.
    norm : str or None
        If "slaney", divide the triangular mel weights by the width of the mel band
    mel_scale : str
        Scale to use: "htk" or "slaney".
    audio : torch.tensor
        input audio signal

    Returns
    -------
    mel : torch.Tensor
    )sample_rate
hop_length
win_lengthn_fftn_melsf_minf_maxpower
normalizednorm	mel_scale            ?   gh㈵>)mind   g        )r   MelSpectrogramtodevicer   clamplog10)r   r   r   r   r   r   r   r   r   r   r   audioaudio_to_melmelr
   r
   r   diffwave_mel_spectogram4   s&   0r%   c                       s8   e Zd ZdZ fddZdd Zdd Zdd	 Z  ZS )
DiffusionEmbeddinga  Embeds the diffusion step into an input vector of DiffWave

    Arguments
    ---------
    max_steps: int
        total diffusion steps

    Example
    -------
    >>> from speechbrain.lobes.models.DiffWave import DiffusionEmbedding
    >>> diffusion_embedding = DiffusionEmbedding(max_steps=50)
    >>> time_step = torch.randint(50, (1,))
    >>> step_embedding = diffusion_embedding(time_step)
    >>> step_embedding.shape
    torch.Size([1, 512])
    c                    s@   t    | jd| |dd tddd| _tddd| _d S )N	embeddingF)
persistent      
input_size	n_neurons)super__init__register_buffer_build_embeddingLinearprojection1projection2)self	max_steps	__class__r
   r   r/      s   
zDiffusionEmbedding.__init__c                 C   sP   |j tjtjfv r| j| }n| |}| |}t|}| |}t|}|S )a  forward function of diffusion step embedding

        Arguments
        ---------
        diffusion_step: torch.Tensor
            which step of diffusion to execute

        Returns
        -------
        diffusion step embedding: tensor [bs, 512]
        )	dtyper   int32int64r'   _lerp_embeddingr3   r   r4   )r5   diffusion_stepr	   r
   r
   r   forward   s   


zDiffusionEmbedding.forwardc                 C   sD   t | }t | }| j| }| j| }||| ||   S )zDeals with the cases where diffusion_step is not int

        Arguments
        ---------
        t: torch.Tensor
            which step of diffusion to execute

        Returns
        -------
        embedding : torch.Tensor
        )r   floorlongceilr'   )r5   tlow_idxhigh_idxlowhighr
   r
   r   r<      s
   

z"DiffusionEmbedding._lerp_embeddingc                 C   sV   t |d}t dd}|d|d d   }t jt |t |gdd}|S )zBuild embeddings in a designed way

        Arguments
        ---------
        max_steps: int
            total diffusion steps

        Returns
        -------
        table: torch.Tensor
           @   r   g      $@g      @g     O@dim)r   arange	unsqueezecatsincos)r5   r6   stepsdimstabler
   r
   r   r1      s
   z#DiffusionEmbedding._build_embedding)	__name__
__module____qualname____doc__r/   r>   r<   r1   __classcell__r
   r
   r7   r   r&   x   s    r&   c                       s(   e Zd ZdZ fddZdd Z  ZS )SpectrogramUpsampleru  Upsampler for spectrograms with Transposed Conv
    Only the upsampling is done here, the layer-specific Conv can be found
    in residual block to map the mel bands into 2× residual channels

    Example
    -------
    >>> from speechbrain.lobes.models.DiffWave import SpectrogramUpsampler
    >>> spec_upsampler = SpectrogramUpsampler()
    >>> mel_input = torch.rand(3, 80, 100)
    >>> upsampled_mel = spec_upsampler(mel_input)
    >>> upsampled_mel.shape
    torch.Size([3, 80, 25600])
    c                    sN   t    tddddgddgddgd| _tddddgddgddgd| _d S )NrG                )stridepadding)r.   r/   ConvTranspose2dconv1conv2r5   r7   r
   r   r/      s   
zSpectrogramUpsampler.__init__c                 C   sH   t |d}| |}t|d}| |}t|d}t |d}|S )aO  Upsamples spectrograms 256 times to match the length of audios
        Hop length should be 256 when extracting mel spectrograms

        Arguments
        ---------
        x: torch.Tensor
            input mel spectrogram [bs, 80, mel_len]

        Returns
        -------
        upsampled spectrogram [bs, 80, mel_len*256]
        rG   g?)r   rL   r`   F
leaky_relura   squeeze)r5   r	   r
   r
   r   r>      s   

zSpectrogramUpsampler.forwardrS   rT   rU   rV   r/   r>   rW   r
   r
   r7   r   rX      s    	rX   c                       s,   e Zd ZdZd fdd	Zd	ddZ  ZS )
ResidualBlocka  
    Residual Block with dilated convolution

    Arguments
    ---------
    n_mels: int
        input mel channels of conv1x1 for conditional vocoding task
    residual_channels: int
        channels of audio convolution
    dilation: int
        dilation cycles of audio convolution
    uncond: bool
        conditional/unconditional generation

    Example
    -------
    >>> from speechbrain.lobes.models.DiffWave import ResidualBlock
    >>> res_block = ResidualBlock(n_mels=80, residual_channels=64, dilation=3)
    >>> noisy_audio = torch.randn(1, 1, 22050)
    >>> timestep_embedding = torch.rand(1, 512)
    >>> upsampled_mel = torch.rand(1, 80, 22050)
    >>> output = res_block(noisy_audio, timestep_embedding, upsampled_mel)
    >>> output[0].shape
    torch.Size([1, 64, 22050])
    Fc              	      sx   t    t|d| d|dddd| _td|d| _|s*t|d| d	dddd
| _nd | _t|d| d	dddd
| _d S )N   rY   Tsamekaiming)in_channelsout_channelskernel_sizedilationskip_transposer^   	conv_initr*   r+   rG   rk   rl   rm   ro   r^   rp   )r.   r/   r   dilated_convr2   diffusion_projectionconditioner_projectionoutput_projection)r5   r   residual_channelsrn   uncondr7   r
   r   r/     s>   
	

zResidualBlock.__init__Nc           	      C   s   |du r	| j du s|dur| j dusJ | |d}|| }| j du r+| |}n|  |}| || }tj|ddd\}}t|t| }| |}tj|ddd\}}|| t	d |fS )a  
        forward function of Residual Block

        Arguments
        ---------
        x: torch.Tensor
            input sample [bs, 1, time]
        diffusion_step: torch.Tensor
            the embedding of which step of diffusion to execute
        conditioner: torch.Tensor
            the condition used for conditional generation
        Returns
        -------
        residual output [bs, residual_channels, time]
        a skip of residual branch [bs, residual_channels, time]
        Nrh   rG   rI   g       @)
rt   rs   rL   rr   r   chunkr   tanhru   r   )	r5   r	   r=   conditionerygatefilterresidualskipr
   r
   r   r>   <  s   


zResidualBlock.forwardFNrf   r
   r
   r7   r   rg      s    &rg   c                       s@   e Zd ZdZ	d
 fdd	ZdddZ				ddd	Z  ZS )DiffWavea  
    DiffWave Model with dilated residual blocks

    Arguments
    ---------
    input_channels: int
        input mel channels of conv1x1 for conditional vocoding task
    residual_layers: int
        number of residual blocks
    residual_channels: int
        channels of audio convolution
    dilation_cycle_length: int
        dilation cycles of audio convolution
    total_steps: int
        total steps of diffusion
    unconditional: bool
        conditional/unconditional generation

    Example
    -------
    >>> from speechbrain.lobes.models.DiffWave import DiffWave
    >>> diffwave = DiffWave(
    ...     input_channels=80,
    ...     residual_layers=30,
    ...     residual_channels=64,
    ...     dilation_cycle_length=10,
    ...     total_steps=50,
    ... )
    >>> noisy_audio = torch.randn(1, 1, 25600)
    >>> timestep = torch.randint(50, (1,))
    >>> input_mel = torch.rand(1, 80, 100)
    >>> predicted_noise = diffwave(noisy_audio, timestep, input_mel)
    >>> predicted_noise.shape
    torch.Size([1, 1, 25600])
    Fc                    s   t    | _| _| _| _| _| _td jddddd _	t
 j _ jr0d  _nt  _t fddt jD  _t j jddddd _t jdddddd _d S )	NrG   Tri   rj   rq   c                    s,   g | ]}t  j jd | j   jdqS )rh   )rw   )rg   input_channelsrv   dilation_cycle_lengthunconditional).0irb   r
   r   
<listcomp>  s    z%DiffWave.__init__.<locals>.<listcomp>zero)r.   r/   r   residual_layersrv   r   r   total_stepsr   input_projectionr&   diffusion_embeddingspectrogram_upsamplerrX   nn
ModuleListrangeskip_projectionru   )r5   r   r   rv   r   r   r   r7   rb   r   r/     sP   
	
zDiffWave.__init__Nc           	      C   s   |du r	| j du s|dur| j dusJ | |}t|}| |}| j r+|  |}d}| jD ]}||||\}}|du r@|n|| }q0|tt| j }| |}t|}| 	|}|S )a  
        DiffWave forward function

        Arguments
        ---------
        audio: torch.Tensor
            input gaussian sample [bs, 1, time]
        diffusion_step: torch.Tensor
            which timestep of diffusion to execute [bs, 1]
        spectrogram: torch.Tensor
            spectrogram data [bs, 80, mel_len]
        length: torch.Tensor
            sample lengths - not used - provided for compatibility only

        Returns
        -------
        predicted noise [bs, 1, time]
        N)
r   r   rc   relur   r   r   lenr   ru   )	r5   r"   r=   spectrogramlengthr	   r   layerskip_connectionr
   r
   r   r>     s    







zDiffWave.forwardc                 C   s   | ||||dS )zForward function suitable for wrapping by diffusion.
        For this model, `out_mask_value`/`latent_mask_value` are unused
        and discarded.
        See :meth:`~DiffWave.forward` for details.)r   r   r
   )r5   r	   	timestepscond_embr   out_mask_valuelatent_mask_valuer
   r
   r   diffusion_forward  s   zDiffWave.diffusion_forwardr   )NN)NNNN)rS   rT   rU   rV   r/   r>   r   rW   r
   r
   r7   r   r   e  s    +
;-r   c                       sJ   e Zd ZdZ							d fdd	Ze 				d	ddZ  ZS )
DiffWaveDiffusionaq  An enhanced diffusion implementation with DiffWave-specific inference

    Arguments
    ---------
    model: nn.Module
        the underlying model
    timesteps: int
        the total number of timesteps
    noise: str|nn.Module
        the type of noise being used
        "gaussian" will produce standard Gaussian noise
    beta_start: float
        the value of the "beta" parameter at the beginning of the process
        (see DiffWave paper)
    beta_end: float
        the value of the "beta" parameter at the end of the process
    sample_min: float
    sample_max: float
        Used to clip the output.
    show_progress: bool
        whether to show progress during inference

    Example
    -------
    >>> from speechbrain.lobes.models.DiffWave import DiffWave
    >>> diffwave = DiffWave(
    ...     input_channels=80,
    ...     residual_layers=30,
    ...     residual_channels=64,
    ...     dilation_cycle_length=10,
    ...     total_steps=50,
    ... )
    >>> from speechbrain.lobes.models.DiffWave import DiffWaveDiffusion
    >>> from speechbrain.nnet.diffusion import GaussianNoise
    >>> diffusion = DiffWaveDiffusion(
    ...     model=diffwave,
    ...     beta_start=0.0001,
    ...     beta_end=0.05,
    ...     timesteps=50,
    ...     noise=GaussianNoise,
    ... )
    >>> input_mel = torch.rand(1, 80, 100)
    >>> output = diffusion.inference(
    ...     unconditional=False,
    ...     scale=256,
    ...     condition=input_mel,
    ...     fast_sampling=True,
    ...     fast_sampling_noise_schedule=[0.0001, 0.001, 0.01, 0.05, 0.2, 0.5],
    ... )
    >>> output.shape
    torch.Size([1, 25600])
    NFc	           	   
      s   t  |||||||| d S r   )r.   r/   )	r5   modelr   noise
beta_startbeta_end
sample_min
sample_maxshow_progressr7   r
   r   r/   5  s   zDiffWaveDiffusion.__init__c                 C   sD  |du r	t d}|r|du sJ n	|dusJ |j}|r#|dus#J |r9|dur9|}dt | }|jdd}	n	| j}| j}| j}	g }
tt|D ]H}t| j	d D ]>}| j|d  |	|   kri| j| krn qS| j| d |	| d  | j| d | j|d  d   }|

||   nqSqJ|st|jdkr|d}t j|jd ||jd  |d	}nt jd||d	}tt|d ddD ][}d|| d  }|| d|	|  d  }| |t j|
| g|d	|d}||||   }|dkrt |}d
|	|d   d
|	|   ||  d }||| 7 }t |dd
}q|S )a  Processes the inference for diffwave
        One inference function for all the locally/globally conditional
        generation and unconditional generation tasks

        Arguments
        ---------
        unconditional: bool
            do unconditional generation if True, else do conditional generation
        scale: int
            scale to get the final output wave length
            for conditional generation, the output wave length is scale * condition.shape[-1]
            for example, if the condition is spectrogram (bs, n_mel, time), scale should be hop length
            for unconditional generation, scale should be the desired audio length
        condition: torch.Tensor
            input spectrogram for vocoding or other conditions for other
            conditional generation, should be None for unconditional generation
        fast_sampling: bool
            whether to do fast sampling
        fast_sampling_noise_schedule: list
            the noise schedules used for fast sampling
        device: str|torch.device
            inference device

        Returns
        -------
        predicted_sample: torch.Tensor
            the predicted audio (bs, 1, t)
        NcudarG   r   rI   g      ?rh   rx   )r   r   r   )r   r   tensorcumprodbetasalphasalphas_cumprodr   r   r   appendshaperL   randnr   re   
randn_liker    )r5   r   scale	conditionfast_samplingfast_sampling_noise_scheduler   inference_noise_scheduleinference_alphasinference_alpha_cuminference_stepssrB   twiddler"   nc1c2
noise_predr   sigmar
   r
   r   	inferenceK  s   &






zDiffWaveDiffusion.inference)NNNNNNF)NFNN)	rS   rT   rU   rV   r/   r   no_gradr   rW   r
   r
   r7   r   r     s     8r   )rV   mathr   r   torch.nnr   torch.nn.functional
functionalrc   
torchaudior   speechbrain.nnetr   speechbrain.nnet.CNNr   speechbrain.nnet.diffusionr   r2   r_   jitscriptr   r%   Moduler&   rX   rg   r   r   r
   r
   r
   r   <module>   s(    
DU.j 