o
    rri2                  	   @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Zd dlmZ d dlZe jdejdejd fddZG dd deZG d	d
 d
eZG dd deZG dd dZG dd dZe jdejeef deje fddZ	d+dejejeef  dejejeef  defddZdejeef dejeef defddZdejeef dejfddZ dejdejeef ddfd d!Z!	"d,d#ej"d$ej#d%ej#d&e$fd'd(Z%G d)d* d*Z&dS )-    N)Pathkwargsreturnc               	   k   sv    dd | D }t jdd |  D  zd V  W | D ]}t j|= qt j| d S | D ]}t j|= q-t j| w )Nc                 S   s"   i | ]}|t jv r|t j| qS  )osenviron.0xr   r   G/home/ubuntu/.local/lib/python3.10/site-packages/submitit/core/utils.py
<dictcomp>   s   " z)environment_variables.<locals>.<dictcomp>c                 S   s   i | ]	\}}|t |qS r   strr	   r
   yr   r   r   r      s    )r   r   updateitems)r   backupr
   r   r   r   environment_variables   s   

r   c                   @      e Zd ZdZdS )UncompletedJobErrorz.Job is uncomplete: either unfinished or failedN__name__
__module____qualname____doc__r   r   r   r   r   #       r   c                   @   r   )FailedJobErrorzJob failed during processingNr   r   r   r   r   r   '   r   r   c                   @   r   )FailedSubmissionErrorzJob Submission failedNr   r   r   r   r   r   +   r   r   c                	   @   s"  e Zd ZdZ	d"dejeef deje deje	 ddfddZ
edefd	d
ZedefddZedefddZedefddZedefddZedefddZdejeef defddZ	d#dejeef dededdfddZedejeef defddZdefd d!ZdS )$JobPathsz9Creates paths related to the slurm job and its submissionNfolderjob_idtask_idr   c                 C   s&   t |  | _|| _|pd| _d S )Nr   )r   
expanduserabsolute_folderr!   r"   )selfr    r!   r"   r   r   r   __init__2   s   zJobPaths.__init__c                 C   s   |  | jS N)
_format_idr%   r&   r   r   r   r    9   s   zJobPaths.folderc                 C   s0   | j rd| j v r| | jd S | | jd S )N_z%A_submission.shz%j_submission.sh)r!   r)   r    r*   r   r   r   submission_file=   s   zJobPaths.submission_filec                 C      |  | jd S )Nz%j_submitted.pklr)   r    r*   r   r   r   submitted_pickleD      zJobPaths.submitted_picklec                 C   r-   )Nz%j_%t_result.pklr.   r*   r   r   r   result_pickleH   r0   zJobPaths.result_picklec                 C   r-   )Nz%j_%t_log.errr.   r*   r   r   r   stderrL   r0   zJobPaths.stderrc                 C   r-   )Nz%j_%t_log.outr.   r*   r   r   r   stdoutP   r0   zJobPaths.stdoutpathc                 C   s   | j du r	t|S t|dt| j dt| j}t| j dd^}}d|v r<t|dkr4td|d|d }t|d	|S )
z(Replace id tag by actual id if availableN%j%tr+      %az4%a is in the folder path but this is not a job arrayr   %A)r!   r   r   replacer"   splitlen
ValueError)r&   r4   replaced_patharray_idarray_indexr   r   r   r)   T   s   
$zJobPaths._format_idFtmp_pathnamekeep_as_symlinkc                 C   sD   | j jddd t|t| | |r t|t| | d S d S )NTparentsexist_ok)r    mkdirr   renamegetattr
symlink_to)r&   rA   rB   rC   r   r   r   move_temporary_file`   s
   zJobPaths.move_temporary_filec                    s6   t |   j}g d t fdd|}t | S )z2Returns the closest folder which is id independent)r5   r6   r9   r8   c                    s   t  fddD  S )Nc                 3   s    | ]}| v V  qd S r(   r   )r	   tagr
   r   r   	<genexpr>m   s    zMJobPaths.get_first_id_independent_folder.<locals>.<lambda>.<locals>.<genexpr>)anyrM   tagsrM   r   <lambda>m   s    z:JobPaths.get_first_id_independent_folder.<locals>.<lambda>)r   r#   r$   parts	itertools	takewhile)r    rS   indep_partsr   rP   r   get_first_id_independent_folderh   s   z(JobPaths.get_first_id_independent_folderc                 C   s   | j j d| j dS )N())	__class__r   r    r*   r   r   r   __repr__p   s   zJobPaths.__repr__)NNF)r   r   r   r   tpUnionr   r   Optionalintr'   propertyr    r,   r/   r1   r2   r3   r)   boolrK   staticmethodrW   r[   r   r   r   r   r   /   sL    

r   c                   @   s   e Zd ZdZdejdejf dejdejddfdd	Zdejfd
dZde	fddZ
dejeef ddfddZdededdfddZedejd  dejeef dd fddZdejd  fddZdS )DelayedSubmissiona]  Object for specifying the function/callable call to submit and process later.
    This is only syntactic sugar to make sure everything is well formatted:
    If what you want to compute later is func(*args, **kwargs), just instanciate:
    DelayedSubmission(func, *args, **kwargs).
    It also provides convenient tools for dumping and loading.
    function.argsr   r   Nc                 O   s.   || _ || _|| _d | _d| _d| _d| _d S )NFr   )re   rf   r   _result_done_timeout_min_timeout_countdown)r&   re   rf   r   r   r   r   r'   |   s   
zDelayedSubmission.__init__c                 C   s.   | j r| jS | j| ji | j| _d| _ | jS )NT)rh   rg   re   rf   r   r*   r   r   r   result   s
   zDelayedSubmission.resultc                 C   s   | j S r(   )rh   r*   r   r   r   done   s   zDelayedSubmission.donefilepathc                 C   s   t | | d S r(   )cloudpickle_dump)r&   rm   r   r   r   dump   s   zDelayedSubmission.dumptimeout_minmax_num_timeoutc                 C   s   || _ || _d S r(   )ri   rj   )r&   rp   rq   r   r   r   set_timeout   s   
zDelayedSubmission.set_timeoutclsc                 C   s4   t |}|jj| jksJ dt| d|  d|S )NzLoaded object is z but should be .)pickle_loadrZ   r   type)rs   rm   objr   r   r   load   s   (zDelayedSubmission.loadc                 C   sB   t | jdd }|d u rt | jdd }|d u rd S || ji | jS )N__submitit_checkpoint__
checkpoint)rI   re   rf   r   )r&   rz   r   r   r   _checkpoint_function   s   z&DelayedSubmission._checkpoint_function)r   r   r   r   r]   CallableAnyr'   rk   rb   rl   r^   r   r   ro   r`   rr   classmethodTyperx   r_   r{   r   r   r   r   rd   t   s    *	(rd   rm   c                 c   sb    t | } | | jd }| rJ d|V  | s td|  r)t|  t||  dS )aT  Yields a path where to save a file and moves it
    afterward to the provided location (and replaces any
    existing file)
    This is useful to avoid processes monitoring the filepath
    to break if trying to read when the file is being written.

    Note
    ----
    The temporary path is the provided path appended with .save_tmp
    z	.save_tmpz&A temporary saved file already exists.z(No file was saved at the temporary path.N)r   with_suffixsuffixexistsFileNotFoundErrorr   removerH   )rm   tmppathr   r   r   temporary_save_path   s   
r   foldersoutfilec                 C   s   t | ttfsJ d|du rd}t|}t|ds J dtj|dd}| D ]}|jt|t|j	d q*W d   |S 1 sDw   Y  |S )	z/Creates a tar.gz file with all provided foldersz,Only lists and tuples of folders are allowedNz_dev_folders_.tar.gzz.tar.gzz(Archive file must have extension .tar.gzw)mode)arcname)

isinstancelisttupler   r   endswithtarfileTarFileaddrB   )r   r   tfr    r   r   r   archive_dev_folders   s   
r   par_filer    c                 C   sH   t |   } t |  }|jddd || j }t| | |S )a  Copy the par (or xar) file in the folder

    Parameter
    ---------
    par_file: str/Path
        Par file generated by buck
    folder: str/Path
        folder where the par file must be copied

    Returns
    -------
    Path
        Path of the copied .par file
    TrD   )r   r#   r$   rG   rB   shutilcopy2)r   r    dst_namer   r   r   copy_par_file   s   
r   filenamec                 C   s8   t | d}t|W  d    S 1 sw   Y  d S )Nrb)openpicklerx   )r   ifiler   r   r   ru      s   $ru   rw   c                 C   s@   t |d}t| |tj W d    d S 1 sw   Y  d S )Nwb)r   cloudpicklero   r   HIGHEST_PROTOCOL)rw   r   ofiler   r   r   rn      s   "rn   Fprocessr3   r2   verbosec                 C   s  dt jt jt  dt jt fdd}|| j|| j}}| ||tjf| ||tjfi}t|	 }t
 }	|D ]}
|	|
t
jt
jB  q;|r|	 }|D ]5\}
}||
 \}}}|d}|sm||
 |	|
 qP| }|| |  |r|| |  qP|sJdS dS )z
    Reads the given process stdout/stderr and write them to StringIO objects.
    Make sure that there is no deadlock because of pipe congestion.
    If `verbose` the process stdout/stderr are also copying to the interpreter stdout/stderr.
    streamr   c                 S   s&   | d u rt dt| tjr| j} | S )NzStream should not be None)RuntimeErrorr   ioBufferedIOBaseraw)r   r   r   r   r      s
   z!copy_process_streams.<locals>.rawi   N)r]   r_   IObytesr3   r2   filenosysr   keysselectpollregisterPOLLINPOLLPRIreadr   
unregisterdecodewriteflush)r   r3   r2   r   r   p_stdoutp_stderrstream_by_fdfdspollerfdreadyr+   p_streamstringstdraw_bufbufr   r   r   copy_process_streams   s4   $	




r   c                   @   sv   e Zd ZdZ			ddeje dedejej	ee
f  dejejeef  ddf
d	d
ZdejdejdefddZdS )CommandFunctiona  Wraps a command as a function in order to make sure it goes through the
    pipeline and notify when it is finished.
    The output is a string containing everything that has been sent to stdout.
    WARNING: use CommandFunction only if you know the output won't be too big !
    Otherwise use subprocess.run() that also streams the outputto stdout/stderr.

    Parameters
    ----------
    command: list
        command to run, as a list
    verbose: bool
        prints the command and stdout at runtime
    cwd: Path/str
        path to the location where the command must run from

    Returns
    -------
    str
       Everything that has been sent to stdout
    TNcommandr   cwdenvr   c                 C   s>   t |ts	td|| _|| _|d u rd nt|| _|| _d S )Nz&The command must be provided as a list)r   r   	TypeErrorr   r   r   r   r   )r&   r   r   r   r   r   r   r   r'   3  s   

zCommandFunction.__init__rf   r   c                 O   s<  | j dd |D  dd | D  }| jr!tdd| d tj|tjtjd| j| j	dd}t
 }t
 }z
t|||| j W n ty\ } z|  |  td	|d
}~ww |  }|  }	| }
|	r{|
r{| js{t|	tjd |
rtj|
|j||	d}t|	|W d
   |S 1 sw   Y  |S )a  Call the cammand line with addidional arguments
        The keyword arguments will be sent as --{key}={val}
        The logs bufferized. They will be printed if the job fails, or sent as output of the function
        Errors are provided with the internal stderr.
        c                 S   s   g | ]}t |qS r   r   r   r   r   r   
<listcomp>H  s    z,CommandFunction.__call__.<locals>.<listcomp>c                 S   s    g | ]\}}d | d| qS )z--=r   r   r   r   r   r   H  s     z The following command is sent: " "F)r3   r2   shellr   r   z%Job got killed for an unknown reason.N)file)outputr2   )r   r   r   printjoin
subprocessPopenPIPEr   r   r   StringIOr   	Exceptionkillwaitr   getvaluestripr   r2   CalledProcessErrorrf   )r&   rf   r   full_commandr   stdout_bufferstderr_bufferer3   r2   retcodesubprocess_errorr   r   r   __call__A  sL   $



zCommandFunction.__call__)TNN)r   r   r   r   r]   Listr   rb   r_   r^   r   Dictr'   r}   r   r   r   r   r   r     s"    
r   r(   r\   )'
contextlibr   rT   r   r   r   r   r   r   r   typingr]   pathlibr   r   contextmanagerr}   Iteratorr   r   r   r   r   r   rd   r^   r   r   r   r_   r   r   ru   rn   r   r   rb   r   r   r   r   r   r   <module>   sX   E4$
*"
,