o
    `۷i7a                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlZd dlm	Z	m
Z
 d dlmZ d dlmZmZ d dlmZmZ d dlmZmZ d dlmZmZ d d	lmZ d d
lmZmZmZm Z m!Z!m"Z"m#Z#m$Z$ e %e&Z'dZ(dZ)G dd dZ*G dd de*eZ+dS )    N)Thread)usage_constants	usage_lib)subprocess_output_util)cf
cli_logger)AUTOSCALER_NODE_START_WAIT_SProcessRunnerError)LABELS_ENVIRONMENT_VARIABLERESOURCES_ENVIRONMENT_VARIABLE)CreateClusterEventglobal_event_system)LogTimer)STATUS_SETTING_UPSTATUS_SYNCING_FILESSTATUS_UP_TO_DATESTATUS_UPDATE_FAILEDSTATUS_WAITING_FOR_SSHTAG_RAY_FILE_MOUNTS_CONTENTSTAG_RAY_NODE_STATUSTAG_RAY_RUNTIME_CONFIG      c                	   @   sb   e Zd ZdZddddeddddf	ddZdd Zdd	d
Zdd Zdd Z	dddZ
dddZdS )NodeUpdatera  A process for syncing files and running init commands on a node.

    Arguments:
        node_id: the Node ID
        provider_config: Provider section of autoscaler yaml
        provider: NodeProvider Class
        auth_config: Auth section of autoscaler yaml
        cluster_name: the name of the cluster.
        file_mounts: Map of remote to local paths
        initialization_commands: Commands run before container launch
        setup_commands: Commands run before ray starts
        ray_start_commands: Commands to start ray
        runtime_hash: Used to check for config changes
        file_mounts_contents_hash: Used to check for changes to file mounts
        is_head_node: Whether to use head start/setup commands
        rsync_options: Extra options related to the rsync command.
        process_runner: the module to use to run the commands
            in the CommandRunner. E.g., subprocess.
        use_internal_ip: Wwhether the node_id belongs to an internal ip
            or external ip.
        docker_config: Docker section of autoscaler yaml
        restart_only: Whether to skip setup commands & just restart ray
        for_recovery: True if updater is for a recovering node. Only used for
            metric tracking.
    NFc              	   C   s   d || _|p|ddo|o|dd }|| j||||||| _d| _|| _|d| _|| _|p6i }dd |	 D | _
|| _|| _|	| _|| _|| _|
| _|| _|pYg }d	d
 |D | _|pei | _|| _|| _|| _|| _d | _|| _d S )NzNodeUpdater: {}: use_internal_ipsFuse_external_head_ipTtypec                 S   s   i | ]\}}|t j|qS  ospath
expanduser).0remotelocalr   r   U/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/autoscaler/_private/updater.py
<dictcomp>x   s    z(NodeUpdater.__init__.<locals>.<dictcomp>c                 S   s   g | ]}t j|qS r   r   )r"   r    r   r   r%   
<listcomp>   s    z(NodeUpdater.__init__.<locals>.<listcomp>)format
log_prefixgetget_command_runner
cmd_runnerdaemonnode_idprovider_typeprovideritemsfile_mountsinitialization_commandssetup_commandsray_start_commandsnode_resourcesnode_labelsruntime_hashfile_mounts_contents_hashcluster_synced_filesrsync_optionsauth_configis_head_nodedocker_configrestart_onlyupdate_timefor_recovery)selfr.   provider_configr0   r<   cluster_namer2   r3   r4   r5   r8   r9   r=   r6   r7   r:   r;   process_runneruse_internal_ipr>   r?   rA   r   r   r%   __init__C   sR   


zNodeUpdater.__init__c              
   C   s  t   }t rt rd}t| z!t| jd| j	  | 
  W d    n1 s.w   Y  W np ty } zd| j| jtti tdtt td t|drmt|dd}tdt|j|j| ntd	tt| t }td
| tdt| td t  t|tj rW Y d }~d S  d }~ww tt!t"| j	i}| j#d ur| j#|t$< | j| j| t%dt! t   | | _&d| _'d S )NzOutput was redirected for an interactive command. Either do not pass `--redirect-command-output` or also pass in `--use-normal-shells`.zApplied config {}zNew status: {}z!!!cmdstderrzNo stderr availablez7Setup command `{}` failed with exit code {}. stderr: {}zException details: {}zFull traceback: {}zError message: {}
New statusr   )(timecmd_output_utildoes_allow_interactiveis_output_redirectedr   abortr   r)   r(   r8   	do_update	Exceptionr0   set_node_tagsr.   r   r   errorr   boldhasattrgetattrrH   
returncodeverbose_errorstrvars	traceback
format_excnewline
isinstanceclickClickExceptionr   r   r9   r   labeled_valuer@   exitcode)rB   update_start_timemsgestderr_outputfull_tracebacktags_to_setr   r   r%   run   sb   









zNodeUpdater.runr      c           	         s"  |\}}g  t jdkrddg d fdd	}t jdd|d	 |fd
 j D ]	\}}||| q*|d	7 }W d    n1 sBw   Y  jrt jdd|d	 |fd
# t dtj jD ]	}|||dd qc|d	7 }W d    d S 1 s|w   Y  d S t jdd|d	 |fd
 d S )Nr   z~/ray_bootstrap_key.pemz~/ray_bootstrap_config.yamlFc                    s  |rt j|std| d S t j|sJ |t j|r2|ds)|d7 }| ds2| d7 } tjd	||  D j
oGj
d dk}|sYjjd	t j| dd || d	d
 |  vrytdt| t| W d    d S W d    d S 1 sw   Y  d S )Nz"sync: {} does not exist. Skipping./zSynced {} to {}container_name zmkdir -p {}hostrun_envT)docker_mount_if_possiblez
{} from {})r   r    existsr   printisdirendswithr   r)   r(   r>   r,   ri   dirnamer   rT   )remote_path
local_pathallow_non_existing_paths	is_dockernolog_pathsrB   sync_cmdr   r%   do_sync   s6   

"z-NodeUpdater.sync_file_mounts.<locals>.do_synczProcessing file mounts[]   	_numberedzProcessing worker file mountszsynced files: {}T)rz   zNo worker file mounts to syncF)r   	verbositygroupr2   r1   r:   rt   rY   )	rB   r~   step_numbersprevious_stepstotal_stepsr   rx   ry   r    r   r|   r%   sync_file_mounts   s4   
%


"	
zNodeUpdater.sync_file_mountsc                 C   s  t jdddtfd t| jd  t dtd d }	 t |kr)t	d	| j
| jr4t	d
z| jjdddd t d W W d    W d    dS  typ } ztj||td}tt W Y d }~nfd }~w t	y } zVdt| d }t|drt|jtr|j}nt|jtrd|j}ntdt|j d t|j}d|j|}t dt |ttt tt W Y d }~nd }~ww q1 sw   Y  W d    d S 1 sw   Y  d S )Nz#Waiting for SSH to become availabler   r   r   zGot remote shellzRunning `{}` as a test.uptimeTzwait_ready timeout exceeded.z8wait_ready aborting because node detected as terminated.
   ro   )timeoutrq   zSuccess.)retry_interval()rH    ze.cmd type (z) not list or str.z(Exit Status {}): {}z3SSH still not available {}, retrying in {} seconds.)!r   r   NUM_SETUP_STEPSr   r)   rt   r   rT   rK   rQ   r0   is_terminatedr.   r,   ri   successr	   rL   handle_ssh_failsREADY_CHECK_INTERVALsleeprY   rU   r^   rH   listjoinloggerdebugr   r(   rW   dimmed)rB   deadlinefirst_conn_refused_timere   	retry_strcmd_r   r   r%   
wait_ready  sh   



"zNodeUpdater.wait_readyc                 C   s  | j | jtti tdt t t }| 	| t
tj | j | j}tdt| | jdkrP| j jrPddlm} || j j| j| j j| j |t| jkrp| jj| j| jdd}|rp|t  d7  < d| _ | j rvg | _!|t| jkr| j"r|t#| j"krtj$d	d
dt%fd ntj$dt&| jdd | j | jtt'i tdt' | j(| j)dt%fd |t| jkr<| j | jtt*i tdt* | j+rTtj,dd
dt%fdi t
tj- t.| j/d ddI | j+D ]=}t
tj-d|i z| jj0|| j1ddd W q t2y2 } z|j3dkr(t4d t4d t56dd d }~ww W d    n	1 s>w   Y  W d    n	1 sNw   Y  n
tj$dd
dt%fd tj,d d
d!t%fd | jj| j| jdd W d    n	1 sw   Y  | j!r2tj,d"d
d#t%fd t
tj7 t.| j/d$ ddv t8| j!}t9| j!D ]c\}}t
tj7d|i tj:dkrt8|d%krt;<|d d% d& }	nt;<|}	tj$d'|	d(||fd z| jj0|d)d* W q t2y } z|j3dkrt4d t4d t56d+d }~ww W d    n	1 sw   Y  W d    n	1 s,w   Y  n
tj$d,d
d#t%fd tj,d-d
d.t%fd t
tj= t.| j/d/ ddx | j>D ]l}i }
| jrrt?@ rmd|
tAjB< nd|
tAjB< | jd0kr| jCr| jC|
tD< | jEr| jE|
tF< ztGH }tGId | jj0||
d)d1 tGI| W qZ t2y } z|j3dkrt4d t4d t56d2d }~ww W d    n	1 sw   Y  t
tjJ W d    d S 1 sw   Y  d S )3NrJ   zNode tags: {}awsr   )CloudwatchHelperF)as_headr2   sync_run_yetz-invalidatezYConfiguration already up to date, skipping file mounts, initalization and setup commands.r   z2-6r   zUpdating cluster configuration.)hash)_tagsr   )r   zRunning initialization commands   zInitialization commandsT)show_statuscommandssh_private_keyro   )ssh_options_override_ssh_keyrq   ssh_command_failedzFailed.zSee above for stderr.zInitialization command failed.z"No initialization commands to run.zInitializing command runnerr   zRunning setup commands   zSetup commands   z...z{}z()autorp   zSetup command failed.zNo setup commands to run.zStarting the Ray runtimer   zRay start commandsr$   )environment_variablesrq   zStart command failed.)Kr0   rR   r.   r   r   r   ra   rK   r   r   r   execute_callbackr   ssh_control_acquired	node_tagsr   r   r(   rY   r/   rC   8ray.autoscaler._private.aws.cloudwatch.cloudwatch_helperr   rD   update_from_configr=   r*   r   r8   r,   run_initr2   r?   r4   r9   r   rt   r   dictr   r   rsync_upr   r3   r   run_initialization_cmdr   r)   ri   r<   r	   msg_typerS   r_   r`   run_setup_cmdlen	enumerater   r   rT   start_ray_runtimer5   r   usage_stats_enabledr   USAGE_STATS_ENABLED_ENV_VARr6   r   r7   r
   rL   rN   set_output_redirectedstart_ray_runtime_completed)rB   r   r   r   init_requiredrH   re   totalicmd_to_printenv_varsold_redirectedr   r   r%   rP   O  sn  









