o
    ci                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
mZmZmZmZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d	d
de
de de dej!fddZ"			d6de
g ee# f de de de#dee#e#ej!f f
ddZ$d	dddde de%de&dee# fdd Z'd	d!d"de d#e%fd$d%Z(d&efd'd(Z)d	ej*ej+ fd)e fd*d+Z,G d,d- d-Z-ej.G d.d/ d/Z/ej.G d0d1 d1Z0ej.G d2d3 d3Z1ej.G d4d5 d5Z2dS )7    N)partial)AnyCallable	CoroutineListOptionalTuple)StreamingResponse)tqdm)serve)	serve_pb2serve_pb2_grpc)DeploymentHandled   )num_warmup_requestsfnum_requestsr   returnc                   sx   t  r	 }n fdd}g }tt|| D ]}t }| I d H  t }||kr6|d||   qt|S )Nc                      s      d S N r   r   r   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/_private/benchmarks/common.pyto_call   s   
z&run_latency_benchmark.<locals>.to_call  )	inspectiscoroutinefunctionr
   rangetimeperf_counterappendpdSeries)r   r   r   r   	latenciesistartendr   r   r   run_latency_benchmark   s   

r&      
   fn
multiplier
num_trialstrial_runtimec                    s   t   }t   | dk r|  I dH  t   | dk sg }g }tt|D ]8}t  }d}t  | |k rN|  I dH }	|	rB||	 |d7 }t  | |k s5t  }
||| |
|   q%tt|dtt	|dt
|fS )a;  Benchmarks throughput of a function.

    Args:
        fn: The function to benchmark. If this returns anything, it must
            return a list of latencies.
        multiplier: The number of requests or tokens (or whatever unit
            is appropriate for this throughput benchmark) that is
            completed in one call to `fn`.
        num_trials: The number of trials to run.
        trial_runtime: How long each trial should run for. During the
            duration of one trial, `fn` will be repeatedly called.

    Returns (mean, stddev, latencies).
    g?Nr   r'      )r   r
   r   r   extendr   roundnpmeanstdr    r!   )r)   r*   r+   r,   r$   statsr"   _countresr%   r   r   r   run_throughput_benchmark.   s&   
(r7   zhttp://localhost:8000F)
batch_sizeurlstreamr8   r9   r:   c              	      s   t j| d}t j|dd4 I dH $fdd tj fddt| D  I dH W  d  I dH  S 1 I dH s<w   Y  dS )	z9Sends a batch of http requests and returns e2e latencies.)limitT)	connectorraise_for_statusNc               	      s   t  } z;r7 4 I d H }|j 2 z3 d H W \}}q6 W d   I d H  n1 I d H s1w   Y  n I d H  W n tjjyK   Y nw t  }d||   S Nr   )r   r   getcontentiter_chunksaiohttpclient_exceptionsClientConnectionError)r$   rchunkr4   r%   )sessionr:   r9   r   r   do_queryi   s"   (z&do_single_http_batch.<locals>.do_queryc                       g | ]}  qS r   r   .0r4   rH   r   r   
<listcomp>x       z(do_single_http_batch.<locals>.<listcomp>)rB   TCPConnectorClientSessionasynciogatherr   )r8   r9   r:   r<   r   )rH   rG   r:   r9   r   do_single_http_batchY   s    0rS   zlocalhost:9000)r8   targetrT   c                    sT   t j|}t|tjddfdd tj fddt	| D  I d H S )N )datac                     s.   t  }  I d H  t  }d||   S r>   )r   r   	grpc_call)r$   r%   )payloadstubr   r   rH      s
   z&do_single_grpc_batch.<locals>.do_queryc                    rI   r   r   rJ   rL   r   r   rM      rN   z(do_single_grpc_batch.<locals>.<listcomp>)
