o
    bi(                     @   s   d dl Z d dlmZ d dlmZmZ d dlmZmZm	Z	m
Z
mZ d dlmZmZmZ d dlmZmZmZmZmZ d dlmZ d dlmZmZmZmZmZ e eZ eG d	d
 d
Z!eG dd dZ"G dd deZ#dS )    N)defaultdict)	dataclassfield)AnyDictListOptionalSet)DISABLE_LAUNCH_CONFIG_CHECK_KEYDISABLE_NODE_UPDATERS_KEYFOREGROUND_NODE_LAUNCH_KEY)NodeIDNodeIPNodeKind
NodeStatusNodeType)NodeProvider)NODE_KIND_HEADTAG_RAY_NODE_KINDTAG_RAY_NODE_STATUSTAG_RAY_REPLICA_INDEXTAG_RAY_USER_NODE_TYPEc                   @   sB   e Zd ZU dZeedZeee	f e
d< eedZee e
d< dS )ScaleRequestzStores desired scale computed by the autoscaler.

    Attributes:
        desired_num_workers: Map of worker NodeType to desired number of workers of
            that type.
        workers_to_delete: List of ids of nodes that should be removed.
    )default_factorydesired_num_workersworkers_to_deleteN)__name__
__module____qualname____doc__r   dictr   r   r   int__annotations__setr   r	   r    r$   r$   Y/home/ubuntu/.local/lib/python3.10/site-packages/ray/autoscaler/batching_node_provider.pyr      s   
 r   c                   @   sF   e Zd ZU dZeed< eed< ee ed< e	ed< dZ
ee ed< dS )NodeDataa  Stores all data about a Ray node needed by the autoscaler.

    Attributes:
        kind: Whether the node is the head or a worker.
        type: The user-defined type of the node.
        replica_index: An identifier for nodes in a replica of a TPU worker group.
            This value is set as a Pod label by a GKE webhook when TPUs are requested
        ip: Cluster-internal ip of the node. ip can be None if the ip
            has not yet been assigned.
        status: The status of the node. You must adhere to the following semantics
            for status:
            * The status must be "up-to-date" if and only if the node is running.
            * The status must be "update-failed" if and only if the node is in an
                unknown or failed state.
            * If the node is in a pending (starting-up) state, the status should be
                a brief user-facing description of why the node is pending.
    kindtypeipstatusNreplica_index)r   r   r   r   r   r"   r   r   r   r   r+   strr$   r$   r$   r%   r&   &   s   
 r&   c                   @   s  e Zd ZdZdeeef deddfddZdeee	f fdd	Z
d
eddfddZd#ddZdeeef dee fddZdd Zdeeef fddZdedeeef fddZdedefddZdeeef deeef dedeeeef  fdd Zdedeeeef  fd!d"ZdS )$BatchingNodeProvidera  Abstract subclass of NodeProvider meant for use with external cluster managers.

    Batches reads of cluster state into a single method, get_node_data, called at the
    start of an autoscaling update.

    Batches modifications to cluster state into a single method, submit_scale_request,
    called at the end of an autoscaling update.

    Implementing a concrete subclass of BatchingNodeProvider only requires overriding
    get_node_data() and submit_scale_request().

    See the method docstrings for more information.

    Note that an autoscaling update may be conditionally
    cancelled using the optional method safe_to_scale()
    of the root NodeProvider.
    provider_configcluster_namereturnNc                 C   s   t | || i | _|tddu sJ dt d|tddu s*J dt d|tddu s:J dt dd| _t | _	t
tt | _d S )NFTz'To use BatchingNodeProvider, must set `z:True`.)r   __init__node_data_dictgetr   r
   r   scale_change_neededr   scale_requestr   listr,   replica_index_to_nodes)selfr.   r/   r$   r$   r%   r1   T   s    

