o
    پid                     @   s,  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	Z
ddlZddlZddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddl m!Z! ddl"m#Z# ddl$m%Z% ddl&m'Z' ddl(m)Z) e)e*Z+G dd de!Z,G dd deZ-e-Z.dS )z
Diffusers backend pipeline wrapper.

This module provides a wrapper that allows running any diffusers-supported model
through sglang's infrastructure using vanilla diffusers pipelines.
    N)BytesIO)Any)DiffusionPipeline)Image)PipelineConfig)get_local_torch_device)ComposedPipelineBase)PipelineExecutor)SyncExecutor)Req)PipelineStage)AttentionBackendEnum)
ServerArgs)maybe_download_model)init_loggerc                
       s  e Zd ZdZdef fddZdededefdd	Zd
dde	de
dee	ee f fddZdedejdB fddZdedejdB fddZdedejdB fddZdejdejfddZdejdejfddZdedede	fddZdefdd ZdedejdB fd!d"Z  ZS )#DiffusersExecutionStagez7Pipeline stage that wraps diffusers pipeline execution.diffusers_pipec                    s   t    || _d S N)super__init__r   )selfr   	__class__ n/home/ubuntu/.local/lib/python3.10/site-packages/sglang/multimodal_gen/runtime/pipelines/diffusers_pipeline.pyr   /   s   

z DiffusersExecutionStage.__init__batchserver_argsreturnc                 C   s  |  ||}| |\}}d|vrd|d< t V tjdd@ td z
| jdi |}W n( tyX } zdt	|v rM|
dd | jdi |}n W Y d}~nd}~ww W d   n1 scw   Y  W d   n1 srw   Y  | ||_|jdur| |j|_|S )zExecute the diffusers pipeline.output_typeptT)recordalwaysNr   )_build_pipeline_kwargs_filter_pipeline_kwargstorchno_gradwarningscatch_warningssimplefilterr   	TypeErrorstrpop_extract_outputoutput_postprocess_output)r   r   r   kwargs_r-   er   r   r   forward3   s0   
 
zDiffusersExecutionStage.forwardF)strictr/   r3   c             	   C   s   z	t | jj}W n ttfy   |g f Y S w |j}tdd | D }|r-|g fS t	|
 dh }i }g }| D ]\}	}
|	|v rK|
||	< q>||	 q>|rrt| jj}d| ddt| d}|rmt|t| ||fS )a  Filter kwargs to those accepted by the pipeline's __call__.

        Args:
            kwargs: Arguments to filter
            strict: If True, raise ValueError on unsupported args; otherwise warn

        Returns:
            Tuple of (filtered_kwargs, ignored_keys)
        c                 s   s    | ]
}|j tjjkV  qd S r   )kindinspect	ParameterVAR_KEYWORD).0pr   r   r   	<genexpr>c   s    
zBDiffusersExecutionStage._filter_pipeline_kwargs.<locals>.<genexpr>r   z
Pipeline 'z' does not support: z, z". These arguments will be ignored.)r5   	signaturer   __call__
ValueErrorr)   
parametersanyvaluessetkeysitemsappendtype__name__joinsortedloggerwarning)r   r/   r3   sigparamsaccepts_var_kwargsvalidfilteredignoredkv	pipe_namemsgr   r   r   r#   Q   s4   

z/DiffusersExecutionStage._filter_pipeline_kwargsr-   Nc                 C   sf   dD ])}t ||s
qt||}|du rq| |}|dur+td||j|j |  S qtd dS )z+Extract tensor output from pipeline result.)imagesframesvideosamplepred_original_sampleNz.Extracted output from '%s': shape=%s, dtype=%sz-Could not extract output from pipeline result)hasattrgetattr_convert_to_tensorrI   debugshapedtyperJ   )r   r-   attrdataresultr   r   r   r,      s$   



	z'DiffusersExecutionStage._extract_outputra   c                 C   s   t |tjr|S t |tjr>t| }| dkr|d }|jdkr.|	dddd}|S |jdkr<|	ddddd}|S t
