o
    پic)                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
 d dlZd dlmZ d dlmZmZ d dlm  m  m  mZ d dlmZ eeZe ZG dd deZd	ejfd
dZdd Zdde de fddZ!G dd dZ"dS )    N)contextmanager)IntEnum)OptionalUnion)ProcessGroupReduceOp)is_hipc                   @   s   e Zd ZdZdZdS )MscclContextSelection      N)__name__
__module____qualname__MSCCL1SHOT1NODELLMSCCL1SHOT2NODELL r   r   i/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/distributed/device_communicators/pymscclpp.pyr	      s    r	   inpc                 C   s4   |   p|   |  |    |  |   kS N)is_contiguousstoragenbytesstorage_offsetelement_sizenumel)r   r   r   r   mscclpp_is_weak_contiguous   s
   r   c                 C   s   |    } | stdtt| D ]}| |  s"| | dkr" nq| d| }| |d   }zt|}W n tyF   td|  dw |dkrOt|S |dkrYt|d S |d	kret|d d S |d
krst|d d d S td| d)a  
    Converts a human-readable size string (e.g., "1MB", "2.5kb", "3 GB")
    into the equivalent number of bytes using binary units.

    Args:
        size_str (str): A string representing size with unit (KB, MB, GB).

    Returns:
        int: Number of bytes.
    zEmpty input string.NzInvalid numeric value in ''bkb   mbgbzUnsupported unit: z, support B, KB, MB, GB only)striplower
ValueErrorrangelenisdigitfloatint)size_strinum_strunitnumr   r   r   mscclpp_convert_to_bytes!   s.   r0   
   r   
test_niterwarmup_niterc                 C   s   t |D ]}|   qtjjdd}tjjdd}tj  t  |  t |D ]}|   q)|  |  ||| d }|S )NT)enable_timingi  )	r&   torchcudaEventsynchronizedistbarrierrecordelapsed_time)funcr2   r3   _start_event	end_eventfunc_cost_usr   r   r   mscclpp_bench_timeJ   s   
rB   c                   @   s   e Zd ZddgZeeddZej	ej
ejgZefdedeeeejf ddfd	d
ZejfdefddZejfdejdedefddZejfdejdefddZe	ddee fddZdS )PyMscclppCommunicator      SGLANG_MSCCLPP_MAX_BYTES1MBgroupdevicereturnNc           	      C   s  d| _ d| _tjsdS || _t|tjjksJ dtj	| jd}tj
| jd}|dkr/dS |tjvr@td|ttj dS tj|| _tj | _t| jd | jd	  |d kshtd
t| j dS t|trvtd| }n
t|trt|}t|tjsJ || _|| _|| _|| _t	|d	krt g}ndg}tj|| jd	 | jd |d	 | _ t!t"|t!t"|| _#| _$t"|D ]}|d | j#|< | jd | j$|< qd| _%d| _&dd t"dt'(t')| jd D | _*i | _+|dkrt,j-| _&n	|dkr
t,j.| _&t/sFtj0| jd tj1| jd| _2tj0| jd | j tj1| jd| _3t4| j | j| j| j2| j3| j| j#| j$t| j&	| _%nt5di | _+| 6  t	|d	kr^| j+g}ndg}tj|| jd	 | jd |d	 | _+d| _dS )a  
        Args:
            group: the process group to work on. If None, it will use the
                default process group.
            device: the device to bind the CustomAllreduce to. If None,
                it will be bind to f"cuda:{local_rank}".
        It is the caller's responsibility to make sure each communicator
        is bind to a unique device, and all communicators in this group
        are in the same node.
        FTNz7CustomAllreduce should be attached to a non-NCCL group.)rH   r
   zPyMscclpp is disabled due to an unsupported world size: %d. Supported world sizes: %s. To silence this warning, specify disable_mscclpp=True explicitly.r   zPyMscclpp is disabled due to an unsupported group %s.Please ensure all ranks in the group are consecutive.To silence this warning, specify disable_mscclpp=True explicitly.zcuda:)srcrH   rD   c                 S   s   g | ]}d | qS )r   r   ).0r,   r   r   r   
