o
    xioh                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlZddl	Z	ddl
Z
ddlZddlmZmZ ddlmZ ddlmZ ddlmZ ddlmZmZ ddlZddlZdd	lmZ dd
lmZ ddlmZ ddl m!Z! ddl"m#Z#m$Z$ ddl%m&Z&m'Z' ddl(m)Z) erddl*m+  m,Z, ddl-m.Z. ddl*m/Z/m0Z0 e1e2Z3ej4ddd dZ5dZ6G dd deZ7G dd deZ8eG dd dZ9eG dd dZ:G dd  d eZ;dS )!zAbstract Scheduler class.    )annotationsN)ABCabstractmethod)Iterator)	dataclass)Enum)TYPE_CHECKINGAny)	CommError)
launch_add)LaunchError)SchedulerError)create_sweep_command_argsmake_launch_sweep_entrypoint)event_loop_thread_exec%strip_resource_args_and_template_vars)generate_idApi)	QueuedRunRunzsched:cyan)fg g      @c                   @  s,   e Zd ZdZdZdZdZdZdZdZ	dZ
d	S )
SchedulerStater                        N)__name__
__module____qualname__PENDINGSTARTINGRUNNING
FLUSH_RUNS	COMPLETEDFAILEDSTOPPED	CANCELLED r-   r-   U/home/ubuntu/.local/lib/python3.10/site-packages/wandb/sdk/launch/sweeps/scheduler.pyr   1   s    r   c                   @  sT   e Zd ZdZdZdZdZdZdZdZ	dZ
d	ZdddZddddZed ddZdS )!RunState)runningalive)pendingr1   )
preemptingr1   )crasheddead)failedr5   )killedr5   )finishedr5   )	preemptedr5   )unknownr1   clsr	   argslistkwdsreturnc                 O  s   t | }|d |_|S )Nr   )object__new___value_)r;   r<   r>   objr-   r-   r.   rA   I   s   

zRunState.__new__r:   _strlifeNonec                 C  s
   || _ d S N_life)selfrD   rF   r-   r-   r.   __init__N   s   
zRunState.__init__boolc                 C  s
   | j dkS )Nr1   rI   rK   r-   r-   r.   is_aliveQ      
zRunState.is_aliveN)r;   r	   r<   r=   r>   r	   r?   r/   )r:   )rD   rE   rF   rE   r?   rG   r?   rM   )r"   r#   r$   r'   r%   
PREEMPTINGCRASHEDr*   KILLEDFINISHED	PREEMPTEDUNKNOWNrA   rL   propertyrO   r-   r-   r-   r.   r/   <   s    
r/   c                   @  s   e Zd ZU ded< ded< dS )_Workerdict[str, Any]agent_configrE   agent_idN)r"   r#   r$   __annotations__r-   r-   r-   r.   rY   V   s   
 rY   c                   @  sP   e Zd ZU ded< ded< ejZded< dZded	< dZd
ed< dZ	ded< dS )SweepRunrE   idint	worker_idr/   stateNzpublic.QueuedRun | None
queued_runzdict[str, Any] | Noner<   zlist[str] | Nonelogs)
r"   r#   r$   r]   r/   r'   rb   rc   r<   rd   r-   r-   r-   r.   r^   \   s   
 r^   c                   @  s  e Zd ZdZdZdZg dZddddddddwddZedxddZ	edyddZ
edyd d!Zedyd"d#Zedyd$d%Zedzd'd(Zejd{d*d(Zed|d,d-Zed|d.d/Zed}d0d1Zed~d3d4Zed~d5d6Zdd8d9Zdyd:d;Zdd=d>Zdyd?d@ZdydAdBZdydCdDZddGdHZd|dIdJZdydKdLZddNdOZddRdSZ dydTdUZ!ddXdYZ"dydZd[Z#dyd\d]Z$dd_d`Z%ddbdcZ&e'j(fddfdgZ)ddhdiZ*ddkdlZ+ddndoZ,ddsdtZ-ddudvZ.dS )	SchedulerzPA controller/agent that populates a Launch RunQueue from a hyperparameter sweep.zplaceholder-uri-schedulerzsweep-controller)wandb	schedulerWANDB_SWEEP_IDN)polling_sleepsweep_identityprojectproject_queuenum_workersapir   r<   
Any | Noneri   float | Nonerj   
str | Nonerk   rl   rm   rn   int | str | Nonekwargsc             
   O  s  dd l }
