o
    -wii                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlZddlZddl	Z	ddl
mZmZ ddlmZ ddlmZ ddlmZmZmZmZmZmZmZmZ ddlZddlZddlZddlmZ ddlm Z  dd	l!m"Z" dd
l#m$Z$ ddl%m&Z&m'Z' ddl(m)Z)m*Z* ddl+m,Z, erddl-m.  m/Z/ ddl0m1Z1 ddl-m2Z2m3Z3 ddl4m3Z5 e6e7Z8ej9ddd dZ:dZ;G dd deZ<G dd deZ=eG dd dZ>eG dd dZ?G dd deZ@dS ) zAbstract Scheduler class.    N)ABCabstractmethod)	dataclass)Enum)TYPE_CHECKINGAnyDictIteratorListOptionalTupleUnion)	CommError)
launch_add)LaunchError)SchedulerError)create_sweep_command_argsmake_launch_sweep_entrypoint)event_loop_thread_exec%strip_resource_args_and_template_vars)generate_idApi)	QueuedRunRun)r   zsched:cyan)fg g      @c                   @   s,   e Zd ZdZdZdZdZdZdZdZ	dZ
d	S )
SchedulerStater                        N)__name__
__module____qualname__PENDINGSTARTINGRUNNING
FLUSH_RUNS	COMPLETEDFAILEDSTOPPED	CANCELLED r1   r1   ^/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/wandb/sdk/launch/sweeps/scheduler.pyr   0   s    r   c                   @   st   e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
edededd fddZddededdfddZedefddZdS )RunState)runningalive)pendingr5   )
preemptingr5   )crasheddead)failedr9   )killedr9   )finishedr9   )	preemptedr9   )unknownr5   clsargskwdsreturnc                 O   s   t | }|d |_|S )Nr   )object__new___value_)r?   r@   rA   objr1   r1   r2   rD   H   s   

zRunState.__new__r>   _lifeNc                 C   s
   || _ d S N_life)selfrG   rH   r1   r1   r2   __init__M   s   
zRunState.__init__c                 C   s
   | j dkS )Nr5   rJ   rL   r1   r1   r2   is_aliveP      
zRunState.is_alive)r>   )r&   r'   r(   r+   r)   
PREEMPTINGCRASHEDr.   KILLEDFINISHED	PREEMPTEDUNKNOWNr   r
   rD   strrM   propertyboolrO   r1   r1   r1   r2   r3   ;   s    r3   c                   @   s&   e Zd ZU eeef ed< eed< dS )_Workeragent_configagent_idN)r&   r'   r(   r   rW   r   __annotations__r1   r1   r1   r2   rZ   U   s   
 rZ   c                   @   sh   e Zd ZU eed< eed< ejZeed< dZ	e
