o
    `۷i                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	  m
Z
 d dlm  mZ d dlm  mZ d dlmZ d dlmZ d dlmZ d dlmZmZ eeZe
dd	ZG d
d dejZ dS )    N)ThreadPoolExecutor)Union) get_auth_headers_if_auth_enabled)event_consts)monitor_events)async_loop_forevercreate_task)RAY_DASHBOARD_EVENT_AGENT_TPE_MAX_WORKERS   c                       sR   e Zd Z fddZdd Zeejdd Zdd Z	d	d
 Z
edd Z  ZS )
EventAgentc                    s   t  | tj| jjd| _tj| jdd d | _	d | _
ttj| _|j| _d| _d| _t | _ttdd| _td| jj d S )NeventsT)exist_okr   event_agent_executor)max_workersthread_name_prefixz!Event agent cache buffer size: %s)super__init__ospathjoin_dashboard_agentlog_dir
_event_dirmakedirs_monitor_dashboard_http_addressasyncioQueuer   EVENT_AGENT_CACHE_SIZE_cached_events
gcs_client_gcs_clienttotal_event_reportedtotal_request_senttime	monotonicmodule_startedr   r	   	_executorloggerinfomaxsize)selfdashboard_agent	__class__ ]/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/dashboard/modules/event/event_agent.pyr      s   
zEventAgent.__init__c                    s   	 | j r| j S z | jjtj tjtjdI dH }|s t	d|
 | _ | j W S  ty6   td Y nw tdI dH  q)zz
        Lazily get the dashboard http address from InternalKV. If it's not set, sleep
        and retry forever.
        T)	namespacetimeoutNz/Dashboard http address not found in InternalKV.z"Get dashboard http address failed.r
   )r   r!   async_internal_kv_getray_constantsDASHBOARD_ADDRESSencodeKV_NAMESPACE_DASHBOARDdashboard_constsGCS_RPC_TIMEOUT_SECONDS
ValueErrordecode	Exceptionr(   	exceptionr   sleep)r+   dashboard_http_addressr/   r/   r0   _get_dashboard_http_address5   s&   
z&EventAgent._get_dashboard_http_addressc           	         s8  |   I dH }| j I dH }|  jt|7  _d}ttjD ]^}z@t	dt| | j
jj| d|ti d4 I dH }|  W d  I dH  n1 I dH sSw   Y  |  jd7  _W  dS  ty~ } ztd|  |}W Y d}~q d}~ww t|}tj}tjd|d| ||d od |d	 dS )
zReport events from cached events queue. Reconnect to dashboard if
        report failed. Log error after retry EVENT_AGENT_RETRY_TIMES.

        This method will never returns.
        NzReport %s events.z/report_events)jsonheadersr
   z!Report event failed, retrying... zReport event failed: %sz...)exc_info)r@   r   getr"   lenranger   EVENT_AGENT_RETRY_TIMESr(   debugr   http_sessionpostr   raise_for_statusr#   r<   warningstr#LOG_ERROR_EVENT_STRING_LENGTH_LIMITerror)	r+   r?   datalast_exception_responseedata_strlimitr/   r/   r0   report_eventsK   s<   
(
zEventAgent.report_eventsc                    s@   | j dks| jdkrd S t | j }| j | j| j |dS )Nr   )total_events_reportedTotal_report_request
queue_sizetotal_uptime)r"   r#   r$   r%   r&   r   qsize)r+   elapsedr/   r/   r0   get_internal_statesm   s   zEventAgent.get_internal_statesc                    s4   t  j fdd j _t  I d H  d S )Nc                    s   t  j| S )N)r   r   put)rP   r+   r/   r0   <lambda>}   s    z EventAgent.run.<locals>.<lambda>)r   r   r'   r   r   gatherrW   )r+   serverr/   r`   r0   runy   s   
zEventAgent.runc                   C   s   dS )NFr/   r/   r/   r/   r0   is_minimal_module   s   zEventAgent.is_minimal_module)__name__
__module____qualname__r   r@   r   r   #EVENT_AGENT_REPORT_INTERVAL_SECONDSrW   r^   rd   staticmethodre   __classcell__r/   r/   r-   r0   r      s    
!r   )!r   loggingr   r$   concurrent.futuresr   typingr   ray._private.ray_constants_privater4   ray.dashboard.consts	dashboardconstsr8   ray.dashboard.utilsutilsdashboard_utils5ray._private.authentication.http_token_authenticationr   ray.dashboard.modules.eventr   'ray.dashboard.modules.event.event_utilsr   r   r   	getLoggerrf   r(   env_integerr	   DashboardAgentModuler   r/   r/   r/   r0   <module>   s$    
