o
    `۷i
y                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZmZmZmZmZmZmZ d dlZd dlZd dlZd dlm  mZ d dlm  mZ d dlm Z  d dl!m"Z" d dl#m$Z$ d d	l%m&Z&m'Z'm(Z( d d
l)m*Z* d dl+m,Z,m-Z-m.Z. d dl/m0Z0m1Z1m2Z2 d dl3m4Z4m5Z5m6Z6 d dl7m8Z8m9Z9m:Z: d dl;m<Z<m=Z= d dl>m?Z?m@Z@ d dlAmBZB d dlCmDZD d dlEmFZG d dlmHZH eIeJZKe.ddZLeMd ejNO ZPdZQdZRde4jSdeTfddZUdd ZVG dd deDZWd eXdeTfd!d"ZYdS )#    N)defaultdictdeque)ThreadPoolExecutor)chain)AnyAsyncGeneratorDictIterableListOptionalSet)get_or_create_event_loop)ray_constants)split)GcsAioActorSubscriberGcsAioNodeInfoSubscriberGcsAioResourceUsageSubscriber)init_grpc_channel)DEBUG_AUTOSCALING_ERRORDEBUG_AUTOSCALING_STATUSenv_integer)LoadMetricsSummaryget_per_node_breakdown_as_dictparse_usage)gcs_pb2node_manager_pb2node_manager_pb2_grpc)DASHBOARD_AGENT_ADDR_IP_PREFIX#DASHBOARD_AGENT_ADDR_NODE_ID_PREFIXGCS_RPC_TIMEOUT_SECONDS)actor_constsnode_consts)DataOrganizer
DataSource)StatsPayload)SubprocessModule)SubprocessRouteTable)async_loop_forever'RAY_DASHBOARD_NODE_HEAD_TPE_MAX_WORKERS   )	stateaddressnumRestarts	timestamppid
exitDetail	startTimeendTimereprNamemessagereturnc                 C   s   t j| dhddS )NnodeIdT$always_print_fields_with_no_presence)dashboard_utilsmessage_to_dict)r3    r:   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/dashboard/modules/node/node_head.py_gcs_node_info_to_dictN   s   r<   c                    s   t j| h ddd}h d  fdd| D }|d |d< d	}d
|v rT|d
 }d|v r4|d d }n d|v r?|d d }nd|v rJ|d d }n
d|v rT|d d }||d< t|d |d< t|d |d< t| j|d< t| j|d< |S )N>
   jobIdr5   taskIdactorIdcallerIdparentIdworkerIdparentTaskIdsourceActorIdplacementGroupIdTr6   >   r.   namer=   r*   r?   r+   r1   callSiter2   	classNamer0   r-   r,   labelSelectorrE   c                    s   i | ]\}}| v r||qS r:   r:   .0kvfieldsr:   r;   
<dictcomp>z   s    z-_actor_table_data_to_dict.<locals>.<dictcomp>rH   
actorClass-
deathCauseactorDiedErrorContexterrorMessageruntimeEnvFailedContextactorUnschedulableContextcreationTaskFailureContextformattedExceptionStringr/   r0   r1   requiredResourcesrI   )r8   r9   itemsintdictrequired_resourceslabel_selector)r3   orig_messagelight_messageexit_detailcontextr:   rN   r;   _actor_table_data_to_dictT   s6   rd   c                       s  e Zd Z fddZdd Zdeedf fddZd	efd
dZdd Z	defddZ
edejdejjfddZedejdejjfddZeejdd Zdd Zdd Zdedeeeeef f fddZded eeef fd!d"Zdeeef fd#d$Z d%d& Z!ed'ejdejjfd(d)Z"ed*ejdejjfd+d,Z#ed-dejjfd.d/Z$ fd0d1Z%  Z&S )2NodeHeadc                    sz   t  j|i | i | _d| _t | _d | _d | _t | _	t
tdd| _d | _t | _t | _t
ddd| _t | _d S )NFnode_head_node_executor)max_workersthread_name_prefixr)   node_head_actor_executor)super__init___stubs_collect_memory_infotime_module_start_time_head_node_registration_time_s_registered_head_node_idr   _dead_node_queuer   r(   _node_executor_gcs_actor_channel_subscriber_destroyed_actors_queuer   _loop_actor_executorset_background_tasks)selfargskwargs	__class__r:   r;   rk      s$   
zNodeHead.__init__c                 C   s   | j ttjt | j dS )N)head_node_registration_time_sregistered_nodesmodule_lifetime_s)rp   lenr#   nodesrn   ro   rz   r:   r:   r;   get_internal_states   s   zNodeHead.get_internal_statesr4   Nc           
      C  s   t | jd}| I dH  | jjddI dH }dttj dtt	 fdd}| j
| j|| I dH }|D ]}|V  q6	 z+|jtjd	I dH }|rQt| \}}ng }| j
| j||I dH }	|	D ]}|V  qaW n tyv   td
 Y nw q=)z
        Yields the initial state of all nodes, then yields the updated state of nodes.

        It makes GetAllNodeInfo call only once after the subscription is done, to get
        the initial state of the nodes.
        r+   Ntimeoutmessagesr4   c                 S   s   dd | D S )Nc                 S   s   g | ]}t |qS r:   )r<   )rK   mr:   r:   r;   
<listcomp>   s    zRNodeHead._subscribe_for_node_updates.<locals>._convert_to_dict.<locals>.<listcomp>r:   )r   r:   r:   r;   _convert_to_dict   s   z>NodeHead._subscribe_for_node_updates.<locals>._convert_to_dictT
batch_sizezFailed handling updated nodes.)r   gcs_address	subscribe
gcs_clientasync_get_all_node_infor	   r   GcsNodeInfor
   r]   rv   run_in_executorrs   valuespollr!   'RAY_DASHBOARD_NODE_SUBSCRIBER_POLL_SIZEzip	Exceptionlogger	exception)
rz   
subscriberall_node_infor   all_node_infosnodenode_id_updated_info_tuples_updated_infos_protoupdated_infosr:   r:   r;   _subscribe_for_node_updates   s@   

z$NodeHead._subscribe_for_node_updatesr   c           
         sp  |d }|d rA|d dkrA j |krA j d ur$td| j    | _ t  j  _ jjt	j
| dt	jtdI d H  |d dv sIJ |d dk}|st | t |d	  g} fd
d|D }tj| I d H   j| t jtjkr j }tj|d   j|d  |tj|< d|d	 t|d }t	j}t||dd}t !|}	|	 j|< d S )Nr5   
isHeadNoder*   ALIVEzaA new head node has become ALIVE. New head node ID: %s, old head node ID: %s, internal states: %sT)	overwrite	namespacer   )r   DEADnodeManagerAddressc                    s"   g | ]} j j|d tjtdqS )F)del_by_prefixr   r   )r   async_internal_kv_delr   KV_NAMESPACE_DASHBOARDr   rK   keyr   r:   r;   r     s    z)NodeHead._update_node.<locals>.<listcomp>z{}:{}nodeManagerPort)asynchronous)"rq   r   warningr   rn   ro   rp   r   async_internal_kv_putr   KV_HEAD_NODE_ID_KEYencodeKV_NAMESPACE_JOBr   r   r   asynciogatherrr   appendr   r!   MAX_DEAD_NODES_TO_CACHEpopleftr#   r   poprl   formatr\   GLOBAL_GRPC_OPTIONSr   r   NodeManagerServiceStub)
rz   r   node_idis_alivekeystasksr+   optionschannelstubr:   r   r;   _update_node   sZ   



	


zNodeHead._update_nodec                    sp   d}|   2 z-3 dH W }| |I dH  | js4|s4t | j tjkr4tdtj d| 	   d}q6 dS )z
        Subscribe to node updates and update the internal states. If the head node is
        not registered after RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUT, it logs a
        warning only once.
        FNz'Head node is not registered even after zb seconds. The API server might not work correctly. Please report a Github issue. Internal states :T)
r   r   rp   rn   ro   r!   ,RAY_DASHBOARD_HEAD_NODE_REGISTRATION_TIMEOUTr   r   r   )rz   warning_shownr   r:   r:   r;   _update_nodes*  s(   zNodeHead._update_nodesc                    sF  ddl m} | rmddlm} ddlm} zt } j I d H }t }|j	|||| |dd}W n t
yE   td i  Y S w i }t|j|jD ]}|jsUqOdd	 |jjD }	d
t|	dd||j< qO|S tj fddttfD  I d H \}
}|
si S t|
}|d}|rtdi |}t|}|d u r|S i S )Nr   )is_autoscaler_v2)Stats)ClusterStatusParser)gcs_request_time_srequest_ts_s)statszError getting cluster statusc                 S   s   i | ]
}|j |j|jfqS r:   )resource_nameusedtotal)rK   rr:   r:   r;   rP   `  s    z8NodeHead.get_nodes_logical_resources.<locals>.<dictcomp>
T)verbosec                    s"   g | ]} j j| d tdqS )N)r   r   )r   async_internal_kv_getr   r   r   r   r:   r;   r   l  s    
z8NodeHead.get_nodes_logical_resources.<locals>.<listcomp>load_metrics_reportr:   )ray.autoscaler.v2.utilsr   ray.autoscaler.v2.schemar   ray.autoscaler.v2.sdkr   rn   r   async_get_cluster_statusfrom_get_cluster_status_replyr   r   r   r   active_nodes
idle_nodesresource_usageusagejoinr   r   r   r   r   r   jsonloadsgetr   r   )rz   r   r   r   req_timecluster_status
reply_timeper_node_resourcesr   
usage_dictstatus_stringerrorstatus_dictlm_summary_dict
lm_summarynode_logical_resourcesr:   r   r;   get_nodes_logical_resourcesC  sZ   