&







#








"$zNodeUpdater.do_updatec                 C   \   i }||d< | j d|d< | j d|d< | jj|||d tdt|t| d S )Nrr   rsync_excludersync_filteroptionsz#`rsync`ed {} (local) to {} (remote))r;   r*   r,   run_rsync_upr   verboser   rT   rB   sourcetargetrr   r   r   r   r%   r   !     zNodeUpdater.rsync_upc                 C   r   )Nrr   r   r   r   z#`rsync`ed {} (remote) to {} (local))r;   r*   r,   run_rsync_downr   r   r   rT   r   r   r   r%   
rsync_down+  r   zNodeUpdater.rsync_down)rj   r   )__name__
__module____qualname____doc__
subprocessrG   ri   r   r   rP   r   r   r   r   r   r%   r   (   s&    (
P
>C; 
S
r   c                   @   s   e Zd Zdd ZdS )NodeUpdaterThreadc                 O   s,   t |  tj| g|R i | d| _d S )N)r   rG   r   rb   )rB   argskwargsr   r   r%   rG   7  s   

zNodeUpdaterThread.__init__N)r   r   r   rG   r   r   r   r%   r   6  s    r   ),loggingr   r   rK   r[   	threadingr   r_   ray._common.usager   r   ray.autoscaler._privater   rL   "ray.autoscaler._private.cli_loggerr   r   &ray.autoscaler._private.command_runnerr   r	   !ray.autoscaler._private.constantsr
   r   $ray.autoscaler._private.event_systemr   r   !ray.autoscaler._private.log_timerr   ray.autoscaler.tagsr   r   r   r   r   r   r   r   	getLoggerr   r   r   r   r   r   r   r   r   r%   <module>   s0    (
    