o
    rri)9                     @   s  d Z ddlmZmZ ddlmZmZ ddlZddlm	Z	 ddl
Z
ddlZddlZddlZddlZddlmZ ddlZddlmZ dd	lmZmZ dd
lmZ ddlmZ ddlmZ ddlmZm Z  e!e"Z#G dd dZ$G dd dZ%de&fddZ'eG dd dZ(G dd dZ)dS )z)Scheduling and job monitoring utilities.
    )contextmanager	ExitStack)	dataclassfieldN)Path)SlurmJob   )git_save)SlurmConfigSubmitRules)get_distrib_spec)DecoratedMain)try_load)XP_get_sigc                   @   s,   e Zd Zdedeje fddZdd ZdS )_SubmitItTargetmainargvc                 C   s:   | || _t }t|jtjd< |tjdd < |  d S )NRANKr   )	get_xpxpr   strrankosenvironsysr   )selfr   r   spec r   =/home/ubuntu/.local/lib/python3.10/site-packages/dora/shep.py__call__#   s
   
z_SubmitItTarget.__call__c                 O   s>   t  jdkr| jj r| jj  tjj| g|R i |S )Nr   )	r   r   r   rendezvous_fileexistsunlinksubmitithelpersDelayedSubmission)r   argskwargsr   r   r   
checkpoint,   s   z_SubmitItTarget.checkpointN)	__name__
__module____qualname__r   tpSequencer   r    r)   r   r   r   r   r   "   s    	r   c                   @   sX   e Zd ZdZdefddZedefddZdd	d
Z	dddZ
edd Zdd ZdS )Sheepz[
    A Sheep is a specific run for a given XP. Sheeps are managed
    by the Shepherd.
    r   c                 C   sN   || _ d | _d | _| j r%t| j}t|tr |\| _| _d S || _d S d S N)r   job_other_jobs	_job_filer"   r   
isinstancetuple)r   r   contentr   r   r   __init__9   s   



zSheep.__init__returnc                 C   s   | j j| j jjj S r0   )r   folderdorashepjob_filer   r   r   r   r3   E   s   zSheep._job_filestandardc                 C   sZ   | j du rdS | j j| j j|}|dkr$| jr$tdd | jD r$d}|dr+dS |S )z1Return the current state of the `Sheep`.
        NUNKNOWNc                 s   s    | ]}|j d kV  qdS )r?   N)state.0r1   r   r   r   	<genexpr>P   s    zSheep.state.<locals>.<genexpr>MISSING	CANCELLED)r1   watcher	get_statejob_idr2   any
startswith)r   moder@   r   r   r   r@   I   s   

zSheep.statec                 C   s"   | j du rdS | j j| j j|S )zDReturn True if the job is no longer running on the cluster.
        NT)r1   rF   is_donerH   )r   rK   r   r   r   rL   Y   s   
zSheep.is_donec                 C   s$   | j dur| jj| j j d S dS )z)Return the path to the main log.
        Nz
_0_log.out)r1   r   r$   rH   r=   r   r   r   log`   s   
z	Sheep.logc                 C   sP   d| j j d|   d}| jd ur|d| jj d7 }|d| j j d7 }|S )NzSheep(z, state=z, zsid=zargv=))r   sigr@   r1   rH   r   )r   outr   r   r   __repr__h   s
   
zSheep.__repr__N)r>   )r*   r+   r,   __doc__r   r7   propertyr   r3   r@   rL   rM   rQ   r   r   r   r   r/   4   s    


r/   xc                 C   s   dS )z7No logging logging function, passed to `Shepherd`.
    Nr   )rT   r   r   r   no_logq   s   rU   c                   @   s.   e Zd ZU eed< eedZej	e
 ed< dS )	_JobArrayslurm_config)default_factorysheepsN)r*   r+   r,   r
   __annotations__r   listrY   r-   Listr/   r   r   r   r   rV   w   s   
 rV   c                   @   sP  e Zd ZdZefdedejegdf fddZ	dej
e defd	d
Zdedeje fddZdedeje fddZdd ZedefddZdededefddZdejfddZdd Zedefdd Zedefd!d"Zedefd#d$Zd%eje fd&d'Z d(ed)ededej!fd*d+Z"d,d- Z#ed(efd.d/Z$d0e%fd1d2Z&dS )3Shepherdz
    Takes care of the little jobs.

    Args:
        main (DecoratedMain): main function decorated by Dora.
        log (callable): log function, if provided should take a single string
            argument.
    r   rM   Nc                 C   s`   || _ | jjddd | jjddd | jjddd || _d| _d | _g | _g | _	| 
  d S )NT)exist_okparentsF)r   _by_idmkdir_orphans_arraysrM   _in_job_array_existing_git_clone
_to_cancel
_to_submit_check_orphans)r   r   rM   r   r   r   r7      s   zShepherd.__init__r   r8   c                 C   s"   t |trJ | j|}t|S )z
        Given a list of of arguments, return the matching `Sheep`,
        which will contain both information on the `dora.xp.XP`, and on
        the latest job associated with that XP.
        )r4   r   r   r   r/   )r   r   r   r   r   r   get_sheep_from_argv   s   zShepherd.get_sheep_from_argvrO   c                 C   s   | j |}t|S )zj
        Returns a `Sheep` given the XP signature, if any exists, otherwise
        returns None.
        )r   get_xp_from_sigr/   )r   rO   r   r   r   r   get_sheep_from_sig   s   zShepherd.get_sheep_from_sigrH   c                 C   s4   | j | }| r| j}| j|}t|S dS )zu
        Returns the `Sheep` associated with the given `job_id`. If no sheep
        is found, returns None.
        N)r`   
is_symlinkresolvenamer   rj   r/   )r   rH   linkrO   r   r   r   r   get_sheep_from_job_id   s   

zShepherd.get_sheep_from_job_idc                 C   s   t j  dS )zB
        Force an update of all job states with submitit.
        N)r   rF   updater=   r   r   r   rq      s   zShepherd.updaterW   c                 c   s>    | j rJ | jt| d| _ z	dV  W d| _ dS d| _ w )z*Context manager to launch XP in job array.TNF)rd   rg   appendrV   )r   rW   r   r   r   	job_array   s   
zShepherd.job_arraysheeprulesc                 C   s   |j durM| }|dkr|jrtd|j j  d|_ n/|dv r4td|j j d |jr3d|_ n|jrMtd|j j d|  | |j  d|_ |j du rr| j	s]| j
t| || j
d	 jksgJ | j
d	 j| dS dS )
z
        Decides whether to schedule a new job for the given sheep, based on the rules
        given in `rules`.
        Jobs are actually only scheduled once the `commit()` method is called.
        N	COMPLETEDz"Ignoring previously completed job )FAILEDrE   OUT_OF_MEMORYTIMEOUTrD   	NODE_FAILzPrevious job z failed or was canceledzCancelling previous job z with status )r1   r@   replace_doneloggerdebugrH   retryreplacecancel_lazyrd   rg   rr   rV   rW   rY   )r   rt   rW   ru   r@   r   r   r   maybe_submit_lazy   s,   

zShepherd.maybe_submit_lazyr1   c                 C   s   | j | dS )z]
        Cancel a job. The job is actually cancelled only when `commit()` is called.
        N)rf   rr   )r   r1   r   r   r   r      s   zShepherd.cancel_lazyc                 C   sH   | j r| | j  g | _ d| _| jr"| jd}| | | jsdS dS )zu
        Commit all changes registered so far with either `maybe_submit_lazy()`
        and `cancel_lazy()`.
        Nr   )rf   _cancelre   rg   pop_submit)r   rs   r   r   r   commit   s   
zShepherd.commitc                 C      | j jj| j jjj S r0   )r   r:   dirr;   by_idr=   r   r   r   r`         zShepherd._by_idc                 C   r   r0   )r   r:   r   r;   orphansr=   r   r   r   rb      r   zShepherd._orphansc                 C   r   r0   )r   r:   r   r;   arraysr=   r   r   r   rc      r   zShepherd._arraysjobsc                 C   s8   dgdd |D  }t dd| tj|dd d S )Nscancelc                 S   s   g | ]}|j qS r   )rH   rA   r   r   r   
<listcomp>  s    z$Shepherd._cancel.<locals>.<listcomp>z
Running %s Tcheck)r}   r~   joinsprun)r   r   
cancel_cmdr   r   r   r      s   zShepherd._cancelrn   r9   c           
      C   s  dt jd< t|j}tj||dd}|j}|dkr.|d dkr%td|d |d< d}n|}d	|d< |j	}|rE|j	| }	|	 d
|d< d| |d< |j
r`d	|d< |jd u r_||j |d< n||d< |jd u rn|j|d< |d= |d= |d= |d= td| |jd|dd| |S )N1SLURM_KILL_BAD_EXITmax_num_timeout)r9   r      r   z.Can only take <= 8 gpus, or multiple of 8 gpusnodesr   GBmemzgpu:gresntasks_per_nodecpus_per_taskgpusmem_per_gpucpus_per_gpuone_task_per_nodezSlurm parameters %rT)job_namestderr_to_stdoutr   )r   r   dict__dict__r$   SlurmExecutorr   r   
ValueErrorr   r   r   r   r}   r~   update_parameters)
r   rn   r9   rW   r(   executorr   gpus_per_noder   r   r   r   r   _get_submitit_executor  sL   






zShepherd._get_submitit_executorc              
   C   s   | j  D ]H}|j}td| d tjddt d|dddgd	d	d
}dd |j	
  dD }|rItd| d tjdg| d	d |  qdS )zCheck for orphaned jobs.zFound dirty tag z`, meaning a job might have been scheduled but Dora or Slurm crashed before the job id was saved.squeuez-uz-nz-oz%iz-hT)capture_outputr   c                 S   s   g | ]}|r|qS r   r   )rB   liner   r   r   r   5      z+Shepherd._check_orphans.<locals>.<listcomp>
zFound orphan job ids z, will cancelr   r   N)rb   iterdirrn   r}   warningr   r   r   getloginstdoutdecodestripsplitr#   )r   dirtyrn   procidsr   r   r   rh   -  s   
zShepherd._check_orphansc                 c   s4    | j | }|  z
dV  W |  dS |  w )z,Context manager to enter a potential orphan.N)rb   touchr#   )r   rn   tokenr   r   r   _enter_orphan;  s   
zShepherd._enter_orphanrs   c              
      s  |j }|j}|s
d S t|dk}|d }| j|j |jjj t fdd|D s/J d|r=t	t
dd |D }n|jj}|rL| jjd | }n| jjd	 | }|r\| j| }n|jj}|jd
d |D ]}	|	j}
| j|
 |
j r}|
j  qh| |||}g } r| jd u rt| j| _| | t H} r| jd usJ |t| j |r||  |j D ]!}	 r| jd usJ t|	j| j ||t | j|	jj qW d    n1 sw   Y  t ||D ]x\}	}t!"||ft#|	j$d t%&d|j' ||	_(||	_)| j*|j' }|}|+|	jj,-  |rA|	jj,|j }| r<|- |- ks;J n|+| |	jj.}| rN|  |+| | j/|	j}| 0d|j' d|	jj d|  qW d    d S 1 sxw   Y  d S )Nr   r   c                 3   s    | ]
}|j jj kV  qd S r0   )r   r:   r	   )rB   otheruse_git_saver   r   rC   O  s    z#Shepherd._submit.<locals>.<genexpr>z?All jobs inside an array must have the same value for git_save.c                 S   s   g | ]}|j jqS r   )r   rO   )rB   rt   r   r   r   r   S  r   z$Shepherd._submit.<locals>.<listcomp>_array__T)r^   wbzCreated job with id %szScheduled job z for sheep /)1rY   rW   lenr   init_xpr   r:   r	   allr   sortedrO   rn   rc   _xp_submititra   r!   r"   r#   r   re   get_new_cloner   r   enter_contextenter_clonebatchassign_clonerr   submitr   r   zippickledumpopenr3   r}   r~   rH   r1   r2   r`   
symlink_tor9   rm   _latest_submititget_namerM   )r   rs   rY   rW   is_arrayfirstname_sigrn   submitit_folderrt   r   r   r   stackr1   ro   submitit_linklatestr   r   r   r   E  s   







$$zShepherd._submit)'r*   r+   r,   rR   rU   r   r-   Callabler   r7   r.   r/   ri   Optionalrk   rp   rq   r   r
   rs   r   r   r$   r   r   r   rS   r   r`   rb   rc   r\   r   r   r   rh   r   rV   r   r   r   r   r   r]   }   s8    "



(	r]   )*rR   
contextlibr   r   dataclassesr   r   loggingpathlibr   r   r   
subprocessr   r   typingr-   r$   r    r	   confr
   r   distribr   r   r   utilsr   r   r   r   	getLoggerr*   r}   r   r/   r   rU   rV   r]   r   r   r   r   <module>   s2   
=