o
    ۷i&                      @  s   d Z ddlmZ ddlmZ ddlmZmZ ddlZddl	m
Z
 ddlmZmZmZ e
eZ	ddddZ	ddddZ	d d!ddZeG dd dZe Zd"ddZdS )#zSequence Parallelism sharding utilities.

This module provides low-level sharding and gathering functions for
Sequence Parallelism. These can be used directly in model forward methods
for semi-intrusive SP support, or internally by the SP hooks.
    )annotations)contextmanager)	dataclassfieldN)init_logger)get_sequence_parallel_rank get_sequence_parallel_world_sizeget_sp_groupTtensortorch.Tensordimintvalidateboolreturnc              	   C  s   t  }|dkr	| S t }| |}|r'|| dkr'td| d| d| d||k r<td| d| d| d| j | j||d	| S )
ab  Shard a tensor along the specified dimension for sequence parallelism.

    The tensor is split into world_size chunks along dim, and this rank
    receives its corresponding chunk.

    Args:
        tensor: The tensor to shard.
        dim: The dimension along which to split.
        validate: If True, validate that the tensor size is divisible by world_size.

    Returns:
        The shard for this rank.

    Raises:
        ValueError: If validate=True and tensor size is not divisible by world_size.

    Example:
        # In model forward:
        hidden_states = sp_shard(hidden_states, dim=1)
       r   zTensor size along dim z (z#) must be divisible by world_size (z!) for sequence parallel sharding.z) must be >= world_size (z). Tensor shape: r   )r   r   size
ValueErrorshapechunk)r
   r   r   
world_sizerankr    r   a/home/ubuntu/vllm_env/lib/python3.10/site-packages/vllm_omni/diffusion/distributed/sp_sharding.pysp_shard   s    
r   c                 C  s&   t  }|dkr	| S t }|j| |dS )a  Gather a tensor along the specified dimension from all sequence parallel ranks.

    The sharded tensors from all ranks are concatenated along dim.

    Args:
        tensor: The local shard to gather.
        dim: The dimension along which to gather.
        validate: If True, validate tensor consistency (currently unused).

    Returns:
        The full tensor gathered from all ranks.

    Example:
        # At end of model forward:
        output = sp_gather(output, dim=1)
    r   r   )r   r	   
all_gather)r
   r   r   r   sp_groupr   r   r   	sp_gatherJ   s
   r           	pad_valuefloattuple[torch.Tensor, int]c           	      C  s   t  }|dkr| dfS | |}|| }|dkr!t| |dddfS || }t| j}|||< tj||| j| jd}tj	| |g|d} t| |dd|fS )a  Shard a tensor with automatic padding if not divisible by world_size.

    This is useful for variable-length sequences where padding may be needed.

    Args:
        tensor: The tensor to shard.
        dim: The dimension along which to split.
        pad_value: Value to use for padding.

    Returns:
        Tuple of (sharded_tensor, padding_size). The padding_size indicates
        how much padding was added to the original tensor before sharding.

    Example:
        sharded, pad_size = sp_shard_with_padding(hidden_states, dim=1)
        # ... process ...
        output = sp_gather(output, dim=1)
        if pad_size > 0:
            output = output[..., :-pad_size]  # Remove padding
    r   r   Fr   )dtypedevicer   )
r   r   r   listr   torchfullr$   r%   cat)	r
   r   r    r   r   	remainderpad_size	pad_shapepaddingr   r   r   sp_shard_with_paddingh   s   

r.   c                   @  sz   e Zd ZU dZeedZded< eedZded< dZ	ded< dddZ
edd Z	dd ddZd!ddZdddZdS )"ShardingValidatora2  Validator for tracking and verifying sharding operations.

    This class helps ensure that sharding and gathering operations are
    correctly paired in model forward passes. It tracks which tensors
    have been sharded and verifies that they are properly gathered.

    Usage:
        validator = ShardingValidator()
        with validator.track():
            hidden_states = validator.shard(hidden_states, "hidden_states", dim=1)
            # ... model computation ...
            output = validator.gather(output, "hidden_states", dim=1)
        validator.validate()  # Raises if any shard was not gathered

    Attributes:
        _sharded: Set of tensor names that have been sharded.
        _gathered: Set of tensor names that have been gathered.
        _enabled: Whether tracking is currently enabled.
    )default_factoryzset[str]_sharded	_gatheredFr   _enabledr   Nonec                 C  s   | j   | j  dS )z1Reset the validator state for a new forward pass.N)r1   clearr2   selfr   r   r   reset   s   
zShardingValidator.resetc                 c  s,    d| _ |   z	dV  W d| _ dS d| _ w )z6Context manager to enable tracking for a forward pass.TNF)r3   r8   r6   r   r   r   track   s   zShardingValidator.trackTr
   r   namestrr   r   validate_divisiblec                 C  s<   | j r|| jv rtd| d | j| t|||dS )aP  Shard a tensor and track the operation.

        Args:
            tensor: The tensor to shard.
            name: A name to identify this tensor for validation.
            dim: The dimension along which to split.
            validate_divisible: If True, validate divisibility.

        Returns:
            The sharded tensor.
        Tensor 'z' sharded multiple timesr#   )r3   r1   loggerwarningaddr   )r7   r
   r:   r   r<   r   r   r   shard   s
   
zShardingValidator.shardc                 C  s8   | j r|| jvrtd| d | j| t||S )a  Gather a tensor and track the operation.

        Args:
            tensor: The local shard to gather.
            name: The name used when sharding (for validation).
            dim: The dimension along which to gather.

        Returns:
            The gathered tensor.
        r=   z ' gathered without being sharded)r3   r1   r>   r?   r2   r@   r   )r7   r
   r:   r   r   r   r   gather   s
   

zShardingValidator.gatherc                 C  s$   | j | j }|rtd| ddS )zValidate that all sharded tensors were gathered.

        Raises:
            ValueError: If any sharded tensor was not gathered.
        z5The following tensors were sharded but not gathered: z;. This may indicate a bug in the model's SP implementation.N)r1   r2   r   )r7   	unmatchedr   r   r   r      s   
zShardingValidator.validateN)r   r4   T)
r
   r   r:   r;   r   r   r<   r   r   r   )r
   r   r:   r;   r   r   r   r   )__name__
__module____qualname____doc__r   setr1   __annotations__r2   r3   r8   r   r9   rA   rB   r   r   r   r   r   r/      s   
 


r/   c                   C  s   t S )zdGet the global sharding validator instance.

    Returns:
        The global ShardingValidator.
    )_global_validatorr   r   r   r   get_sharding_validator  s   rL   rD   )r
   r   r   r   r   r   r   r   )r   )r
   r   r   r   r    r!   r   r"   )r   r/   )rH   
__future__r   
contextlibr   dataclassesr   r   r'   vllm.loggerr   .vllm_omni.diffusion.distributed.parallel_stater   r   r	   rE   r>   r   r   r.   r/   rK   rL   r   r   r   r   <module>   s$   2!8g