o
    wOi                     @   s  U d dl Z d dlZd dlZd dlmZmZmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZmZmZmZmZmZmZmZ dZdZd	ZeZeG d
d dZeddddZeed< dZ eed< eG dd dZ!dZ"eed< e!e"dZ#e!ed< G dd dZ$G dd dee	Z%eG dd dZ&G dd de&Z'eG dd  d Z(G d!d" d"e)e	Z*e*j+e*j,e*j-gZ.d#e*d$e/fd%d&Z0d'Z1eed(< eG d)d* d*Z2eG d+d, d,Z3eG d-d. d.Z4eee)e5e/df Z6ed/d0G d1d2 d2Z7ed3Z8G d4d5 d5ee8 Z9G d6d7 d7Z:G d8d9 d9e;Z<G d:d; d;e j=Z>G d<d= d=e;Z?G d>d? d?e;Z@G d@dA dAe;ZAeZBG dBdC dCe;ZCdDedEedFefdGdHZDdIeBd$eeeef fdJdKZEG dLdM dMe j=ZFdS )N    N)asdict	dataclassfield)datetime)Enum)Template)AnyCallableDictGenericIterableListOptionalTupleTypeTypeVarUnionzS
State: ${state} ; Num Restarts: ${num_restarts}
Msg: ${msg}
Replicas: ${replicas}
z 
- Role: [${role}]:
${replicas}
z~
- [${role}:${replica_id}]
  Timestamp: ${timestamp}; Exit Code: ${exit_code}
  State: ${state}
  Error Message: ${error_msg}
c                   @   sR   e Zd ZU dZeed< eed< eed< eedZe	e
ef ed< eddd	Zd
S )Resourcea  
    Represents resource requirements for a ``Container``.

    Args:
            cpu: number of cpu cores (note: not hyper threads)
            gpu: number of gpus
            memMB: MB of ram
            capabilities: additional hardware specs (interpreted by scheduler)
    cpugpumemMBdefault_factorycapabilitiesoriginalc                 K   s*   t | j}|| t| j| j| j|dS )z
        Copies a resource and applies new capabilities. If the same capabilities
        are present in the original resource and as parameter, the one from parameter
        will be used.
        )r   r   r   r   )dictr   updater   r   r   r   )r   r   res_capabilities r   O/home/ubuntu/.local/lib/python3.10/site-packages/torchelastic/tsm/driver/api.pycopyF   s   

zResource.copyN)r   r   )__name__
__module____qualname____doc__int__annotations__r   r   r   r
   strr   staticmethodr    r   r   r   r   r   5   s   
 
r   )r   r   r   NULL_RESOURCEallALLc                   @   sd   e Zd ZU dZeed< eZeed< e	e
dZeeef ed< dedd fddZd	edd fd
dZdS )	ContaineraS  
    Represents the specifications of the container that instances of ``Roles``
    run on. Maps to the container abstraction that the underlying scheduler
    supports. This could be an actual container (e.g. Docker) or a physical
    instance depending on the scheduler.

    An ``image`` is a software bundle that is installed on a ``Container``.
    The container on the scheduler dictates what an image actually is.
    An image could be as simple as a tar-ball or map to a docker image.
    The scheduler typically knows how to "pull" the image given an
    image name (str), which could be a simple name (e.g. docker image) or a url
    (e.g. s3://path/my_image.tar).

    A ``Resource`` can be bound to a specific scheduler backend or ``SchedulerBackend.ALL`` (default)
    to specify that the same ``Resource`` is to be used for all schedulers.

    Usage:

    ::

     # define resource for all schedulers
     my_container = Container(image="pytorch/torch:1")
                       .require(Resource(cpu=1, gpu=1, memMB=500))
                       .ports(tcp_store=8080, tensorboard=8081)

     # define resource for a specific scheduler
     my_container = Container(image="pytorch/torch:1")
                       .require(Resource(cpu=1, gpu=1, memMB=500), "custom_scheduler")
                       .ports(tcp_store=8080, tensorboard=8081)

    image	resourcesr   port_mapreturnc                 C   
   || _ | S )z>
        Sets resource requirements on the container.
        )r/   )selfr/   r   r   r   require   s   zContainer.requirekwargsc                 K   s   | j i | | S )z7
        Adds a port mapping for the container
        )r0   r   )r3   r5   r   r   r   ports   s   zContainer.portsN)r!   r"   r#   r$   r'   r&   r*   r/   r   r   r   r0   r
   r%   r4   r6   r   r   r   r   r-   ^   s   
  r-   z	<MISSING>MISSING)r.   NULL_CONTAINERc                	   @   s>   e Zd ZdZdZdZdZedee	 de	de	de	fd	d
Z
dS )macrosap  
    Defines macros that can be used with ``Role.entrypoint`` and ``Role.args``.
    The macros will be substituted at runtime to their actual values.

    Available macros:

    1. ``img_root`` - root directory of the pulled image on the container
    2. ``app_id`` - application id as assigned by the scheduler
    3. ``replica_id`` - unique id for each instance of a replica of a Role,
                        for instance a role with 3 replicas could have the 0, 1, 2
                        as replica ids. Note that when the container fails and is
                        replaced, the new container will have the same ``replica_id``
                        as the one it is replacing. For instance if node 1 failed and
                        was replaced by the scheduler the replacing node will also
                        have ``replica_id=1``.

    Example:

    ::

     # runs: hello_world.py --app_id ${app_id}
     trainer = Role(name="trainer").runs("hello_world.py", "--app_id", macros.app_id)
     app = Application("train_app").of(trainer)
     app_handle = session.run(app, scheduler="local", cfg=RunConfig())

    z${img_root}z	${app_id}z${replica_id}argsimg_rootapp_id
