o
    xiv                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlmZ ddlmZ ddlmZ ddlZddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZ ddlmZ ddlmZ ddl m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z( ddl)m*Z*m+Z+ ddl,m-Z-m.Z.m/Z/m0Z0 ddl1m2Z2 ddl3m4Z4 dZ5dZ6dZ7dZ8dZ9dZ:dZ;dZ<dZ=d Z>d!Z?ej@Ad"ZBeBrzeCeBZDW n eEy   e+d#eB w d$ZDeFeGZHeG d%d& d&ZId>d*d+ZJd?d@d2d3ZKG d4d5 d5ZL		dAdBd:d;ZMG d<d= d=ZNdS )CzImplementation of launch agent.    )annotationsN)	dataclass)Event)Any)
get_sentry)Api)	CommError)
launch_add)LocalSubmittedRun)LocalProcessRunner)	Scheduler)LAUNCH_CONFIG_FILE!resolve_build_and_registry_config)runid   )loader)LaunchProject)LaunchDockerErrorLaunchError)LAUNCH_DEFAULT_PROJECT
LOG_PREFIXPROJECT_SYNCHRONOUSevent_loop_thread_exec   )JobAndRunStatusTracker)RunQueueItemFileSaver
           POLLINGRUNNINGKILLEDzsweep-controller   <   i,     WANDB_LAUNCH_START_TIMEOUTz.Invalid value for WANDB_LAUNCH_START_TIMEOUT: i  c                   @  s   e Zd ZU ded< ded< dS )JobSpecAndQueuedict[str, Any]jobstrqueueN)__name__
__module____qualname____annotations__ r.   r.   P/home/ubuntu/.local/lib/python3.10/site-packages/wandb/sdk/launch/agent/agent.pyr%   H   s   
 r%   accessr(   returnc                 C  s$   |   } | dks| dksJ d| S )z3Convert access string to a value accepted by wandb.PROJECTUSERz+Queue access must be either project or user)upper)r0   r.   r.   r/   _convert_accessN   s
   r5   configr&   keydefaultintint | floatc                 C  s   z|  |}|du r|}t|}W n ty/ } ztd| d|  | dt| d}~ww |dkr8tdS |dk rJtd| d|  | d|S )	zGet an integer from the config, or float.inf if -1.

    Utility for parsing integers from the agent config with a default, infinity
    handling, and integer parsing. Raises more informative error if parse error.
    Nz-Error when parsing LaunchAgent config key: ['z': z
]. Error: infr   z]. Error: negative value.)getr9   
ValueErrorr   r(   float)r6   r7   r8   valmax_from_configer.   r.   r/   _max_from_configW   s0   
rC   c                   @  s>   e Zd ZdddZdddZddd	Zdd
dZdddZdS )InternalAgentLoggerr   c                 C  s   |dk| _ d S )Nr   )_print_to_terminal)self	verbosityr.   r.   r/   __init__s   s   zInternalAgentLogger.__init__messager(   c                 C  .   | j rtt |  tt |  d S N)rE   wandb	termerrorr   _loggererrorrF   rI   r.   r.   r/   rO   v      zInternalAgentLogger.errorc                 C  rJ   rK   )rE   rL   termwarnr   rN   warningrP   r.   r.   r/   warn{   rQ   zInternalAgentLogger.warnc                 C  rJ   rK   )rE   rL   termlogr   rN   inforP   r.   r.   r/   rV      rQ   zInternalAgentLogger.infoc                 C  rJ   rK   )rE   rL   rU   r   rN   debugrP   r.   r.   r/   rW      rQ   zInternalAgentLogger.debugN)r   )rI   r(   )r*   r+   r,   rH   rO   rT   rV   rW   r.   r.   r.   r/   rD   r   s    



rD   launch_configdict | Nonebuild_config<tuple[dict[str, Any] | None, dict[str, Any], dict[str, Any]]c                 C  s   dd l }d }d }| d ur| d}| d}d }tjtjtrFttjt}||p1i }W d    n1 s<w   Y  |d}t	|||\}}|||fS )Nr   builderregistryenvironment)
yamlr=   ospathexists
expanduserr   open	safe_loadr   )rX   rZ   r_   registry_configenvironment_configdefault_launch_configfr.   r.   r/   construct_agent_configs   s"   



