o
    Gi|                  	   @   s   d dl Z d dlmZ d dlmZ d dlZd dlZddlm	Z	m
Z
 ddlmZmZ ddlmZmZ e r7d dlZeG d	d
 d
eZ		ddededed dejfddZG dd dee	ZdS )    N)	dataclass)Literal   )ConfigMixinregister_to_config)
BaseOutputis_scipy_available   )KarrasDiffusionSchedulersSchedulerMixinc                   @   s.   e Zd ZU dZejed< dZejdB ed< dS )HeunDiscreteSchedulerOutputaq  
    Output class for the scheduler's `step` function output.

    Args:
        prev_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
            Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
            denoising loop.
        pred_original_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
            The predicted denoised sample `(x_{0})` based on the model output from the current timestep.
            `pred_original_sample` can be used to preview progress or for guidance.
    prev_sampleNpred_original_sample)__name__
__module____qualname____doc__torchTensor__annotations__r    r   r   a/home/ubuntu/.local/lib/python3.10/site-packages/diffusers/schedulers/scheduling_heun_discrete.pyr      s   
 
r   +?cosinenum_diffusion_timestepsmax_betaalpha_transform_type)r   explaplacereturnc                 C   s   |dkr	dd }n|dkrdd }n|dkrdd }nt d| g }t| D ]}||  }|d	 |  }|td	||||  | q(tj|tjd
S )a>  
    Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
    (1-beta) over time from t = [0,1].

    Contains a function alpha_bar that takes an argument t and transforms it to the cumulative product of (1-beta) up
    to that part of the diffusion process.

    Args:
        num_diffusion_timesteps (`int`):
            The number of betas to produce.
        max_beta (`float`, defaults to `0.999`):
            The maximum beta to use; use values lower than 1 to avoid numerical instability.
        alpha_transform_type (`str`, defaults to `"cosine"`):
            The type of noise schedule for `alpha_bar`. Choose from `cosine`, `exp`, or `laplace`.

    Returns:
        `torch.Tensor`:
            The betas used by the scheduler to step the model outputs.
    r   c                 S   s    t | d d t j d d S )NgMb?gT㥛 ?r   )mathcospitr   r   r   alpha_bar_fnM   s    z)betas_for_alpha_bar.<locals>.alpha_bar_fnr   c              	   S   sP   dt dd|   t ddt d|    d  }t |}t |d|  S )Ng      r	         ?r   gư>)r    copysignlogfabsr   sqrt)r$   lmbsnrr   r   r   r%   R   s   4
r   c                 S   s   t | d S )Ng      ()r    r   r#   r   r   r   r%   Y   s   z"Unsupported alpha_transform_type: r	   dtype)
ValueErrorrangeappendminr   tensorfloat32)r   r   r   r%   betasit1t2r   r   r   betas_for_alpha_bar3   s   


"r9   c                   @   sT  e Zd ZdZdd eD ZdZe							
							dRdede	de	de
