o
    ٷi&o                     @  s   d Z ddlmZ ddlZddlmZ ddlmZ ddlZddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZ ddlmZmZ dd	lmZmZ eeZd
ZdZeG dd dZd-ddZG dd deZ G dd deZ!d.ddZ"d.ddZ#d/d$d%Z$d0d&d'Z%	d1d2d)d*Z&d3d+d,Z'dS )4a  Sequence Parallelism hooks for non-intrusive SP support.

This module implements the hook-based mechanism for applying sequence parallelism
to models without modifying their forward() methods.

Usage:
    1. Define _sp_plan on your model class (corresponds to diffusers' _cp_plan)
    2. Call apply_sequence_parallel(model, config, plan) to enable SP
    3. Call remove_sequence_parallel(model, plan) to disable SP

The hooks automatically shard inputs before forward and gather outputs after,
based on the plan specification.
    )annotationsN)	dataclass)Any)init_logger)AnySequenceParallelInputSequenceParallelConfigSequenceParallelInputSequenceParallelModelPlanSequenceParallelOutputSequenceParallelPartialInput)	sp_gathersp_shard)HookRegistry	ModelHookzsp_input---{}zsp_output---{}c                   @  s:   e Zd ZU dZdZded< dZded< 		ddddZdS )ModuleForwardMetadatazMetadata for mapping forward() parameter names to args/kwargs positions.

    This caches the inspection of a module's forward signature to efficiently
    locate parameters by name in subsequent calls.
    Nzdict[str, int] | Nonecached_parameter_indicesztype | None_cls 
identifierstrargstuplekwargsdict | Nonereturntuple[Any, bool, int | None]c                 C  s  |pi }||v r|| ddfS | j dur9| j |d}|du r'td| d|t|k r4|| d|fS dd|fS | jdu rBtdtt| jjj	
 }|dd }dd	 t|D | _ || j vrktd| d
| j | }|t|kr{dd|fS || d|fS )a(  Get a parameter value from args or kwargs by name.

        Args:
            identifier: The parameter name to look up.
            args: Positional arguments passed to forward.
            kwargs: Keyword arguments passed to forward.

        Returns:
            Tuple of (value, is_kwarg, index).
            - value: The parameter value (or None if not found)
            - is_kwarg: True if found in kwargs
            - index: Position in args if found there

        Raises:
            ValueError: If parameter not found in signature.
        TNParameter 'z' not found in cached indices.Fz$Model class is not set for metadata.   c                 S  s   i | ]\}}||qS r   r   ).0iparamr   r   _/home/ubuntu/.local/lib/python3.10/site-packages/vllm_omni/diffusion/hooks/sequence_parallel.py
<dictcomp>u   s    zIModuleForwardMetadata._get_parameter_from_args_kwargs.<locals>.<dictcomp>z"' not found in function signature.)r   get
ValueErrorlenr   listinspect	signatureforward
parameterskeys	enumerate)selfr   r   r   indexr*   r   r   r!   _get_parameter_from_args_kwargsJ   s*   





z5ModuleForwardMetadata._get_parameter_from_args_kwargsr   N)r   r   r   r   r   r   r   r   )__name__
__module____qualname____doc__r   __annotations__r   r/   r   r   r   r!   r   ?   s   
 r   module	nn.Moduler   c                 C  sZ   t | dr+t| jdkr+tt| j }|dur|} n	 | S t | dr+t| jdks| S )zUnwrap a module from any wrappers to get the original class.

    Args:
        module: Potentially wrapped module.

    Returns:
        The unwrapped module.
    _modulesr   N)hasattrr%   r8   nextitervalues)r6   innerr   r   r!   _unwrap_module   s   
