o
    ٷiRe                     @   s4  d dl Z d dlZd dlZd dlmZmZ d dlmZmZm	Z	 d dl
mZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ erXd dlmZ eeZeeG dd dZeG dd dZeG dd dZeG dd dZ eG dd dZ!G dd de j"Z#ddiZ$dS )    N)CallableMapping)	dataclassfieldfields)TYPE_CHECKINGAny)model_validator)Self)config)init_logger)DiffusionQuantizationConfigget_diffusion_quant_config)is_port_availabler   c                   @   s   e Zd ZU dZdZeed< 	 dZeed< 	 dZeed< 	 dZ	edB ed< 	 dZ
eed< 	 dZeed	< 	 dZeed
< 	 dZeed< 	 dZeed< 	 dZeed< 	 dZeed< 	 edddefddZdddZedeeef dd fddZdS )DiffusionParallelConfigz8Configuration for diffusion model distributed execution.   pipeline_parallel_sizedata_parallel_sizetensor_parallel_sizeNsequence_parallel_sizeulysses_degreering_degreecfg_parallel_sizevae_patch_parallel_sizeFuse_hsdphsdp_shard_sizehsdp_replicate_sizeafter)modereturnc                 C   s
  | j dks	J d| jdksJ d| jdksJ d| jdks$J d| jdks-J d| jdks6J d| jdks?J d| jd	v sLJ d
| j | jdksUJ d| j| j| j ksnJ d| j d| j d| j | jr| j	dkszJ d| j
dksJ d| S )zAValidates the config relationships among the parallel strategies.r   z"Pipeline parallel size must be > 0zData parallel size must be > 0z Tensor parallel size must be > 0z"Sequence parallel size must be > 0zUlysses degree must be > 0zRing degree must be > 0zCFG parallel size must be > 0)r      z*CFG parallel size must be 1 or 2, but got z#VAE patch parallel size must be > 0z_Sequence parallel size must be equal to the product of ulysses degree and ring degree, but got z != z * zHSDP replicate size must be > 0z<HSDP shard size must be > 0 (should be set in __post_init__))r   r   r   r   r   r   r   r   r   r   r   self r%   L/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/diffusion/data.py_validate_parallel_configF   s,   z1DiffusionParallelConfig._validate_parallel_configc              
   C   s&  | j d u r| j| j | _ | j| j | j | j | j | j }| jr| jdkr/td| j d| j	dkrc| j
dkr=td|dkrEtd|| j
 dkrXtd| j
 d	| d
|| j
 | _	|| _d S | j
| j	 }|dkrr|| _d S ||krtd| j
 d| j	 d| d| d	|| _d S || _d S )Nr   zRHSDP (use_hsdp=True) cannot be used with Tensor Parallelism (tensor_parallel_size=z.). Set tensor_parallel_size=1 when using HSDP.r   r   zhsdp_replicate_size must be > 0zCannot auto-calculate hsdp_shard_size when other parallelism is all 1. Please specify hsdp_shard_size explicitly for standalone HSDP.z,Invalid HSDP configuration: replicate_size (z!) must evenly divide world_size (z) when shard_size is -1.zHSDP dimensions (u    × z = z0) must equal world_size from other parallelism ())r   r   r   r   r   r   r   r   
ValueErrorr   r   
world_size)r$   other_parallel_world_sizehsdp_world_sizer%   r%   r&   __post_init__]   sj   








z%DiffusionParallelConfig.__post_init__datac                 C   s*   t |tstdt|| di |S )z
        Create DiffusionParallelConfig from a dictionary.

        Args:
            data: Dictionary containing parallel configuration parameters

        Returns:
            DiffusionParallelConfig instance with parameters set from dict
        z#Expected parallel config dict, got Nr%   
isinstancedict	TypeErrortypeclsr.   r%   r%   r&   	from_dict   s   
z!DiffusionParallelConfig.from_dictr!   N)__name__
__module____qualname____doc__r   int__annotations__r   r   r   r   r   r   r   r   boolr   r   r	   r
   r'   r-   classmethodr1   strr   r6   r%   r%   r%   r&   r       s:   
 
