o
    bir5                     @   s^  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dl	Z
d dl	mZ d dlZd dlmZ d dlmZ d d	lmZmZ d d
lmZ d dlmZmZmZ d dlmZ d dlmZ d dlmZmZm Z m!Z! d dl"m#Z# d dl$m%Z& d dl'm(Z( d dl)m*Z* d dl+m,Z,m-Z-m.Z.m/Z/ d dl0m1Z1 d dl2m3Z3 e4e5Z6eddZ7G dd de#e*Z8dS )    N)ThreadPoolExecutor)asdict)datetime)Optional)Response)ActorID)env_integer)TagKeyrecord_extra_usage_tag)ActorTableData)!RAY_STATE_SERVER_MAX_HTTP_REQUEST)RAY_STATE_SERVER_MAX_HTTP_REQUEST_ALLOWED*RAY_STATE_SERVER_MAX_HTTP_REQUEST_ENV_NAME)LogsManager)StateAPIManager)do_replyhandle_list_apihandle_summary_apioptions_from_req)SubprocessModule)SubprocessRouteTable)ResponseType)RateLimitedModule)DEFAULT_DOWNLOAD_FILENAMEDEFAULT_LOG_LIMITDEFAULT_RPC_TIMEOUTGetLogOptions)DataSourceUnavailable)StateDataSourceClient(RAY_DASHBOARD_STATE_HEAD_TPE_MAX_WORKERS   c                   @   s  e Zd ZdZdd Zdd Zedej	de
jjde
jjfd	d
Zedej	de
jjde
jjfddZedej	de
jjde
jjfddZedej	de
jjde
jjfddZedej	de
jjde
jjfddZedej	de
jjde
jjfddZedej	de
jjde
jjfddZedej	de
jjde
jjfddZed ej	de
jjde
jjfd!d"Zejd#ejd$ej	de
jjde
jjfd%d&Zed'ej	de
jjde
jjfd(d)Zed*ej	de
jjde
jjfd+d,Zed-ej	de
jjde
jjfd.d/Zed0ej	de
jjde
jjfd1d2Zed3de
jjfd4d5Zd6d7 Zd8S )9	StateHeadzModule to obtain state information from the Ray cluster.

    It is responsible for state observability APIs such as
    ray.list_actors(), ray.get_actor(), ray.summary_actors().
    c                 O   sl   t j| g|R i | t| ttt d| _d| _d| _t	t
dd| _| jdus,J tjj s4J dS )z>Initialize for handling RESTful requests from State API ClientNstate_head_executor)max_workersthread_name_prefix)r   __init__r   minr   r   _state_api_data_source_client
_state_api_log_apir   r   	_executor
gcs_clientrayexperimentalinternal_kv_internal_kv_initialized)selfargskwargs r3   Z/home/ubuntu/.local/lib/python3.10/site-packages/ray/dashboard/modules/state/state_head.pyr%   ;   s"   zStateHead.__init__c                    s$   t dd| j dt dt d dS )NFz#Max number of in-progress requests=zB reached. To set a higher limit, set environment variable: export z='xxx'. Max allowed = successerror_messageresult)r   max_num_call_r   r   r0   r3   r3   r4   limit_handler_S   s   zStateHead.limit_handler_z/api/v0/actorsreqreturnc                    "   t tjd t| jj|I d H S N1)r
   r	   CORE_STATE_API_LIST_ACTORSr   r(   list_actorsr0   r<   r3   r3   r4   rB   `      zStateHead.list_actorsz/api/v0/jobsc              
      sr   t tjd z| jjt|dI d H }tddt|dW S  ty8 } ztdt	|d dW  Y d }~S d }~ww )Nr@   )optionT r5   F)
r
   r	   CORE_STATE_API_LIST_JOBSr(   	list_jobsr   r   r   r   str)r0   r<   r8   er3   r3   r4   rH   f   s   zStateHead.list_jobsz/api/v0/nodesc                    r>   r?   )r
   r	   CORE_STATE_API_LIST_NODESr   r(   
list_nodesrC   r3   r3   r4   rL   t   rD   zStateHead.list_nodesz/api/v0/placement_groupsc                    r>   r?   )r
   r	   $CORE_STATE_API_LIST_PLACEMENT_GROUPSr   r(   list_placement_groupsrC   r3   r3   r4   rN   z   s   zStateHead.list_placement_groupsz/api/v0/workersc                    r>   r?   )r
   r	   CORE_STATE_API_LIST_WORKERSr   r(   list_workersrC   r3   r3   r4   rP      rD   zStateHead.list_workersz/api/v0/tasksc                    r>   r?   )r
   r	   CORE_STATE_API_LIST_TASKSr   r(   
list_tasksrC   r3   r3   r4   rR      rD   zStateHead.list_tasksz/api/v0/objectsc                    r>   r?   )r
   r	   CORE_STATE_API_LIST_OBJECTSr   r(   list_objectsrC   r3   r3   r4   rT      rD   zStateHead.list_objectsz/api/v0/runtime_envsc                    r>   r?   )r
   r	    CORE_STATE_API_LIST_RUNTIME_ENVSr   r(   list_runtime_envsrC   r3   r3   r4   rV      rD   zStateHead.list_runtime_envsz/api/v0/logsc              
      s   t tjd |jdd}|jdd}|jdd}t|jdt}|s0|s0tdd	dd
S |s;| j	|I dH }|sGtdd| dd
S z| jj
|||dI dH }W n typ } ztdt|dd
W  Y d}~S d}~ww tdd|d
S )zReturn a list of log files on a given node id.

        Unlike other list APIs that display all existing resources in the cluster,
        this API always require to specify node id and node ip.
        r@   glob*node_idNnode_iptimeoutFzOBoth node id and node ip are not provided. Please provide at least one of them.r5   z1Cannot find matching node_id for a given node ip )glob_filterTrF   )r
   r	   CORE_STATE_API_LIST_LOGSquerygetintr   r   r)   ip_to_node_id	list_logsr   rI   )r0   r<   r\   rY   rZ   r[   r8   rJ   r3   r3   r4   rb      sB   zStateHead.list_logsz/api/v0/logs/{media_type})	resp_typec           	         s
  t tjd tt|jdt|jdd|jdd|jdd|jdd|jd	t	|jd
d|jdd|jdd|jdd|jdt
|jdd|jdd|jddd}td|  d
tdtt f fdd}tjjdd|j did}d|_ j||}z| I dH }||I dH  ||I dH  W n. ty   Y n& tjy   |    ty } ztd tjj t!|dd}~ww z|2 z3 dH W }||I dH  q6 W n ty   td |   w |" I dH  |S ) z7
        Fetches logs from the given criteria.
        r@   r[   rY   NrZ   
media_typefilefilenamedownload_filenameactor_idtask_idsubmission_idpidlinesintervalsuffixoutattempt_numberr   )r[   rY   rZ   rd   rf   rg   rh   ri   rj   rk   rl   rm   rn   rp   zStreaming logs with options: r=   c                    s.    j j| dI d H }t|dkrd S ||  S )N)rh   r   )r+   async_get_all_actor_infolen)rh   actor_info_dictr:   r3   r4   get_actor_fn   s   z(StateHead.get_logs.<locals>.get_actor_fnContent-Dispositionzattachment; filename="")headersz
text/plainzError while streaming logs)text)#r
   r	   CORE_STATE_API_GET_LOGr   r`   r^   r_   r   
match_infor   r   loggerinfor   r   r   aiohttpwebStreamResponserg   content_typer)   stream_logs	__anext__preparewriteStopAsyncIterationasyncioCancelledErrorforce_close	Exception	exceptionHTTPInternalServerErrorrI   	write_eof)	r0   r<   optionsrt   responselogs_genfirst_chunkrJ   logsr3   r:   r4   get_logs   sl   

