o
    oi)'                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	m
Z
mZmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ e eZed	ZG d
d deZdee fddZ de!deee ef fddZ"de
ej# ddfddZ$G dd dej%Z&dS )    N)AnyCallableListOptionalSequenceTuple)RequirementCache)override)ClusterEnvironment)	_Launcher)_set_num_threads_if_needed)rank_prefixed_messagez
hydra-corec                	       sz   e Zd ZdZdddededdf fdd	Zeedefd
dZ	ede
dededefddZdddZdddZ  ZS )_SubprocessScriptLaunchera  A process launcher that invokes the current script as many times as desired in a single node.

    This launcher needs to be invoked on each node.
    In its default behavior, the main process in each node then spawns N-1 child processes via :func:`subprocess.Popen`,
    where N is the number of devices (e.g. GPU) per node. It is very similar to how :mod:`torch.distributed.run`
    launches processes.

    For example, if the script gets invoked with the command

    .. code-block:: bash

        python train.py --devices 4

    The launcher will create three additional subprocesses that get called like so:

    .. code-block:: bash

        LOCAL_RANK=1 python train.py --devices 4
        LOCAL_RANK=2 python train.py --devices 4
        LOCAL_RANK=3 python train.py --devices 4

    It is implied that the main process which launched the others has ``LOCAL_RANK=0``.
    Beside the local rank, the following other environment variables also get set, but unlike the local rank, these
    get determined by the cluster environment:

    1. `MASTER_ADDR`: The IP address of the main node.
    2. `MASTER_PORT`: The port number of the main node through which all processes communicate.
    3. `NODE_RANK`: The index of the node the current process is running on. Ranges from 0 to ``num_nodes - 1``.
    4. `WORLD_SIZE`: The total number of processes across all nodes, i.e., ``num_processes * num_nodes``.

    Arguments:
        cluster_environment: A cluster environment that provides access to world size, node rank, etc.
        num_processes: The number of processes to launch in the current node.
        num_nodes: The total number of nodes that participate in this process group.

    cluster_environmentr
   num_processes	num_nodesreturnNc                    s&   t    || _|| _|| _g | _d S N)super__init__r   r   r   procs)selfr   r   r   	__class__ k/home/ubuntu/.local/lib/python3.10/site-packages/lightning/fabric/strategies/launchers/subprocess_script.pyr   I   s
   

z"_SubprocessScriptLauncher.__init__c                 C   s   dS )NFr   r   r   r   r   is_interactive_compatibleU   s   z3_SubprocessScriptLauncher.is_interactive_compatiblefunctionargskwargsc                 O   sH   | j j| j| jd | j js|   t| j t| jd ||i |S )a  Creates new processes, then calls the given function.

        Arguments:
            function: A callback function to execute after all processes have been created.
                It is up to the implementation of this function to synchronize the processes, e.g., with barriers.
            *args: Optional positional arguments to be passed to the given function.
            **kwargs: Optional keyword arguments to be passed to the given function.

        )num_devicesr   )r   )	r   validate_settingsr   r   creates_processes_externally_call_children_scripts_launch_process_observerr   r   )r   r   r   r    r   r   r   launchZ   s   
z _SubprocessScriptLauncher.launchc                 C   s  |    | jjtjd< t| jjtjd< t| j tjd< t| j tjd< | j	| j
  tjd< td| j	D ]F}tj }| |d< tjdd u rSd|v rS|d= d}d }trcd	d
lm} | }|rmt|d\}}nt }tj|||d}| j| q8d S )NMASTER_ADDRMASTER_PORT	NODE_RANK
LOCAL_RANK
WORLD_SIZE   PL_GLOBAL_SEEDFr   HydraConfig)
local_rank)envcwd)_check_can_spawn_childrenr   main_addressosenvironstr	main_port	node_rankr0   r   r   rangecopyget_HYDRA_AVAILABLEhydra.core.hydra_configr/   initialized_hydra_subprocess_cmd_basic_subprocess_cmd
subprocessPopenr   append)r   r0   env_copyhydra_in_user2   r/   commandprocr   r   r   r$   m   s,   