; r   c                   @   s   e Zd ZU dZeedZeeef e	d< e
deeef dd fddZdeeef fdd	Zddeded
B defddZdedefddZd
S )TransformerConfigz9Container for raw transformer configuration dictionaries.default_factoryparamsr.   r!   c                 C   s*   t |tstdt|| t|dS )Nz&Expected transformer config dict, got )rD   r/   r4   r%   r%   r&   r6      s   
zTransformerConfig.from_dictc                 C   s
   t | jS N)r1   rD   r#   r%   r%   r&   to_dict      
zTransformerConfig.to_dictNkeydefaultc                 C   s   | j ||S rE   )rD   get)r$   rH   rI   r%   r%   r&   rJ      s   zTransformerConfig.getitemc              
   C   s:   t | d}z|| W S  ty } zt||d }~ww )NrD   )object__getattribute__KeyErrorAttributeError)r$   rK   rD   excr%   r%   r&   __getattr__   s   

zTransformerConfig.__getattr__rE   )r8   r9   r:   r;   r   r1   rD   r@   r   r=   r?   r6   rF   rJ   rQ   r%   r%   r%   r&   rA      s   
 rA   c                   @   s
  e Zd ZU dZdZeed< dZee dB ed< dZ	e
ed< dZe
ed	< d
Ze
ed< dZe
ed< dZeed< dZe
ed< dZeed< dZe
ed< dZedB ed< dZeed< dZe
dB ed< eeddZeeef ed< edeeef dd fddZdedefd d!ZdS )"DiffusionCacheConfiga  
    Configuration for cache adapters (TeaCache, cache-dit, etc.).

    This dataclass provides a unified interface for cache configuration parameters.
    It can be initialized from a dictionary and accessed via attributes.

    Common parameters:
        - TeaCache: rel_l1_thresh, coefficients (optional)
        - cache-dit: Fn_compute_blocks, Bn_compute_blocks, max_warmup_steps,
                    residual_diff_threshold, enable_taylorseer, taylorseer_order,
                    scm_steps_mask_policy, scm_steps_policy

    Example:
        >>> # From dict (user-facing API) - partial config uses defaults for missing keys
        >>> config = DiffusionCacheConfig.from_dict({"rel_l1_thresh": 0.3})
        >>> # Access via attribute
        >>> print(config.rel_l1_thresh)  # 0.3 (from dict)
        >>> print(config.Fn_compute_blocks)  # 8 (default)
        >>> # Empty dict uses all defaults
        >>> default_config = DiffusionCacheConfig.from_dict({})
        >>> print(default_config.rel_l1_thresh)  # 0.2 (default)
    g?rel_l1_threshNcoefficientsr   Fn_compute_blocksr   Bn_compute_blocks   max_warmup_stepsr   max_cached_stepsgQ?residual_diff_threshold   max_continuous_cached_stepsFenable_taylorseertaylorseer_orderscm_steps_mask_policydynamicscm_steps_policynum_inference_steps)rC   repr_extra_paramsr.   r!   c                    st   t |tstdt|dd t| D   fdd| D } fdd| D }| d	i |d|i}|S )
z
        Create DiffusionCacheConfig from a dictionary.

        Args:
            data: Dictionary containing cache configuration parameters

        Returns:
            DiffusionCacheConfig instance with parameters set from dict
        z Expected cache config dict, got c                 S      h | ]}|j qS r%   name.0fr%   r%   r&   	<setcomp>      z1DiffusionCacheConfig.from_dict.<locals>.<setcomp>c                    s(   i | ]\}}| v r| d s||qS )_)
startswithri   kvfield_namesr%   r&   
<dictcomp>  s   ( z2DiffusionCacheConfig.from_dict.<locals>.<dictcomp>c                    s   i | ]\}}| vr||qS r%   r%   ro   rr   r%   r&   rt         rd   Nr%   )r0   r1   r2   r3   r   items)r5   r.   known_paramsextra_paramsinstancer%   rr   r&   r6   	  s   
zDiffusionCacheConfig.from_dictrK   c                 C   sT   |dks	| drt| |S t| d}||v r|| S td| jj d| d)z
        Allow access to extra parameters via attribute access.

        This enables accessing parameters that weren't explicitly defined
        in the dataclass fields but were passed in the dict.
        rd   rm   'z' object has no attribute ')rn   rL   rM   rO   	__class__r8   )r$   rK   extrar%   r%   r&   rQ   %  s   z DiffusionCacheConfig.__getattr__)r8   r9   r:   r;   rS   floatr=   rT   listrU   r<   rV   rX   rY   rZ   r\   r]   r>   r^   r_   r@   ra   rb   r   r1   rd   r   r?   r6   rQ   r%   r%   r%   r&   rR      s&   
 rR   c                	   @   sb  e Zd ZU dZedB ed< dZedB ed< ejZ	ej	ed< e