replica_idc                 C   s0   g }| D ]}t |j|||d}|| q|S )N)r;   r<   r=   )r   safe_substituteappend)r:   r;   r<   r=   args_subargsubr   r   r   
substitute   s   zmacros.substituteN)r!   r"   r#   r$   r;   r<   r=   r(   r   r'   rC   r   r   r   r   r9      s    $r9   c                   @   s   e Zd ZdZdZdZdS )RetryPolicya  
    Defines the retry policy for the ``Roles`` in the ``Application``.
    The policy defines the behavior when the role replica encounters a failure:

    1. unsuccessful (non zero) exit code
    2. hardware/host crashes
    3. preemption
    4. eviction

    .. note:: Not all retry policies are supported by all schedulers.
              However all schedulers must support ``RetryPolicy.APPLICATION``.
              Please refer to the scheduler's documentation for more information
              on the retry policies they support and behavior caveats (if any).

    1. REPLICA: Replaces the replica instance. Surviving replicas are untouched.
                Use with ``ElasticRole`` to have torchelastic coordinate restarts
                and membership changes. Otherwise, it is up to the application to
                deal with failed replica departures and replacement replica admittance.
    2. APPLICATION: Restarts the entire application.

    REPLICAAPPLICATIONN)r!   r"   r#   r$   rE   rF   r   r   r   r   rD      s    rD   c                   @   s   e Zd ZU dZeed< eZeed< ee	dZ
ee ed< eedZeeef ed< eZeed< dZeed	< d
Zeed< ejZeed< dedededd fddZdedd fddZdedd fddZdededd fddZdS )Rolea  
    A set of nodes that perform a specific duty within the ``Application``.
    Examples:

    1. Distributed data parallel app - made up of a single role (trainer).

    2. App with parameter server - made up of multiple roles (trainer, ps).

    Usage:

    ::

     trainer = Role(name="trainer")
                 .runs("my_trainer.py", "--arg", "foo", ENV_VAR="FOOBAR")
                 .on(container)
                 .replicas(4)

    Args:
            name: name of the role
            entrypoint: command (within the container) to invoke the role
            args: commandline arguments to the entrypoint cmd
            env: environment variable mappings
            container: container to run in
            replicas: number of container replicas to run
            max_retries: max number of retries before giving up
            retry_policy: retry behavior upon replica failures
            deployment_preference: hint to the scheduler on how to best
                                   deploy and manage replicas of this role

    name
entrypointr   r:   env	container   num_replicasr   max_retriesretry_policyr5   r1   c                 O   s,   || _ |  jg |7  _| ji | | S N)rI   r:   rJ   r   r3   rI   r:   r5   r   r   r   runs
  s   z	Role.runsc                 C   r2   rP   )rK   )r3   rK   r   r   r   on     zRole.onreplicasc                 C   r2   rP   )rM   )r3   rU   r   r   r   rU     rT   zRole.replicasc                 C   s   || _ || _| S rP   )rO   rN   )r3   rO   rN   r   r   r   with_retry_policy  s   zRole.with_retry_policyN)r!   r"   r#   r$   r'   r&   r7   rI   r   listr:   r   r   rJ   r
   r8   rK   r-   rM   r%   rN   rD   rF   rO   rR   rS   rU   rV   r   r   r   r   rG      s   
 rG   c                       s@   e Zd ZdZdef fddZdedededd fd	d
Z  ZS )ElasticRoleac  
    A ``Role`` for which the user provided ``entrypoint`` is executed with the
    torchelastic agent (in the container). Note that the torchelastic agent
    invokes multiple copies of ``entrypoint``.

    For more information about torchelastic see
    `torchelastic quickstart docs <http://pytorch.org/elastic/0.2.0/quickstart.html>`__.

    .. important:: It is the responsibility of the user to ensure that the
                   container's image includes torchelastic. Since TSM has no
                   control over the build process of the image, it cannot
                   automatically include torchelastic in the container's image.

    The following example launches 2 ``replicas`` (nodes) of an elastic ``my_train_script.py``
    that is allowed to scale between 2 to 4 nodes. Each node runs 8 workers which are allowed
    to fail and restart a maximum of 3 times.

    .. warning:: ``replicas`` MUST BE an integer between (inclusive) ``nnodes``. That is,
                   ``ElasticRole("trainer", nnodes="2:4").replicas(5)`` is invalid and will
                   result in undefined behavior.

    ::

     elastic_trainer = ElasticRole("trainer", nproc_per_node=8, nnodes="2:4", max_restarts=3)
                        .runs("my_train_script.py", "--script_arg", "foo", "--another_arg", "bar")
                        .on(container)
                        .replicas(2)
     # effectively runs:
     #    python -m torchelastic.distributed.launch
     #        --nproc_per_node 8
     #        --nnodes 2:4
     #        --max_restarts 3
     #        my_train_script.py --script_arg foo --another_arg bar

    rH   c                    s   t  j|d d| _|  jddg7  _g | _|dd |dtj |d| | D ]%\}}t	|t
