o
    pi                     @   s   d dl Z d dlmZ d dlmZmZmZ d dlZddlm	Z	m
Z
 ddlmZ ddlmZ dd	lmZ 	
	dddZeG dd deZG dd dee	ZdS )    N)	dataclass)OptionalTupleUnion   )ConfigMixinregister_to_config)
BaseOutput)randn_tensor   )SchedulerMixin+?cosinec                 C   s   |dkr	dd }n|dkrdd }nt d| g }t| D ]}||  }|d |  }|td||||  | qtj|tjdS )	a  
    Create a beta schedule that discretizes the given alpha_t_bar function, which defines the cumulative product of
    (1-beta) over time from t = [0,1].

    Contains a function alpha_bar that takes an argument t and transforms it to the cumulative product of (1-beta) up
    to that part of the diffusion process.


    Args:
        num_diffusion_timesteps (`int`): the number of betas to produce.
        max_beta (`float`): the maximum beta to use; use values lower than 1 to
                     prevent singularities.
        alpha_transform_type (`str`, *optional*, default to `cosine`): the type of noise schedule for alpha_bar.
                     Choose from `cosine` or `exp`

    Returns:
        betas (`np.ndarray`): the betas used by the scheduler to step the model outputs
    r   c                 S   s    t | d d t j d d S )NgMb?gT㥛 ?r   )mathcospit r   q/home/ubuntu/SoloSpeech/.venv/lib/python3.10/site-packages/diffusers/schedulers/scheduling_consistency_decoder.pyalpha_bar_fn'   s    z)betas_for_alpha_bar.<locals>.alpha_bar_fnexpc                 S   s   t | d S )Ng      ()r   r   r   r   r   r   r   ,   s   z"Unsupported alpha_transform_type: r   )dtype)
ValueErrorrangeappendmintorchtensorfloat32)num_diffusion_timestepsmax_betaalpha_transform_typer   betasit1t2r   r   r   betas_for_alpha_bar   s   

"r'   c                   @   s   e Zd ZU dZejed< dS )!ConsistencyDecoderSchedulerOutputa>  
    Output class for the scheduler's `step` function.

    Args:
        prev_sample (`torch.Tensor` of shape `(batch_size, num_channels, height, width)` for images):
            Computed sample `(x_{t-1})` of previous timestep. `prev_sample` should be used as next model input in the
            denoising loop.
    prev_sampleN)__name__
__module____qualname____doc__r   Tensor__annotations__r   r   r   r   r(   :   s   
 	r(   c                   @   s   e Zd ZdZe		ddedefddZ		dd	ee d
e	e
ejf fddZedd Zddejdee dejfddZ		ddejde	eejf dejdeej dede	eef fddZdS )ConsistencyDecoderSchedulerr            ?num_train_timesteps
sigma_datac                 C   s   t |}d| }tj|dd}t|| _td| | _td| d }td| }||d  |d |d   | _|| |d |d  d  | _||d |d  d  | _d S )Ng      ?r   )dimr   r   r2   )	r'   r   cumprodsqrtsqrt_alphas_cumprodsqrt_one_minus_alphas_cumprodc_skipc_outc_in)selfr3   r4   r#   alphasalphas_cumprodsigmassqrt_recip_alphas_cumprodr   r   r   __init__K   s   z$ConsistencyDecoderScheduler.__init__Nnum_inference_stepsdevicec                 C   sr   |dkrt dtjddgtj|d| _| j|| _| j|| _| j|| _| j	|| _	| j
|| _
d S )Nr   z8Currently more than 2 inference steps are not supported.i  i   )r   rD   )r   r   r   long	timestepsr8   tor9   r:   r;   r<   )r=   rC   rD   r   r   r   set_timestepsa   s   z)ConsistencyDecoderScheduler.set_timestepsc                 C   s   | j | jd  S )Nr   )r9   rF   )r=   r   r   r   init_noise_sigmap   s   z,ConsistencyDecoderScheduler.init_noise_sigmasampletimestepreturnc                 C   s   || j |  S )a  
        Ensures interchangeability with schedulers that need to scale the denoising model input depending on the
        current timestep.

        Args:
            sample (`torch.Tensor`):
                The input sample.
            timestep (`int`, *optional*):
                The current timestep in the diffusion chain.

        Returns:
            `torch.Tensor`:
                A scaled input sample.
        )r<   )r=   rJ   rK   r   r   r   scale_model_inputt   s   z-ConsistencyDecoderScheduler.scale_model_inputTmodel_output	generatorreturn_dictc           
      C   s   | j | | | j| |  }t| j|kd }|t| jd kr$|}n+t|j||j|j	d}	| j
| j|d   |j| | j| j|d   |j|	  }|sT|fS t|dS )a  
        Predict the sample from the previous timestep by reversing the SDE. This function propagates the diffusion
        process from the learned model outputs (most often the predicted noise).

        Args:
            model_output (`torch.Tensor`):
                The direct output from the learned diffusion model.
            timestep (`float`):
                The current timestep in the diffusion chain.
            sample (`torch.Tensor`):
                A current instance of a sample created by the diffusion process.
            generator (`torch.Generator`, *optional*):
                A random number generator.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a
                [`~schedulers.scheduling_consistency_models.ConsistencyDecoderSchedulerOutput`] or `tuple`.

        Returns:
            [`~schedulers.scheduling_consistency_models.ConsistencyDecoderSchedulerOutput`] or `tuple`:
                If return_dict is `True`,
                [`~schedulers.scheduling_consistency_models.ConsistencyDecoderSchedulerOutput`] is returned, otherwise
                a tuple is returned where the first element is the sample tensor.
        r   r   )rO   r   rD   )r)   )r;   r:   r   whererF   lenr
   shaper   rD   r8   rG   r9   r(   )
r=   rN   rK   rJ   rO   rP   x_0timestep_idxr)   noiser   r   r   step   s   
z ConsistencyDecoderScheduler.step)r1   r2   )NN)N)NT)r*   r+   r,   orderr   intfloatrB   r   r   strr   rD   rH   propertyrI   r.   rM   	Generatorboolr(   r   rW   r   r   r   r   r0   H   sF    

 
r0   )r   r   )r   dataclassesr   typingr   r   r   r   configuration_utilsr   r   utilsr	   utils.torch_utilsr
   scheduling_utilsr   r'   r(   r0   r   r   r   r   <module>   s    
,