grpcaioinsecure_channelr   RayServeBenchmarkServiceStubr   
StringDatarQ   rR   r   )r8   rT   channelr   )rH   rX   rY   r   do_single_grpc_batch{   s   
"r`   coroc                    s:   ddl m} | }|  | I dH  |  |  dS )z)Collects profiling events using Viztracerr   )	VizTracerN)	viztracerrb   r$   stopsave)ra   rb   tracerr   r   r   collect_profile_events   s   
rg   sizec                    s   d  fddt| D S )NrU   c                 3   s    | ]}t  V  qd S r   )randomchoicerJ   charsr   r   	<genexpr>   s    z#generate_payload.<locals>.<genexpr>)joinr   )rh   rl   r   rk   r   generate_payload   s   ro   c                   @   s   e Zd Zdd ZdS )	Blackholec                 C   s   d S r   r   )selfor   r   r   sink      zBlackhole.sinkN)__name__
__module____qualname__rs   r   r   r   r   rp      s    rp   c                   @   s   e Zd Zdd Zdd ZdS )Noopc                 C   s   t dt j d S )N	ray.serve)logging	getLoggersetLevelWARNINGrq   r   r   r   __init__   s   zNoop.__init__c                 O   s   dS )N    r   )rq   argskwargsr   r   r   __call__   rt   zNoop.__call__N)ru   rv   rw   r   r   r   r   r   r   rx      s    rx   c                   @   s0   e Zd ZddedefddZdd Zdd	 Zd
S )Streamerr(   tokens_per_requestinter_token_delay_msc                 C   s&   t dt j || _|d | _d S )Nry   r   )rz   r{   r|   r}   _tokens_per_request_inter_token_delay_s)rq   r   r   r   r   r   r      s   zStreamer.__init__c                 C  s.   t | jD ]}t| jI d H  dV  qd S )Ns   hi)r   r   rQ   sleepr   )rq   r4   r   r   r   r:      s
   zStreamer.streamc                    s   t |  S r   r	   r:   r~   r   r   r   r      s   zStreamer.__call__N)r(   )ru   rv   rw   intr   r:   r   r   r   r   r   r      s    r   c                   @   s*   e Zd ZdefddZdd Zdd ZdS )	IntermediateRouterhandlec                 C   s$   t dt j |jdd| _d S )Nry   Tr:   )rz   r{   r|   r}   options_handle)rq   r   r   r   r   r      s   zIntermediateRouter.__init__c                 C  s(   | j j 2 z	3 d H W }|V  q6 d S r   )r   r:   remote)rq   tokenr   r   r   r:      s   zIntermediateRouter.streamc                 C   s   t |  S r   r   r~   r   r   r   r      s   zIntermediateRouter.__call__N)ru   rv   rw   r   r   r:   r   r   r   r   r   r      s    r   c                   @   s   e Zd Z	ddedefddZddedefd	d
ZdefddZ	de
dee fddZddde
dedejfddZddde
de
dedee deeef f
ddZdS )BenchmarkerFr   r:   c                 C   s*   t dt j |j|d| _|| _d S )Nry   r   )rz   r{   r|   r}   r   r   _stream)rq   r   r:   r   r   r   r      s   
zBenchmarker.__init__NrX   r   c                    sJ   t  }|du r| j I dH  n	| j|I dH  t  }d||  S )z<Completes a single unary request. Returns e2e latency in ms.Nr   )r   r   r   r   )rq   rX   r$   r%   r   r   r   do_single_request   s   zBenchmarker.do_single_requestc                    s:   t  }| jj 2 z3 dH W }q6 t  }d||  S )z?Consumes a single streaming request. Returns e2e latency in ms.Nr   )r   r   r   r:   r   )rq   r$   rE   r%   r   r   r   _do_single_stream   s   zBenchmarker._do_single_streamr8   c                    sL    j rtj fddt|D  I d H S tj fddt|D  I d H S )Nc                       g | ]}   qS r   )r   rJ   r~   r   r   rM          z0Benchmarker._do_single_batch.<locals>.<listcomp>c                    r   r   r   rJ   r~   r   r   rM      r   )r   rQ   rR   r   )rq   r8   r   r~   r   _do_single_batch   s   

zBenchmarker._do_single_batch)rX   r   c                   s"    fdd}t ||dI d H S )Nc                      s     I d H  d S r   r   r   rX   rq   r   r   r      s   z,Benchmarker.run_latency_benchmark.<locals>.f)r   )r&   )rq   r   rX   r   r   r   r   r&      s   z!Benchmarker.run_latency_benchmark)r   r+   r,   r   c                   s>   | j r|sJ || }n|}tt| j|d|||dI d H S )N)r8   )r)   r*   r+   r,   )r   r7   r   r   )rq   r8   r+   r,   r   r*   r   r   r   r7      s   
z$Benchmarker.run_throughput_benchmark)Fr   )ru   rv   rw   r   boolr   r   floatr   r   r   r   r   r    r!   r&   r   r   r7   r   r   r   r   r      s<    
	


r   )r'   r(   r'   )3rQ   r   rz   ri   stringr   	functoolsr   typingr   r   r   r   r   r   rB   aiohttp.client_exceptionsrZ   numpyr0   pandasr    starlette.responsesr	   r
   rayr   ray.serve.generatedr   r   ray.serve.handler   r   r!   r&   r   r7   strr   rS   r`   rg   ascii_uppercasedigitsro   rp   
deploymentrx   r   r   r   r   r   r   r   <module>   s     

-
#