rD|rC|  jd	| g7  _q-|  jd	| t|g7  _q-d S )
NrH   pythonz-mztorchelastic.distributed.launchrdzv_backendetcdrdzv_idrolez--)super__init__rI   r:   torchelastic_launch_args
setdefaultr9   r<   items
isinstanceboolr'   )r3   rH   launch_kwargsrA   val	__class__r   r   r`   C  s   
zElasticRole.__init__rI   r:   r5   r1   c                 O   s`   t j|s|tjst jtj|}|  j| j7  _|  j|g|7  _| j	
i | | S rP   )ospathisabs
startswithr9   r;   joinr:   ra   rJ   r   rQ   r   r   r   rR   U  s   zElasticRole.runs)r!   r"   r#   r$   r'   r`   rR   __classcell__r   r   rh   r   rX     s    $"rX   c                   @   sB   e Zd ZU dZeed< eedZe	e
 ed< de
dd fddZdS )	Applicationz
    Represents a distributed application made up of multiple ``Roles``.
    Contains the necessary information for the driver to submit this
    app to the scheduler.
    rH   r   rolesr1   c                 G   s   |  j g |7  _ | S rP   )rq   )r3   rq   r   r   r   ofk  s   zApplication.ofN)r!   r"   r#   r$   r'   r&   r   rW   rq   r   rG   rr   r   r   r   r   rp   `  s
   
 rp   c                   @   s:   e Zd ZdZdZdZdZdZdZdZ	dZ
d	efd
dZdS )AppStatea  
    State of the application. An application starts from an initial
    ``UNSUBMITTED`` state and moves through ``SUBMITTED``, ``PENDING``,
    ``RUNNING`` states finally reaching a terminal state:
    ``SUCCEEDED``,``FAILED``, ``CANCELLED``.

    If the scheduler supports preemption, the app moves from a ``RUNNING``
    state to ``PENDING`` upon preemption.

    If the user stops the application, then the application state moves
    to ``STOPPED``, then to ``CANCELLED`` when the job is actually cancelled
    by the scheduler.

    1. UNSUBMITTED - app has not been submitted to the scheduler yet
    2. SUBMITTED - app has been successfully submitted to the scheduler
    3. PENDING - app has been submitted to the scheduler pending allocation
    4. RUNNING - app is running
    5. SUCCEEDED - app has successfully completed
    6. FAILED - app has unsuccessfully completed
    7. CANCELLED - app was cancelled before completing
    rL                   @   r1   c                 C      | j S rP   rY   r3   r   r   r   __str__     zAppState.__str__N)r!   r"   r#   r$   UNSUBMITTED	SUBMITTEDPENDINGRUNNING	SUCCEEDEDFAILED	CANCELLEDr'   r|   r   r   r   r   rs   p  s    rs   stater1   c                 C   s   | t v S rP   )_TERMINAL_STATES)r   r   r   r   is_terminal  s   r   z<NONE>NONEc                   @   sh   e Zd ZU dZeed< eed< eed< dZe	e ed< dZ
