o
    bihg                     @   s&  d dl Z d dlZd dlmZ d dlmZ d dlmZmZ d dl	m
  mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z- d dl.m/Z/m0Z0 e1e2Z3dZ4dZ5G dd dZ6dS )    N)ThreadPoolExecutor)islice)ListOptional)NodeID)get_or_create_event_loop)chrome_tracing_dump)env_integer)	do_filter)compose_state_message)
RuntimeEnv)RAY_MAX_LIMIT_FROM_API_SERVER
ActorStateActorSummariesJobStateListApiOptionsListApiResponse	NodeStateObjectStateObjectSummariesPlacementGroupStateRuntimeEnvStateStateSummarySummaryApiOptionsSummaryApiResponse	TaskStateTaskSummariesWorkerStateprotobuf_message_to_dictprotobuf_to_task_state_dict)DataSourceUnavailableStateDataSourceClientzFailed to query data from GCS. It is due to (1) GCS is unexpectedly failed. (2) GCS is overloaded. (3) There's an unexpected network issue. Please check the gcs_server.out log to find the root cause.a  Failed to query data from {type}. Queried {total} {type} and {network_failures} {type} failed to reply. It is due to (1) {type} is unexpectedly failed. (2) {type} is overloaded. (3) There's an unexpected network issue. Please check the {log_command} to find the root cause.c                   @   s  e Zd ZdZdedefddZedd Zde	d	e
fd
dZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZde	d	e
fddZded	efddZded	efddZded	efddZd ee d	ee fd!d"Zd#S )$StateAPIManagerzZA class to query states from data source, caches, and post-processes
    the entries.
    state_data_source_clientthread_pool_executorc                 C   s   || _ || _d S N)_client_thread_pool_executor)selfr#   r$    r)   R/home/ubuntu/.local/lib/python3.10/site-packages/ray/dashboard/state_aggregator.py__init__B   s   
zStateAPIManager.__init__c                 C   s   | j S r%   )r&   )r(   r)   r)   r*   data_source_clientJ   s   z"StateAPIManager.data_source_clientoptionreturnc                   b   z| j j j jdI dH }W n ty   ttw dtf fdd}t | j	||I dH S )zList all actor information from the cluster.

        Returns:
            {actor_id -> actor_data_in_dict}
            actor_data_in_dict's schema is in ActorState

        timeoutfiltersNr.   c                    s   g }| j D ]}t|g dd}|| qt|| j }t| jt j}t|}|j	dd d t
t| j}t|| j||dS )N)actor_idowner_idjob_idnode_idplacement_group_idmessagefields_to_decodec                 S      | d S )Nr3   r)   entryr)   r)   r*   <lambda>w       z@StateAPIManager.list_actors.<locals>.transform.<locals>.<lambda>keyresulttotalnum_after_truncationnum_filtered)actor_table_datar   appendlenrF   r
   r2   r   detailsortlistr   limitr   rD   replyrC   r9   datarE   rF   r-   r)   r*   	transform]   s$   

z.StateAPIManager.list_actors.<locals>.transform)
r&   get_all_actor_infor1   r2   r    GCS_QUERY_FAILURE_WARNINGr   r   run_in_executorr'   r(   r-   rO   rR   r)   rQ   r*   list_actorsN   s   #
zStateAPIManager.list_actorsc                   s^   z| j j jdI dH }W n ty   ttw dtf fdd}t | j||I dH S )zList all placement group information from the cluster.

        Returns:
            {pg_id -> pg_data_in_dict}
            pg_data_in_dict's schema is in PlacementGroupState
        r1   Nr.   c                    sz   g }| j D ]}t|g dd}|| qt|}t| jt j}t|}|jdd d t	t
t| j| j||dS )N)r7   creator_job_idr6   r8   c                 S   r;   )Nr7   r)   r<   r)   r)   r*   r>      r?   zJStateAPIManager.list_placement_groups.<locals>.transform.<locals>.<lambda>r@   rB   )placement_group_table_datar   rH   rI   r
   r2   r   rJ   rK   r   rL   r   rM   rD   rN   rQ   r)   r*   rR      s&   
