o
    `۷i*                     @   s   d Z ddlZddlZddlZddlZddlm  mZ ddl	m
Z
 ddlmZmZ ddlmZ ddlmZmZ ddlmZ ddlmZ eeZejG d	d
 d
Zdd Zdd ZdddZedkrhe  dS dS )z,This is the script for `ray microbenchmark`.    N)get_or_create_event_loop)asyncio_timeittimeit)get_actor_node_id)	InputNodeMultiOutputNode)CompiledDAG)NodeAffinitySchedulingStrategyc                   @      e Zd Zdd Zdd ZdS )DAGActorc                 C      |S N selfxr   r   X/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/_private/ray_experimental_perf.pyecho      zDAGActor.echoc                 G   r   r   r   r   r   r   r   echo_multiple   r   zDAGActor.echo_multipleN)__name__
__module____qualname__r   r   r   r   r   r   r          r   c                  C   s   t jjsd} t|  d S d S )Na  WARNING: Unoptimized build! To benchmark an optimized build, try:
	bazel run -c opt //:gen_ray_pkg
You can also make this permanent by adding
	build --compilation_mode=opt
to your user-wide ~/.bazelrc file. (Do not add this to the project-level .bazelrc file.))ray_raylet	OPTIMIZEDloggerwarning)msgr   r   r   check_optimized_build   s
   	r    c                   C   s"   t jjtt  ddd S )NF)soft)scheduling_strategy)r   DAGDriverProxyActoroptionsr	   r   get_runtime_contextget_node_idremoter   r   r   r   create_driver_actor,   s   r(   c                    s  | pg } t  }t  td t  dTdd	tjG dd d}t }t|}t	d ||fgdg| t
d	fd	d
7 } | 
t
t	d 
fgdgt
j  
j | t
d	fdd
7 } t
 t d }td|  g t|D ]}| 
t

f qt	d dgtdd D  D ]
\
}
j q| t
d	fdd
7 } D ]	\
}t
 q| 
t

fddt|D t
j  
j | t
d	fdd
7 } t
 g t|D ]}| 
t

f qfddt|D tdd D  tD ]\}}|d 

j|g q3| t
d	fdd
7 } D ]
\
}t
 qRdUdd fdd}	t }
t |
jW d    n	1 sw   Y  | t
d fd!d
7 }  | t
d" fd#d
7 } ~
t }
t |
jW d    n	1 sw   Y  jd$d%| ||	d&7 } ~
t d }d'd t|D t tfd(dD W d    n	1 sw   Y  | t
d)| d*fd+d
7 }  | t
d,| d* fd-d
7 } d.d t|D t tfd/dD W d    n	1 sDw   Y  jd$d%| ||	d0| d*7 } d1d t|D t D ]	}
|
jqmW d    n	1 sw   Y  | t
d2| d*fd3d
7 }  | t
d4| d* fd5d
7 } d6d t|D t D ]	}
|
jqW d    n	1 sw   Y  jd$d%| ||	d7| d*7 } d8|ksJ d9| d: d;d<d tD t tfd=dtD W d    n	1 s w   Y  d| t
d> d*fd?d
7 }  | t
d@ d* fdAd
7 } dBd tD t tfdCdtD W d    n	1 stw   Y  dD| t
dE d*fdFd
7 }  | t
dG d* fdHd
7 } dId tD t tfdJdtD W d    n	1 sw   Y  dK| t
dL d*fdMd
7 }  | t
dN d* fdOd
7 } t }d8t |jjfdPdtD  W d    n	1 sw   Y  d| t
dQfdRd
7 }  | t
dQ fdSd
7 } t  | S )VNz=Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarksFc                 S   s$   | D ]}| d |r|  qd S )N   0)writeread)chansdo_getchanr   r   r   put_channel_smallA   s   
zmain.<locals>.put_channel_smallc                   @   r
   )zmain.<locals>.ChannelReaderc                 S   s   d S r   r   )r   r   r   r   readyI   r   z!main.<locals>.ChannelReader.readyc                 S   s   	 |D ]}|   qqr   )r+   )r   r,   r.   r   r   r   r+   L   s   
z main.<locals>.ChannelReader.readN)r   r   r   r0   r+   r   r   r   r   ChannelReaderG   r   r1     z4[unstable] local put:local get, single channel callsc                      s    ddS )NT)r-   r   r   r,   r/   r   r   <lambda>V   s    zmain.<locals>.<lambda>z7[unstable] local put:1 remote get, single channel callsc                          S r   r   r   r3   r   r   r4   `          z%Testing multiple readers/channels, n=c                 S      g | ]	\}}|j  qS r   r0   r'   .0reader_r   r   r   
<listcomp>m       zmain.<locals>.<listcomp>z7[unstable] local put:n remote get, single channel callsc                      r5   r   r   r   r3   r   r   r4   r   r6   c                    s    g | ]}t d  fgdqS Nr2   ray_channelChannelr;   r=   )r<   reader_noder   r   r>   y       z3[unstable] local put:1 remote get, n channels callsc                      r5   r   r   r   r3   r   r   r4      r6   c                    s    g | ]}t d  | gdqS r@   rA   r;   i)reader_and_node_listr   r   r>      rF   c                 S   r8   r   r9   r:   r   r   r   r>      r?   r   z3[unstable] local put:n remote get, n channels callsc                      r5   r   r   r   r3   r   r   r4      r6      c                    s*   | j  fddt|D  }t| d S )Nc                       g | ]}d   qS    xr   rD   payload_sizer   r   r>          z'main.<locals>._exec.<locals>.<listcomp>)executeranger   get)dagnum_argsrO   
output_refr   rN   r   _exec   s   zmain.<locals>._execc                    s    fdd}t | |I d H S )Nc                     s>     dI d H } t| ts| I d H  d S tj|  I d H  d S NrM   )execute_async
isinstancelistasynciogather)futcompiled_dagr   r   _exec_async   s
   
