o
    ci%#                     @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZmZ d dlZd dl	Z
d dlZd dlmZ d dlmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ dZdZefdededefddZ dedefddZ!dd Z"dd Z#ej$G dd dZ%ej$G dd dZ&dededefdd Z'dededed!ed"ed#eee(f fd$d%Z)d&d' Z*e+d(kre,  d)Z-d*gZ.ej/ee-e.d+d, e 0 Z1e 2e1 e13e*  dS dS )-    Nrandom)CallableDict)aio)Request)serve)RequestProtocol)gRPCOptions)	serve_pb2serve_pb2_grpc)DeploymentHandled   gHz>namefn
multiplierc           
         s   t   }t   | dk r| I dH  t   | dk sg }tdD ]1}d}t   }t   | dk rC| I dH  |d7 }t   | dk s1t   }||| ||   q!tt|d}tt|d}	td|  d	| d
|	 d ||	fS )zGet query TPS.

    Run the function for 0.5 seconds 10 times to calculate how many requests can
    be completed. And use those stats to calculate the mean and std of TPS.
    g?N
   r   g      ?      	  +- z requests/s)timerangeappendroundnpmeanstdprint)
r   r   r   startstats_countendtps_meantps_std r'   a/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/_private/benchmarks/proxy_benchmark.pyget_query_tps   s&   r)   c                    sh   t | I dH }|  tt |d d}tt |d d}td|  d| d| d ||fS )zgGet query latencies.

    Take all the latencies from the function and calculate the mean and std.
    N  r   r   r   r   z ms)r   asarrayflattenr   r   r   r   )r   r   many_client_resultslatency_ms_meanlatency_ms_stdr'   r'   r(   get_query_latencies7   s   r0   c                    s<   d|i}| j d|dI d H }| I d H }t|  d S )Nnumszhttp://localhost:8000/)json)getreadfloatdecode)sessiondata	data_jsonresponseresponse_textr'   r'   r(   
fetch_httpE   s
   r<   c                    s$   |  tj|dI d H }|j}d S )N)r1   )	grpc_callr   RawDataoutput)stubr8   resultr"   r'   r'   r(   
fetch_grpcL   s   
rB   c                   @   s$   e Zd Zdd Zdd Zdd ZdS )
HTTPClientc                 C      dS Nokr'   selfr'   r'   r(   readyS      zHTTPClient.readyc              	      sb   t  4 I d H }t|D ]
}t||I d H  qW d   I d H  d S 1 I d H s*w   Y  d S N)aiohttpClientSessionr   r<   )rH   numr8   r7   r"   r'   r'   r(   
do_queriesV   s   .zHTTPClient.do_queriesc              	      s   g }t  4 I d H *}t|D ]}t }t||I d H  t }|||  qW d   I d H  |S 1 I d H s;w   Y  |S rK   )rL   rM   r   r   r<   r   )rH   rN   r8   r!   r7   r"   r    r$   r'   r'   r(   time_queries[   s   zHTTPClient.time_queriesN)__name__
__module____qualname__rI   rO   rP   r'   r'   r'   r(   rC   Q   s    rC   c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )

gRPCClientc                 C   s   t d}t|| _d S )Nzlocalhost:9000)r   insecure_channelr   RayServeBenchmarkServiceStubr@   )rH   channelr'   r'   r(   __init__i   s   
zgRPCClient.__init__c                 C   rD   rE   r'   rG   r'   r'   r(   rI   m   rJ   zgRPCClient.readyc                    s&   t |D ]}t| j|I d H  qd S rK   )r   rB   r@   )rH   rN   r8   r"   r'   r'   r(   rO   p   s   zgRPCClient.do_queriesc                    sH   g }t |D ]}t }t| j|I d H  t }|||  q|S rK   )r   r   rB   r@   r   )rH   rN   r8   r!   r"   r    r$   r'   r'   r(   rP   t   s   zgRPCClient.time_queriesN)rQ   rR   rS   rX   rI   rO   rP   r'   r'   r'   r(   rT   g   s
    rT   num_replicasmax_ongoing_requests	data_sizec                    sH   t jddG dd d}t j| |dG  fddd}|| S )Nr*   )rZ   c                   @   sN   e Zd ZdefddZdejdejfddZdefd	d
