o
    پi
-                     @  s   d Z ddlmZ ddlZddlmZ ddlZddlmZm	Z	 ddl
mZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ eeZd&ddZd'ddZd(ddZd)ddZd*d"d#ZG d$d% d%ZdS )+af  
In-place weight updates for diffusion pipeline modules.

This module provides WeightsUpdater, which swaps model weights at runtime
without restarting the server.  It is the diffusion-engine counterpart of the
LLM engine's ModelRunner.update_weights_from_disk.

Detailed usage of higher level API can be found in

/python/sglang/multimodal_gen/test/server/test_update_weights_from_disk.py

Key design decisions:

- All-or-nothing with rollback: modules are updated sequentially.  If
  any module fails (shape mismatch, corrupted file, etc.), every module
  that was already updated is rolled back by reloading its weights from
  pipeline.model_path (the last successfully-loaded checkpoint).  On
  success, pipeline.model_path is updated to the new model_path so
  that future rollbacks target the latest good checkpoint, not the
  originally-launched model.

- Rollback failures propagate: if rollback itself fails, the exception is
  not caught so the caller knows the model is in an inconsistent state.
  This matches the LLM engine behaviour.

- Offload-aware: the diffusion LayerwiseOffloadManager replaces GPU
  parameters with torch.empty((1,)) placeholders while real weights live
  in consolidated pinned CPU buffers.  A naive param.data.copy_() would
  fail with a shape mismatch.  Instead, the updater dynamically detects
  active offload managers and writes new weights directly into their CPU
  buffers via update_cpu_weights(), bypassing the placeholders entirely.
  For any layer that happens to be prefetched on GPU at update time, the
  live GPU tensor is also updated so the change takes effect immediately.
  This requires no extra GPU memory and does not disturb the offload state.

- DTensor-aware: parameters that have been distributed via
  torch.distributed.tensor are updated through distribute_tensor
  so that each shard is correctly placed on the right device mesh.
    )annotationsN)Path)DTensordistribute_tensor)TeaCacheMixin)_list_safetensors_files)safetensors_weights_iterator)DiffusersPipeline)maybe_download_model)OffloadableDiTMixin)init_loggerreturndict[str, torch.nn.Module]c                 C  sL   t | tr| d}|dur|jdur|j}ni }n| j}dd | D S )zReturn updatable nn.Module components for the given pipeline.

    Works with both the native ComposedPipelineBase backend and the
    DiffusersPipeline wrapper.
    diffusers_pipelineNc                 S  s$   i | ]\}}t |tjjr||qS  )
isinstancetorchnnModule).0nmr   r   h/home/ubuntu/.local/lib/python3.10/site-packages/sglang/multimodal_gen/runtime/loader/weights_updater.py
<dictcomp>N   s   $ z)get_updatable_modules.<locals>.<dictcomp>)r   r	   
get_module
componentsmodulesitems)pipelinediffusers_piperawr   r   r   get_updatable_modules@   s   

r!   weights_dirstrc                 C  s"   t | }|std|  t|S )zAReturn a (name, tensor) iterator over safetensors in weights_dir.zNo safetensors files found in )r   FileNotFoundErrorr   )r"   safetensors_filesr   r   r   _get_weights_iterQ   s   r&   local_model_pathmodules_to_update!list[tuple[str, torch.nn.Module]] tuple[dict[str, str], list[str]]c                 C  sV   i }g }|D ] \}}t | | }| r!tt|r!t|||< q|| q||fS )zCheck that every module has a weights directory with safetensors files.

    Returns:
        (weights_map, missing) where weights_map maps module name to its
        weights directory and missing lists modules without weight files.
    )r   existsr   r#   append)r'   r(   weights_mapmissingmodule_name_r"   r   r   r   _validate_weight_filesY   s   
r1   moduletorch.nn.ModuleNonec                   s   g }t | tr| jrdd | jD }|r>t|}t  |D ]
} || q fdd| D }t|t| 	  dS t|t| 	  dS )zLoad weights into a module, handling offload-managed parameters.

    For offloaded modules, updates CPU buffers directly via
    update_cpu_weights(); non-offloaded parameters use in-place copy.
    c                 S  s   g | ]}|j r|qS r   )enabled)r   r   r   r   r   
<listcomp>v   s    z-_load_weights_into_module.<locals>.<listcomp>c                 3  s$    | ]\}}| vr||fV  qd S Nr   )r   r   woffloaded_namesr   r   	<genexpr>}   s   " z,_load_weights_into_module.<locals>.<genexpr>N)
r   r   layerwise_offload_managersdictsetupdateupdate_cpu_weightsr   load_weights_into_modelnamed_parameters)r2   weights_iteroffload_managersweight_dictmanager	remainingr   r9   r   _load_weights_into_modulen   s   rH   model_paramsr=   c                 C  s   | D ]E\}}||vrq|| }|j |j kr$td| d|j  d|j  t|tr=t||j|j|j}|j	
|j	 q|j
||j qdS )z:Copy weights from weights_iter into model_params in-place.zShape mismatch for z: model=z	, loaded=N)shape
ValueErrorr   r   r   todtypedevice_mesh
placements_local_tensorcopy_data)rC   rI   nameloaded_weightparamdistributed_weightr   r   r   rA      s"   