e	e ed< dZe	e ed< d	efd
dZdS )RoleReplicaStatusa  
    The status of the replica during the job execution.

    Args:
        replica_id: The node rank, note: this is not a worker rank.
        state: The current state of the node.
        exit_code: `None`` if still running
        role: The role name
        end_time: Timestamp value if the node finished execution, None otherwise
        error_msg: Error message if any, None if job succeeded.
    r=   r   r^   N	exit_codeend_time	error_msgr1   c                 C   s&   t tj| j| j| j| j| j| jdS )z>
        Return human readable status representation.
        )	timestampr=   r   r   r^   r   )	r   _REPLICA_FORMAT_TEMPLATErC   r   r=   r   r   r^   r   r{   r   r   r   get_formatted_str  s   z#RoleReplicaStatus.get_formatted_str)r!   r"   r#   r$   r%   r&   rs   r'   r   r   r   r   r   r   r   r   r   r     s   
 r   c                   @   s   e Zd ZU dZeed< dZeed< dZe	ed< e
Ze	ed< dZee	 ed	< eed
Zee	ee f ed< defddZdd Z	ddedee	ee f fddZddede	fddZdS )	AppStatusa  
    The runtime status of the ``Application``. The scheduler can
    return an arbitrary text message (msg field).
    If any error occurs, scheduler can populate ``structured_error_msg``
    with json response.

    ``replicas`` represent the statuses of the replicas in the job. If the job
    runs with multiple retries, the parameter will contain the statuses of the
    most recent retry. Note: if the previous retries failed, but the most recent
    retry succeeded or in progress, ``replicas`` will not contain ocurred errors.
    r   r   num_restarts msgstructured_error_msgNui_urlr   rU   r1   c                 C   s
   t | jS rP   )r   r   r{   r   r   r   r        
zAppStatus.is_terminalc                 C   s@   t | }|d}|tkrt|}nt}||d< tj|ddS )Nr   rt   )indent)r   popr   jsonloadsdumps)r3   app_status_dictr   structured_error_msg_parsedr   r   r   __repr__  s   
zAppStatus.__repr__   state_mask_filterc                    s2   i }| j  D ]\}} fdd|D ||< q|S )Nc                    s    g | ]}|j j B  kr|qS r   )r   value.0replicar   r   r   
<listcomp>  s
    z0AppStatus._get_role_replicas.<locals>.<listcomp>)rU   rc   )r3   r   filterred_replicasr^   role_replicasr   r   r   _get_role_replicas  s   
zAppStatus._get_role_replicasc                 C   st   d}|  |}| D ] \}}t|dkrqddd |D }|ttj||d7 }qttj| j| j	| j
|dS )zJ
        Return a human readable representation of the AppStatus.
        r   r   c                 s   s    | ]}|  V  qd S rP   )r   r   r   r   r   	<genexpr>  s    
z.AppStatus.get_formatted_str.<locals>.<genexpr>)r^   rU   )r   r   r   rU   )r   rc   lenrn   r   _ROLE_REPLICA_FORMAT_TEMPLATErC   _APP_STATUS_FORMAT_TEMPLATEr   r   r   )r3   r   r   r   r^   filterred_role_replicasreplicas_strr   r   r   r     s"   



zAppStatus.get_formatted_str)r   )r!   r"   r#   r$   rs   r&   r   r%   r   r'   r   r   r   r   r   r   rU   r
   r   r   re   r   r   r   r   r   r   r   r   r     s"   
 
r   c                   @   s   e Zd ZU dZdZeed< ejZ	eed< dZ
eed< eZeed< eZeed< d	Zee ed
< eedZee ed< eedZeeee f ed< d	S )DescribeAppResponsea  
    Response object returned by ``Scheduler.describe(app)`` API. Contains
    the status and description of the application as known by the scheduler.
    For some schedulers implementations this response object has necessary
    and sufficient information to recreate an ``Application`` object in the
    absence of the hosting ``Session``. For these types of schedulers,
    the user can re-``run()`` the attached application. Otherwise the user
    can only call non-creating methods (e.g. ``wait()``, ``status()``, etc).

    Since this class is a data class and contains many member variables we
    keep the usage simple and provide a no-args constructor and chose to
    access the member vars directly rather than provide accessors.

    If scheduler returns arbitrary message, the ``msg`` field should be populated.
    If scheduler returns a structured json, the ``structured_error_msg`` field should be populated.
    z	<NOT_SET>r<   r   r)   r   r   r   Nr   r   rq   rU   )r!   r"   r#   r$   r<   r'   r&   rs   r~   r   r   r%   r   r   r   r   r   r   rW   rq   r   rG   r   rU   r
   r   r   r   r   r   r     s   
 "r   T)frozenc                   @   sX   e Zd ZU dZeedZeee	f e
d< dede	fddZded	e	fd
dZdd ZdS )	RunConfiga  
    Additional run configs for the app. These are typically
    scheduler runtime configs/arguments that do not bind
    to ``Application`` nor the ``Scheduler``. For example
    a particular cluster (within the scheduler) the application
    should be submitted to. Since the same app can be launched
    into multiple types of clusters (devo, prod) the
    cluster id config does not bind to the app. Neither
    does this bind to the scheduler since the cluster can
    be partitioned by size of the instances (S, M, L) or by
    a preemption setting (e.g. on-demand vs spot).

    Since ``Session`` allows the application to be submitted
    to multiple schedulers, users who want to submit the same
    app into multiple schedulers from the same session can
    union all the ``RunConfig``s into a single object. The
    scheduler implementation will selectively read the configs
    it needs.

    This class is intended to be trivially serialized and
    passed around or saved hence only allow primitives
    as config values. Should the scheduler need more than
    simple primitives (e.g. list of str) it is up to the
    scheduler to document a way to encode thie value as a
    str and parse it (e.g. representing list of str as
    comma delimited str).

    Usage:

    ::

     # write
     config = RunConfig()
     config.set("run_as_user", prod")
     config.set("priority", 10)

     # read
     config.get("run_as_user") # "prod"
     config.get("priority") # 10
     config.get("never_set") # None
    r   cfgscfg_keycfg_valc                 C   s   || j |< d S rP   )r   )r3   r   r   r   r   r   setU     zRunConfig.setkeyr1   c                 C   s   | j |d S rP   )r   get)r3   r   r   r   r   r   X  r   zRunConfig.getc                 C   s
   | j  S rP   )r   r   r{   r   r   r   r   [  r   zRunConfig.__repr__N)r!   r"   r#   r$   r   r   r   r
   r'   ConfigValuer&   r   r   r   r   r   r   r   r   '  s   
 *r   Tc                   @   s4   e Zd ZdZdedeegef fddZdd ZdS )	AppDryRunInfoae  
    Returned by ``Scheduler.submit_dryrun``. Represents the
    request that would have been made to the scheduler.
    The ``fmt_str()`` method of this object should return a
    pretty formatted string representation of the underlying
    request object such that ``print(info)`` yields a human
    readable representation of the underlying request.
    requestfmtc                 C   s"   || _ || _d | _d | _d | _d S rP   )r   _fmt_app_cfg
_scheduler)r3   r   r   r   r   r   r`   l  s
   

