"""NCCL-based weight transfer engine."""

from collections.abc import Callable, Iterator
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import torch

if TYPE_CHECKING:
    from vllm.distributed.device_communicators.pynccl import PyNcclCommunicator

from vllm.config.parallel import ParallelConfig
from vllm.config.weight_transfer import WeightTransferConfig
from vllm.distributed.weight_transfer.base import (
    WeightTransferEngine,
    WeightTransferInitInfo,
    WeightTransferUpdateInfo,
)
from vllm.distributed.weight_transfer.packed_tensor import (
    DEFAULT_PACKED_BUFFER_SIZE_BYTES,
    DEFAULT_PACKED_NUM_BUFFERS,
    packed_broadcast_consumer,
)


@dataclass
class NCCLWeightTransferInitInfo(WeightTransferInitInfo):
    """Initialization info for NCCL weight transfer backend."""

    master_address: str
    master_port: int
    rank_offset: int
    world_size: int


@dataclass
class NCCLWeightTransferUpdateInfo(WeightTransferUpdateInfo):
    """Update info for NCCL weight transfer backend."""

    names: list[str]
    dtype_names: list[str]
    shapes: list[list[int]]
    packed: bool = False
    """Whether packed tensor broadcasting is used for this update."""
    packed_buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES
    """Size in bytes of each packed buffer; must match the trainer side."""
    packed_num_buffers: int = DEFAULT_PACKED_NUM_BUFFERS
    """Number of buffers used for double/triple buffering; must match the
    trainer side."""

    def __post_init__(self) -> None:
        """Validate that all lists have the same length."""
        num_params = len(self.names)
        if len(self.dtype_names) != num_params:
            raise ValueError(
                "`dtype_names` should be of the same size as `names`: "
                f"got {len(self.dtype_names)} and {len(self.names)}")
        if len(self.shapes) != num_params:
            raise ValueError(
                "`shapes` should be of the same size as `names`: "
                f"got {len(self.shapes)} and {len(self.names)}")
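

# Illustrative sketch, not part of the original module: one way a trainer could
# describe a state dict as an NCCLWeightTransferUpdateInfo so that workers know
# which tensors to expect. The helper name `_example_build_update_info` is
# hypothetical, and it assumes the base WeightTransferUpdateInfo adds no extra
# required fields; dtype names are stored in the form that receive_weights()
# resolves via getattr(torch, dtype_name).
def _example_build_update_info(
        state_dict: dict[str, torch.Tensor],
        packed: bool = True) -> NCCLWeightTransferUpdateInfo:
    """Illustrative only: build update metadata for `state_dict`."""
    return NCCLWeightTransferUpdateInfo(
        names=list(state_dict.keys()),
        # "torch.bfloat16" must be sent as "bfloat16" so the worker can do
        # getattr(torch, "bfloat16").
        dtype_names=[
            str(t.dtype).removeprefix("torch.") for t in state_dict.values()
        ],
        shapes=[list(t.shape) for t in state_dict.values()],
        packed=packed,
    )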


class NCCLWeightTransferEngine(
        WeightTransferEngine[NCCLWeightTransferInitInfo,
                             NCCLWeightTransferUpdateInfo]):
    """
    Weight transfer engine using NCCL for communication between trainer and workers.

    This implementation uses NCCL broadcast operations to transfer weights from
    the trainer (rank 0) to all inference workers in a process group.
    """

    init_info_cls = NCCLWeightTransferInitInfo
    update_info_cls = NCCLWeightTransferUpdateInfo

    def __init__(self, config: WeightTransferConfig,
                 parallel_config: ParallelConfig) -> None:
        """
        Initialize the NCCL weight transfer engine.

        Args:
            config: The configuration for the weight transfer engine
            parallel_config: The configuration for the parallel setup
        """
        super().__init__(config, parallel_config)
        # PyNcclCommunicator shared with the trainer; created lazily in
        # init_transfer_engine().
        self.model_update_group = None

    def init_transfer_engine(self,
                             init_info: NCCLWeightTransferInitInfo) -> None:
        """
        Initialize NCCL process group with the trainer.

        Args:
            init_info: NCCL initialization info containing master address, port,
                       rank offset, and world size
        """
        # Each worker derives its global rank in the transfer group from its
        # position within the data-parallel layout, shifted by the number of
        # trainer ranks (rank_offset).
        dp_rank = self.parallel_config.data_parallel_rank
        world_size_per_dp = self.parallel_config.world_size
        rank_within_dp = self.parallel_config.rank
        worker_rank = dp_rank * world_size_per_dp + rank_within_dp
        rank = worker_rank + init_info.rank_offset
        self.model_update_group = self._stateless_init_process_group(
            init_info.master_address,
            init_info.master_port,
            rank,
            init_info.world_size,
            torch.cuda.current_device(),
        )

    def receive_weights(
        self,
        update_info: NCCLWeightTransferUpdateInfo,
        load_weights: Callable[[list[tuple[str, torch.Tensor]]], None],
    ) -> None:
        """
        Receive weights from trainer via NCCL broadcast and load them incrementally.

        If update_info.packed is True, uses packed tensor broadcasting for
        efficient transfer of multiple weights in batches. Otherwise, uses simple
        one-by-one broadcasting.

        Args:
            update_info: NCCL update info containing parameter names, dtypes, shapes,
                         and packed flag
            load_weights: Callable that loads weights into the model. Called
                          incrementally for each batch of weights to avoid OOM.
        """
        if self.model_update_group is None:
            raise RuntimeError("NCCL weight transfer not initialized. "
                               "Call init_transfer_engine() first.")

        if update_info.packed:

            def state_dict_info_iterator():
                for name, dtype_name, shape in zip(update_info.names,
                                                   update_info.dtype_names,
                                                   update_info.shapes):
                    dtype = getattr(torch, dtype_name)
                    yield name, (dtype, shape)

            packed_broadcast_consumer(
                iterator=state_dict_info_iterator(),
                group=self.model_update_group,
                src=0,
                post_unpack_func=load_weights,
                buffer_size_bytes=update_info.packed_buffer_size_bytes,
                num_buffers=update_info.packed_num_buffers,
            )
            return

        for name, dtype_name, shape in zip(update_info.names,
                                           update_info.dtype_names,
                                           update_info.shapes):
            dtype = getattr(torch, dtype_name)
            weight = torch.empty(shape, dtype=dtype, device="cuda")
            self.model_update_group.broadcast(
                weight, src=0, stream=torch.cuda.current_stream())
            load_weights([(name, weight)])
            del weight

    def shutdown(self) -> None:
        if self.model_update_group is not None:
            self.model_update_group = None

    @staticmethod
    def trainer_send_weights(
        iterator: Iterator[tuple[str, torch.Tensor]],
        group: "PyNcclCommunicator",
        src: int = 0,
        post_iter_func: Callable[[tuple[str, torch.Tensor]], torch.Tensor]
        | None = None,
        packed: bool = False,
        stream: torch.cuda.Stream | None = None,
        packed_buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES,
        packed_num_buffers: int = DEFAULT_PACKED_NUM_BUFFERS,
    ) -> None:
        """Broadcast weights from trainer to vLLM workers.

        Args:
            iterator: Iterator of model parameters. Returns (name, tensor) tuples
            group: Process group (PyNcclCommunicator)
            src: Source rank (default 0, trainer is typically rank 0)
            post_iter_func: Optional function to apply to each (name, tensor) pair
                            before broadcasting. If None, extracts just the tensor.
            packed: Whether to use packed tensor broadcasting for efficiency.
                    When True, multiple tensors are batched together before
                    broadcasting to reduce NCCL communication overhead.
            stream: CUDA stream to use for broadcasting if packed is False.
                    If packed is True, new streams will be created for each buffer.
            packed_buffer_size_bytes: Size in bytes for each packed tensor buffer.
                    Must match the value used in NCCLWeightTransferUpdateInfo.
            packed_num_buffers: Number of buffers for double/triple buffering.
                    Must match the value used in NCCLWeightTransferUpdateInfo.

        Example:
            >>> from vllm.distributed.weight_transfer.nccl_engine import (
            ...     NCCLWeightTransferEngine,
            ... )
            >>> param_iter = ((n, p) for n, p in model.named_parameters())
            >>> NCCLWeightTransferEngine.trainer_send_weights(
            ...     param_iter, group, packed=True
            ... )
        """
        if post_iter_func is None:
            post_iter_func = lambda x: x[1]

        if packed:
            from vllm.distributed.weight_transfer.packed_tensor import (
                packed_broadcast_producer)

            packed_broadcast_producer(
                iterator=iterator,
                group=group,
                src=src,
                post_iter_func=post_iter_func,
                buffer_size_bytes=packed_buffer_size_bytes,
                num_buffers=packed_num_buffers,
            )
            return

        for item in iterator:
            tensor = post_iter_func(item)
            group.broadcast(tensor,
                            src=src,
                            stream=stream or torch.cuda.current_stream())

    @staticmethod
    def trainer_init(
        init_info: NCCLWeightTransferInitInfo | dict[str, Any],
    ) -> "PyNcclCommunicator":
        """
        Initialize NCCL process group for trainer-side weight transfer.

        The trainer is always rank 0 in the process group. Uses the current
        CUDA device (torch.cuda.current_device()).

        Args:
            init_info: Either an NCCLWeightTransferInitInfo object or a dict with keys:
                - master_address: str
                - master_port: int
                - world_size: int

        Returns:
            PyNcclCommunicator for weight transfer.

        Example:
            >>> from vllm.distributed.weight_transfer.nccl_engine import (
            ...     NCCLWeightTransferEngine,
            ... )
            >>> group = NCCLWeightTransferEngine.trainer_init(
            ...     dict(
            ...         master_address=master_address,
            ...         master_port=master_port,
            ...         world_size=world_size,
            ...     ),
            ... )
        """
        if isinstance(init_info, dict):
            master_address = init_info["master_address"]
            master_port = init_info["master_port"]
            world_size = init_info["world_size"]
        else:
            master_address = init_info.master_address
            master_port = init_info.master_port
            world_size = init_info.world_size

        return NCCLWeightTransferEngine._stateless_init_process_group(
            master_address,
            master_port,
            0,
            world_size,
            torch.cuda.current_device(),
        )

    @staticmethod
    def _stateless_init_process_group(master_address: str, master_port: int,
                                      rank: int, world_size: int, device):
        """
        vLLM provides `StatelessProcessGroup` to create a process group
        without considering the global process group in torch.distributed.
        It is recommended to create `StatelessProcessGroup`, and then initialize
        the data-plane communication (NCCL) between external (train processes)
        and vLLM workers.
        """
        from vllm.distributed.device_communicators.pynccl import (
            PyNcclCommunicator)
        from vllm.distributed.utils import StatelessProcessGroup

        pg = StatelessProcessGroup.create(host=master_address,
                                          port=master_port,
                                          rank=rank,
                                          world_size=world_size)
        pynccl = PyNcclCommunicator(pg, device=device)
        return pynccl
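

# Illustrative end-to-end sketch, not part of the original module: the trainer
# side of one weight update. `model`, `master_address`, `master_port`, and
# `world_size` are placeholders supplied by the caller; the matching worker
# side calls init_transfer_engine() and then receive_weights(). The helper name
# `_example_trainer_push_weights` is hypothetical.
def _example_trainer_push_weights(model: torch.nn.Module, master_address: str,
                                  master_port: int, world_size: int) -> None:
    """Illustrative only: broadcast every parameter of `model` once."""
    # The trainer always joins the transfer group as rank 0.
    group = NCCLWeightTransferEngine.trainer_init(
        dict(master_address=master_address,
             master_port=master_port,
             world_size=world_size))
    # packed=True batches tensors into reusable buffers; the worker-side
    # NCCLWeightTransferUpdateInfo must then also set packed=True with the
    # same buffer size and count (defaults are used here).
    NCCLWeightTransferEngine.trainer_send_weights(
        model.named_parameters(), group, packed=True)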