o
    ۷i!                     @   sN   d Z ddlmZ ddlmZ ddlZddlmZmZm	Z	 G dd dedZ
dS )	zI
Base pipeline class for Diffusion models with shared CFG functionality.
    )ABCMeta)AnyN)get_cfg_group!get_classifier_free_guidance_rank'get_classifier_free_guidance_world_sizec                   @   s  e Zd ZdZ		d!dededeeef deeef dB ded	e	dB d
e
jdB fddZde
jde
jd
e
jfddZ	d"de
jde
jdeded
e
jf
ddZdeded
e
jfddZdeded
efddZde
jde
jde
jd
e
jfddZde
jde
jde
jded
e
jf
dd ZdS )#CFGParallelMixinz
    Base Mixin class for Diffusion pipelines providing shared CFG methods.

    All pipelines should inherit from this class to reuse
    classifier-free guidance logic.
    TNdo_true_cfgtrue_cfg_scalepositive_kwargsnegative_kwargscfg_normalizeoutput_slicereturnc                 C   s6  |rt  dk}|rQt }t }	|	dkr| jdi |}
n| jdi |}
|dur2|
ddd|f }
|j|
dd}|	dkrO|d }|d }| ||||}|S dS | jdi |}| jdi |}|dury|ddd|f }|ddd|f }| ||||}|S | jdi |}|dur|ddd|f }|S )aF  
        Predict noise with optional classifier-free guidance.

        Args:
            do_true_cfg: Whether to apply CFG
            true_cfg_scale: CFG scale factor
            positive_kwargs: Kwargs for positive/conditional prediction
            negative_kwargs: Kwargs for negative/unconditional prediction
            cfg_normalize: Whether to normalize CFG output (default: True)
            output_slice: If set, slice output to [:, :output_slice] for image editing

        Returns:
            Predicted noise tensor (only valid on rank 0 in CFG parallel mode)
           r   NT)separate_tensors )r   r   r   predict_noise
all_gathercombine_cfg_noise)selfr   r	   r
   r   r   r   cfg_parallel_ready	cfg_groupcfg_rank
local_predgathered
noise_predneg_noise_predpositive_noise_prednegative_noise_predpredr   r   b/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/distributed/cfg_parallel.pypredict_noise_maybe_with_cfg   s<   
z-CFGParallelMixin.predict_noise_maybe_with_cfgr   	comb_predc                 C   s0   t j|ddd}t j|ddd}|||  }|S )z
        Normalize the combined noise prediction.

        Args:
            noise_pred: positive noise prediction
            comb_pred: combined noise prediction after CFG

        Returns:
            Normalized noise prediction tensor
        T)dimkeepdim)torchnorm)r   r   r"   	cond_norm
noise_normr   r   r    cfg_normalize_functionc   s   z'CFGParallelMixin.cfg_normalize_functionFr   c                 C   s,   ||||   }|r|  ||}|S |}|S )a  
        Combine conditional and unconditional noise predictions with CFG.

        Args:
            noise_pred: Conditional noise prediction
            neg_noise_pred: Unconditional noise prediction
            true_cfg_scale: CFG scale factor
            cfg_normalize: Whether to normalize the combined prediction (default: False)

        Returns:
            Combined noise prediction tensor
        )r*   )r   r   r   r	   r   r"   r   r   r    r   s   s   z"CFGParallelMixin.combine_cfg_noiseargskwargsc                 O   s   | j |i |d S )z
        Forward pass through transformer to predict noise.

        Subclasses should override this if they need custom behavior,
        but the default implementation calls self.transformer.
        r   )transformerr   r+   r,   r   r   r    r      s   zCFGParallelMixin.predict_noisec                 O   s   t d)aU  
        Diffusion loop with optional classifier-free guidance.

        Subclasses MUST implement this method to define the complete
        diffusion/denoising loop for their specific model.

        Typical implementation pattern:
        ```python
        def diffuse(self, latents, timesteps, prompt_embeds, negative_embeds, ...):
            for t in timesteps:
                # Prepare kwargs for positive and negative predictions
                positive_kwargs = {...}
                negative_kwargs = {...}

                # Predict noise with automatic CFG handling
                noise_pred = self.predict_noise_maybe_with_cfg(
                    do_true_cfg=True,
                    true_cfg_scale=self.guidance_scale,
                    positive_kwargs=positive_kwargs,
                    negative_kwargs=negative_kwargs,
                )

                # Step scheduler with automatic CFG sync
                latents = self.scheduler_step_maybe_with_cfg(
                    noise_pred, t, latents, do_true_cfg=True
                )

            return latents
        ```
        z!Subclasses must implement diffuse)NotImplementedErrorr.   r   r   r    diffuse   s   #zCFGParallelMixin.diffusetlatentsc                 C   s   | j j|||ddd S )z
        Step the scheduler.

        Args:
            noise_pred: Predicted noise
            t: Current timestep
            latents: Current latents

        Returns:
            Updated latents after scheduler step
        F)return_dictr   )	schedulerstep)r   r   r1   r2   r   r   r    scheduler_step   s   zCFGParallelMixin.scheduler_stepc                 C   s`   |ot  dk}|r't }t }|dkr| |||}| }|j|dd |S | |||}|S )a  
        Step the scheduler with (maybe) automatic CFG parallel synchronization.

        In CFG parallel mode, only rank 0 computes the scheduler step,
        then broadcasts the result to other ranks.

        Args:
            noise_pred: Predicted noise (only valid on rank 0 in CFG parallel)
            t: Current timestep
            latents: Current latents
            do_true_cfg: Whether CFG is enabled

        Returns:
            Updated latents (synchronized across all CFG ranks)
        r   r   )src)r   r   r   r6   
contiguous	broadcast)r   r   r1   r2   r   r   r   r   r   r   r    scheduler_step_maybe_with_cfg   s   z.CFGParallelMixin.scheduler_step_maybe_with_cfg)TN)F)__name__
__module____qualname____doc__boolfloatdictstrr   intr&   Tensorr!   r*   r   r   r0   r6   r:   r   r   r   r    r      sh    

G
	
"%r   )	metaclass)r>   abcr   typingr   r&   .vllm_omni.diffusion.distributed.parallel_stater   r   r   r   r   r   r   r    <module>   s   