zAppDryRunInfo.__init__c                 C   s   |  | jS rP   )r   r   r{   r   r   r   r   |  s   zAppDryRunInfo.__repr__N)	r!   r"   r#   r$   r   r	   r'   r`   r   r   r   r   r   r   b  s    	r   c                	   @   sR   e Zd ZdZdd Z		ddedee ded	efd
dZde	fddZ
dd ZdS )runoptsaD  
    Holds the accepted scheduler run configuration
    keys, default value (if any), and help message string.
    These options are provided by the ``Scheduler`` and validated
    in ``Session.run`` against user provided ``RunConfig``.
    Allows ``None`` default values. Required opts must NOT have a
    non-None default.

    .. important:: This class has no accessors because it is intended to
                   be constructed and returned by ``Scheduler.run_config_options``
                   and printed out as a "help" tool or as part of an exception msg.
    Usage:

    ::
     opts = runopts()

     opts.add("run_as_user", type_=str, help="user to run the job as")
     opts.add("cluster_id", type_=int, help="cluster to submit the job", required=True)
     opts.add("priority", type_=float, default=0.5, help="job priority")
     opts.add("preemptible", type_=bool, default=False, help="is the job preemptible")

     # invalid
     opts.add("illegal", default=10, required=True)
     opts.add("bad_type", type=str, default=10)

     opts.check(RunConfig)
     print(opts)
    c                 C   s
   i | _ d S rP   )_optsr{   r   r   r   r`     r   zrunopts.__init__NFr   type_helpdefaultc              
   C   sp   |r|durt d| d| |dur-t||s-td| d| d| dt|j d	||||f| j|< dS )	z
        Adds the ``config`` option with the given help string and ``default``
        value (if any). If the ``default`` is not specified then this option
        is a required option.
        NzRequired option: z( must not specify default value. Given: zOption: , must be of type: 	. Given:  ())
ValueErrorrd   	TypeErrortyper!   r   )r3   r   r   r   r   requiredr   r   r   add  s   
zrunopts.addconfigc           	      C   s   t |j }| j D ]G\}\}}}}||}|r)|du r)td| d|| |durIt||sItd| d|j d| dt	|j d	|| |du rS|
|| q|S )	z
        Checks the given config against this ``runopts`` and sets default configs
        if not set.

        .. warning:: This method mutates the provided config!

        NzRequired run option: z, must be provided and not NonezRun option: r   z, but was: r   r   )r   r   r    r   rc   r   InvalidRunConfigExceptionrd   r!   r   r   )	r3   r   resolved_cfgr   r   r   r   _helprg   r   r   r   resolve  s.   


zrunopts.resolvec           
      C   sz   i }| j  D ])\}\}}}}|rd| n|}d|ji}|r$d|d< n||d< ||d< |||< qdd l}	|	j|dd	d
S )N*r   Tr   r   r   r   rt   P   )r   width)r   rc   r!   pprintpformat)
r3   pretty_optsr   r   r   r   r   r   optr   r   r   r   r     s   


zrunopts.__repr__)NF)r!   r"   r#   r$   r`   r'   r   r   r   r   r   r   r   r   r   r   r     s     
%r   c                       s.   e Zd ZdZdededdf fddZ  ZS )r   z
    Raised when the supplied ``RunConfig`` does not satisfy the
    ``runopts``, either due to missing required configs or value
    type mismatch.
    invalid_reason
