o
    $ii                  	   @   s  d Z ddlZddlZddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlmZmZmZmZmZmZ ddlmZ ddlZdd	lmZ dd
lmZ ddlmZ ddlmZmZ e e!Z"dZ#dZ$dZ%e%d Z&dZ'de%e'fdee(ee( f dee) de*de*fddZ+de,de)de)fddZ-G dd de
Z.G dd deej/d Z0G d!d" d"e0Z1G d#d$ d$e0Z2G d%d& d&ej/d Z3G d'd( d(e3Z4G d)d* d*e3Z5dS )+a  Abstractions around GCP resources and nodes.

The logic has been abstracted away here to allow for different GCP resources
(API endpoints), which can differ widely, making it impossible to use
the same logic for everything.

Classes inheriting from ``GCPResource`` represent different GCP resources -
API endpoints that allow for nodes to be created, removed, listed and
otherwise managed. Those classes contain methods abstracting GCP REST API
calls.
Each resource has a corresponding node type, represented by a
class inheriting from ``GCPNode``. Those classes are essentially dicts
with some extra methods. The instances of those classes will be created
from API responses.

The ``GCPNodeType`` enum is a lightweight way to classify nodes.

Currently, Compute and TPU resources & nodes are supported.

In order to add support for new resources, create classes inheriting from
``GCPResource`` and ``GCPNode``, update the ``GCPNodeType`` enum,
update the ``_generate_node_name`` method and finally update the
node provider.
    N)UserDict)deepcopy)Enumwraps)AnyDictListOptionalTupleUnion)uuid4)AuthorizedHttp)Resource)	HttpError)TAG_RAY_CLUSTER_NAMETAG_RAY_NODE_NAME@            	exceptionregexmax_retriesretry_interval_sc                    s    fdd}|S )zDRetry a function call n-times for as long as it throws an exception.c                    s    t   fdd}|S )Nc                     sR    fdd}t D ]}| }t|ts nt qt|tr'||S )Nc               
      s`   z
 i } | W S  t y/ } zt|r!r#tt|s#||W  Y d }~S d }~ww N)	Exception
isinstanceresearchstr)valuee)argsr   funckwargsr    ]/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/autoscaler/_private/gcp/node.pytry_catch_excC   s   
zH_retry_on_exception.<locals>.dec.<locals>.wrapper.<locals>.try_catch_exc)ranger   r   timesleep)r#   r%   r(   _ret)r   r$   r   r   r   )r#   r%   r'   wrapperA   s   

z1_retry_on_exception.<locals>.dec.<locals>.wrapperr   )r$   r.   r   r   r   r   )r$   r'   dec@   s   z _retry_on_exception.<locals>.decr&   )r   r   r   r   r0   r&   r/   r'   _retry_on_exception8   s   r1   labelsnode_suffixreturnc                 C   sL   | t  }t|tt d ksJ |t|f| dt jdt  d| S )a  Generate node name from labels and suffix.

    This is required so that the correct resource can be selected
    when the only information autoscaler has is the name of the node.

    The suffix is expected to be one of 'compute' or 'tpu'
    (as in ``GCPNodeType``).
       -N)r   lenINSTANCE_NAME_MAX_LENINSTANCE_NAME_UUID_LENr   hex)r2   r3   
name_labelr&   r&   r'   _generate_node_name\   s   	 r<   c                   @   s8   e Zd ZdZdZdZedddZedefd	d
Z	dS )GCPNodeTypez'Enum for GCP node types (compute & tpu)computetpunodeGCPNodec                 C   s4   t | trtjS t | trtjS tdt|  d)z,Return GCPNodeType based on ``node``'s classzWrong GCPNode type .)r   
GCPTPUNoder=   TPUGCPComputeNodeCOMPUTE	TypeErrortype)r@   r&   r&   r'   from_gcp_nodes   s
   

zGCPNodeType.from_gcp_nodenamec                 C   s   t | dd S )zProvided a node name, determine the type.

        This expects the name to be in format '[NAME]-[UUID]-[TYPE]',
        where [TYPE] is either 'compute' or 'tpu'.
        r6   )r=   splitrJ   r&   r&   r'   name_to_type|   s   zGCPNodeType.name_to_typeN)r@   rA   )
__name__
__module____qualname____doc__rF   rD   staticmethodrI   r    rN   r&   r&   r&   r'   r=   m   s    r=   c                       s   e Zd ZdZdZdZdZdeddddf fddZde	fd	d
Z
de	fddZejdefddZejdefddZejdefddZdefddZ  ZS )rA   z(Abstraction around compute and tpu nodesN	base_dictresourceGCPResourcer4   c                    s.   t  j|fi | || _t| jtsJ d S r   )super__init__rU   r   rV   )selfrT   rU   r%   	__class__r&   r'   rX      s   zGCPNode.__init__c                 C   s   |  | j| jv S r   )getSTATUS_FIELDRUNNING_STATUSESrY   r&   r&   r'   
is_running      zGCPNode.is_runningc                 C   s   |  | j| jvS r   )r\   r]   NON_TERMINATED_STATUSESr_   r&   r&   r'   is_terminated   ra   zGCPNode.is_terminatedc                 C      d S r   r&   r_   r&   r&   r'   
get_labels      zGCPNode.get_labelsc                 C   rd   r   r&   r_   r&   r&   r'   get_external_ip   rf   zGCPNode.get_external_ipc                 C   rd   r   r&   r_   r&   r&   r'   get_internal_ip   rf   zGCPNode.get_internal_ipc                 C   s   d| j j d| d dS )N<z: rJ   >)r[   rO   r\   r_   r&   r&   r'   __repr__   s   zGCPNode.__repr__)rO   rP   rQ   rR   rb   r^   r]   dictrX   boolr`   rc   abcabstractmethodre   r    rg   rh   rk   __classcell__r&   r&   rZ   r'   rA      s    rA   )	metaclassc                   @   sT   e Zd ZdZh dZddhZdhZdZdefdd	Z	de
fd
dZde
fddZdS )rE   z Abstraction around compute nodes>   RUNNINGSTAGINGPROVISIONING
TERMINATED	SUSPENDEDrr   statusr4   c                 C      |  di S Nr2   r\   r_   r&   r&   r'   re         zGCPComputeNode.get_labelsc                 C   s(   |  di gd  di gd  dd S )NnetworkInterfacesr   accessConfigsnatIPrz   r_   r&   r&   r'   rg      s
   zGCPComputeNode.get_external_ipc                 C   s   |  di gd  dS )Nr|   r   	networkIPrz   r_   r&   r&   r'   rh      s   zGCPComputeNode.get_internal_ipN)rO   rP   rQ   rR   rb   TERMINATED_STATUSESr^   r]   rl   re   r    rg   rh   r&   r&   r&   r'   rE      s    rE   c                   @   s   e Zd ZdZh dZdhZdZdefddZe	de