ddlm} || _| | _|p tjdp |dp |j	| _
|p.tjdp.|d| _|p3d| _tj| _z?| jj|d| j
| jd	}|d
tjjkrStj| _|
|d | _| |d | _| jdkrwtt d| j d| j  W n ty } ztt d| d| d }~ww |	| _i | _t | _ |d ur|nt!| _"|| _#i | _$| % | _&|p| j&j'di d}t(|) rt*|nd| _+| j&j'di | _,d S )Nr   r   WANDB_ENTITYrk   WANDB_PROJECTrl   zempty-sweep-idz{})rk   rl   rb   configrunszFound z previous valid runs for sweep zException when finding sweep (z) rg   rn      settings)-yamlwandb.apis.publicr   _api_public_apiosenvirongetrz   default_entity_entity_project	_sweep_idr   r%   _statesweepr,   name	safe_load_sweep_config_get_num_runs_launched_num_runs_launchedrf   termlog
LOG_PREFIX	Exceptionr   _kwargs_runs	threadingLock_threading_lockDEFAULT_POLLING_SLEEP_polling_sleep_project_queue_workers_init_wandb_run
_wandb_runrw   rE   isdigitr`   _num_workers_settings_config)rK   ro   ri   rj   rk   rl   rm   rn   r<   rt   r{   	PublicApiresper-   r-   r.   rL   m   sb   





zScheduler.__init__ra   r`   r?   SweepRun | Nonec                 C     dS )zCalled when worker available.Nr-   )rK   ra   r-   r-   r.   _get_next_sweep_run       zScheduler._get_next_sweep_runrG   c                 C  r   )zCalled every polling loop.Nr-   rN   r-   r-   r.   _poll   r   zScheduler._pollc                 C     d S rH   r-   rN   r-   r-   r.   _exit      zScheduler._exitc                 C  r   rH   r-   rN   r-   r-   r.   _load_state   r   zScheduler._load_statec                 C  r   rH   r-   rN   r-   r-   r.   _save_state   r   zScheduler._save_stater   c                 C  s   t t d| jj  | jS )NzScheduler state is )_loggerdebugr   r   r   rN   r-   r-   r.   rb      s   zScheduler.statevaluec                 C  s*   t t d| jj d|j  || _d S )NzScheduler was z is )r   r   r   rb   r   r   )rK   r   r-   r-   r.   rb      s    
rM   c                 C  s   | j tjtjtjtjfvS rH   )rb   r   r)   r*   r+   r,   rN   r-   r-   r.   rO      s   zScheduler.is_alivec                 C  s"   | j d}|s
dS | j|k}|S )z/False if under user-specified cap on # of runs.run_capF)r   r   r   )rK   r   	at_runcapr-   r-   r.   r      s
   
zScheduler.at_runcapc                 C  s
   t | jS rH   )lenr   rN   r-   r-   r.   num_active_runs   rP   zScheduler.num_active_runsdict[int, _Worker]c                 C  s,   i }|   D ]\}}| j|j ||j< q|S )zReturns dict of id:worker already assigned to a launch run.

        runs should always have a worker_id, but are created before
        workers are assigned to the run
        )_yield_runsr   ra   )rK   busy_workersrD   rr-   r-   r.   r      s   zScheduler.busy_workersc                   s*   t  jdkr	i S  fdd j D S )z6Returns dict of id:worker ready to launch another run.r   c                   s    i | ]\}}| j vr||qS r-   )r   ).0_idwrN   r-   r.   
<dictcomp>   s    z/Scheduler.available_workers.<locals>.<dictcomp>)r   r   itemsrN   r-   rN   r.   available_workers   s
   
