o
    `۷i6O                     @   sJ   d dl Z d dlZd dlmZmZmZmZ d dlmZm	Z	 G dd dZ
dS )    N)DictListOptionalSet)InstanceInstanceUpdateEventc                   @   s  e Zd ZU dZdZeeded f  ed< e		d2de
de
dejd	e
d
ef
ddZe	d
e
fddZe	dejd
efddZe	dejd
efddZe	dejd
efddZdejd
efddZe		d2dedejd	e
d
efddZe	dedejd	e
fddZe	deded
efdd Ze	d
eded f fd!d"Ze		d3ded#ed d
ed$ fd%d&Ze		d3ded#ed d
ed$ fd'd(Ze		d3ded#ed d
ee fd)d*Zedejd
ed fd+d,Ze	ded-ed
e
fd.d/Z ed0d1 Z!dS )4InstanceUtilzt
    A helper class to group updates and operations on an Instance object defined
    in instance_manager.proto
    NzInstance.InstanceStatus_reachable_from instance_idinstance_typestatusdetailsreturnc                 C   s0   t  }d|_| |_||_||_t||| |S )a  
        Returns a new instance with the given status.

        Args:
            instance_id: The instance id.
            instance_type: The instance type.
            status: The status of the new instance.
            details: The details of the status transition.
        r   )r   versionr   r   r   r   _record_status_transition)r   r   r   r   instance r   _/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/autoscaler/v2/instance_manager/common.pynew_instance   s   zInstanceUtil.new_instancec                   C   s   t t S )z/
        Returns a random instance id.
        )struuiduuid4r   r   r   r   random_instance_id-   s   zInstanceUtil.random_instance_idinstance_statusc                 C   s>   | t jksJ | t jt jt jt jt jt jt jt j	t j
t jh
v S )z
        Returns True if the instance is in a status where there could exist
        a cloud instance allocated by the cloud provider.
        )r   UNKNOWN	ALLOCATEDRAY_INSTALLINGRAY_RUNNINGRAY_STOPPINGRAY_STOP_REQUESTEDRAY_STOPPEDTERMINATINGRAY_INSTALL_FAILEDTERMINATION_FAILEDALLOCATION_TIMEOUTr   r   r   r   is_cloud_instance_allocated4   s   z(InstanceUtil.is_cloud_instance_allocatedc                 C   s:   | t jksJ | tt jv rdS | tt jv rdS dS )z
        Returns True if the instance is in a status where the ray process is
        running on the cloud instance.
            i.e. RAY_RUNNING, RAY_STOP_REQUESTED, RAY_STOPPING
        FT)r   r   r   get_reachable_statusesr   r   r&   r   r   r   is_ray_runningH   s   zInstanceUtil.is_ray_runningc                 C   s:   | t jksJ t jt| vrdS | tt jv rdS dS )z
        Returns True if the instance is in a status where the ray process is
        pending to be started on the cloud instance.

        FT)r   r   r   r   r(   r&   r   r   r   is_ray_pending[   s   zInstanceUtil.is_ray_pendingc                 C   s   t jt| v S )zt
        Returns True if the instance is in a status where it may transition
        to RAY_RUNNING status.
        )r   r   r   r(   r&   r   r   r   is_ray_running_reachableo   s   z%InstanceUtil.is_ray_running_reachabler   new_instance_statusc                 C   s.   |t  | j vrdS || _t | || dS )aB  Transitions the instance to the new state.

        Args:
            instance: The instance to update.
            new_instance_status: The new status to transition to.
            details: The details of the transition.

        Returns:
            True if the status transition is successful, False otherwise.
        FT)r   get_valid_transitionsr   r   )r   r,   r   r   r   r   
set_statusx   s   zInstanceUtil.set_statusc                 C   s$   t  }| jtj|||d dS )zRecords the status transition.

        Args:
            instance: The instance to update.
            status: The new status to transition to.
        )r   timestamp_nsr   N)timetime_nsstatus_historyappendr   StatusHistory)r   r   r   now_nsr   r   r   r      s   
z&InstanceUtil._record_status_transition	timeout_sc              	   C   sr   | j }tj| |d}t|dks%J d| j dt| dtj| dt|d }t	
 | |d kr7d	S d
S )al  
        Returns True if the instance has been in the current status for more
        than the timeout_seconds.

        Args:
            instance: The instance to check.
            timeout_seconds: The timeout in seconds.

        Returns:
            True if the instance has been in the current status for more than
            the timeout_s seconds.
        )select_instance_status   z	instance z has  z statusg    eAFT)r   r   get_status_transition_times_nslenr   r   InstanceStatusNamesortedr0   r1   )r   r6   
cur_statusstatus_times_nsstatus_time_nsr   r   r   has_timeout   s   
zInstanceUtil.has_timeoutc                   C   s   t jt jt jht jt jt jt jht jt jt jt jt j	t j
t jt jht jt jt jt j
t jt jht jt jt j	t j
t jt jht jt j	t j
t jt jht jt jht j	t j
t jt jht j
t jt jht jt jt jht jt jht jt t jt t jt jt jht jt iS N)r   QUEUED	REQUESTED
TERMINATEDr   ALLOCATION_FAILEDr   r   r%   r   r!   r"   r#   r    r$   setr   r   r   r   r   r-      s   
		
 z"InstanceUtil.get_valid_transitionsr7   zInstance.StatusHistoryc                 C   s.   g }| j D ]}|r|j|krq|| q|S )aH  
        Returns the status history of the instance.

        Args:
            instance: The instance.
            select_instance_status: The go-to status to search for, i.e. select
                only status history when the instance transitions into the status.
                If None, returns all status updates.
        )r2   r   r3   )r   r7   historystatus_updater   r   r   get_status_transitionsh  s   

z#InstanceUtil.get_status_transitionsc                 C   s,   t | |}|jdd d |r|d S dS )z
        Returns the last status transition of the instance.

        Args:
            instance: The instance.
            instance_status: The status to search for. If None, returns the last
                status update.
        c                 S   s   | j S rD   r/   )xr   r   r   <lambda>  s    z9InstanceUtil.get_last_status_transition.<locals>.<lambda>)keyr:   N)r   rL   sort)r   r7   rJ   r   r   r   get_last_status_transition  s
   z'InstanceUtil.get_last_status_transitionc                 C   s   dd t | |D S )aL  
        Returns a list of timestamps of the instance status update.

        Args:
            instance: The instance.
            instance_status: The status to search for. If None, returns all
                status updates timestamps.

        Returns:
            The list of timestamps of the instance status updates.
        c                 S   s   g | ]}|j qS r   rM   ).0er   r   r   
<listcomp>  s    z?InstanceUtil.get_status_transition_times_ns.<locals>.<listcomp>)r   rL   )r   r7   r   r   r   r;     s
   z+InstanceUtil.get_status_transition_times_nsc                 C   s   | j du r	|   | j | S )at  
        Returns the set of instance status that is reachable from the given
        instance status following the status transitions.
        This method is memoized.
        Args:
            instance_status: The instance status to start from.
        Returns:
            The set of instance status that is reachable from the given instance
            status.
        N)r	   _compute_reachable)clsr   r   r   r   r(     s   

z#InstanceUtil.get_reachable_statusesupdatec                 C   s   |j r!dtj|j d| j d| j d| j d| j d|j	 S dtj| j
 dtj|j d| j d| j d| j d| j d|j	 S )	z3Returns a log string for the given instance update.zNew instance z (id=z, type=z, cloud_instance_id=z	, ray_id=z): zUpdate instance z->)upsertr   r=   r>   r,   r   r   cloud_instance_idnode_idr   r   )r   rX   r   r   r   get_log_str_for_update  s>   z#InstanceUtil.get_log_str_for_updatec                    sF   |   } fdd i | _tj D ]}t } |||| j|< qdS )zo
        Computes and memorize the from status sets for each status machine with
        a DFS search.
        c                    s0   | | D ]}||vr| |  | || q|S )z^
            Regular DFS algorithm to find all reachable nodes from a given node.
            )add)graphstartvisited	next_nodedfsr   r   rc     s   
z,InstanceUtil._compute_reachable.<locals>.dfsN)r-   r	   r   r=   valuesrI   )rW   valid_transitionsr   r`   r   rb   r   rV     s   zInstanceUtil._compute_reachable)r
   rD   )"__name__
__module____qualname____doc__r	   r   r   r   __annotations__staticmethodr   r   r=   r   r   boolr'   r)   r*   r+   r.   r   intrC   r-   r   rL   rR   r;   classmethodr(   r   r\   rV   r   r   r   r   r      s   
 
	
 'r   )r0   r   typingr   r   r   r   'ray.core.generated.instance_manager_pb2r   r   r   r   r   r   r   <module>   s
    