z-main.<locals>.exec_async.<locals>._exec_async)r   )tagra   r_   r   r   
exec_async   s   
zmain.<locals>.exec_asyncz![unstable] single-actor DAG callsc                         t  dS rX   r   rS   rQ   r   rT   r   r   r4          z*[unstable] compiled single-actor DAG callsc                          S r   r   r   rW   r`   r   r   r4      r6   T)enable_asyncioz2[unstable] compiled single-actor asyncio DAG callsc                 S      g | ]}t  qS r   r   r'   rD   r   r   r   r>      rP   c                       g | ]}|j  qS r   r   bindr;   ainpr   r   r>          z'[unstable] scatter-gather DAG calls, n=z actorsc                      rd   rX   re   r   rf   r   r   r4      rg   z0[unstable] compiled scatter-gather DAG calls, n=c                      rh   r   r   r   ri   r   r   r4      r6   c                 S   rk   r   rl   rD   r   r   r   r>      rP   c                    rm   r   rn   rp   rr   r   r   r>      rt   z8[unstable] compiled scatter-gather asyncio DAG calls, n=c                 S   rk   r   rl   rD   r   r   r   r>      rP   z[unstable] chain DAG calls, n=c                      rd   rX   re   r   rf   r   r   r4      rg   z'[unstable] compiled chain DAG calls, n=c                      rh   r   r   r   ri   r   r   r4      r6   c                 S   rk   r   rl   rD   r   r   r   r>      rP   z/[unstable] compiled chain asyncio DAG calls, n=   zn_cpu (z!) must be greater than n_actors ()c                 S   rk   r   rl   rD   r   r   r   r>     rP   c                        g | ]} | j | qS r   rn   rG   actorsrs   r   r   r>          z:[unstable] multiple args with small payloads DAG calls, n=c                      "   t  jfddtD  S )Nc                    rK   rL   r   rD   rN   r   r   r>     rP   *main.<locals>.<lambda>.<locals>.<listcomp>r   rS   rQ   rR   r   rT   n_actorsrO   r   r   r4        " zC[unstable] compiled multiple args with small payloads DAG calls, n=c                          dS N)rU   rO   r   r   rW   r`   r   rO   r   r   r4         c                 S   rk   r   rl   rD   r   r   r   r>     rP   c                    rw   r   rn   rG   rx   r   r   r>     rz   i   z;[unstable] multiple args with medium payloads DAG calls, n=c                      r{   )Nc                    rK   rL   r   rD   rN   r   r   r>     rP   r|   r}   r   r~   r   r   r4     r   zD[unstable] compiled multiple args with medium payloads DAG calls, n=c                      r   r   r   r   r   r   r   r4   #  r   c                 S   rk   r   rl   rD   r   r   r   r>   (  rP   c                    rw   r   rn   rG   rx   r   r   r>   *  rz   i   z:[unstable] multiple args with large payloads DAG calls, n=c                      r{   )Nc                    rK   rL   r   rD   rN   r   r   r>   .  rP   r|   r}   r   r~   r   r   r4   .  r   zC[unstable] compiled multiple args with large payloads DAG calls, n=c                      r   r   r   r   r   r   r   r4   4  r   c                    s   g | ]} | qS r   r   rG   rr   r   r   r>   =  rP   zO[unstable] single-actor with all args with small payloads DAG calls, n=1 actorsc                      r{   )Nc                    rK   rL   r   rD   rN   r   r   r>   B  rP   r|   r}   r   )rT   n_argsrO   r   r   r4   B  r   c                      r   r   r   r   )rW   r`   r   rO   r   r   r4   H  r   )F)rJ   rJ   )r   r    printr   initr'   r(   r   rB   rC   r   rS   r0   r+   killmultiprocessing	cpu_countrR   appendzipr   r   r   ro   experimental_compilerun_until_completer   r   shutdown)resultsloopr1   driver_actordriver_noden_cpur=   r.   reader_node_tuplerc   rq   actorr   )rW   ry   r,   r`   rT   rs   r   r   rO   r/   r<   rI   rE   r   main4   s  
	













	 r   __main__r   )__doc__r\   loggingr   r   ray.experimental.channelexperimentalchannelrB   ray._common.utilsr   'ray._private.ray_microbenchmark_helpersr   r   ray._private.test_utilsr   ray.dagr   r   ray.dag.compiled_dag_noder   ray.util.scheduling_strategiesr	   	getLoggerr   r   r'   r   r    r(   r   r   r   r   r   <module>   s.    

  
