o
    TiI                     @   s   d dl mZ d dlmZmZmZmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZmZmZ d dlmZ d dlZd dlZd d	lmZ d d
lmZ d dlZd dlZd dlmZ d dlZd dlmZ ee Z!G dd deZ"dS )    )LocalElasticAgent)AnyDictOptionalTuple)datetime)get_free_port)
put_metric)	RunResultWorkerGroup
WorkerSpecWorkerState)StoreN)start_processes)macros)closing)
get_loggerc                       s   e Zd Z			ddedededee f fdd	Ze		dd
e
dee dee dee fddZdedeeef f fddZddedefddZ  ZS )DSElasticAgentspawn,  Nspecenvexit_barrier_timeoutlog_dirc                    s   t  |||| || _d S N)super__init__ds_env)selfr   r   start_methodr   r   	__class__ V/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/elasticity/elastic_agent.pyr   "   s   
zDSElasticAgent.__init__storemaster_addrmaster_port
local_addrc                 C   s   |d u r!t  }t| | d }W d    n1 sw   Y  |d u r<dd l}|d}t|}|d d }| d|j	dd | dt
|j	dd d S )	N   r   zhostname -Izutf-8MASTER_ADDRzUTF-8)encodingMASTER_PORT)r   r   getsocknameshlexsplit
subprocesscheck_outputdecodesetencodestr)r$   r%   r&   r'   sockr-   safe_cmdresultr"   r"   r#   _set_master_addr_port-   s   


z$DSElasticAgent._set_master_addr_portworker_groupreturnc              
      s  |j }|j}|d usJ t |\}}|j| j }|j dk}i }i }	|jD ]}
|
j	}t
| j}i dt|dt|
jdt|jdt|
jd|jdt|jdt|
jd	t|jd
t|
jd|dt|dt|dt|jd|j dt|dtdtd}|| dtjv rtjd |d< ||	|< t|j}t|t|}t|||< q(tj !| j"d| }t#j$|dd t%| |j&d usJ t'|j|j&||	|| j(|j)|j*d| _+| j+, S )Nstatic
LOCAL_RANKRANK
GROUP_RANK	ROLE_RANK	ROLE_NAMELOCAL_WORLD_SIZE
WORLD_SIZEGROUP_WORLD_SIZEROLE_WORLD_SIZEr)   r+   TORCHELASTIC_RESTART_COUNTTORCHELASTIC_MAX_RESTARTSTORCHELASTIC_RUN_IDTORCHELASTIC_USE_AGENT_STORENCCL_ASYNC_ERROR_HANDLINGr(   OMP_NUM_THREADSattempt_T)ignore_errors)name
entrypointargsenvsr   r   	redirectstee)-r   r$   r   _get_master_addr_portmax_restarts_remaining_restartsrdzv_handlerget_backendworkers
local_rankcopydeepcopyr   r4   global_rank
group_rank	role_rankrolelocal_world_size
world_sizegroup_world_sizerole_world_size
get_run_idosgetenvupdateenvironlistrO   r   
substitutetuplepathjoin_log_dirshutilrmtreemakedirsrN   r   _start_methodrQ   rR   	_pcontextpids)r   r9   r   r$   r%   r&   restart_countuse_agent_storerO   rP   workerrY   worker_env_dsworker_env_elasticworker_argsattempt_log_dirr    r"   r#   _start_workersA   s   







	







zDSElasticAgent._start_workersdefaultr_   c                    s.  | j j}|j}td| d|   | | j  |j}|j}|j	j
j}	 | j j
tjks/J t| | | j }|j
}|| j _
t |jj|jj    fdd|j	j
j D }td| d| j td| d|j  d	 |tjkrtd| d
| j d |   |S |tjtj hv st!|t!|j	j
jkr| jdkrtd| d|j d| j d|j" d	 |  jd	8  _| #| j  |j	j
j}nK| $| j  tj | j _
|   |S |tj%kr
|& }	| j j'}
|	dkr	td| d|	 d|
 d | #| j  |j	j
j}nt(d| d|j dq&)N[z#] starting workers for entrypoint: Tc                    s   g | ]
\}}| k r|qS r"   r"   ).0nodelast_heartbeatexpire_timer"   r#   
<listcomp>   s
    z.DSElasticAgent._invoke_run.<locals>.<listcomp>zworkers.z.remaining_restarts.r(   z.] worker group successfully finished. Waiting z$ seconds for other agents to finish.r   z] Worker group z. /z) attempts left; will restart worker groupz] Detected z new nodes from group_rank=z; will restart worker groupz] Worker group in z state))_worker_groupr   r_   loginfoget_entrypoint_name_initialize_workersmonitor_intervalrV   _state_holderstateparticipantsr   INITtimesleep_monitor_workersr   utcnow	_settingskeep_alive_intervalkeep_alive_max_attemptlast_heartbeatsitemsr	   rU   rM   lower	SUCCEEDED_exit_barrier_timeout_exit_barrier	UNHEALTHYFAILEDlenrT   _restart_workers_stop_workersHEALTHYnum_nodes_waitingr]   	Exception)r   r_   r   r   rV   r   
run_resultr   _dead_nodesr   r]   r"   r   r#   _invoke_run   sn   










zDSElasticAgent._invoke_run)r   r   Nr   )r}   )__name__
__module____qualname__r   r   floatr   r4   r   staticmethodr   intr8   r   r   r|   r
   r   __classcell__r"   r"   r    r#   r       s0    >r   )#:torch.distributed.elastic.agent.server.local_elastic_agentr   typingr   r   r   r   r   +torch.distributed.elastic.utils.distributedr   !torch.distributed.elastic.metricsr	   *torch.distributed.elastic.agent.server.apir
   r   r   r   torch.distributedr   r   re   )torch.distributed.elastic.multiprocessingr   torch.distributed.elastic.utilsr   ro   rZ   
contextlibr   r/   'torch.distributed.elastic.utils.loggingr   r   r   r   r"   r"   r"   r#   <module>   s$   