d ed< dZe
eeef  ed< dZe
ee  ed< dS )	SweepRunid	worker_idstateNzpublic.QueuedRun
queued_runr@   logs)r&   r'   r(   rW   r]   intr3   r+   ra   rb   r   r@   r   r   rc   r
   r1   r1   r1   r2   r^   [   s   
 r^   c                   @   s  e Zd ZdZdZdZg dZdddddddddd	ee d
ee	 dee
 dee
 dee
 dee
 deeee
f  dee fddZededee fddZeddddZeddddZeddddZeddddZedefdd Zejd!eddfd"d Zedefd#d$Zedefd%d&Zedefd'd(Zedeeef fd)d*Zedeeef fd+d,Z ded.d/Z!ddd0d1Z"d2ee
 ddfd3d4Z#ddd5d6Z$ddd7d8Z%ddd9d:Z&d;e'ee
ef  defd<d=Z(defd>d?Z)ddd@dAZ*de+e,e
ef  fdBdCZ-dDe'e
 ddfdEdFZ.dddGdHZ/dIe
defdJdKZ0dddLdMZ1dddNdOZ2dIe
de'e fdPdQZ3dIe
dee
ef fdRdSZ4e5j6fdIe
dTe5de5fdUdVZ7dee
ef fdWdXZ8dYe
ddfdZd[Z9d\e
de
fd]d^Z:d_ede,ee'e
  ee
ee
ef f f fd`daZ;d_edefdbdcZ<dS )f	SchedulerzPA controller/agent that populates a Launch RunQueue from a hyperparameter sweep.zplaceholder-uri-schedulerzsweep-controller)wandb	schedulerWANDB_SWEEP_IDN)polling_sleepsweep_identityprojectproject_queuenum_workersapir   r@   ri   rj   rk   rl   rm   rn   kwargsc             
   O   s  ddl m}
 || _|
 | _|ptjdp|dp|j| _	|p*tjdp*|d| _
|p/d| _tj| _z?| jj|d| j	| j
d	}|d
tjjkrOtj| _t|d | _| |d | _| jdkrstt d| j d| j  W n ty } ztt d| d| d }~ww |	| _i | _t | _ |d ur|nt!| _"|| _#i | _$| % | _&|p| j&j'di d}t(|) rt*|nd| _+| j&j'di | _,d S )Nr   r   WANDB_ENTITYrk   WANDB_PROJECTrl   zempty-sweep-idz{})rk   rl   ra   configrunszFound z previous valid runs for sweep zException when finding sweep (z) rg   rn      settings)-wandb.apis.publicr   _api_public_apiosenvirongetrv   default_entity_entity_project	_sweep_idr   r)   _statesweepr0   nameyaml	safe_load_sweep_config_get_num_runs_launched_num_runs_launchedrf   termlog
LOG_PREFIX	Exceptionr   _kwargs_runs	threadingLock_threading_lockDEFAULT_POLLING_SLEEP_polling_sleep_project_queue_workers_init_wandb_run
_wandb_runrs   rW   isdigitrd   _num_workers_settings_config)rL   ro   ri   rj   rk   rl   rm   rn   r@   rp   	PublicApiresper1   r1   r2   rM   l   s`   





zScheduler.__init__r`   rB   c                 C      dS )zCalled when worker available.Nr1   )rL   r`   r1   r1   r2   _get_next_sweep_run       zScheduler._get_next_sweep_runc                 C   r   )zCalled every polling loop.Nr1   rN   r1   r1   r2   _poll   r   zScheduler._pollc                 C      d S rI   r1   rN   r1   r1   r2   _exit      zScheduler._exitc                 C   r   rI   r1   rN   r1   r1   r2   _load_state   r   zScheduler._load_statec                 C   r   rI   r1   rN   r1   r1   r2   _save_state   r   zScheduler._save_statec                 C   s   t t d| jj  | jS )NzScheduler state is )_loggerdebugr   r   r   rN   r1   r1   r2   ra      s   zScheduler.statevaluec                 C   s*   t t d| jj d|j  || _d S )NzScheduler was z is )r   r   r   ra   r   r   )rL   r   r1   r1   r2   ra      s    
c                 C   s"   | j tjtjtjtjfv rdS dS )NFT)ra   r   r-   r.   r/   r0   rN   r1   r1   r2   rO      s   zScheduler.is_alivec                 C   s"   | j d}|s
dS | j|k}|S )z/False if under user-specified cap on # of runs.run_capF)r   r|   r   )rL   r   	at_runcapr1   r1   r2   r      s
   
zScheduler.at_runcapc                 C   s
   t | jS rI   )lenr   rN   r1   r1   r2   num_active_runs   rP   zScheduler.num_active_runsc                 C   s,   i }|   D ]\}}| j|j ||j< q|S )zReturns dict of id:worker already assigned to a launch run.

        runs should always have a worker_id, but are created before
        workers are assigned to the run
        )_yield_runsr   r`   )rL   busy_workersrG   rr1   r1   r2   r      s   zScheduler.busy_workersc                    s*   t  jdkr	i S  fdd j D S )z6Returns dict of id:worker ready to launch another run.r   c                    s    i | ]\}}| j vr||qS r1   )r   ).0_idwrN   r1   r2   
<dictcomp>   s    z/Scheduler.available_workers.<locals>.<dictcomp>)r   r   itemsrN   r1   rN   r2   available_workers   s
   
zScheduler.available_workersSdkRunc                 C   s,   t jdd}t jd| j d| j|d}|S )z8Controls resume or init logic for a scheduler wandb run.T)disable_job_creationz
Scheduler.allow)r   resumers   rv   )rf   Settingsinitr   r   )rL   rv   runr1   r1   r2   r     s   
zScheduler._init_wandb_runc                 C   s   t j| _dS )zStop the sweep.N)r   r/   r   rN   r1   r1   r2   
stop_sweep  s   zScheduler.stop_sweeperrc                 C   s   t j| _|r
t|dS )z%Fail the sweep w/ optional exception.N)r   r.   r   r   )rL   r   r1   r1   r2   
fail_sweep  s   zScheduler.fail_sweepc                 C   s   t t d | js t t d| jj  d |   dS t	j
| _|  s6t t d |   dS |   t|   |   dS )zAStart a scheduler, confirms prerequisites, begins execution loop.zScheduler starting.zSweep already in end state (z). Exiting...Nz1No 'job' or 'image_uri' loaded from sweep config.)rf   r   r   rO   	termerrorra   r   lowerexitr   r*   r   _try_load_executabler   asyncior   _register_agentsrN   r1   r1   r2   start  s"   zScheduler.startc                 C   s  t t d tj| _z	 |   | jsnt t d |   | 	  | jtj
krA| jdkr:t t d nrt| j q| jD ]`}| jr[t t d| j d tj
| _ nJz| |}|sfW  n?W n0 tyw } zt|d}~w ty } zt t d	|  tj| _W Y d}~ nd}~ww | |r|  jd
7  _qDt| j qW n8 ty   t t d tj| _|   Y dS  ty } zt t d|  tj| _|    d}~ww | jtj
kr| jrtj| _|   dS )zMain run function.zScheduler runningTzPolling for new runs to launchr   zDone polling on runs, exitingzSweep at run_cap ()NzFailed to get next sweep run: r   z-Scheduler received KeyboardInterrupt. Exitingz Scheduler failed with exception )rf   r   r   r   r+   ra   _update_scheduler_run_staterO   _update_run_statesr   r,   r   timesleepr   r   r   r   r   r   r   r   r.   _add_to_launch_queueKeyboardInterrupttermwarnr/   r   r-   )rL   r`   r   r   r1   r1   r2   r   .  sx   



)zScheduler.runc              	   C   s   |    z|   W n ty   tt dt   Y nw d}| jt	j
kr0| d d}n2| jt	jkr>| d d}n$| jt	jt	jfv rS| d d}|   nt	j| _| d	 d
}|   tt d|  | j  d S )NzFailed to save state:  PAUSEDpausedrT   	completedCANCELED	cancelledrR   r8   z
Scheduler )r   r   r   rf   r   r   	traceback
format_excra   r   r,   _set_sweep_stater-   r0   r/   
_stop_runsr.   r   r   finish)rL   statusr1   r1   r2   r   l  s2   




zScheduler.exitrt   c              	   C   sZ   d}|D ]&}| dddv r&| ds&td|d  d|d  d	|  q|d
7 }q|S )z.Returns the number of valid runs in the sweep.r   ra   r   )r;   r8   summaryMetricszexcluding run: r   z with state: z from run cap 
r   )r|   r   r   )rL   rt   countr   r1   r1   r2   r     s   
z Scheduler._get_num_runs_launchedc                 C   s|   | j dr4z| j| j d }tt d|j d W dS  ty3   t	t t
   Y dS w | j dr<dS dS )zoCheck existence of valid executable for a run.

        logs and returns False when job is unreachable
        jobzSuccessfully loaded job (z) in schedulerFT	image_uri)r   r|   ry   r   rf   r   r   r   r   r   r   r   )rL   _job_artifactr1   r1   r2   r     s   zScheduler._try_load_executablec           	         s   g }t | jj}t| jD ]H}tt d| d z|t	  d| | j
| j| jd}|| W q tyV } ztd|  | d|  W Y d }~qd }~ww tj| I d H }t|D ]\}}t||d d| j|< qcd S )Nz Starting AgentHeartbeat worker (r   -)rj   project_namerk   zfailed to register agent: r_   )r[   r\   )r   rx   register_agentranger   r   r   r   socketgethostnamer   r   r~   appendr   r   r   gather	enumeraterZ   r   )	rL   tasksr   r`   workerr   finished_tasksidxr[   r1   r1   r2   r     s2   zScheduler._register_agentsc                 c   s>    | j  | j E dH  W d   dS 1 sw   Y  dS )z)Thread-safe way to iterate over the runs.N)r   r   r   rN   r1   r1   r2   r     s   "zScheduler._yield_runsruns_to_removec                 C   sT   | j  |D ]}tt d| d | j|= qW d   dS 1 s#w   Y  dS )zHelper for removing runs from memory.

        Can be overloaded to prevent deletion of runs, which is useful
        for debugging or when polling on completed runs.
        zCleaning up finished run (r   N)r   rf   r   r   r   )rL   r   run_idr1   r1   r2   _cleanup_runs  s   
"zScheduler._cleanup_runsc                 C   sd   g }|   D ]	\}}||g7 }q|D ]}tt d| d | |s/tt d| d qd S )NzStopping run (r   zFailed to stop run ()r   rf   r   r   	_stop_runr   )rL   	to_deleter   rG   r1   r1   r2   r     s   
zScheduler._stop_runsr   c              
   C   s   || j vrtd| d| j   dS | j | }| j |= |js+td|j d dS |jjs1dS td| d| j	 d| j
  d	}z| jj|d
}|r^tt d| d W dS W dS  ty~ } ztd| d|  W Y d}~dS d}~ww )z.Stops a run and removes it from the scheduler.zrun: z not in _runs: Fz2tried to _stop_run but run not queued yet (run_id:r   TzRun:v1::utf-8)r   zStopped run .zerror stopping run (): N)r   r   r   rb   r_   ra   rO   base64standard_b64encoder   r~   encodedecoderx   stop_runrf   r   r   r   )rL   r   r   encoded_run_idsuccessr   r1   r1   r2   r     s:   

 zScheduler._stop_runc              
   C   s   |  | jj}|tjkrtj| _n|tjtj	fv rtj| _n	|tj
kr'tj| _z| j| j| j| j}W n tyO } ztd|  W Y d}~dS d}~ww |dkrZtj| _dS |dv rdtj| _dS |dkrntj| _dS dS )zGUpdate the scheduler state from state of scheduler run and sweep state.zsweep state error: NrT   )r0   r/   r   )_get_run_stater   r_   r3   rS   r   r/   ra   r.   rR   rT   r-   rx   get_sweep_stater   r~   r   r   r   r   r0   r,   )rL   ra   sweep_stater   r1   r1   r2   r     s.   



z%Scheduler._update_scheduler_run_statec                 C   s   g }|   D ]S\}}| ||j|_z|jr|jjnd}W n ttfy; } ztd|  d}W Y d}~nd}~ww |jjrD|dkrYtd| d|j d| d |	| q| 
| dS )ztIterate through runs.

        Get state from backend and deletes runs if not in running state. Threadsafe.
        Nz Failed to get queued_run.state: r:   (z) states: (z, r   )r   r   ra   rb   r   r   r   r   rO   r   r   )rL   r   r   r   	rqi_stater   r1   r1   r2   r     s    
zScheduler._update_run_statesc              
      s   z6| j | j}|sg W S | j|j d|j d| }| jd d  |jd gd} fdd|D }|W S  tyQ } zt	
d|  W Y d	}~g S d	}~ww )
zUse the public api to get metrics from a run.

        Uses the metric name found in the sweep config, any
        misspellings will result in an empty list.
        /metricr   _step)keysc                    s   g | ]}|  qS r1   r1   r   xmetric_namer1   r2   
<listcomp>C  s    z3Scheduler._get_metrics_from_run.<locals>.<listcomp>z[_get_metrics_from_run] N)r   rb   ry   r   rk   rl   r   scan_historyr   r   r   )rL   r   rb   api_runhistorymetricsr   r1   r  r2   _get_metrics_from_run3  s    zScheduler._get_metrics_from_runc              
   C   s\   z| j | j| j|}|r|W S W i S  ty- } ztd|  W Y d}~i S d}~ww )z+Use the public api to get info about a run.z[_get_run_info] N)rx   get_run_infor~   r   r   r   r   )rL   r   infor   r1   r1   r2   _get_run_infoJ  s   
zScheduler._get_run_infoprev_run_statec                 C   s   d}z| j | j| j|}t|}W |S  tyQ } z2td| d|  |tjkr<t	
d| dt   tj}n
tj}W Y d}~|S W Y d}~|S d}~w ttfyo   t	
d| d| dt   tj}Y |S w )z)Use the public api to get state of a run.Nzerror getting state for run (r   z Failed to get runstate for run (z
). Error: zBad state (z) for run ()rx   get_run_stater~   r   r3   r   r   r   rV   rf   r   r   r   r.   AttributeError
ValueError)rL   r   r  	run_statera   r   r1   r1   r2   r   V  s0   


zScheduler._get_run_statec              
   C   s^   z| j j| j| j| jd}|r|d W S W i S  ty. } ztd|  tdd}~ww )z)Use the public api to create a blank run.)rl   rk   
sweep_namer   z[_create_run] zHError creating run from scheduler, check API connection and CLI version.N)	rx   
upsert_runr   r~   r   r   r   r   r   )rL   r   r   r1   r1   r2   _create_runo  s"   
zScheduler._create_runra   c              
   C   sh   t t d|   z| jj| j|d W d S  ty3 } zt	d|  W Y d }~d S d }~ww )NzUpdating sweep state to: )r   ra   z[set_sweep_state] )
rf   r   r   r   rx   set_sweep_stater   r   r   r   )rL   ra   r   r1   r1   r2   r     s   zScheduler._set_sweep_stater   c                 C   s$   t t|dddd S )Nr   r   r    )r   	b64decodebytesr   r   split)rL   r   r1   r1   r2   _encode  s   "zScheduler._encoder   c                 C   s   t d|ji}t|| jd\}}|r1d|v r1| jds%tt d|d}| jd ||< t	| j
jdi }d|vrFdi i|d< d|d vrRi |d d< |d d |d	  |re||d d< |r{d
d |D }|r{tt d| d ||fS )Nr@   commandz
${program}programzHProgram macro in command has no corresponding 'program' in sweep config.launch	overrides
run_config	args_dictc                 S   s   g | ]}t |d r|qS )z${)rW   
startswithr  r1   r1   r2   r
    s    z;Scheduler._make_entry_and_launch_config.<locals>.<listcomp>z*Sweep command contains unresolved macros: z', see launch docs for supported macros.)r   r@   r   r   r|   r   r   indexcopydeepcopyr   rs   updaterf   r   )rL   r   r@   entry_point
macro_argspidxlaunch_config
unresolvedr1   r1   r2   _make_entry_and_launch_config  s8   
z'Scheduler._make_entry_and_launch_configc                 C   sz  | j dp| jd}| jd}| j dp|}|du r-|du r-tt d|j d|dur<|dur<tt d| |\}}|rWtt d| d|rQdnd d	 t	
| jjd
pbi }t|dd}t| |jptt }	t|	||||| j| j| j d| j|d|d|d| j d| j|d}
|
|_tj|_|| j|	< tt d|	 d| j d d dS )z;Convert a sweeprun into a launch job then push to runqueue.r   r   NzNo 'job' nor 'image_uri' (r   z$Sweep has both 'job' and 'image_uri'zSweep command z will override z entrypointr"  priorityr    queueresourceresource_argstemplate_variablesauthor)r   r+  rs   docker_imager   rl   rk   
queue_namerm   r3  r4  r5  r6  rj   r1  zAdded run (z) to queue (T)r   r|   r   r   r   r_   r0  rf   r   r(  r)  r   rs   rd   r   r   r   r   r~   r   r   rb   r3   r+   ra   r   r   )rL   r   _job_sweep_config_uri
_image_urir+  r.  _job_launch_config	_priorityr   rb   r1   r1   r2   r     sT   



zScheduler._add_to_launch_queue)rB   N)rB   r   )=r&   r'   r(   __doc__PLACEHOLDER_URISWEEP_JOB_TYPE
ENTRYPOINTr   r   floatrW   r   rd   rM   r   r^   r   r   r   r   r   rX   r   ra   setterrY   rO   r   r   r   rZ   r   r   r   r   r   r   r   r   r
   r   r   r   r	   r   r   r   r   r   r   r   r  r  r3   rV   r   r  r   r  r0  r   r1   r1   r1   r2   re   e   s    	

K





>



"

"
#re   )Ar>  r   r   r(  loggingrz   r   r   r   r   abcr   r   dataclassesr   enumr   typingr   r   r   r	   r
   r   r   r   clickr   rf   wandb.errorsr   wandb.sdk.launch._launch_addr   wandb.sdk.launch.errorsr   wandb.sdk.launch.sweepsr   wandb.sdk.launch.sweeps.utilsr   r   wandb.sdk.launch.utilsr   r   wandb.sdk.lib.runidr   rw   apispublicwandb.apis.internalr   r   r   wandb.sdk.wandb_runr   	getLoggerr&   r   styler   r   r   r3   rZ   r^   re   r1   r1   r1   r2   <module>   sN    (
	