o
    piY                     @   s   d dl mZ d dlmZmZmZ d dlZd dlZd dl	m
  mZ ddlmZmZ ddlmZ ddlmZ eG d	d
 d
eZdejdedejfddZdejdeej dejfddZddefddZddefddZG dd deeZdS )    )	dataclass)OptionalTupleUnionN   )ConfigMixinregister_to_config)
BaseOutput   )SchedulerMixinc                   @   s   e Zd ZU dZejed< dS )VQDiffusionSchedulerOutputa.  
    Output class for the scheduler's step function output.

    Args:
        prev_sample (`torch.LongTensor` of shape `(batch size, num latent pixels)`):
            Computed sample x_{t-1} of previous timestep. `prev_sample` should be used as next model input in the
            denoising loop.
    prev_sampleN)__name__
__module____qualname____doc__torch
LongTensor__annotations__ r   r   j/home/ubuntu/SoloSpeech/.venv/lib/python3.10/site-packages/diffusers/schedulers/scheduling_vq_diffusion.pyr      s   
 	r   xnum_classesreturnc                 C   s4   t | |}|ddd}t| jdd}|S )a  
    Convert batch of vector of class indices into batch of log onehot vectors

    Args:
        x (`torch.LongTensor` of shape `(batch size, vector length)`):
            Batch of class indices

        num_classes (`int`):
            number of classes to be used for the onehot vectors

    Returns:
        `torch.Tensor` of shape `(batch size, num classes, vector length)`:
            Log onehot vectors
    r   r   r
   KH9)min)Fone_hotpermuter   logfloatclamp)r   r   x_onehotlog_xr   r   r   index_to_log_onehot)   s   r$   logits	generatorc                 C   s<   t j| j| j|d}t t |d  d  }||  }|S )z(
    Apply gumbel noise to `logits`
    )devicer&   r   )r   randshaper'   r   )r%   r&   uniformgumbel_noisenoisedr   r   r   gumbel_noised>   s   r-   wJ??̔>num_diffusion_timestepsc                 C   sh   t d| | d  ||  | }t dg|f}|dd |dd  }t |dd dgf}||fS )zN
    Cumulative and non-cumulative alpha schedules.

    See section 4.1.
    r   r
   Nnparangeconcatenate)r0   alpha_cum_startalpha_cum_endattatr   r   r   alpha_schedulesH   s   r:   c                 C   sx   t d| | d  ||  | }t dg|f}d| }|dd |dd  }d| }t |dd dgf}||fS )zN
    Cumulative and non-cumulative gamma schedules.

    See section 4.1.
    r   r
   Nr1   r2   )r0   gamma_cum_startgamma_cum_endcttone_minus_cttone_minus_ctctr   r   r   gamma_schedulesX   s   rA   c                   @   s   e Zd ZdZdZe					d&dededed	ed
edefddZd'dede	e
ejf fddZ		d(dejdejdejdeej dede	eef fddZdd Zdejdejd ejd!efd"d#Zd$d% ZdS ))VQDiffusionSchedulera  
    A scheduler for vector quantized diffusion.

    This model inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the generic
    methods the library implements for all schedulers such as loading and saving.

    Args:
        num_vec_classes (`int`):
            The number of classes of the vector embeddings of the latent pixels. Includes the class for the masked
            latent pixel.
        num_train_timesteps (`int`, defaults to 100):
            The number of diffusion steps to train the model.
        alpha_cum_start (`float`, defaults to 0.99999):
            The starting cumulative alpha value.
        alpha_cum_end (`float`, defaults to 0.00009):
            The ending cumulative alpha value.
        gamma_cum_start (`float`, defaults to 0.00009):
            The starting cumulative gamma value.
        gamma_cum_end (`float`, defaults to 0.99999):
            The ending cumulative gamma value.
    r
   d   r.   r/   num_vec_classesnum_train_timestepsr6   r7   r;   r<   c                 C   sd  || _ | j d | _t|||d\}}t|||d\}	}
| j d }d| |	 | }d| |
 | }t|d}t|d}t|	d}	t|}t|}t|	}t|d}t|d}t|
