o
    ۷iE                     @  s  d Z ddlmZ ddlmZ ddlmZ er ddlZddlm	Z	 eG dd dZ
edd	G d
d dZedd	G dd dZedd	G dd dZeeB ZeeeB eee B eedf B f Zeee B eedf B ZeeeeB f Zd#ddZd#ddZd$ddZd%d!d"ZdS )&a$  Sequence Parallelism configuration and plan type definitions.

This module provides:
1. SequenceParallelConfig: Configuration for SP (ulysses_degree, ring_degree)
2. SequenceParallelInput/Output: Type definitions for _sp_plan declarations
3. Validation utilities for _sp_plan

A _sp_plan is a dictionary that specifies how to shard/gather tensors at
different points in a model's forward pass. This allows automatic handling
of sequence parallelism without modifying the model's forward() method.

NOTE: Our "Sequence Parallelism" (SP) corresponds to "Context Parallelism" (CP)
in diffusers. We use "Sequence Parallelism" to align with vLLM-Omni terminology.

Example:
    class MyTransformer(nn.Module):
        _sp_plan = {
            # Split inputs before model forward
            "": {
                "hidden_states": SequenceParallelInput(split_dim=1, expected_dims=3),
                "encoder_hidden_states": SequenceParallelInput(split_dim=1, expected_dims=3),
            },
            # Split RoPE embeddings after pos_embed layer
            "pos_embed": {
                0: SequenceParallelInput(split_dim=0, expected_dims=2, split_output=True),
            },
            # Gather output after proj_out layer
            "proj_out": SequenceParallelOutput(gather_dim=1, expected_dims=3),
        }
    )annotations)	dataclass)TYPE_CHECKINGNc                   @  s   e Zd ZU dZdZded< dZded< dZded< d	Zd
ed< d	Z	d
ed< d	Z
ded< d)ddZed*ddZd*ddZd*ddZd*ddZd*ddZd*ddZd*dd Zd+d%d&Zd,d'd(Zd	S )-SequenceParallelConfiga  Configuration for Sequence Parallelism using vLLM-Omni's parallel state.

    This class provides a unified interface for SP configuration that integrates
    with vLLM-Omni's existing SequenceParallelGroupCoordinator. Unlike diffusers'
    DeviceMesh-based approach (ContextParallelConfig), this uses the existing
    parallel state management.

    Note: This corresponds to `ContextParallelConfig` in diffusers library.

    Args:
        ulysses_degree: Number of devices for Ulysses (All-to-All) attention.
            Sequence is split across devices, with Q/K/V redistributed via
            All-to-All communication. Best for moderate sequences with good
            interconnect bandwidth.
        ring_degree: Number of devices for Ring attention. Sequence is split
            across devices, with K/V passed in a ring topology. Best for long
            sequences with limited memory/bandwidth.
        convert_to_fp32: Whether to convert output and LSE to float32 for
            numerical stability in ring attention.

    Note:
        ulysses_degree * ring_degree = sequence_parallel_size
        vLLM-Omni supports hybrid Ulysses-Ring attention (both > 1).
       intulysses_degreering_degreeTboolconvert_to_fp32N
int | None_rank_world_sizeztorch.device | None_devicereturnNonec                 C  s@   | j dk s
| jdk rtd| j dkr| jdkrtdd S d S )Nr   z0`ulysses_degree` and `ring_degree` must be >= 1.zZAt least one of `ulysses_degree` or `ring_degree` must be > 1 to use sequence parallelism.)r   r	   
ValueErrorself r   ]/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/distributed/sp_plan.py__post_init__W   s   z$SequenceParallelConfig.__post_init__c                 C  s   | j | j S )z#Total sequence parallel world size.)r   r	   r   r   r   r   sequence_parallel_size`   s   z-SequenceParallelConfig.sequence_parallel_sizec                 C     ddl m} | S )zGet the sequence parallel world size from parallel state.

        Returns:
            The world size for sequence parallelism.

        Raises:
            RuntimeError: If parallel state is not initialized.
        r   ) get_sequence_parallel_world_size).vllm_omni.diffusion.distributed.parallel_stater   )r   r   r   r   r   get_world_sizee      	z%SequenceParallelConfig.get_world_sizec                 C  r   )zGet the current rank in the sequence parallel group.

        Returns:
            The rank within the sequence parallel group.

        Raises:
            RuntimeError: If parallel state is not initialized.
        r   )get_sequence_parallel_rank)r   r   )r   r   r   r   r   get_rankr   r   zSequenceParallelConfig.get_rankc                 C  r   )zGet the Ulysses parallel world size.

        Returns:
            The world size for Ulysses (All-to-All) parallelism.
        r   )get_ulysses_parallel_world_size)r   r    )r   r    r   r   r   get_ulysses_world_size      z-SequenceParallelConfig.get_ulysses_world_sizec                 C  r   )zGet the current rank in the Ulysses parallel group.

        Returns:
            The rank within the Ulysses parallel group.
        r   )get_ulysses_parallel_rank)r   r#   )r   r#   r   r   r   get_ulysses_rank   r"   z'SequenceParallelConfig.get_ulysses_rankc                 C  r   )zwGet the Ring parallel world size.

        Returns:
            The world size for Ring attention parallelism.
        r   )get_ring_parallel_world_size)r   r%   )r   r%   r   r   r   get_ring_world_size   r"   z*SequenceParallelConfig.get_ring_world_sizec                 C  r   )zGet the current rank in the Ring parallel group.

        Returns:
            The rank within the Ring parallel group.
        r   )get_ring_parallel_rank)r   r'   )r   r'   r   r   r   get_ring_rank   r"   z$SequenceParallelConfig.get_ring_rankrank