zScheduler.available_workers	wandb.Runc                 C  s,   t jdd}t jd| j d| j|d}|S )z8Controls resume or init logic for a scheduler wandb run.T)disable_job_creationz
Scheduler.allow)r   resumerw   rz   )rf   Settingsinitr   r   )rK   rz   runr-   r-   r.   r     s   
zScheduler._init_wandb_runc                 C  s   t j| _dS )zStop the sweep.N)r   r+   r   rN   r-   r-   r.   
stop_sweep  s   zScheduler.stop_sweeperrc                 C  s   t j| _|r
t|dS )z%Fail the sweep w/ optional exception.N)r   r*   r   r   )rK   r   r-   r-   r.   
fail_sweep  s   zScheduler.fail_sweepc                 C  s   t t d | js t t d| jj  d |   dS t	j
| _|  s6t t d |   dS |   t|   |   dS )zAStart a scheduler, confirms prerequisites, begins execution loop.zScheduler starting.zSweep already in end state (z). Exiting...Nz1No 'job' or 'image_uri' loaded from sweep config.)rf   r   r   rO   	termerrorrb   r   lowerexitr   r&   r   _try_load_executabler   asyncior   _register_agentsrN   r-   r-   r.   start  s"   zScheduler.startc                 C  s  t t d tj| _z	 |   | jsnt t d |   | 	  | jtj
krA| jdkr:t t d nrt| j q| jD ]`}| jr[t t d| j d tj
| _ nJz| |}|sfW  n?W n0 tyw } zt|d}~w ty } zt t d	|  tj| _W Y d}~ nd}~ww | |r|  jd
7  _qDt| j qW n8 ty   t t d tj| _|   Y dS  ty } zt t d|  tj| _|    d}~ww | jtj
kr| jrtj| _|   dS )zMain run function.zScheduler runningTzPolling for new runs to launchr   zDone polling on runs, exitingzSweep at run_cap ()NzFailed to get next sweep run: r   z-Scheduler received KeyboardInterrupt. Exitingz Scheduler failed with exception )rf   r   r   r   r'   rb   _update_scheduler_run_staterO   _update_run_statesr   r(   r   timesleepr   r   r   r   r   r   r   r   r*   _add_to_launch_queueKeyboardInterrupttermwarnr+   r   r)   )rK   ra   r   r   r-   r-   r.   r   /  sx   



)zScheduler.runc              	   C  s   |    z|   W n ty   tt dt   Y nw d}| jt	j
kr0| d d}n2| jt	jkr>| d d}n$| jt	jt	jfv rS| d d}|   nt	j| _| d	 d
}|   tt d|  | j  d S )NzFailed to save state:  PAUSEDpausedrU   	completedCANCELED	cancelledrS   r4   z
Scheduler )r   r   r   rf   r   r   	traceback
format_excrb   r   r(   _set_sweep_stater)   r,   r+   
_stop_runsr*   r   r   finish)rK   statusr-   r-   r.   r   m  s2   




zScheduler.exitrx   list[dict[str, Any]]c              	   C  sZ   d}|D ]&}| dddv r&| ds&td|d  d|d  d	|  q|d
7 }q|S )z.Returns the number of valid runs in the sweep.r   rb   r   )r7   r4   summaryMetricszexcluding run: r   z with state: z from run cap 
r   )r   r   r   )rK   rx   countr   r-   r-   r.   r     s   
z Scheduler._get_num_runs_launchedc                 C  sx   | j dr4z| j| j d }tt d|j d W dS  ty3   t	t t
   Y dS w t| j dS )zoCheck existence of valid executable for a run.

        logs and returns False when job is unreachable
        jobzSuccessfully loaded job (z) in schedulerFT	image_uri)r   r   r~   r   rf   r   r   r   r   r   r   r   rM   )rK   _job_artifactr-   r-   r.   r     s   zScheduler._try_load_executablec           	        s   g }t | jj}t| jD ]H}tt d| d z|t	  d| | j
| j| jd}|| W q tyV } ztd|  | d|  W Y d }~qd }~ww tj| I d H }t|D ]\}}t||d d| j|< qcd S )Nz Starting AgentHeartbeat worker (r   -)rj   project_namerk   zfailed to register agent: r_   )r[   r\   )r   r}   register_agentranger   r   r   r   socketgethostnamer   r   r   appendr   r   r   gather	enumeraterY   r   )	rK   tasksr   ra   workerr   finished_tasksidxr[   r-   r-   r.   r     s2   zScheduler._register_agentsIterator[tuple[str, SweepRun]]c                 c  s>    | j  | j E dH  W d   dS 1 sw   Y  dS )z)Thread-safe way to iterate over the runs.N)r   r   r   rN   r-   r-   r.   r     s   "zScheduler._yield_runsruns_to_remove	list[str]c                 C  sT   | j  |D ]}tt d| d | j|= qW d   dS 1 s#w   Y  dS )zHelper for removing runs from memory.

        Can be overloaded to prevent deletion of runs, which is useful
        for debugging or when polling on completed runs.
        zCleaning up finished run (r   N)r   rf   r   r   r   )rK   r   run_idr-   r-   r.   _cleanup_runs  s   
"zScheduler._cleanup_runsc                 C  sd   g }|   D ]	\}}||g7 }q|D ]}tt d| d | |s/tt d| d qd S )NzStopping run (r   zFailed to stop run ()r   rf   r   r   	_stop_runr   )rK   	to_deleter   rD   r-   r-   r.   r     s   
zScheduler._stop_runsr   rE   c              
   C  s   || j vrtd| d| j   dS | j | }| j |= |js+td|j d dS |jjs1dS td| d| j	 d| j
  d	}z| jj|d
}|r^tt d| d W dS W dS  ty~ } ztd| d|  W Y d}~dS d}~ww )z.Stops a run and removes it from the scheduler.zrun: z not in _runs: Fz2tried to _stop_run but run not queued yet (run_id:r   TzRun:v1::utf-8)r   zStopped run .zerror stopping run (): N)r   r   r   rc   r_   rb   rO   base64standard_b64encoder   r   encodedecoder}   stop_runrf   r   r   r   )rK   r   r   encoded_run_idsuccessr   r-   r-   r.   r     s:   

 zScheduler._stop_runc              
   C  s   |  | jj}|tjkrtj| _n|tjtj	fv rtj| _n	|tj
kr'tj| _z| j| j| j| j}W n tyO } ztd|  W Y d}~dS d}~ww |dkrZtj| _dS |dv rdtj| _dS |dkrntj| _dS dS )zGUpdate the scheduler state from state of scheduler run and sweep state.zsweep state error: NrU   )r,   r+   r   )_get_run_stater   r_   r/   rT   r   r+   rb   r*   rS   rU   r)   r}   get_sweep_stater   r   r   r   r   r   r,   r(   )rK   rb   sweep_stater   r-   r-   r.   r     s.   



z%Scheduler._update_scheduler_run_statec                 C  s   g }|   D ]S\}}| ||j|_z|jr|jjnd}W n ttfy; } ztd|  d}W Y d}~nd}~ww |jjrD|dkrYtd| d|j d| d |	| q| 
| dS )ztIterate through runs.

        Get state from backend and deletes runs if not in running state. Threadsafe.
        Nz Failed to get queued_run.state: r6   (z) states: (z, r   )r   r  rb   rc   r
   r   r   r   rO   r   r   )rK   r   r   r   	rqi_stater   r-   r-   r.   r     s    
zScheduler._update_run_states	list[Any]c              
     s   z6| j | j}|sg W S | j|j d|j d| }| jd d  |jd gd} fdd|D }|W S  tyQ } zt	
d|  W Y d	}~g S d	}~ww )
zUse the public api to get metrics from a run.

        Uses the metric name found in the sweep config, any
        misspellings will result in an empty list.
        /metricr   _step)keysc                   s   g | ]}|  qS r-   r-   r   xmetric_namer-   r.   
<listcomp>A  s    z3Scheduler._get_metrics_from_run.<locals>.<listcomp>z[_get_metrics_from_run] N)r   rc   r~   r   rk   rl   r   scan_historyr   r   r   )rK   r   rc   api_runhistorymetricsr   r-   r  r.   _get_metrics_from_run1  s    zScheduler._get_metrics_from_runrZ   c              
   C  s\   z| j | j| j|}|r|W S W i S  ty- } ztd|  W Y d}~i S d}~ww )z+Use the public api to get info about a run.z[_get_run_info] N)r}   get_run_infor   r   r   r   r   )rK   r   infor   r-   r-   r.   _get_run_infoH  s   
zScheduler._get_run_infoprev_run_stater/   c                 C  s   d}z| j | j| j|}t|}W |S  tyQ } z2td| d|  |tjkr<t	
d| dt   tj}n
tj}W Y d}~|S W Y d}~|S d}~w ttfyo   t	
d| d| dt   tj}Y |S w )z)Use the public api to get state of a run.Nzerror getting state for run (r   z Failed to get runstate for run (z
). Error: zBad state (z) for run ()r}   get_run_stater   r   r/   r
   r   r   rW   rf   r   r   r   r*   AttributeError
ValueError)rK   r   r  	run_staterb   r   r-   r-   r.   r  T  s0   


zScheduler._get_run_statec              
   C  s^   z| j j| j| j| jd}|r|d W S W i S  ty. } ztd|  tdd}~ww )z)Use the public api to create a blank run.)rl   rk   
sweep_namer   z[_create_run] zHError creating run from scheduler, check API connection and CLI version.N)	r}   
upsert_runr   r   r   r   r   r   r   )rK   r   r   r-   r-   r.   _create_runm  s"   
zScheduler._create_runrb   c              
   C  sh   t t d|   z| jj| j|d W d S  ty3 } zt	d|  W Y d }~d S d }~ww )NzUpdating sweep state to: )r   rb   z[set_sweep_state] )
rf   r   r   r   r}   set_sweep_stater   r   r   r   )rK   rb   r   r-   r-   r.   r   ~  s   zScheduler._set_sweep_stater   c                 C  s$   t t|dddd S )Nr   r   r   )r   	b64decodebytesr  r  split)rK   r   r-   r-   r.   _encode  s   "zScheduler._encoder   r^   2tuple[list[str] | None, dict[str, dict[str, Any]]]c                 C  s   t d|ji}t|| jd\}}|r1d|v r1| jds%tt d|d}| jd ||< t	| j
jdi }d|vrFdi i|d< d|d vrRi |d d< |d d |d	  |re||d d< |r{d
d |D }|r{tt d| d ||fS )Nr<   commandz
${program}programzHProgram macro in command has no corresponding 'program' in sweep config.launch	overrides
run_config	args_dictc                 S  s   g | ]}t |d r|qS )z${)rE   
startswithr  r-   r-   r.   r    s    z;Scheduler._make_entry_and_launch_config.<locals>.<listcomp>z*Sweep command contains unresolved macros: z', see launch docs for supported macros.)r   r<   r   r   r   r   r   indexcopydeepcopyr   rw   updaterf   r   )rK   r   r<   entry_point
macro_argspidxlaunch_config
unresolvedr-   r-   r.   _make_entry_and_launch_config  s8   
z'Scheduler._make_entry_and_launch_configc                 C  sz  | j dp| jd}| jd}| j dp|}|du r-|du r-tt d|j d|dur<|dur<tt d| |\}}|rWtt d| d|rQdnd d	 t	
| jjd
pbi }t|dd}t| |jptt }	t|	||||| j| j| j d| j|d|d|d| j d| j|d}
|
|_tj|_|| j|	< tt d|	 d| j d d dS )z;Convert a sweeprun into a launch job then push to runqueue.r   r   NzNo 'job' nor 'image_uri' (r   z$Sweep has both 'job' and 'image_uri'zSweep command z will override z entrypointr-  priorityr   queueresourceresource_argstemplate_variablesauthor)r   r6  rw   docker_imager   rl   rk   
queue_namerm   r>  r?  r@  rA  rj   r<  zAdded run (z) to queue (T)r   r   r   r   r   r_   r;  rf   r   r3  r4  r   rw   r`   r   r   r   r   r   r   r   rc   r/   r'   rb   r   r   )rK   r   _job_sweep_config_uri
_image_urir6  r9  _job_launch_config	_priorityr   rc   r-   r-   r.   r     sT   



