o
    bi                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	  m
Z
 d dlm  mZ d dlm  mZ d dlmZ d dlmZ d dlmZmZ eeZe
ddZG d	d
 d
ejZdS )    N)ThreadPoolExecutor)Union)event_consts)monitor_events)async_loop_forevercreate_task)RAY_DASHBOARD_EVENT_AGENT_TPE_MAX_WORKERS   c                       sR   e Zd Z fddZdd Zeejdd Zdd Z	d	d
 Z
edd Z  ZS )
EventAgentc                    s   t  | tj| jjd| _tj| jdd d | _	d | _
ttj| _|j| _d| _d| _t | _ttdd| _td| jj d S )NeventsT)exist_okr   event_agent_executor)max_workersthread_name_prefixz!Event agent cache buffer size: %s)super__init__ospathjoin_dashboard_agentlog_dir
_event_dirmakedirs_monitor_dashboard_http_addressasyncioQueuer   EVENT_AGENT_CACHE_SIZE_cached_events
gcs_client_gcs_clienttotal_event_reportedtotal_request_senttime	monotonicmodule_startedr   r   	_executorloggerinfomaxsize)selfdashboard_agent	__class__ [/home/ubuntu/.local/lib/python3.10/site-packages/ray/dashboard/modules/event/event_agent.pyr      s   
zEventAgent.__init__c                    s   	 | j r| j S z | jjtj tjtjdI dH }|s t	d|
 | _ | j W S  ty6   td Y nw tdI dH  q)zz
        Lazily get the dashboard http address from InternalKV. If it's not set, sleep
        and retry forever.
        T)	namespacetimeoutNz/Dashboard http address not found in InternalKV.z"Get dashboard http address failed.r	   )r   r    async_internal_kv_getray_constantsDASHBOARD_ADDRESSencodeKV_NAMESPACE_DASHBOARDdashboard_constsGCS_RPC_TIMEOUT_SECONDS
ValueErrordecode	Exceptionr'   	exceptionr   sleep)r*   dashboard_http_addressr.   r.   r/   _get_dashboard_http_address2   s&   
z&EventAgent._get_dashboard_http_addressc                    s&  |   I dH }| j I dH }|  jt|7  _ttjD ]Y}z=t	dt| | j
jj| d|d4 I dH }|  W d  I dH  n1 I dH sNw   Y  |  jd7  _W  dS  tyw } ztd|  W Y d}~qd}~ww t|}tj}td|d| ||d od  dS )	zReport events from cached events queue. Reconnect to dashboard if
        report failed. Log error after retry EVENT_AGENT_RETRY_TIMES.

        This method will never returns.
        NzReport %s events.z/report_events)jsonr	   z!Report event failed, retrying... zReport event failed: %sz...)r?   r   getr!   lenranger   EVENT_AGENT_RETRY_TIMESr'   debugr   http_sessionpostraise_for_statusr"   r;   warningstr#LOG_ERROR_EVENT_STRING_LENGTH_LIMITerror)r*   r>   data_responseedata_strlimitr.   r.   r/   report_eventsH   s4   
(zEventAgent.report_eventsc                    s@   | j dks| jdkrd S t | j }| j | j| j |dS )Nr   )total_events_reportedTotal_report_request
queue_sizetotal_uptime)r!   r"   r#   r$   r%   r   qsize)r*   elapsedr.   r.   r/   get_internal_statesf   s   zEventAgent.get_internal_statesc                    s4   t  j fdd j _t  I d H  d S )Nc                    s   t  j| S )N)r   r   put)rM   r*   r.   r/   <lambda>v   s    z EventAgent.run.<locals>.<lambda>)r   r   r&   r   r   gatherrS   )r*   serverr.   r\   r/   runr   s   
zEventAgent.runc                   C   s   dS )NFr.   r.   r.   r.   r/   is_minimal_module~   s   zEventAgent.is_minimal_module)__name__
__module____qualname__r   r?   r   r   #EVENT_AGENT_REPORT_INTERVAL_SECONDSrS   rZ   r`   staticmethodra   __classcell__r.   r.   r,   r/   r
      s    
r
   )r   loggingr   r#   concurrent.futuresr   typingr   ray._private.ray_constants_privater3   ray.dashboard.consts	dashboardconstsr7   ray.dashboard.utilsutilsdashboard_utilsray.dashboard.modules.eventr   'ray.dashboard.modules.event.event_utilsr   r   r   	getLoggerrb   r'   env_integerr   DashboardAgentModuler
   r.   r.   r.   r/   <module>   s"    