<listcomp>   s    z2PyMscclppCommunicator.__init__.<locals>.<listcomp>r1   rE   dtyperI   z!HIP Mscclpp is not supported yet.)7_IS_CAPTURINGdisabledopsIS_MSCCLPP_AR_AVAILABLErH   r9   get_backendBackendNCCLget_rankget_world_sizerC   _SUPPORTED_WORLD_SIZESloggerwarningstrr5   distributedget_process_group_ranksranksr6   device_countnranks_per_nodeabs
isinstancer*   rI   	max_bytesrank
world_sizemscclpp_generate_unique_idbroadcast_object_list	unique_idlistr&   rank_to_node
rank_to_ib_contextcontext_selectionmathfloorlog2msg_size_for_finetunemsg_size2best_configr	   r   r   _is_hipemptyuint8scratch
put_buffermscclpp_init_contextNotImplementedErrorpre_tune_config)	selfrH   rI   re   rf   rg   rj   rrt   r   r   r   __init__b   s   
 









zPyMscclppCommunicator.__init__c           
   
      s   t dj  g d}g d}tjjd |j |dd}t|}jD ]V}|d ||j  |d ||j   d\}}|D ]!|D ]t fdd	}	|d u s\|	|k rbf}|	}qFqB|j	|< jd
kr}t d| d| d| d q'd S )Nz#start to pre-tune configs for rank )   i   r    )   *   T   rK   r6   rO   )NNc                      s   t j S r   )rS   mscclpp_allreducern   r   mock_inp	mock_outpnblocksnthreadsr}   r   r   <lambda>   s    z7PyMscclppCommunicator.pre_tune_config.<locals>.<lambda>r   zfor msg_size z, best_config: z, best_time: us)
r[   debugrf   r5   onesrs   itemsize
empty_likerB   rt   )
r}   rP   nthreads_to_trynblocks_to_try	inp_randn	oup_randnmsg_sizebest_config	best_timecur_costr   r   r   r|      s:   


	
z%PyMscclppCommunicator.pre_tune_configr   opc                 C   s\   | j s| jd u r
dS |jtjvrdS t|sdS |tjkrdS | |	  | j
kr,dS dS )NFT)rR   rn   rP   rC   _SUPPORTED_DTYPEr   r   SUMr   r   re   )r}   r   r   r   r   r   should_mscclpp_allreduce  s   
z.PyMscclppCommunicator.should_mscclpp_allreducetensorc           	      C   s|   | j rtj r| j|j| f | |j }t	
| j|}| j| }| j| \}}t|}t| j|||| |S r   )rQ   r5   r6   is_current_stream_capturinggraph_input_setaddrP   r   r   bisectbisect_leftrs   rt   r   rS   r   rn   )	r}   r   r   r   indexmsg_size_finetuner   r   resultr   r   r   
all_reduce  s   


z PyMscclppCommunicator.all_reduceenablec                 c   s.    |d u r| j }| j}| | _d V  || _d S r   )	availablerR   )r}   r   old_disabler   r   r   change_state   s   
z"PyMscclppCommunicator.change_stater   )r   r   r   rZ   r0   osgetenv
_MAX_BYTESr5   r)   float16bfloat16r   r   r   r*   r]   rI   r   boolr|   r   r   Tensorr   r   r   r   r   r   r   r   r   rC   [   s8    
 
rC   )r1   r   )#r   loggingrp   r   
contextlibr   enumr   typingr   r   r5   torch.distributedr^   r9   r   r   Asglang.srt.distributed.device_communicators.custom_all_reduce_opssrtdevice_communicatorscustom_all_reduce_opsrS   sglang.srt.utilsr   	getLoggerr   r[   ru   r	   r   r   r0   r*   rB   rC   r   r   r   r   <module>   s&    
)