dejee	 B d	B de
dedededede	ded dedd	fddZ		dSd e	ejB d!ejd	B defd"d#Zed$d% Zed&d' Zed(d) ZdTd*edd	fd+d,Zd-ejd e	ejB dejfd.d/Z								dUd0ed	B d1e
ejB ded	B d2ee d	B fd3d4Zd5ejd6ejdejfd7d8Zd9ejd0edejfd:d;Zd9ejd0edejfd<d=Z	>dVd9ejd0ed?e	d@e	dejf
dAdBZedCdD Z d e	ejB dd	fdEdFZ!	GdWdHejejB d e	ejB d-ejejB dIede"e#B f
dJdKZ$dLejdMejd2ejdejfdNdOZ%defdPdQZ&d	S )XHeunDiscreteScheduleruE  
    Scheduler with Heun steps for discrete beta schedules.

    This model inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the generic
    methods the library implements for all schedulers such as loading and saving.

    Args:
        num_train_timesteps (`int`, defaults to 1000):
            The number of diffusion steps to train the model.
        beta_start (`float`, defaults to 0.0001):
            The starting `beta` value of inference.
        beta_end (`float`, defaults to 0.02):
            The final `beta` value.
        beta_schedule (`"linear"`, `"scaled_linear"`, `"squaredcos_cap_v2"`, or `"exp"`, defaults to `"linear"`):
            The beta schedule, a mapping from a beta range to a sequence of betas for stepping the model. Choose from
            `linear`, `scaled_linear`, `squaredcos_cap_v2`, or `exp`.
        trained_betas (`np.ndarray`, *optional*):
            Pass an array of betas directly to the constructor to bypass `beta_start` and `beta_end`.
        prediction_type (`"epsilon"`, `"sample"`, or `"v_prediction"`, defaults to `"epsilon"`, *optional*):
            Prediction type of the scheduler function; can be `epsilon` (predicts the noise of the diffusion process),
            `sample` (directly predicts the noisy sample`) or `v_prediction` (see section 2.4 of [Imagen
            Video](https://huggingface.co/papers/2210.02303) paper).
        clip_sample (`bool`, defaults to `True`):
            Clip the predicted sample for numerical stability.
        clip_sample_range (`float`, defaults to 1.0):
            The maximum magnitude for sample clipping. Valid only when `clip_sample=True`.
        use_karras_sigmas (`bool`, *optional*, defaults to `False`):
            Whether to use Karras sigmas for step sizes in the noise schedule during the sampling process. If `True`,
            the sigmas are determined according to a sequence of noise levels {σi}.
        use_exponential_sigmas (`bool`, *optional*, defaults to `False`):
            Whether to use exponential sigmas for step sizes in the noise schedule during the sampling process.
        use_beta_sigmas (`bool`, *optional*, defaults to `False`):
            Whether to use beta sigmas for step sizes in the noise schedule during the sampling process. Refer to [Beta
            Sampling is All You Need](https://huggingface.co/papers/2407.12173) for more information.
        timestep_spacing (`"linspace"`, `"leading"`, or `"trailing"`, defaults to `"linspace"`):
            The way the timesteps should be scaled. Refer to Table 2 of the [Common Diffusion Noise Schedules and
            Sample Steps are Flawed](https://huggingface.co/papers/2305.08891) for more information.
        steps_offset (`int`, defaults to 0):
            An offset added to the inference steps, as required by some model families.
    c                 C   s   g | ]}|j qS r   )name).0er   r   r   
<listcomp>   s    z HeunDiscreteScheduler.<listcomp>r     _QK?~jt?linearNepsilonF      ?linspacer   num_train_timesteps
beta_startbeta_endbeta_scheduletrained_betasprediction_typeuse_karras_sigmasuse_exponential_sigmasuse_beta_sigmasclip_sampleclip_sample_rangetimestep_spacing)rE   leadingtrailingsteps_offsetr   c                 C   s8  | j jrt stdt| j j| j j| j jgdkrtd|d ur,tj	|tj
d| _nH|dkr<tj|||tj
d| _n8|dkrRtj|d |d |tj
dd | _n"|d	kr^t|d
d| _n|dkrjt|dd| _n
t| d| j d| j | _tj| jdd| _| |d | || _d | _d | _| jd| _d S )Nz:Make sure to install scipy if you want to use beta sigmas.r	   znOnly one of `config.use_beta_sigmas`, `config.use_exponential_sigmas`, `config.use_karras_sigmas` can be used.r-   rB   scaled_linearr&   r   squaredcos_cap_v2r   )r   r   z is not implemented for rD   r   )dimcpu)configrN   r   ImportErrorsumrM   rL   r/   r   r3   r4   r5   rE   r9   NotImplementedError	__class__alphascumprodalphas_cumprodset_timesteps_step_index_begin_indexsigmasto)selfrF   rG   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   rT   r   r   r   __init__   s0   $zHeunDiscreteScheduler.__init__timestepschedule_timestepsc                 C   s:   |du r| j }||k }t|dkrdnd}||  S )ak  
        Find the index of a given timestep in the timestep schedule.

        Args:
            timestep (`float` or `torch.Tensor`):
                The timestep value to find in the schedule.
            schedule_timesteps (`torch.Tensor`, *optional*):
                The timestep schedule to search in. If `None`, uses `self.timesteps`.

        Returns:
            `int`:
                The index of the timestep in the schedule. For the very first step, returns the second index if
                multiple matches exist to avoid skipping a sigma when starting mid-schedule (e.g., for image-to-image).
        Nr	   r   )	timestepsnonzerolenitem)rf   rh   ri   indicesposr   r   r   index_for_timestep   s
   z(HeunDiscreteScheduler.index_for_timestepc                 C   s,   | j jdv r| j S | j d d d S )N)rE   rS   r   r	   r&   )rY   rQ   rd   maxrf   r   r   r   init_noise_sigma   s   
z&HeunDiscreteScheduler.init_noise_sigmac                 C      | j S )zg
        The index counter for current timestep. It will increase 1 after each scheduler step.
        )rb   rr   r   r   r   
step_index      z HeunDiscreteScheduler.step_indexc                 C   rt   )zq
        The index for the first timestep. It should be set from pipeline with `set_begin_index` method.
        rc   rr   r   r   r   begin_index   rv   z!HeunDiscreteScheduler.begin_indexrx   c                 C   s
   || _ dS )z
        Sets the begin index for the scheduler. This function should be run from pipeline before the inference.

        Args:
            begin_index (`int`, defaults to `0`):
                The begin index for the scheduler.
        Nrw   )rf   rx   r   r   r   set_begin_index   s   
z%HeunDiscreteScheduler.set_begin_indexsamplec                 C   s8   | j du r
| | | j| j  }||d d d  }|S )a  
        Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
        current timestep.

        Args:
            sample (`torch.Tensor`):
                The input sample.
            timestep (`float` or `torch.Tensor`):
                The current timestep in the diffusion chain.

        Returns:
            `torch.Tensor`:
                A scaled input sample.
        Nr   r	   r&   )ru   _init_step_indexrd   )rf   rz   rh   sigmar   r   r   scale_model_input  s
   

z'HeunDiscreteScheduler.scale_model_inputnum_inference_stepsdevicerj   c                    s   |du r|du rt d|dur|durt d|dur$jjr$t d|dur0jjr0t d|dur<jjr<t d|pAt|}|_|pJjj}|durXtj	|tj
d}njjjdkrrtjd	|d
 |tj
dddd  }nPjjdkr|j }td	||  ddd  tj
}|jj7 }n)jjdkr|j }t|d	|   tj
}|d
8 }n	t jj dt	d
j j d }t| t|td	t||}jjrj|jd}t	 fdd|D }n3jjrj||d}t	 fdd|D }njjr.j||d}t	 fdd|D }t|dggtj
}t|j|d}t|dd
 |d
d d|dd g_t|}t|dd
 |d
d dg}|j|tj
d_d_ d_!d_"d_#jd_dS )a/  
        Sets the discrete timesteps used for the diffusion chain (to be run before inference).

        Args:
            num_inference_steps (`int`, *optional*, defaults to `None`):
                The number of diffusion steps used when generating samples with a pre-trained model.
            device (`str`, `torch.device`, *optional*, defaults to `None`):
                The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
            num_train_timesteps (`int`, *optional*, defaults to `None`):
                The number of diffusion steps used when training the model. If `None`, the default
                `num_train_timesteps` attribute is used.
            timesteps (`list[int]`, *optional*):
                Custom timesteps used to support arbitrary spacing between timesteps. If `None`, timesteps will be
                generated based on the `timestep_spacing` attribute. If `timesteps` is passed, `num_inference_steps`
                must be `None`, and `timestep_spacing` attribute will be ignored.
        NzEMust pass exactly one of `num_inference_steps` or `custom_timesteps`.zACan only pass one of `num_inference_steps` or `custom_timesteps`.z=Cannot use `timesteps` with `config.use_karras_sigmas = True`zCCannot set `timesteps` with `config.use_exponential_sigmas = True`.z<Cannot set `timesteps` with `config.use_beta_sigmas = True`.r-   rE   r   r	   rR   rS   zY is not supported. Please make sure to choose one of 'linspace', 'leading' or 'trailing'.r&   )	in_sigmasr~   c                       g | ]} | qS r   _sigma_to_tr<   r|   
log_sigmasrf   r   r   r>   b      z7HeunDiscreteScheduler.set_timesteps.<locals>.<listcomp>c                    r   r   r   r   r   r   r   r>   e  r   c                    r   r   r   r   r   r   r   r>   h  r   g        )r   r   r   r.   rX   )$r/   rY   rL   rM   rN   rl   r~   rF   nparrayr4   rQ   rE   copyarangeroundastyperT   r`   r(   interp_convert_to_karras_convert_to_exponential_convert_to_betaconcatenater   
from_numpyre   catrepeat_interleaverd   rj   prev_derivativedtrb   rc   )rf   r~   r   rF   rj   
step_ratiord   r   r   r   ra     sf   (
*
 



0
$z#HeunDiscreteScheduler.set_timestepsr|   r   c                 C   s   t t |d}||ddt jf  }t j|dkddjddj|jd d d}|d }|| }|| }|| ||  }	t |	dd}	d|	 | |	|  }
|
|j}
|
S )a  
        Convert sigma values to corresponding timestep values through interpolation.

        Args:
            sigma (`np.ndarray`):
                The sigma value(s) to convert to timestep(s).
            log_sigmas (`np.ndarray`):
                The logarithm of the sigma schedule used for interpolation.

        Returns:
            `np.ndarray`:
                The interpolated timestep value(s) corresponding to the input sigma(s).
        g|=Nr   )axisr   )rq   r	   )	r   r(   maximumnewaxiscumsumargmaxclipshapereshape)rf   r|   r   	log_sigmadistslow_idxhigh_idxlowhighwr$   r   r   r   r   |  s   ,z!HeunDiscreteScheduler._sigma_to_tr   c           
      C   s   t | jdr| jj}nd}t | jdr| jj}nd}|dur |n|d  }|dur,|n|d  }d}tdd|}|d|  }|d|  }||||   | }	|	S )a  
        Construct the noise schedule as proposed in [Elucidating the Design Space of Diffusion-Based Generative
        Models](https://huggingface.co/papers/2206.00364).

        Args:
            in_sigmas (`torch.Tensor`):
                The input sigma values to be converted.
            num_inference_steps (`int`):
                The number of inference steps to generate the noise schedule for.

        Returns:
            `torch.Tensor`:
                The converted sigma values following the Karras noise schedule.
        	sigma_minN	sigma_maxr   r   g      @r	   )hasattrrY   r   r   rm   r   rE   )
