o
    ۷i%                     @   s   d dl Z d dlmZ d dlmZ d dlZddlmZmZ ddl	m
Z
 ddlmZ dd	ejd
ejdB dejfddZ		ddejdejded
ejdB dejf
ddZeG dd de
ZG dd deeZdS )    N)	dataclass)Literal   )ConfigMixinregister_to_config)
BaseOutput   )SchedulerMixint	generatorreturnc                 C   sV   |dur|j n| j }tj| |djdd|d| j }tt|d d S )a  
    Generate Gumbel noise for sampling.

    Args:
        t (`torch.Tensor`):
            Input tensor to match the shape and dtype of the output noise.
        generator (`torch.Generator`, *optional*):
            A random number generator for reproducible sampling.

    Returns:
        `torch.Tensor`:
            Gumbel-distributed noise with the same shape, dtype, and device as the input tensor.
    Ndevicer   r   r   #B;)r   torch
zeros_likeuniform_tologclamp)r
   r   r   noise r   \/home/ubuntu/vllm_env/lib/python3.10/site-packages/diffusers/schedulers/scheduling_amused.pygumbel_noise   s   " r         ?mask_lenprobstemperaturec                 C   sN   t |d|t||d  }t j|ddj}t |d|  }||k }|S )a  
    Mask tokens by selecting the top-k lowest confidence scores with temperature-based randomness.

    Args:
        mask_len (`torch.Tensor`):
            Number of tokens to mask per sample in the batch.
        probs (`torch.Tensor`):
            Probability scores for each token.
        temperature (`float`, *optional*, defaults to 1.0):
            Temperature parameter for controlling randomness in the masking process.
        generator (`torch.Generator`, *optional*):
            A random number generator for reproducible sampling.

    Returns:
        `torch.Tensor`:
            Boolean mask indicating which tokens should be masked.
    r   r   dimr   )r   r   r   r   sortvaluesgatherlong)r   r   r   r   
confidencesorted_confidencecut_offmaskingr   r   r   mask_by_random_topk   s
    r*   c                   @   s.   e Zd ZU dZejed< dZejdB ed< dS )AmusedSchedulerOutputa  
    Output class for the scheduler's `step` function output.

    Args:
        prev_sample (`torch.LongTensor` of shape `(batch_size, height, width)` or `(batch_size, sequence_length)`):
            Computed sample `(x_{t-1})` of previous timestep with token IDs. `prev_sample` should be used as next model
            input in the denoising loop.
        pred_original_sample (`torch.LongTensor` of shape `(batch_size, height, width)` or `(batch_size, sequence_length)`, *optional*):
            The predicted fully denoised sample `(x_{0})` with token IDs based on the model output from the current
            timestep. `pred_original_sample` can be used to preview progress or for guidance.
    prev_sampleNpred_original_sample)	__name__
__module____qualname____doc__r   Tensor__annotations__r-   	Generatorr   r   r   r   r+   =   s   
 
r+   c                   @   s   e Zd ZU dZdZejdB ed< ejdB ed< e	dde	de
d	 fd
dZ		d de	de	ee	e	f B ee	 B deejB fddZ			d!dejde	dejdedejdB dedeeB fddZ	d"dejde	dejdB dejfddZdS )#AmusedSchedulera  
    A scheduler for masked token generation as used in [`AmusedPipeline`].

    This scheduler iteratively unmasks tokens based on their confidence scores, following either a cosine or linear
    schedule. Unlike traditional diffusion schedulers that work with continuous pixel values, this scheduler operates
    on discrete token IDs, making it suitable for autoregressive and non-autoregressive masked token generation models.

    This scheduler inherits from [`SchedulerMixin`] and [`ConfigMixin`]. Check the superclass documentation for the
    generic methods the library implements for all schedulers such as loading and saving.

    Args:
        mask_token_id (`int`):
            The token ID used to represent masked tokens in the sequence.
        masking_schedule (`Literal["cosine", "linear"]`, *optional*, defaults to `"cosine"`):
            The schedule type for determining the mask ratio at each timestep. Can be either `"cosine"` or `"linear"`.
    r   Ntemperatures	timestepscosinemask_token_idmasking_schedule)r8   linearc                 C   s   d | _ d | _d S N)r6   r7   )selfr9   r:   r   r   r   __init__f   s   
zAmusedScheduler.__init__r   r   num_inference_stepsr   r   c                 C   s\   t j||dd| _t|ttfr"t j|d |d ||d| _d S t j|d||d| _d S )Nr   r   r   g{Gz?)	r   arangeflipr7   
isinstancetuplelistlinspacer6   )r=   r@   r   r   r   r   r   set_timestepso   s    zAmusedScheduler.set_timestepsr   Tmodel_outputtimestepsamplestarting_mask_ratior   return_dictr   c                 C   st  |j dko	|j dk}|r)|j\}}	}
}|||
| }|||	|
| ddd}|| jjk}|jdd}|j}|d urB||jn|}|jj	dkrT|j
tjkrT| }|d|d}tj|d|d	j|d
}|d d df j|jd d  }t|||}|dkr|}n|jd }| j|k }|d t| j }| jjdkrt|tj d }n| jjdkrd| }n	td| jj || }||  }t|jdddd |}ttjdg|jd
|}t|d|d d d d d f d d d d df }t||t |j
j}t!||| j"| |}t|| jj|}|r.|||
|}|||
|}|s5||fS t#||S )N      r   r   r   r   r    cpur   r   r8   r;   unknown masking schedule T)r!   keepdim)$ndimshapereshapepermuteconfigr9   softmaxr   r   typedtyper   float32floatsizemultinomialviewwherer7   nonzerolenr:   cosmathpi
ValueErrorfloorminsummaxtensorr$   finfor*   r6   r+   )r=   rH   rI   rJ   rK   r   rL   two_dim_input
batch_sizecodebook_sizeheightwidthunknown_mapr   r   probs_r-   r,   seq_lenstep_idxratio
mask_ratior   selected_probsr)   r   r   r   step|   sN   	 

2
zAmusedScheduler.stepc           	      C   s   | j |k }|d t| j  }| jjdkr!t|tj d }n| jjdkr,d| }n	t	d| jj tj
|j|dur@|jn|j|d|j|k }| }| jj||< |S )a  
        Add noise to a sample by randomly masking tokens according to the masking schedule.

        Args:
            sample (`torch.LongTensor`):
                The input sample containing token IDs to be partially masked.
            timesteps (`int`):
                The timestep that determines how much masking to apply. Higher timesteps result in more masking.
            generator (`torch.Generator`, *optional*):
                A random number generator for reproducible masking.

        Returns:
            `torch.LongTensor`:
                The sample with some tokens replaced by `mask_token_id` according to the masking schedule.
        r   r8   r   r;   rP   N)r   r   )r7   r`   ra   rV   r:   r   rb   rc   rd   re   randrS   r   r   cloner9   )	r=   rJ   r7   r   rt   ru   rv   mask_indicesmasked_sampler   r   r   	add_noise   s"   
zAmusedScheduler.add_noise)r8   )r?   N)r   NTr<   )r.   r/   r0   r1   orderr   r4   r3   r   intr   r>   rD   rE   strr   rG   r2   
LongTensorr[   boolr+   rx   r}   r   r   r   r   r5   O   sb   
 

Ir5   r<   )r   N)rc   dataclassesr   typingr   r   configuration_utilsr   r   utilsr   scheduling_utilsr	   r2   r4   r   r[   r*   r+   r5   r   r   r   r   <module>   s0    "
