o
    rri                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ ddlmZmZ ejdddZG d	d
 d
ZG dd deje ZG dd dee ZG dd deje ZdZG dd deZG dd de jZG dd deZdS )    N)Path)	TypedDict   )loggerutilsRT)	covariantc                	   @   s
  e Zd ZdZd deddfddZdejee	f dej
e	ej
e	e	f f fd	d
Zdejeje	  fddZd!de	de	de	fddZedefddZd"ddZd!de	de	dej
e	e	f fddZd!de	de	defddZde	ddfddZd"ddZde	ddfddZdS )#InfoWatchera  An instance of this class is shared by all jobs, and is in charge of calling slurm to check status for
    all jobs at once (so as not to overload it). It is also in charge of dealing with errors.
    Cluster is called at 0s, 2s, 4s, 8s etc... in the begginning of jobs, then at least every delay_s (default: 60)

    Parameters
    ----------
    delay_s: int
        Maximum delay before each non-forced call to the cluster.
    <   delay_sreturnNc                 C   s<   || _ t | _t | _i | _d| _d| _td| _d| _	d S )N            -infr   )
_delay_sset_registered	_finished
_info_dict_output_start_timefloat_last_status_check
_num_calls)selfr    r   F/home/ubuntu/.local/lib/python3.10/site-packages/submitit/core/core.py__init__'   s   

zInfoWatcher.__init__stringc                 C      t NNotImplementedError)r   r   r   r   r   	read_info1      zInfoWatcher.read_infoc                 C   r   r    r!   r   r   r   r   _make_command4   r$   zInfoWatcher._make_commandstandardjob_idmodec                 C   r   r    r!   r   r(   r)   r   r   r   	get_state7   r$   zInfoWatcher.get_statec                 C      | j S )zNumber of calls to sacct)r   r%   r   r   r   	num_calls:      zInfoWatcher.num_callsc                 C   s,   t  | _t | _td| _i | _d| _dS )zsClears cache.
        This should hopefully not be used. If you have to use it, please add a github issue.
        r   r   N)	r   r   _timetimer   r   r   r   r   r%   r   r   r   clear?   s
   


zInfoWatcher.clearc                 C   s<   |du rt d|| jvr| | | | | j|i S )ad  Returns a dict containing info about the job.
        State of finished jobs are cached (use watcher.clear() to remove all cache)

        Parameters
        ----------
        job_id: str
            id of the job on the cluster
        mode: str
            one of "force" (forces a call), "standard" (calls regularly) or "cache" (does not call)
        Nz$Cannot call sacct without a slurm id)RuntimeErrorr   register_jobupdate_if_long_enoughr   getr*   r   r   r   get_infoI   s   


zInfoWatcher.get_infoc                 C   s"   | j ||d}g d}| |vS )a  Returns whether the job is finished.

        Parameters
        ----------
        job_id: str
            id of the job on the cluster
        mode: str
            one of "force" (forces a call), "standard" (calls regularly) or "cache" (does not call)
        r)   )READYPENDINGRUNNINGUNKNOWNREQUEUED
COMPLETING	PREEMPTED)r+   upper)r   r(   r)   state
incompleter   r   r   is_done\   s   
zInfoWatcher.is_donec                 C   sn   |dv sJ |dkrdS t  | j }t  | j }t| jtd|d }|dkr+d}||kr5|   dS dS )zUpdates if forced to, or if the delay is reached
        (Force-updates with less than 1ms delay are ignored)
        Also checks for finished jobs
        )r'   forcecacherD   N   rC   gMbP?)r/   r0   r   r   minr   maxupdate)r   r)   last_check_deltalast_job_deltarefresh_delayr   r   r   r4   j   s   z!InfoWatcher.update_if_long_enoughc              
   C   s   |   }|du r
dS |  jd7  _zt d| j dd|  tj|dd| _	W n! t
yM } zt d| j d| d	 W Y d}~nd}~ww | j| | j	 t | _| j| j }|D ]}| j|d
drt| j| qedS )z<Updates the info of all registered jobs with a call to sacctNr   zCall #z - Command  Fshellz - Bypassing sacct error z, status may be inaccurate.rD   r7   )r&   r   r   
get_loggerdebugr-   join
subprocesscheck_outputr   	Exceptionwarningr   rH   r#   r/   r0   r   r   r   rB   add)r   commandeto_checkr(   r   r   r   rH   }   s*   "
zInfoWatcher.updatec                 C   s2   t |tsJ | j| t | _td| _dS )z0Register a job on the instance for shared updater   N)	
isinstancestrr   rV   r/   r0   r   r   r   )r   r(   r   r   r   r3      s   
zInfoWatcher.register_job)r
   )r'   r   N)__name__
__module____qualname____doc__intr   tpUnionbytesr[   Dictr#   OptionalListr&   r+   propertyr-   r1   r6   boolrB   r4   rH   r3   r   r   r   r   r	      s    0

"

r	   c                	   @   s4  e Zd ZdZdZdZe ZdJdej	e
ef dedeje dd	fd
dZdKddZedefddZedejfddZedefddZdejfddZdLdeddfddZdeddfddZdLdedd	fdd Zdefd!d"Zdeje fd#d$Zdej ej	ej!ej"f  fd%d&Z#dej$eej%f fd'd(Z&dKd)d*Z'dMd,edefd-d.Z(edej e fd/d0Z)edefd1d2Z*dNd4edej+eef fd5d6Z,d7edej e fd8d9Z-dej e fd:d;Z.dej e fd<d=Z/dOd?d@Z0defdAdBZ1dKdCdDZ2dej+eej%f fdEdFZ3dGej+eej%f dd	fdHdIZ4d	S )PJobaF  Access to a cluster job information and result.

    Parameters
    ----------
    folder: Path/str
        A path to the submitted job file
    job_id: str
        the id of the cluster job
    tasks: List[int]
        The ids of the tasks associated to this job.
        If None, the job has only one task (with id = 0)
    dummy   r   folderr(   tasksr   Nc                    st   _ t|_g _d_t|dkr! fddjD _tj jd_	t
 _j_  d S )NFr   c                    s   g | ]}j  |fd qS )rn   r(   ro   	__class__).0krn   r(   r   r   r   
<listcomp>       z Job.__init__.<locals>.<listcomp>)r(   task_id)_job_idtuple_tasks	_sub_jobs_cancel_at_deletionlenr   JobPathsrx   _pathsr/   r0   r   r   _register_in_watcher)r   rn   r(   ro   r   ru   r   r      s   

zJob.__init__c                 C   s    | j d s| j| j d S d S )Nr   )r{   watcherr3   r(   r%   r   r   r   r      s   
zJob._register_in_watcherc                 C   r,   r    )ry   r%   r   r   r   r(         z
Job.job_idc                 C   r,   r    )r   r%   r   r   r   paths   r   z	Job.pathsc                 C   s   | j sdS t| j S )z&Returns the number of tasks in the Jobr   )r|   r~   r%   r   r   r   	num_tasks   s   
zJob.num_tasksc                 C   s.   | j j sJ d| j j tj| j jS )zMReturns the submitted object, with attributes `function`, `args` and `kwargs`z#Cannot find job submission pickle: )r   submitted_pickleexistsr   DelayedSubmissionloadr%   r   r   r   
submission   s
   
zJob.submissionTvaluezJob[R]c                 C   s
   || _ | S )aB  Sets whether the job deletion in the python environment triggers
        cancellation of the corresponding job in the cluster
        By default, jobs are not cancelled unless this method is called to turn the
        option on.

        Parameters
        ----------
        value: bool
            if True, the cluster job will be cancelled at the instance deletion, if False, it
            will not.

        Returns
        -------
        Job
            the current job (for chaining at submission for instance: "job = executor.submit(...).cancel_at_deletion()")
        )r}   )r   r   r   r   r   cancel_at_deletion   s   zJob.cancel_at_deletionrx   c                 C   sF   d|  kr| j k sn td| d| j d  | js| S | j| S )ae  Returns a given sub-Job (task).

        Parameters
        ----------
        task_id
            The id of the task. Must be between 0 and self.num_tasks
        Returns
        -------
        job
            The sub_job. You can call all Job methods on it (done, stdout, ...)
            If the job doesn't have sub jobs, return the job itself.
        r   ztask_id z must be between 0 and r   )r   
ValueErrorr|   )r   rx   r   r   r   task   s
   
zJob.taskcheckc                 C   s&   |rt jnt j| j| j gdd dS )zCancels the job

        Parameters
        ----------
        check: bool
            whether to wait for completion and check that the command worked
        FrM   N)rR   
check_callcall_cancel_commandr(   )r   r   r   r   r   cancel   s   
z
Job.cancelc                 C   s   |   }| jrJ d|d S )Nz4You should use `results()` if your job has subtasks.r   )resultsr|   )r   rr   r   r   result
  s   z
Job.resultc                 C   sT   |    | jrdd | jD S |  \}}|dkr'|  }|du r%td||gS )a_  Waits for and outputs the result of the submitted function

        Returns
        -------
        output
            the output of the submitted function.
            If the job has several tasks, it will return the output of every tasks in a List

        Raises
        ------
        Exception
            Any exception raised by the job
        c                 S   s   g | ]
}t t| qS r   )rb   castr   r   rs   sub_jobr   r   r   rv      s    zJob.results.<locals>.<listcomp>errorNzUnknown job exception)waitr|   _get_outcome_and_result	exceptionr2   )r   outcomer   job_exceptionr   r   r   r     s   zJob.resultsc                 C   s   |    | jrdd | jD }dd |D }|sdS |d S z|  \}}W n tjy; } z|W  Y d}~S d}~ww |dkr^td| j d| d	| j d
| j d| jj d| jj	 S dS )a  Waits for completion and returns (not raise) the
        exception containing the error log of the job

        Returns
        -------
        Exception/None
            the exception if any was raised during the job.
            If the job has several tasks, it returns the exception of the task with
            smallest id that failed.

        Raises
        ------
        UncompletedJobError
            In case the job never completed
        c                 S      g | ]}|  qS r   )r   r   r   r   r   rv   =      z!Job.exception.<locals>.<listcomp>c                 S      g | ]}|d ur|qS r    r   )rs   rX   r   r   r   rv   ?  s    Nr   r   z
Job (task=z>) failed during processing with trace:
----------------------
zA
----------------------
You can check full logs with 'job.stderr(z)' and 'job.stdout(z)'or at paths:
  - z
  - )
r   r|   r   r   UncompletedJobErrorFailedJobErrorrx   r   stderrstdout)r   all_exceptions
exceptionsr   tracerX   r   r   r   r   *  s<   
zJob.exceptionc                 C   s  | j rJ d| jj}| j}z
|| j W n ty%   |d9 }Y nw t	 }| jj
 sKt	 | |k rKtd | jj
 sKt	 | |k s8| jj
 sd| j d| j d| jj
 d| j dg}|  }|rv|d	d
|g n,| jj rtjddt| jjgdd}|d| jj d
|g n
|d| jj  td|z
t| jj
}W |S  ty   td| jj
 d td t| jj
}Y |S w )av  Getter for the output of the submitted function.

        Returns
        -------
        outcome
            the outcome of the job: either "error" or "success"
        result
            the output of the submitted function

        Raises
        ------
        UncompletedJobError
            if the job is not finished or failed outside of the job (from slurm)
        z(This should not be called for a meta-jobrE   r   zJob z (task: z) with path z$has not produced any output (state: )zError stream produced:z(----------------------------------------tailz-40zutf-8)encodingz*No error stream produced. Look at stdout: z)No output/error stream produced ! Check: 
zEOFError on file z, trying again in 2s)r|   r   rn   _results_timeout_schmodstatst_modePermissionErrorr/   r0   result_pickler   sleepr(   rx   r@   r   extendr   rR   rS   r[   appendr   r   rQ   pickle_loadEOFErrorwarningswarn)r   ptimeout
start_waitmessagelogoutputr   r   r   r   T  sF   

zJob._get_outcome_and_resultc                 C   s"   |   std |   rdS dS )zWait while no result find is found and the state is
        either PENDING or RUNNING.
        The state is checked from slurm at least every min and the result path
        every second.
        r   N)doner/   r   r%   r   r   r   r     s   
zJob.waitFforce_checkc                 C   s|   | j rtdd | j D S | jj}z
|| j W n	 ty$   Y nw | jj	 r-dS | j
j| j|r6dnddr<dS dS )a  Checks whether the job is finished.
        This is done by checking if the result file is present,
        or checking the job state regularly (at least every minute)
        If the job has several tasks, the job is done once all tasks are done.

        Parameters
        ----------
        force_check: bool
            Forces the slurm state update

        Returns
        -------
        bool
            whether the job is finished or not

        Note
        ----
        This function is not foolproof, and may say that the job is not terminated even
        if it is when the job failed (no result file, but job not running) because
        we avoid calling sacct/cinfo everytime done is called
        c                 s   s    | ]}|  V  qd S r    )r   r   r   r   r   	<genexpr>  s    zJob.done.<locals>.<genexpr>TrC   r'   r7   F)r|   allr   rn   r   r   r   OSErrorr   r   r   rB   r(   )r   r   r   r   r   r   r     s   zJob.donec                 C   s   t | jdkr	d S | jd S )Nr   r   )r~   r{   r%   r   r   r   rx     s   zJob.task_idc                 C   s   | j j| jddS )z+State of the job (does not force an update)r'   r7   )r   r+   r(   r%   r   r   r   r@     s   z	Job.staterC   r)   c                 C   s   | j j| j|dS )z9Returns informations about the job as a dict (sacct call)r7   )r   r6   r(   )r   r)   r   r   r   r6     s   zJob.get_infonamec                 C   s   | j j| j jd}||vrtd| dt|  ||  s#dS || d}| }W d   |S 1 s:w   Y  |S )zReturns a string with the content of the log file
        or None if the file does not exist yet

        Parameter
        ---------
        name: str
            either "stdout" or "stderr"
        )r   r   z	Unknown "z", available are Nr   )	r   r   r   r   listkeysr   openread)r   r   r   fr   r   r   r   _get_logs_string  s   	

zJob._get_logs_stringc                 C   @   | j rdd | j D }dd |D }|sdS d|S | dS )zoReturns a string with the content of the print log file
        or None if the file does not exist yet
        c                 S   r   r   )r   r   r   r   r   rv     r   zJob.stdout.<locals>.<listcomp>c                 S   r   r    r   rs   sr   r   r   rv         Nr   r   r|   rQ   r   )r   stdout_stdout_not_noner   r   r   r     s   

z
Job.stdoutc                 C   r   )zoReturns a string with the content of the error log file
        or None if the file does not exist yet
        c                 S   r   r   )r   r   r   r   r   rv     r   zJob.stderr.<locals>.<listcomp>c                 S   r   r    r   r   r   r   r   rv     r   Nr   r   r   )r   stderr_stderr_not_noner   r   r   r     s   

z
Job.stderrAsyncJobProxy[R]c                 C   s   t | S )zSReturns a proxy object that provides asyncio methods
        for this Job.
        )AsyncJobProxyr%   r   r   r   	awaitable  s   zJob.awaitablec              
   C   sl   d}z| j }W n ty# } zt d|  W Y d }~nd }~ww | jj d| j d| j d| dS )Nr;   zBypassing state error:
z<job_id=z
, task_id=z	, state="z">)	r@   rT   r   rO   rU   rr   r]   r(   rx   )r   r@   rX   r   r   r   __repr__  s   
 $zJob.__repr__c                 C   s0   | j r| jj| jdds| jdd d S d S d S )NrD   r7   F)r   )r}   r   rB   r(   r   r%   r   r   r   __del__  s
   zJob.__del__c                 C   r,   r    )__dict__r%   r   r   r   __getstate__  s   zJob.__getstate__r@   c                 C   s   | j | |   dS )z7Make sure jobs are registered when loaded from a pickleN)r   rH   r   )r   r@   r   r   r   __setstate__  s   zJob.__setstate__)rm   r\   )TF)rC   )r   r   )5r]   r^   r_   r`   r   r   r	   r   rb   rc   r   r[   Sequencera   r   r   rh   r(   r   r   r   r   r   r   ri   r   r   r   r   r   rg   r   rf   r   r   r   TupleAnyr   r   r   rx   r@   re   r6   r   r   r   r   r   r   r   r   r   r   r   r   rj      sH    ,
"*
6	&

"rj   c                   @   sT   e Zd ZdZdddZdedejfdd	Zd
e	ej ddfddZ
defddZdS )
DelayedJoba  
    Represents a Job that have been queue for submission by an executor,
    but hasn't yet been scheduled.
    Typically obtained by calling `ex.submit` within a `ex.batch()` context

    Trying to read the attributes of the job will, by default, fail.
    But if you passed `ex.batch(allow_implicit_submission=True)` then
    the attribute read will in fact force the job submission,
    and you'll obtain a real job instead.
    exExecutorc                 C   
   || _ d S r    )_submitit_executor)r   r   r   r   r   r     s   
zDelayedJob.__init__r   r   c                 C   sV   |dkrdS | j d }|jstd|  | jtks&J d| d|  dt| |S )Nr}   Fr   zMAccesssing job attributes is forbidden within 'with executor.batch()' contextz	Executor z didn't properly submitted z !)r   _allow_implicit_submissionsAttributeError_submit_delayed_batchrr   r   getattr)r   r   r   r   r   r   __getattr__  s   
 
zDelayedJob.__getattr__new_jobNc                 C   s(   | j dd  | j |j  |j| _d S )Nr   )r   poprH   rr   )r   r   r   r   r   _promote/  s   zDelayedJob._promotec                 C   s
   t | S r    )objectr   r%   r   r   r   r   6     
zDelayedJob.__repr__)r   r   )r]   r^   r_   r`   r   r[   rb   r   r   rj   r   r   r   r   r   r   r     s    
r   c                   @   s   e Zd Zdee fddZddejee	f ddfdd	Z
ddejee	f defd
dZddejee	f deje fddZddejee	f dejej fddZdS )r   jobc                 C   r   r    )r   )r   r   r   r   r   r   ;  r   zAsyncJobProxy.__init__r   poll_intervalr   Nc                    s.   | j  st|I dH  | j  rdS dS )z&same as wait() but with asyncio sleep.N)r   r   asyncior   r   r   r   r   r   r   >  s   
zAsyncJobProxy.waitc                       |  |I dH  | j S )a  asyncio version of the result() method.
        Wait asynchornously for the result to be available by polling the self.done() method.
        Parameters
        ----------
        poll_interval: int or float
            how often to check if the result is available, in seconds
        N)r   r   r   r   r   r   r   r   C  s   
zAsyncJobProxy.resultc                    r   )a'  asyncio version of the results() method.

        Waits asynchornously for ALL the results to be available by polling the self.done() method.

        Parameters
        ----------
        poll_interval: int or float
            how often to check if the result is available, in seconds
        N)r   r   r   r   r   r   r   r   N  s   

zAsyncJobProxy.resultsc                 #   sJ    j jdkrt fddtj jD E dH  t V  dS )aZ  awaits for all tasks results concurrently. Note that the order of results is not guaranteed to match the order
        of the tasks anymore as the earliest task coming back might not be the first one you sent.

        Returns
        -------
        an iterable of Awaitables that can be awaited on to get the earliest result available of the remaining tasks.

        Parameters
        ----------
        poll_interval: int or float
            how often to check if the result is available, in seconds

        (see https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed)
        r   c                    s"   g | ]}j |  qS r   )r   r   r   r   )rs   ir   r   r   r   rv   m  s   " z6AsyncJobProxy.results_as_completed.<locals>.<listcomp>N)r   r   r   as_completedrangeensure_futurer   r   r   r   r   results_as_completed\  s   