z8StateAPIManager.list_placement_groups.<locals>.transform)	r&   get_all_placement_group_infor1   r    rT   r   r   rU   r'   rV   r)   rQ   r*   list_placement_groups   s   
z%StateAPIManager.list_placement_groupsc                   r/   )zList all node information from the cluster.

        Returns:
            {node_id -> node_data_in_dict}
            node_data_in_dict's schema is in NodeState
        r0   Nr.   c                    s   g }| j D ]9}t|dgd}|d |d< t|d |d< t|d |d< |di }t|dd |d	d |d
< || qt|| j }t| j	t
 j}t|}|jdd d tt| j}t|| j||dS )Nr6   r8   node_manager_addressnode_ipstart_time_msend_time_ms
death_inforeasonreason_messagestate_messagec                 S   r;   )Nr6   r)   r<   r)   r)   r*   r>      r?   z?StateAPIManager.list_nodes.<locals>.transform.<locals>.<lambda>r@   rB   )node_info_listr   intgetr   rH   rI   rF   r
   r2   r   rJ   rK   rL   r   rM   r   rD   )rO   rC   r9   rP   ra   rE   rF   rQ   r)   r*   rR      s2   


z-StateAPIManager.list_nodes.<locals>.transform)
r&   get_all_node_infor1   r2   r    rT   r   r   rU   r'   rV   r)   rQ   r*   
list_nodes   s   
zStateAPIManager.list_nodesc                   r/   )zList all worker information from the cluster.

        Returns:
            {worker_id -> worker_data_in_dict}
            worker_data_in_dict's schema is in WorkerState
        r0   Nr.   c                    s   g }| j D ]G}t|ddgd}|d d |d< |d d |d< |d d |d< t|d |d< t|d	 |d	< t|d
 |d
< t|d |d< || qt|| j }t| jt j	}t|}|j
dd d tt| j}t|| j||dS )N	worker_id	raylet_idr8   worker_addressr6   
ip_addressipr_   r`   worker_launch_time_msworker_launched_time_msc                 S   r;   )Nrj   r)   r<   r)   r)   r*   r>     r?   zAStateAPIManager.list_workers.<locals>.transform.<locals>.<lambda>r@   rB   )worker_table_datar   rf   rH   rI   rF   r
   r2   r   rJ   rK   rL   r   rM   r   rD   rN   rQ   r)   r*   rR      s0   
z/StateAPIManager.list_workers.<locals>.transform)
r&   get_all_worker_infor1   r2   r    rT   r   r   rU   r'   rV   r)   rQ   r*   list_workers   s   
zStateAPIManager.list_workersc                   s^   z| j j jdI d H }W n ty   ttw dtf fdd}t | j||I d H S )NrX   r.   c                    s`   dd | D }t |}t| jt j}t |}|jdd d tt| j}t	||||dS )Nc                 S   s   g | ]}|  qS r)   )dict).0jobr)   r)   r*   
<listcomp>  s    z@StateAPIManager.list_jobs.<locals>.transform.<locals>.<listcomp>c                 S   s   | d pdS )Nr5    r)   r<   r)   r)   r*   r>     s    z>StateAPIManager.list_jobs.<locals>.transform.<locals>.<lambda>r@   rB   )
rI   r
   r2   r   rJ   rK   rL   r   rM   r   )rO   rC   rD   rF   rQ   r)   r*   rR     s   z,StateAPIManager.list_jobs.<locals>.transform)	r&   get_job_infor1   r    rT   r   r   rU   r'   rV   r)   rQ   r*   	list_jobs  s   