zBatchingNodeProvider.__init__c                 C      t )a8  Queries cluster manager for node info. Returns a mapping from node id to
        NodeData.

        Each NodeData value must adhere to the semantics of the NodeData docstring.
        (Note in particular the requirements for NodeData.status.)

        Consistency requirement:
        If a node id was present in ScaleRequest.workers_to_delete of a previously
        submitted scale request, it should no longer be present as a key in
        get_node_data.
        (Node termination must be registered immediately when submit_scale_request
        returns.)
        NotImplementedErrorr8   r$   r$   r%   get_node_datat   s   z"BatchingNodeProvider.get_node_datar5   c                 C   r9   )a  Tells the cluster manager which nodes to delete and how many nodes of
        each node type to maintain.

        Consistency requirement:
        If a node id was present in ScaleRequest.workers_to_delete of a previously
        submitted scale request, it should no longer be present as key in get_node_data.
        (Node termination must be registered immediately when submit_scale_request
        returns.)
        r:   )r8   r5   r$   r$   r%   submit_scale_request   s   
z)BatchingNodeProvider.submit_scale_requestc                 C   s   | j r	| | j d| _ dS )z3Submit a scale request if it is necessary to do so.FN)r4   r>   r5   r<   r$   r$   r%   post_process   s   
z!BatchingNodeProvider.post_processtag_filtersc                    s   d _    _t  t d _t j } j	
  |D ]} j| j}|d ur4 j	| | q  fdd|D }|S )NF)r   r   c                    s&   g | ]}   |  kr|qS r$   )items	node_tags).0noder8   r@   r$   r%   
<listcomp>   s
    z=BatchingNodeProvider.non_terminated_nodes.<locals>.<listcomp>)r4   r=   r2   r   cur_num_workersr#   r5   r6   keysr7   clearr+   append)r8   r@   	all_nodesnode_idr+   filtered_nodesr$   rE   r%   non_terminated_nodes   s"   

z)BatchingNodeProvider.non_terminated_nodesc                 C   s   |  | jS )zCReturns dict mapping node type to the number of nodes of that type.)_cur_num_workersr2   r<   r$   r$   r%   rG      s   z$BatchingNodeProvider.cur_num_workersr2   c                 C   s8   t t}| D ]}|jtkrq||j  d7  < q|S )N   )r   r!   valuesr'   r   r(   )r8   r2   num_workers_dict	node_datar$   r$   r%   rO      s   
z%BatchingNodeProvider._cur_num_workersrL   c                 C   s8   | j | }t|jt|jt|ji}|jd ur|j|t< |S N)	r2   r   r'   r   r*   r   r(   r+   r   )r8   rL   rS   tagsr$   r$   r%   rB      s   


zBatchingNodeProvider.node_tagsc                 C   s   | j | jS rT   )r2   r)   )r8   rL   r$   r$   r%   internal_ip   s   z BatchingNodeProvider.internal_ipnode_configrU   countc                 C   s&   |t  }| jj|  |7  < d| _d S )NT)r   r5   r   r4   )r8   rW   rU   rX   	node_typer$   r$   r%   create_node   s   
z BatchingNodeProvider.create_nodec                 C   s   || j jv rtd| d d S || jvr!td| d d S | j| j}| j j| dkr7td| d| j j|  d8  < | j j| | 	|}t
|v rv|t
 }| j| D ]}|| j jvru| j j| td| d	| d
 qZd| _d S )Nz#Autoscaler tried to terminate node z8 twice in the same update. Skipping termination request.z*Autoscaler tried to terminate unkown node z. Skipping termination request.r   z>NodeProvider attempted to request less than 0 workers of type rP   zAutoscaler terminating node z in multi-host replica .T)r5   r   loggerwarningr2   r(   r   AssertionErroraddrB   r   r7   infor4   )r8   rL   rY   rU   node_replica_index	worker_idr$   r$   r%   terminate_node   s@   




z#BatchingNodeProvider.terminate_node)r0   N)r   r   r   r   r   r,   r   r1   r   r&   r=   r   r>   r?   r   rN   rG   rO   rB   rV   r!   r   rZ   rc   r$   r$   r$   r%   r-   A   s6    

 
	


"r-   )$loggingcollectionsr   dataclassesr   r   typingr   r   r   r   r	   !ray.autoscaler._private.constantsr
   r   r   ray.autoscaler._private.utilr   r   r   r   r   ray.autoscaler.node_providerr   ray.autoscaler.tagsr   r   r   r   r   	getLoggerr   r\   r   r&   r-   r$   r$   r$   r%   <module>   s    