world_sizedevicetorch.devicec              
   C  sX   || _ || _|| _| j| j }|  }||kr*td| j d| j d| d| d	dS )a!  Initialize the config with runtime parallel state.

        This is called automatically when sequence parallelism is enabled.

        Args:
            rank: The global rank of this process.
            world_size: Total world size.
            device: The device for this rank.
        z(Configuration mismatch: ulysses_degree (z) * ring_degree (z) = z-, but actual sequence parallel world size is .N)r   r   r   r   r	   r   r   )r   r)   r*   r+   expected_sp_sizeactual_sp_sizer   r   r   setup   s    

zSequenceParallelConfig.setupc                 C  s
   | j duS )zCheck if the config has been initialized with runtime state.

        Returns:
            True if setup() has been called, False otherwise.
        N)r   r   r   r   r   is_initialized   s   
z%SequenceParallelConfig.is_initialized)r   r   )r   r   )r)   r   r*   r   r+   r,   r   r   )r   r
   )__name__
__module____qualname____doc__r   __annotations__r	   r   r   r   r   r   propertyr   r   r   r!   r$   r&   r(   r0   r1   r   r   r   r   r   3   s&   
 
	










r   T)frozenc                   @  sH   e Zd ZU dZded< dZded< dZded	< dZded
< dddZdS )SequenceParallelInputaa  Configuration for splitting an input tensor across sequence parallel ranks.

    This specifies how to shard a tensor in the pre-forward or post-forward hook
    of a layer. The tensor will be split along the specified dimension.

    Note: This corresponds to `ContextParallelInput` in diffusers library.

    Args:
        split_dim: The dimension along which to split the tensor.
        expected_dims: Expected number of dimensions. If provided, validates that
            the tensor has this many dimensions before splitting. If the tensor
            has a different number of dimensions, splitting is skipped with a warning.
        split_output: If True, split the output of the layer instead of the input.
            This is useful for layers whose outputs should be split after preprocessing
            (e.g., RoPE embeddings).
        auto_pad: If True, automatically pad the tensor if its size along split_dim
            is not divisible by world_size. Creates an attention mask to indicate
            valid vs padding positions. The mask is stored in ForwardContext.
            Note: Ring attention does not support attention mask, so auto_pad
            should only be used with Ulysses SP.

    Example:
        # Split hidden_states along sequence dimension (dim 1)
        SequenceParallelInput(split_dim=1, expected_dims=3)

        # Split RoPE output along sequence dimension (dim 0)
        SequenceParallelInput(split_dim=0, expected_dims=2, split_output=True)

        # Split with auto-padding for variable-length sequences
        SequenceParallelInput(split_dim=1, expected_dims=3, auto_pad=True)
    r   	split_dimNr   expected_dimsFr
   split_outputauto_padr   strc              	   C  s&   d| j  d| j d| j d| j d	S )Nz SequenceParallelInput(split_dim=, expected_dims=, split_output=z, auto_pad=))r:   r;   r<   r=   r   r   r   r   __repr__      
zSequenceParallelInput.__repr__r   r>   )	r2   r3   r4   r5   r6   r;   r<   r=   rB   r   r   r   r   r9      s   
  r9   c                   @  s0   e Zd ZU dZded< dZded< dd	d
ZdS )SequenceParallelOutputa  Configuration for gathering an output tensor across sequence parallel ranks.

    This specifies how to gather a tensor in the post-forward hook of a layer.
    The tensor will be gathered along the specified dimension from all ranks.

    Note: This corresponds to `ContextParallelOutput` in diffusers library.

    Args:
        gather_dim: The dimension along which to gather the tensor.
        expected_dims: Expected number of dimensions. If provided, validates that
            the tensor has this many dimensions before gathering.

    Example:
        # Gather output along sequence dimension (dim 1)
        SequenceParallelOutput(gather_dim=1, expected_dims=3)
    r   