rf   r   r~   r   r   rhorampmin_inv_rhomax_inv_rhord   r   r   r   r     s   

z(HeunDiscreteScheduler._convert_to_karrasc                 C   s   t | jdr| jj}nd}t | jdr| jj}nd}|dur |n|d  }|dur,|n|d  }ttt	|t	||}|S )a  
        Construct an exponential noise schedule.

        Args:
            in_sigmas (`torch.Tensor`):
                The input sigma values to be converted.
            num_inference_steps (`int`):
                The number of inference steps to generate the noise schedule for.

        Returns:
            `torch.Tensor`:
                The converted sigma values following an exponential schedule.
        r   Nr   r   r   )
r   rY   r   r   rm   r   r   rE   r    r(   )rf   r   r~   r   r   rd   r   r   r   r     s   

 z-HeunDiscreteScheduler._convert_to_exponential333333?alphabetac              
      s   t | jdr| jjndt | jdr| jjnddur n|d  dur,n|d  tfdd fddd	tdd	| D D }|S )
a  
        Construct a beta noise schedule as proposed in [Beta Sampling is All You
        Need](https://huggingface.co/papers/2407.12173).

        Args:
            in_sigmas (`torch.Tensor`):
                The input sigma values to be converted.
            num_inference_steps (`int`):
                The number of inference steps to generate the noise schedule for.
            alpha (`float`, *optional*, defaults to `0.6`):
                The alpha parameter for the beta distribution.
            beta (`float`, *optional*, defaults to `0.6`):
                The beta parameter for the beta distribution.

        Returns:
            `torch.Tensor`:
                The converted sigma values following a beta distribution schedule.
        r   Nr   r   r   c                    s   g | ]
}|    qS r   r   )r<   ppf)r   r   r   r   r>     s    z:HeunDiscreteScheduler._convert_to_beta.<locals>.<listcomp>c                    s   g | ]}t jj| qS r   )scipystatsr   r   )r<   rh   )r   r   r   r   r>     s    r	   )r   rY   r   r   rm   r   r   rE   )rf   r   r~   r   r   rd   r   )r   r   r   r   r   r     s    

	z&HeunDiscreteScheduler._convert_to_betac                 C   s
   | j d u S N)r   rr   r   r   r   state_in_first_order  s   
z*HeunDiscreteScheduler.state_in_first_orderc                 C   s@   | j du rt|tjr|| jj}| || _dS | j	| _dS )z
        Initialize the step index for the scheduler based on the given timestep.

        Args:
            timestep (`float` or `torch.Tensor`):
                The current timestep to initialize the step index from.
        N)
rx   
isinstancer   r   re   rj   r   rp   rb   rc   )rf   rh   r   r   r   r{     s
   
z&HeunDiscreteScheduler._init_step_indexTmodel_outputreturn_dictc                 C   s  | j du r
| | | jr| j| j  }| j| j d  }n| j| j d  }| j| j  }d}||d  }| jjdkrF| jr=|n|}	||	|  }
n6| jjdkri| jrQ|n|}	||	 |	d d d   ||	d d   }
n| jjdkrr|}
n
td	| jj d
| jjr|
| jj	 | jj	}
| jr||
 | }|| }|| _
|| _|| _n||
 | }| j
| d }| j}| j}d| _
d| _d| _|||  }|  jd7  _|s||
fS t||
dS )a  
        Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion
        process from the learned model outputs (most often the predicted noise).

        Args:
            model_output (`torch.Tensor`):
                The direct output from learned diffusion model.
            timestep (`float`):
                The current discrete timestep in the diffusion chain.
            sample (`torch.Tensor`):
                A current instance of a sample created by the diffusion process.
            return_dict (`bool`):
                Whether or not to return a [`~schedulers.scheduling_heun_discrete.HeunDiscreteSchedulerOutput`] or
                tuple.

        Returns:
            [`~schedulers.scheduling_heun_discrete.HeunDiscreteSchedulerOutput`] or `tuple`:
                If return_dict is `True`, [`~schedulers.scheduling_heun_discrete.HeunDiscreteSchedulerOutput`] is
                returned, otherwise a tuple is returned where the first element is the sample tensor.
        Nr	   r   rC   v_predictionr   r&   rz   zprediction_type given as z, must be one of `epsilon`, or `v_prediction`)r   r   )ru   r{   r   rd   rY   rK   r/   rO   clamprP   r   r   rz   rb   r   )rf   r   rh   rz   r   r|   