z"AsyncJobProxy.results_as_completed)r   )r]   r^   r_   rj   r   r   rb   rc   ra   r   r   r   rg   r   Iteratorr   Futurer   r   r   r   r   r   :  s    $*r   zInteractions with jobs are not allowed within "with executor.batch()" context (submissions/creations only happens at exit time).c                   @   sJ   e Zd ZU dZeed< eed< eed< eed< eed< eed< eed< d	S )
EquivalenceDictz@Gives the specific name of the params shared across all plugins.r   timeout_minmem_gbnodescpus_per_taskgpus_per_nodetasks_per_nodeN)r]   r^   r_   r`   r[   __annotations__r   r   r   r   r   z  s   
 r   c                	   @   s  e Zd ZU dZeZejeej  e	d< d-dej
eef dejejeejf  fddZedefd	d
Zejd.dedejd fddZd/ddZdejdef dejdejdee fddZejdejej dejeej  fddZ dejdef dej!ej dejee  fddZ"dej#ejg ef  dejee  fddZ$dejddfd d!Z%edeje& fd"d#Z'edej(e fd$d%Z)d&ejeejf dejeejf fd'd(Z*dejddfd)d*Z+ede,fd+d,Z-dS )0r   zBase job executor.

    Parameters
    ----------
    folder: Path/str
        folder for storing job submission/output and logs.
    	job_classNrn   
parametersc                 C   s4   t |  | _|d u ri n|| _d | _d| _d S )NF)r   
expanduserabsolutern   r  _delayed_batchr   )r   rn   r  r   r   r   r     s   
zExecutor.__init__r   c                 C   s*   | j }|dr|d td  }| S )Nr   )r]   endswithr~   lower)clsnr   r   r   r     s   
zExecutor.nameFallow_implicit_submissionsc              
   c   st    || _ | jdurtdg | _z%zdV  W n ty+ } z	t d |d}~ww |   W d| _dS d| _w )a  Creates a context within which all submissions are packed into a job array.
        By default the array submissions happens when leaving the context

        Parameter
        ---------
        allow_implicit_submissions: bool
            submits the current batch whenever a job attribute is accessed instead of raising an exception

        Example
        -------
        jobs = []
        with executor.batch():
            for k in range(12):
                jobs.append(executor.submit(add, k, 1))

        Raises
        ------
        AttributeError
            if trying to access a job instance attribute while the batch is not exited, and
            intermediate submissions are not allowed.
        Nz8Nesting "with executor.batch()" contexts is not allowed.zOCaught error within "with executor.batch()" context, submissions are dropped.
 )r   r
  r2   rT   r   rO   r   r   )r   r  rX   r   r   r   batch  s    


zExecutor.batchc                 C   sl   | j d usJ | j s| jstjdtd d S t| j  \}}| |}t||D ]	\}}|| q'g | _ d S )Nz>No submission happened during "with executor.batch()" context.)category)r
  r   r   r   RuntimeWarningzip_internal_process_submissionsr   )r   jobssubmissionsnew_jobsjnew_jr   r   r   r     s   

zExecutor._submit_delayed_batchfn.argskwargsc                 O   sd   t j|g|R i |}| jd urt| }| j||f n| |gd }t|tu r0td|S )Nr   zEExecutors should never return a base Job class (implementation issue))	r   r   r
  r   r   r  typerj   r2   )r   r  r  r  dsr   r   r   r   submit  s   
zExecutor.submitdelayed_submissionsc                 C      d S r    r   )r   r   r   r   r   r       z&Executor._internal_process_submissionsiterablec                    s:    fddt | D }t|dkrtd g S | |S )a  A distributed equivalent of the map() built-in function

        Parameters
        ----------
        fn: callable
            function to compute
        *iterable: Iterable
            lists of arguments that are passed as arguments to fn.

        Returns
        -------
        List[Job]
            A list of Job instances.

        Example
        -------
        a = [1, 2, 3]
        b = [10, 20, 30]
        executor.map_array(add, a, b)
        # jobs will compute 1 + 10, 2 + 20, 3 + 30
        c                    s   g | ]}t j g|R  qS r   r   r   )rs   r  r  r   r   rv     rw   z&Executor.map_array.<locals>.<listcomp>r   Received an empty job array)r  r~   r   r   r  )r   r  r#  r  r   r%  r   	map_array  s
   

zExecutor.map_arrayfnsc                 C   s2   dd |D }t |dkrtd g S | |S )a  Submit a list of job. This is useful when submiting different Checkpointable functions.
        Be mindful that all those functions will be run with the same requirements
        (cpus, gpus, timeout, ...). So try to make group of similar function calls.

        Parameters
        ----------
        fns: list of callable
            functions to compute. Those functions must not need any argument.
            Tyically those are "Checkpointable" instance whose arguments
            have been specified in the constructor, or partial functions.

        Returns
        -------
        List[Job]
            A list of Job instances.

        Example
        -------
        a_vals = [1, 2, 3]
        b_vals = [10, 20, 30]
        fns = [functools.partial(int.__add__, a, b) for (a, b) in zip (a_vals, b_vals)]
        executor.submit_array(fns)
        # jobs will compute 1 + 10, 2 + 20, 3 + 30
        c                 S   s   g | ]}t |qS r   r$  )rs   r  r   r   r   rv     s    z)Executor.submit_array.<locals>.<listcomp>r   r&  )r~   r   r   r  )r   r(  r  r   r   r   submit_array  s
   

zExecutor.submit_arrayc                 K   s&   | j dur	td| jdi | dS )zUpdate submision parameters.NzPChanging parameters within batch context "with executor.batch():" is not allowedr   )r
  r2   _internal_update_parametersr   r  r   r   r   update_parameters$  s
   
zExecutor.update_parametersc                 C   r!  r    r   r  r   r   r   _equivalence_dict,     zExecutor._equivalence_dictc                 C   s   t  S )z4Parameters that can be set through update_parameters)r   r-  r   r   r   _valid_parameters0  r.   zExecutor._valid_parametersparamsc                    sB   t t jt jttf  |    du r|S  fdd| D S )zConvert generic parameters to their specific equivalent.
        This has to be called **before** calling `update_parameters`.

        The default implementation only renames the key using `_equivalence_dict`.
        Nc                    s   i | ]\}}  |||qS r   )r5   )rs   rt   veq_dictr   r   
<dictcomp>>  rw   z0Executor._convert_parameters.<locals>.<dictcomp>)rb   r   rf   re   r[   r.  items)r   r1  r   r3  r   _convert_parameters5  s    zExecutor._convert_parametersc                 K   s   | j | dS )zUpdate submission parameters.N)r  rH   r+  r   r   r   r*  @  s   z$Executor._internal_update_parametersc                 C      dS )a  The 'score' of this executor on the current environment.

        -> -1 means unavailable
        ->  0 means available but won't be started unless asked (eg debug executor)
        ->  1 means available
        ->  2 means available and is a highly scalable executor (cluster)
        r   r   r-  r   r   r   affinityD  s   	zExecutor.affinityr    r   r\   ).r]   r^   r_   r`   rj   r  rb   Typer   r  rc   r[   r   rf   re   r   classmethodr   
contextlibcontextmanagerri   r   r  r   Callabler   r  abcabstractmethodrg   r   r   r  Iterabler'  r   r)  r,  r   r.  Setr0  r7  r*  ra   r9  r   r   r   r   r     s6   
 0
&,
2,*r   c                	       s:  e Zd ZdZ	d%dejeef dede	ddf fd	d
Z
dejej dejeej  fddZd&ddZedefddZdedeej fddZdededdfddZejdefddZejdededefddZejdedeje fddZeejd ejeef defd!d"Zededdfd#d$Z  ZS )'PicklingExecutora  Base job executor.

    Parameters
    ----------
    folder: Path/str
        folder for storing job submission/output and logs.
    max_num_timeout: int
        maximum number of timeouts after which submitit will not reschedule the job.
        Note: only callable implementing a checkpoint method are rescheduled in case
        of timeout.
    max_pickle_size_gb: float
        maximum size of pickles in GB allowed for a submission.
        Note: during a batch submission, this is the estimated sum of all pickles.
             ?rn   max_num_timeoutmax_pickle_size_gbr   Nc                    s(   t  | || _|| _d| _d| _d S )Ng?r   )superr   rF  rG  _throttling_last_job_submitted)r   rn   rF  rG  rq   r   r   r   `  s
   
zPicklingExecutor.__init__r   c                 C   s   |   }| j|r|d ndd}g }d}|D ]v}t j}tj| j	| d }|j
jddd ||| j || |rrd}t|}	| jd }
|	|
 | jkrr|  d|	 d	|
d
d| j d}|d7 }|d7 }t||   t | _| | j}|j|d || q|S )a  Submits a task to the cluster.

        Parameters
        ----------
        fn: callable
            The function to compute
        *args: any positional argument for the function
        **kwargs: any named argument for the function

        Returns
        -------
        Job
            A Job instance, providing access to the job information,
            including the output of the function once it is computed.
        r      Tz.pkl)parentsexist_okFi   @zSubmitting an estimated z x z.2fz > zGB of objects zV(function and arguments) through pickle (this can be slow / overload the file system).zbIf this is the intended behavior, you should update executor.max_pickle_size_gb to a larger value r   )r.  r  r5   uuiduuid4hexr   r   get_first_id_independent_folderrn   parentmkdirset_timeoutrF  dumpr~   r   st_sizerG  unlinkr2   	_throttler/   r0   rJ  _submit_command_submitit_command_strr   move_temporary_filer   )r   r   r4  r   r  
check_sizedelayedtmp_uuidpickle_pathnumsizemsgr   r   r   r   r  i  s4   


z.PicklingExecutor._internal_process_submissionsc                 C   s<   t  | j | jk rt | j t  | j | jk s
d S d S r    )r/   r0   rJ  rI  r   r%   r   r   r   rX    s   zPicklingExecutor._throttlec                 C   r8  )Nrk   r   r%   r   r   r   rZ    r"  z&PicklingExecutor._submitit_command_strrW   c           
      C   s   t  j}tj| jd| d }|d}|| 	|| W d   n1 s+w   Y  | 
|}tj|dd }| |}tt|  }| j| j||d}	|	jj|dd	d
 | |	j| | |	jj |	S )a  Submits a command to the cluster
        It is recommended not to use this function since the Job instance assumes pickle
        files will be created at the end of the job, and hence it will not work correctly.
        You may use a CommandFunction as argument to the submit function instead. The only
        problem with this latter solution is that stdout is buffered, and you will therefore
        not be able to monitor the logs in real time.

        Parameters
        ----------
        command: str
            a command string

        Returns
        -------
        Job
            A Job instance, providing access to the crun job information.
            Since it has no output, some methods will not be efficient
        z.submission_file_z.shwNF)verboserp   submission_fileT)keep_as_symlink)rN  rO  rP  r   r   rQ  rn   r   write_make_submission_file_text_make_submission_commandCommandFunction#_get_job_id_from_submission_commandr   r   
_num_tasksr  r   r[  _write_job_idr(   _set_job_permissions)
r   rW   r^  submission_file_pathr   command_listr   r(   	tasks_idsr   r   r   r   rY    s   


z PicklingExecutor._submit_commandr(   uidc                 C   r8  )zWrite the job id in a file named {job-independent folder}/parent_job_id_{uid}.
        This can create files read by plugins to get the job_id of the parent job
        Nr   )r   r(   rr  r   r   r   rm    s    zPicklingExecutor._write_job_idc                 C   r   )z1Returns the number of tasks associated to the jobr!   r%   r   r   r   rl    r"  zPicklingExecutor._num_tasksc                 C   r   )z~Creates the text of a file which will be created and run
        for the submission (for slurm, this is sbatch file).
        r!   )r   rW   rr  r   r   r   rh    s   z+PicklingExecutor._make_submission_file_textro  c                 C   r   )zCreate the submission command.r!   )r   ro  r   r   r   ri    r"  z)PicklingExecutor._make_submission_commandr   c                 C   r   )z=Recover the job id from the output of the submission command.r!   )r   r   r   r   rk    s   z4PicklingExecutor._get_job_id_from_submission_commandc                 C   r!  r    r   )rn   r   r   r   rn    r/  z%PicklingExecutor._set_job_permissions)rD  rE  r\   ) r]   r^   r_   r`   rb   rc   r   r[   ra   r   r   rg   r   r   rj   r   r  rX  rh   rZ  rY  rm  r?  r@  rl  rh  ri  staticmethodrd   rk  rn  __classcell__r   r   rq   r   rC  P  sB    	


-$ rC  )r?  r   r<  rR   r0   r/   typingrb   rN  r   pathlibr   typing_extensionsr    r   r   TypeVarr   r	   Genericrj   r   r   _MSGr   ABCr   rC  r   r   r   r   <module>   s2      s-; F