z$NodeHead.get_nodes_logical_resourcesz/nodesc           	         s   |j d}|dkr)t }|  }t||I d H \}}tjt	j
jd||dS |d urY| d krYt }tj D ]}|d dkrL||d  q=tjt	j
jd	t|d
S tjt	j
jd| dS )NviewsummaryzNode summary fetched.)status_coder3   r   r   hostNameListr*   r   nodeManagerHostnamezNode hostname list fetched.)r   r3   host_name_listzUnknown view r   r3   )queryr   r"   get_all_node_summaryr   r   r   dashboard_optional_utilsrest_responser8   HTTPStatusCodeOKlowerrx   r#   r   r   addlistINTERNAL_ERROR)	rz   reqr   all_node_summary_tasknodes_logical_resource_taskall_node_summarynodes_logical_resourcesalive_hostnamesr   r:   r:   r;   get_all_nodes  s:   zNodeHead.get_all_nodesz/nodes/{node_id}c                    s2   |j d}t|I d H }tjtjjd|dS )Nr   zNode details fetched.r   r3   detail)	
match_infor   r"   get_node_infor   r   r8   r   r   )rz   r  r   	node_infor:   r:   r;   get_node  s   zNodeHead.get_nodec              	      sF  t dtjd  t| j }g }g }t|D ]'\}\}}tj	|}|d dkr+q|
| |
|jtj| jd d qg }t|dD ]}	tj|	dd	iI d H }
||
 td
