o
    پi}                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZmZmZmZmZ d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZmZ d dlm Z  d dl!m"Z" d dl#m$Z$ d d	l%m&Z& d d
l'm(Z( d dl)m*Z*m+Z+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2m3Z3m4Z4 d dl5m6Z6m7Z7 d dl8m9Z9m:Z: d dl;m<Z< d dl=m>Z>m?Z?m@Z@ d dlAmBZBmCZCmDZDmEZEmFZFmGZGmHZH eIeJZKe L ZMeN aOeePeeP f eQd< eN ZReePeSf eQd< eN ZTeePePf eQd< e L ZUi ZVeePe jWf eQd< eSeXdddkZYG dd deZZ[G dd de[Z\G dd  d e[Z]G d!d" d"Z^d#d$ Z_d%d&gZ`d'd( ZaG d)d* d*ZbG d+d, d,Zce Zddaeeeb eQd-< g Zfeejg eQd.< d/e?d0eSfd1d2Zhd3d4 Zid/e?fd5d6Zjd7d8 Zkedld9d:eNfd;d<Zmedld=d:eNfd>d?Znedld@d:eNfdAdBZoedpdCedpdDdEdF ZqedjrdGdHdIgdJdQdKee3 fdLdMZsedjrdNdHdIgdJdOdP ZtdS )R    N)
HTTPStatus)DictListOptionalSetTuple)FastAPI)ORJSONResponseResponse)AutoImageProcessor)DeviceConfig)
LoadConfig)ModelConfig)EmbeddingData)get_mooncake_transfer_engineinit_distributed_environmentinitialize_model_parallel)envs)initialize_dp_attention)
ProfileReqProfileReqInputProfileReqType)ModalityMultimodalDataItem)EmbeddingResultMultiModalStaticCache)	get_model)PortArgs
ServerArgs$set_global_server_args_for_scheduler)config_socketget_local_ip_autoget_zmq_socket
load_audio
load_image
load_videorandom_uuidrid_to_receive_endpointrid_to_receive_countrid_to_err_msgrid_to_cond&SGLANG_ENCODER_IMAGE_PROCESSOR_USE_GPU0   c                       s"   e Zd Zejf fdd	Z  ZS )MMErrorc                    s   || _ || _t | j  d S N)messagecodesuper__init__)selfr0   r1   	__class__ [/home/ubuntu/.local/lib/python3.10/site-packages/sglang/srt/disaggregation/encode_server.pyr3   D   s   zMMError.__init__)__name__
__module____qualname__r   INTERNAL_SERVER_ERRORr3   __classcell__r7   r7   r5   r8   r.   C   s    r.   c                          e Zd Z fddZ  ZS )BadRequestErrorc                       t  j|tjd d S N)r1   )r2   r3   r   BAD_REQUESTr4   r0   r5   r7   r8   r3   K      zBadRequestError.__init__r9   r:   r;   r3   r=   r7   r7   r5   r8   r?   J       r?   c                       r>   )InternalErrorc                    r@   rA   )r2   r3   r   r<   rC   r5   r7   r8   r3   P   rD   zInternalError.__init__rE   r7   r7   r5   r8   rG   O   rF   rG   c                   @   s    e Zd ZdZdd Zdd ZdS )TensorWrapperzAWrapper to keep tensor alive while exposing buffer for zero-copy.c                 C   s<   |j r| }| s| }|| _t|j| _|j| _d S r/   )is_cudacpuis_contiguous
contiguoustensorlistshapedtype)r4   rM   r7   r7   r8   r3   W   s   zTensorWrapper.__init__c                 C   s<   | j  }| j  | j   }tj| |}| |_t|S r/   )	rM   data_ptrnumelelement_sizectypesc_charfrom_address_keep_alive_ref
memoryview)r4   rQ   total_bytesc_objr7   r7   r8   
__buffer__c   s
   