zScheduler._add_to_launch_queue)ro   r   r<   rp   ri   rq   rj   rr   rk   rr   rl   rr   rm   rr   rn   rs   rt   rp   )ra   r`   r?   r   )r?   rG   )r?   r   )r   r   r?   rG   rQ   )r?   r`   )r?   r   )r?   r   )r   rr   r?   rG   )rx   r   r?   r`   )r?   r   )r   r   r?   rG   )r   rE   r?   rM   )r   rE   r?   r  )r   rE   r?   rZ   )r   rE   r  r/   r?   r/   )r?   rZ   )rb   rE   r?   rG   )r   rE   r?   rE   )r   r^   r?   r*  )r   r^   r?   rM   )/r"   r#   r$   __doc__PLACEHOLDER_URISWEEP_JOB_TYPE
ENTRYPOINTrL   r   r   r   r   r   r   rX   rb   setterrO   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r  r/   rW   r  r$  r   r)  r;  r   r-   r-   r-   r.   re   f   sx    M





>








"






#re   )<rI  
__future__r   r   r   r3  loggingr   r   r   r   r   abcr   r   collections.abcr   dataclassesr   enumr   typingr   r	   clickrf   wandb.errorsr
   wandb.sdk.launch._launch_addr   wandb.sdk.launch.errorsr   wandb.sdk.launch.sweepsr   wandb.sdk.launch.sweeps.utilsr   r   wandb.sdk.launch.utilsr   r   wandb.sdk.lib.runidr   r|   apispublicwandb.apis.internalr   r   r   	getLoggerr"   r   styler   r   r   r/   rY   r^   re   r-   r-   r-   r.   <module>   sN    
	