o
    $iL                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZmZ d dlmZ d dlmZ eeZdddZdd	 Zd
d Zdd Zedg dZejdfddZeje		 ej ddfdefddZ dS )    N)ThreadPoolExecutor)get_or_create_event_looprun_background_task)event_consts)async_loop_foreverc           
      C   s   t | }i }ttj}|ptjD ]9}||v sJ d| g }|D ]!}t|d| drCt j| |}	|d ur>||	s>q"||	 q"|rJ|||< q|S )NzInvalid source type: *z*.log)	oslistdirsetr   EVENT_SOURCE_ALLfnmatchpathjoinappend)
	event_dirsource_typesevent_file_filterevent_log_namessource_filesall_source_typessource_typefilesnf r   d/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/dashboard/modules/event/event_utils.py_get_source_files   s"   


r   c                 C   sD   z| d  dd dd| d< W | S  ty!   td|  Y | S w )Nmessagez\n
z\rz$Restore newline for event failed: %s)replace	Exceptionlogger	exception)
event_dictr   r   r   _restore_newline%   s   r$   c                 C   s   t t| S N)r$   jsonloads)	event_strr   r   r   _parse_line/   s   r)   c              	   C   sP   g }| D ]!}|s	qzt |}|| W q ty%   tdt| Y qw |S )NzParse event line failed: %s)r)   r   r    r!   r"   repr)event_string_listeventsdataeventr   r   r   parse_event_strings3   s   r/   ReadFileResult)fidsizemtimepositionlinesTc              
   C   s"  t | d|d}t| }|jp| }g }tj| dtjdM}|}	t|D ]>}
|d|	}|dkr6 n1||	 t	j
krJ|||	| d ntdt	j
}td	||	|	|  d||	  |d
 }	q(W d    n1 sqw   Y  t||j|j|	|W  d    S 1 sw   Y  d S )Nrbclosefdr   )access   
zutf-8d   z$Ignored long string: %s...(%s chars)   )openr   statfilenost_inommapACCESS_READrangefindr   EVENT_READ_LINE_LENGTH_LIMITr   decodeminr!   warningr0   st_sizest_mtime)fileposn_linesr8   r   r?   r1   r5   mmstart_septruncated_sizer   r   r   
_read_fileE   s.   

$rT   monitor_thread_pool_executorc                    s   t  du r	i td	du rdn tdg d 	fdd fdd	t|d
dfdd}t| S )aB  Monitor events in directory. New events will be read and passed to the
    callback.

    Args:
        event_dir: The event log directory.
        callback (def callback(List[str]): pass): A callback accepts a list of
            event strings.
        monitor_thread_pool_executor: A thread pool exector to monitor/update
            events. None means it will use the default execturo which uses
            num_cpus of the machine * 5 threads (before python 3.8) or
            min(32, num_cpus + 5) (from Python 3.8).
        scan_interval_seconds: An interval seconds between two scans.
        start_mtime: Only the event log files whose last modification
            time is greater than start_mtime are monitored.
        monitor_files (Dict[int, MonitorFile]): The map from event log file id
            to MonitorFile object. Monitor all files start from the beginning
            if the value is None.
        source_types (List[str]): A list of source type name from
            event_pb2.Event.SourceType.keys(). Monitor all source types if the
            value is None.
    NzEMonitor events logs modified after %s on %s, the source types are %s.allMonitorFile)r2   r3   r4   c                    s   t | }|j kS r%   )r   r?   rK   )source_filer?   )start_mtimer   r   _source_file_filter   s   

z+monitor_events.<locals>._source_file_filterc           	   
      sJ  t | tsJ dt|  d|  dt| tj}zzht|}|jdkr.g W W t| S |j	p2| }
|}|r_|j|jkr[|j|jkr[|j|jkr[td|  g W W t| S |j}ntd|  |}t||dd} |j|j|j|j< |j W n ty } ztd	|  |d }~ww W t| d S t| w )
NzFile should be a str, but a (z) foundr   z4Skip reading the file because there is no change: %szFound new event log file: %sFr7   zRead event file failed: )
isinstancestrtyper   r>   O_RDONLYr?   rJ   closerA   getr4   r2   r3   rK   r!   debuginforT   r1   call_soon_threadsafer5   r    )	rL   rM   fdr?   r1   monitor_filer4   re)rW   callbackloopmonitor_filesr   r   _read_monitor_file   sD   



z*monitor_events.<locals>._read_monitor_fileT)cancellablec                     sh    tI d H } ttjfdd tj fddttj	| 
  D  I d H  d S )Nc              	      sT   4 I d H    | dI d H W  d   I d H  S 1 I d H s#w   Y  d S )Nr   )run_in_executor)filename)rl   rj   rU   	semaphorer   r   _concurrent_coro   s   0zGmonitor_events.<locals>._scan_event_log_files.<locals>._concurrent_coroc                    s   g | ]} |qS r   r   ).0ro   )rq   r   r   
<listcomp>   s    zAmonitor_events.<locals>._scan_event_log_files.<locals>.<listcomp>)rn   r   asyncio	Semaphorer   CONCURRENT_READ_LIMITgatherlist	itertoolschainvalues)r   )rl   rZ   r   rj   rU   r   )rq   rp   r   _scan_event_log_files   s   
	
z-monitor_events.<locals>._scan_event_log_files)r   r!   rc   collections
namedtupler   r   )r   ri   rU   scan_interval_secondsrY   rk   r   r|   r   )
rW   rl   rZ   ri   r   rj   rk   rU   r   rY   r   monitor_events`   s   
%
r   )NN)!rt   r}   r   ry   r&   logging.handlersloggingrB   r   timeconcurrent.futuresr   ray._common.utilsr   r   ray.dashboard.modules.eventr   ray.dashboard.utilsr   	getLogger__name__r!   r   r$   r)   r/   r~   r0   EVENT_READ_LINE_COUNT_LIMITrT   SCAN_EVENT_DIR_INTERVAL_SECONDSSCAN_EVENT_START_OFFSET_SECONDSr   r   r   r   r   <module>   s<    



