o
    }oiuD                     @   s   d dl mZmZmZmZmZ d dlZd dlZd dl	Zd dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZ G d	d
 d
ZdS )    )AnyCallableDictOptionalTupleN)	rearrange)parallel_state)Tensor)	batch_mul)cat_outputs_cp)EDMSDE
EDMSampler
EDMScalingc                   @   s  e Zd ZdZ							d7d	d
Zedd Zdd Zdee	e
jf dedeee	e
jf e
jf fddZde
jde
jdee	e
jf fddZdee	e
jf de
jde
jdee	e
jf de
jde
jfddZde
jfddZdefd d!Z	"	#d8ded$ed%edefd&d'Z	"		#	(d9ded$ed)edB d%ed*edefd+d,Zd-edede
jfd.d/Zd:d0e
jd1ee de
jfd2d3Zd;dee	ef dee fd5d6ZdS )<EDMPipelinea	  
    EDMPipeline is a class that implements a diffusion model pipeline for video generation. It includes methods for
    initializing the pipeline, encoding and decoding video data, performing training steps, denoising, and generating
    samples.
    Attributes:
        p_mean: Mean for SDE process.
        p_std: Standard deviation for SDE process.
        sigma_max: Maximum noise level.
        sigma_min: Minimum noise level.
        _noise_generator: Generator for noise.
        _noise_level_generator: Generator for noise levels.
        sde: SDE process.
        sampler: Sampler for the diffusion model.
        scaling: Scaling for EDM.
        input_data_key: Key for input video data.
        input_image_key: Key for input image data.
        tensor_kwargs: Tensor keyword arguments.
        loss_reduce: Method for reducing loss.
        loss_scale: Scale factor for loss.
        aesthetic_finetuning: Aesthetic finetuning parameter.
        camera_sample_weight: Camera sample weight parameter.
        loss_mask_enabled: Flag for enabling loss mask.
    Methods:
        noise_level_generator: Returns the noise level generator.
        _initialize_generators: Initializes noise and noise-level generators.
        encode: Encodes input tensor using the video tokenizer.
        decode: Decodes latent tensor using video tokenizer.
        training_step: Performs a single training step for the diffusion model.
        denoise: Performs denoising on the input noise data, noise level, and condition.
        compute_loss_with_epsilon_and_sigma: Computes the loss for training.
        get_per_sigma_loss_weights: Returns loss weights per sigma noise level.
        get_condition_uncondition: Returns conditioning and unconditioning for classifier-free guidance.
        get_x0_fn_from_batch: Creates a function to generate denoised predictions with the sampler.
        generate_samples_from_batch: Generates samples based on input data batch.
        _normalize_video_databatch_inplace: Normalizes video data in-place on a CUDA device to [-1, 1].
        draw_training_sigma_and_epsilon: Draws training noise (epsilon) and noise levels (sigma).
        random_dropout_input: Applies random dropout to the input tensor.
        get_data_and_condition: Retrieves data and conditioning for model input.
    N              ?P   -C6*?      ?  c	           	      C   s   || _ || _|| _|| _|| _|| _|| _|| _d| _d| _	t
||||| _t | _t|| _d| _d| _dtjd| _d| _d| _dS )a_  
        Initializes the EDM pipeline with the given parameters.

        Args:
            net: The DiT model.
            vae: The Video Tokenizer (optional).
            p_mean (float): Mean for the SDE.
            p_std (float): Standard deviation for the SDE.
            sigma_max (float): Maximum sigma value for the SDE.
            sigma_min (float): Minimum sigma value for the SDE.
            sigma_data (float): Sigma value for EDM scaling.
            seed (int): Random seed for reproducibility.

        Attributes:
            vae: The Video Tokenizer.
            net: The DiT model.
            p_mean (float): Mean for the SDE.
            p_std (float): Standard deviation for the SDE.
            sigma_max (float): Maximum sigma value for the SDE.
            sigma_min (float): Minimum sigma value for the SDE.
            sigma_data (float): Sigma value for EDM scaling.
            seed (int): Random seed for reproducibility.
            _noise_generator: Placeholder for noise generator.
            _noise_level_generator: Placeholder for noise level generator.
            sde: Instance of EDMSDE initialized with p_mean, p_std, sigma_max, and sigma_min.
            sampler: Instance of EDMSampler.
            scaling: Instance of EDMScaling initialized with sigma_data.
            input_data_key (str): Key for input data.
            input_image_key (str): Key for input images.
            tensor_kwargs (dict): Tensor keyword arguments for device and dtype.
            loss_reduce (str): Method to reduce loss ('mean' or other).
            loss_scale (float): Scale factor for loss.
        Nvideoimages_1024cuda)devicedtypemeanr   )vaenetp_meanp_std	sigma_max	sigma_min
