o
    پi\                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlm	Z	m
Z
 ddlmZmZmZ ddlZddlZddlZddlmZ ddlmZ ddlmZmZmZmZmZ ddlmZmZ dd	lm Z  dd
l!m"Z" ddl#m$Z$m%Z%m&Z& ddl'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z- ddl.m/Z/ ddl0m1Z1m2Z2m3Z3m4Z4m5Z5m6Z6 ddl7m8Z8 ddl9m:Z: ddl;m<Z<m=Z= e>e?Z@G dd de	ZAG dd dZBG dd dZCe fde&de%defddZDdS )HA controller that dispatches requests to multiple data parallel workers.    N)Enumauto)CallableListOptional)envs)compute_dp_attention_world_info)ActiveRanksOutputBlockReqInputTokenizedEmbeddingReqInputTokenizedGenerateReqInputWatchLoadUpdateReq)ReqRequestStage)run_scheduler_process)start_cpu_monitor_thread)!DP_ATTENTION_HANDSHAKE_PORT_DELTAPortArgs
ServerArgs)process_tracing_init trace_get_proc_propagate_context trace_set_proc_propagate_contexttrace_set_thread_infotrace_slice_endtrace_slice_start)
numa_utils)	bind_portconfigure_ipv6configure_loggerget_zmq_socketkill_itself_when_parent_diedmaybe_reindex_device_id)TorchMemorySaverAdapter)Watchdog)TypeBasedDispatcherget_exception_tracebackc                   @   s:   e Zd ZdZe Ze Ze Ze Ze	de
fddZdS )LoadBalanceMethodzLoad balance method.methodc              
   C   s<   |  }z| | W S  ty } ztd| |d }~ww )NzInvalid load balance method: )upperKeyError
