o
    wOiY                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZ d dlmZ d dlmZmZmZmZmZmZ d dlmZ d dlmZmZmZmZmZmZm Z m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z'm(Z( d dl)m*Z* e* Z+de,d	e,fd
dZ-dZ.e,e/d< G dd de j0Z1G dd de1Z2e,Z3e,Z4e,Z5eG dd dZ6G dd dZ7d#ddZ8eG dd dZ9eG dd dZ:G dd de$Z;G dd dZ<d e,d	e;fd!d"Z=dS )$    N)asdict	dataclass)datetime)IODictIterableListOptionalTuple)uuid4)AppDryRunInfoResourceApplicationAppStateNULL_RESOURCEDescribeAppResponseInvalidRunConfigExceptionSchedulerBackendRole	RunConfig	Scheduleris_terminalmacrosrunoptsNONE)
get_loggerapp_namereturnc                 C   s   |  dt t dd  S )N_-r   )strr   split)r    r"   [/home/ubuntu/.local/lib/python3.10/site-packages/torchelastic/tsm/driver/local_scheduler.pymake_unique2   s   r$   z<N/A>NAc                   @   s(   e Zd ZdZejdedefddZdS )ImageFetcherz
    Downloads and sets up an image onto the localhost. This is only needed for
    ``LocalhostScheduler`` since typically real schedulers will do this
    on-behalf of the user.
    imager   c                 C   s   t  )zi
        Pulls the given image and returns a path to the pulled image on
        the local host.
        )NotImplementedErrorselfr'   r"   r"   r#   fetch@   s   zImageFetcher.fetchN)__name__
__module____qualname____doc__abcabstractmethodr    r+   r"   r"   r"   r#   r&   9   s    r&   c                   @   s"   e Zd ZdZdedefddZdS )LocalDirectoryImageFetchera  
    Interprets the image name as the path to a directory on
    local host. Does not "fetch" (e.g. download) anything. Used in conjunction
    with ``LocalScheduler`` to run local binaries.

    The image name must be an absolute path and must exist.

    Example:

    1. ``fetch(Image(name="/tmp/foobar"))`` returns ``/tmp/foobar``
    2. ``fetch(Image(name="foobar"))`` raises ``ValueError``
    2. ``fetch(Image(name="/tmp/dir/that/does/not_exist"))`` raises ``ValueError``
    r'   r   c                 C   s<   t j|std| dt j|std| d|S )z
        Raises:
            ValueError - if the image name is not an absolute dir
                         and if it does not exist or is not a directory

        zInvalid image name: z%, image name must be an absolute pathz&, does not exist or is not a directory)ospathisabs
ValueErrorisdirr)   r"   r"   r#   r+   X   s   

z LocalDirectoryImageFetcher.fetchN)r,   r-   r.   r/   r    r+   r"   r"   r"   r#   r2   I   s    r2   c                   @   sr   e Zd ZU dZeed< eed< ejed< e	e
 ed< e	e
 ed< eed< dd
dZdefddZdefddZd	S )_LocalReplicazD
    Contains information about a locally running role replica.
    	role_name
replica_idprocstdoutstderr
error_filer   Nc                 C   s<   | j   | j   | jr| j  | jr| j  dS dS )z
        terminates the underlying process for this replica
        closes stdout and stderr file handles
        safe to call multiple times
        N)r;   	terminatewaitr<   closer=   r*   r"   r"   r#   r?      s   


z_LocalReplica.terminatec                 C   s   | j  d u S N)r;   pollrB   r"   r"   r#   is_alive   s   z_LocalReplica.is_alivec                 C   s   |   rdS | jjdkS )NFr   )rE   r;   
returncoderB   r"   r"   r#   failed   s   z_LocalReplica.failedr   N)r,   r-   r.   r/   RoleName__annotations__int
subprocessPopenr	   r   r    r?   boolrE   rG   r"   r"   r"   r#   r8   r   s   
 

