o
    پiLe                     @   s~  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZ d dlmZ d dlmZmZmZ d dlZd dlZd dlZd dlZd dlmZ d dlmZmZ d dlmZ d d	lmZ d d
lm Z m!Z! d dl"m#Z# d dl$m%Z% d dl&m'Z'm(Z( d dl)m*Z* e+e,Z-erd dl.m/Z/ G dd dZ0G dd deZ1G dd dZ2dd Z3G dd deZ4G dd de4Z5dS )    N)ABCabstractmethod)IntEnum)
HTTPStatus)TYPE_CHECKINGListOptional)PretrainedConfig)GroupCoordinatorget_mooncake_transfer_engine)envs)TokenizedGenerateReqInput)get_mm_processorimport_processors)Req)
ServerArgs)get_local_ip_autoget_zmq_socket_on_host)get_processor)	Schedulerc                   @   sR   e Zd Z			dddZdd ZdddZd	d
 Zedd Zdd Z	dd Z
dS )EmbeddingDataNc                    s   |_ |_|_|_ _d _ d ur jnd _ d ur%t jnd _fddt	jD _
 fddt	jD _fddt	jD _|_|_d S )Nc                    s   g | ]}| j kqS  part_idx.0iselfr   ]/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/disaggregation/encode_receiver.py
<listcomp>9   s    z*EmbeddingData.__init__.<locals>.<listcomp>c                    s   g | ]}|j kr nd qS Nr   r   	embeddingr   r   r   r    :   s    c                    s    g | ]}| j kr jnd qS r!   )r   image_grid_dimr   r   r   r   r    =   s    )req_id	num_partsr   r$   r#   	send_timedtypelistshaperange
ready_listembedding_listimage_grid_dim_list	error_msg
error_code)r   r%   r&   r   r$   r#   r/   r0   r   r"   r   __init__&   s"   


zEmbeddingData.__init__c                 C   sL   | j |j ksJ | j|j rJ d| j|j< |j| j|j< |j| j|j< d S )NT)r%   r,   r   r$   r.   r#   r-   )r   embedding_datar   r   r   addD   s   
zEmbeddingData.addFc                 C   s    |rt dd | jD S | jS )Nc                 S   s   g | ]}|  qS r   )cuda)r   r#   r   r   r   r    O   s    z/EmbeddingData.get_embedding.<locals>.<listcomp>)torchconcatr-   )r   	is_concatr   r   r   get_embeddingM   s   zEmbeddingData.get_embeddingc                 C   s   t | jS r!   )r5   concatenater.   r   r   r   r   get_img_gridS   s   zEmbeddingData.get_img_gridc                 C   s   t | j| jkS r!   )sumr,   r&   r   r   r   r   readyV   s   zEmbeddingData.readyc                 C   s$   d| j  d| j d| j d| j S )NzEmbeddingData(req_id=z, num_parts=z, part_idx=z) error_msg=)r%   r&   r   r/   r   r   r   r   __repr__Z   s   $zEmbeddingData.__repr__c                 C   s<   t | j| j| j| j| j| jd}| j|_| j|_| j	|_	|S )N)r%   r&   r   r$   r/   r0   )
r   r%   r&   r   r$   r/   r0   r'   r(   r*   )r   new_datar   r   r   copy_without_embedding]   s   z$EmbeddingData.copy_without_embedding)NNN)F)__name__
__module____qualname__r1   r3   r8   r:   propertyr<   r=   r?   r   r   r   r   r   %   s    

	
r   c                   @   s   e Zd ZdZdZdZdZdS )WaitingImageRequestStatusr      N)r@   rA   rB   FAILPENDINGSUCCESSTIMEOUTr   r   r   r   rD   l   s
    rD   c                   @   s.   e Zd ZdedefddZdd Zdd Zd	S )
WaitingImageRequestridrecv_reqc                 C   s   || _ || _d | _d | _d | _|| _|| _|| _|| _|j	| _	t
t tj\| _| _td| j d | _tj| _d | _d | _t | _d S )Nz(Waiting for input self.embedding_port = )rM   rN   	mm_inputserrorthreadmm_processorencoder_urls	host_namereceive_countnum_items_assignedr   zmqContextPULLembedding_portrecv_socketloggerinforecv_embedding_datarD   rI   statusr/   r0   time
start_time)r   rM   rN   rR   rS   rT   rU   r   r   r   r1   u   s&   	
zWaitingImageRequest.__init__c                    s8   dd   fdd}t |jjjjj d S )Nc              
      s   z0| j ||d4 I d H }|  | I d H W  d   I d H  W S 1 I d H s*w   Y  W d S  tyI } ztd| d|   d }~ww )NjsonzFailed to send request to : )postraise_for_statustext	Exceptionr\   rP   )sessionurlpayloadresponseer   r   r   _send_single_request   s   4zEWaitingImageRequest.send_encode_request.<locals>._send_single_requestc              	      sn  t jt jddd4 I d H }g }tdjd tjD ]1\}}|dkr*q!j| }| d}	| || d| d	}