zTensorWrapper.__buffer__N)r9   r:   r;   __doc__r3   r[   r7   r7   r7   r8   rH   T   s    rH   c                 C   sz   t | tjr| S t | tjrt| S t | tr(t | d tjr(tt| S t | tr;t | d tt	fr;t| S | S )Nr   )

isinstancetorchTensornpndarrayrM   rN   arrayintfloat)datar7   r7   r8   _convertk   s   

rf   image_grid_thwimage_grid_hwsc                 C   s2   t D ]}|| v r| |   S qtdt  d|  )NzImage grid dim (z) not found in )_image_grid_attrs
ValueError)images_inputattrr7   r7   r8   _get_image_grid_dim{   s   rm   c                   @   s   e Zd Z			ddedefddZ			d ded	ee fd
dZdd Z	dd Z
dejfddZ					d!dejdefddZdd Z	d"ddZdd Zdd ZdS )#	MMEncoderNr   server_argsrankc                 C   s  t d| d|j  || _t| || _t|| _tj	|j
|jdd| _t|| _t|j|j|j|j|j|jd| _|j| _|j| | _t| j| jd| _t| j| j t | _ t!|j|||d t"|jd t#|| j t$| j| j| jd	| _%t&j'(d
| _)t&( | _*t+j,j-dd| _.t/t0j12dd}t3|d d | _4t'5 | _6t+j,j-t/t0j12ddd| _7t8j92 | _:|d urt;| j)t&j<|d| _=| jdkrt d| jj>  | jj>dkrt? | _@tA | _BtC | _DtE | _Ft d| d d S )Nzinit MMEncoder /T)trust_remote_codeuse_fast)load_formatdownload_dirmodel_loader_extra_config.remote_instance_weight_loader_seed_instance_ip8remote_instance_weight_loader_seed_instance_service_port6remote_instance_weight_loader_send_weights_group_ports)devicegpu_id)
world_sizerp   distributed_init_method
local_rank)tensor_model_parallel_size)model_configload_configdevice_config   
   )max_workersSGLANG_VLM_CACHE_SIZE_MB4096i   SGLANG_ENCODER_MM_LOAD_WORKERS   r   zUsing transfer backend: mooncakezrank z init finish )Gloggerinfotp_sizero   r   rp   EncoderProfilerprofilerr   from_pretrained
model_pathrr   image_processorr   from_server_argsr   r   rt   ru   rv   rw   rx   ry   r   rz   base_gpu_idr{   r   r   r^   get_device_module
set_deviceuse_image_processor_gpur   r   r   r   modelzmqasyncioContextcontextsync_context
concurrentfuturesThreadPoolExecutorexecutorrc   osenvirongetr   mm_cacheLockmm_cache_lockio_executorr   SGLANG_ENCODER_SEND_TIMEOUTsend_timeoutr"   PULLschedule_socketencoder_transfer_backendr!   local_ipr   enginedictembedding_to_sendsetbackground_tasks)r4   ro   schedule_pathdist_init_methodrp   embedding_cache_sizer7   r7   r8   r3      s   
	


zMMEncoder.__init__Tmodalityaudio_sample_ratec           	   
   C   s   t |tr|S z3|tjkr"t|\}}|r|jdkr|d}|W S |tjkr-t||W S |tj	kr8t
||W S W dS  tyP } z
td| d| d}~ww )z
        Load a single multimodal data.
        If data is precomputed, returns directly.
        Static method that can be pickled for multiprocessingRGBzError while loading data : N)r]   r   r   IMAGEr$   modeconvertVIDEOr%   AUDIOr#   	ExceptionRuntimeError)	r4   re   r   frame_count_limitr   discard_alpha_channelimg_er7   r7   r8   _load_single_item   s"   