|d	rIt |S t |trYt|dkrY| |S d
S )z)Convert various data formats to a tensor.      ?     o@   r               modeN)
isinstancer$   Tensornpndarray
from_numpyfloatmaxndimpermuterZ   TToTensorlistlen_convert_list_to_tensor)r   ra   tensorr   r   r   r\      s"   



z*DiffusersExecutionStage._convert_to_tensorc                 C   s0  |d }t |trt|dkr|}|d }t|dr8dd |D }t|}t|dkr4|ddddS |d S t |tjrUt|}t|dkrQ|ddddS |d S t |tj	rdd |D }|d 
 d	krqd
d |D }|d jdkrdd |D }t|}t|dkr|ddddS |d S dS )z$Convert a list of items to a tensor.r   rj   c                 S   s   g | ]}t  |qS r   )rt   ru   )r8   imgr   r   r   
<listcomp>   s    zCDiffusersExecutionStage._convert_list_to_tensor.<locals>.<listcomp>rg   rh   rf   c                 S   s   g | ]	}t | qS r   )r$   ro   rp   )r8   arrr   r   r   r{          rc   c                 S   s   g | ]}|d  qS )rd   r   r8   tr   r   r   r{      s    c                 S   s   g | ]	}| d ddqS )rh   r   rg   )rs   r~   r   r   r   r{      r}   N)rk   rv   rw   rZ   r$   stackrs   rl   rm   rn   rq   rr   )r   ra   firsttensorsstackedr   r   r   rx      s4   



z/DiffusersExecutionStage._convert_list_to_tensorc                 C   s   |   }t| st| r"td tj|dddd}|	 
 | 
 }}|dk s7|dkr=|d d	 }|d
d}| |}td|j |S )zDPost-process output tensor to ensure valid values and correct shape.z)Output contains invalid values, fixing...g      ?rc   g        )nanposinfneginfg      g      ?rg   rh   r   zFinal output tensor shape: %s)cpurp   r$   isnanr?   isinfrI   rJ   
nan_to_numminitemrq   clamp_fix_output_shaper]   r^   )r   r-   min_valmax_valr   r   r   r.      s   

z+DiffusersExecutionStage._postprocess_outputc                 C   s   |  dkr|dddddS |  dkr1|jd dks#|jd dv r%|S |ddddddS |  dkr_|j\}}}|dkrL|dkrL|ddd}|jd dkrZ|ddd}|dS |  dkrr|dddddS |S )z~Fix tensor shape for downstream processing.

        Expected: (B, C, H, W) for images or (B, C, T, H, W) for videos.
        ri   r   rh   rg   rf   re   )rg   rf   re   )dimrs   r^   	unsqueezerepeat)r   r-   chwr   r   r   r      s    