r>   c                      sd   e Zd ZdZd) fdd	Zd*ddZd+ddZd,ddZd-ddZ		d.d/d$d%Z	d0d'd(Z
  ZS )1SequenceParallelSplitHookaF  Hook for splitting inputs before a module's forward pass.

    This hook is registered to modules that need their inputs sharded
    across sequence parallel ranks. It intercepts the forward call,
    shards specified inputs according to the plan, and passes the
    sharded inputs to the original forward.

    For split_output=True inputs, it shards the output instead.

    Supports both SequenceParallelInput (full split) and SequenceParallelPartialInput
    (partial split for text/image separation).

    Note: This corresponds to `ContextParallelSplitHook` in diffusers.
    metadataJdict[str | int, AnySequenceParallelInput | list[AnySequenceParallelInput]]configr   r   Nonec                   s&   t    || _|| _d | _i | _d S N)super__init__r@   rB   module_forward_metadata_text_len_cacher-   r@   rB   	__class__r   r!   rF      s
   

z"SequenceParallelSplitHook.__init__r6   r7   c                 C  s   t |j}t|d| _|S )N)r   )r>   rK   r   rG   )r-   r6   clsr   r   r!   initialize_hook   s   
z)SequenceParallelSplitHook.initialize_hookr   r   r   tuple[tuple, dict]c              	   O  s  t |}| j  | j D ]\}}t|ttfr|jrq| j	
|||\}}}	|du r-qt|tjr<| ||||}nit|t tfrt|t tfsWtd| dt|j t|t|krptdt| d| dt| g }
t|D ]\}}t|r|| js| ||| ||}|
| qvt||
}n
tdt|j |r|||< q|	dur|	t|k r|||	< qtd| d	|| _t|| _t||fS )
zShard inputs before forward.Nz<Expected list/tuple of SequenceParallelInput for parameter 'z!' which is a list/tuple, but got 	Expected z elements for parameter 'z', got z%Unsupported input type for sharding: zFailed to update parameter 'z' after sharding.)r&   rH   clearr@   items
isinstancer   r   split_outputrG   r/   torchTensor_prepare_sp_inputr   r$   typer1   r%   r,   	is_tensorappend_last_kwargs
_last_args)r-   r6   r   r   	args_listnamespm	input_valis_kwargr.   sharded_input_valr   xr   r   r!   pre_forward   sH   

"


z%SequenceParallelSplitHook.pre_forwardoutputc                 C  s  ddl m}m} t|tj}t|ttfotdd |D }|s$|s$|S |r)|gnt|}d}| j	
 D ]@\}	}
t|	ts>q4t|
ttfrH|
jsIq4|	t|kr\td|	 dt| d||	 }| ||
| j| j||	< ||	 |urtd	}q4|r| r|  jd
7  _|r|d S t||S )z,Shard outputs for split_output=True entries.r   get_forward_contextis_forward_context_availablec                 s      | ]	}t |tjV  qd S rD   rR   rT   rU   r   rb   r   r   r!   	<genexpr>       z9SequenceParallelSplitHook.post_forward.<locals>.<genexpr>FzIndex z$ out of bounds for output of length .Tr   )#vllm_omni.diffusion.forward_contextrf   rg   rR   rT   rU   r&   r   allr@   rQ   intr   r   rS   r%   r$   rV   r[   rZ   _sp_shard_depthrW   )r-   r6   rd   rf   rg   rX   is_tensor_listoutput_listactually_shardedr.   r^   originalr   r   r!   post_forward   s,    

z&SequenceParallelSplitHook.post_forwardsp_inputr   r   dictrp   c           	   
   C  s   |j }t|tr
|S || jv r| j| S z@| j|||\}}}|du r,td| dt|tjr8|j	d }nt|tr@|}ntd| dt
|j || j|< |W S  tyk } ztd| d| |d}~ww )	z2Resolve text length from the source specification.Nr   z(' is None, cannot determine text length.r   z#Cannot determine text length from 'z
' of type z#Failed to resolve text_len_source 'z': )text_len_sourcerR   rp   rH   rG   r/   r$   rT   rU   shaperW   r1   )	r-   rw   r   r   sourceval_text_lener   r   r!   _resolve_text_len  s(   




z+SequenceParallelSplitHook._resolve_text_lenr   Nrb   torch.Tensorr   r   c           
      C  s   |pi }|j dur!| |j kr!td|j  d|  d |S t|tr8|jr0| ||jS t	||jddS t|t