edZeed< dZedB ed< dZeed	< e
edZeed
< dZeed< e
edZeeeef B ed< dZeed< dZeed< dZedB ed< dZeed< dZedB ed< dZedB ed< dZedB ed< dZedB ed< dZ e!ed< dZ"edB ed< dZ#eed< dZ$eed< dZ%eed< dZ&eed< dZ'eed < dZ(eed!< dZ)edB ed"< d#Z*eed$< dZ+eed%< dZ,eed&< dZ-eed'< d(Z.e!ed)< dZ/edB ed*< dZ0edB ed+< dZ1edB ed,< dZ2eeef dB ed-< d.Z3eed/< dZ4edB ed0< dZ5edB ed1< d2Z6eed3< dZ7eed4< dZ8edB ed5< e
edZ9eeef ed6< e
d7d8 dZ:eeef ed9< dZ;edB ed:< dZ<e!dB ed;< dZ=e!dB ed<< dZ>eed=< d>Z?eed?< e
edZ@eeef ed@< dZAedB edA< dZBdBedC< dRd1edFedGedHefdIdJZCdKdL ZDdSdMdNZEeFdOedHd fdPdQZGdS )TOmniDiffusionConfigNmodelmodel_class_namedtyperB   tf_model_configattention_backendnonecache_strategyparallel_configcache_backendcache_configFenable_cache_dit_summarympdistributed_executor_backend	nccl_porttrust_remote_coderevisionnum_gpusdist_timeout	lora_pathg      ?
lora_scalemax_cpu_loraspiloutput_typeenable_cpu_offloadenable_layerwise_offloadTpin_cpu_memoryvae_use_slicingvae_use_tilingmask_strategy_file_path   skip_time_stepsenforce_eagerenable_sleep_modedisable_autocastg        VSA_sparsitymoba_config_pathmaster_portworker_extension_clscustom_pipeline_argsrI   diffusion_load_formathostporti  scheduler_portenable_stage_verificationprompt_file_pathmodel_pathsc                   C   s
   dddS )NT)transformervaer%   r%   r%   r%   r&   <lambda>  s   zOmniDiffusionConfig.<lambda>model_loadedoverride_transformer_cls_nameboundary_ratio
