o
    `۷i?"                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m	Z	 d dl
mZ d dlmZmZmZ d dlZd dlZd dlm  mZ d dlm  mZ d dlmZmZ d dlmZ d d	lm Z  d d
l!m"Z"m#Z#m$Z$ d dl%m&Z&m'Z' d dl(m)Z)m*Z* d dl+m,Z, d dl-m.Z/ d dl0m1Z1m2Z2m3Z3 e4e5Z6eZ7ej89e7 e:ej;<ddZ=e ddZ>dede2de3fddZ?G dd de,ej@ZAdS )    N)OrderedDictdefaultdict)ThreadPoolExecutor)datetime)islice)DictListUnion)TagKeyrecord_extra_usage_tag)get_or_create_event_loop)env_integer)!RAY_STATE_SERVER_MAX_HTTP_REQUEST)RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED*RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME)monitor_eventsparse_event_strings)	do_filterhandle_list_api)SubprocessModule)SubprocessRouteTable)ClusterEventStateListApiOptionsListApiResponse!RAY_DASHBOARD_MAX_EVENTS_TO_CACHEi'  (RAY_DASHBOARD_EVENT_HEAD_TPE_MAX_WORKERS   executoroptionreturnc                    s*   dt f fdd}t ||| I dH S )a  
    List all cluster events from the cluster. Made a free function to allow unit tests.

    Returns:
        A list of cluster events in the cluster.
        The schema of returned "dict" is equivalent to the
        `ClusterEventState` protobuf message.
    r   c                    s   g }|   D ]\}}|  D ]\}}ttt|d |d< || qqt|}|jdd d t|}t| j	t
 j}t|}tt| j}t||||dS )N	timestamptimec                 S   s   | d S )Nr     )entryr"   r"   \/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/dashboard/modules/event/event_head.py<lambda>A   s    z>_list_cluster_events_impl.<locals>.transform.<locals>.<lambda>)key)resulttotalnum_after_truncationnum_filtered)itemsstrr   fromtimestampintappendlensortr   filtersr   detaillistr   limitr   )
all_eventsr'   _eventseventr)   r(   r*   r   r"   r$   	transform9   s$   z,_list_cluster_events_impl.<locals>.transformN)r   r   run_in_executor)r6   r   r   r;   r"   r:   r$   _list_cluster_events_impl-   s
   
r=   c                       s   e Zd Zdd Zdd Zdd Zeddd	 Zd
d Z	e
dejdejjfddZe
dejjdejjdejjfddZ fddZ  ZS )	EventHeadc                 O   s   t j| g|R i | tj| ttt tj	| j
d| _tj| jdd d | _d| _d| _t | _tt| _ttdd| _| jd usHJ tjj sPJ d S )Nr8   T)exist_okr   event_head_executor)max_workersthread_name_prefix)r   __init__dashboard_utilsRateLimitedModuleminr   r   ospathjoinlog_dir
_event_dirmakedirs_monitortotal_report_events_counttotal_events_receivedr!   	monotonicmodule_startedr   	JobEventsr8   r   r   	_executor
gcs_clientrayexperimentalinternal_kv_internal_kv_initialized)selfargskwargsr"   r"   r$   rC   W   s*   

zEventHead.__init__c                    s*   t jtjjd| j dt dt d dS )Nz#Max number of in-progress requests=zB reached. To set a higher limit, set environment variable: export z='xxx'. Max allowed = )status_codeerror_messager'   )dashboard_optional_utilsrest_responserD   HTTPStatusCodeINTERNAL_ERRORmax_num_call_r   r   rY   r"   r"   r$   limit_handler_r   s   zEventHead.limit_handler_c           
      C   s   t t}|D ]$}|d }|d}d}|r|ddpd}nd}|du r*||| |< q| D ](\}}| j| }	|	| t|	td krWt|	tkrW|	jdd t|	tksKq/d S )Nevent_idcustom_fieldsFjob_idglobalg?)last)	r   rR   getr+   r8   updater0   MAX_EVENTS_TO_CACHEpopitem)
rY   
event_listall_job_eventsr9   re   rf   system_eventrg   new_job_events
job_eventsr"   r"   r$   _update_events   s(   


zEventHead._update_eventsz/report_eventsc              
      s   z	|  I dH }W n ty& } ztd|d| tj d}~ww t|ts9td| tj t	|}t
dt| | | |  jd7  _|  jt|7  _tjddtjjd	S )
z
        Report events to the dashboard.
        The request body is a JSON array of event strings in type string.
        Response should contain {"success": true}.
        Nz&Failed to parse request body: request=z, e=z)Request body is not a list, request_body=zReceived %d eventsr   T )successmessager\   )json	ExceptionloggerwarningaiohttpwebHTTPBadRequest
isinstancer4   r   debugr0   rs   rN   rO   r^   r_   rD   r`   OK)rY   requestrequest_bodyer8   r"   r"   r$   report_events   s*   



