o
    biw1                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZmZ d dlZd dlmZmZmZmZ d dlmZ d dlmZmZmZmZ d d	lmZ d d
lmZ d dlm Z  e!e"Z#dddZ$G dd de Z%dS )    Nwraps)RLock)
ModuleType)AnyDictListOptionalTuple)bootstrap_gcp&construct_clients_from_provider_configget_node_typetpu_accelerator_config_to_type)GCPTPU)
GCPComputeGCPNodeGCPNodeTypeGCPResource)TPUCommandRunner)CommandRunnerInterface)NodeProvider      c                    s   t  fdd}|S )a!  Retry decorator for methods of GCPNodeProvider.

    Upon catching BrokenPipeError, API clients are rebuilt and
    decorated methods are retried.

    Work-around for https://github.com/ray-project/ray/issues/16072.
    Based on https://github.com/kubeflow/pipelines/pull/5250/files.
    c                    sx   d}|k r:z| g|R i |W S  t y3   td |d7 }|k r0|   t  n Y nw |k sd S d S )Nr   z#Caught a BrokenPipeError. Retrying.r   )BrokenPipeErrorloggerwarning_construct_clientstimesleep)selfargskwargs	try_count	backoff_s	max_triesmethod ]/home/ubuntu/.local/lib/python3.10/site-packages/ray/autoscaler/_private/gcp/node_provider.pymethod_with_retries-   s   
z#_retry.<locals>.method_with_retriesr   )r&   r%   r$   r)   r'   r#   r(   _retry#   s   
r*   c                       s  e Zd ZdedefddZdd Zdedefd	d
Ze	defddZ
defddZdefddZdefddZe	dedefddZdefddZdefddZe	dedededeeef fddZdefd d!Ze	defd"d#Ze	dedefd$d%Zdedefd&d'Zed(d) Zed*eeef deeef fd+d,Z	-d5d.eded/eeef ded0ed1ed2eeeef  de f fd3d4Z!  Z"S )6GCPNodeProviderprovider_configcluster_namec                 C   s6   t | || t | _|   |dd| _i | _d S )Ncache_stopped_nodesF)r   __init__r   lockr   getr.   cached_nodes)r   r,   r-   r'   r'   r(   r/   @   s
   
zGCPNodeProvider.__init__c                 C   sp   t | j\}}}}i | _t|| jd | jd | j| jtj< |d ur6t|| jd | jd | j| jtj< d S d S )N
project_idavailability_zone)	r   r,   	resourcesr   r-   r   COMPUTEr   TPU)r   _computetpur'   r'   r(   r   J   s$   z"GCPNodeProvider._construct_clients	node_namereturnc                 C   s   | j t| S )zReturn the resource responsible for the node, based on node_name.

        This expects the name to be in format '[NAME]-[UUID]-[TYPE]',
        where [TYPE] is either 'compute' or 'tpu' (see ``GCPNodeType``).
        )r5   r   name_to_type)r   r;   r'   r'   r(   $_get_resource_depending_on_node_named   s   z4GCPNodeProvider._get_resource_depending_on_node_nametag_filtersc                 C   sn   | j * g }| j D ]}||}||7 }qdd |D | _dd |D W  d    S 1 s0w   Y  d S )Nc                 S   s   i | ]}|d  |qS namer'   .0ir'   r'   r(   
