o
    wOiHH                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZmZ d dl m!Z! e"e#Z$d	ee%ef d
e%de&fddZ'dZ(dZ)G dd deZ*dee*ee%e*f f de%dee%e*f fddZ+eG dd dZ,G dd de j-Z.de%dedee%ef dee%ee&e&f f dee%e&f d ee%e&f d!ee%ej/f ddfd"d#Z0G d$d% d%e.Z1G d&d' d'Z2G d(d) d)e.Z3dS )*    N)nullcontext)	dataclassfield)IntFlag)AnyCallableDictOptionalSetTupleUnion)ProcessFailurerecord)redirect_stderrredirect_stdout)TailLogdnprocswhatc                 C   s<   t |  }t t|}||krt| d| d| d S )Nz), local rank mapping mismatch, expected: z
, actual: )setkeysrangeRuntimeError)r   r   r   actual_keysexpected_keys r   T/home/ubuntu/.local/lib/python3.10/site-packages/torchelastic/multiprocessing/api.py_validate_full_rank   s   r   z^(\d:[0123],)*(\d:[0123])$z^[0123]$c                	   @   sF   e Zd ZdZdZdZeeB Zedede	d e
ed f f fddZdS )	Stdr         vmreturnc                 C   sx   dd }t t|r||S t t|r/i }|dD ]}|d\}}|||t|< q|S t| dt dt d)z
        Example:

        ::

         from_str("0") -> Std.NONE
         from_str("1") -> Std.OUT
         from_str("0:3,1:0,2:1,3:2") -> {0: Std.ALL, 1: Std.NONE, 2: Std.OUT, 3: Std.ERR}

        Any other input raises an exception
        c                 S   s&   t | } tD ]
}|| kr|  S qd S N)intr   )vsr   r   r   to_stdA   s   zStd.from_str.<locals>.to_std,:z does not match: <z> or <>)rematch_VALUE_REGEX_MAPPING_REGEXsplitr$   
ValueError)clsr!   r'   r   mir%   r   r   r   from_str3   s   zStd.from_strN)__name__
__module____qualname__NONEOUTERRALLclassmethodstrr   r   r$   r4   r   r   r   r   r   -   s    (r   
val_or_maplocal_world_sizer"   c                    sH   t  tr fddt|D S i }t|D ]} |tj||< q|S )a  
    Certain APIs take redirect settings either as a single value (e.g. apply to all
    local ranks) or as an explicit user-provided mapping. This method is a convenience
    method that converts a value or mapping into a mapping.

    Example:

    ::

     to_map(Std.OUT, local_world_size=2) # returns: {0: Std.OUT, 1: Std.OUT}
     to_map({1: Std.OUT}, local_world_size=2) # returns: {0: Std.NONE, 1: Std.OUT}
     to_map({0: Std.OUT, 1: Std.OUT}, local_world_size=2) # returns: {0: Std.OUT, 1: Std.OUT}
    c                    s   i | ]}| qS r   r   ).0r3   r>   r   r   
<dictcomp>g   s    zto_map.<locals>.<dictcomp>)
isinstancer   r   getr8   )r>   r?   mapr3   r   rA   r   to_mapV   s   
rF   c                   @   s   e Zd ZU dZeedZeee	f e
d< eedZeeef e
d< eedZeeef e
d< eedZeeef e
d< defdd	Zd
S )RunProcsResulta  
    Results of a completed run of processes started with ``start_processes()``.
    Returned by ``PContext``.

    Note the following:

    1. All fields are mapped by local rank
    2. ``return_values`` - only populated for functions (not the binaries).
    3. ``stdouts`` - path to stdout.log (empty string if no redirect)
    4. ``stderrs`` - path to stderr.log (empty string if no redirect)

    )default_factoryreturn_valuesfailuresstdoutsstderrsr"   c                 C   s   t | jdkS )Nr   )lenrJ   selfr   r   r   	is_failed   s   zRunProcsResult.is_failedN)r5   r6   r7   __doc__r   dictrI   r   r$   r   __annotations__rJ   r   rK   r=   rL   boolrP   r   r   r   r   rG   o   s   
 rG   c                   @   s
  e Zd ZdZdedeeef deee	f deeeeef f deeef deeef deeef d	eeef d
eeef fddZ
d!ddZejd!ddZejdee fddZd"dededee fddZejdeeef fddZejd!ddZd!dd ZdS )#PContexta  
    The base class that standardizes operations over a set of processes
    that are launched via different mechanisms. The name ``PContext``
    is intentional to disambiguate with ``torch.multiprocessing.ProcessContext``.

    .. warning:: stdouts and stderrs should ALWAYS be a superset of
                 tee_stdouts and tee_stderrs (respectively) this is b/c
                 tee is implemented as a redirect + tail -f <stdout/stderr.log>
    name
entrypointargsenvsrK   rL   tee_stdoutstee_stderrserror_filesc
                 C   st   || _ t|}