zMMEncoder._load_single_itemc                 C   sR   g }g }t ||D ]\}}|d ur$|| j| j|| |||f q	||fS r/   )zipappendr   submitr   )r4   items
modalitiesr   	task_infore   r   r7   r7   r8   submit_data_loading_tasks   s   z#MMEncoder.submit_data_loading_tasksc                    s@  t |ttfs| |gtjg\}}t|d I dH S t|dkrt |d ttfrg }g }t	|D ]\}}|D ]}|
| |
| q:q4| |tjgt| \}}dd |D }	tj|	 I dH }
dd tt|D }t||
D ]\}}|| 
| qu|S | |tjgt| \}}dd |D }	tj|	 I dH S )z
        Flatten mm_items structure, load images concurrently, and restore original structure.

        Returns:
            Same structure as load_images would return
        r   Nc                 S      g | ]}t |qS r7   r   wrap_future.0fr7   r7   r8   
<listcomp>(      z6MMEncoder._flatten_and_load_images.<locals>.<listcomp>c                 S   s   g | ]}g qS r7   r7   )r   r   r7   r7   r8   r   ,  s    c                 S   r   r7   r   r   r7   r7   r8   r   8  r   )r]   rN   tupler   r   r   r   r   len	enumerater   gatherranger   )r4   mm_itemsr   r   	flat_dataflat_indices	group_idximage_groupitemasync_futuresresultsnested_resultsidxresultr7   r7   r8   _flatten_and_load_images  s4   
z"MMEncoder._flatten_and_load_imagesreturnc              
      sj  z
|  |I d H }W n ty  } z	tdt| d }~ww z| jr*d| jini }| jdd|i|}|d }tt	j
t|d}| D ]\}}	|dkrRqI||t|	 qId }
d }| jjr|  t|jg}| j4 I d H  | j|jg}|d ur|j}
W d   I d H  n1 I d H sw   Y  |
d u rt  | j|g}
|
 }
W d    n1 sw   Y  t|
jdkr|
d|
jd }
| jjr| j4 I d H  | j|t |
d W d   I d H  n1 I d H sw   Y  | j!d ur| j!"  t#||
fW S  ty } z	td	t| d }~w ty4 } z	t$d
t| d }~ww )Nz"Failed to load images from input: rz   imagespixel_values)r   featurer   )	embeddingzBad request error: zInternal encoding error: r7   )%r   r   r?   strr   rz   r   r   	from_dictr   r   rf   r   r   ro   enable_prefix_mm_cacheset_pad_valuer   combine_hasheshashr   r   r   r   r^   inference_moder   get_image_featurerJ   r   rO   reshaper   r   steprm   rG   )r4   r   r   r   kwargsrk   r   mm_itemkvmm_embeddingmm_hashr   r7   r7   r8   _encode;  sf   (

(
zMMEncoder._encoder   mm_datac                    s  j jdkr.j| |j j|| ||j j|  d |_d |j	|j
< |d ur7d| nd| d| td j jdkrUt|d  n| }|jd urfd  t|nt|j}	t||	   fdd}
t j|
I d H  d S )Nr   tcp://:zendpoint = c                     sx   j tj} t| tj z)|   d ur!| j gdd n| jgdd W |   d S W |   d S |   w )NF)copy)r   socketr   PUSHr    connectsend_multipartclose)sockbufferendpointr4   serialized_datar7   r8   send_with_socket  s   
z)MMEncoder._send.<locals>.send_with_socket)ro   r   r   registerrQ   nbytestransfer_sync
deregisterr   embedding_listpart_idxr   r   pickledumpscopy_without_embedding	error_msgrH   r[   r   get_event_looprun_in_executorr   )r4   r   r   
session_idbuffer_addressprefill_hostembedding_porturlnew_mm_dataembedding_tensorr  r7   r	  r8   _sendo  s4   





zMMEncoder._sendc                    s   z+|  |I d H \}}| jdkrt|||||}|| j|< |j|jd |jd d d fW S  ty{ } zCt|dtj	}	t
|}
td| j d|
 d|	 | jdkrjt|||d |
|	d}|| j|< td|  ddd|
|	fW  Y d }~S d }~ww )	Nr   r-   r1   zRank z encode failed: z error_code = )r  
error_codezCreated error EmbeddingData: )r   rp   r   r   r  rO   r   getattrr   r<   r   r   errordebug)r4   r   req_id	num_partsr  image_grid_dimr   r   r   r"  r  r7   r7   r8   encode  s@   