rj   c                      s(  e Zd ZdZdZdX fddZedYd
dZedZddZd[ddZ	d\ddZ
	d]d^ddZd_d d!Zed`d#d$Zedad&d'Zedad(d)Zdbd+d,Zd_d-d.Zdcd0d1Zddd6d7Z	d]ded;d<Zdfd@dAZdgdCdDZd_dEdFZdhdJdKZdidLdMZdjdNdOZdkdPdQZdldSdTZdmdVdWZ  ZS )nLaunchAgentzWLaunch agent class which polls run given run queues and launches runs for wandb launch.Nargsr   kwargsr1   c                   s   | j du rt | | _ | j S )zCreate a new instance of the LaunchAgent.

        This method ensures that only one instance of the LaunchAgent is created.
        This is done so that information about the agent can be accessed from
        elsewhere in the library.
        N)	_instancesuper__new__)clsrl   rm   	__class__r.   r/   rp      s   
zLaunchAgent.__new__r(   c                 C  s6   | j du r	td| j j}t|tr|S td| )zReturn the name of the agent.Nz$LaunchAgent has not been initializedzFound invalid name for agent )rn   r   _name
isinstancer(   )rq   namer.   r.   r/   rv      s   

zLaunchAgent.nameboolc                 C  s
   | j duS )z(Return whether the agent is initialized.N)rn   )rq   r.   r.   r/   initialized   s   
zLaunchAgent.initializedapir   r6   r&   c                 C  s  |d | _ t| _|| _| j d| _d| _i | _t	
 | _t | _| j  t | _t | _td| _t|d| _t|d| _|dd| _|d	d| _t| jd
| _d| _|| _|dt | _!g | _"dt#j$ | _%tj&d}|r}|dkr}|| _%| j' du| _(| j) | _*|ddg| _+|, }d|v r|d= | j-| j | j| j+|| j%| j(}|d | _.| j/| j rt#0t1 d| j  d | j2| j.| j(}|d | _3| 4  dS )zInitialize a launch agent.

        Arguments:
            api: Api object to use for making requests to the backend.
            config: Config dictionary for the agent.
        entitybase_urlr   projectmax_jobsmax_schedulerssecure_modeFrG   )rG   r   stopped_run_timeoutzwandb@WANDB_AGENT_VERSIONzwandb-launch-agentNqueuesr8   launchAgentIdz!Agent is running on team entity (z@). Members of this team will be able to run code on this device.rv   )5_entityr   _project_apisettingsr=   	_base_url_ticks_jobs	threadingLock
_jobs_lockr   _jobs_eventsetr`   getcwd_cwdr   generate_id
_namespacer5   _accessrC   	_max_jobs_max_schedulers_secure_mode
_verbosityrD   _internal_logger_last_status_print_timedefault_configDEFAULT_STOPPED_RUN_TIMEOUT_stopped_run_timeout_known_warningsrL   __version__versionenvironlaunch_agent_introspectiongorilla_supports_agents!fail_run_queue_item_introspection&_gorilla_supports_fail_run_queue_items_queuescopycreate_launch_agent_identity_is_teamrR   r   get_launch_agentrt   _init_agent_run)rF   ry   r6   env_agent_versionsent_configcreate_responseagent_responser.   r.   r/   rH      sj   







zLaunchAgent.__init__run_specc                 C  s   |s| j d |dtjkrdS |ddkr>|dr dS |di d	g }t|d
k r2dS |dd ddgkr>dS dS )z5Determine whether a job/runSpec is a sweep scheduler.z4Received runSpec in _is_scheduler_job that was emptyuriFresourcezlocal-processr'   T	overridesentry_point   Nr   rL   	scheduler)r   rW   r=   r   PLACEHOLDER_URIlen)rF   r   cmdr.   r.   r/   _is_scheduler_job  s   
zLaunchAgent._is_scheduler_jobrun_queue_item_idrI   phasefileslist[str] | NoneNonec                   s0   | j rt| jj}|||||I d H  d S d S rK   )r   r   r   fail_run_queue_item)rF   r   rI   r   r   fail_rqir.   r.   r/   r   )  s
   zLaunchAgent.fail_run_queue_itemc                 C  s@   d | _ | jrtjdddd}tj| j| j|| jtd| _ d S d S )NT)silentdisable_gitdisable_job_creation)r|   rz   r   idjob_type)	