zStateAPIManager.list_jobsc                   s   z| j j j j jdI dH }W n ty   ttw dtf fdd}|jj	dkr9tg ddd|jj
gdS t | j||I dH S )zList all task information from the cluster.

        Returns:
            {task_id -> task_data_in_dict}
            task_data_in_dict's schema is in TaskState
        )r1   r2   exclude_driverNr.   c                    sp   dd | j D }t|}t|| j }t| jt j}t|}|jdd d tt	| j
}t||||dS )z
            Transforms from proto to dict, applies filters, sorts, and truncates.
            This function is executed in a separate thread.
            c                 S   s   g | ]}t |qS r)   )r   )ru   r9   r)   r)   r*   rw   ?  s    zAStateAPIManager.list_tasks.<locals>.transform.<locals>.<listcomp>c                 S   r;   )Ntask_idr)   r<   r)   r)   r*   r>   M  r?   z?StateAPIManager.list_tasks.<locals>.transform.<locals>.<lambda>r@   rB   )events_by_taskrI   num_status_task_events_droppedr
   r2   r   rJ   rK   rL   r   rM   r   )rO   rC   rE   	num_totalrF   rQ   r)   r*   rR   :  s   z-StateAPIManager.list_tasks.<locals>.transformr   )rC   rD   rE   rF   warnings)r&   get_all_task_infor1   r2   r{   r    rT   r   statuscoder9   r   rU   r'   rV   r)   rQ   r*   
list_tasks*  s,   
zStateAPIManager.list_tasksc                   sv   j j jddgdI dH } fdd|jD tjddiI dH }dtf fd	d
}t j	||I dH S )zList all object information from the cluster.

        Returns:
            {object_id -> object_data_in_dict}
            object_data_in_dict's schema is in ObjectState
        Nstate=ALIVEr1   rM   r2   c                    $   g | ]}j j|j|j jd qS rX   )r&   get_object_infor]   node_manager_portr1   ru   	node_infor-   r(   r)   r*   rw   r      z0StateAPIManager.list_objects.<locals>.<listcomp>return_exceptionsTr.   c              	      s  d}g }d}| D ])}t |tr|d7 }qt |tr|||j7 }|jD ]}|t|dgdd q#qd }tdkrX|dkrXtj	dt|dd}|tkrSt|d	| }g }t
|}	|	jD ]3}
|
 }|d
 |d< |d
= |d |d< |d= |d  |d< |d dkrdn|d |d< || qbg }tdd}|s|d t|}t| jt j}t|}|jdd d tt| j}t||||||dS )Nr      	object_idF)r9   r:   preserving_proto_field_namerayletz
raylet.outtyperD   network_failureslog_command1The returned data may contain incomplete result. 
object_refnode_ip_addressrn   r   task_status-NILRAY_record_ref_creation_siteszCallsite is not being recorded. To record callsite information for each ObjectRef created, set env variable RAY_record_ref_creation_sites=1 during `ray start` and `ray.init`.c                 S   r;   )Nr   r)   r<   r)   r)   r*   r>     r?   zAStateAPIManager.list_objects.<locals>.transform.<locals>.<lambda>r@   )rC   partial_failure_warningrD   rE   rF   r   )
isinstancer    	ExceptionrD   core_workers_statsrH   r   rI   NODE_QUERY_FAILURE_WARNINGformatmemory_utilsconstruct_memory_tabletableas_dictupperr	   r
   r2   r   rJ   rK   rL   r   rM   r   )repliesunresponsive_nodesworker_statstotal_objectsrO   core_worker_statr   warning_msgrC   memory_tabler=   rP   callsite_warningcallsite_enabledrE   rF   )r-   tasksr)   r*   rR     s|   






z/StateAPIManager.list_objects.<locals>.transform
r&   rh   r1   re   asynciogatherr   r   rU   r'   )r(   r-   all_node_info_replyr   rR   r)   )r-   r(   r   r*   list_objectsf  s$   	R
zStateAPIManager.list_objectsc                   s   j jjddgdI dH }dd |jD  fdd D tjddiI dH }d	tf fd
d}t j	||I dH S )ao  List all runtime env information from the cluster.

        Returns:
            A list of runtime env information in the cluster.
            The schema of returned "dict" is equivalent to the
            `RuntimeEnvState` protobuf message.
            We don't have id -> data mapping like other API because runtime env
            doesn't have unique ids.
        Nr   r   c                 S   s   g | ]	}|j d ur|qS r%   )runtime_env_agent_portr   r)   r)   r*   rw     s
    