gather_dimNr   r;   r   r>   c                 C  s   d| j  d| j dS )Nz"SequenceParallelOutput(gather_dim=r?   rA   )rF   r;   r   r   r   r   rB     s   zSequenceParallelOutput.__repr__rD   )r2   r3   r4   r5   r6   r;   rB   r   r   r   r   rE      s
   
 rE   c                   @  sD   e Zd ZU dZded< ded< dZded< d	Zd
ed< dddZdS )SequenceParallelPartialInputa  Configuration for partially splitting a tensor (e.g., split image part, keep text part).

    This is designed for models like LongCat/Qwen where RoPE embeddings need special handling:
    - Text portion: kept full across all ranks (for joint attention)
    - Image portion: split across ranks

    The tensor is assumed to be concatenated as [text_part, image_part] along split_dim.

    Note: This is an extension beyond diffusers' standard ContextParallelInput,
    designed for vLLM-Omni's dual-stream attention models.

    Args:
        split_dim: The dimension along which to split the image portion.
        text_len_source: How to determine text length:
            - str: Name of a forward parameter that contains text length
            - int: Fixed text length value
        expected_dims: Expected number of dimensions for validation.
        split_output: If True, split the output instead of input.

    Example:
        # Split RoPE: text portion (from txt_ids.shape[0]) kept full, image portion split
        SequenceParallelPartialInput(
            split_dim=0,
            text_len_source="txt_ids",  # Get text length from txt_ids.shape[0]
            expected_dims=2,
            split_output=True,
        )

        # Or with fixed text length
        SequenceParallelPartialInput(
            split_dim=0,
            text_len_source=512,  # Fixed text length
            expected_dims=2,
            split_output=True,
        )
    r   r:   z	str | inttext_len_sourceNr   r;   Fr
   r<   r   r>   c              	   C  s&   d| j  d| jd| j d| j d	S )Nz'SequenceParallelPartialInput(split_dim=z, text_len_source=r?   r@   rA   )r:   rH   r;   r<   r   r   r   r   rB   B  rC   z%SequenceParallelPartialInput.__repr__rD   )r2   r3   r4   r5   r6   r;   r<   rB   r   r   r   r   rG     s   
 %rG   .valueobjectr   r
   c                 C  s   t | ttfS )z5Check if a value is a valid input configuration type.)
isinstancer9   rG   rI   r   r   r   _is_valid_input_configw  s   rM   c                 C  s$   t | ttfs	dS tdd | D S )z?Check if a value is a list/tuple of valid input configurations.Fc                 s  s    | ]}t |V  qd S N)rM   .0xr   r   r   	<genexpr>  s    z._is_valid_input_config_list.<locals>.<genexpr>)rK   listtupleallrL   r   r   r   _is_valid_input_config_list|  s   rV   planSequenceParallelModelPlanr   c              
   C  sz  t | tstdt| j |  D ]\}}t |ts&tdt|j t |tr,qt |tt	frBt
dd |D r=qt|rBqt |tr| D ]`\}}t |ttfsdtdt|j d| dt |tr~t|s~tdt|j d| d	| d
t|rt |tr|jstd| d	| d
qKt|rqKtdt|j d| d| dqtdt|j d| ddS )zValidate a _sp_plan dictionary for correctness.

    Args:
        plan: The _sp_plan dictionary to validate.

    Raises:
        ValueError: If the plan is invalid.
    z_sp_plan must be a dict, got z#_sp_plan keys must be strings, got c                 s  s    | ]}t |tV  qd S rN   )rK   rE   rO   r   r   r   rR     s    z#validate_sp_plan.<locals>.<genexpr>z(Input spec keys must be str or int, got z for module ''zRInteger keys (output indices) must map to SequenceParallelInput/PartialInput, got z'[]z\Integer keys (output indices) require split_output=True, got split_output=False for module 'zRInput spec values must be SequenceParallelInput/PartialInput or list thereof, got z'['z']zI_sp_plan values must be dict (input spec) or SequenceParallelOutput, got N)rK   dictr   typer2   itemsr>   rE   rS   rT   rU   rV   r   rM   r<   )rW   	module_idmodule_plankeyrI   r   r   r   validate_sp_plan  st   
	


ra   model	nn.Module SequenceParallelModelPlan | Nonec                 C  s    t | dd}|durt| |S )zGet the _sp_plan from a model if it exists.

    Args:
        model: The model to get the plan from.

    Returns:
        The _sp_plan dictionary, or None if not defined.
    _sp_planN)getattrra   )rb   rW   r   r   r   get_sp_plan_from_model  s   	rg   )rI   rJ   r   r
   )rW   rX   r   r   )rb   rc   r   rd   )r5   
__future__r   dataclassesr   typingr   torchtorch.nnnnr   r9   rE   rG   AnySequenceParallelInputr[   r>   r   rS   rT   SequenceParallelInputTypeSequenceParallelOutputTyperX   rM   rV   ra   rg   r   r   r   r   <module>   s8    .8


: