o
    پiU                     @  s  d dl mZ 	 d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZmZmZmZ d dlZd dlZd dlZd dlmZmZ d dlmZ d dlmZmZmZmZmZmZ d d	l m!Z! d d
l"m#Z# d dl$m%Z%m&Z& d dl'm(Z(m)Z) d dl*m+Z+ erd dl,m-Z- e.e/Z0G dd dZ1	d>d?ddZ2dd Z3G dd  d Z4G d!d" d"Z5G d#d$ d$e#Z6d%d& Z7d@d'd(Z8dAd+d,Z9dBd-d.Z:dCd5d6Z;dDdEd:d;Z<G d<d= d=Z=dS )F    )annotationsN)partialmethod)shared_memory)TYPE_CHECKINGAnyDictUnion)DisaggregationModeTransferBackend)start_disagg_service)BaseBatchReqBaseReqBatchEmbeddingOutputBatchMultimodalOutputBatchStrOutputBatchTokenIDOutput)_Communicator)TokenizerManager)PortArgs
ServerArgs)get_zmq_socketkill_process_tree)get_exception_traceback)DetokenizerManagerc                   @  s0   e Zd Zdd Zdd Zdd	d
ZdddZdS )SocketMappingc                 C  s   t  | _i | _d S N)zmqContext_zmq_context_mappingself r"   ]/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/managers/multi_tokenizer_mixin.py__init__<   s   

zSocketMapping.__init__c                 C  s&   | j  D ]}|  q| j   d S r   )r   valuescloseclear)r!   socketr"   r"   r#   clear_all_sockets@   s   
zSocketMapping.clear_all_socketsipc_namestris_tokenizerboolc                 C  sh   |rdnd}|| j v rt| d|d d S td| d|d t| jtj|d}|| j |< d S )	N	tokenizerdetokenizerz already registered ipc_name=, skipping...zRegistering z
 ipc_name=z in SocketMapping...F)r   loggerwarninginfor   r   r   PUSH)r!   r*   r,   type_strr(   r"   r"   r#   _register_ipc_mappingE   s   
z#SocketMapping._register_ipc_mappingoutputr   c                 C  sN   |d u rt dt| d d S || jvr| j|dd | j| | d S )NzIPC name is None, output type=r0   F)r,   )r1   r2   typer   r6   
send_pyobj)r!   r*   r7   r"   r"   r#   send_outputN   s   
zSocketMapping.send_outputN)r*   r+   r,   r-   )r*   r+   r7   r   )__name__
__module____qualname__r$   r)   r6   r:   r"   r"   r"   r#   r   ;   s
    
	r   Tr7   r   
field_namer+   indexintcheck_lengthr-   returnc                 C  sz   t | |d}|du rdS t|tr.i }| D ]\}}t||kr%d||< || ||< q|S |r8t||kr8dS || gS )a  Extract a field value from output by index, handling None and length checks.

    Args:
        output: The output object containing the field
        field_name: The name of the field to extract
        index: The index to access in the field list
        check_length: If True, check both field existence and length. If False, only check field existence.

    Returns:
        A list containing the field value at index, or None if not available.
    N)getattr
isinstancedictitemslen)r7   r>   r?   rA   field	new_fieldkvr"   r"   r#   _extract_field_by_indexY   s   

rL   c              
   C  s@  t | tr-td2i d| j| gdt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|d	t| d	|d
t| d
|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dddt| d|dddt| d|dddt| d|dddt| d|dddt| d|dddt| d|dddt| d|ddd t| d |ddd!t| d!|ddd"t| d"|ddd#t| d#|ddd$t| d$|ddd%t| d%|ddd&d'd(d'd)t| d)|dd}|S t | trTt| j| gt| d
|t| d*|t| d|t| d|d'd'd+}|S t | trptd2i d| j| gdt| d|dt| d|dt| d|dt| d|dt| d|dt| d|dt| d|d	t| d	|d
t| d
|d,t| d,|dt| d|dt| d|dt| d|dt| d|dt| d|dddt| d|dddt| d|dddt| d|dddt| d|dddt| d|dddt| d|dddt| d|ddd t| d |ddd!t| d!|ddd"t| d"|ddd#t| d#|ddd$t| d$|ddd%t| d%|ddd-t| d-|ddd.t| d.|ddd&d'd(d'd/t| d/|d)t| d)|dd}|S t | trt| j| gt| d
|t| d0|t| d|t| d|t| d|d'd'd1}|S | }|S )3z+NOTE: A maintainable method is better here.ridsspec_verify_ctspec_accepted_tokensspec_acceptance_histogram
queue_timeforward_entry_timeprefill_launch_delayprefill_launch_latencyprefill_finished_tsfinished_reasonsdecoded_texts
decode_idsread_offsets
output_idsskip_special_tokensspaces_between_special_tokensno_stop_trimprompt_tokenscompletion_tokenscached_tokenscached_tokens_detailsinput_token_logprobs_valF)rA   input_token_logprobs_idxoutput_token_logprobs_valoutput_token_logprobs_idxinput_top_logprobs_valinput_top_logprobs_idxoutput_top_logprobs_valoutput_top_logprobs_idxinput_token_ids_logprobs_valinput_token_ids_logprobs_idxoutput_token_ids_logprobs_valoutput_token_ids_logprobs_idxoutput_token_entropy_valoutput_hidden_statesplaceholder_tokens_idxNplaceholder_tokens_valtoken_steps
embeddings)rM   rV   rs   r^   r`   rp   rq   output_strsrouted_expertscustomized_inforetraction_countsoutputs)rM   rV   rx   r^   r_   r`   rp   rq   r"   )rD   r   rM   rL   r   r   r   )r7   i
new_outputr"   r"   r#   _handle_output_by_indexz   s  	
 !"#&),/258;>ADGJMPQR @




