o
    پi;                     @   s   d dl mZ d dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ 	dd	ed
eeeejf  dededee f
ddZdejfddZdS )    )OptionalN)
DeviceMesh)DTensor)Engine)UpdateWeightsFromTensorReqInputLocalSerializedTensor)MultiprocessingSerializerengineparams_batchdevice_mesh_keydevice_meshload_formatc                    s   || j  d }||  }ddlm} |  dd |D }|dkr.dd t|D }	nd}	tj||	|| j  d || 	 d |dkrmt
|	dd	i}
d
d |
D  t fddt|D |d}| |I dH S dS )af  
    Update weights for the inference engine.
    This function is designed to be stateless, so that the caller process could keep the stateful engine.
    Example Use Case:
        - Multiple Producer Process will call this function in a SPMD style

    Args:
        engine: The inference engine created by the caller process.
        params_batch: A list of (name, tensor) tuples. We batched the tensors to avoid the overhead of cpu call.
        device_mesh_key: The key of the device mesh. Typically "tp" or "infer_tp"
        device_mesh: The device mesh.
        load_format: The format of the weights.
    r   )monkey_patch_torch_reductionsc                 S   s&   g | ]\}}|t t| fqS  )r	   	serialize%_preprocess_tensor_for_update_weightsdetach).0nametensorr   r   P/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/weight_sync/utils.py
<listcomp>,   s    
z"update_weights.<locals>.<listcomp>c                 S   s   g | ]}d qS Nr   r   _r   r   r   r   7   s    N)objobject_gather_listdstgroupstrictTc                 S   s,   g | ]}|d  d  t dd |D dfqS )r   c                 S   s   g | ]}|d  qS )   r   )r   	rank_partr   r   r   r   W   s    z-update_weights.<locals>.<listcomp>.<listcomp>)valuesr   )r   tensor_groupr   r   r   r   O   s    
c                    s   g | ]}t  qS r   )r	   r   r   named_tensorsr   r   r   ^   s    )serialized_named_tensorsr   )meshsizeget_local_ranksglang.srt.utils.patch_torchr   rangedistgather_objecttolist	get_groupzipr   update_weights_from_tensor)r
   r   r   r   r   infer_tp_sizeinfer_tp_rankr   named_tensors_batchgathered_serialized_batcheslogical_tensorsupdate_weights_requestr   r%   r   update_weights   s<   


r9   r   c                 C   s   t | tr	|  S | S )a  
    Preprocess the tensor for update weights.
    Example Use Case:
        - FSDP: we gather tensor by calling full_tensor in _preprocess_tensor_for_update_weights
        - Megatron: we do nothing here, assuming it is gathered when feed into this func

    Args:
        tensor: The tensor to be preprocessed.

    Returns:
        The full tensor if it is a DTensor, otherwise the original tensor.
    )
isinstancer   full_tensor)r   r   r   r   r   h   s   
r   r   )typingr   torchtorch.distributeddistributedr-   torch.distributed.device_meshr   torch.distributed.tensorr   sglang.srt.entrypoints.enginer   sglang.srt.managers.io_structr   &sglang.srt.model_executor.model_runnerr   sglang.srt.utilsr	   listtuplestrTensorr9   r   r   r   r   r   <module>   s,    
Z