flow_shiftsupports_multimodal_inputsinfo	log_levelomni_kv_configquantizationz3DiffusionQuantizationConfig | dict[str, Any] | Nonequantization_config*   d   port_incmax_attemptsr!   c                 C   s   d}|}||k r7t |r|dkrtd| d| d |S |d7 }|dk r+||7 }ndtdd }||k std	| d
| d)a  
        Find an available port with retry logic.

        Args:
            port: Initial port to check
            port_inc: Port increment for each attempt
            max_attempts: Maximum number of attempts to find an available port

        Returns:
            An available port number

        Raises:
            RuntimeError: If no available port is found after max_attempts
        r   zPort z was unavailable, using port z insteadr   i`  i  i  z$Failed to find available port after z attempts (started from port r(   )r   loggerr   randomrandintRuntimeError)r$   r   r   r   attemptsoriginal_portr%   r%   r&   settle_port  s   
zOmniDiffusionConfig.settle_portc              
   C   s\  | j pdtdd }| |d| _ t| jtr t| j| _n
t| jts*t | _| j	d u r=| jd ur:| jj
| _	nd| _	| j	| jj
k rRtd| j	 d| jj
 dt| jtrtjtjtjtjtjtjtjtjtjd		}| j }||v r||| | _ntd
| j d tj| _t| jtrt| j| _n
t| jtst | _| jd us| jd urddlm} t| jtrt| j}|d| j}dd | D }| jd ur|d ur|| jkrtd| jd|d t|fi || _n$| jd u r| jd urt| j| _nt| j|stdt | j| j!d u r"d| _!d S | j!dk r,tdd S )Ni5u  r   r   %   r   z
num_gpus (z ) < parallel_config.world_size (r(   )	autobfloat16bf16float16fp16halffloat32fp32r}   zUnknown dtype string 'z', defaulting to bfloat16r   methodc                 S   s   i | ]\}}|d kr||qS )r   r%   ro   r%   r%   r&   rt   %  ru   z5OmniDiffusionConfig.__post_init__.<locals>.<dictcomp>z/Conflicting quantization methods: quantization=z , quantization_config['method']=z&. Using quantization_config['method'].zNquantization_config must be a DiffusionQuantizationConfig, dict, or None, got z-max_cpu_loras must be >= 1 for diffusion LoRA)"r   r   r   r   r0   r   r1   r   r6   r   r*   r)   r   r@   torchr   r   r   lowerr   warningr   rR   r   r    vllm_omni.diffusion.quantizationr   r   rJ   rv   r   r2   r3   r   )r$   initial_master_port	dtype_mapdtype_lowerr   config_dictquant_methodquant_kwargsr%   r%   r&   r-     sx   





z!OmniDiffusionConfig.__post_init__c                 C   s   | j dv | _d S )N>   QwenImageEditPlusPipeline)r   r   r#   r%   r%   r&   update_multimodal_support<  s   z-OmniDiffusionConfig.update_multimodal_supportkwargsc                    s   d|v rd|vr|d |d< | dd  d|vr.tjdp#tjd}|r*| nd|d< dd t| D   fd	d
| D }| di |S )Nstatic_lora_scaler   r   DIFFUSION_CACHE_BACKENDDIFFUSION_CACHE_ADAPTERr   c                 S   re   r%   rf   rh   r%   r%   r&   rk   P  rl   z2OmniDiffusionConfig.from_kwargs.<locals>.<setcomp>c                    s   i | ]\}}| v r||qS r%   r%   ro   valid_fieldsr%   r&   rt   Q  ru   z3OmniDiffusionConfig.from_kwargs.<locals>.<dictcomp>r%   )poposenvironrJ   r   r   rv   )r5   r   r   filtered_kwargsr%   r   r&   from_kwargs?  s   zOmniDiffusionConfig.from_kwargs)r   r   r7   )Hr8   r9   r:   r   r@   r=   r   r   r   r   r   rA   r   r   r   r   r   r   r1   r   rR   r   r   r>   r   r   r<   r   r   r   r   r   r   r}   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r-   r   r?   r   r%   r%   r%   r&   r   6  sv   
 	#
Wr   c                   @   s   e Zd ZU dZdZejdB ed< dZe	ej dB ed< dZ
ejdB ed< dZe	ej dB ed< dZedB ed< dZedef dB ed	< dS )
DiffusionOutputz2
    Final output (after pipeline completion)
    Noutputtrajectory_timestepstrajectory_latentstrajectory_decodederror.post_process_func)r8   r9   r:   r;   r   r   Tensorr=   r   r~   r   r   r   r@   r   r   r   r%   r%   r%   r&   r   V  s   
 r   c                   @   s\   e Zd Ze Ze Ze Ze Ze Z	e Z
e Ze Ze Zdd ZdS )AttentionBackendEnumc                 C   s
   | j  S rE   )rg   r   r#   r%   r%   r&   __str__s  rG   zAttentionBackendEnum.__str__N)r8   r9   r:   enumr   FASLIDING_TILE_ATTN
TORCH_SDPA	SAGE_ATTNSAGE_ATTN_THREEVIDEO_SPARSE_ATTN
VMOBA_ATTNAITERNO_ATTENTIONr   r%   r%   r%   r&   r   h  s    r   r3   shutdown)%r   r   r   collections.abcr   r   dataclassesr   r   r   typingr   r   r   pydanticr	   typing_extensionsr
   vllm.config.utilsr   vllm.loggerr   r   r   r   'vllm_omni.diffusion.utils.network_utilsr   r8   r   r   rA   rR   r   r   Enumr   SHUTDOWN_MESSAGEr%   r%   r%   r&   <module>   s@    s  !