z5StateAPIManager.list_runtime_envs.<locals>.<listcomp>c                    r   r   )r&   get_runtime_envs_infor]   r   r1   r   r   r)   r*   rw     r   r   Tr.   c                    s@  g }d}d}t  | D ]A\}}t|tr|d7 }qt|tr |||j7 }|j}|D ]!}t|g d}t|d 	 |d< t
|j |d< || q*qd }	tdkrs|dkrstjdt|dd}
|tkrnt|
d	|
 }	t|}t|jtj}t|}d
d }|j|dd tt|j}t||	|||dS )Nr   r   r8   runtime_envr6   agentzdashboard_agent.logr   r   c                 S   s0   d| vrt dS | d d u rt dS t | d S )Ncreation_time_msinf)floatr<   r)   r)   r*   	sort_func   s
   zGStateAPIManager.list_runtime_envs.<locals>.transform.<locals>.sort_funcT)rA   reverse)rC   r   rD   rE   rF   )zipr   r    r   rD   runtime_env_statesr   r   deserializeto_dictr   r6   hexrH   rI   r   r   r
   r2   r   rJ   rK   rL   r   rM   r   )r   rC   r   total_runtime_envsr   rO   statesr   rP   r   r   rE   rF   r   )
node_infosr-   r   r)   r*   rR     s\   


	z4StateAPIManager.list_runtime_envs.<locals>.transformr   )r(   r-   live_node_info_replyr   rR   r)   )r   r-   r(   r   r*   list_runtime_envs  s*   
	=
z!StateAPIManager.list_runtime_envsc                    s   |j pd}|dvrtd| jt|jt|j|dkddI d H }|dkr-tj|j	d}n| j
t|jtdd	dI d H }tj|j	|j	d
}td|id}|j}|j|j |j |jk rd|p^g }|d t|j||j||j|jdS )N	func_name)r   lineagez3summary_by must be one of "func_name" or "lineage".r   )r1   rM   r2   rJ   rQ   )r   T)r1   rM   rJ   )r   actorsclusternode_id_to_summaryzfThere is missing data in this aggregation. Possibly due to task data being evicted to preserve memory.rD   rC   r   r   rE   rF   )
summary_by
ValueErrorr   r   r1   r   r2   r   to_summary_by_func_namerC   rW   to_summary_by_lineager   r   total_actor_scheduledtotal_actor_taskstotal_tasksrF   rH   r   rD   r   rE   )r(   r-   r   rC   summary_resultsr   summaryr   r)   r)   r*   summarize_tasks9  sZ   
	zStateAPIManager.summarize_tasksc                    X   | j t|jt|jddI d H }tdtj|jdid}t	|j
||j|j|j|jdS )Nr   rQ   r   )r   r   r   )rW   r   r1   r   r2   r   r   
to_summaryrC   r   rD   r   r   rE   rF   r(   r-   rC   r   r)   r)   r*   summarize_actorsl  (   z StateAPIManager.summarize_actorsc                    r   )Nr   rQ   r   )objectsr   r   )r   r   r1   r   r2   r   r   r   rC   r   rD   r   r   rE   rF   r   r)   r)   r*   summarize_objects  r   z!StateAPIManager.summarize_objectsr5   c                    s<   |r	dd|fgnd }| j td|dddI d H }t|jS )Nr5   r   Ti'  )rJ   r2   rM   rQ   )r   r   r   rC   )r(   r5   r2   rC   r)   r)   r*   generate_task_timeline  s   
z&StateAPIManager.generate_task_timelineN)__name__
__module____qualname____doc__r!   r   r+   propertyr,   r   r   rW   r\   ri   rs   rz   r   r   r   r   r   r   r   r   r   strr   rt   r   r)   r)   r)   r*   r"   =   s*    

6-10<pc3r"   )7r   loggingconcurrent.futuresr   	itertoolsr   typingr   r   ray.dashboard.memory_utils	dashboardr   rayr   ray._common.utilsr   ray._private.profilingr   ray._private.ray_constantsr	   ray.dashboard.state_api_utilsr
   ray.dashboard.utilsr   ray.runtime_envr   ray.util.state.commonr   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   ray.util.state.state_managerr    r!   	getLoggerr   loggerrT   r   r"   r)   r)   r)   r*   <module>   s*    T