ValueError)clsr(   exc r.   `/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/managers/data_parallel_controller.pyfrom_strN   s   
zLoadBalanceMethod.from_strN)__name__
__module____qualname____doc__r   ROUND_ROBINFOLLOW_BOOTSTRAP_ROOMTOTAL_REQUESTSTOTAL_TOKENSclassmethodstrr0   r.   r.   r.   r/   r'   F   s    r'   c                   @   s6   e Zd ZdefddZdefddZdefdd	Zd
S )DPBudgetdp_sizec                 C   s"   || _ dg| | _dg| | _d S )Nr   )r<   total_requeststotal_tokens)selfr<   r.   r.   r/   __init__X   s   zDPBudget.__init__load_updatec                 C   s,   |j D ]}|j| j|j< |j| j|j< qdS )zUpdate the budget.N)loadsnum_reqsr=   dp_rank
num_tokensr>   )r?   rA   loadr.   r.   r/   update_budget]   s   
zDPBudget.update_budgetr(   c                    s^   |t jkr jt j}n|t jkr"tt j fddd}nd S  j|  d7  < |S )Nc                    s    j |   j|  fS N)r>   r=   )ir?   r.   r/   <lambda>j   s    z#DPBudget.dispatch.<locals>.<lambda>)key   )r'   r7   r=   indexminr8   ranger<   )r?   r(   target_rankr.   rJ   r/   dispatchc   s   


zDPBudget.dispatchN)	r1   r2   r3   intr@   r   rG   r'   rR   r.   r.   r.   r/   r;   W   s    r;   c                   @   s~  e Zd ZdZdedededdfddZd	d
 Zdd Z	dd Z
defddZdefddZdd Zdd Zdededededejf
ddZ	d8dedeee  dee fdd Zd!ed"edee dee fd#d$Zd!ed%edee fd&d'Zdedefd(d)Z	d8dedededee deee  f
d*d+Zdefd,d-Zdefd.d/Zdefd0d1Zdefd2d3Z defd4d5Z!d6d7 Z"dS )9DataParallelControllerr   server_args	port_argsrun_scheduler_process_funcreturnNc                 C   s.  || _ || _t|j| _|| _d| _td|j	 | _
|jdkr,t| j
tj|jd| _d| _tj| jtj| jtj| jtj| ji}|| j | _t|j	| _t | _g | _d g|j	 | _ dg|j	 | _!|j"rq| #|| |j$| _%n	| &|| d| _%| '  t(j)d|j*dt+j,- d| _.|j/rt0d d S d S )Nr   rM   FTrT   )
debug_namewatchdog_timeoutsofttest_stuck_timedata_parallel_controller)1rU   rV   r'   r0   load_balance_methodrW   global_balance_idzmqContextr<   context	node_rankr    PULLscheduler_input_ipc_namerecv_from_tokenizerround_robin_counterr5   round_robin_schedulerr6   follow_bootstrap_room_schedulerr7   total_requests_schedulerr8   total_tokens_schedulerdispatchingr;   	dp_budget	threadingLockenv_lockscheduler_procsworkersstatusenable_dp_attentionlaunch_dp_attention_schedulerstp_sizecontrol_message_steplaunch_dp_schedulersinit_dispatcherr$   createsoft_watchdog_timeoutr   SGLANG_TEST_STUCK_DP_CONTROLLERgetsoft_watchdogenable_metricsr   )r?   rU   rV   rW   dispatch_lookupr.   r.   r/   r@   w   sN   


zDataParallelController.__init__c                 C   s,   t | jD ]\}}| j| r|| qd S rH   )	enumeraterr   rs   
send_pyobj)r?   objrI   workerr.   r.   r/   send_to_all_workers   s
   

z*DataParallelController.send_to_all_workersc                 C   s&   | j d d | j D ]}|| q	d S rH   )rr   rw   r   )r?   r   r   r.   r.   r/   send_control_message   s   z+DataParallelController.send_control_messagec                 C   s   | j | d S rH   )rm   rG   )r?   r   r.   r.   r/   handle_load_update_req   s   z-DataParallelController.handle_load_update_reqranksc                 C   s   |j | _ d S rH   )rs   )r?   r   r.   r.   r/   update_active_ranks   s   z*DataParallelController.update_active_ranksreqc                 C   s\   | j jrt|j|j ttj|j t|j|_| 	| | j jr,t
tj|jdd d S d S )NT)thread_finish_flag)rU   enable_tracer   ridtrace_contextr   r   DC_DISPATCHr   rl   r   r?   r   r.   r.   r/   dispatching_with_trace   s   
z-DataParallelController.dispatching_with_tracec                 C   sD   t t| jft| jft| jft| jft| j	fg| _
| j
| j d S rH   )r%   r   r   r   r   r   r   r   r
   r   _request_dispatcheradd_fallback_fnr   rJ   r.   r.   r/   ry      s   	z&DataParallelController.init_dispatcherc                 C   s   d}g }g }g }t |jD ]O}t|}|j|_|j|_|t|j t	
 }	||	 t	j| j|||||	fd}
||
 ||j|j |j 7 }|jdkr\t| jtj|jd| j|< q|D ]}|  q_|D ]}
|
  qh|D ]}|  qqd S )Nr   targetargsT)rP   r<   r   init_newtokenizer_ipc_namedetokenizer_ipc_nameappendr   	nccl_portrn   EventThread#launch_tensor_parallel_group_threadrv   pp_sizegpu_id_steprc   r    rb   r`   PUSHre   rr   closestartwait)r?   rU   rV   base_gpu_idthreadssocketsready_eventsrD   tmp_port_argsready_eventthreadsockeventr.   r.   r/   rx      sD   







z+DataParallelController.launch_dp_schedulersr   rD   r   c                 C   s&   |  |||| |  	 td q)NTi ' )launch_tensor_parallel_groupsettimesleep)r?   rU   rV   r   rD   r   r.   r.   r/   r     s
   