fdd	Zdee fd
dZdde
defddZdee fddZdde
defddZdS )rC   zAbstraction around tpu nodes>   READYCREATINGSTARTING
RESTARTINGr   stater4   c                 C   rx   ry   rz   r_   r&   r&   r'   re      r{   zGCPTPUNode.get_labelsc                 C   s   t | di gS NnetworkEndpoints)r7   r\   r_   r&   r&   r'   num_workers   s   zGCPTPUNode.num_workersc                 C      |  di gS r   rz   r_   r&   r&   r'   get_external_ips      zGCPTPUNode.get_external_ipsr   worker_indexc                 C   s   |   | di dd S )NaccessConfig
externalIp)r   r\   rY   r   r&   r&   r'   rg      s   
zGCPTPUNode.get_external_ipc                 C   r   r   rz   r_   r&   r&   r'   get_internal_ips   r   zGCPTPUNode.get_internal_ipsc                 C   s   |   | dd S )N	ipAddress)r   r\   r   r&   r&   r'   rh      s   zGCPTPUNode.get_internal_ipN)r   )rO   rP   rQ   rR   rb   r^   r]   rl   re   propertyintr   r	   r    r   rg   r   rh   r&   r&   r&   r'   rC      s    rC   c                   @   s  e Zd ZdZdededededdf
dd	Zejd
e	de	fddZ
ejeefdedededefddZej		d-dee deded fddZejdeddfddZej	d.dedededefdd Zej	d.d!edededeeef fd"d#Z	d.d!eded$ededeeeef  f
d%d&Zejd.dededefd'd(Zejd.dededefd)d*Zejd.dededefd+d,ZdS )/rV   z,Abstraction around compute and TPU resourcesrU   
project_idavailability_zonecluster_namer4   Nc                 C   s   || _ || _|| _|| _d S r   )rU   r   r   r   )rY   rU   r   r   r   r&   r&   r'   rX      s   
zGCPResource.__init__httpc                 C      dS )@Generate a new AuthorizedHttp object with the given credentials.Nr&   )rY   r   r&   r&   r'   get_new_authorized_http      z#GCPResource.get_new_authorized_http	operation	max_pollspoll_intervalc                 C   r   )z8Waits a preset amount of time for operation to complete.Nr&   )rY   r   r   r   r&   r&   r'   wait_for_operation      zGCPResource.wait_for_operationFlabel_filtersrc   rA   c                 C   r   )zReturns a filtered list of all instances.

        The filter removes all terminated instances and, if ``label_filters``
        are provided, all instances which labels are not matching the
        ones provided.
        Nr&   )rY   r   rc   r&   r&   r'   list_instances   s   zGCPResource.list_instancesnode_idc                 C   r   )zReturns a single instance.Nr&   )rY   r   r&   r&   r'   get_instance  r   zGCPResource.get_instanceTr@   r2   r   c                 C   r   )zbSets labels on an instance and returns result.

        Completely replaces the labels dictionary.Nr&   )rY   r@   r2   r   r&   r&   r'   
