o
    TÃiW=  ã                   @   sø   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m	Z	 ddl
mZ ddlmZmZmZ ddl
mZmZmZ 	 d dlmZ d	ZG d
d„ dƒZG dd„ dƒZG dd„ dƒZdd„ Zdd„ Zdefdd„ZdZdefdd„ZdS )é    N)Útqdmé   )Úloggeré   )Ú
AUTOTUNINGÚAUTOTUNING_METRIC_PATHÚBUFSIZE)Úget_val_by_keyÚsearch_errorÚwas_interruptted)Úcommé   c                   @   sd   e Zd Zdd„ Zdd„ Zdefdd„Zdd	„ Zd
d„ Zdd„ Z	dd„ Z
ddd„Zdd„ Zdd„ ZdS )ÚResourceManagerc           
      C   s¦   || _ || _g | _|| _|D ]}| j t||ƒ¡ qg | _i | _i | _d| _	t
ƒ | _|| _i | _|d urO| ¡ D ]\}}	| ¡ }|	 ¡ }	|| jvrN|	| j|< q8d S d S )Nr   )Úresults_dirÚexps_dirÚnodesÚnum_gpus_per_nodeÚappendÚNodeÚexperiment_queueÚrunning_experimentsÚfinished_experimentsÚexperiment_countÚsetÚ	exp_pathsÚargsÚarg_mappingsÚitemsÚstrip)
Úselfr   Úhostsr   r   r   r   ÚhostÚkÚv© r$   úR/home/ubuntu/.local/lib/python3.10/site-packages/deepspeed/autotuning/scheduler.pyÚ__init__"   s,   

€ûzResourceManager.__init__c              	   C   sn  |D ]²}|| j v r
q| j  |¡ t|dƒ—}t |¡}| j|d< |  jd7  _tj | j	|d ¡ }|d< t
|d v rJtj |d¡}||d t
 t< tj |d¡}tj |d	¡}tj |d¡}tj |¡rŸtj |¡rŸt|ƒsŸt|ƒ}	|d }
||	f| j|
< |	sŒtj |¡sŒtj |¡rŸt d
|d › d¡ 	 W d   ƒ q| j |¡ W d   ƒ n1 s¯w   Y  qd S )NÚrÚexp_idr   ÚnameÚ
result_dirÚ	ds_configzmetrics.jsonú
stderr.logzmodel_info.jsonzSkipping exp z whose result already exists)r   ÚaddÚopenÚhjsonÚloadr   ÚosÚpathÚjoinr   r   r   Úexistsr   r
   r   r   Úinfor   r   )r   r   Úexp_pathÚfdÚexpr*   Úmetric_fileÚstderr_fileÚmodel_info_fileÚerrr(   r$   r$   r%   Úschedule_experiments:   s8   


ëé€ûz$ResourceManager.schedule_experimentsr8   c                 C   sâ   |d }| j j| |d< tj | j|d ¡|d< | j j}| j j}| j 	¡ D ]/\}}t
||ƒ}|rTt|ƒdkrT||v rH| |¡}	t|ƒ||	d < q%| |¡ | t|ƒ¡ q%tjt||||fd}
|
 ¡  |
