o
    `۷i
#                     @   s.  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	m
Z
mZmZmZmZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d	d
de
de de dej!fddZ"			d<de
g ee# f de de de#dee#e#ej!f f
ddZ$d	dddde de%de&dee# fdd Z'd	d!d"de d#e%fd$d%Z(d&efd'd(Z)d	ej*ej+ fd)e fd*d+Z,G d,d- d-Z-ej.G d.d/ d/Z/ej.G d0d1 d1Z0ej.G d2d3 d3Z1ej.G d4d5 d5Z2ej.G d6d7 d7Z3ej.G d8d9 d9Z4ej.G d:d; d;Z5dS )=    N)partial)AnyCallable	CoroutineListOptionalTuple)StreamingResponse)tqdm)serve)	serve_pb2serve_pb2_grpc)DeploymentHandled   )num_warmup_requestsfnum_requestsr   returnc                   sx   t  r	 }n fdd}g }tt|| D ]}t }| I d H  t }||kr6|d||   qt|S )Nc                      s      d S N r   r   r   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/serve/_private/benchmarks/common.pyto_call   s   
z&run_latency_benchmark.<locals>.to_call  )	inspectiscoroutinefunctionr
   rangetimeperf_counterappendpdSeries)r   r   r   r   	latenciesistartendr   r   r   run_latency_benchmark   s   

r&      
   fn
multiplier
num_trialstrial_runtimec                    s   t   }t   | dk r|  I dH  t   | dk sg }g }tt|D ]8}t  }d}t  | |k rN|  I dH }	|	rB||	 |d7 }t  | |k s5t  }
||| |
|   q%tt|dtt	|dt
|fS )a;  Benchmarks throughput of a function.

    Args:
        fn: The function to benchmark. If this returns anything, it must
            return a list of latencies.
        multiplier: The number of requests or tokens (or whatever unit
            is appropriate for this throughput benchmark) that is
            completed in one call to `fn`.
        num_trials: The number of trials to run.
        trial_runtime: How long each trial should run for. During the
            duration of one trial, `fn` will be repeatedly called.

    Returns (mean, stddev, latencies).
    g?Nr   r'      )r   r
   r   r   extendr   roundnpmeanstdr    r!   )r)   r*   r+   r,   r$   statsr"   _countresr%   r   r   r   run_throughput_benchmark.   s&   
(r7   zhttp://localhost:8000F)
batch_sizeurlstreamr8   r9   r:   c              	      s   t j| d}t j|dd4 I dH $fdd tj fddt| D  I dH W  d  I dH  S 1 I dH s<w   Y  dS )	z9Sends a batch of http requests and returns e2e latencies.)limitT)	connectorraise_for_statusNc               	      s   t  } z: 4 I d H $}r"|j 2 z3 d H W \}}q6 n| I d H  W d   I d H  n1 I d H s9w   Y  W n tjjyJ   Y nw t  }d||   S Nr   )	r   r   getcontentiter_chunksreadaiohttpclient_exceptionsClientConnectionError)r$   rchunkr4   r%   )sessionr:   r9   r   r   do_queryi   s"   (z&do_single_http_batch.<locals>.do_queryc                       g | ]}  qS r   r   .0r4   rI   r   r   
<listcomp>y       z(do_single_http_batch.<locals>.<listcomp>)rC   TCPConnectorClientSessionasynciogatherr   )r8   r9   r:   r<   r   )rI   rH   r:   r9   r   do_single_http_batchY   s    0rT   zlocalhost:9000)r8   targetrU   c                    sT   t j|}t|tjddfdd tj fddt	| D  I d H S )N )datac                     s.   t  }  I d H  t  }d||   S r>   )r   r   	grpc_call)r$   r%   )payloadstubr   r   rI      s
   z&do_single_grpc_batch.<locals>.do_queryc                    rJ   r   r   rK   rM   r   r   rN      rO   z(do_single_grpc_batch.<locals>.<listcomp>)
grpcaioinsecure_channelr   RayServeBenchmarkServiceStubr   
StringDatarR   rS   r   )r8   rU   channelr   )rI   rY   rZ   r   do_single_grpc_batch|   s   
"ra   coroc                    s:   ddl m} | }|  | I dH  |  |  dS )z)Collects profiling events using Viztracerr   )	VizTracerN)	viztracerrc   r$   stopsave)rb   rc   tracerr   r   r   collect_profile_events   s   
rh   sizec                    s   d  fddt| D S )NrV   c                 3   s    | ]}t  V  qd S r   )randomchoicerK   charsr   r   	<genexpr>   s    z#generate_payload.<locals>.<genexpr>)joinr   )ri   rm   r   rl   r   generate_payload   s   rp   c                   @   s   e Zd Zdd ZdS )	Blackholec                 C   s   d S r   r   )selfor   r   r   sink      zBlackhole.sinkN)__name__
__module____qualname__rt   r   r   r   r   rq      s    rq   c                   @      e Zd Zdd Zdd ZdS )Noopc                 C      t dt j d S N	ray.servelogging	getLoggersetLevelWARNINGrr   r   r   r   __init__      zNoop.__init__c                 O   s   dS )N    r   rr   argskwargsr   r   r   __call__   ru   zNoop.__call__Nrv   rw   rx   r   r   r   r   r   r   rz      s    rz   c                   @   ry   )	ModelCompc                 C      t dt j || _d S r|   r   r   r   r   _childrr   childr   r   r   r         
zModelComp.__init__c                    s   | j  I d H S r   )r   remoter   r   r   r   r      s   zModelComp.__call__Nr   r   r   r   r   r      s    r   c                   @   $   e Zd Zdd Zdd Zdd ZdS )GrpcDeploymentc                 C   r{   r|   r~   r   r   r   r   r      r   zGrpcDeployment.__init__c                       t jddS N	   )outputr   ModelOutputrr   user_messager   r   r   rX         zGrpcDeployment.grpc_callc                    r   r   r   r   r   r   r   call_with_string   r   zGrpcDeployment.call_with_stringNrv   rw   rx   r   rX   r   r   r   r   r   r      s    r   c                   @   r   )GrpcModelCompc                 C   r   r|   r   r   r   r   r   r      r   zGrpcModelComp.__init__c                       | j  I d H  tjddS r   r   r   r   r   r   r   r   r   rX         zGrpcModelComp.grpc_callc                    r   r   r   r   r   r   r   r      r   zGrpcModelComp.call_with_stringNr   r   r   r   r   r      s    r   c                   @   s0   e Zd ZddedefddZdd Zdd	 Zd
S )Streamerr(   tokens_per_requestinter_token_delay_msc                 C   s&   t dt j || _|d | _d S )Nr}   r   )r   r   r   r   _tokens_per_request_inter_token_delay_s)rr   r   r   r   r   r   r      s   zStreamer.__init__c                 C  s.   t | jD ]}t| jI d H  dV  qd S )Ns   hi)r   r   rR   sleepr   )rr   r4   r   r   r   r:      s
   zStreamer.streamc                    s   t |  S r   r	   r:   r   r   r   r   r      r   zStreamer.__call__N)r(   )rv   rw   rx   intr   r:   r   r   r   r   r   r      s    r   c                   @   s*   e Zd ZdefddZdd Zdd ZdS )	IntermediateRouterhandlec                 C   s$   t dt j |jdd| _d S )Nr}   Tr:   )r   r   r   r   options_handle)rr   r   r   r   r   r      s   zIntermediateRouter.__init__c                 C  s(   | j j 2 z	3 d H W }|V  q6 d S r   )r   r:   r   )rr   tokenr   r   r   r:      s   zIntermediateRouter.streamc                 C   s   t |  S r   r   r   r   r   r   r      s   zIntermediateRouter.__call__N)rv   rw   rx   r   r   r:   r   r   r   r   r   r      s    r   c                   @   s   e Zd Z	ddedefddZddedefd	d
ZdefddZ	de
dee fddZddde
dedejfddZddde
de
dedee deeef f
ddZdS )BenchmarkerFr   r:   c                 C   s*   t dt j |j|d| _|| _d S )Nr}   r   )r   r   r   r   r   r   _stream)rr   r   r:   r   r   r   r      s   
zBenchmarker.__init__NrY   r   c                    sJ   t  }|du r| j I dH  n	| j|I dH  t  }d||  S )z<Completes a single unary request. Returns e2e latency in ms.Nr   )r   r   r   r   )rr   rY   r$   r%   r   r   r   do_single_request   s   zBenchmarker.do_single_requestc                    s:   t  }| jj 2 z3 dH W }q6 t  }d||  S )z?Consumes a single streaming request. Returns e2e latency in ms.Nr   )r   r   r   r:   r   )rr   r$   rF   r%   r   r   r   _do_single_stream  s   zBenchmarker._do_single_streamr8   c                    sL    j rtj fddt|D  I d H S tj fddt|D  I d H S )Nc                       g | ]}   qS r   )r   rK   r   r   r   rN         z0Benchmarker._do_single_batch.<locals>.<listcomp>c                    r   r   r   rK   r   r   r   rN     r   )r   rR   rS   r   )rr   r8   r   r   r   _do_single_batch  s   

zBenchmarker._do_single_batch)rY   r   c                   s"    fdd}t ||dI d H S )Nc                      s     I d H  d S r   r   r   rY   rr   r   r   r     s   z,Benchmarker.run_latency_benchmark.<locals>.f)r   )r&   )rr   r   rY   r   r   r   r   r&     s   z!Benchmarker.run_latency_benchmark)r   r+   r,   r   c                   s>   | j r|sJ || }n|}tt| j|d|||dI d H S )N)r8   )r)   r*   r+   r,   )r   r7   r   r   )rr   r8   r+   r,   r   r*   r   r   r   r7   $  s   
z$Benchmarker.run_throughput_benchmark)Fr   )rv   rw   rx   r   boolr   r   floatr   r   r   r   r   r    r!   r&   r   r   r7   r   r   r   r   r      s<    
	


r   )r'   r(   r'   )6rR   r   r   rj   stringr   	functoolsr   typingr   r   r   r   r   r   rC   aiohttp.client_exceptionsr[   numpyr0   pandasr    starlette.responsesr	   r
   rayr   ray.serve.generatedr   r   ray.serve.handler   r   r!   r&   r   r7   strr   rT   ra   rh   ascii_uppercasedigitsrp   rq   
deploymentrz   r   r   r   r   r   r   r   r   r   r   <module>   s     

-
$
	