I d H  qG fdd}t|t|ksJ dt| d| dt| d| | j| j|t||I d H }| D ]	\}}|tj|< qd S )N   r)   r*   r   )include_memory_infor   d   return_exceptionsTg?c                    s   i }| D ]X\}}t |tjrqt |tjrD| tjjkr'd| d  d}n| tjjkr6d| d}nd| d}t	j
||d qt |trUt	j
d| d|d qt|||< q|S )z:Pure function reorganizing the data into {node_id: stats}.zCannot reach the node, z, after timeout  zI. This node may have been overloaded, terminated, or the network is slow.z$. The node may have been terminated.zError updating node stats of .exc_info)
isinstancer   CancelledErrorgrpcRpcErrorcode
StatusCodeDEADLINE_EXCEEDEDUNAVAILABLEr   r   r   r8   node_stats_to_dict)node_id_response_tuplesnew_node_statsr   responser3   r   r:   r;   postprocess  s.   


z0NodeHead._update_node_stats.<locals>.postprocessz	node_ids(z): z, responses()maxr!   "NODE_STATS_UPDATE_INTERVAL_SECONDSr  rl   r[   	enumerater#   r   r   r   GetNodeStatsr   GetNodeStatsRequestrm   r   r   r   extendsleepr   rv   r   rs   r   
node_stats)rz   current_stub_node_id_tuplesnode_idsget_node_stats_tasksr   r   r   r  	responsesget_node_stats_tasks_chunkcurrent_chunk_responsesr#  r!  new_statr:   r   r;   _update_node_stats  sN   
	