Z	dd Z
dd ZdS )z$build_app.<locals>.DataPreprocessinghandlec                 S   s   || _ tdtj d S Nz	ray.serve)_handlelogging	getLoggersetLevelWARNING)rH   r\   r'   r'   r(   rX      s   z-build_app.<locals>.DataPreprocessing.__init__rawreturnc                 S   s&   |t | t |t | t  S rK   )r   minmaxDELTA)rH   rc   r'   r'   r(   	normalize   s   &z.build_app.<locals>.DataPreprocessing.normalizereqc                    s@   t | I dH }t|d }| |}| j|I dH S )zvHTTP entrypoint.

            It parses the request, normalize the data, and send to model for inference.
            Nr1   )r2   loadsbodyr   r+   rh   r^   remote)rH   ri   rk   rc   	processedr'   r'   r(   __call__   s
   
z-build_app.<locals>.DataPreprocessing.__call__c                    s6   t |j}| |}| j|I dH }tj|dS )zvgRPC entrypoint.

            It parses the request, normalize the data, and send to model for inference.
            Nr?   )r   r+   r1   rh   r^   rl   r   ModelOutput)rH   raq_datarc   rm   r?   r'   r'   r(   r=      s
   
z.build_app.<locals>.DataPreprocessing.grpc_callc                    s   t jddS )zgRPC entrypoint.r   ro   )r   rp   )rH   rq   r'   r'   r(   call_with_string   s   z5build_app.<locals>.DataPreprocessing.call_with_stringN)rQ   rR   rS   r   rX   r   ndarrayrh   r   rn   r=   rr   r'   r'   r'   r(   DataPreprocessing   s    

rt   )rY   rZ   c                       s,   e Zd Z fddZdejdefddZdS )z!build_app.<locals>.ModelInferencec                    s&   t dt j tj  | _d S r]   )r_   r`   ra   rb   r   r   randn_modelrG   r[   r'   r(   rX      s   z*build_app.<locals>.ModelInference.__init__rm   rd   c                    s   t || j}t|S rK   )r   dotrv   sum)rH   rm   model_outputr'   r'   r(   rn      s   z*build_app.<locals>.ModelInference.__call__N)rQ   rR   rS   rX   r   rs   r5   rn   r'   rw   r'   r(   ModelInference   s    r{   )r   
deploymentbind)rY   rZ   r[   rt   r{   r'   rw   r(   	build_app~   s   
"r~   num_clientsproxyrd   c              
      s   dd t |D t| ||d}t| |tjkr%dd t |D  n|tjkr3dd t |D  tdd  D   fdd} fd	d
}d| d| d|  d| d| 
}t	||I d H \}	}
t
||I d H \}}|j|| |||	|
||d	}|S )Nc                 S   s   g | ]}t  qS r'   r   .0r"   r'   r'   r(   
<listcomp>   s    ztrial.<locals>.<listcomp>)rY   rZ   r[   c                 S      g | ]}t  qS r'   )rT   rl   r   r'   r'   r(   r          c                 S   r   r'   )rC   rl   r   r'   r'   r(   r      r   c                 S   s   g | ]}|j  qS r'   )rI   rl   )r   clientr'   r'   r(   r      s    c                      s   t fdd D S )Nc                       g | ]	}|j t qS r'   )rP   rl   CALLS_PER_BATCHr   ar8   r'   r(   r          z6trial.<locals>.client_time_queries.<locals>.<listcomp>rayr3   r'   clientsr8   r'   r(   client_time_queries   s   z"trial.<locals>.client_time_queriesc                      s   t fdd D  d S )Nc                    r   r'   )rO   rl   r   r   r   r'   r(   r      r   z4trial.<locals>.client_do_queries.<locals>.<listcomp>r   r'   r   r'   r(   client_do_queries   s   z trial.<locals>.client_do_querieszproxy:z/num_client:z	/replica:z/concurrent_queries:z/data_size:)	r   
num_clientreplicaconcurrent_queriesr[   r%   tps_sdtr.   r/   )r   r~   r   runr	   GRPCHTTPr   r3   r)   r0   value)rY   rZ   r[   r   r   appr   r   trial_key_baser%   r   r.   r/   resultsr'   r   r(   trial   sX   


r   c            
         s   t   } g }dD ])}dD ]$}dD ]}dD ]}tjtjfD ]}|t|||||dI d H  qqqqq	tdt   |   d td| tj	|}|j
g dd	}td
 tt|jD ]}t|j| }	tdtt|	 q]d S )N)r      )r   '  )r   r   r   )rY   rZ   r[   r   r   zTotal time: sr   )r   r   r   r   r[   )byzResults from all conditions:r   )r   r	   r   r   r   r   r   pd	DataFrame	from_dictsort_valuesr   lenindexlistilocjoinmapstr)

start_timer   rY   rZ   r[   r   r   dfirowr'   r'   r(   main   s@   

r   __main__i(#  zQray.serve.generated.serve_pb2_grpc.add_RayServeBenchmarkServiceServicer_to_server)portgrpc_servicer_functions)grpc_options)4asyncior2   r_   r   r   typingr   r   rL   numpyr   pandasr   grpcr   starlette.requestsr   r   r   ray.serve._private.commonr	   ray.serve.configr
   ray.serve.generatedr   r   ray.serve.handler   r   rg   r   intr)   r0   r<   rB   rl   rC   rT   r~   r5   r   r   rQ   init	grpc_portr   r    new_event_looploopset_event_looprun_until_completer'   r'   r'   r(   <module>   s~   
:

> 