rj| |||}|j}||d|}|||||| }t	||dd}	tj||	g|dS td	t|j )
z4Shard a tensor according to the input specification.NzExpected tensor with  dims, got z. Skipping split.Fvalidater   dimzUnsupported input config type: )expected_dimsr   loggerwarning_oncerR   r   auto_pad_shard_with_auto_pad	split_dimr   r   r   narrowsizerT   catr$   rW   r1   )
r-   rb   rw   r   r   r~   r   	text_part
image_partimage_part_shardedr   r   r!   rV   0  s    

z+SequenceParallelSplitHook._prepare_sp_inputr   c                 C  sd  ddl m} ddlm}m}m} ddlm}m} | }	|	dkr!|S |	|}
|
|	 }|dkr5t
||ddS |d}|jsLtd	|
 d
|	 d|  d| dkr\td	|
 d
|	 d|	| }|
| }t|j}|||< tj||j|jd}tj||g|d}| r| }|jdu r||_|
|_td|
 d| d| d|	 d| d | }|j|	|d| S )aP  Shard tensor with automatic padding and attention mask creation.

        When sequence length is not divisible by SP world size, this method:
        1. Pads the tensor to make it divisible
        2. Creates an attention mask indicating valid vs padding positions
        3. Stores the mask and padding info in ForwardContext
        r   )get_attn_backend)get_ring_parallel_world_sizeget_sequence_parallel_rank get_sequence_parallel_world_sizere   r   Fr   zSequence length (z%) is not divisible by SP world size (z). Cannot use zZ which does not support attention_mask. Please switch to SDPA or Ascend attention backend.ze). Cannot use Ring attention which does not support attention_mask. Please switch to Ulysses SP only.)dtypedevicer   NzAuto-padded sequence from z to z (pad_size=z, world_size=z, dim=))&vllm_omni.diffusion.attention.selectorr   .vllm_omni.diffusion.distributed.parallel_stater   r   r   rn   rf   rg   r   r   supports_attention_maskr$   get_namer&   rz   rT   zerosr   r   r   sp_original_seq_lensp_padding_sizer   debugchunk)r-   rb   r   r   r   r   r   rf   rg   
world_sizeseq_len	remainderattn_backendpad_sizepadded_seq_len	pad_shapepaddingx_paddedctxrankr   r   r!   r   T  sV   



z.SequenceParallelSplitHook._shard_with_auto_pad)r@   rA   rB   r   r   rC   r6   r7   r   r7   )r6   r7   r   r   r   r   r   rN   r6   r7   rd   r   r   r   )rw   r   r   r   r   rx   r   rp   r0   )
rb   r   rw   r   r   r   r   r   r   r   )rb   r   r   rp   r   r   )r1   r2   r3   r4   rF   rM   rc   rv   r   rV   r   __classcell__r   r   rJ   r!   r?      s    


6
!'$r?   c                      s6   e Zd ZdZd fdd	ZdddZdddZ  ZS )SequenceParallelGatherHookaG  Hook for gathering outputs after a module's forward pass.

    This hook is registered to modules that need their outputs gathered
    from all sequence parallel ranks. It intercepts the output and gathers
    it according to the plan specification.

    Note: This corresponds to `ContextParallelGatherHook` in diffusers.
    r@   5SequenceParallelOutput | list[SequenceParallelOutput]rB   r   r   rC   c                   s*   t    t|tr|g}|| _|| _d S rD   )rE   rF   rR   r
   r@   rB   rI   rJ   r   r!   rF     s
   


