o
    bi
#                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZmZ d dl	Z	d dl
mZ d dlmZ d dlmZmZmZmZmZmZmZmZ eeZd ZdZG dd	 d	eZdS )
    N)RLock)AnyDictOptional)NodeLaunchException)NodeProvider)NODE_KIND_HEADNODE_KIND_WORKERSTATUS_SETTING_UPSTATUS_UP_TO_DATETAG_RAY_NODE_KINDTAG_RAY_NODE_NAMETAG_RAY_NODE_STATUSTAG_RAY_USER_NODE_TYPEzray.head.defaultc                   @   s   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d Zdd Z	dd Z
dedee fddZdd Zdd Zdd Zdeeef deeef dedeeeef  fddZdd  Zd!d" Zd#d$ Zd%d& Zed'd( Zd)S )*SparkNodeProviderzCA node provider that implements provider for nodes of Ray on spark.c              
   C   s   t | || t | _ttdtttt	t
tttiii| _d| _| jd | _| jd }d| j d| | _| jd | _| jd | _d S )	Ntagsr   ray_head_ipspark_job_server_portzhttp://:ray_head_portcluster_unique_id)r   __init__r   lockstrHEAD_NODE_IDr   r   r   HEAD_NODE_TYPEr   r   r   _nodes_next_node_idprovider_configr   spark_job_server_urlr   
cluster_id)selfr   cluster_namer    r#   _/home/ubuntu/.local/lib/python3.10/site-packages/ray/autoscaler/_private/spark/node_provider.pyr      s"   

zSparkNodeProvider.__init__c                 C   s>   | j  |  jd7  _| jW  d    S 1 sw   Y  d S )N   )r   r   )r!   r#   r#   r$   get_next_node_id9   s   $z"SparkNodeProvider.get_next_node_idc                 C   s
  | j x g }g }| jD ][}|ttkrd}n| |}|dkr<| j| d t tkr<t| j| d t< t	d| d |dkrF|
| q| |}d}| D ]\}}	|||	kr^d}qQ|rf|
| q|D ]}
| j|
 qi|W  d    S 1 s~w   Y  d S )Nrunningr   zSpark node provider node z starts running.
terminatedTF)r   r   r   r   _query_node_statusr   r
   r   loggerinfoappend	node_tagsitemsgetpop)r!   tag_filtersnodes
died_nodesnode_idstatusr   okkvdied_node_idr#   r#   r$   non_terminated_nodes>   sB   




$z&SparkNodeProvider.non_terminated_nodesc                 C   sH   |  |}tj| jd d|id}|  |jd}t|}|d S )Nz/query_task_statusspark_job_group_idurljsonzutf-8r5   )	_gen_spark_job_group_idrequestspostr   raise_for_statuscontentdecoder>   loads)r!   r4   r;   responsedecoded_respjson_resr#   r#   r$   r)   e   s   

z$SparkNodeProvider._query_node_statusc                 C   sL   | j  || jv o| j| d t tk	 W  d    S 1 sw   Y  d S Nr   )r   r   r   r   r!   r4   r#   r#   r$   
is_runningr   s   
$zSparkNodeProvider.is_runningc                 C   s4   | j  || jvW  d    S 1 sw   Y  d S Nr   r   rJ   r#   r#   r$   is_terminatedz   s   $zSparkNodeProvider.is_terminatedc                 C   s8   | j  | j| d W  d    S 1 sw   Y  d S rI   rM   rJ   r#   r#   r$   r-   ~   s   $zSparkNodeProvider.node_tagsr4   returnc                 C   s   |S rL   r#   rJ   r#   r#   r$   _get_ip   s   zSparkNodeProvider._get_ipc                 C   
   |  |S rL   rP   rJ   r#   r#   r$   external_ip      
zSparkNodeProvider.external_ipc                 C   rQ   rL   rR   rJ   r#   r#   r$   internal_ip   rT   zSparkNodeProvider.internal_ipc                 C   s&   || j v sJ | j | d | d S rI   )r   update)r!   r4   r   r#   r#   r$   set_node_tags   s   zSparkNodeProvider.set_node_tagsnode_configr   countc                 C   s   t d)Nz!This method should not be called.)AssertionError)r!   rX   r   rY   r#   r#   r$   create_node   s   zSparkNodeProvider.create_nodec                 C   s   d| j  d| j d| S )Nzray-cluster--z-worker-node-)r   r    rJ   r#   r#   r$   r?      s   z)SparkNodeProvider._gen_spark_job_group_idc                 C   s"   t |D ]
}| |||| qd S rL   )range&_create_node_with_resources_and_labels)r!   rX   r   rY   	resourceslabels_r#   r#   r$   %create_node_with_resources_and_labels   s
   z7SparkNodeProvider.create_node_with_resources_and_labelsc                 C   sl  ddl m} | j | }|t }t|  }t||d< | j }|	d}	|	d}
|	d}|	d}||d ||d< t
j| jd	 | |d
| d| j d| j |d | j| j|d |	|
|||d |d |d dd}z|  W n ty   tdd| dt w dttt|t|ttii| j|< td| d W d    d S 1 sw   Y  d S )Nr   )_append_resources_configNODE_ID_AS_RESOURCECPUGPUmemoryobject_store_memoryworker_node_optionsz/create_nodezGThis job group is for spark job which runs the Ray cluster worker node z connecting to ray head node r   using_stage_schedulingray_temp_dircollect_log_to_path)r;   spark_job_group_descrj   r   r   rk   num_cpus_per_nodenum_gpus_per_nodeheap_memory_per_nodeobject_store_memory_per_noderi   rl   r4   r<   zNode creation failurezStarting ray worker node z failedr   z!Spark node provider creates node .)ray.util.spark.cluster_initrc   r   copyr   r   r&   intr   r0   r@   rA   r   r?   r   r   rB   	Exceptionr   sysexc_infor   r	   r   r   r
   r   r*   r+   )r!   rX   r   r_   r`   rc   	node_typer4   confrn   ro   rp   rq   rF   r#   r#   r$   r^      sn   






"z8SparkNodeProvider._create_node_with_resources_and_labelsc                 C   s   || j v rtj| jd d| |id}|  | j || j v r'| j | W d    n1 s1w   Y  t	d|  d S )Nz/terminate_noder;   r<   z$Spark node provider terminates node )
r   r@   rA   r   r?   rB   r   r0   r*   r+   )r!   r4   rF   r#   r#   r$   terminate_node   s   

z SparkNodeProvider.terminate_nodec                 C   s   | S rL   r#   )cluster_configr#   r#   r$   bootstrap_config   s   z"SparkNodeProvider.bootstrap_configN)__name__
__module____qualname____doc__r   r&   r:   r)   rK   rN   r-   r   r   rP   rS   rU   rW   r   r   ru   r[   r?   rb   r^   r{   staticmethodr}   r#   r#   r#   r$   r      s8    '


Er   )r>   loggingrw   	threadingr   typingr   r   r   r@   $ray.autoscaler.node_launch_exceptionr   ray.autoscaler.node_providerr   ray.autoscaler.tagsr   r	   r
   r   r   r   r   r   	getLoggerr~   r*   r   r   r   r#   r#   r#   r$   <module>   s    (