zMMEncoder.encodec                    s.   | j | }| j|j|||||dI d H  d S )N)r  r  r  r  )r   r!  r   )r4   r&  r  r  r  r  r   r7   r7   r8   send  s   
zMMEncoder.sendc                    s   | j |}|sd S t }g }t  }| j}t|I d H }z	 t4 I d H  t	|t 
 }t|}	W d   I d H  n1 I d H sHw   Y  || }
|
r~tdt|
 d| d |
D ]}t| j|j||d}|||f || qc|	d urt||	krtd|	 d| d n^|t  |  }|d	krtd
| dt| d|	  n?|4 I d H - ztj| |dI d H  W n tjy   Y W d   I d H  q"w W d   I d H  n1 I d H sw   Y  q#|r>tdt| d dd |D }tj|ddiI d H }t|D ]&\}}|| d }t|tr4td| d|  qtd|  qtd|  W td|  t4 I d H  t	|d  t|d  W d   I d H  n1 I d H ssw   Y  t4 I d H  t|d  W d   I d H  n1 I d H sw   Y  | j |d  d S td|  t4 I d H  t	|d  t|d  W d   I d H  n1 I d H sw   Y  t4 I d H  t|d  W d   I d H  n1 I d H sw   Y  | j |d  w )NTzFound z new endpoints for z. Starting tasks...)r  zAll z endpoints initiated for z. Breaking loop.r   [z] Timeout! Sent rq   timeoutz&Loop finished. Awaiting completion of z sending tasks...c                 S   s   g | ]}|d  qS )r   r7   )r   tr7   r7   r8   r     s    z+MMEncoder.send_with_url.<locals>.<listcomp>return_exceptionsr-   zFailed to send to r   zSuccessfully sent to z All tasks completed for req_id: z!Cleaning up resources for req_id ) r   r   r   r   get_running_looptimer   get_conditionrid_lockr'   r  r(   r   r   r   create_taskr!  r   r   addr$  wait_forwaitTimeoutErrorr   r   r]   r   r%  popcond_dict_lockr*   )r4   r&  r   	sent_urls	all_tasks
start_timer-  condcurrent_targetsexpected_countnew_targetsr  task	remaining
tasks_onlyr   ir   r7   r7   r8   send_with_url  s   (('****zMMEncoder.send_with_urlc              	      s   t jt jddd4 I d H %}|j| ddd idI d H }| I d H }|d W  d   I d H  S 1 I d H s:w   Y  d S )Ni  )totalr,  z/embedding_bootstrapr  )json)aiohttpClientSessionClientTimeoutpostrH  )r4   prefill_urlsessionresponseresponse_jsonr7   r7   r8   get_embedding_port'  s   
0zMMEncoder.get_embedding_port)NNr   )NNT)NNNNN)NN)r9   r:   r;   r   rc   r3   r   r   r   r   r   r^   r_   r   r   r!  r)  r*  rF  rQ  r7   r7   r7   r8   rn      sF    
_
/8
9#
Orn   c                   @   s8   e Zd ZdefddZdefddZdd Zd	d
 ZdS )r   rp   c                 C   s(   || _ d | _d | _d | _d | _d | _d S r/   )rp   r   
steps_left
output_dirprefix
profile_id)r4   rp   r7   r7   r8   r3   4  s   
zEncoderProfiler.__init__objc                 C   s
  | j d urdS |jptdd}tj|dd || _|jpd| _tt | _	|j