r8   c                   @   s   e Zd ZdZdedefddZdededd	fd
dZdedd	fddZ	dddZ
dee fddZdefddZdddZdd Zd	S )_LocalApplicationz
    Container object used by ``LocalhostScheduler`` to group the pids that
    form an application. Each replica of a role in the application is a
    process and has a pid.
    idlog_dirc                 C   s$   || _ || _i | _tj| _d| _d S )N)rP   rQ   role_replicasr   PENDINGstatelast_updated)r*   rP   rQ   r"   r"   r#   __init__   s
   
z_LocalApplication.__init__r9   replicar   Nc                 C   s   | j |g }|| d S rC   )rS   
setdefaultappend)r*   r9   rX   procsr"   r"   r#   add_replica   s   z_LocalApplication.add_replicarU   c                 C   s   t   | _|| _d S rC   )timerV   rU   )r*   rU   r"   r"   r#   	set_state   s   

z_LocalApplication.set_statec                 C   s&   | j  D ]}|D ]}|  q	qdS )z
        terminates all procs associated with this app,
        and closes any resources (e.g. log file handles)
        safe to call multiple times
        N)rS   valuesr?   )r*   replicasrr"   r"   r#   r?      s
   
z_LocalApplication.terminatec                 C   sX   d }t j}| j D ]}|D ]}tj|jsqtj|j}||k r(|}|j}qq
|S rC   )	sysmaxsizerS   r_   r3   r4   existsr>   getmtime)r*   r>   min_timestampr`   rX   mtimer"   r"   r#   _get_error_file   s   z!_LocalApplication._get_error_filec                 C   sN   |   }|stS t|d}tt|W  d    S 1 s w   Y  d S )Nra   )rh   r   openjsondumpsload)r*   r>   fr"   r"   r#   get_structured_error_msg   s   $z*_LocalApplication.get_structured_error_msgc              	   C   s   |    dtt fdd}i }| j D ])\}}g }|D ]}|j|jj|jj||j	||j
|jd}|| q|||< q| j| j| jj| j|d}tj|dd}	ttj| jdd	}
|
|	 W d
   n1 smw   Y  td| j d|	  d
S )aA  
        terminates all procs associated with this app,
        and closes any resources (e.g. log file handles)
        and if log_dir has been specified,
        writes a SUCCESS file indicating that the log files
        have been flushed and closed and ready to read.
        NOT safe to call multiple times!
        std_ioc                 S   s   | r| j S dS )Nz	<CONSOLE>)name)ro   r"   r"   r#   _fmt_io_filename   s   z1_LocalApplication.close.<locals>._fmt_io_filename)r:   pidexitcoder<   r=   r>   )app_idrQ   final_staterV   roles   )indentSUCCESSwNzSuccessfully closed app_id: z.
)r?   r	   r   rS   itemsr:   r;   rr   rF   r<   r=   r>   rZ   rP   rQ   rU   rp   rV   rj   rk   ri   r3   r4   joinwriteloginfo)r*   rq   
roles_infor9   r`   replicas_inforX   replica_infoapp_infoinfo_strfpr"   r"   r#   rA      s4   		
z_LocalApplication.closec                 C   sX   i }| j  D ]\}}||g }|D ]	}||jj qqd| j d| j d| dS )Nz{app_id:z, state:z
, pid_map:})rS   r{   rY   rZ   r;   rr   rP   rU   )r*   role_to_pidr9   r`   pidsra   r"   r"   r#   __repr__  s   z_LocalApplication.__repr__rH   )r,   r-   r.   r/   r    rW   r8   r\   r   r^   r?   r	   rh   rn   rA   r   r"   r"   r"   r#   rO      s    


0rO   c                  C   s    t d} d}| |tj dS )z
    Sets PR_SET_PDEATHSIG to ensure a child process is
    terminated appropriately.

    See http://stackoverflow.com/questions/1884941/ for more information.
    For libc.so.6 read http://www.linux-m68k.org/faq/glibcinfo.html
    z	libc.so.6   N)ctypesCDLLprctlsignalSIGTERM)libcPR_SET_PDEATHSIGr"   r"   r#   _pr_set_pdeathsig  s   
r   c                   @   sF   e Zd ZU dZee ed< eeef ed< ee ed< ee ed< dS )ReplicaParamzS
    Holds ``LocalScheduler._popen()``parameters for each replica of the role.
    argsenvr<   r=   N)	r,   r-   r.   r/   r   r    rJ   r   r	   r"   r"   r"   r#   r     s   
 r   c                   @   sJ   e Zd ZU dZeed< eed< eee	e
 f ed< eee	e f ed< dS )PopenRequestzf
    Holds parameters to create a subprocess for each replica of each role
    of an application.
    rt   rQ   role_paramsrole_log_dirsN)r,   r-   r.   r/   AppIdrJ   r    r   rI   r   r   r"   r"   r"   r#   r   &  s   
 r   c                       s~  e Zd ZdZd5dedef fddZdefdd	Zd
e	de
ddfddZdeeef fddZdedefddZdefddZdee fddZdedededefddZdededeeef fdd Zd!edefd"d#Zd
e	dedee fd$d%Zd
e	dedefd&d'Zdedee  fd(d)Z!	*			d6deded+ed,ee d-ee" d.ee" de#fd/d0Z$deddfd1d2Z%d3d4 Z&  Z'S )7LocalSchedulera}  
    Schedules on localhost. Containers are modeled as processes and
    certain properties of the container that are either not relevant
    or that cannot be enforced for localhost
    runs are ignored. Properties that are ignored:

    1. Resource requirements
    2. Container limit enforcements
    3. Retry policies
    4. Retry counts (no retries supported)
    5. Deployment preferences

    ..note:: Use this scheduler sparingly since an application
             that runs successfully on a session backed by this
             scheduler may not work on an actual production cluster
             using a different scheduler.
    d   session_name
cache_sizec                    s,   t  | i | _|dkrtd|| _d S )Nr   z$cache size must be greater than zero)superrW   _appsr6   _cache_size)r*   r   r   	__class__r"   r#   rW   J  s
   
zLocalScheduler.__init__r   c                 C   s.   t  }|jdtddd |jdtd dd |S )Nimage_fetcherzimage fetcher typedir)type_helpdefaultrQ   z0dir to write stdout/stderr log files of replicas)r   r   r   )r   addr    )r*   optsr"   r"   r#   run_optsT  s   zLocalScheduler.run_optsapp	schedulerNc                 C   s   d S rC   r"   )r*   r   r   r"   r"   r#   	_validate_  s   zLocalScheduler._validatec                 C   s
   dt  iS )Nr   )r2   rB   r"   r"   r#   _img_fetchersc  s   
zLocalScheduler._img_fetcherscfgc                 C   sF   | d}|  }| |d }|s!td| d|  ||  |S )Nr   z Unsupported image fetcher type: z. Must be one of: )getr   r   keysr   )r*   r   img_fetcher_typefetchersimg_fetcherr"   r"   r#   _get_img_fetcherf  s   
zLocalScheduler._get_img_fetcherc                 C   st   t j}d}| j D ]\}}t|jr|j|kr|}q
|r,| j|= td| d dS tdt	| j d dS )a7  
        Evicts one least recently used element from the apps cache. LRU is defined as
        the oldest app in a terminal state (e.g. oldest finished app).

        Returns:
            ``True`` if an entry was evicted, ``False`` if no entries could be evicted
            (e.g. all apps are running)
        Nzevicting app: z, from local scheduler cacheTzno apps evicted, all z apps are runningF)
rb   rc   r   r{   r   rU   rV   r~   debuglen)r*   lru_time
lru_app_idrt   r   r"   r"   r#   
_evict_lrus  s   	

zLocalScheduler._evict_lrufilec                 C   sF   |sdS t j|rtd| dt jt j|dd t|ddS )z
        Given a file name, opens the file for write and returns the IO.
        If no file name is given, then returns ``None``
        Raises a ``FileExistsError`` if the file is already present.
        Nz
log file: zT already exists, specify a different log_dir, app_name, or remove the file and retryT)exist_okrz   )mode)r3   r4   isfileFileExistsErrormakedirsdirnameri   )r*   r   r"   r"   r#   _get_file_io  s   
zLocalScheduler._get_file_ior9   r:   replica_paramsc           
      C   s   |  |j}|  |j}tj }||j |d }tj	t
|ddd}td| d| d|  tj|j|||td}	t|||	|||d	S )
z
        Same as ``subprocess.Popen(**popen_kwargs)`` but is able to take ``stdout`` and ``stderr``
        as file name ``str`` rather than a file-like obj.
        TORCHELASTIC_ERROR_FILErw   P   rx   widthzRunning z
 (replica z):
 )r   r   r<   r=   
preexec_fn)r<   r=   r>   )r   r<   r=   r3   environcopyupdater   pprintpformatr   r~   r   rL   rM   r   r   r8   )
r*   r9   r:   r   stdout_stderr_r   r>   	args_pfmtr;   r"   r"   r#   _popen  s,   
zLocalScheduler._popenrt   c                 C   s<   | d}d}|stjdd}d}tjt|| j||fS )a-  
        Returns the log dir and a bool (should_redirect_std). We redirect stdout/err
        to a log file ONLY if the log_dir is user-provided in the cfg

        1. if cfg.get("log_dir") -> (user-specified log dir, True)
        2. if not cfg.get("log_dir") -> (autogen tmp log dir, False)
        rQ   Ttsm_)prefixF)r   tempfilemkdtempr3   r4   r|   r    r   )r*   rt   r   base_log_dirredirect_stdr"   r"   r#   _get_app_log_dir  s   
	zLocalScheduler._get_app_log_dirdryrun_infoc                 C   s   t | j| jkr|  std| j d|j}|j}|j}|| jvs'J dt	| t
||}|j D ]/}|j| }|j| }tt |D ]}	||	 }
||	 }t	| | ||	|
}||| qHq6|| j|< |S )NzApp cache size (z#) exceeded. Increase the cache sizez7no app_id collisons expected since uuid4 suffix is used)r   r   r   r   
IndexErrorrequestrt   rQ   r3   r   rO   r   r   r   ranger   r\   )r*   r   r   rt   app_log_dir	local_appr9   r   r   r:   r   replica_log_dirrX   r"   r"   r#   schedule  s2   





zLocalScheduler.schedulec                 C   s   |  ||}t|dd S )Nc                 S   s   t j| dddS )Nrw   r   r   )r   r   )pr"   r"   r#   <lambda>  s    z/LocalScheduler._submit_dryrun.<locals>.<lambda>)_to_popen_requestr   )r*   r   r   r   r"   r"   r#   _submit_dryrun  s   zLocalScheduler._submit_dryrunc              
   C   s$  t |j}| |}| ||\}}i }i }|jD ]q}	||	jg }
||	jg }|	j}||j}t	j
||	j}t|	jD ]J}|gt|	j||t| }t	j
||	jt|}dt	j
|di|	j}d}d}|rzt	j
|d}t	j
|d}|
t|||| || q?qt||||S )zK
        Converts the application and cfg into a ``PopenRequest``.
        r   z
error.jsonNz
stdout.log
stderr.log)r$   rp   r   r   rv   rY   	containerr+   r'   r3   r4   r|   
entrypointr   num_replicasr   
substituter   r    r   rZ   r   r   )r*   r   r   rt   r   r   r   r   r   roler   replica_log_dirsr   img_rootcmdr:   r   r   env_varsr<   r=   r"   r"   r#   r     s>   
	

z LocalScheduler._to_popen_requestc           
      C   s   || j vrd S | j | }| }t|jr|j}n1d}d}|j D ]}|D ]}|| O }|| O }q&q"|r<tj	}n	|rBtj
}ntj}|| t|jrS|  t }	||	_||	_||	_d|	_d|j |	_|	S )NFr   zfile://)r   rn   r   rU   rS   r_   rE   rG   r   RUNNINGFAILED	SUCCEEDEDr^   rA   r   rt   structured_error_msgnum_restartsrQ   ui_url)
r*   rt   r   r   rU   runningrG   r`   ra   respr"   r"   r#   describe*  s8   




zLocalScheduler.describer   kregexsinceuntilc           	      C   sb   |s|r	t d | j| }tj|j|t|d}tj|s(t	d| dt
||p-d|| S )NzxSince and/or until times specified for LocalScheduler.log_iter. These will be ignored and all log lines will be returnedr   app: zU was not configured to log into a file. Did you run it with log_dir set in RunConfig?z.*)warningswarnr   r3   r4   r|   rQ   r    r   RuntimeErrorLogIterator)	r*   rt   r9   r   r   r   r   r   log_filer"   r"   r#   log_iterO  s   	

zLocalScheduler.log_iterc                 C   s   | j | }|  tj|_d S rC   )r   rA   r   	CANCELLEDrU   )r*   rt   r   r"   r"   r#   _cancel_existingi  s   
zLocalScheduler._cancel_existingc                 C   s0   | j  D ]\}}td|  |  qd S )NzTerminating app: )r   r{   r~   r   r?   )r*   rt   r   r"   r"   r#   __del__o  s   
zLocalScheduler.__del__)r   )r   NNN)(r,   r-   r.   r/   r    rK   rW   r   r   r   r   r   r   r&   r   r   r   rN   r   r	   r   rI   r   r8   r   r
   r   r   r   r   r   r   r   r   r   r   r  r  r  __classcell__r"   r"   r   r#   r   7  sp    

%

0)
r   c                   @   s>   e Zd ZdedededefddZdd Zd	d
 Zdd ZdS )r   rt   r   r  r   c                 C   s.   || _ t|| _|| _d | _|| _d| _d S )NF)_app_idrecompile_regex	_log_file_log_fp
_scheduler_app_finished)r*   rt   r   r  r   r"   r"   r#   rW   w  s   
zLogIterator.__init__c                 C   s0   | j | j}|rt|jrd| _d S d| _d S )NTF)r  r   r  r   rU   r  )r*   descr"   r"   r#   _check_finished  s   

zLogIterator._check_finishedc                 C   sV   	 |    tj| jrt| jd| _	 | S | jr%td| j	 d| j t
d q)NTra   r   z finished without writing: r   )r  r3   r4   r   r  ri   r  r  r   r  r]   sleeprB   r"   r"   r#   __iter__  s   	
zLogIterator.__iter__c                 C   sX   	 | j  }|s| jr| j   t td |   n|d}t	
| j|r+|S q)NTr   
)r  readliner  rA   StopIterationr]   r  r  rstripr  matchr
  )r*   liner"   r"   r#   __next__  s   




zLogIterator.__next__N)	r,   r-   r.   r    r   rW   r  r  r  r"   r"   r"   r#   r   v  s    

	r   r   c                 K   s   t | |dddS )Nr   r   )r   r   )r   r   )r   kwargsr"   r"   r#   create_scheduler  s   
r  rH   )>r0   r   rj   r3   r   r  r   rL   rb   r   r]   r   dataclassesr   r   r   typingr   r   r   r   r	   r
   uuidr   torchelastic.tsm.driver.apir   r   r   r   r   r   r   r   r   r   r   r   r   r   r   torchelastic.utils.loggingr   r~   r    r$   r%   rJ   ABCr&   r2   r   AppNamerI   r8   rO   r   r   r   r   r   r  r"   r"   r"   r#   <module>   sN   
	 D$'
s  A9