o
    ۷i                     @   s.  d dl mZ d dlmZ d dlmZ d dlZd dlZd dlmZ d dlm	Z	m
Z
 d dlmZmZ d dlmZ d d	lmZmZmZmZ d d
lmZ eeZeG dd dZdededejjde	fddZdejdedejfddZ dddddejde!dedB de	dB de"eeejge!f  ddfddZ#dS )    )Callable)	dataclass)AnyN)nn)
DeviceMeshinit_device_mesh)MixedPrecisionPolicyfully_shard)init_logger)get_fs_groupget_fully_shard_rankget_fully_shard_world_sizeget_world_group)current_omni_platformc                   @   st   e Zd ZU dZdZeed< dZeed< dZ	eed< e
jZe
jed< e
jZe
jed	< d
Ze
jd
B ed< dZeed< d
S )HSDPInferenceConfigzzConfiguration for HSDP inference.

    This is a runtime config created from DiffusionParallelConfig's HSDP settings.
    Fenabled   hsdp_replicate_sizehsdp_shard_sizeparam_dtypereduce_dtypeNoutput_dtypeTreshard_after_forward)__name__
__module____qualname____doc__r   bool__annotations__r   intr   torchbfloat16r   dtypefloat32r   r   r    r%   r%   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/distributed/hsdp.pyr      s   
 r   device_typereplicate_sizeshard_pgreturnc                 C   sP   t j|}|| }t |||}t| ||fdd}td|||  |S )av  Create a 2D DeviceMesh for HSDP using an existing ProcessGroup for the shard dimension.

    Args:
        device_type: The device type (e.g., "cuda", "npu")
        replicate_size: Number of replica groups
        shard_pg: The ProcessGroup for the shard dimension (from FS GroupCoordinator)

    Returns:
        A 2D DeviceMesh with dimensions ("replicate", "shard")
    )	replicateshard)
mesh_shapemesh_dim_namesz<Created HSDP mesh: replicate_size=%d, shard_size=%d, mesh=%s)	r!   distributedget_world_sizearangereshaper   loggerdebugtolist)r'   r(   r)   
shard_size
world_sizemesh_tensordevice_meshr%   r%   r&   _create_hsdp_mesh-   s   	r:   modelhsdp_configc              	   C   s  |j stdt }t }|j}|j}t }t }|j}|j	}	||	kr.td| d|	 dt
d||	|||| t|j|j|jdd}
tj}t|||jd}t| d	d
}|r]t|dkrhtdt| j dt| |j|
||d |  D ]}d|_qvt
dt| j | S )a  
    Apply HSDP sharding to a model that already has weights loaded.

    This function redistributes the model's parameters across GPUs using HSDP.
    The model should already have its weights loaded via the standard load_weights method.

    Args:
        model: Model instance with weights already loaded
        hsdp_config: HSDP configuration with HSDP mesh dimensions

    Returns:
        HSDP-wrapped model ready for inference
    zHSDP is not enabled in configzFS group world_size (z") does not match HSDP shard_size (zK). Ensure fully_shard_degree is set correctly in initialize_model_parallel.zfHSDP Inference: replicate_size=%d, shard_size=%d, world_size=%d, rank=%d, fs_world_size=%d, fs_rank=%dF)r   r   r   cast_forward_inputs)r'   r(   r)   _hsdp_shard_conditionsNr   zModel z& has no _hsdp_shard_conditions defined)r   	mp_policymeshhsdp_shard_conditionszHSDP applied to model: %s)r   
ValueErrorr   r   r7   rank_in_groupr   r   r   r   r3   infor   r   r   r   r   r'   r:   device_groupgetattrlentyper   shard_modelr   
parametersrequires_grad)r;   r<   world_groupfs_groupr7   rankfs_world_sizefs_rankr   r   r?   r'   r9   rA   paramr%   r%   r&   apply_hsdp_to_modelX   sf   
rR   T)r   r?   r@   r   r?   r@   rA   c                   s   |||d}d}t t|  D ]\ t fdd|D r,t fi | |d7 }q|dkr5tdt| fi | td| dS )	z?Apply HSDP sharding to model modules based on shard conditions.)r   r@   r?   r   c                 3   s    | ]}| V  qd S )Nr%   ).0condmodulenamer%   r&   	<genexpr>   s    zshard_model.<locals>.<genexpr>r   zNo modules were shardedzSharded %d modules + rootN)reversedlistnamed_modulesanyr	   rB   r3   rD   )r;   r   r?   r@   rA   hsdp_kwargsnum_shardedr%   rU   r&   rI      s   
rI   )$collections.abcr   dataclassesr   typingr   r!   torch.distributedr   r   r   torch.distributed.fsdpr   r	   vllm.loggerr
   .vllm_omni.diffusion.distributed.parallel_stater   r   r   r   vllm_omni.platformsr   r   r3   r   strr    r/   ProcessGroupr:   ModulerR   r   rZ   rI   r%   r%   r%   r&   <module>   s\   
+
\