zEventHead.report_eventsc                    s8   | j dks| jdkrd S t | j }| j | j|dS )Nr   )rO   Total_requests_receivedtotal_uptime)rO   rN   r!   rP   rQ   )rY   elapsedr"   r"   r$   _periodic_state_print   s   zEventHead._periodic_state_printz/eventsr   c                    sf   |j d}|d u rdd | j D }tjtjjd|dS | j| }tjtjjd|t	|
 dS )Nrg   c                 S   s   i | ]\}}|t | qS r"   )r4   values).0rg   rr   r"   r"   r$   
<dictcomp>   s    z'EventHead.get_event.<locals>.<dictcomp>zAll events fetched.)r\   rv   r8   zJob events fetched.)r\   rv   rg   r8   )queryrj   r8   r+   r^   r_   rD   r`   r   r4   r   )rY   reqrg   r6   rr   r"   r"   r$   	get_event   s$   

zEventHead.get_eventz/api/v0/cluster_eventsr   c                    s0   t tjd dtf fdd}t||I d H S )N1r   c                    s   t  j j| dI d H S )N)r6   r   r   )r=   r8   rS   r:   rc   r"   r$   list_api_fn   s   
z2EventHead.list_cluster_events.<locals>.list_api_fn)r   r
   "CORE_STATE_API_LIST_CLUSTER_EVENTSr   r   )rY   r   r   r"   rc   r$   list_cluster_events   s   zEventHead.list_cluster_eventsc                    s0   t   I d H  t j fdd j _d S )Nc                    s     t| S )N)rs   r   )datarc   r"   r$   r%      s    zEventHead.run.<locals>.<lambda>)superrunr   rK   rS   rM   rc   	__class__rc   r$   r      s   

zEventHead.run)__name__
__module____qualname__rC   rd   rs   routespostr   r   rj   r^   aiohttp_cacher{   r|   Responser   rD   rE   enforce_max_concurrent_callsRequestr   r   __classcell__r"   r"   r   r$   r>   S   s$    
r>   )BasynciologgingrG   r!   collectionsr   r   concurrent.futuresr   r   	itertoolsr   typingr   r   r	   aiohttp.webr{   rU   ray.dashboard.optional_utils	dashboardoptional_utilsr^   ray.dashboard.utilsutilsrD   ray._common.usage.usage_libr
   r   ray._common.utilsr   ray._private.ray_constantsr   ray.dashboard.constsr   r   r   'ray.dashboard.modules.event.event_utilsr   r   ray.dashboard.state_api_utilsr   r   !ray.dashboard.subprocesses.moduler   !ray.dashboard.subprocesses.routesr   r   ray.util.state.commonr   r   r   	getLoggerr   ry   rR   _json_compatible_typesaddr.   environrj   rl   r   r=   rE   r>   r"   r"   r"   r$   <module>   sP    


&