_wandb_runr   rL   Settingsinitr   r   rt   HIDDEN_AGENT_RUN_TYPE)rF   r   r.   r.   r/   r   4  s   zLaunchAgent._init_agent_run	list[int]c                 C  s8   | j  t| j W  d   S 1 sw   Y  dS )z8Returns a list of keys running thread ids for the agent.N)r   listr   keysrF   r.   r.   r/   
thread_idsD  s   $zLaunchAgent.thread_idsr9   c                   B    j  t fdd jD W  d   S 1 sw   Y  dS )z%Return just the number of schedulers.c                   s   g | ]
} j | jr|qS r.   r   is_scheduler.0xr   r.   r/   
<listcomp>N      z6LaunchAgent.num_running_schedulers.<locals>.<listcomp>Nr   r   r   r   r.   r   r/   num_running_schedulersJ     $z"LaunchAgent.num_running_schedulersc                   r   )z3Return the number of jobs not including schedulers.c                   s   g | ]
} j | js|qS r.   r   r   r   r.   r/   r   T  r   z0LaunchAgent.num_running_jobs.<locals>.<listcomp>Nr   r   r.   r   r/   num_running_jobsP  r   zLaunchAgent.num_running_jobsr)   c              
     sb   zt | jj}||| j| j| jdI dH }|W S  ty0 } ztd| W Y d}~dS d}~ww )zPops an item off the runqueue to run as a job.

        Arguments:
            queue: Queue to pop from.

        Returns:
            Item popped off the queue.

        Raises:
            Exception: if there is an error popping from the queue.
        )rz   r|   agent_idNz
Exception:)r   r   pop_from_run_queuer   r   r   	Exceptionprint)rF   r)   popupsrB   r.   r.   r/   pop_from_queueV  s   
zLaunchAgent.pop_from_queuec                 C  s   t   | _d}| jr|| j d7 }| j| jk r$|dd| j d7 }|d| j d| j d7 }tt	 |  | jd	krO|d
ddd | j
D  7 }t| dS )z'Prints the current status of the agent.agent  zpolling on queues ,z, zrunning z out of a maximum of z jobsr   z: c                 s  s    | ]}t |V  qd S rK   )r(   )r   job_idr.   r.   r/   	<genexpr>}  s    z+LaunchAgent.print_status.<locals>.<genexpr>N)timer   rt   r   r   joinr   rL   rU   r   r   rN   rV   )rF   
output_strr.   r.   r/   print_statuso  s   

 zLaunchAgent.print_statusstatusc                   sH   t | jj}|| j|| jI dH }|d s"tt d|  dS dS )zoUpdate the status of the agent.

        Arguments:
            status: Status to update the agent to.
        Nsuccessz!Failed to update agent status to )r   r   update_launch_agent_statusr   r   rL   rM   r   )rF   r   _update_status
update_retr.   r.   r/   update_status  s   

zLaunchAgent.update_statusrz   r|   run_idrqi_idc                 C  sd   z| j |||}| dkrW dS W dS  ty1   | jd| d| d| d| d	 Y dS w )zfChecks the stateof the run to ensure it has been inited. Note this will not behave well with resuming.pendingTRun /z with rqi id: z did not have associated runF)r   get_run_statelowerr   r   rV   )rF   rz   r|   r   r   	run_stater.   r.   r/   _check_run_exists_and_inited  s   
z(LaunchAgent._check_run_exists_and_inited	thread_id	exception$Exception | LaunchDockerError | Nonec           
        s(  | j  | j| }W d   n1 sw   Y  |jdur+|j| jkr+| jd n|durUtjt|||j	d}|j