run_configr   c                    s    t  | d| d|  d S )Nr   z, Expected: r_   r`   )r3   r   r   r   rh   r   r   r`     s    z"InvalidRunConfigException.__init__)r!   r"   r#   r$   r'   r   r`   ro   r   r   rh   r   r     s    "r   c                   @   s,  e Zd ZdZdefddZdededefdd	Ze	j
d
edefddZdededefddZdededefddZdefddZe	j
dedee fddZdefddZe	j
deddfddZdeddfddZ				d(dededed ee d!ee d"ee defd#d$Zded%eddfd&d'ZdS ))	Schedulerz
    An interface abstracting functionalities of a scheduler.
    Implementors need only implement those methods annotated with
    ``@abc.abstractmethod``.
    session_namec                 C   
   || _ d S rP   )r   )r3   r   r   r   r   r`     r   zScheduler.__init__appcfgr1   c                 C   s   |  ||}| |S )z
        Submits the application to be run by the scheduler.

        Returns:
            The application id that uniquely identifies the submitted app.
        )submit_dryrunschedule)r3   r   r   dryrun_infor   r   r   submit  s   
zScheduler.submitr   c                 C      t  )ac  
        Same as ``submit`` except that it takes an ``AppDryrunInfo``.
        Implementors are encouraged to implement this method rather than
        directly implementing ``submit`` since ``submit`` can be trivially
        implemented by:

        ::

         dryrun_info = self.submit_dryrun(app, cfg)
         return schedule(dryrun_info)

        NotImplementedErrorr3   r   r   r   r   r     s   zScheduler.schedulec                 C   s*   |   |}| ||}||_||_|S )a  
        Rather than submitting the request to run the app, returns the
        request object that would have been submitted to the underlying
        service. The type of the request object is scheduler dependent.
        This method can be used to dry-run an application. Please refer
        to the scheduler implementation's documentation regarding
        the actual return type.
        )run_optsr   _submit_dryrunr   r   )r3   r   r   r   r   r   r   r   r   &  s
   	zScheduler.submit_dryrunc                 C   r   rP   r   )r3   r   r   r   r   r   r   5  r}   zScheduler._submit_dryrunc                 C   s   t  S )z
        Returns the run configuration options expected by the scheduler.
        Basically a ``--help`` for the ``run`` API.
        )r   r{   r   r   r   r   8     zScheduler.run_optsr<   c                 C   r   )z
        Describes the specified application.

        Returns:
            Application description or ``None`` if the app does not exist.
        r   r3   r<   r   r   r   describe?     zScheduler.describec                 C   s   |  |}|duS )zf
        Returns:
            ``True`` if the app exists (was submitted), ``False`` otherwise
        N)r   )r3   r<   descr   r   r   existsI  s   
zScheduler.existsNc                 C   r   )zo
        Kills the application. This method will only be called on an
        application that exists.
        r   r   r   r   r   _cancel_existingQ     zScheduler._cancel_existingc                 C   s   |  |r| | dS dS )a;  
        Cancels/kills the application. This method is idempotent within the same
        thread and is safe to call on the same application multiple times.
        However when called from multiple threads/processes on the same app
        the exact semantics of this method depends on the idempotency guarantees
        of the underlying scheduler API.

        .. note:: This method does not block for the application to reach a
                  cancelled state. To ensure that the application reaches a
                  terminal state use the ``wait`` API.
        N)r   r   r   r   r   r   cancelY  s   
zScheduler.cancelr   	role_namekregexsinceuntilc                 C   s   t | jj d)a	  
        Returns an iterator to the log lines of the ``k``th replica of the ``role``.
        The iterator ends end all qualifying log lines have been read.

        If the scheduler supports time-based cursors fetching log lines
        for custom time ranges, then the ``since``, ``until`` fields are
        honored, otherwise they are ignored. Not specifying ``since`` and ``until``
        is equivalent to getting all available log lines. If the ``until`` is
        empty, then the iterator behaves like ``tail -f``, following the log output
        until the job reaches a terminal state.

        The exact definition of what constitutes a log is scheduler specific. Some
        schedulers may consider stderr or stdout as the log, others may read the logs
        from a log file.

        Behaviors and assumptions:

        1. Produces an undefined-behavior if called on an app that does not exist
           The caller should check that the app exists using ``exists(app_id)``
           prior to calling this method.

        2. Is not stateful, calling this method twice with same parameters
           returns a new iterator. Prior iteration
           progress is lost.

        3. Does not always support log-tailing. Not all schedulers support live
           log iteration (e.g. tailing logs while the app is running). Refer to
           the specific scheduler's documentation for the iterator's behavior.

        4. Does not guarantee log retention. It is possible that by the time this
           method is called, the underlying scheduler may have purged the log records
           for this application. If so this method raises an arbitrary exception.

        5. Only raises a ``StopIteration`` exception when the accessible log lines
           have been fully exhausted and the app has reached a final state. For instance,
           if the app gets stuck and does not produce any log lines, then the iterator
           blocks until the app eventually gets killed (either via timeout or manually)
           at which point it raises a ``StopIteration``.

        6. Need not be supported by all schedulers.

        7. Some schedulers may support line cursors by supporting ``__getitem__``
           (e.g. ``iter[50]`` seeks to the 50th log line).

        Returns:
            An ``Iterator`` over log lines of the specified role replica

        Raises:
            NotImplementedError - if the scheduler does not support log iteration
        z+ does not support application log iteration)r   ri   r#   )r3   r<   r   r   r   r   r   r   r   r   log_iterk  s   ;zScheduler.log_iter	schedulerc                 C   s0   |j D ]}|jjtkrtd|jj dqdS )z
        Validates whether application is consistent with the scheduler.

        Raises:
            ValueError: if application is not compatible with scheduler
        zNo resources for container: z5. Did you forget to call container.require(resources)N)rq   rK   r/   r*   r   r.   )r3   r   r   r^   r   r   r   	_validate  s   
zScheduler._validater   NNN)r!   r"   r#   r$   r'   r`   rp   r   r   abcabstractmethodr   r   r   r   r   r   r   r   r   r   r   r   r%   r   r   r   SchedulerBackendr   r   r   r   r   r     sF    
	
?r   c                       s&   e Zd ZdZdef fddZ  ZS )MalformedAppHandleExceptionz6
    Raised when APIs are given a bad app handle.
    
app_handlec                    s   t  | d d S )NzB is not of the form: <scheduler_backend>://<session_name>/<app_id>r   r3   r  rh   r   r   r`     s   z$MalformedAppHandleException.__init__r!   r"   r#   r$   r'   r`   ro   r   r   rh   r   r    s    r  c                       s*   e Zd ZdZdedef fddZ  ZS )SessionMismatchExceptiona  
    Raised on session certain action APIs
    when the session_name on an app handle does not match
    the current session's name. Modify/update APIs raise
    this exception as modifying/updataing an application
    owned by a different session should not be allowed.
    r  r   c                    s   t  d| d| d d S )NzApp handle: z is not owned by this session: zT. Please perform the action on the correct session or re-run the app on this sessionr   )r3   r  r   rh   r   r   r`     s   z!SessionMismatchException.__init__r  r   r   rh   r   r    s    r  c                       s"   e Zd Zdef fddZ  ZS )UnknownSchedulerExceptionscheduler_backendc                       t  d| d d S )NzScheduler backend: zQ does not exist. Use session.scheduler_backends() to see all supported schedulersr   )r3   r
  rh   r   r   r`        