set_labels  s   zGCPResource.set_labelsbase_configc                 C   r   )zgCreates a single instance and returns result.

        Returns a tuple of (result, node_name).
        Nr&   )rY   r   r2   r   r&   r&   r'   create_instance  r   zGCPResource.create_instancecountc                    s<    fddt |D }|rfdd|D }|S |}|S )zqCreates multiple instances and returns result.

        Returns a list of tuples of (result, node_name).
        c                    s   g | ]
}j  d dqS )Fr   )r   .0ir   r2   rY   r&   r'   
<listcomp>/  s    z0GCPResource.create_instances.<locals>.<listcomp>c                    s   g | ]\}}  ||fqS r&   r   )r   r   	node_namer_   r&   r'   r   5      )r)   )rY   r   r2   r   r   
operationsresultsr&   r   r'   create_instances$  s   
zGCPResource.create_instancesc                 C   r   z'Deletes an instance and returns result.Nr&   rY   r   r   r&   r&   r'   delete_instance>  r   zGCPResource.delete_instancec                 C   r   r   r&   r   r&   r&   r'   stop_instanceC  r   zGCPResource.stop_instancec                 C   r   )z,Starts a single instance and returns result.Nr&   r   r&   r&   r'   start_instanceH  r   zGCPResource.start_instanceNFT)rO   rP   rQ   rR   r   r    rX   rn   ro   r   r   	MAX_POLLSPOLL_INTERVALrl   r   r   r
   rm   r	   r   r   rA   r   r   r   r   r   r   r   r&   r&   r&   r'   rV      s    
	

rV   c                   @   s&  e Zd ZdZdedefddZeefdede	de	defd	d
Z
		d&dee dedee fddZdedefddZ	d'dedededefddZdeeef deeef fddZ	d'dedededeeef fddZd'dededefd d!Zd'dededefd"d#Zd'dededefd$d%ZdS )(
GCPComputez'Abstraction around GCP compute resourcer   r4   c                 C      t |jt d}|S r   r   r   credentialshttplib2HttprY   r   new_httpr&   r&   r'   r   Q     z"GCPCompute.get_new_authorized_httpr   r   r   c                 C   s   t d|d  d t|D ]=}| j j| j|d | jdj| 	| jj
d}d|v r3t|d |d dkrGt d	|d  d
  |S t| q|S )z/Poll for compute zone operation until finished.z7wait_for_compute_zone_operation: Waiting for operation rJ    to finish...)projectr   zoner   errorrw   DONEz+wait_for_compute_zone_operation: Operation 
 finished.)loggerinfor)   rU   zoneOperationsr\   r   r   executer   _httpr   r*   r+   rY   r   r   r   r,   resultr&   r&   r'   r   V  s6   	zGCPCompute.wait_for_operationNFr   rc   c                    s   |pi }|rdd dd | D  d }nd}|rtjntj}dd dd |D  d }d	jt jd
}djdd}dd ||||fD }d |}	 j	 j
 j j|	dj  jjd}