p-ddg}g }d|v r<|tj jj d|v rH|tj jj d	|v }|sR|sRd
S tj j||jd u r]dn|j|jd u rfdn|j|d| _ | j   |j| _td| j d| j	  dS )N)Fzprofiling already runningSGLANG_TORCH_PROFILER_DIRz/tmpT)exist_okencoderCPUGPUMEM)Fzno supported activitiesF)
activities
with_stackrecord_shapesprofile_memoryz&Encoder profiling started. output_dir= profile_id=TN)r   rS  r   getenvmakedirsprofile_prefixrT  r   r1  rU  r]  r   r^   ProfilerActivityrZ  CUDAprofiler^  r_  start	num_stepsrR  r   r   )r4   rV  rS  r]  torch_activitiesr`  r7   r7   r8   ri  <  s8   

zEncoderProfiler.startc                 C   sN   | j d u rd S | j   | jd ur#|  jd8  _| jdkr%|   d S d S d S )Nr-   r   )r   r   rR  stop)r4   r7   r7   r8   r   ^  s   



zEncoderProfiler.stepc                 C   sl   | j d u rdS | j   | j d| j d| j d}tj| j|}| j 	| t
d| d | _ d | _dS )N)Fzprofiling not runningz-rank-z.trace.jsonzEncoder profiling saved to: %srb  )r   rl  rT  rp   rU  r   pathjoinrS  export_chrome_tracer   r   rR  )r4   filename
trace_pathr7   r7   r8   rl  g  s   

zEncoderProfiler.stopN)	r9   r:   r;   rc   r3   r   ri  r   rl  r7   r7   r7   r8   r   3  s
    "	r   rY  send_socketsro   rp   c                    s   t | |||}	 |j I d H }t|tr4|jtjkr.|jd u r't	|j
|_|j| n|j  n|j|d |d |d |d dI d H  q	)NTr   r&  r'  r  r   r&  r'  r  )rn   r   
recv_pyobjr]   r   typer   START_PROFILEr   r   rp   ri  rl  r)  )ro   r   r   rp   rY  requestr7   r7   r8   run_encodery  s"   

ry  c                 C   sZ   zt t| ||| W d S  ty   td|  Y d S  ty,   t  Y d S w )Nz
Exit rank )	r   runry  KeyboardInterruptr   r   r   	traceback	print_exc)ro   r   r   rp   r7   r7   r8   launch_encoder  s   r~  c              	   C   s   t d}td}t }t| }| jrd| j }nd|j }t	d| j
D ]$}d| d| }tt|tj|dd	 |jt| |||fd
d  q(t| |datjt| j| jd d S )Nspawnr   r   ztcp://127.0.0.1:r-   zipc:///tmp/
_schedule_F)bindT)targetargsdaemon)r   )hostport)mpget_contextr   r   r&   r   init_newdist_init_addr	nccl_portr   r   rs  r   r"   r  Processr~  ri  rn   rY  uvicornrz  appr  r  )ro   ctxzmq_ctxipc_path_prefix	port_argsr   rp   r   r7   r7   r8   launch_server  s(   



r  c              	      sZ   t 4 I d H  | tvrt t| < t|  W  d   I d H  S 1 I d H s&w   Y  d S r/   )r:  r*   r   	Condition)ridr7   r7   r8   r2    s   0r2  z/encoderx  c              
      sl  | d }zdd }|  dt i tD ]}||  qtj| d | d | d | d dI d H \}}}}}|rhtjjd	kr]| d
 d u rI|| n| d
 D ]}	tj|| d |	dI d H  qMt	|d||ddW S tjjdkr| d= |  |||d t	| dW S tjjd	krt
d| d
  | d
 d u rtj| d dI d H  n3t| d
 tksJ g }
| d
 D ]}|
tj| d | d |d qtj|
 I d H  tj| d d  t	d dW S tjjdkrtj| d | d | d
 dI d H  tj| d d  t	d dW S W d S  ty5 } z$t|}t
d| d|  |t|< t	tjd||ddW  Y d }~S d }~ww )Nr&  c                 S   s0   t tj| d}tj| |tjj d S )Nr&  )r   r4  rY  rF  r   r5  add_done_callbackdiscard)r&  rB  r7   r7   r8   start_background_send  s   z4handle_encode_request.<locals>.start_background_send
enter_timer   r'  r  rt  zmq_to_schedulerr  r  )r&  r  r  r$  )statusr0   r&  )status_codecontentr   )embedding_sizeembedding_lenembedding_dimr  zrequest['embedding_port'] = r  zmq_to_tokenizerz&Unexpected error in encoder logic for r   )updater1  rs  
send_pyobjrY  r)  ro   r   r*  r	   r   r   rF  rv  rN   r   r   r   r   r9  r   r   r$  r)   r   r<   )rx  r&  r  r  r  r  r  r  r"  r  tasksr  r   r7   r7   r8   handle_encode_request  s   
	

r  z/sendc                    sL   t j| d | d | d | d | d dI d H  t j| d d  td dS )Nr&  r  r  r  r  )r&  r  r  r  r  r  )rY  r*  r   r9  r	   )rx  r7   r7   r8   handle_send_request  s   
r  z/scheduler_receive_urlc              	      s   | d }t 4 I d H - |tvrt t|< | d t|< t| | d ks%J t| | d  W d   I d H  n1 I d H s>w   Y  t|I d H }|4 I d H  |  W d   I d H  d S 1 I d H sfw   Y  d S )Nr&  receive_countreceive_url)r3  r'   r   r(   r5  r2  
notify_all)rx  r  r>  r7   r7   r8   $handle_scheduler_receive_url_request   s   
(
.r  z/healthz/health_generatec                      s   t du r
tddS tddS )zp
    Health check endpoint for the encoder server.
    Returns 200 if the encoder is initialized and ready.
    N  )r     )rY  r
   r7   r7   r7   r8   health_generate/  s   

r  z/start_profileGETPOST)methodsrV  c                    s   t d u rtdddS d }| d u rttj}nttj| j| j| j| j| j	| j
| jtt | j| j| jd}tD ]}|| q8t jd u rKtt jt _t j|\}}|rhdt jj dt jj d}t|ddS t|pld	tjdS )
Nencoder not ready
r  r  r  )rv  rS  
start_steprj  r]  r^  r_  profile_by_stagerU  merge_profilesre  profile_stageszStart profiling. output_dir=ra  
r  zStart profiling failed.
)rY  r
   r   r   rw  rS  r  rj  r]  r^  r_  r  r   r1  r  re  r  rs  r  r   r   rp   ri  rU  r   rB   )rV  reqr  okmsgdetailr7   r7   r8   start_profile_async;  sD   


r  z/stop_profilec                     s|   t d u rtdddS t jd u rtdtjdS ttj} tD ]}|	|  qt j
 \}}|r5tdddS t|p9dtjdS )Nr  r  r  zprofiling not initialized
zStop profiling.
r  zStop profiling failed.
)rY  r
   r   r   rB   r   r   STOP_PROFILErs  r  rl  )r  r  r  r  r7   r7   r8   stop_profile_asynca  s    


r  r/   )ur   concurrent.futuresr   rT   loggingmultiprocessingr  r   r  r1  r|  httpr   typingr   r   r   r   r   rI  numpyr`   r^   r  r   zmq.asynciofastapir   fastapi.responsesr	   r
   transformersr    sglang.srt.configs.device_configr   sglang.srt.configs.load_configr   sglang.srt.configs.model_configr   )sglang.srt.disaggregation.encode_receiverr   %sglang.srt.distributed.parallel_stater   r   r   sglang.srt.environr   sglang.srt.layers.dp_attentionr   sglang.srt.managers.io_structr   r   r   "sglang.srt.managers.schedule_batchr   r   %sglang.srt.mem_cache.multimodal_cacher   r   sglang.srt.model_loaderr   sglang.srt.server_argsr   r   r   sglang.srt.utilsr    r!   r"   r#   r$   r%   r&   	getLoggerr9   r   r   r3  r   r'   r   __annotations__r(   rc   r)   r:  r*   r  rc  r   r   r.   r?   rG   rH   rf   ri   rm   rn   r   r  rY  rs  Socketry  r~  r  r2  rL  r  r  r  r   r  	api_router  r  r7   r7   r7   r8   <module>   s   
 $

	   2A
	Z
%