||t ¡ f| j|< d S )Nr(   Úmaster_portr)   r*   Úautor   )Útargetr   )r   r>   r1   r2   r3   r   Úuser_scriptÚ	user_argsr   r   r	   ÚstrÚindexr   Ú	threadingÚThreadÚrun_experimentÚstartÚtimer   )r   r8   Úreservationsr(   rA   rB   ÚkeyÚvalÚnvalÚidxÚtr$   r$   r%   Úrun_jobY   s"   


€zResourceManager.run_jobc                 C   sú   g }| j  ¡ D ]X\}}|\}}}}t d|› d| ¡ › ¡ |jtd | ¡ s_|d }	tj |	d¡}
t	|
ƒ}| 
||f¡ ||f| j|< t ¡ | }t d|› d|d›d	¡ | t|ƒ¡ q|D ]\}}|D ]}| ¡  qh| j  |¡ qbt t¡ d S )
NzChecking exp_id = z
, alive = )Útimeoutr*   r,   zFinished exp_id = z, duration=z.2fz sec)r   r   r   ÚdebugÚis_aliver3   ÚTIMEOUTr1   r2   r
   r   r   rI   ÚupdateÚlenÚrestore_slotsÚpopÚsleep)r   ÚpbarÚfinished_expsr(   Úexp_dataÚthreadÚexp_jsonrJ   Ú
start_timeÚexp_dirr:   r<   ÚdurationÚreservationr$   r$   r%   Úexperiment_checko   s(   €
z ResourceManager.experiment_checkc           	      C   s~   |d |d }}|}g }| j D ]}|dkr n|j|d}|r-| t||d¡ |d8 }q|dkr4|S |D ]}| ¡  q6d S )NÚnum_gpusÚ	num_nodesr   )Úslot_request©ÚnodeÚslotsr   )r   Úreserve_slotsr   ÚReservationrW   )	r   r8   rd   re   rf   rJ   rh   ri   rb   r$   r$   r%   Úresource_request„   s    
€
ÿz ResourceManager.resource_requestc                 C   s8   d}| j D ]}||j› dt|jƒ› d7 }q|d d… S )NÚ z (z idle gpus), éÿÿÿÿ)r   r!   rV   Ú
idle_slots)r   Ústatusrh   r$   r$   r%   rp   ˜   s   
zResourceManager.statusc                 C   sV  t t| jƒd}t| jƒdkr”| j d¡}t d|d › d¡ t d|  ¡ › ¡ |  |¡}|sSt d|d › ¡ | j d|¡ t d|d › d	¡ |  	|¡ n:d
}|D ]}|j
 ¡  d tt|j
ƒ¡}||jj› d|› d7 }qW|d d… }t d|d › d|› ¡ |  ||¡ t| jƒdkst| jƒdkr©|  	|¡ t| jƒdks›d S d S )N)Útotalr   zPopped exp_id = r(   z from the queuezResource status: zUnable to schedule exp_id = zPut exp_id = z back into the queuerm   ú,ú:ú@rn   zRunning exp_id = z on )r   rV   r   rX   r   rR   rp   rl   Úinsertrc   ri   Úsortr3   ÚmaprC   rh   r!   rP   r   )r   rZ   r8   rJ   Údescrb   ri   r$   r$   r%   Úrunž   s.   

í
ÿzResourceManager.runNc                 C   s°   t  ¡  }|pg }t  ¡ rt  ¡ nd}|r%|s%|d dk}|p$|t|ƒv }t d|› ¡ |rV||d< t|dƒ}t ||¡ | 	d¡ W d  ƒ dS 1 sOw   Y  dS dS )zÿPrint message when one of following condition meets

        + not dist.is_initialized()
        + dist.get_rank() in ranks if ranks is not None or ranks = [-1]

    Args:
            message (str)
            ranks (list)
            path (str)

        rn   r   z*** Should log: ÚrankÚaÚ
N)
ÚdistÚis_initializedÚget_rankr   r   rR   r.   ÚjsonÚdumpÚwrite)r   ÚmessageÚranksr2   Ú
should_logÚmy_rankÚoutfiler$   r$   r%   Úsave_exp_results_to_databaseº   s   
"þþz,ResourceManager.save_exp_results_to_databasec                 C   sê   t jj}d}| j ¡ D ]V\}\}}|r)t d|› d|d › d|› d|d › ¡ q|d t t }t	j
 |¡rat|d	ƒ}t |¡}	|	| }
