o
    wOi                     @   s   d dl Z d dlZd dlZd dlZd dlmZmZmZmZ d dl	m
Z
mZmZmZmZ d dlmZ d dlmZ e eZG dd deZdS )    N)AnyDictOptionalTuple)	RunResultSimpleElasticAgentWorkerGroup
WorkerSpecWorkerState)prof)start_processesc                       s   e Zd ZdZ			ddededee f fdd	Zdee d
efddZ	e
deddfddZe
dedeeef f fddZdddZe
dedefddZ  ZS )LocalElasticAgentav
  
    An implementation of :py:class:`torchelastic.agent.server.ElasticAgent`
    that handles host-local workers.
    This agent is deployed per host and is configured to spawn ``n`` workers.
    When using GPUs, ``n`` maps to the number of GPUs available on the host.

    The local agent does not communicate to other local agents deployed on
    other hosts, even if the workers may communicate inter-host. The worker id
    is interpreted to be a local process. The agent starts and stops all worker
    processes as a single unit.


    The worker function and argument passed to the worker function must be
    python multiprocessing compatible. To pass multiprocessing data structures
    to the workers you may create the data structure in the same multiprocessing
    context as the specified ``start_method`` and pass it as a function argument.

    The ``exit_barrier_timeout`` specifies the amount of time (in seconds) to wait
    for other agents to finish. This acts as a safety net to handle cases where
    workers finish at different times, to prevent agents from viewing workers
    that finished early as a scale-down event. It is strongly advised that the
    user code deal with ensuring that workers are terminated in a synchronous
    manner rather than relying on the exit_barrier_timeout.

    Example launching function

    ::

        def trainer(args) -> str:
            return "do train"

        def main():
            start_method="spawn"
            shared_queue= multiprocessing.get_context(start_method).Queue()
            spec = WorkerSpec(
                        role="trainer",
                        local_world_size=nproc_per_process,
                        entrypoint=trainer,
                        args=("foobar",),
                        ...<OTHER_PARAMS...>)
            agent = LocalElasticAgent(spec, start_method)
            results = agent.run()

            if results.is_failed():
                print("trainer failed")
            else:
                print(f"rank 0 return value: {results.return_values[0]}")
                # prints -> rank 0 return value: do train

    Example launching binary

    ::

        def main():
            spec = WorkerSpec(
                        role="trainer",
                        local_world_size=nproc_per_process,
                        entrypoint="/usr/local/bin/trainer",
                        args=("--trainer_args", "foobar"),
                        ...<OTHER_PARAMS...>)
            agent = LocalElasticAgent(spec)
            results = agent.run()

            if not results.is_failed():
                print("binary launches do not have return values")

    spawn,  Nspecexit_barrier_timeoutlog_dirc                    s6   t  || || _d | _|j }| ||| _d S N)super__init___start_method	_pcontextrdzv_handler
get_run_id_make_log_dir_log_dir)selfr   start_methodr   r   rdzv_run_id	__class__ a/home/ubuntu/.local/lib/python3.10/site-packages/torchelastic/agent/server/local_elastic_agent.pyr   b   s
   
zLocalElasticAgent.__init__r   c                 C   sF   |pt jdd}tj|dd t j| d|d}td|  |S )Ntorchelastic_)prefixT)exist_ok_)r$   dirzlog directory set to: )tempfilemkdtemposmakedirsloginfo)r   r   r   base_log_dirr'   r!   r!   r"   r   o   s
   zLocalElasticAgent._make_log_dirworker_groupreturnc                 C   s   |    d S r   )	_shutdown)r   r/   r!   r!   r"   _stop_workersv   s   zLocalElasticAgent._stop_workersc                    s,  |j }|j}t |\}}|j| j }i }i }|jD ]L}	|	j}
t|
t|	j	t|j
t|	j|jt|jt|	jt|	j|t|t|t|j|j tdd}dtjv r^tjd |d< |||
< |j||
< qtj| jd| }tj|dd t| t|j|j|||| j|j|jd| _ | j ! S )N   )
LOCAL_RANKRANK
GROUP_RANK	ROLE_RANK	ROLE_NAMELOCAL_WORLD_SIZE
WORLD_SIZEROLE_WORLD_SIZEMASTER_ADDRMASTER_PORTTORCHELASTIC_RESTART_COUNTTORCHELASTIC_MAX_RESTARTSTORCHELASTIC_RUN_IDNCCL_ASYNC_ERROR_HANDLINGOMP_NUM_THREADSattempt_T)ignore_errors)name
entrypointargsenvsr   r   	redirectstee)"r   storer   _get_master_addr_portmax_restarts_remaining_restartsworkers
local_rankstrglobal_rank
group_rank	role_rankrolelocal_world_size
world_sizerole_world_sizer   r   r*   environrG   pathjoinr   shutilrmtreer+   r   rF   r   rI   rJ   r   pids)r   r/   r   rK   master_addrmaster_portrestart_countrG   rH   workerrP   
worker_envattempt_log_dirr   r!   r"   _start_workersz   sR   



z LocalElasticAgent._start_workersc                 C   s   | j r
| j   d S d S r   )r   close)r   r!   r!   r"   r1      s   zLocalElasticAgent._shutdownc                 C   s  |j j}dd |jD }t| j  }||kr-td| d| d|  t	t
jdS | jd}|r|| r_td| d i }|j D ]\}}|j| }	|||	j< qIt	t
j|d	S i }
|j D ]\}}|j| }	||
|	j< qft	t
j|
d
S t	t
jdS )Nc                 S   s   h | ]}|j qS r!   )id).0wr!   r!   r"   	<setcomp>   s    z5LocalElasticAgent._monitor_workers.<locals>.<setcomp>[z;] worker pids do not match process_context pids. Expected: z
, actual: )stater   z] Worker group failed)rl   failures)rl   return_values)r   rU   rO   setr   r^   valuesr,   errorr   r
   UNKNOWNwait	is_failedrm   itemsrR   FAILEDrn   	SUCCEEDEDHEALTHY)r   r/   rU   worker_pidspc_pidsresultworker_failuresrP   failurerb   workers_ret_valsret_valr!   r!   r"   _monitor_workers   sB   

z"LocalElasticAgent._monitor_workers)r   r   N)r0   N)__name__
__module____qualname____doc__r	   floatr   rQ   r   r   r   r   r2   r   intr   re   r1   r   r   __classcell__r!   r!   r   r"   r      s(    G 
2r   )loggingr*   r\   r(   typingr   r   r   r   torchelastic.agent.server.apir   r   r   r	   r
   torchelastic.metrics.apir   torchelastic.multiprocessingr   	getLoggerr   r,   r   r!   r!   r!   r"   <module>   s   
