o
    8wiG'                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
mZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ e eZed
ZG dd deZdee fddZde de!ee ef fddZ"de#ej$ ddfddZ%G dd dej&Z'dS )    N)Sequence)AnyCallableOptional)RequirementCache)override)ClusterEnvironment)	_Launcher)_set_num_threads_if_needed)rank_prefixed_messagez
hydra-corec                	       sz   e Zd ZdZdddededdf fdd	Zeedefd
dZ	ede
dededefddZdddZdddZ  ZS )_SubprocessScriptLaunchera  A process launcher that invokes the current script as many times as desired in a single node.

    This launcher needs to be invoked on each node.
    In its default behavior, the main process in each node then spawns N-1 child processes via :func:`subprocess.Popen`,
    where N is the number of devices (e.g. GPU) per node. It is very similar to how :mod:`torch.distributed.run`
    launches processes.

    For example, if the script gets invoked with the command

    .. code-block:: bash

        python train.py --devices 4

    The launcher will create three additional subprocesses that get called like so:

    .. code-block:: bash

        LOCAL_RANK=1 python train.py --devices 4
        LOCAL_RANK=2 python train.py --devices 4
        LOCAL_RANK=3 python train.py --devices 4

    It is implied that the main process which launched the others has ``LOCAL_RANK=0``.
    Beside the local rank, the following other environment variables also get set, but unlike the local rank, these
    get determined by the cluster environment:

    1. `MASTER_ADDR`: The IP address of the main node.
    2. `MASTER_PORT`: The port number of the main node through which all processes communicate.
    3. `NODE_RANK`: The index of the node the current process is running on. Ranges from 0 to ``num_nodes - 1``.
    4. `WORLD_SIZE`: The total number of processes across all nodes, i.e., ``num_processes * num_nodes``.

    Arguments:
        cluster_environment: A cluster environment that provides access to world size, node rank, etc.
        num_processes: The number of processes to launch in the current node.
        num_nodes: The total number of nodes that participate in this process group.

    cluster_environmentr   num_processes	num_nodesreturnNc                    s&   t    || _|| _|| _g | _d S N)super__init__r   r   r   procs)selfr   r   r   	__class__ t/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/lightning_fabric/strategies/launchers/subprocess_script.pyr   J   s
   

z"_SubprocessScriptLauncher.__init__c                 C   s   dS )NFr   r   r   r   r   is_interactive_compatibleV   s   z3_SubprocessScriptLauncher.is_interactive_compatiblefunctionargskwargsc                 O   sH   | j j| j| jd | j js|   t| j t| jd ||i |S )a  Creates new processes, then calls the given function.

        Arguments:
            function: A callback function to execute after all processes have been created.
                It is up to the implementation of this function to synchronize the processes, e.g., with barriers.
            *args: Optional positional arguments to be passed to the given function.
            **kwargs: Optional keyword arguments to be passed to the given function.

        )num_devicesr   )r   )	r   validate_settingsr   r   creates_processes_externally_call_children_scripts_launch_process_observerr   r
   )r   r   r   r   r   r   r   launch[   s   
z _SubprocessScriptLauncher.launchc                 C   s  |    | jjtjd< t| jjtjd< t| j tjd< t| j tjd< | j	| j
  tjd< td| j	D ]F}tj }| |d< tjdd u rSd|v rS|d= d}d }trcd	d
lm} | }|rmt|d\}}nt }tj|||d}| j| q8d S )NMASTER_ADDRMASTER_PORT	NODE_RANK
LOCAL_RANK
WORLD_SIZE   PL_GLOBAL_SEEDFr   HydraConfig)
local_rank)envcwd)_check_can_spawn_childrenr   main_addressosenvironstr	main_port	node_rankr.   r   r   rangecopyget_HYDRA_AVAILABLEhydra.core.hydra_configr-   initialized_hydra_subprocess_cmd_basic_subprocess_cmd
subprocessPopenr   append)r   r.   env_copyhydra_in_user0   r-   commandprocr   r   r   r"   n   s,   

z0_SubprocessScriptLauncher._call_children_scriptsc                 C   s0   t | jdkrtd| j dkrtdd S )Nr   z/The launcher can only create subprocesses once.a  Lightning attempted to launch new distributed processes with `local_rank > 0`. This should not happen. Possible reasons: 1) LOCAL_RANK environment variable was incorrectly modified by the user, 2) `ClusterEnvironment.creates_processes_externally` incorrectly implemented.)lenr   RuntimeErrorr   r.   r   r   r   r   r1      s   z3_SubprocessScriptLauncher._check_can_spawn_childrenr   N)__name__
__module____qualname____doc__intr   propertyr   boolr   r   r   r$   r"   r1   __classcell__r   r   r   r   r   $   s$    %
%r   r   c                  C   sV   dd l } | jd u rtjtjtjd gtjdd   S tjd| jjgtjdd   S )Nr   r*   -m)	__main____spec__sys
executabler3   pathabspathargvname)rS   r   r   r   r?      s   
&r?   r.   c                 C   s   ddl m} ddlm}m} dd l}|jd u r"tj|tj	d g}ntjd|jj
g}|tj	dd  7 }| }d| jj d}|d| d|  d	g7 }||fS )
Nr   r,   )get_original_cwdto_absolute_pathrR   r*   "zhydra.run.dir=z!hydra.job.name=train_ddp_process_zhydra.output_subdir=null)r<   r-   hydra.utilsr[   r\   rS   rT   rU   rV   rY   rZ   r:   rundir)r.   r-   r[   r\   rS   rE   r0   rundirr   r   r   r>      s   
r>   child_processesc                 C   s   t | t d  dS )z\Launches a thread that runs along the main process and monitors the health of all processes.)rb   main_pidN)_ChildProcessObserverr3   getpidstart)rb   r   r   r   r#      s   r#   c                	       s\   e Zd Zddedeej deddf fddZedd	d
Z	de
fddZdddZ  ZS )rd      rc   rb   sleep_periodr   Nc                    sD   t  jddd || _|| _|| _tjdkrtjntj	| _
d| _d S )NTzchild-process-observer)daemonrZ   win32F)r   r   	_main_pid_child_processes_sleep_periodrU   platformsignalSIGTERMSIGKILL_termination_signal	_finished)r   rc   rb   rh   r   r   r   r      s   
z_ChildProcessObserver.__init__c                 C   s*   | j st| j |  | _ | j rd S d S r   )rs   timesleeprm   _runr   r   r   r   r_      s   
z_ChildProcessObserver.runc                 C   s   | j D ]}|  qdd | j D }tdd |D rdS t| j D ]$\}}|jrFtd|j d|j d|d	 d
}t| | 	   dS q"dS )zKRuns once over all child processes to check whether they are still running.c                 S   s   g | ]}|j qS r   )
returncode).0rF   r   r   r   
<listcomp>   s    z._ChildProcessObserver._run.<locals>.<listcomp>c                 s   s    | ]}|d kV  qdS )r   Nr   )rx   return_coder   r   r   	<genexpr>   s    z-_ChildProcessObserver._run.<locals>.<genexpr>TzChild process with PID z terminated with code uB   . Forcefully terminating all other processes to avoid zombies 🧟r*   )rankF)
rl   pollall	enumeraterw   r   pid_loggerinfo_terminate_all)r   rF   return_codesimessager   r   r   rv      s    



z_ChildProcessObserver._runc                 C   s,   | j D ]}|| j qt| j| j dS )z1Terminates the main process and all its children.N)rl   send_signalrr   r3   killrk   )r   pr   r   r   r      s   
z$_ChildProcessObserver._terminate_all)rg   rI   )rJ   rK   rL   rN   listr@   rA   r   r   r_   rP   rv   r   rQ   r   r   r   r   rd      s    &	rd   )(loggingr3   ro   r@   rU   	threadingrt   collections.abcr   typingr   r   r    lightning_utilities.core.importsr   typing_extensionsr   9lightning_fabric.plugins.environments.cluster_environmentr   .lightning_fabric.strategies.launchers.launcherr	   &lightning_fabric.utilities.distributedr
   $lightning_fabric.utilities.rank_zeror   	getLoggerrJ   r   r;   r   r5   r?   rN   tupler>   r   rA   r#   Threadrd   r   r   r   r   <module>   s,   
z