|
dg } fdd|D S )N(z AND c                 S   s   g | ]\}}d j ||dqS )(labels.{key} = {value})keyr!   format)r   r   r!   r&   r&   r'   r     r   z-GCPCompute.list_instances.<locals>.<listcomp>) z OR c                 S   s   g | ]}d j |dqS )z(status = {status}))rw   r   )r   rw   r&   r&   r'   r     s    r   r   z(NOT labels.{label}:*)	tpu_cores)labelc                 S   s   g | ]}|r|qS r&   r&   )r   fr&   r&   r'   r     s    )r   r   filterr   itemsc                       g | ]}t | qS r&   )rE   r   r_   r&   r'   r         )joinr   rE   r   rb   r   r   r   rU   	instanceslistr   r   r   r   r   r\   )rY   r   rc   label_filter_exprstatusesinstance_state_filter_exprcluster_name_filter_exprtpu_negation_filter_exprnot_empty_filtersfilter_exprresponser   r&   r_   r'   r   z  s`   

zGCPCompute.list_instancesr   c                 C   s(   | j  j| j| j|d }t|| S Nr   r   instance)rU   r   r\   r   r   r   rE   rY   r   r   r&   r&   r'   r     s   

zGCPCompute.get_instanceTr@   r2   r   c                 C   sn   t |d fi ||d d}|d }| j j| j| j||dj| | jjd}|r3| 	|}|S |}|S )Nr2   labelFingerprint)r2   r   rJ   )r   r   r   bodyr   )
rl   rU   r   	setLabelsr   r   r   r   r   r   )rY   r@   r2   r   r   r   r   r   r&   r&   r'   r     s&   
zGCPCompute.set_labelsconfiguration_dictc                 C   sv   t |}|d }td|sdj| j|d d|d< |dg D ]}|d }td|s8dj| j| j|d	|d< q |S )
a  Ensures that resources are in their full URL form.

        GCP expects machineType and acceleratorType to be a full URL (e.g.
        `zones/us-west1/machineTypes/n1-standard-2`) instead of just the
        type (`n1-standard-2`)

        Args:
            configuration_dict: Dict of options that will be passed to GCP
        Returns:
            Input dictionary, but with possibly expanding `machineType` and
                `acceleratorType`.
        machineTypez.*/machineTypes/.*z(zones/{zone}/machineTypes/{machine_type})r   machine_typeguestAcceleratorsacceleratorTypez.*/acceleratorTypes/.*z>projects/{project}/zones/{zone}/acceleratorTypes/{accelerator})r   r   accelerator)r   r   r   r   r   r\   r   )rY   r   existing_machine_typer   gpu_typer&   r&   r'   _convert_resources_to_urls  s.   z%GCPCompute._convert_resources_to_urlsr   c           	      C   s   |  |}|dd  t|tjj}t|di fi |}|t|fi t	| j
i|d |dd }| j j| j| j||dj| | jjd}|rW| |}||fS |}||fS )NnetworkConfigr2   )r2   rJ   sourceInstanceTemplate)r   r   r   r   r   )r   popr<   r=   rF   r!   rl   r\   updater   r   rU   r   insertr   r   r   r   r   r   )	rY   r   r2   r   configrJ   source_instance_templater   r   r&   r&   r'   r     s2   

zGCPCompute.create_instancec                 C   8   | j  j| j| j|d }|r| |}|S |}|S r   )rU   r   deleter   r   r   r   rY   r   r   r   r   r&   r&   r'   r   @  s   

zGCPCompute.delete_instancec                 C   r  r   )rU   r   stopr   r   r   r   r  r&   r&   r'   r   R  s   

zGCPCompute.stop_instancec                 C   sF   | j  j| j| j|dj| | j jd}|r| |}|S |}|S )Nr   r   )	rU   r   startr   r   r   r   r   r   r  r&   r&   r'   r   c  s   

zGCPCompute.start_instancer   r   )rO   rP   rQ   rR   r   r   r   r   rl   r   r   r
   rm   r	   rE   r   r    r   r   r   r   r   r   r   r   r   r   r&   r&   r&   r'   r   N  sh    
&
E



'

4r   c                   @   s  e Zd ZdZedd ZdedefddZee	fde
d	ed
ede
fddZ		d&dee
 dedee fddZdedefddZeed	d'dede