z:DataParallelController.launch_tensor_parallel_group_threadworker_portsc                 C   s   |j du rd|jt  }n.|j dr(t|j \}}d| dt|t  }n|j d\}}d| dt|t  }|jdkrK| ||j	d |S | 
||jS )a  Broadcast worker ports from node 0 to all other nodes.

        Node 0 acts as the server, waiting for all other nodes to connect and
        sending them the pre-allocated worker ports. Other nodes act as clients,
        connecting to node 0 to receive their copy of the worker ports.

        Args:
            server_args: Server arguments containing node configuration.
            worker_ports: Pre-allocated worker ports to broadcast.

        Returns:
            List of worker ports (same on all nodes after broadcast).
        Nztcp://127.0.0.1:[ztcp://:r   rM   )dist_init_addrportr   
startswithr   rS   splitrc   _broadcast_ports_as_servernnodes_receive_ports_as_client)r?   rU   r   endpointr   hostr.   r.   r/   _broadcast_worker_ports  s   

z.DataParallelController._broadcast_worker_portsr   expected_clientsc                 C   s   t d| d t d|  t| jtj|d}z9d}||k rH|  }t d|  || |d7 }t d| d	| d
 ||k s!t d |W |	  S |	  w )z+Broadcast worker ports to all client nodes.zBroadcasting worker ports to z client nodeszWorker ports: Tr   zReceived handshake from node rM   zSent worker ports to /z nodeszWorker port broadcast completed)
loggerdebugr    rb   r`   REPrecvdecoder   r   )r?   r   r   r   
rep_socketconnected_clientsclient_rankr.   r.   r/   r   ?  s"   

z1DataParallelController._broadcast_ports_as_serverrc   c                 C   s   t d t| jtj|d}|tjd |tjd z2z |	t
|  | }t dt| d |W W |  S  tjyN   t d tdw |  w )z*Receive worker ports from the server node.z,Connecting to node 0 to receive worker portsFi'	 z	Received z worker ports from node 0z,Timeout waiting for worker ports from node 0z9Failed to receive worker ports from node 0 within timeout)r   r   r    rb   r`   REQ
setsockoptRCVTIMEOSNDTIMEOsendr:   encode
recv_pyobjlenr   AgainerrorRuntimeError)r?   r   rc   
req_socketr   r.   r.   r/   r   [  s"   



z/DataParallelController._receive_ports_as_clientc                 C   s   g }|j dkr1t|jD ]$}t| jtj}||d  |d | j|< t	
d|d  d|  q| ||r8|nd }| ||dd | d S )Nr   rM   zAssigned port z to worker )rc   rP   r<   r    rb   r`   r   r   rr   r   r   r   r   )r?   rU   rV   r   rD   port_and_socketbroadcasted_portsr.   r.   r/   ru   s  s   

z5DataParallelController.launch_dp_attention_schedulersc                 C   s  |j std| d| d tj|jd}g }t|j|j d}t|j|j d}	t	||j
|	  ||j
|	 d  }
|	}|j| }t	||j
|  ||j
| d  }d}d}|
D ]}|D ]}|}|j r~t|j ||j|j|j\}}}t|||}|j|_tjdd\}}|j| || |  || |j  }|j r|jnd}|j| |j }|| |j }||j|j  }||j|j  |j|j |j  }| j^ t|I}tj| j||||||||||f
d	}| " t|| |  W d    n1 sw   Y  W d    n	1 sw   Y  W d    n	1 sw   Y  W d    n	1 s-w   Y  | j | | | q]qYg }t	t!|D ]}| || "  qG|d d
 | _#|d d | _$d S )Nz	Launch DPz starting at GPU #.)enablerM   r   F)duplexr   max_total_num_tokensmax_req_input_len)%rt   r   infor#   rz   enable_memory_savermaxr   r   rP   rc   rv   r	   r<   attn_cp_sizer   r   r   mpPiper   r   moe_dp_sizeep_sizerp   r"   ProcessrW   configure_subprocessr   r   rq   r   r   r   r   r   )r?   rU   rV   r   rD   r   memory_saver_adapterscheduler_pipe_readerspp_size_per_nodennodes_per_pp_rankpp_rank_rangennodes_per_tp_grouptp_size_per_nodetp_rank_rangeattn_cp_rankmoe_dp_rankpp_ranktp_rankrank_port_args_readerwritergpu_idattn_dp_sizeattn_tp_sizemoe_ep_rankprocscheduler_inforI   r.   r.   r/   r     s   






  Kz3DataParallelController.launch_tensor_parallel_groupc                 C   s6   |j d urtd|j   | j|j  | dS dS )NzDirect routing to DP rank TF)data_parallel_rankr   r   rr   r   r   r.   r.   r/   maybe_external_dp_rank_routing  s
   
z5DataParallelController.maybe_external_dp_rank_routingc                 C   sr   |  |rd S 	 | j| j r-td| j  | j| j | | jd t| j | _d S | jd t| j | _q)NTzChoose worker rM   )r   rs   rg   r   r   rr   r   r   r   r.   r.   r/   rh     s   


z,DataParallelController.round_robin_schedulerc                 C   sx   |  |rd S |jd u r!| jjdkr!| j|_| jd t| j | _|jd us*J d|jt| j }| j| | d S )NfakerM   zreq.bootstrap_room should not be None. Do not send requests directly to prefill or decode instances; send to the router instead.)r   bootstrap_roomrU   disaggregation_transfer_backendrg   r   rr   r   )r?   r   rQ   r.   r.   r/   ri     s   


z6DataParallelController.follow_bootstrap_room_schedulerc                 C   0   |  |rd S | jtj}| j| | d S rH   )r   rm   rR   r'   r7   rr   r   r?   r   target_workerr.   r.   r/   rj   &     
z/DataParallelController.total_requests_schedulerc                 C   r  rH   )r   rm   rR   r'   r8   rr   r   r  r.   r.   r/   rk   ,  r  z-DataParallelController.total_tokens_schedulerc                 C   sD   	 	 | j   z	| jtj}W n
 tjy   Y nw | | qqrH   )r~   feedrf   r   r`   NOBLOCKZMQErrorr   )r?   recv_reqr.   r.   r/   