$"
zNodeHead._update_node_statsc                    s   t | jd}| I dH  	 z)| I dH \}}|du rW q| j| jt|I dH }|dd }|t	j
|< W n tyF   td Y nw q)za
        Update DataSource.node_physical_stats by subscribing to the GCS resource usage.
        r   NT:zEError receiving node physical stats from _update_node_physical_stats.)r   r   r   r   rv   r   rs   _parse_node_statsr   r#   node_physical_statsr   r   r   )rz   r   r   dataparsed_datar   r:   r:   r;   _update_node_physical_stats  s&   
z$NodeHead._update_node_physical_statsc              
      sd  | j }t|d}| I dH  	 z:td |  I dH }|t_tt	}|
 D ]\}}|d d }|tjkr>||| |< q)|t_tdt| W n# tym } ztjd|d	 ttjI dH  W Y d}~nd}~ww q	 z'| |I dH }	|	
 D ]
\}
}| |
| q}td
t|	 d|j  W n ty } ztjd|d	 W Y d}~nd}~ww qp)z
        Processes actor info. First gets all actors from GCS, then subscribes to
        actor updates. For each actor update, updates DataSource.node_actors and
        DataSource.actors.
        r   NTz Getting all actor info from GCS.r+   r5   z Received %d actor info from GCS.z%Error Getting all actor info from GCSr  zTotal events processed: z, queue size: z%Error processing actor info from GCS.)r   r   r   r   info_get_all_actorsr#   actorsr   r]   r[   r    NIL_NODE_IDnode_actorsr   r   r   r   r*  )RETRY_GET_ALL_ACTOR_INFO_INTERVAL_SECONDS_poll_updated_actor_table_data_process_updated_actor_tabledebug
queue_size)rz   gcs_addractor_channel_subscriberactor_dictsr?  actor_id_bytesupdated_actor_tabler   eupdated_actor_table_entriesactor_idr:   r:   r;   _update_actors'  s\   



zNodeHead._update_actorsrF  c                    s6   |j ddI d H   fdd}| j| j|I d H S )N   r   c                      s   dd  D S )Nc                 S   s&   i | ]\}}|d ur|  t|qS Nhexrd   )rK   rH  actor_table_data_messager:   r:   r;   rP   u  s    zUNodeHead._poll_updated_actor_table_data.<locals>._convert_to_dict.<locals>.<dictcomp>r:   r:   batchr:   r;   r   t  s   zANodeHead._poll_updated_actor_table_data.<locals>._convert_to_dict)r   rv   r   rw   )rz   rF  r   r:   rS  r;   rA  k  s   	z'NodeHead._poll_updated_actor_table_datarL  actor_table_datac                 C   s   t j|}|r|d dkrtD ]}||v r|| ||< q|}|d }|d d }|d dkr5| j| |t j|< |tjkrQt j|i }|||< |t j|< dS dS )zNOTE: This method has to be executed on the event-loop, provided that it
        accesses DataSource data structures (to follow its thread-safety model)r*   DEPENDENCIES_UNREADYr?   r+   r5   r   N)	r#   r=  r   ACTOR_TABLE_STATE_COLUMNSru   r   r    r>  r?  )rz   rL  rU  actorrL   r   r?  r:   r:   r;   rB    s"   