dede
fddZ	d'de
de
dedee
ef fddZd'dedede
fd d!Zd'dedede
fd"d#Zd'dedede
fd$d%ZdS )(GCPTPUz#Abstraction around GCP TPU resourcec                 C   s   d| j  d| j S )Nz	projects/z/locations/)r   r   r_   r&   r&   r'   path{  s   zGCPTPU.pathr   r4   c                 C   r   r   r   r   r&   r&   r'   r     r   zGCPTPU.get_new_authorized_httpr   r   r   c                 C   s   t d|d  d t|D ]<}| j   j|d  dj| 	| jj
d}d|v r4t|d d|v rFt d|d  d	  |S t| q|S )
z&Poll for TPU operation until finished.z.wait_for_tpu_operation: Waiting for operation rJ   r   rM   r   r   r   z"wait_for_tpu_operation: Operation r   )r   r   r)   rU   projects	locationsr   r\   r   r   r   r   r*   r+   r   r&   r&   r'   r     s4   
zGCPTPU.wait_for_operationNFr   rc   c                    s   j    jjdjj jd}|	dg }fdd|D } p)i  j
 t< dtdtf fdd	}tt||}|S )
N)parentr   nodesc                    r   r&   )rC   r   r_   r&   r'   r     r   z)GCPTPU.list_instances.<locals>.<listcomp>r   r4   c                    sN   |   rdS |  } r%  D ]\}}||vr dS ||| kr$ dS qdS )NFT)rc   re   r   )r   r2   r   r!   )r   r&   r'   filter_instance  s   z.GCPTPU.list_instances.<locals>.filter_instance)rU   r  r  r  r   r  r   r   r   r\   r   r   rC   rm   r   )rY   r   rc   r   r   r  r&   )r   rY   r'   r     s   
zGCPTPU.list_instancesr   c                 C   s6   | j    j|dj| | j jd}t|| S )NrM   r   )	rU   r  r  r  r\   r   r   r   rC   r   r&   r&   r'   r     s   
zGCPTPU.get_instancezunable to queue the operationTr@   r2   r   c                 C   sj   dt |d fi |i}d}| j   j|d ||dj| | jjd}|r1| 	|}|S |}|S )Nr2   rJ   )rJ   
updateMaskr   r   )
rl   rU   r  r  r  patchr   r   r   r   )rY   r@   r2   r   r   update_maskr   r   r&   r&   r'   r     s&   
zGCPTPU.set_labelsr   c                 C   s  |  }|dd  t|tjj}t|di fi |}|dt|fi t	| j
ii d|vr5i |d< d|d vrAd|d d< d|v rY|dd |d< |d d	|d d
< | j   j| j||dj| | jjd}|r}| |}||fS |}||fS )Nr|   r2   r   enableExternalIpsTserviceAccountsr   serviceAccountscopesscope)r  r   nodeIdr   )copyr  r<   r=   rD   r!   rl   r\   r  r   r   rU   r  r  r  creater  r   r   r   r   )rY   r   r2   r   r  rJ   r   r   r&   r&   r'   r     s>   
zGCPTPU.create_instancec                 C   J   | j    j|dj| | j jd}|r!| j|t	d}|S |}|S NrM   r   )r   )
rU   r  r  r  r  r   r   r   r   r   r  r&   r&   r'   r   )  s   	zGCPTPU.delete_instancec                 C   r  r  )
rU   r  r  r  r	  r   r   r   r   r   r  r&   r&   r'   r   :     zGCPTPU.stop_instancec                 C   r  r  )
rU   r  r  r  r
  r   r   r   r   r   r  r&   r&   r'   r   J  r  zGCPTPU.start_instancer   r   )rO   rP   rQ   rR   r   r  r   r   MAX_POLLS_TPUr   rl   r   r   r
   rm   r	   rC   r   r    r   r1   r   r   r   r   r   r   r   r&   r&   r&   r'   r  v  sd    

$
)

1r  )6rR   rn   loggingr   r*   collectionsr   r  r   enumr   	functoolsr   typingr   r   r	   r
   r   r   uuidr   r   google_auth_httplib2r   googleapiclient.discoveryr   googleapiclient.errorsr   ray.autoscaler.tagsr   r   	getLoggerrO   r   r8   r9   r   r   r   r   r    r   r1   rl   r<   r=   ABCMetarA   rE   rC   rV   r   r  r&   r&   r&   r'   <module>   sX     

$"!n  *