z)DiffusersExecutionStage._fix_output_shapec                 C   s>  i }|j dur|j |d< |jr|j|d< |jdur|j|d< |jdur(|j|d< |jdur2|j|d< |jdur<|j|d< |jdurF|j|d< |jdurU|jd	krU|j|d
< |jdur`|j|d< n|j	duru| 
 }tj|d|j	|d< | |}|dur||d< |jd	kr|j|d< |jr|jdi }|r|| |S )z.Build kwargs dict for diffusers pipeline call.Npromptnegative_promptnum_inference_stepsguidance_scaletrue_cfg_scaleheightwidthrg   
num_frames	generator)deviceimagenum_images_per_promptdiffusers_kwargs)r   r   r   r   r   r   r   r   r   seed_get_pipeline_devicer$   	Generatormanual_seed_load_input_imagenum_outputs_per_promptextragetupdate)r   r   r   r/   r   r   r   r   r   r   r"     s@   



















z.DiffusersExecutionStage._build_pipeline_kwargsc              	   C   sZ   dD ]!}t | j|d}|dur#z
t| jW   S  ty"   Y qw qtj r+dS dS )z*Get the device the pipeline is running on.)unettransformervaeNcudar   )	r[   r   nextr>   r   StopIterationr$   r   is_available)r   r`   	componentr   r   r   r   6  s   z,DiffusersExecutionStage._get_pipeline_devicec              
   C   s   |j durt|j tjr|j S |jdurt|jtjr|jS |js#dS t|jtr/|jd |_z(|jdrNtj|jdd}|	  t
t|jdW S t
|jdW S  tyr } ztd|j| W Y d}~dS d}~ww )zLoad input image from batch.Nr   )zhttp://zhttps://   )timeoutRGBz Failed to load image from %s: %s)condition_imagerk   r   pixel_values
image_pathrv   
startswithrequestsr   raise_for_statusopenr   contentconvert	ExceptionrI   error)r   r   responser1   r   r   r   r   A  s.   z)DiffusersExecutionStage._load_input_image)rF   
__module____qualname____doc__r   r   r   r   r2   dictbooltuplerv   r*   r#   r   r$   rl   r,   r\   rx   r.   r   r"   r   r   r   __classcell__r   r   r   r   r   ,   s(    
.#3 r   c                   @   s  e Zd ZU dZd ZdZg Zee e	d< 			d5dede
dee dB deeejjf dB d	edB f
d
dZdede
defddZdede
ddfddZdede
ddfddZdede
defddZde
dejfddZdd Z	d6de
deeejjf dB deeef fddZde
fddZde
fddZd7d d!Z	d6d"ed#edB dd fd$d%Zedee fd&d'Z e! d(e"de
de"fd)d*Z#e$						d8ded+edB d,ejdB d-ee%B dB d.e&j'dB dee dB deeejjf dB dd fd/d0Z(d6d1ed2edefd3d4Z)dS )9DiffusersPipelinez
    Pipeline wrapper that uses vanilla diffusers pipelines.

    This allows running any diffusers-supported model through sglang's infrastructure
    without requiring native sglang implementation.
    F_required_config_modulesN