z#SequenceParallelGatherHook.__init__r6   r7   c                 C  s   |S rD   r   )r-   r6   r   r   r!   rM     s   z*SequenceParallelGatherHook.initialize_hookrd   r   c                 C  s  ddl m}m} t|tj}|r|g}nt|ttfr$tdd |D s.t	dt
|j t|}t|t| jkrKt	dt| j dt| dd	}| rV| }|j}d
}t| jD ]X\}	}
|
d	u rfq]||	 }|
jd	ur| |
jkrtd|
j d|  d q]t||
jd
d}|d	ur||
j|kr||
jd|}td|j d| d |||	< d}q]|r| r| }td|jd |_|r|d S t
||S )z;Gather outputs after forward and remove padding if applied.r   re   c                 s  rh   rD   ri   rj   r   r   r!   rk     rl   z:SequenceParallelGatherHook.post_forward.<locals>.<genexpr>z.Expected tensor or list/tuple of tensors, got rO   z outputs, got rm   NFzExpected output tensor with r   z. Skipping gather.r   z Removed padding: gathered shape z (original_seq_len=r   Tr   )rn   rf   rg   rR   rT   rU   r&   r   ro   r$   rW   r1   r%   r@   r   r,   r   r   r   r   r   
gather_dimr   r   r   rz   maxrq   )r-   r6   rd   rf   rg   rX   original_seq_lenr   actually_gatheredr   r^   rb   gatheredr   r   r!   rv     sB     
z'SequenceParallelGatherHook.post_forward)r@   r   rB   r   r   rC   r   r   )r1   r2   r3   r4   rF   rM   rv   r   r   r   rJ   r!   r     s
    	
r   modelr]   r   nn.Module | list[nn.Module]c                 C  s    | ddkrtdt| |S )aa  Get a submodule by dotted name, supporting wildcards.

    Args:
        model: The root module.
        name: Dotted path to submodule. Use "*" to match all children
            of a ModuleList.

    Returns:
        The submodule or list of submodules if wildcard used.

    Raises:
        ValueError: If the path is invalid or module not found.
    *r   z.Wildcard '*' can only be used once in the name)countr$   _find_submodule_by_name)r   r]   r   r   r!   _get_submodule_by_name  s   
r   c                 C  s   |dkr| S d|v r| ddn|df\}}|dkr?t| tjs$tdg }| D ]}t||}t|ts7|g}|| q(|S t| |rNt	| |}t||S td| d| j
j d)z,Recursive helper for _get_submodule_by_name. rm   r   r   z-Wildcard '*' can only be used with ModuleList'z' is not a submodule of ')splitrR   nn
ModuleListr$   r   r&   extendr9   getattrrK   r1   )r   r]   
first_atomremaining_name
submodules	submodulesubsubmodulesr   r   r!   r     s"    




r   rB   r   planr	   rC   c           
   
   C  s  t d|j d|j dt|   | D ]t\}}t| |}t|ts)|g}t d| dt	| d |D ]R}t|t
rKt||}t|}n5t|tttfrvt|tr[|g}tdd |D sktd	| t||}t|}n
td
t|j t|}	|	|| q9qdS )a  Apply sequence parallel hooks to a model according to the plan.

    This function registers hooks on the specified submodules to automatically
    shard inputs and gather outputs for sequence parallelism.

    Note: This corresponds to `apply_context_parallel` in diffusers.

    The complete SP flow is:
    1. Input sharding (SequenceParallelSplitHook): Split sequence across SP ranks
    2. Attention parallelism (handled by vLLM-Omni's Attention layer):
       - Ulysses: All-to-All over Q/K/V heads
       - Ring: K/V circulation in ring topology
       - Hybrid: Both (Ulysses handles head redistribution, Ring handles K/V)
    3. Output gathering (SequenceParallelGatherHook): Gather sequence from SP ranks

    Args:
        module: The model to apply SP to.
        config: The sequence parallel configuration.
        plan: Dictionary mapping module names to input/output specifications.

    Example:
        config = SequenceParallelConfig(ulysses_degree=2)
        plan = {
            "": {"hidden_states": SequenceParallelInput(split_dim=1, expected_dims=3)},
            "proj_out": SequenceParallelOutput(gather_dim=1, expected_dims=3),
        }
        apply_sequence_parallel(model, config, plan)

    Note:
        vLLM-Omni's Attention layer automatically handles the internal
        parallelism (Ulysses All-to-All or Ring attention) based on the
        forward_context configuration. This function only handles input/output
        sharding for the model as a whole.
    z0Applying sequence parallel with config: ulysses=z, ring=z, plan keys: zApplying SP hooks to 'z' (z module(s))c                 s  s"    | ]}t |tp|d u V  qd S rD   )rR   r
   rj   r   r   r!   rk   R  s     z*apply_sequence_parallel.<locals>.<genexpr>z.Expected SequenceParallelOutput elements, got zUnsupported plan type: N)r   r   ulysses_degreering_degreer&   r+   rQ   r   rR   r%   rx   r?   _SP_INPUT_HOOK_TEMPLATEformatr
   r   ro   r$   r   _SP_OUTPUT_HOOK_TEMPLATErW   r1   r   get_or_createregister_hook)