td
|	   ||	|
}|| q!|sftd 	 W d   I d H  d S tdt	| d t
j|ddiI d H }t|D ]\}}t|trtd| d|  qtd| d qW d   I d H  d S 1 I d H sw   Y  d S )N  totaltimeoutzself.num_items_assigned =  r   z/scheduler_receive_url:)r%   rU   receive_urlzPreparing to send  to zNo tasks to send.zConcurrently sending z requests...return_exceptionsTzRequest 	 failed: z succeeded.)aiohttpClientSessionClientTimeoutr\   r]   rV   	enumeraterS   appendlenasynciogather
isinstancerh   rP   debug)r%   rU   rT   rZ   ri   tasksidxassigned_numencoder_url
target_urlrk   taskresultsr   resultrn   r   r   r   send_embedding_port   s>   




.zDWaitingImageRequest.send_encode_request.<locals>.send_embedding_port)r   runrN   rM   rU   rT   rZ   )r   r   r   r   r   send_encode_request   s   	"z'WaitingImageRequest.send_encode_requestc                 C   s  | j tjkrd S | jd u s| jjsz| jjtjdd}W n tj	y'   Y d S w t
|d }t|dd d ur[td| j d|j d|j |j| _|j| _tj| _ | j  d S t|d d	rg|d jn|d }tj||jd
|j|_|j|j|j< | jd u r|| _n| j| | jd u s| jjr| jjdd}| j  }| j!"| j#j$||}|| j#_%|d | j#_&tj'| _ | j  d S )NF)flagscopyr   r/   z'Received error signal from encoder for rd   z recv_obj.error_code = rF   bufferr(   Tr7   	input_ids)(r_   rD   rI   r^   r<   r[   recv_multipartrW   NOBLOCKAgainpickleloadsgetattrr\   warningrM   r/   r0   rH   closehasattrr   r5   
frombufferr(   reshaper*   r#   r-   r   r3   r8   r:   rR   get_mm_datarN   
input_textrO   r   rJ   )r   partsrecv_objr   recv_embeddingimg_grid_thwrO   r   r   r   _try_recv_mm_data   sH   
 


z%WaitingImageRequest._try_recv_mm_dataN)r@   rA   rB   strr   r1   r   r   r   r   r   r   rL   t   s    
5rL   c                 C   s   | j }|rdS dS )Ndefaultcuda_ipc)dist_init_addr)server_argsis_cross_noder   r   r    _determine_tensor_transport_mode   s   r   c                   @   s~   e Zd Z						ddedeej dee dee dee dee	 ded	 fd
dZ
edd Zedd Zedd ZdS )MMReceiverBaseNr   r(   	hf_configpp_ranktp_ranktp_group	schedulerr   c                 C      d S r!   r   )r   r   r(   r   r   r   r   r   r   r   r   r1      s   
zMMReceiverBase.__init__c                 C   r   r!   r   )r   	recv_reqsr   r   r   process_waiting_requests	     z'MMReceiverBase.process_waiting_requestsc                    s   d S r!   r   )r   img_datarR   promptr   r   r   recv_mm_data  s   zMMReceiverBase.recv_mm_datac                 C   r   r!   r   )r   objr   r   r   r     r   z"MMReceiverBase.send_encode_requestNNNNNN)r@   rA   rB   r   r   r5   r(   r	   intr
   r1   r   r   r   r   r   r   r   r   r      s8    