d}
t|}t|}t|
}| | _	| | _
| | _| | _| | _| | _d | _ttd|d d d  | _d S )Nr
   )r6   r7   )r;   r<   float64r   r1   )	num_embed
mask_classr:   rA   r   tensorastyper   r    log_atlog_btlog_ctlog_cumprod_atlog_cumprod_btlog_cumprod_ctnum_inference_steps
from_numpyr3   r4   copy	timesteps)selfrD   rE   r6   r7   r;   r<   r9   r8   r@   r=   num_non_mask_classesbtbttrK   rL   rM   rN   rO   rP   r   r   r   __init__   s6   













&zVQDiffusionScheduler.__init__NrQ   r'   c                 C   s   || _ td| j ddd  }t||| _| j|| _| j	|| _	| j
|| _
| j|| _| j|| _| j|| _dS )a  
        Sets the discrete timesteps used for the diffusion chain (to be run before inference).

        Args:
            num_inference_steps (`int`):
                The number of diffusion steps used when generating samples with a pre-trained model.
            device (`str` or `torch.device`, *optional*):
                The device to which the timesteps and diffusion process parameters (alpha, beta, gamma) should be moved
                to.
        r   Nr1   )rQ   r3   r4   rS   r   rR   torT   rK   rL   rM   rN   rO   rP   )rU   rQ   r'   rT   r   r   r   set_timesteps   s   z"VQDiffusionScheduler.set_timestepsTmodel_outputtimestepsampler&   return_dictr   c                 C   sF   |dkr|}n|  |||}t||}|jdd}|s|fS t|dS )a  
        Predict the sample from the previous timestep by the reverse transition distribution. See
        [`~VQDiffusionScheduler.q_posterior`] for more details about how the distribution is computer.

        Args:
            log_p_x_0: (`torch.Tensor` of shape `(batch size, num classes - 1, num latent pixels)`):
                The log probabilities for the predicted classes of the initial latent pixels. Does not include a
                prediction for the masked class as the initial unnoised image cannot be masked.
            t (`torch.long`):
                The timestep that determines which transition matrices are used.
            x_t (`torch.LongTensor` of shape `(batch size, num latent pixels)`):
                The classes of each latent pixel at time `t`.
            generator (`torch.Generator`, or `None`):
                A random number generator for the noise applied to `p(x_{t-1} | x_t)` before it is sampled from.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`~schedulers.scheduling_vq_diffusion.VQDiffusionSchedulerOutput`] or
                `tuple`.

        Returns:
            [`~schedulers.scheduling_vq_diffusion.VQDiffusionSchedulerOutput`] or `tuple`:
                If return_dict is `True`, [`~schedulers.scheduling_vq_diffusion.VQDiffusionSchedulerOutput`] is
                returned, otherwise a tuple is returned where the first element is the sample tensor.
        r   r
   dim)r   )q_posteriorr-   argmaxr   )rU   r\   r]   r^   r&   r_   log_p_x_t_min_1	x_t_min_1r   r   r   step   s   

zVQDiffusionScheduler.stepc           
      C   sp   t || j}| j|||dd}| j|||dd}|| }tj|ddd}|| }| ||d }|| | }	|	S )a  
        Calculates the log probabilities for the predicted classes of the image at timestep `t-1`:

        ```
        p(x_{t-1} | x_t) = sum( q(x_t | x_{t-1}) * q(x_{t-1} | x_0) * p(x_0) / q(x_t | x_0) )
        ```

        Args:
            log_p_x_0 (`torch.Tensor` of shape `(batch size, num classes - 1, num latent pixels)`):
                The log probabilities for the predicted classes of the initial latent pixels. Does not include a
                prediction for the masked class as the initial unnoised image cannot be masked.
            x_t (`torch.LongTensor` of shape `(batch size, num latent pixels)`):
                The classes of each latent pixel at time `t`.
            t (`torch.Long`):
                The timestep that determines which transition matrix is used.

        Returns:
            `torch.Tensor` of shape `(batch size, num classes, num latent pixels)`:
                The log probabilities for the predicted classes of the image at timestep `t-1`.
        T)tx_tlog_onehot_x_t