sigma_dataseed_noise_generator_noise_level_generatorr   sder   samplerr   scalinginput_data_keyinput_image_keytorchbfloat16tensor_kwargsloss_reduce
loss_scale)	selfr   r   r   r   r    r!   r"   r#    r1   g/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/diffusion/sampler/edm/edm_pipeline.py__init__I   s$   ,

zEDMPipeline.__init__c                 C   s   | j S )z
        Generates noise levels for the EDM pipeline.

        Returns:
            Callable: A function or generator that produces noise levels.
        )r%   )r0   r1   r1   r2   noise_level_generator   s   z!EDMPipeline.noise_level_generatorc                 C   sb   | j dtjdd  }| j dtjdd  }tjdd| _| j| tj	|| _
| j
| j_dS )a  
        Initializes the random number generators for noise and noise level.

        This method sets up two generators:
        1. A PyTorch generator for noise, seeded with a combination of the base seed and the data parallel rank.
        2. A NumPy generator for noise levels, seeded similarly but without considering context parallel rank.

        Returns:
            None
        d   T)with_context_parallelFr   )r   N)r#   r   get_data_parallel_rankr+   	Generatorr$   manual_seednprandomdefault_rngr%   r&   
_generator)r0   
noise_seednoise_level_seedr1   r1   r2   _initialize_generators   s   z"EDMPipeline._initialize_generators
data_batch	iterationreturnc                 C   sf   |  |\}}}| | |\}}t r'| ||||||\}}	}
||
fS | ||||||}|S )a  
        Performs a single training step for the diffusion model.

        This method is responsible for executing one iteration of the model's training. It involves:
        1. Adding noise to the input data using the SDE process.
        2. Passing the noisy data through the network to generate predictions.
        3. Computing the loss based on the difference between the predictions and the original data.

        Args:
            data_batch (dict): raw data batch draw from the training data loader.
            iteration (int): Current iteration number.

        Returns:
            A tuple with the output batch and the computed loss.
        )get_data_and_conditiondraw_training_sigma_and_epsilonsizer   is_pipeline_last_stage#compute_loss_with_epsilon_and_sigma)r0   rA   rB   x0_from_data_batchx0	conditionsigmaepsilonoutput_batchpred_mseedm_loss
net_outputr1   r1   r2   training_step   s   
zEDMPipeline.training_stepxtrL   rK   c                 C   s   |j di | j}|j di | j}| j|d\}}}}| jdt|||d|}t s0|S t||t|| }	t||	 d| }
|	|
fS )af  
        Performs denoising on the input noise data, noise level, and condition

        Args:
            xt (torch.Tensor): The input noise data.
            sigma (torch.Tensor): The noise level.
            condition (dict[str, torch.Tensor]): conditional information

        Returns:
            Predicted clean data (x0) and noise (eps_pred).
        rL   )x	timestepsr   Nr1   )tor-   r(   r   r
   r   rG   )r0   rS   rL   rK   c_skipc_outc_inc_noiserQ   x0_predeps_predr1   r1   r2   denoise   s   zEDMPipeline.denoiserI   rJ   rM   c              	   C   s   | j ||\}}|t|| }	t rE| |	||\}
}| j|d}||
 d }t||}||	||||
|d| | d}|||fS | |	||}
|
 S )ah  
        Computes the loss for training.

        Args:
            data_batch: Batch of input data.
            x0_from_data_batch: Raw input tensor.
            x0: Latent tensor.
            condition: Conditional input data.
            epsilon: Noise tensor.
            sigma: Noise level tensor.

        Returns:
            The computed loss.
        rT      )r\   r]   )rJ   rS   rL   weights_per_sigmarK   
model_predmse_lossrP   )	r&   marginal_probr
   r   rG   r^   get_per_sigma_loss_weightsr   