z"UnknownSchedulerException.__init__)r!   r"   r#   r  r`   ro   r   r   rh   r   r	    s    r	  c                       s"   e Zd ZdZd fddZ  ZS )UnknownAppExceptionz
    Raised by ``Session`` APIs when either the application does not
    exist or the application is not owned by the session.
    r  	AppHandlec                    r  )NzUnknown app = zp. Did you forget to call session.run()? Otherwise, the app may have already finished and purged by the schedulerr   r  rh   r   r   r`     r  zUnknownAppException.__init__)r  r  )r!   r"   r#   r$   r`   ro   r   r   rh   r   r    s    r  r
  r   r<   c                 C   s   |  d| d| S )Nz:///r   )r
  r   r<   r   r   r   make_app_handle  s   r  r  c                 C   sB   ddl }d}||| }|st| | }|d |d |d fS )zX
    parses the app handle into ```(scheduler_backend, session_name, and app_id)```
    r   Nz?(?P<scheduler_backend>.+)://(?P<session_name>.+)/(?P<app_id>.+)r
  r   r<   )rematchr  	groupdict)r  r  patternr  gdr   r   r   parse_app_handle  s   r  c                   @   s  e Zd ZdZdefddZdefddZ			d-d
edede	e
 defddZejdedefddZ			d-d
edede	e
 defddZejd