h	
 #&),/258;>ADGJKLM^





r{   c                   @  s$   e Zd ZdZd	ddZd	ddZdS )
MultiHttpWorkerDetokenizerMixinz"Mixin class for DetokenizerManagerr!   r   c                 C  s   t | dr| j  d S d S )Nsocket_mapping)hasattrr}   r)   r    r"   r"   r#   maybe_clear_socket_mappingB  s   
z:MultiHttpWorkerDetokenizerMixin.maybe_clear_socket_mappingc                 C  sh   t  | _	 | j }| |}|du rqt|tsJ dt|jD ]\}}t	||}| j
|| q"q)zFThe event loop that handles requests, for multi multi-http-worker modeTNz4for multi-http-worker, recv_obj must be BaseBatchReq)r   r}   recv_from_scheduler
recv_pyobj_request_dispatcherrD   r   	enumeratehttp_worker_ipcsr{   r:   )r!   recv_objr7   ry   r*   rz   r"   r"   r#   multi_http_worker_event_loopF  s   


z<MultiHttpWorkerDetokenizerMixin.multi_http_worker_event_loopN)r!   r   )r;   r<   r=   __doc__r   r   r"   r"   r"   r#   r|   ?  s    
r|   c                   @  s:   e Zd ZdZdddZdd	 Zd
d Zdd Zdd ZdS )MultiTokenizerRouterz1A router to receive requests from TokenizerWorkerserver_argsr   	port_argsr   c                 C  s   || _ tjd}t|tj|jd| _t|tj|j	d| _
t|tj|jd| _t | _tj| jdd| _| j  t|  | j| _tt| j| j| _t| j | _d S )N   T)targetdaemon)r   r   asyncior   r   PULLtokenizer_ipc_namerecv_from_detokenizerr4   scheduler_input_ipc_namesend_to_schedulertokenizer_worker_ipc_namereceive_from_workernew_event_loop_loop	threadingThread	_run_loop_threadstartrun_coroutine_threadsaferouter_worker_obj_taskprint_exception_wrapperhandle_loop_handle_taskr   disaggregation_bootstrap_server)r!   r   r   contextr"   r"   r#   r$   \  s*   


zMultiTokenizerRouter.__init__c                 C  s   | j   d S r   )r   run_foreverr    r"   r"   r#   r   x  s   zMultiTokenizerRouter._run_loopc                   s(   	 | j  I d H }| j|I d H  qr   )r   r   r   r9   r!   r   r"   r"   r#   r   {  s
   z&MultiTokenizerRouter.router_worker_objc                   s.   t  | _	 | j I d H }| |I d H  qr   )r   r}   r   r   _distribute_result_to_workersr   r"   r"   r#   r     s   z MultiTokenizerRouter.handle_loopc                   sh   t |tr|jg}nt |tr|j}n	tdt| t|D ]\}}t||}| j	
|| q!d S )NzUnknown recv_obj type: )rD   r   http_worker_ipcr   r   
ValueErrorr8   r   r{   r}   r:   )r!   r   	ipc_namesry   r*   new_recv_objr"   r"   r#   r     s   



z2MultiTokenizerRouter._distribute_result_to_workersNr   r   r   r   )	r;   r<   r=   r   r$   r   r   r   r   r"   r"   r"   r#   r   Y  s    
r   c                      s,   e Zd ZdZd fddZdd
dZ  ZS )TokenizerWorkerz*Tokenizer Worker in multi-http-worker moder   r   r   r   c                   sv   t  dt   |j}d|_t || t | _|j| _|| j_t	| jj| _t
| jj| _t| jd| _d S )Nzsglang::tokenizer_worker:null   )setproctitleosgetpiddisaggregation_modesuperr$   	worker_idr   r   r	   r
   disaggregation_transfer_backendr   r   %register_multi_tokenizer_communicator)r!   r   r   r   	__class__r"   r#   r$     s    