contiguous)r0   rA   rI   rJ   rK   rM   rL   r   stdrS   r\   r]   r`   rO   rP   rN   r1   r1   r2   rH      s&   


z/EDMPipeline.compute_loss_with_epsilon_and_sigmac                 C   s    |d | j d  || j  d  S )z
        Args:
            sigma (tensor): noise level

        Returns:
            loss weights per sigma noise level
        r_   )r"   )r0   rL   r1   r1   r2   rd   "  s    z&EDMPipeline.get_per_sigma_loss_weightsc                 C   sl   | j |dd\}}}d|v r(|d |d< |d |d< | j |dd\}}}||fS | j |dd\}}}||fS )zEReturns conditioning and unconditioning for classifier-free guidance.r   dropout_rateneg_t5_text_embeddingst5_text_embeddingsneg_t5_text_maskt5_text_maskr   )rD   )r0   rA   _rK   unconditionr1   r1   r2   get_condition_uncondition,  s   z%EDMPipeline.get_condition_uncondition      ?Fguidanceis_negative_promptc                    s8    |\ dtjdtjdtjf fdd}|S )aF  
        Creates a function to generate denoised predictions with the sampler.

        Args:
            data_batch: Batch of input data.
            guidance: Guidance scale factor.
            is_negative_prompt: Whether to use negative prompts.

        Returns:
            A callable to predict clean data (x0).
        noise_xrL   rC   c                    s4    | | \}} | |\}}|||   S N)r^   )rs   rL   cond_x0rm   	uncond_x0rK   rq   r0   rn   r1   r2   x0_fnL  s   z/EDMPipeline.get_x0_fn_from_batch.<locals>.x0_fn)ro   r+   r	   )r0   rA   rq   rr   rx   r1   rw   r2   get_x0_fn_from_batch9  s   &z EDMPipeline.get_x0_fn_from_batch#   state_shape	num_stepsc                 C   s   t  dk}| jdu r|   | j|||d}t|}|d  t    < tj|fi | jd| ji| j	j
 }| j|||| j	j
d}	|rPt  }
t|	d|
d}	|	S )a  
        Generates samples based on input data batch.

        Args:
            data_batch: Batch of input data.
            guidance: Guidance scale factor.
            state_shape: Shape of the state.
            is_negative_prompt: Whether to use negative prompts.
            num_steps: Number of steps for sampling.
            solver_option: SDE Solver option.

        Returns:
            Generated samples from diffusion model.
           N)rr   	generator)r|   r    r_   )seq_dimcp_group)r   get_context_parallel_world_sizer$   r@   ry   listr+   randnr-   r&   r    r'   get_context_parallel_groupr   )r0   rA   rq   r{   rr   r|   
cp_enabledrx   x_sigma_maxsamplesr   r1   r1   r2   generate_samples_from_batchS  s   
$z'EDMPipeline.generate_samples_from_batchx0_sizec                 C   sX   ~|d }| j du r|   tj|fi | jd| j i}| j|jdi | j|fS )a  
        Draws training noise (epsilon) and noise levels (sigma).

        Args:
            x0_size: Shape of the input tensor.
            condition: Conditional input (unused).

        Returns:
            Noise level (sigma) and noise (epsilon).
        r   Nr~   r1   )r$   r@   r+   r   r-   r&   sample_trW   )r0   r   rK   
batch_sizerM   r1   r1   r2   rE   }  s   
z+EDMPipeline.draw_training_sigma_and_epsilon	in_tensorrh   c                 C   s<   |dur|n| j }ttd| t|jd  ||S )z
        Applies random dropout to the input tensor.

        Args:
            in_tensor: Input tensor.
            dropout_rate: Dropout probability (optional).

        Returns:
            Conditioning with random dropout applied.
        Nr   r   )rh   r
   r+   	bernoullionesshapetype_as)r0   r   rh   r1   r1   r2   random_dropout_input  s
   "z EDMPipeline.random_dropout_input皙?c                 C   sX   |d | j  }|}i }| D ]\}}|dvr|||< q| j|d |d|d< |||fS )a  
        Retrieves data and conditioning for model input.

        Args:
            data_batch: Batch of input data.
            dropout_rate: Dropout probability for conditioning.

        Returns:
            Raw data, latent data, and conditioning information.
        r   )r   rj   rj   rg   crossattn_emb)r"   itemsr   )r0   rA   rh   	raw_statelatent_staterK   keyvaluer1   r1   r2   rD     s   

z"EDMPipeline.get_data_and_condition)Nr   r   r   r   r   r   )rp   F)rp   NFrz   rt   )r   )__name__
__module____qualname____doc__r3   propertyr4   r@   dictstrr+   r	   inttuplerR   r^   rH   rd   r   ro   floatboolr   ry   r   r   r   rE   r   r   rD   r1   r1   r1   r2   r       s    +
C
	
$$"
4


* $r   )typingr   r   r   r   r   numpyr:   r+   torch.distributedeinopsr   megatron.corer   r	   ,nemo.collections.diffusion.sampler.batch_opsr
   3nemo.collections.diffusion.sampler.context_parallelr   *nemo.collections.diffusion.sampler.edm.edmr   r   r   r   r1   r1   r1   r2   <module>   s   