r   c                   @   s   e Zd Z						ddedeej dee dee dee dee	 ded	 fd
dZ
dd Zdd Zdd Z	dddZdd Zdd Zdd Zdd ZdS )MMReceiverHTTPNr   r(   r   r   r   r   r   r   c              
   C   sv  t jd| _|j| _|j| _ttt	| j| _
t|j| _| jdkr1|| _t | _t | _d S | jdkr|| _|| _|j| _|| _|j| _t | _g | _|| _tj | _|d urt|}td d }	zt |j!|j"|j#|j$|j% d}	W n4 t&y }
 z(t'|
}d|v rt()d|j! d t |j!|j"|j#|j$d	d}	n|
W Y d }
~
nd }
~
ww t*|||	|d	d
| _+d S d S d S )N   mooncakezmq_to_schedulerz sglang.srt.multimodal.processors)tokenizer_modetrust_remote_coderevisionuse_fastzdoes not have a slow versionz
Processor z= does not have a slow version. Automatically use fast versionT)skip_mm_pool),rW   r   rX   contextencoder_transfer_backendrS   encode_urlsr)   r+   r~   
encode_idxr   hostr(   r   embeddings_enginedictembeddings_bufferr   r   tp_sizer   nnodeshostnamewaiting_listr   r   SGLANG_ENCODER_RECV_TIMEOUTgetwait_timeoutr   r   r   tokenizer_pathr   r   r   disable_fast_image_processor
ValueErrorr   r\   r]   r   rR   )r   r   r(   r   r   r   r   r   transport_mode
_processorrm   error_messager   r   r   r1     sr   




zMMReceiverHTTP.__init__c                 C   s  t |j|j|j|jfi d|jd|jd|jd|jd|j	d|j
d|jd|jd	|jd
|jd| jjjd|jd|jd|jd| jjd|jd| jjjd|jd| jjra| jjnd d|jd| jj}| jj|_|S d|jd| jj}| jj|_|S )Nreturn_logprobtop_logprobs_numtoken_ids_logprobstreamlora_idinput_embedscustom_logit_processorrequire_reasoningreturn_hidden_statesreturn_routed_expertseos_token_idsbootstrap_hostbootstrap_portbootstrap_roomdisagg_modedata_parallel_rank
vocab_sizeprioritymetrics_collectorhttp_worker_ipcdllm_config)r   rM   r   r   sampling_paramsr   r   r   r   r   r   r   r   r   r   r   model_confighf_eos_token_idr   r   r   disaggregation_moder   r   r   enable_metricsr   r   r   	tokenizer)r   rN   reqr   r   r   
create_reqX  sx   	





zMMReceiverHTTP.create_reqc              	   C   s  g }|D ]+}t |tr*|jdu r*t|j|| j| j| j| jd}|	  | j
| q|| qt| j
dkr;|g fS t }g }| j
D ]}|  ||j | jkrVtj|_||j qDtj|dtjd}tjj|tjjj| jjd g }g }t| j
D ]a\}	}||	  }
|
tjkr||j  q}|
tj!krt"#d|j d|j$ d	|j% || &|j |j$|j%f q}|
tjkrt"#d
|j  || &|j d| j dt'j(f q}|| q}|| _
||fS )NT)rM   rN   rR   rS   rT   rU   r   cpu)devicer(   )opgroupzWaiting request rx   z waiting_req.error_code = z3Timed out waiting for image embeddings for request z*Timeout waiting for image embedding after s))r   r   need_wait_for_imagerL   rM   rR   r   r   r   r   r   r}   r~   r`   r   ra   r   rD   rK   r_   r5   tensorint32distributed
all_reduceReduceOpMINr   	cpu_groupr|   itemrJ   rN   rH   r\   rP   r/   r0   r   r   REQUEST_TIMEOUT)r   r   new_recv_reqsrN   waiting_reqcurrent_timelocal_statusnew_waiting
abort_reqsr   status_valuer   r   r   r   |  sx   







z'MMReceiverHTTP.process_waiting_requestsc              
   C   sd   zt | j||||d |d W d S  ty1 } ztjd| d| dd W Y d }~d S d }~ww )N)r%   r   rZ   endpoint_encodeendpoint_sendrV   zEncode failed for request rd   T)exc_info)r   r   encoderh   r\   rP   )r   r%   r   r  rV   rZ   rm   r   r   r   _run_encode_in_thread  s   

(z$MMReceiverHTTP._run_encode_in_threadc                    s  t dkr	d S g }|d u r tj fddjD }tdd |D }d}	d}
t|D ]%\}}|dkr:q1|||	|	|  ||
|j|d |
d7 }
|	|7 }	q1tj	tj
dd	d
4 I d H  fdd|D }tj| I d H }|D ]:}|jdkrz| I d H }|dd}W n   | I d H }Y td|j d|   W d   I d H  d S q{dd |D I d H }d |v r	 W d   I d H  d S dd t|D }d}dd t|D }|D ]}|d }|d ||< ||d 7 }|||< qd}g }|||d d I d H }tt |D ]/}|| }|| }|jj|d |jj|d   d| |d ||| 7 }qtj| I d H  W d   I d H  d S 1 I d H s_w   Y  d S )Nr   c                    "   g | ]}|t   t j qS r   r~   r   r   r   )r   r   r   r   r      s    z)MMReceiverHTTP.encode.<locals>.<listcomp>c                 s   s    | ]	}|d krdV  qdS )r   rF   Nr   )r   xr   r   r   	<genexpr>  s    z(MMReceiverHTTP.encode.<locals>.<genexpr>)encoder_idxmm_itemsr&   r   r%   prefill_hostrZ   rF   ro   rp   rr   c                    s.   g | ]}j j|d    d  |dqS )r  /rb   )re   r   )r   encode_request)r  r   ri   r   r   r      s       messagezUnknown encoder errorzEncoder returned error rd   c                    s   g | ]	}|  I d H qS r!   rb   )r   rl   r   r   r   r      s    c                 S      g | ]}d qS r!   r   r   _r   r   r   r    #      c                 S   r   r!   r   r!  r   r   r   r    %  r#  r   embedding_sizeembedding_lenembedding_dim)
session_idbuffer_addressr  r  rb   )r~   randomshuffler   r;   r|   r}   r   ry   rz   r{   r   r   r_   rc   r   rg   r\   rP   r+   allocate_embedding_bufferupdater   r'  re   r   )r   r%   r   rZ   r  r  rV   encode_requestsr&   cum_num_itemscum_idxr   r   r   	responsesrl   err_datamsgresponse_json_list_unsortembedding_size_list_sortembedding_length_totresponse_json_list_sortresponse_jsonoffsetmetadata_tasksr(  buffer_address_adjustr   )r  r   r   ri   r   r    s   	

	$


0zMMReceiverHTTP.encodec                    s<   t j||f| jd}| j| |j || j|< | S )Nr   )r5   zerosr(   r   registerdata_ptrnbytesr   )r   r%   embedding_lengthr&  
embeddingsr   r   r   r+  F  s   
z(MMReceiverHTTP.allocate_embedding_bufferc                    s   t |jtkr|jjg ndd |jD  |jd u r t j|_ rht dkrjt	
dt  d|j  d|_tttj}t|  fdd|D |_tjj|j d|jd fdd	}|  d S d S d S )
Nc                 S      g | ]}|j qS r   rj   r   imgr   r   r   r    W      z6MMReceiverHTTP.send_encode_request.<locals>.<listcomp>r   zProcessing z images for request Tc                    r  r   r  r  
image_urlsr   r   r   r    `  s    r  )targetargsdaemon)type
image_datar)   rj   rM   uuiduuid4hexr~   r\   r]   r   r+   r   r)  r*  rV   	threadingThreadr  start)r   r   r   encode_threadr   rF  r   r   S  s2   