zStateHead.get_logsz/api/v0/tasks/summarizec                    r>   r?   )r
   r	   CORE_STATE_API_SUMMARIZE_TASKSr   r(   summarize_tasksrC   r3   r3   r4   r     rD   zStateHead.summarize_tasksz/api/v0/actors/summarizec                    r>   r?   )r
   r	   CORE_STATE_API_SUMMARIZE_ACTORSr   r(   summarize_actorsrC   r3   r3   r4   r     rD   zStateHead.summarize_actorsz/api/v0/objects/summarizec                    r>   r?   )r
   r	    CORE_STATE_API_SUMMARIZE_OBJECTSr   r(   summarize_objectsrC   r3   r3   r4   r      rD   zStateHead.summarize_objectsz/api/v0/tasks/timelinec                    sp   |j d}|j d}| j|I d H }|dkr/t d}d| d| d}d|i}nd }t|d	|d
S )Njob_iddownloadr@   z%Y-%m-%d_%H-%M-%Szattachment; filename="timeline--z.json"ru   zapplication/json)rx   r   rw   )r^   r_   r(   generate_task_timeliner   nowstrftimer   )r0   r<   r   r   r8   now_strcontent_dispositionrw   r3   r3   r4   tasks_timeline&  s   
zStateHead.tasks_timelinez/api/v0/delay/{delay_s}c                    s4   t |jdd}t|I dH  tddi ddS )z/Testing only. Response after a specified delay.delay_s
   NTrF   )r6   r7   r8   partial_failure_warning)r`   rz   r_   r   sleepr   )r0   r<   delayr3   r3   r4   delayed_response7  s   zStateHead.delayed_responsec                    sF   t | I d H  | j}t|| j| _t| j| j| _t	| j| _
d S )N)r   runaiogrpc_gcs_channelr   r+   r'   r   r*   r(   r   r)   )r0   gcs_channelr3   r3   r4   r   C  s   zStateHead.runN) __name__
__module____qualname____doc__r%   r;   routesr_   r   enforce_max_concurrent_callsr}   r~   Requestr   rB   rH   rL   rN   rP   rR   rT   rV   rb   r   STREAMr   r   r   r   r   r   r   r3   r3   r3   r4   r!   4   sj    -Ir!   )9r   loggingconcurrent.futuresr   dataclassesr   r   typingr   aiohttp.webr}   r   r,   r   ray._private.ray_constantsr   ray._private.usage.usage_libr	   r
   ray.core.generated.gcs_pb2r   ray.dashboard.constsr   r   r   %ray.dashboard.modules.log.log_managerr   ray.dashboard.state_aggregatorr   ray.dashboard.state_api_utilsr   r   r   r   !ray.dashboard.subprocesses.moduler   !ray.dashboard.subprocesses.routesr   r    ray.dashboard.subprocesses.utilsr   ray.dashboard.utilsr   ray.util.state.commonr   r   r   r   ray.util.state.exceptionr   ray.util.state.state_managerr   	getLoggerr   r{   r   r!   r3   r3   r3   r4   <module>   s:    
