o
    
۾i#                     @   s   d Z ddlZddlmZmZ ddlmZ ddlZdZdZ	ee	fdee
eejf  ded	ed
ee
eejf gejf dededdfddZee	fdee
ee
ee ejf f  ded	edeee
eejf  gdf dededdfddZdS )z6Packed tensor utilities for efficient weight transfer.    N)CallableIterator)Anyi   @   iteratorgroupsrcpost_iter_funcbuffer_size_bytesnum_buffersreturnc              	   C   s  |}dd t |D }d}dd t |D }	dd t |D }
dd t |D }	 ||   tj||  zMg |	|< d|
|< 	 |t|  tjd}|	| 	| |
|  |
 7  < |
| |krinqBtj|	| dd	||< |j|| |d
 |d | }W n, ty   t|	| dkrtj|	| dd	||< |j|| |d
 Y W d   dS w W d   n1 sw   Y  q))a  Broadcast tensors in a packed manner from trainer to workers.

    Args:
        iterator: Iterator of model parameters. Returns a tuple of (name, tensor)
        group: Process group (PyNcclCommunicator)
        src: Source rank (0 in current implementation)
        post_iter_func: Function to apply to each (name, tensor) pair before
                       packing, should return a tensor
        buffer_size_bytes: Size in bytes for each packed tensor buffer.
                          Both producer and consumer must use the same value.
        num_buffers: Number of buffers for double/triple buffering.
                    Both producer and consumer must use the same value.

    c                 S      g | ]}t j qS  torchcudaStream.0_r   r   b/home/ubuntu/.local/lib/python3.10/site-packages/vllm/distributed/weight_transfer/packed_tensor.py
<listcomp>)       z-packed_broadcast_producer.<locals>.<listcomp>r   c                 S      g | ]}g qS r   r   r   r   r   r   r   ,       c                 S      g | ]}d qS r   r   r   r   r   r   r   -   r   c                 S      g | ]}t jd t jddqS r   r   dtypedevicer   emptyuint8r   r   r   r   r   .       T)dimr      N)rangesynchronizer   r   streamnext
contiguousviewr$   appendnumelcat	broadcastStopIterationlen)r   r   r   r	   r
   r   target_packed_tensor_sizestreams
buffer_idxpacking_tensor_listpacking_tensor_sizespacked_tensorstensorr   r   r   packed_broadcast_producer   sT   


r=   post_unpack_funcc                 C   sX  dt jdtt dttt  dtt j dtt dtttt jf  fdd}|}d	d
 t|D }d}	dd
 t|D }
dd
 t|D }dd
 t|D }	 ||	   t j	
||	  g |
|	< d||	< zh	 t| \}\}}t||j }|
|	 ||||f ||	  |7  < ||	 |krnqft j||	 t jdd||	< |j||	 |d t|
|	  \}}}}||||	 t|t|t|t| |	d | }	W nN ty   t|
|	 dkrt j||	 t jdd||	< |j||	 |d t|
|	  \}}}}||||	 t|t|t|t| Y W d   dS w W d   n	1 s&w   Y  qM)a  Consume packed tensors and unpack them into a list of tensors.

    Args:
        iterator: Iterator of parameter metadata. Returns (name, (shape, dtype))
        group: Process group (PyNcclCommunicator)
        src: Source rank (0 in current implementation)
        post_unpack_func: Function to apply to each list of (name, tensor) after
                         unpacking
        buffer_size_bytes: Size in bytes for each packed tensor buffer.
                          Both producer and consumer must use the same value.
        num_buffers: Number of buffers for double/triple buffering.
                    Both producer and consumer must use the same value.

    packed_tensornamesshapesdtypestensor_sizesr   c                 S   s&   |  |}dd t||||D }|S )ay  Unpack a single tensor into a list of tensors.

        Args:
            packed_tensor: The packed torch.uint8 tensor to unpack
            names: List of tensor names
            shapes: List of tensor shapes
            dtypes: List of tensor dtypes
            tensor_sizes: List of tensor sizes in bytes

        Returns:
            unpacked List[(name, tensor)]
        c                 S   s,   g | ]\}}}}||  |j| fqS r   )r.   r/   )r   nameshaper    r<   r   r   r   r      s    
zDpacked_broadcast_consumer.<locals>.unpack_tensor.<locals>.<listcomp>)splitzip)r?   r@   rA   rB   rC   unpacked_tensorsunpacked_listr   r   r   unpack_tensorp   s   
z0packed_broadcast_consumer.<locals>.unpack_tensorc                 S   r   r   r   r   r   r   r   r      r   z-packed_broadcast_consumer.<locals>.<listcomp>r   c                 S   r   r   r   r   r   r   r   r      s    c                 S   r   r   r   r   r   r   r   r      r   c                 S   r   r   r"   r   r   r   r   r      r%   Tr   r   r(   r)   N)r   Tensorliststrintr    tupler*   r+   r   r,   r-   mathproditemsizer0   r#   r$   r3   rG   r4   r5   )r   r   r   r>   r
   r   rJ   r6   r7   r8   packing_tensor_meta_datar:   r;   rD   rE   r    tensor_sizer@   rA   rB   rC   r   r   r   packed_broadcast_consumerY   s   






	#rU   )__doc__rP   collections.abcr   r   typingr   r    DEFAULT_PACKED_BUFFER_SIZE_BYTESDEFAULT_PACKED_NUM_BUFFERSrO   rM   rK   rN   r=   rL   r    rU   r   r   r   r   <module>   sR   
M