event_loop2  s   

z!DataParallelController.event_looprH   )#r1   r2   r3   r4   r   r   r   r@   r   r   r   r
   r   r   r   ry   rx   rS   rn   r   r   r   r   r   r:   r   r   ru   r   r   rh   ri   rj   rk   r	  r.   r.   r.   r/   rT   t   s    
@.


#



trT   rU   rV   rW   c           	      C   s  t  d t  t  t  }t|  | jr4t	| j
d d}| jdkr)d}n| jdkr0d}t| z4t| ||}|d|j|jd	 | jd
krO|  |jD ]}|  td|j d|j  qRW d S  ty   t }td|  |tj Y d S w )Nz sglang::data_parallel_controllersglangzDP ControllerprefillzPrefill DP Controllerr   zDecode DP Controllerready)rs   r   r   r   z$Scheduler or DataParallelController z terminated with z)DataParallelController hit an exception: )setproctitlefaulthandlerr   r!   psutilr   parentr   r   r   otlp_traces_endpointdisaggregation_moder   rT   r   r   r   rc   r	  rq   joinr   r   pidexitcode	Exceptionr&   send_signalsignalSIGQUIT)	rU   rV   pipe_writerrW   parent_processthread_label
controllerr   	tracebackr.   r.   r/   $run_data_parallel_controller_process=  sH   




r  )Er4   r  loggingmultiprocessingr   r  rn   r   enumr   r   typingr   r   r   r  r  r`   sglang.srt.environr   sglang.srt.layers.dp_attentionr	   sglang.srt.managers.io_structr
   r   r   r   r   "sglang.srt.managers.schedule_batchr   r   sglang.srt.managers.schedulerr   sglang.srt.metrics.cpu_monitorr   sglang.srt.server_argsr   r   r   sglang.srt.tracing.tracer   r   r   r   r   r   sglang.srt.utilsr   sglang.srt.utils.commonr   r   r   r    r!   r"   +sglang.srt.utils.torch_memory_saver_adapterr#   sglang.srt.utils.watchdogr$   sglang.utilsr%   r&   	getLoggerr1   r   r'   r;   rT   r  r.   r.   r.   r/   <module>   sP     
   P