t||
d t||
d || _|| _|| _|| _|| _|	| _|
| _	t
||tj| _t
||tj| _d S )NrK   rL   )rV   rM   r   rW   rX   rY   rK   rL   r\   r   r   sysstdout_stdout_tailstderr_stderr_tail)rO   rV   rW   rX   rY   rK   rL   rZ   r[   r\   r   r   r   r   __init__   s   zPContext.__init__r"   Nc                 C   s    |    | j  | j  dS )zN
        Start processes using parameters defined in the constructor.
        N)_startr_   startra   rN   r   r   r   rd      s   
zPContext.startc                 C      t  )zQ
        Start processes using strategy defined in a particular context.
        NotImplementedErrorrN   r   r   r   rc         zPContext._startc                 C   re   )aF  
        Polls the run status of the processes running under this context.
        This method follows an "all-or-nothing" policy and returns
        a ``RunProcessResults`` object if either all processes complete
        successfully or any process fails. Returns ``None`` if
        all processes are still running.
        rf   rN   r   r   r   _poll   s   	zPContext._pollr   timeoutperiodc                 C   s`   |dkr|   S |dk rtj}t | }t |k r.|   }|r#|S t| t |k sdS )a  
        Waits for the specified ``timeout`` seconds, polling every ``period`` seconds
        for the processes to be done. Returns ``None`` if the processes are still running
        on timeout expiry. Negative timeout values are interpreted as "wait-forever".
        A timeout value of zero simply queries the status of the processes (e.g. equivalent
        to a poll).
        r   N)ri   r]   maxsizetimesleep)rO   rk   rl   expiryprr   r   r   wait   s   	
zPContext.waitc                 C   re   )zR
        Returns pids of processes mapped by their respective local_ranks
        rf   rN   r   r   r   pids   rh   zPContext.pidsc                 C   re   )z
        Terminates all processes managed by this context and cleans up any
        meta resources (e.g. redirect, error_file files).
        rf   rN   r   r   r   _close   s   zPContext._closec                 C   s0   |    | jr| j  | jr| j  d S d S r#   )rt   r_   stopra   rN   r   r   r   close   s   
zPContext.closer"   N)rj   r   )r5   r6   r7   rQ   r=   r   r   r   r$   r   rb   rd   abcabstractmethodrc   r	   rG   ri   floatrr   rs   rt   rv   r   r   r   r   rU      s@    






	




rU   
local_rankfnrX   rY   stdout_redirectsstderr_redirectsret_valsc              	   C   s   ||  }||  }||  }	||  }
||  }|
rt |
nt }|r#t|nt }| D ]	\}}|tj|< q*|  | t|| }W d    n1 sJw   Y  W d    n1 sYw   Y  |	| d S r#   )r   r   r   itemsosenvironr   put)r{   r|   rX   rY   r}   r~   r   args_env_ret_val_	stdout_rd	stderr_rd	stdout_cm	stderr_cmkr%   retr   r   r   _wrap   s   
 r   c                       s   e Zd ZdZdededeeef deeeeef f deeef deeef deeef d	eeef d
eeef def fddZ	dd Z
dee fddZdeeef fddZdddZ  ZS )MultiprocessContextzF
    ``PContext`` holding worker processes invoked as a function.
    rV   rW   rX   rY   rK   rL   rZ   r[   r\   start_methodc                    sL   t  |||||||||		 |
 _ fddt jD  _i  _d  _d S )Nc                    s   i | ]}|t  j qS r   )mpget_contextr   SimpleQueuer@   r{   rN   r   r   rB   5  s    z0MultiprocessContext.__init__.<locals>.<dictcomp>)superrb   r   r   r   	_ret_vals_return_values_pc)rO   rV   rW   rX   rY   rK   rL   rZ   r[   r\   r   	__class__rN   r   rb     s"   

zMultiprocessContext.__init__c                 C   sF   | j rtdtjt| j| j| j| j| j	| j
f| jdd| jd| _ d S )NzWThe process context already initialized. Most likely the start method got called twice.F)r|   rX   r   joindaemonr   )r   r0   r   start_processesr   rW   rX   rY   rK   rL   r   r   r   rN   r   r   r   rc   ?  s$   zMultiprocessContext._startr"   c           	      C   s*  z;| j d}td| jD ]}| j| }| s| | j|< q|r9t| j| jd | 	  t
| j| j| jdW S W d S  tjtjfy } zI|j}| jj}| j j| }| j| }tjd|j d| d|j d| d	| j d
dd | 	  t
|t||j|j|di| j| jdW  Y d }~S d }~ww )Nrj   r   zreturn_value queue)rI   rK   rL   failed (exitcode: ) local_rank:  (pid: z	) of fn: z (start_method: )T)exc_infor{   pidexitcode
error_filerJ   rK   rL   )r   r   r   r   r   emptyrD   r   r   rv   rG   rK   rL   r   ProcessRaisedExceptionProcessExitedExceptionerror_indexrW   r7   	processesr\   logerrorr   r   r   r   )	rO   doner{   return_queueefailed_local_rankfn_namefailed_procerror_filepathr   r   r   ri   T  sb   



zMultiprocessContext._pollc                 C   s   dd t | j D S )Nc                 S   s   i | ]\}}||qS r   r   )r@   r{   r   r   r   r   rB     s    z,MultiprocessContext.pids.<locals>.<dictcomp>)	enumerater   rs   rN   r   r   r   rs     s   zMultiprocessContext.pidsNc                 C   s,   | j r| j jD ]}|  |  qd S d S r#   )r   r   	terminater   )rO   procr   r   r   rt     s   
zMultiprocessContext._closerw   )r5   r6   r7   rQ   r=   r   r   r$   r   rb   rc   r	   rG   ri   rs   rt   __classcell__r   r   r   r   r     s6    




	

%=r   c                   @   sB   e Zd ZdZdededeeef dededefdd	Zd
d Z	dS )SubprocessHandlerz
    Convenience wrapper around python's ``subprocess.Popen``. Keeps track of
    meta-objects associated to the process (e.g. stdout and stderr redirect fds).
    rW   rX   env
preexec_fnr^   r`   c           	      C   sp   |rt |dnd | _|rt |dnd | _dd |D }tj }|| tj|g|R ||| j| jd| _	d S )Nwc                 S   s   g | ]}t |qS r   )r=   )r@   r   r   r   r   
<listcomp>  s    z.SubprocessHandler.__init__.<locals>.<listcomp>)rX   r   r   r^   r`   )
open_stdout_stderrr   r   copyupdate
subprocessPopenr   )	rO   rW   rX   r   r   r^   r`   args_strenv_varsr   r   r   rb     s   	