r6   rB   r   	module_idsp_model_planr   mhook	hook_nameregistryr   r   r!   apply_sequence_parallel  s8   '








r   c                 C  s   |  D ]@\}}t| |}t|ts|g}|D ],}t|dd}|du r$qt|tr/t|}nt|ttt	fr=t
|}nq|| qqdS )zRemove sequence parallel hooks from a model.

    Note: This corresponds to `remove_context_parallel` in diffusers.

    Args:
        module: The model to remove SP from.
        plan: The same plan used when applying SP.
    _hook_registryN)rQ   r   rR   r&   r   rx   r   r   r
   r   r   remove_hook)r6   r   r   r   r   r   r   r   r   r   r!   remove_sequence_parallel]  s    


r   SequenceParallelConfig | Nonec           	      C  s   ddl m}m} ddlm} || }|du r td| jj d|du rR| }| }t||d}|dkr;|dkr;d	}n	|dkrBd
}nd}t	
d| d| d|  t| || t	
d| jj  dS )a  Enable sequence parallelism for a model using its _sp_plan.

    This is a convenience function that reads the model's _sp_plan attribute
    and applies sequence parallelism automatically.

    Note: This corresponds to `enable_context_parallel_for_model` in diffusers,
    but uses vLLM-Omni's _sp_plan instead of diffusers' _cp_plan.

    The function performs two main tasks:
    1. Applies _sp_plan hooks to shard inputs and gather outputs
    2. Ensures Attention layers are configured for the correct parallel mode
       (handled automatically by vLLM-Omni's forward_context mechanism)

    Args:
        model: The model to enable SP for. Must have a _sp_plan attribute.
        config: Optional config. If None, uses default based on current
            parallel state.

    Raises:
        ValueError: If model has no _sp_plan defined.

    Note:
        vLLM-Omni supports Ulysses + Ring hybrid parallelism:
        - ulysses_degree > 1: Uses All-to-All communication over Q/K/V heads
        - ring_degree > 1: Uses Ring attention with K/V passing
        - Both > 1: Hybrid mode (Ulysses handles head redistribution,
          Ring handles K/V circulation)
    r   )r   get_ulysses_parallel_world_sizeget_sp_plan_from_modelNzModel zY has no _sp_plan defined. Define _sp_plan as a class attribute or pass a plan explicitly.)r   r   r   hybridulyssesringz6Created SP config from parallel state: ulysses_degree=z, ring_degree=z, mode=z!Enabled sequence parallelism for )r   r   r   'vllm_omni.diffusion.distributed.sp_planr   r$   rK   r1   r   r   infor   )	r   rB   r   r   r   r   r   r   moder   r   r!   "enable_sequence_parallel_for_model}  s<    r   c                 C  s.   ddl m} || }|durt| | dS dS )zDisable sequence parallelism for a model.

    Note: This corresponds to `disable_context_parallel_for_model` in diffusers.

    Args:
        model: The model to disable SP for.
    r   r   N)r   r   r   )r   r   r   r   r   r!   #disable_sequence_parallel_for_model  s
   r   r   )r   r7   r]   r   r   r   )r6   r7   rB   r   r   r	   r   rC   )r6   r7   r   r	   r   rC   rD   )r   r7   rB   r   r   rC   )r   r7   r   rC   )(r4   
__future__r   r'   dataclassesr   typingr   rT   torch.nnr   vllm.loggerr   r   r   r   r   r	   r
   r   +vllm_omni.diffusion.distributed.sp_shardingr   r   vllm_omni.diffusion.hooks.baser   r   r1   r   r   r   r   r>   r?   r   r   r   r   r   r   r   r   r   r   r!   <module>   s8    
B  	
N


G"E