z"MMReceiverHTTP.send_encode_requestc              	      s   zGt | jdkrW d S t j}t| jtj\}}t	|t
kr%|jg}ndd |D }t| |||dd tj| ||||ddI d H W S  tjyi   td|  t| d	rf|| jv rf| j|= Y d S w )
Nr   c                 S   rA  r   rB  rC  r   r   r   r    z  rE  z/MMReceiverHTTP.recv_mm_data.<locals>.<listcomp>r  sendr   rr   z#Embedding recv timeout for request r   )r~   r   rM  rN  rO  r   r   rW   rY   rK  r)   rj   r   create_taskr  wait_for_recv_mm_dataTimeoutErrorr\   r   r   r   )r   r   rR   r   r%   rZ   r[   r   r   r   r   q  s,   

zMMReceiverHTTP.recv_mm_datac                    s2  |d u rd S d }d }|d u s|j sf|jddI d H }t|d }td| | jdkrLt|d dr;|d jn|d }	t	j
|	|jd|j|_|d u rZ|j|j|j< |}n|| |d u s|j r| jd	kr}| j| }| j|= | j|  n| jdkr|jd
d}|  | }
||||
}|S )NF)r   r   zrecv_obj = zmq_to_tokenizerrF   r   r   r   Tr   )r<   r   r   r   r\   r]   r   r   r   r5   r   r(   r   r*   r#   r-   r   r3   r   r   
deregisterr=  r8   r   r:   r   )r   r%   r[   rR   r   r   r^   r   r   r   r   rO   r   r   r   rW    s>   
 



zMMReceiverHTTP._recv_mm_datar   r!   )r@   rA   rB   r   r   r5   r(   r	   r   r
   r1   r   r   r  r  r+  r   r   rW  r   r   r   r   r     s>    
@$J
or   )6r   loggingr   r)  rP  r`   rM  abcr   r   enumr   httpr   typingr   r   r   ry   r5   rW   zmq.asynciotransformersr	   %sglang.srt.distributed.parallel_stater
   r   sglang.srt.environr   sglang.srt.managers.io_structr   (sglang.srt.managers.multimodal_processorr   r   "sglang.srt.managers.schedule_batchr   sglang.srt.server_argsr   sglang.srt.utilsr   r   &sglang.srt.utils.hf_transformers_utilsr   	getLoggerr@   r\   sglang.srt.managers.schedulerr   r   rD   rL   r   r   r   r   r   r   r   <module>   sB    
G~