|
|krN|
}|}|	|d
< W d  ƒ n1 s\w   Y  q|dkrq| j| \}}||fS |dfS )aK   Parses the metric file of the finished experiments to select the optimal DeepSpeed configuration.

        Args:
            finished_experiments (dcit): a dictionary of experiment id and experiment description.

        Returns:
            The path to the result folder of the experiment with the optimal configuration.
        rn   zThe experiment exp_id = ú, exp_name = r)   z(, did not run successfully with error = zD, thus a metrics.txt does not exist for it. Check the stderr.log in r*   r+   r'   ÚresultsN)ÚsysÚ
float_infoÚminr   r   r   r5   r   r   r1   r2   r4   r.   r/   r0   )r   ÚmetricÚmax_throughputÚbest_exp_idr(   r8   r<   r9   ÚfrŠ   Úcurr_throughputÚbest_expÚ_r$   r$   r%   Úparse_resultsÓ   s.   	"ÿ

ú€zResourceManager.parse_resultsc                 C   sH   g | _ | j ¡ D ]\}}|\}}}}t||ƒ qi | _i | _tƒ | _dS )zFClear experiment queues, does not reset self.experiment_count
        N)r   r   r   Úclean_upr   r   r   )r   r(   r\   r]   r^   rJ   r_   r$   r$   r%   Úclearö   s   zResourceManager.clear)NN)Ú__name__Ú
__module__Ú__qualname__r&   r=   ÚdictrP   rc   rl   rp   ry   rˆ   r•   r—   r$   r$   r$   r%   r       s    
#r   c                   @   s4   e Zd Zdd„ Zdedefdd„Zdefdd	„Zd
S )r   c                 C   s   || _ || _tt|ƒƒ| _d S ©N)r!   Ú	max_slotsÚlistÚrangero   )r   r!   r   r$   r$   r%   r&     s   zNode.__init__rf   Úreturnc                    s(   t ˆ jƒ|kr‡ fdd„t|ƒD ƒS d S )Nc                    s   g | ]}ˆ j  d ¡‘qS )r   )ro   rX   )Ú.0r”   ©r   r$   r%   Ú
<listcomp>  s    z&Node.reserve_slots.<locals>.<listcomp>)rV   ro   rŸ   )r   rf   r$   r¢   r%   rj   
  s   ÿzNode.reserve_slotsri   c                 C   s   |  j |7  _ d S rœ   )ro   ©r   ri   r$   r$   r%   rW     ó   zNode.restore_slotsN)r˜   r™   rš   r&   Úintrž   rj   rW   r$   r$   r$   r%   r     s    r   c                   @   s$   e Zd Zdd„ Zdd„ Zdd„ ZdS )rk   c                 C   s   || _ || _d S rœ   rg   )r   rh   ri   r$   r$   r%   r&     s   
zReservation.__init__c                 C   s   | j  | j¡ d S rœ   )rh   rW   ri   r¢   r$   r$   r%   rW     r¥   zReservation.restore_slotsc                 C   s&   d  tt| jƒ¡}| jj› d|› dS )Nrr   rs   rt   )r3   rw   rC   ri   rh   r!   r¤   r$   r$   r%   rx     s   zReservation.descN)r˜   r™   rš   r&   rW   rx   r$   r$   r$   r%   rk     s    rk   c                  C   s<   d } dt jv rt jd } | S dt jv rt jd } | S d} | S )NÚDLWS_JOB_IDÚDLTS_JOB_IDzunknown-job-id©r1   Úenviron)Úinfra_job_idr$   r$   r%   Ú
get_job_id   s   


û
þr¬   c                  C   s$   d } dt jv rt jd } | S d} | S )NÚUSERzunknown-userr©   )Úuserr$   r$   r%   Úget_user-  s   