sigma_nextgamma	sigma_hatsigma_inputr   
derivativer   r   r   r   r   step.  s\   

zHeunDiscreteScheduler.steporiginal_samplesnoisec                    s
  j j|j|jd}|jjdkr)t|r)jj|jtjd |j|jtjd}nj|j ||j}j	du rF fdd|D }nj
durUj
g|jd  }n	j	g|jd  }||  }t|jt|jk r}|d}t|jt|jk sn|||  }|S )	am  
        Add noise to the original samples according to the noise schedule at the specified timesteps.

        Args:
            original_samples (`torch.Tensor`):
                The original samples to which noise will be added.
            noise (`torch.Tensor`):
                The noise tensor to add to the original samples.
            timesteps (`torch.Tensor`):
                The timesteps at which to add noise, determining the noise level from the schedule.

        Returns:
            `torch.Tensor`:
                The noisy samples with added noise scaled according to the timestep schedule.
        r   mpsr-   Nc                    r   r   )rp   )r<   r$   ri   rf   r   r   r>     r   z3HeunDiscreteScheduler.add_noise.<locals>.<listcomp>r   r   )rd   re   r   r.   typer   is_floating_pointrj   r4   rx   ru   r   flattenrl   	unsqueeze)rf   r   r   rj   rd   step_indicesr|   noisy_samplesr   r   r   	add_noise  s"   


zHeunDiscreteScheduler.add_noisec                 C   s   | j jS r   )rY   rF   rr   r   r   r   __len__  s   zHeunDiscreteScheduler.__len__)r?   r@   rA   rB   NrC   FFFFrD   rE   r   r   )r   )NNNN)r   r   )T)'r   r   r   r   r
   _compatiblesorderr   intfloatstrr   ndarraylistboolr   rg   r   r   rp   propertyrs   ru   rx   ry   r}   r   ra   r   r   r   r   r   r{   r   tupler   r   r   r   r   r   r   r:   g   s    )	
2







]%'#
0



h
0r:   )r   r   )r    dataclassesr   typingr   numpyr   r   configuration_utilsr   r   utilsr   r   scheduling_utilsr
   r   scipy.statsr   r   r   r   r   r9   r:   r   r   r   r   <module>   s0   
4