o
    پi                     @   s   d Z ddlmZmZ ddlZddlmZ ddlmZ zddl	m
  mZ W n ey7   ddlm
  mZ Y nw ddlmZmZmZ dejdedeej d	d
fddZG dd deZG dd deZ	ddejjdee fddZdS )z/
Common utilities for torch model parallelism.
    )OptionalSequenceN)
DeviceMesh)ColwiseParallelRowwiseParallelparallelize_modulefull_tensordevice_mesh
placementsreturnz
dt.DTensorc                 C   sB   t j| j||\}}dd t||D }| | }t j|||S )aD  
    Locally shards a full tensor based on indicated sharding arrangement, and
    returns a DTensor containing the local shard.

    .. warning:: This is a private API that is subject to change. It skips the
        communication otherwise required by `distribute_tensor`. It is only
        applicable to cases where all ranks have the same `full_tensor`. For
        example, in distributed inference all ranks load from the same
        checkpoint. This API will not check for data equality between ranks, it
        is thus user's responsibility to ensure the `full_tensor` is the same
        across ranks.

    Args:
        full_tensor (torch.Tensor): the full tensor to be sharded.
        device_mesh (:class:`DeviceMesh`): DeviceMesh to place the
            DTensor.  Must have same dimension as the number of placements.
        placements (Sequence[:class:`Shard`]): the placements that
            describes how to place the local tensor on DeviceMesh.

    Returns:
        A :class:`DTensor` object with the shard as its local tensor.

    Examples:
        >>> # xdoctest: +SKIP("need world_size and rank")
        >>> device_mesh = dist.init_device_mesh("cuda", (world_size,))
        >>> full_tensor = torch.arange(world_size, device=f"cuda:{rank}")
        >>> dtensor = _shard_tensor(full_tensor, device_mesh, [Shard(1)])
    c                 S   s   g | ]\}}t ||| qS  )slice).0	cur_shape
cur_offsetr   r   T/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/layers/model_parallel.py
<listcomp><   s    z!_shard_tensor.<locals>.<listcomp>)dt_utils%compute_local_shape_and_global_offsetshapezipDTensor
from_local)r   r	   r
   r   offsetsliceslocal_tensorr   r   r   _shard_tensor   s   !r   c                   @   s   e Zd ZdZdd ZdS )ColwiseParallelShardedz
    A version of ColwiseParallel where the local weight has been already
    sharded.  This is used for the fused wqkv case, where during loading, we
    already sharded wq, wk, wv before fusing them.
    c                 C   sJ   |  D ]\}}tj||tdg}tjj|dd}||| qd S )Nr   F)requires_grad)	named_parametersr   r   r   Shardtorchnn	Parameterregister_parameter)selfnamemoduler	   paramdtensor
dist_paramr   r   r   _partition_linear_fnL   s
   z+ColwiseParallelSharded._partition_linear_fnN)__name__
__module____qualname____doc__r,   r   r   r   r   r   D   s    r   c                       s,   e Zd ZdZdd Ze fddZ  ZS )RowwiseParallelMaybeWaita5  
    A version of RowwiseParallel that waits for the output (establish dependency
    between comm stream and compute stream in CUDA sense) before going into the
    next op. This is needed to workaround the current interaction between
    AsyncCollectiveTensor and multi-platform ops, such as `RMSNorm`.
    c                 C   s`   | dtt|j|tdg t|dd d ur.| dtt|j	|t
 g d S d S )Nweight   bias)r%   r#   r$   r   r2   r   r!   getattrdistribute_tensorr4   	Replicate)r&   r'   r(   r	   r   r   r   r,   ^   s   z-RowwiseParallelMaybeWait._partition_linear_fnc                    s&   t tt| ||||}tjj|S N)superr1   _prepare_output_fnr"   distributed_functional_collectiveswait_tensor)output_layoutsuse_local_outputmodoutputsr	   	__class__r   r   r:   o   s   
z+RowwiseParallelMaybeWait._prepare_output_fn)r-   r.   r/   r0   r,   staticmethodr:   __classcell__r   r   rB   r   r1   V   s
    r1   r(   c                    s(   dt jjddf fdd}| | dS )a  
    Tensor parallelize the model across the given device mesh.
    Args:
        module (`torch.nn.Module`):
            The module to tensor parallelize.
        device_mesh (`torch.distributed.DeviceMesh`):
            The device mesh to use for tensor parallelism.
    r@   r   Nc                    s   t | dd }|d u rd S | D ]3\}}| |}|dkr%t| t  q|dkr1t| t  q|dkr=t| t  qtd| d S )N_tp_planColwiseRowwiseColwise_ShardedzUnknown TP style )r5   itemsget_submoduler   r   r1   r   
ValueError)r@   tp_plan
child_nametp_stylesubmodr	   r   r   tplize   s   
ztensor_parallel.<locals>.tplize)r"   r#   Moduleapply)r(   r	   rR   r   rQ   r   tensor_parallely   s   rU   r8   )r0   typingr   r   r"   torch.nnr#   torch.distributed.device_meshr   torch.distributed.tensorr;   tensorr   ImportErrortorch.distributed._tensor_tensor!torch.distributed.tensor.parallelr   r   r   Tensorr!   r   r   r1   rS   rU   r   r   r   r   <module>   s8    
,%