edede
defddZdeeef fddZejdee fddZejdede	e fddZejdede	e fddZejdeeef fdd Zejdedd	fd!d"Zejdede	e fd#d$Zej	%						d.ded&ed'ed(e	e d)e	e d*e	e de fd+d,Z!d	S )/Sessiona  
    Entrypoint and client-facing API for TSM. Has the methods for the user to
    define and act upon ``Applications``. The ``Session`` is stateful and
    represents a logical workspace of the user. It can be backed by a service
    (e.g. TSM server) for persistence or can be standalone with no persistence
    meaning that the ``Session`` lasts only during the duration of the hosting
    process (see the ``attach()`` API for instructions on re-parenting apps
    between sessions).
    rH   c                 C   r   rP   _name)r3   rH   r   r   r   r`     r   zSession.__init__r1   c                 C   rz   )z@
        Returns:
            The name of this session.
        r  r{   r   r   r   rH     r   zSession.namer   Nr   r   r   c                 C   s   |  |||}| |S )a  
        Runs the given application in the specified mode.

        .. note:: sub-classes of ``Session`` should implement ``schedule`` method
                  rather than overriding this method directly.

        Returns:
            An application handle that is used to call other action APIs on the app.

        Raises:
            AppNotReRunnableException: if the session/scheduler does not support re-running attached apps
        )dryrunr   )r3   r   r   r   r   r   r   r   run  s   
zSession.runr   c                 C   r   )a  
        Actually runs the application from the given dryrun info.
        Useful when one needs to overwrite a parameter in the scheduler
        request that is not configurable from one of the object APIs.

        .. warning:: Use sparingly since abusing this method to overwrite
                     many parameters in the raw scheduler request may
                     lead to your usage of TSM going out of compliance
                     in the long term. This method is intended to
                     unblock the user from experimenting with certain
                     scheduler-specific features in the short term without
                     having to wait until TSM exposes scheduler features
                     in its APIs.

        .. note:: It is recommended that sub-classes of ``Session`` implement
                  this method instead of directly implementing the ``run`` method.

        Usage:

        ::

         dryrun_info = session.dryrun(app, scheduler="default", cfg)

         # overwrite parameter "foo" to "bar"
         dryrun_info.request.foo = "bar"

         app_handle = session.submit(dryrun_info)

        r   r   r   r   r   r   1  s   zSession.schedulec                 C   s   |j std|j d|j D ]*}|jstd|j d|jdkr+td|j d|jtkr9td|j d	q| |||pBt }||_	|S )
a  
        Dry runs an app on the given scheduler with the provided run configs.
        Does not actually submit the app but rather returns what would have been
        submitted. The returned ``AppDryRunInfo`` is pretty formatted and can
        be printed or logged directly.

        Usage:

        ::

         dryrun_info = session.dryrun(app, scheduler="local", cfg)
         print(dryrun_info)

        zNo roles for app: z). Did you forget to call app.of(roles..)?zNo entrypoint for role: z:. Did you forget to call role.runs(entrypoint, args, env)?r   z Non-positive replicas for role: z8. Did you forget to call role.replicas(positive_number)?zNo container for role: z+. Did you forget to call role.on(container))
rq   r   rH   rI   rM   rK   r8   _dryrunr   r   )r3   r   r   r   r^   r   r   r   r   r  R  s*   


zSession.dryrunc                 C   r   )z
        The actual dryrun logic.
        Implementors of ``Session`` should implement this method rather than ``dryrun``.
        r   )r3   r   r   r   r   r   r   r    r   zSession._dryrunc                 C   r   )a,  
        Returns the ``runopts`` for the supported scheduler backends.

        Usage:

        ::

         local_runopts = session.run_opts()["local"]
         print("local scheduler run options: {local_runopts}")

        Returns:
            A map of scheduler backend to its ``runopts``
        r   r{   r   r   r   r     s   zSession.run_optsc                 C   r   )z
        Returns a list of all supported scheduler backends.
        All session implementations must support a "default"
        scheduler backend and document what the default
        scheduler is.
        r   r{   r   r   r   scheduler_backends  r   zSession.scheduler_backendsr  c                 C   r   )z
        Returns:
            The status of the application, or ``None`` if the app does not exist anymore
            (e.g. was stopped in the past and removed from the scheduler's backend).
        r   r  r   r   r   status  s   zSession.statusc                 C   r   )a  
        Block waits (indefinitely) for the application to complete.
        Possible implementation:

        ::

         while(True):
             app_status = status(app)
             if app_status.is_terminal():
                 return
             sleep(10)

        Returns:
            The terminal status of the application, or ``None`` if the app does not exist anymore
        r   r  r   r   r   wait  s   zSession.waitc                 C   r   )z
        Returns the applications that were run with this session mapped by the app handle.
        The persistence of the session is implementation dependent.
        r   r{   r   r   r   rW     r   zSession.listc                 C   r   )a  
        Stops the application, effectively directing the scheduler to cancel
        the job. Does nothing if the app does not exist.

        .. note:: This method returns as soon as the cancel request has been
                  submitted to the scheduler. The application will be in a
                  ``RUNNING`` state until the scheduler actually terminates
                  the job. If the scheduler successfully interrupts the job
                  and terminates it the final state will be ``CANCELLED``
                  otherwise it will be ``FAILED``.

        Raises:
            SessionMismatchException: if the app handle does not belong to this session
        r   r  r   r   r   stop  s   zSession.stopc                 C   r   )a  
        Reconstructs the application (to the best extent) given the app handle.
        Note that the reconstructed application may not be the complete app as
        it was submitted via the run API. How much of the app can be reconstructed
        is scheduler dependent.

        Returns:
            Application or None if the app does not exist anymore or if the
            scheduler does not support describing the app handle
        r   r  r   r   r   r     s   zSession.describer   r   r   r   r   r   c                 C   r   )a	  
        Returns an iterator over the log lines of the specified job container.

        .. note:: #. ``k`` is the node (host) id NOT the ``rank``.
                  #. ``since`` and ``until`` need not always be honored (depends on scheduler).

        .. warning:: The semantics and guarantees of the returned iterator is highly
                     scheduler dependent. See ``torchelastic.tsm.driver.api.Scheduler.log_iter``
                     for the high-level semantics of this log iterator. For this reason
                     it is HIGHLY DISCOURAGED to use this method for generating output
                     to pass to downstream functions/dependencies. This method
                     DOES NOT guarantee that 100% of the log lines are returned.
                     It is totally valid for this method to return no or partial log lines
                     if the scheduler has already totally or partially purged log records
                     for the application.

        Usage:

        ::

         app_handle = session.run(app, scheduler="local", cfg=RunConfig())

         print("== trainer node 0 logs ==")
         for line in session.log_lines(app_handle, "trainer", k=0):
            print(line)

        Discouraged anti-pattern:

        ::

         # DO NOT DO THIS!
         # parses accuracy metric from log and reports it for this experiment run
         accuracy = -1
         for line in session.log_lines(app_handle, "trainer", k=0):
            if matches_regex(line, "final model_accuracy:[0-9]*"):
                accuracy = parse_accuracy(line)
                break
         report(experiment_name, accuracy)

        Args:
            app_handle: application handle
            role_name: role within the app (e.g. trainer)
            k: k-th replica of the role to fetch the logs for
            regex: optional regex filter, returns all lines if left empty
            since: datetime based start cursor. If left empty begins from the
                    first log line (start of job).
            until: datetime based end cursor. If left empty, follows the log output
                    until the job completes and all log lines have been consumed.

        Returns:
             An iterator over the role k-th replica of the specified application.

        Raise:
            UnknownAppException: if the app does not exist in the scheduler
            SessionMismatchException: if the app handle does not belong to this session

        r   )r3   r  r   r   r   r   r   r   r   r   	log_lines  s   CzSession.log_lines)r   Nr   )"r!   r"   r#   r$   r'   r`   rH   rp   r  r   r   r  r  r  r  r   r   r  r  r
   r   r   r   r  r   r  r  rW   r   r   r%   r   r   r!  r   r   r   r   r    s    


#
.		r  )Gr  r   rj   dataclassesr   r   r   r   enumr   stringr   typingr   r	   r
   r   r   r   r   r   r   r   r   r   r   r   r'   r  r   r*   r&   r,   r-   r7   r8   r9   rD   rG   rX   rp   r%   rs   r   r   r   r   re   r   r   r   r   r   floatr   r   r   r   r   	Exceptionr   ABCr   r  r  r	  r  r  r  r  r  r   r   r   r   <module>   sx   
4"5+=B#"D7v 9