d|dd}| |jt||j|I dH  n|jdu s_|jdu rz| jd|j  td | |jd	d
I dH  no|jdurd}d}d}	 | | j|j|j|j}|s|tkrn|s|dkr|j I dH }t|I dH  |d9 }q|sd}|jdkrd}	nd}	|r|j
|dd}| |j|	d|I dH  n| jd| d t d | j  | j|= W d   n1 sw   Y  t| jdkr|  t!I dH  dS dS )z&Removes the job from our list for now.NzWSkipping check for completed run status because run is on a different entity than agent)valuetb 	error.logrO   zZcalled finish_thread_id on thread whose tracker has no project or run id. RunQueueItemID: z;Missing project or run id on thread called finish thread idz=submitted job was finished without assigned project or run idagentFr   Tr   finishedzCThe submitted job exited successfully but failed to call wandb.initz.The submitted run was not successfully startedrunzFinish thread id z had no exception and no runzGlaunch agent called finish thread id on thread without run or exceptionr   )"r   r   rz   r   r   rV   	tracebackformat_exceptiontype__traceback__saversave_contentsr   r   r   r(   	err_stager|   r   rL   rM   r  r   RUN_INFO_GRACE_PERIODget_logsasynciosleepcompleted_statusr   r   r   r   r   AGENT_POLLING)
rF   r   r   job_and_run_statustb_strfnamescalled_initlogsinterval_msgr.   r.   r/   finish_thread_id  s   







zLaunchAgent.finish_thread_idr'   
file_saverr   c              	     s   t |}d|v rd|d v rd|d d< t dt| d}t| t| | 	t
I dH  | jd |d }| | t|d ||}t| ||| j| j| dS )	zYSet up project and run the job.

        Arguments:
            job: Job to run.
        runSpec_wandb_api_keyz
<redacted>zLaunch agent received job:

NzParsing launch specrunQueueItemId)r   deepcopyr   pprintpformatrL   rU   rN   rV   r   AGENT_RUNNINGr   _assert_securer   r  create_tasktask_run_jobr   r   )rF   r'   r)   r  job_copyr  launch_specjob_trackerr.   r.   r/   run_job  s*   



zLaunchAgent.run_jobr(  c                 C  s   | j sdS |di di }g d}|di di di }|D ]}||v r1td| dq#|d	g }|D ]
}d
|v rDtdq:|di drRtddS )zCIf secure mode is set, make sure no vulnerable keys are overridden.Nresource_args
kubernetes)hostPIDhostIPChostNetworkinitContainersspectemplatez"This agent is configured to lock "z@" in pod spec but the job specification attempts to override it.
containerscommandzoThis agent is configured to lock "command" in container spec but the job specification attempts to override it.r   r   zmThis agent is configured to lock the "entrypoint" override but the job specification attempts to override it.)r   r=   r>   )rF   r(  
k8s_configpod_secure_keyspod_specr7   container_specscontainer_specr.   r.   r/   r$  &  s.   
zLaunchAgent._assert_securec           	   
     sL  |    | jdkrt}nt}zz	 d}|  jd7  _| j| j| j}|d r+t	| j
| jk r|  I dH }|dur|j}|j}z1t| j|d }| |di rh| j| jkrhtt d| j d	 W q| |||I dH  W n> ty } z2tt d
t   t | |jt ddd}| j |d t!|d|dI dH  W Y d}~nd}~ww | jd dkrt"| j#dkr| $t%I dH  n| $t&I dH  t'' | j( |kr|    | j
| jks|du rt)*t+I dH  nt)*t,I dH  q t	y   | $t-I dH  t.t d |    Y nw W | j/0  dS | j/0  w )zLoop infinitely to poll for jobs and run them.

        Raises:
            KeyboardInterrupt: if the agent is requested to stop.
        r   TNr   stopPollingr  r  z>Agent already running the maximum number of sweep schedulers: z@. To set this value use `max_schedulers` key in the agent configError running job: r  rO   )contentsfnamefile_sub_typer  )r   rI   r   r   r   zShutting down, active jobs:)1r   r   DEFAULT_PRINT_INTERVALVERBOSE_PRINT_INTERVALr   r   r   r   r   KeyboardInterruptr   r   get_job_and_queuer'   r)   r   r   r   r=   r   r   rL   rR   r   r*  r   rM   r  