cumulativeFr
   )ra   keepdim)r$   rG   $log_Q_t_transitioning_to_known_classr   	logsumexpapply_cumulative_transitions)
rU   	log_p_x_0rh   rg   ri   log_q_x_t_given_x_0log_q_t_given_x_t_min_1qq_log_sum_exprd   r   r   r   rb      s   	.z VQDiffusionScheduler.q_posteriorrg   rh   ri   rj   c                C   s   |r| j | }| j| }| j| }n| j| }| j| }| j| }|s1|dddddf d}|ddddddf }|| |}	|| jk}
|
d	d| j
d d}
||	|
< |sftj|	|fdd}	|	S )a/	  
        Calculates the log probabilities of the rows from the (cumulative or non-cumulative) transition matrix for each
        latent pixel in `x_t`.

        Args:
            t (`torch.Long`):
                The timestep that determines which transition matrix is used.
            x_t (`torch.LongTensor` of shape `(batch size, num latent pixels)`):
                The classes of each latent pixel at time `t`.
            log_onehot_x_t (`torch.Tensor` of shape `(batch size, num classes, num latent pixels)`):
                The log one-hot vectors of `x_t`.
            cumulative (`bool`):
                If cumulative is `False`, the single step transition matrix `t-1`->`t` is used. If cumulative is
                `True`, the cumulative transition matrix `0`->`t` is used.

        Returns:
            `torch.Tensor` of shape `(batch size, num classes - 1, num latent pixels)`:
                Each _column_ of the returned matrix is a _row_ of log probabilities of the complete probability
                transition matrix.

                When non cumulative, returns `self.num_classes - 1` rows because the initial latent pixel cannot be
                masked.

                Where:
                - `q_n` is the probability distribution for the forward process of the `n`th latent pixel.
                - C_0 is a class of a latent pixel embedding
                - C_k is the class of the masked latent pixel

                non-cumulative result (omitting logarithms):
                ```
                q_0(x_t | x_{t-1} = C_0) ... q_n(x_t | x_{t-1} = C_0)
                          .      .                     .
                          .               .            .
                          .                      .     .
                q_0(x_t | x_{t-1} = C_k) ... q_n(x_t | x_{t-1} = C_k)
                ```

                cumulative result (omitting logarithms):
                ```
                q_0_cumulative(x_t | x_0 = C_0)    ...  q_n_cumulative(x_t | x_0 = C_0)
                          .               .                          .
                          .                        .                 .
                          .                               .          .
                q_0_cumulative(x_t | x_0 = C_{k-1}) ... q_n_cumulative(x_t | x_0 = C_{k-1})
                ```
        Nr1   r
   r`   )rN   rO   rP   rK   rL   rM   	unsqueeze	logaddexprH   expandrG   r   cat)rU   rg   rh   ri   rj   abc(log_onehot_x_t_transitioning_from_maskedlog_Q_tmask_class_maskr   r   r   rl   d  s"   1





z9VQDiffusionScheduler.log_Q_t_transitioning_to_known_classc                 C   sd   |j d }| j| }| j| }| j| }|j d }||d|}|| |}tj||fdd}|S )Nr   r   r
   r`   )r)   rN   rO   rP   rv   ru   r   rw   )rU   rr   rg   bszrx   ry   rz   num_latent_pixelsr   r   r   rn     s   




z1VQDiffusionScheduler.apply_cumulative_transitions)rC   r.   r/   r/   r.   )N)NT)r   r   r   r   orderr   intr    rY   r   strr   r'   r[   Tensorlongr   r   	Generatorboolr   r   rf   rb   rl   rn   r   r   r   r   rB   j   sb    .

-o
crB   )r.   r/   )r/   r.   )dataclassesr   typingr   r   r   numpyr3   r   torch.nn.functionalnn
functionalr   configuration_utilsr   r   utilsr	   scheduling_utilsr   r   r   r   r   r$   r   r-   r:   rA   rB   r   r   r   r   <module>   s    