ÿr¯   r8   c                 C   s  d}|D ]}|j  ¡  d tt|j ƒ¡}||jj› d|› d7 }q|d d… }| d }| d }d|› d	|› d
t|ƒg| d< t d| d › ¡ t	ƒ | d< t
ƒ | d< | d }	tj|	dd tj |	d¡}
|
| d< t | d ¡}t |¡ d¡}t |¡ d¡| d< t| d dtd*}t ||¡ | ¡  t |¡ | d }t d|› dtj |¡› ¡ W d   ƒ n1 s¶w   Y  ttj |	d¡dtd-}t | |¡ | ¡  t |¡ tj |	d¡}t d|› dtj |¡› ¡ W d   ƒ n1 s÷w   Y  |r'd|v r
| d¡}n
d|v r| d¡}|t|ƒk sJ dƒ‚| d ||d  < || d!< || d"< d#g| d  |g | }t| d ƒd$ksHJ d%ƒ‚ttj |	d&¡dtd}| d' |¡¡ | d(¡ | ¡  t |¡ W d   ƒ n	1 svw   Y  t d)| d* › d+| d, › d-|› d.tj |
¡› ¡ ttj |	d/¡d0ƒA}ttj |	d1¡d0ƒ&}t j!|||d2}| "¡  | ¡  | ¡  t |¡ t |¡ W d   ƒ n	1 sÓw   Y  W d   ƒ n	1 sãw   Y  t#| |ƒ t d3| d* › d+| d, › d-|› ¡ d S )4Nrm   rr   rs   rt   rn   r>   Úhostfilez
--hostfilez	--includez--master_portÚlauncher_argszlauncher args=r®   Újob_idr*   T)Úexist_okzds_config.jsonÚds_config_pathr+   zutf-8Úds_config_base64Úw)Ú	bufferingzScheduler wrote ds_config to z, zexp.jsonzScheduler wrote exp to z--deepspeed_configz--deepspeedzLthere is no ds_config file specified after --deepspeed_config or --deepspeedr   rA   rB   Ú	deepspeedr   zmust provide launcher argszcmd.txtú r|   zLaunching exp_id = r(   r‰   r)   z, with resource = z, and ds_config = z
stdout.logÚwbr,   )ÚstdoutÚstderrzDone running exp_id = )$ri   rv   r3   rw   rC   rh   r!   r   rR   r¯   r¬   r1   Úmakedirsr2   ÚcopyÚdeepcopyr€   ÚdumpsÚencodeÚbase64Úurlsafe_b64encodeÚdecoder.   r   r   ÚflushÚfsyncr5   ÚabspathrD   rV   r‚   Ú
subprocessÚPopenÚwaitr–   )r8   rJ   rA   rB   Úinclude_strrb   ri   r>   r°   r`   r´   r+   Úds_config_jsonr7   r2   rN   ÚcmdÚoutr<   Úresultr$   r$   r%   rG   6  s”   
ú


 û
 û



ü*ÿ$ÿ
ù€ 
	(rG   i   c           	      C   sà   t j ¡ }d|d< d}|D ]}||jj› d7 }q|d d… }t d| d › d|› ¡ d	d
ttƒd|g}dd
| d g}|| }t d 	d 
|¡¡¡ tj||d}| ¡  |jdkrat |j¡ t d| d › d|› ¡ d S )NÚsshÚPDSH_RCMD_TYPErm   rr   rn   zCleaning up exp_id = r(   z on the following workers: Úpdshz-fz-wÚpkillr)   zcmd = {}r¹   )Úenvr   zDone cleaning up exp_id = )r1   rª   r¾   rh   r!   r   rR   rC   ÚPDSH_MAX_FAN_OUTÚformatr3   rÈ   rÉ   rÊ   Ú
returncoder‹   Úexitr5   )	r8   rJ   rÔ   Ú	nodes_strrb   Úpdsh_cmdÚkill_cmdrÍ   rÏ   r$   r$   r%   r–   ‘  s&   
ý
r–   ) r¾   r€   rÈ   r‹   rE   rI   rÂ   r1   r/   r   Úutilsr   Ú	constantsr   r   r   r	   r
   r   r¸   r   r}   rT   r   r   rk   r¬   r¯   r›   rG   rÕ   r–   r$   r$   r$   r%   Ú<module>   s2    d	X