o
    Gi)                   	   @   s   d dl Z d dlmZ d dlmZ d dlZddlmZmZ ddl	m
Z
 ddlmZ dd	lmZ 	
	ddededed dejfddZeG dd de
ZG dd deeZdS )    N)	dataclass)Literal   )ConfigMixinregister_to_config)
BaseOutput)randn_tensor   )SchedulerMixin+?cosinenum_diffusion_timestepsmax_betaalpha_transform_type)r   explaplacereturnc                 C   s   |dkr	dd }n|dkrdd }n|dkrdd }nt d| g }t| D ]}||  }|d	 |  }|td	||||  | q(tj|tjd
S )a>  
    Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
    (1-beta) over time from t = [0,1].

    Contains a function alpha_bar that takes an argument t and transforms it to the cumulative product of (1-beta) up
    to that part of the diffusion process.

    Args:
        num_diffusion_timesteps (`int`):
            The number of betas to produce.
        max_beta (`float`, defaults to `0.999`):
            The maximum beta to use; use values lower than 1 to avoid numerical instability.
        alpha_transform_type (`str`, defaults to `"cosine"`):
            The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.

    Returns:
        `torch.Tensor`:
            The betas used by the scheduler to step the model outputs.
    r   c                 S   s    t | d d t j d d S )NgMb?gT㥛 ?r   )mathcospit r   g/home/ubuntu/.local/lib/python3.10/site-packages/diffusers/schedulers/scheduling_consistency_decoder.pyalpha_bar_fn(   s    z)betas_for_alpha_bar.<locals>.alpha_bar_fnr   c              	   S   sP   dt dd|   t ddt d|    d  }t |}t |d|  S )Ng      r	         ?r   gư>)r   copysignlogfabsr   sqrt)r   lmbsnrr   r   r   r   -   s   4
r   c                 S   s   t | d S )Ng      ()r   r   r   r   r   r   r   4   s   z"Unsupported alpha_transform_type: r	   )dtype)
ValueErrorrangeappendmintorchtensorfloat32)r   r   r   r   betasit1t2r   r   r   betas_for_alpha_bar   s   


"r.   c                   @   s   e Zd ZU dZejed< dS )!ConsistencyDecoderSchedulerOutputa>  
    Output class for the scheduler's `step` function.

    Args:
        prev_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
            Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
            denoising loop.
    prev_sampleN)__name__
__module____qualname____doc__r'   Tensor__annotations__r   r   r   r   r/   B   s   
 	r/   c                   @   s   e Zd ZdZdZe		ddededdfd	d
Z		ddedB de	e
jB fddZede
jfddZdde
jdedB de
jfddZ		dde
jdee
jB de
jde
jdB dedeeB fddZdS )ConsistencyDecoderSchedulera  
    A scheduler for the consistency decoder used in Stable Diffusion pipelines.

    This scheduler implements a two-step denoising process using consistency models for decoding latent representations
    into images.

    This model inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the generic
    methods the library implements for all schedulers such as loading and saving.

    Args:
        num_train_timesteps (`int`, *optional*, defaults to `1024`):
            The number of diffusion steps to train the model.
        sigma_data (`float`, *optional*, defaults to `0.5`):
            The standard deviation of the data distribution. Used for computing the skip and output scaling factors.
    r	      r   num_train_timesteps
sigma_datar   Nc                 C   s   t |}d| }tj|dd}t|| _td| | _td| d }td| }||d  |d |d   | _|| |d |d  d  | _||d |d  d  | _d S )Ng      ?r   )dimr	   r   r   )	r.   r'   cumprodr   sqrt_alphas_cumprodsqrt_one_minus_alphas_cumprodc_skipc_outc_in)selfr9   r:   r*   alphasalphas_cumprodsigmassqrt_recip_alphas_cumprodr   r   r   __init__c   s   z$ConsistencyDecoderScheduler.__init__num_inference_stepsdevicec                 C   sr   |dkrt dtjddgtj|d| _| j|| _| j|| _| j|| _| j	|| _	| j
|| _
d S )Nr   z8Currently more than 2 inference steps are not supported.i  i   )r"   rI   )r#   r'   r(   long	timestepsr=   tor>   r?   r@   rA   )rB   rH   rI   r   r   r   set_timestepsy   s   z)ConsistencyDecoderScheduler.set_timestepsc                 C   s   | j | jd  S )a
  
        Return the standard deviation of the initial noise distribution.

        Returns:
            `torch.Tensor`:
                The initial noise sigma value from the precomputed `sqrt_one_minus_alphas_cumprod` at the first
                timestep.
        r   )r>   rK   )rB   r   r   r   init_noise_sigma   s   
z,ConsistencyDecoderScheduler.init_noise_sigmasampletimestepc                 C   s   || j |  S )a  
        Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
        current timestep.

        Args:
            sample (`torch.Tensor`):
                The input sample.
            timestep (`int`, *optional*):
                The current timestep in the diffusion chain.

        Returns:
            `torch.Tensor`:
                A scaled input sample.
        )rA   )rB   rO   rP   r   r   r   scale_model_input   s   z-ConsistencyDecoderScheduler.scale_model_inputTmodel_output	generatorreturn_dictc           
      C   s   | j | | | j| |  }t| j|kd }|t| jd kr$|}n+t|j||j|j	d}	| j
| j|d   |j| | j| j|d   |j|	  }|sT|fS t|dS )a  
        Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion
        process from the learned model outputs (most often the predicted noise).

        Args:
            model_output (`torch.Tensor`):
                The direct output from the learned diffusion model.
            timestep (`float` or `torch.Tensor`):
                The current timestep in the diffusion chain.
            sample (`torch.Tensor`):
                A current instance of a sample created by the diffusion process.
            generator (`torch.Generator`, *optional*):
                A random number generator for reproducibility.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a
                [`~schedulers.scheduling_consistency_decoder.ConsistencyDecoderSchedulerOutput`] or `tuple`.

        Returns:
            [`~schedulers.scheduling_consistency_decoder.ConsistencyDecoderSchedulerOutput`] or `tuple`:
                If `return_dict` is `True`,
                [`~schedulers.scheduling_consistency_decoder.ConsistencyDecoderSchedulerOutput`] is returned, otherwise
                a tuple is returned where the first element is the sample tensor.
        r   r	   )rS   r"   rI   )r0   )r@   r?   r'   whererK   lenr   shaper"   rI   r=   rL   r>   r/   )
rB   rR   rP   rO   rS   rT   x_0timestep_idxr0   noiser   r   r   step   s   
z ConsistencyDecoderScheduler.step)r8   r   )NN)N)NT)r1   r2   r3   r4   orderr   intfloatrG   strr'   rI   rM   propertyr5   rN   rQ   	Generatorboolr/   tupler[   r   r   r   r   r7   P   sL    
 r7   )r   r   )r   dataclassesr   typingr   r'   configuration_utilsr   r   utilsr   utils.torch_utilsr   scheduling_utilsr
   r]   r^   r5   r.   r/   r7   r   r   r   r   <module>   s,    
4