<dictcomp>v   s    z8GCPNodeProvider.non_terminated_nodes.<locals>.<dictcomp>c                 S   s   g | ]}|d  qS r@   r'   rB   r'   r'   r(   
<listcomp>w   s    z8GCPNodeProvider.non_terminated_nodes.<locals>.<listcomp>)r0   r5   valueslist_instancesr2   )r   r?   	instancesresourcenode_instancesr'   r'   r(   non_terminated_nodesl   s   

$z$GCPNodeProvider.non_terminated_nodesnode_idc                 C   <   | j  | |}| W  d    S 1 sw   Y  d S N)r0   _get_cached_node
is_runningr   rM   noder'   r'   r(   rQ   y      
$zGCPNodeProvider.is_runningc                 C   rN   rO   )r0   rP   is_terminatedrR   r'   r'   r(   rU   ~   rT   zGCPNodeProvider.is_terminatedc                 C   rN   rO   )r0   rP   
get_labelsrR   r'   r'   r(   	node_tags   rT   zGCPNodeProvider.node_tagstagsc                 C   sT   | j  |}| |}| |}|j||d}|W  d    S 1 s#w   Y  d S )N)rS   labels)r0   	_get_noder>   
set_labels)r   rM   rX   rY   rS   rJ   resultr'   r'   r(   set_node_tags   s   

$zGCPNodeProvider.set_node_tagsc                 C   Z   | j   | |}| }|d u r| |}| }|W  d    S 1 s&w   Y  d S rO   )r0   rP   get_external_iprZ   r   rM   rS   ipr'   r'   r(   external_ip      

$zGCPNodeProvider.external_ipc                 C   r^   rO   )r0   rP   get_internal_iprZ   r`   r'   r'   r(   internal_ip   rc   zGCPNodeProvider.internal_ipbase_configcountc                    s   | j d |}t|}| j|  i }| jr@|d |d |d d} |dd| }|r@ fdd|D }	||	 |t|8 }|d	kr_ |||}
d
d |
D }|| W d   |S W d   |S 1 sjw   Y  |S )zCreates instances.

        Returns dict mapping instance id to each create operation result for the created
        instances.
        ray-node-nameray-node-typeray-user-node-type)rh   ri   rj   TNc                    s    i | ]}|d    |d  qS r@   )start_instance)rC   nrJ   r'   r(   rE      s    z/GCPNodeProvider.create_node.<locals>.<dictcomp>r   c                 S   s   i | ]\}}||qS r'   r'   )rC   r\   instance_idr'   r'   r(   rE      s    
)r0   r   r5   r.   rH   updatelencreate_instances)r   rf   rX   rg   rY   	node_type	all_nodesfiltersreuse_nodesreused_nodes_dictresultscreated_nodes_dictr'   rm   r(   create_node   s@   



 
  zGCPNodeProvider.create_nodec              
   C   s   t d| | |}z	|j|d}W |S  tjjy@ } z|jj	dkr2t 
d| d d }n|d W Y d }~|S d }~ww )Nz"NodeProvider: {}: Terminating noderM     !Tried to delete the node with id  but it was already gone.)r   infoformatr>   delete_instancegoogleapiclienterrors	HttpErrorrespstatusr   )r   rM   rJ   r\   
http_errorr'   r'   r(   _thread_unsafe_terminate_node   s$   


z-GCPNodeProvider._thread_unsafe_terminate_nodec                 C   s   | j U | |}z| jr | |}| r|j|d}n	d }n|j|d}W n' tjj	yN } z|j
jdkrAtd| d n|d W Y d }~nd }~ww |W  d    S 1 s[w   Y  d S )Nrz   r{   r|   r}   )r0   r>   r.   rP   rQ   stop_instancer   r   r   r   r   r   r   r   )r   rM   rJ   rS   r\   r   r'   r'   r(   terminate_node   s.   


$zGCPNodeProvider.terminate_nodec                 C   sp   |  i  | j& || jv r| j| W  d    S | |}|j|d}|W  d    S 1 s1w   Y  d S )Nrz   )rL   r0   r2   r>   get_instance)r   rM   rJ   instancer'   r'   r(   rZ      s   


$zGCPNodeProvider._get_nodec                 C   s   || j v r
| j | S | |S rO   )r2   rZ   )r   rM   r'   r'   r(   rP     s   


z GCPNodeProvider._get_cached_nodec                 C   s   t | S rO   )r   )cluster_configr'   r'   r(   bootstrap_config  s   z GCPNodeProvider.bootstrap_configr   c                 C   s   d| vr| S t | } | d }|D ]:}|| d }t|tjkrKi }d}d|v r-|d }n
d|v r7t|d }|s:qd|d| d< || d	 | q| S )
a  Fill out TPU resources to the cluster config.

        To enable TPU pod autoscaling, we provide the TPU accelerator
        type as a resource that only exists on worker 0 of the pod slice.
        For instance, a v4-16 should have the resource labels:
            worker 0: resources = {"TPU": 4, "TPU-v4-16-head": 1}
            worker 1: resources = {"TPU": 4}

        For the autoscaler to correctly process the demands of
        creating a new TPU pod, then the autoscaler must know what
        a TPU pod is in the form of the TPU accelerator resource.

        Therefore we fill out TPU pods appropriately by providing the
        expected resource which we can deduce from the cluster config.

        available_node_typesnode_config acceleratorTypeacceleratorConfigr   zTPU-z-headr5   )copydeepcopyr   r   r7   r   ro   )r   r   rr   r   autodetected_resourcesaccelerator_typer'   r'   r(   &fillout_available_node_types_resources  s.   

z6GCPNodeProvider.fillout_available_node_types_resourcesN
log_prefixauth_configprocess_runneruse_internal_ipdocker_configc                    sj   |  |}||}	|||||||d}
tj| jv r,|| jtj kr,td|	| d|
S t jdi |
S )z+Returns a TPU command runner as applicable.)r   r   rM   r   r-   r   r   )r   providerNr'   )r>   r   r   r7   r5   r   superget_command_runner)r   r   rM   r   r-   r   r   r   rJ   r   common_args	__class__r'   r(   r   B  s   


z"GCPNodeProvider.get_command_runnerrO   )#__name__
__module____qualname__dictstrr/   r   r   r>   r*   rL   rQ   rU   rW   r]   rb   re   intr   ry   r   r   r   rZ   rP   staticmethodr   r   r   r   boolr	   r   r   __classcell__r'   r'   r   r(   r+   ?   s^    
$(


2
	r+   )r   r   )&r   loggingr   	functoolsr   	threadingr   typesr   typingr   r   r   r	   r
   r   "ray.autoscaler._private.gcp.configr   r   r   r    ray.autoscaler._private.gcp.noder   r   r   r   r   .ray.autoscaler._private.gcp.tpu_command_runnerr   ray.autoscaler.command_runnerr   ray.autoscaler.node_providerr   	getLoggerr   r   r*   r+   r'   r'   r'   r(   <module>   s"    