zTokenizerWorker.__init__reqUnion[BaseReq, BaseBatchReq]c                 C  sJ   t |tr| j|_d S t |tr| jgt|j |_d S tdt	| )NzUnknown req type: )
rD   r   r   r   r   rG   rM   r   r   r8   )r!   r   r"   r"   r#   _attach_multi_http_worker_info  s
   

z.TokenizerWorker._attach_multi_http_worker_infor   )r   r   )r;   r<   r=   r   r$   r   __classcell__r"   r"   r   r#   r     s    r   c                   sz   z	|  I dH  W dS  t y<   t }td|  t| dr,t| jtr,| j  t	t
 dd td Y dS w )zt
    Sometimes an asyncio function does not print exception.
    We do another wrapper to handle the exception.
    Nz'MultiTokenizerRouter hit an exception: __self__T)include_parent   )	Exceptionr   r1   errorr~   rD   r   r   dump_requests_before_crashr   r   r   sysexit)func	tracebackr"   r"   r#   r     s   
r   c                   C  s
   t  jS )zGet the main process ID)multiprocessingcurrent_process_parent_pidr"   r"   r"   r#   get_main_process_id  s   
r   nameshared_memory.SharedMemoryc                 C  s   t | }t|}ztj|d}|j|k r%|  |  tjd||d}W n ty7   tjd||d}Y nw ||j	d|< |S )zWrite data to shared memoryr   T)createsizer   N)
pickledumpsrG   r   SharedMemoryr   r&   unlinkFileNotFoundErrorbuf)objr   
serializedr   shmr"   r"   r#   write_to_shared_memory  s   

r   c                 C  sJ   zt j| d}tt|j}|  |W S  ty$   td|  dw )zRead data from shared memoryr   zShared memory z
 not found)r   r   r   loadsbytesr   r&   r   )r   r   datar"   r"   r#   read_from_shared_memory  s   r   r   r   r   r   scheduler_infor   c                 C  sJ   t  }t }td| d|  | ||f}t|d| }|  |S )z:Write args information to share memory for multi-tokenizerzmain process ID: z, current process ID: multi_tokenizer_args_)r   r   r   r1   r3   r   r&   )r   r   r   main_pidcurrent_pidargsargs_shmr"   r"   r#   write_data_for_multi_tokenizer  s   
r   
   timeoutfloatc                 C  sB   zddl m} t|j| d|_W dS  ty    td Y dS w )z5Monkey patch uvicorn multiprocessing is_alive timeoutr   )Process)r   zAuvicorn.supervisors.multiprocess not found, skipping monkey patchN) uvicorn.supervisors.multiprocessr   r   is_aliveImportErrorr1   r2   )r   r   r"   r"   r#   $monkey_patch_uvicorn_multiprocessing  s   
r   c                   @  s   e Zd Zd
ddZdd Zd	S )SenderWrapperr   r   r   
zmq.Socketc                 C  s   || _ || _d S r   )r   r   )r!   r   r   r"   r"   r#   r$     s   
zSenderWrapper.__init__c                 C  s$   t |tr
| jj|_| j| d S r   )rD   r   r   r   r   r   r9   )r!   r   r"   r"   r#   r9     s   

zSenderWrapper.send_pyobjN)r   r   r   r   )r;   r<   r=   r$   r9   r"   r"   r"   r#   r     s    
r   )T)
r7   r   r>   r+   r?   r@   rA   r-   rB   r   )rB   r@   )r   r+   rB   r   )r   r+   rB   r   )r   r   r   r   r   r   )r   )r   r   )>
__future__r   r   loggingr   r   r   r   r   	functoolsr   r   typingr   r   r   r   r   r   zmq.asynciosglang.srt.disaggregation.utilsr	   r
   "sglang.srt.managers.disagg_servicer   sglang.srt.managers.io_structr   r   r   r   r   r   0sglang.srt.managers.tokenizer_communicator_mixinr   %sglang.srt.managers.tokenizer_managerr   sglang.srt.server_argsr   r   sglang.srt.utilsr   r   sglang.utilsr   'sglang.srt.managers.detokenizer_managerr   	getLoggerr;   r1   r   rL   r{   r|   r   r   r   r   r   r   r   r   r   r"   r"   r"   r#   <module>   sR     
! F<(