model_pathr   required_config_modulesloaded_modulesexecutorc                 C   s`   || _ || _g | _i | _i | _i | _d| _|pt|d| _t	
d| | ||| _|   d S )NF)r   z"Loading diffusers pipeline from %s)r   r   _stages_stage_name_mappingmodulesmemory_usagespost_init_calledr
   r   rI   info_load_diffusers_pipeliner   _detect_pipeline_type)r   r   r   r   r   r   r   r   r   r   j  s   zDiffusersPipeline.__init__r   c                 C   s  |}t |}|| _| |}td| ||j|jd}|j}|dur:t|dd}|dur:||d< tdt	|j
 ztj|fi |}W n ty }	 zRdt|	v rtd zi |d|i}
d	|
d
< tj|fi |
}W n) ty } ztdt|	}|r|dnd}td| d|	 |d}~ww  W Y d}	~	n=d}	~	w ty }	 z-dt|	 v sdt|	 v rtd||	 tj|d< tj|fi |}n W Y d}	~	nd}	~	ww |t }| || | || | ||}td|jj
 |S )ak  Load the diffusers pipeline.

        Optimizations applied:
        - device_map: Loads models directly to GPU, warming up CUDA caching allocator
          to avoid small tensor allocations during inference.
        - Parallel shard loading: When using device_map with accelerate, model shards
          are loaded in parallel for faster initialization.
        z(Loading diffusers pipeline with dtype=%s)torch_dtypetrust_remote_coderevisionNquantization_configzUsing quantization config: %szhas no attributezJPipeline class not found in diffusers, trying custom_pipeline from repo...custom_pipelineTr   zhas no attribute (\w+)rg   unknownzPipeline class 'z' not found in diffusers and no custom pipeline.py in repo. Try: pip install --upgrade diffusers (some pipelines require latest version). Original error: r_   rp   z1Failed with dtype=%s, falling back to float32: %sr   zLoaded diffusers pipeline: %s)r   r   
_get_dtyperI   r   r   r   pipeline_configr[   rE   rF   r   from_pretrainedAttributeErrorr*   r   researchgroupRuntimeErrorlowerrJ   r$   float32tor   _apply_vae_optimizations_apply_attention_backend_apply_cache_ditr   )r   r   r   original_model_pathr_   load_kwargsconfigquant_configpiper1   custom_kwargse2match
class_namer   r   r   r     s   



	 
z*DiffusersPipeline._load_diffusers_pipeliner   c                 C   sn   |j }|du r	dS t|ddrt|dr|  td t|ddr3t|dr5|  td dS dS dS )	zFApply VAE memory optimizations (tiling, slicing) from pipeline config.Nvae_slicingFenable_vae_slicingz*Enabled VAE slicing for lower memory usage
vae_tilingenable_vae_tilingz*Enabled VAE tiling for large image support)r   r[   rZ   r   rI   r   r   )r   r   r   r   r   r   r   r     s   


z*DiffusersPipeline._apply_vae_optimizationsc           	      C   s   |j }|du r|j}|durt|dd}|du rdS | }dd tD ddhB }||v r5td| dS dD ]:}t||d}|durqt|d	rqz|| t	d
|| W q7 t
yp } ztd||| W Y d}~q7d}~ww q7dS )zApply attention backend setting from pipeline config or server_args.

        See: https://huggingface.co/docs/diffusers/main/en/optimization/attention_backends
        Available backends: flash, _flash_3_hub, sage, xformers, native, etc.
        Ndiffusers_attention_backendc                 S   s   h | ]}|j  qS r   )namer   )r8   r1   r   r   r   	<setcomp>  s    z=DiffusersPipeline._apply_attention_backend.<locals>.<setcomp>fa3fa4zSkipping diffusers attention backend '%s' because it matches a SGLang backend name. Use diffusers backend names when running the diffusers backend.)r   r   set_attention_backendz Set attention backend '%s' on %sz.Failed to set attention backend '%s' on %s: %s)attention_backendr   r[   r   r   rI   r]   rZ   r   r   r   rJ   )	r   r   r   backendr   sglang_backendscomponent_namer   r1   r   r   r   r     sJ   
z*DiffusersPipeline._apply_attention_backendc              
   C   s   |j }|s|S zddl}W n ty } ztd|d}~ww t|ds(tdz||}W n ty@ } ztd|d}~ww z|j|fi |}W n tyY   t	
d  w t	d |S )	z6Enable cache-dit for diffusers pipeline if configured.r   NzVcache-dit is required for --cache-dit-config. Install it with `pip install cache-dit`.load_configszNcache-dit>=1.2.0 is required for --cache-dit-config. Please upgrade cache-dit.zdFailed to load cache-dit config. Provide a YAML/JSON path (or a dict supported by cache-dit>=1.2.0).z1Failed to enable cache-dit for diffusers pipelinez(Enabled cache-dit for diffusers pipeline)cache_dit_config	cache_ditImportErrorr   rZ   r  r   r=   enable_cacherI   	exceptionr   )r   r   r   r  r  r1   cache_optionsr   r   r   r     sF   


z"DiffusersPipeline._apply_cache_ditc                 C   sd   t j rt jnt j}t|dr0|jr0|jj}|dkr t j}|S |dkr)t j}|S |dkr0t j}|S )z?
        Determine the dtype to use for model loading.
        r   fp16bf16fp32)	r$   r   is_bf16_supportedbfloat16float16rZ   r   dit_precisionr   )r   r   r_   r  r   r   r   r   9  s   zDiffusersPipeline._get_dtypec                    sN   | j jj  g d}t fdd|D | _td| jr"d dS d dS )z-Detect if this is an image or video pipeline.)rW   animatcogvideowanhunyuanc                 3   s    | ]}| v V  qd S r   r   )r8   indpipe_class_namer   r   r:   N  s    z:DiffusersPipeline._detect_pipeline_type.<locals>.<genexpr>zDetected pipeline type: %srW   r   N)r   r   rF   r   r?   is_video_pipelinerI   r]   )r   video_indicatorsr   r  r   r   J  s   z'DiffusersPipeline._detect_pipeline_typec                 C   s
   d| j iS )z4Skip sglang's module loading - diffusers handles it.diffusers_pipeline)r   )r   r   r   r   r   r   load_modulesT  s   
zDiffusersPipeline.load_modulesc                 C   s   |  t| jd dS )z;Create the execution stage wrapping the diffusers pipeline.diffusers_executionN)	add_stager   r   r   r   r   r   r   create_pipeline_stages\  s   
z(DiffusersPipeline.create_pipeline_stagesc                 C   s   dS )zInitialize the pipeline.Nr   r  r   r   r   initialize_pipelineb  s   z%DiffusersPipeline.initialize_pipelinec                 C   s,   | j rdS d| _ | | j | | j dS )zPost initialization hook.NT)r   r!  r   r   r   r   r   r   	post_initf  s
   zDiffusersPipeline.post_initstage
stage_namec                 C   sD   |du r	|  |}|| jv rtd| | j| || j|< | S )zAdd a stage to the pipeline.NzDuplicate stage name detected: )_infer_stage_namer   r=   r   rD   )r   r$  r%  r   r   r   r  n  s   


zDiffusersPipeline.add_stagec                 C   s   | j S )zList of stages in the pipeline.)r   r"  r   r   r   stages{  s   zDiffusersPipeline.stagesr   c                 C   s    | j s|   | j| j||S )z(Execute the pipeline on the given batch.)r   r#  r   executer'  )r   r   r   r   r   r   r2     s   zDiffusersPipeline.forwardr   r   r   argsc                 K   s4   ||d< t jdi |}	| ||	||d}
|
  |
S )z@Load a pipeline from a pretrained model using diffusers backend.r   )r   r   Nr   )r   from_kwargsr#  )clsr   r   r   r   r)  r   r   r/   r   r   r   r   r   r     s   z!DiffusersPipeline.from_pretrainedmodule_namedefault_valuec                 C   s   |dkr| j S | j||S )zGet a module by name.r  )r   r   r   )r   r,  r-  r   r   r   
get_module  s   zDiffusersPipeline.get_module)NNNr   )r   N)NNNNNN)*rF   r   r   r   pipeline_namer  r   rv   r*   __annotations__r   r   r$   nnModuler	   r   r   r   r   r   r   r_   r   r   r  r   r!  r#  r   r  propertyr'  r%   r   r2   classmethodr   argparse	Namespacer   r.  r   r   r   r   r   ^  s   
 

R.&


	



r   )/r   r5  r5   r   r&   ior   typingr   numpyrm   r   r$   torchvision.transforms
transformsrt   	diffusersr   PILr   3sglang.multimodal_gen.configs.pipeline_configs.baser   )sglang.multimodal_gen.runtime.distributedr   Csglang.multimodal_gen.runtime.pipelines_core.composed_pipeline_baser   Hsglang.multimodal_gen.runtime.pipelines_core.executors.pipeline_executorr	   Dsglang.multimodal_gen.runtime.pipelines_core.executors.sync_executorr
   ;sglang.multimodal_gen.runtime.pipelines_core.schedule_batchr   3sglang.multimodal_gen.runtime.pipelines_core.stagesr   'sglang.multimodal_gen.runtime.platformsr   )sglang.multimodal_gen.runtime.server_argsr   6sglang.multimodal_gen.runtime.utils.hf_diffusers_utilsr   1sglang.multimodal_gen.runtime.utils.logging_utilsr   rF   rI   r   r   
EntryClassr   r   r   r   <module>   s@     4  K