format_excr   r   r  r   r(   r   r   r   r  r#  r   r   r  r  AGENT_POLLING_INTERVALRECEIVED_JOB_POLLING_INTERVALAGENT_KILLEDrU   r   clear)	rF   print_intervalr'   r   job_and_queuer)   r  rB   r   r.   r.   r/   loopC  s   

BzLaunchAgent.loopr   r)  r   c           	   
     s  |d }|s	J d }zz'| j  || j|< W d    n1 s w   Y  | ||||||I d H  W nn tyY } ztt d| j d |}t 	| W Y d }~nYd }~w t
y| } ztt d|  |}t 	| W Y d }~nBd }~w ty } ztt dt   |}t 	| W Y d }~n)d }~ww W | ||I d H  d S W | ||I d H  d S W | ||I d H  d S W | ||I d H  d S | ||I d H  w )Nr  r   zJ encountered an issue while starting Docker, see above output for details.r;  )r   r   _task_run_jobr   rL   rM   r   rt   r   r   r   r   r  rC  r  )	rF   r(  r'   r   ry   r)  r   r   rB   r.   r.   r/   r&    sJ   
*zLaunchAgent.task_run_jobc                   s^  t ||}| |||j t|j}||d |jI d H  | ||I d H  || | j	
d |  | j	
d |dpCd}	tdi}
| j	
d |d}t||\}}}|jp`|j}| }t|d	i }t||}t|||}t|	||
||}|js|jst|ts|d usJ ||||I d H }| j	
d
 t|tr|||I d H }n|sJ |||I d H }| |r| j d| j| _W d    n1 sw   Y  t t! d| j" d| j# d |s| j d|_$W d    d S 1 sw   Y  d S | j ||_W d    n	1 sw   Y  t%% }d }| j&' r|( I d H j)}|dkrYt*dkrYt%% | t*krY|+ I d H  t,dt* d| -||I d H red S |.| j/I d H r|d u ryt%% }nt%% | | j0kr|+ I d H  t12t3I d H  | j&' s/t|t4r|j5d ur|j56  d S d S d S )Nr  z"Fetching and validating project...zFetching resource...r   zlocal-containerFzLoading backendr\   r^   zBackend loaded...Tz"Preparing to run sweep scheduler (r   )startingr   zRun failed to start within za seconds. If you want to increase this timeout, set WANDB_LAUNCH_START_TIMEOUT to a larger value.)7r   	from_spec_set_queue_and_rqi_in_projectr)   r   ack_run_queue_itemr   check_sweep_stateupdate_run_infor   rV   fetch_and_validate_projectr=   r   rj   docker_imagejob_base_imageget_job_entry_pointr   environment_from_configregistry_from_configbuilder_from_configrunner_from_configru   r   build_imager  r   r   r   r   rL   rU   r   r   r   failed_to_startr   r   is_set
get_statusstateRUN_START_TIMEOUTcancelr   _check_run_finishedcheck_wandb_run_stoppedr   r   r  r  rD  r
   _command_prockill)rF   r(  r'   r   ry   r   r)  r|   ackr   backend_configoverride_build_config_rZ   rf   	image_uri
entrypointr^   r]   r\   backendr  
start_timestopped_timer_  r.   r.   r/   rK    s   	













zLaunchAgent._task_run_jobc              
     s   | drPzt|j}||d |d |d dI dH }W n ty: } z| jd|  d}W Y d}~nd}~ww |dkrR|dkrTtd	|d  d
| ddS dS dS )z@Check the state of a sweep before launching a run for the sweep.sweep_idrz   r|   )sweeprz   r|   NzFetch sweep state error: r   PAUSEDz-Launch agent picked up sweep job, but sweep (z) was in a terminal state (rL  )r=   r   get_sweep_stater   r   rW   r   )rF   r(  ry   rr  r_  rB   r.   r.   r/   rQ    s(   

zLaunchAgent.check_sweep_statec                   sZ  |j rdS |jd u rt|jS d}z|j}| I d H }|j}|jD ]+}|| jvrO| j| | j	
|j|dg }|sOtd| d|j  | j| q$|dkr|j| jkr| }	|j|	d< |	dd	d
 |	d< | j ||_ W d    n1 s~w   Y  |	d tkrtt d|j dt d W dS tt d|j d d|	v r|	d= t|	| j|jd W dS |dv r|jrtt d|j  |dkrz| j	j|j|j|jdd W n t y }
 zt!d|
 d }
~
ww ntt d|j  | j ||_ W d    W dS 1 sw   Y  W dS W dS  t!yb }
 z7t"t d|j dt#|
  d}| j d|_W d    n1 sKw   Y  W Y d }
~
|S W Y d }
~
|S d }
~
w t y }
 z>t"t d|j  t"t$%  t&d t&d t&d|j  t&t$%  t&d t' (|
 W Y d }
~
|S d }
~
ww )NTF
KuberneteszError adding warning z to run queue item 	preemptedr   _resume_countr   r   r   z has already resumed z times.z was preempted, requeuing...ro  )r6   project_queue
queue_name)stoppedfailedr  rt  zScheduler finished with ID: ry  CANCELED)rp  rz   r|   r_  zFailed to update sweep state: zJob finished with ID: zTerminating job z because it failed to start: zError getting status for job z---z&Caught exception while getting status.zJob ID: ))r  r  rw   r\  r^  r_  messagesr   appendr   update_run_queue_item_warningr   rN   rS   removerz   r   r   r   r=   r   MAX_RESUME_COUNTrL   rU   r   r	   r   r)   r   r   set_sweep_stater|   r   r   rM   r(   r  rC  rV   r   r   )rF   r)  r(  known_errorr  r   r_  rS   r   r6   rB   r.   r.   r/   rb  2  s   







(



zLaunchAgent._check_run_finishedJobSpecAndQueue | Nonec                   sP   | j D ]!}| |I d H }|d ur%| j | | j | t||  S qd S rK   )r   r   r~  r|  r%   )rF   r)   r'   r.   r.   r/   rB    s   
zLaunchAgent.get_job_and_queuer   c                 C  s   ||_ | j|_|d |_d S )Nr  )rw  r   queue_entityr   )rF   r|   r'   r)   r.   r.   r/   rO    s   z)LaunchAgent._set_queue_and_rqi_in_project)rl   r   rm   r   r1   rk   )r1   r(   )r1   rw   )ry   r   r6   r&   )r   r&   r1   rw   rK   )
r   r(   rI   r(   r   r(   r   r   r1   r   )r1   r   )r1   r   )r1   r9   )r)   r(   r1   r   )r   r(   r1   r   )
rz   r(   r|   r(   r   r(   r   r(   r1   rw   )r   r9   r   r   r1   r   )r'   r&   r)   r(   r  r   r1   r   )r(  r&   r1   r   )r(  r&   r'   r&   r   r&   ry   r   r)  r   r1   r   )r(  r&   r'   r&   r   r&   ry   r   r   r9   r)  r   r1   r   )r(  r&   ry   r   r1   r   )r)  r   r(  r&   r1   rw   )r1   r  )r|   r   r'   r&   r)   r(   r1   r   ) r*   r+   r,   __doc__rn   rp   classmethodrv   rx   rH   r   r   r   propertyr   r   r   r   r   r   r   r  r*  r$  rJ  r&  rK  rQ  rb  rB  rO  __classcell__r.   r.   rr   r/   rk      sD    	

I 





`
$

V
"
d

e	rk   )r0   r(   r1   r(   )r   )r6   r&   r7   r(   r8   r9   r1   r:   )NN)rX   rY   rZ   rY   r1   r[   )Or  
__future__r   r  r   loggingr`   r!  r   r   r  dataclassesr   multiprocessingr   typingr   rL   wandb.analyticsr   wandb.apis.internalr   wandb.errorsr   wandb.sdk.launch._launch_addr	   'wandb.sdk.launch.runner.local_containerr
   %wandb.sdk.launch.runner.local_processr   !wandb.sdk.launch.sweeps.schedulerr   wandb.sdk.launch.utilsr   r   wandb.sdk.libr   r  r   _project_specr   errorsr   r   utilsr   r   r   r   job_status_trackerr   run_queue_item_file_saverr   rD  rE  r  r#  rF  r   r  r  r   r?  r@  r   r=   _env_timeoutr?   r`  r>   	getLoggerr*   rN   r%   r5   rC   rD   rj   rk   r.   r.   r.   r/   <module>   sx    

	