z%NodeHead._process_updated_actor_tablec                    s8   | j jtdI d H   fdd}| j| j|I d H S )Nr   c                      s   dd    D S )Nc                 S   s   i | ]\}}|  t|qS r:   rP  )rK   rL  rU  r:   r:   r;   rP     s    zFNodeHead._get_all_actors.<locals>._convert_to_dict.<locals>.<dictcomp>)r[   r:   r=  r:   r;   r     s   z2NodeHead._get_all_actors.<locals>._convert_to_dict)r   async_get_all_actor_infor   rv   r   rw   )rz   r   r:   rY  r;   r<    s   zNodeHead._get_all_actorsc                    s   	 z<t | jtkr5| j }|tjv r.tj|}|d d}|r.|tj	kr.tj
| |= t | jtks
ttI d H  W n tyL   td Y nw q)NTr+   r5   z&Error cleaning up actor info from GCS.)r   ru   MAX_DESTROYED_ACTORS_TO_CACHEr   r#   r=  r   r   r    r>  r?  r   r*  ACTOR_CLEANUP_FREQUENCYr   r   r   )rz   rL  rX  r   r:   r:   r;   _cleanup_actors  s    

zNodeHead._cleanup_actorsz/logical/actorsc                    sH   d }d|j v r|j d d}tj|dI d H }tjtjjd|ddS )Nids,	actor_idszAll actors fetched.F)r   r3   r=  convert_google_style)	r   r   r"   get_actor_infosr   r   r8   r   r   )rz   r  ra  r=  r:   r:   r;   get_all_actors  s   
zNodeHead.get_all_actorsz/logical/actors/{actor_id}c                    s:   |j d}tj|gdI d H }tjtjjd|| dS )NrL  r`  zActor details fetched.r
  )	r  r   r"   rc  r   r   r8   r   r   )rz   r  rL  r=  r:   r:   r;   	get_actor  s   zNodeHead.get_actorz
/test/dumpc                    sz   |j d}|du r"dd tj D }tjd	tjj	dd|S t
tj|}tjd	tjj	d| dd||iS )
zW
        Dump all data from datacenter. This is used for testing purpose only.
        r   Nc                 S   s$   i | ]\}}| d s|t|qS )r   )
startswithr]   rJ   r:   r:   r;   rP     s    z!NodeHead.dump.<locals>.<dictcomp>z'Fetch all data from datacenter success.r   zFetch z from datacenter success.r:   )r   r   r#   __dict__r[   r   r   r8   r   r   r]   )rz   r  r   all_datar8  r:   r:   r;   dump  s&   
zNodeHead.dumpc              	      sx   t   I d H  |  |  |  |  |  t t	| j
g}|D ]}| j|}| j| || jj q$d S rO  )rj   runr   r3  r:  rM  r]  r"   purgeorganizers   rv   create_taskry   r   add_done_callbackdiscard)rz   coroscorotaskr}   r:   r;   rj    s   
	zNodeHead.run)'__name__
__module____qualname__rk   r   r   r]   r   r   r   r   routesr   r   aiohttp_cacheaiohttpwebResponser	  r  r'   r!   r%  r3  r:  rM  r   r   strr   rA  rB  r<  r]  rd  re  ri  rj  __classcell__r:   r:   r}   r;   re      sL    #2<> 	
\D


	re   node_stats_strc                 C   s$   t | }td urt| |S |S rO  )r   r   r$   	parse_obj)r}  
stats_dictr:   r:   r;   r6     s
   

r6  )Zr   r   loggingrn   collectionsr   r   concurrent.futuresr   	itertoolsr   typingr   r   r   r	   r
   r   r   aiohttp.webrx  r  ray._private.utilsrayray.dashboard.optional_utils	dashboardoptional_utilsr   ray.dashboard.utilsutilsr8   ray._common.utilsr   ray._privater   ray._private.collections_utilsr   ray._private.gcs_pubsubr   r   r   ray._private.grpc_utilsr   ray._private.ray_constantsr   r   r   ray.autoscaler._private.utilr   r   r   ray.core.generatedr   r   r   ray.dashboard.constsr   r   r   ray.dashboard.modules.noder    r!   %ray.dashboard.modules.node.datacenterr"   r#   .ray.dashboard.modules.reporter.reporter_modelsr$   !ray.dashboard.subprocesses.moduler%   !ray.dashboard.subprocesses.routesr&   rv  r'   	getLoggerrs  r   r(   r$  _config(maximum_gcs_destroyed_actor_cached_countr[  r\  rW  r   r]   r<   rd   re   r{  r6  r:   r:   r:   r;   <module>   sZ    $

=    s