z0_SubprocessScriptLauncher._call_children_scriptsc                 C   s0   t | jdkrtd| j dkrtdd S )Nr   z/The launcher can only create subprocesses once.a  Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen. Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user, 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented.)lenr   RuntimeErrorr   r0   r   r   r   r   r3      s   z3_SubprocessScriptLauncher._check_can_spawn_childrenr   N)__name__
__module____qualname____doc__intr   propertyr	   boolr   r   r   r&   r$   r3   __classcell__r   r   r   r   r   #   s$    %
%r   r   c                  C   sV   dd l } | jd u rtjtjtjd gtjdd   S tjd| jjgtjdd   S )Nr   r,   -m)	__main____spec__sys
executabler5   pathabspathargvname)rU   r   r   r   rA      s   
&rA   r0   c                 C   s   ddl m} ddlm}m} dd l}|jd u r"tj|tj	d g}ntjd|jj
g}|tj	dd  7 }| }d| jj d}|d| d|  d	g7 }||fS )
Nr   r.   )get_original_cwdto_absolute_pathrT   r,   "zhydra.run.dir=z!hydra.job.name=train_ddp_process_zhydra.output_subdir=null)r>   r/   hydra.utilsr]   r^   rU   rV   rW   rX   r[   r\   r<   rundir)r0   r/   r]   r^   rU   rG   r2   rundirr   r   r   r@      s   
r@   child_processesc                 C   s   t | t d  dS )z\Launches a thread that runs along the main process and monitors the health of all processes.)rd   main_pidN)_ChildProcessObserverr5   getpidstart)rd   r   r   r   r%      s   r%   c                	       s\   e Zd Zddedeej deddf fddZedd	d
Z	de
fddZdddZ  ZS )rf      re   rd   sleep_periodr   Nc                    sD   t  jddd || _|| _|| _tjdkrtjntj	| _
d| _d S )NTzchild-process-observer)daemonr\   win32F)r   r   	_main_pid_child_processes_sleep_periodrW   platformsignalSIGTERMSIGKILL_termination_signal	_finished)r   re   rd   rj   r   r   r   r      s   
z_ChildProcessObserver.__init__c                 C   s*   | j st| j |  | _ | j rd S d S r   )ru   timesleepro   _runr   r   r   r   ra      s   
z_ChildProcessObserver.runc                 C   s   | j D ]}|  qdd | j D }tdd |D rdS t| j D ]$\}}|jrFtd|j d|j d|d	 d
}t| | 	   dS q"dS )zKRuns once over all child processes to check whether they are still running.c                 S   s   g | ]}|j qS r   )
returncode).0rH   r   r   r   
<listcomp>   s    z._ChildProcessObserver._run.<locals>.<listcomp>c                 s   s    | ]}|d kV  qdS )r   Nr   )rz   return_coder   r   r   	<genexpr>   s    z-_ChildProcessObserver._run.<locals>.<genexpr>TzChild process with PID z terminated with code uB   . Forcefully terminating all other processes to avoid zombies 🧟r,   )rankF)
rn   pollall	enumeratery   r   pid_loggerinfo_terminate_all)r   rH   return_codesimessager   r   r   rx      s    



z_ChildProcessObserver._runc                 C   s,   | j D ]}|| j qt| j| j dS )z1Terminates the main process and all its children.N)rn   send_signalrt   r5   killrm   )r   pr   r   r   r      s   
z$_ChildProcessObserver._terminate_all)ri   rK   )rL   rM   rN   rP   r   rB   rC   r   r	   ra   rR   rx   r   rS   r   r   r   r   rf      s    &	rf   )'loggingr5   rq   rB   rW   	threadingrv   typingr   r   r   r   r   r    lightning_utilities.core.importsr   typing_extensionsr	   9lightning.fabric.plugins.environments.cluster_environmentr
   .lightning.fabric.strategies.launchers.launcherr   &lightning.fabric.utilities.distributedr   $lightning.fabric.utilities.rank_zeror   	getLoggerrL   r   r=   r   r7   rA   rP   r@   rC   r%   Threadrf   r   r   r   r   <module>   s*    
z