rA   c                   @  sF   e Zd ZdZdd Z		ddddZdddZd ddZd!ddZdS )"WeightsUpdatera  In-place weight updates for diffusion pipeline modules.

    Args:
        pipeline: A ComposedPipelineBase (or DiffusersPipeline) instance
            whose modules will be updated.  The pipeline's model_path
            attribute is used for rollback on failure.
    c                 C  s
   || _ d S r7   )r   )selfr   r   r   r   __init__   s   
zWeightsUpdater.__init__TN
model_pathr#   flush_cachebooltarget_moduleslist[str] | Noner   tuple[bool, str]c              
   C  s  t d|  z| |}W n ty. } zt t| dt|fW  Y d}~S d}~ww |sId| dtt| j	  }t | d|fS zt
|}W n tyh } zdd| fW  Y d}~S d}~ww t||\}}	|	rd|	 d}t | d|fS t d	t| d
ddd | D   | ||\}
}t  tj  |
r|r|D ]\}}t|tr|  qt | |
|fS )z=Update model weights from disk without restarting the server.zUpdating weights from disk: FNz1No matching modules found for update. Requested: z. Available nn.Module(s): zFailed to download model: z9Cannot update weights: missing weight files for modules: z. No partial updates allowed.z	Updating z
 modules: , c                 s  s"    | ]\}}| d | V  qdS )z <- Nr   )r   r   pr   r   r   r;      s     z:WeightsUpdater.update_weights_from_disk.<locals>.<genexpr>)loggerinfo_collect_modulesrK   errorr#   listr!   r   keysr
   	Exceptionr1   lenjoinr   _apply_weightsgccollectr   cudaempty_cacher   r   reset_teacache_state)rX   rZ   r[   r]   r(   e	error_msgr'   r-   r.   successmessager0   r2   r   r   r   update_weights_from_disk   s\   





z'WeightsUpdater.update_weights_from_diskr)   c                   sh   t | j |du rt  }n fdd|D }|r)td| dt   |} fdd|D S )zResolve target_modules to (name, module) pairs.

        Raises:
            ValueError: If target_modules contains names not found in the pipeline.
        Nc                   s   g | ]}| vr|qS r   r   )r   r   r   r   r   r6          z3WeightsUpdater._collect_modules.<locals>.<listcomp>z6Module(s) requested for update not found in pipeline: z. Available Module(s): c                   s   g | ]}| | fqS r   r   )r   rS   rv   r   r   r6      rw   )r!   r   rf   rg   rK   )rX   r]   namesunknownr   rv   r   rd      s   

zWeightsUpdater._collect_modulesr(   r-   dict[str, str]c           
      C  s   g }|D ]X\}}zt || }t|| || W q ty\ } z6||g }tjd| d| dt| d| d| ddd | | d	d
| d| dfW  Y d}~  S d}~ww d|}	ddt| d|	 dfS )z9Load weights into each module; rollback on first failure.z!Weight update failed for module 'z': z. Rolling back z( module(s) (including partially-loaded 'z'): .T)exc_infoFzFailed to update module 'z.. All modules rolled back to original weights.Nr`   zUpdated z
 modules (z).)	r&   rH   r,   rh   rb   re   ri   	_rollbackrj   )
rX   r(   r-   updated_modulesr/   r2   rC   rq   rollback_listrx   r   r   r   rk      s4   



zWeightsUpdater._apply_weightsr~   	list[str]r4   c                 C  sd   |sdS t | jj}|D ]#}| j|}|du rqt|| }| s$qtt|}t|| qdS )zRestore updated_modules to original weights.

        If rollback itself fails the exception propagates so the caller
        knows the model is in an inconsistent state.
        N)	r
   r   rZ   r   r   r+   r&   r#   rH   )rX   r~   original_pathrS   r2   r"   rC   r   r   r   r}     s   zWeightsUpdater._rollback)TN)rZ   r#   r[   r\   r]   r^   r   r_   )r]   r^   r   r)   )r(   r)   r-   rz   r   r_   )r~   r   r   r4   )	__name__
__module____qualname____doc__rY   ru   rd   rk   r}   r   r   r   r   rW      s    
:
rW   )r   r   )r"   r#   )r'   r#   r(   r)   r   r*   )r2   r3   r   r4   )rI   r=   r   r4   ) r   
__future__r   rl   pathlibr   r   torch.distributed.tensorr   r   ,sglang.multimodal_gen.runtime.cache.teacacher   *sglang.multimodal_gen.runtime.loader.utilsr   1sglang.multimodal_gen.runtime.loader.weight_utilsr   :sglang.multimodal_gen.runtime.pipelines.diffusers_pipeliner	   6sglang.multimodal_gen.runtime.utils.hf_diffusers_utilsr
   5sglang.multimodal_gen.runtime.utils.layerwise_offloadr   1sglang.multimodal_gen.runtime.utils.logging_utilsr   r   rb   r!   r&   r1   rH   rA   rW   r   r   r   r   <module>   s(    (