zSubprocessHandler.__init__c                 C   s<   | j   | j   | jr| j  | jr| j  d S d S r#   )r   r   rr   r   rv   r   rN   r   r   r   rv     s   


zSubprocessHandler.closeN)
r5   r6   r7   rQ   r=   r   r   r   rb   rv   r   r   r   r   r     s     

r   c                       s   e Zd ZdZdededeeef deeeeef f deeef deeef deeef d	eeef d
eeef f fddZdd Z	de
e fddZdeeef fddZdddZ  ZS )SubprocessContextzD
    ``PContext`` holding worker processes invoked as a binary.
    rV   rW   rX   rY   rK   rL   rZ   r[   r\   c
           
         s<   t  |||||||||		 tt| j| _i | _d | _d S r#   )r   rb   r   r   r   _running_local_ranks	_failuressubprocess_handlers)
rO   rV   rW   rX   rY   rK   rL   rZ   r[   r\   r   r   r   rb     s   
zSubprocessContext.__init__c                    s,    j rtd fddt jD  _ d S )Nz[The subprocess handlers already initialized. Most likely the start method got called twice.c                    sD   i | ]}|t  j j|  j| ttj j|  j	| d qS ))rW   rX   r   r   r^   r`   )
r   rW   rX   rY   r   _prctl_pr_set_pdeathsigsignalSIGTERMrK   rL   r   rN   r   r   rB     s    	
z,SubprocessContext._start.<locals>.<dictcomp>)r   r0   r   r   rN   r   rN   r   rc     s   
	zSubprocessContext._startr"   c              
   C   s   t  }| jD ])}| j| }|j }|d ur/|| |dkr/t||jj|| j| d| j	|< q| j
| | jr<| j	r}|   t| j	| j| jd}| rpt|j dd d}td|j d|j d	|j d
| j  |S dd t| jD |_|S d S )Nr   r   r   c                 S   s   | j S r#   )	timestamp)fr   r   r   <lambda>  s    z)SubprocessContext._poll.<locals>.<lambda>)keyr   r   r   z) of binary: c                 S   s   i | ]}|d qS r#   r   r   r   r   r   rB     s    z+SubprocessContext._poll.<locals>.<dictcomp>)r   r   r   r   polladdr   r   r\   r   difference_updaterv   rG   rK   rL   rP   minrJ   valuesr   r   r   r{   rW   r   r   rI   )rO   done_local_ranksr{   handlerr   resultfirst_failurer   r   r   ri     sN   




zSubprocessContext._pollc                 C   s   dd | j  D S )Nc                 S   s   i | ]	\}}||j jqS r   )r   r   )r@   r{   shr   r   r   rB   #  s    z*SubprocessContext.pids.<locals>.<dictcomp>)r   r   rN   r   r   r   rs   "  s   zSubprocessContext.pidsNc                 C   s&   | j r| j  D ]}|  qd S d S r#   )r   r   rv   )rO   r   r   r   r   rt   (  s
   
zSubprocessContext._closerw   )r5   r6   r7   rQ   r=   r   r$   r   rb   rc   r	   rG   ri   rs   rt   r   r   r   r   r   r     s2    




	

+r   )4rx   loggingr   r+   r   r   r]   rn   
contextlibr   dataclassesr   r   enumr   typingr   r   r   r	   r
   r   r   torch.multiprocessingmultiprocessingr   #torchelastic.multiprocessing.errorsr   r   &torchelastic.multiprocessing.redirectsr   r   %torchelastic.multiprocessing.tail_logr   	getLoggerr5   r   r$   r=   r   r.   r-   r   rF   rG   ABCrU   r   r   r   r   r   r   r   r   r   <module>   sh   $
)

r



 (