o
    wi                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
m
Z
 d dlmZ d dlmZ d dlmZ d dlZd dlZd dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZ d dlm Z  d dl!m"Z" d dl#m$Z$ e %e&Z'i Z(e	G dd dZ)de)dee&* j+d diZ,de)fddZ-de.dee) fddZ/dd Z0dd Z1edd!e.de2e. fd"d#Z3d$d% Z4dd'e2e5 dB d(e5e.B de5fd)d*Z6d+d, Z7	-dd.e.d/e5d0e5d1e.d2e8d3e5d4e.fd5d6Z9d7d8 Z:	-dd.e.d/e5d0e5d1e.d2e8d3e5d4e.fd9d:Z;d;d< Z<e	d d=G d>d? d?eZ=d@dA Z>ddBdCZ?e			ddDe.dEe.dFe.dGe.dB dHe.dB dIe.dB fdJdKZ@dLdM ZAdNdO ZBG dPdQ dQeZCdRe5dSe5ddfdTdUZD	 ddVe dWe.dXe.dYee. dZeEf
d[d\ZFddVe d]e.dWe.dZeEfd^d_ZGdd`e.eB fdadbZHddceIe. dB fdddeZJdfdg ZKd2e8fdhdiZL	j						k			ddle.dceIe. dB dme8dB fdndoZMedpdq ZNdddddddkdddd dde;dddkfdre.e2e. B dse.e2e. B dte5e2e5 B due5dB dve.ejOB dB dweEdxe2e. dye.e2e. B dB dce2e. dB dme8dB dzeEfd{d|ZPdd}d~ZQdS )    N)contextmanager)	dataclass)datetime)	lru_cache)Path)Optional)	get_token)StreamWatcher)set_nemorun_home)DockerExecutor)SlurmJobDetailsget_packaging_job_key)	SSHTunnel)
DictConfig)AppStatec                   @   s*   e Zd ZU dZeed< eed< dd ZdS )RepoMetadataz3Metadata for a repo that is used in the experiment.namepathc                 C   s8   t | jtrt| j| _| j std| j dd S )NzRepository path `z` does not exist.)
isinstancer   strr   exists
ValueErrorself r   g/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/nemo/collections/common/parts/skills_utils.py__post_init__<   s
   
zRepoMetadata.__post_init__N)__name__
__module____qualname____doc__r   __annotations__r   r   r   r   r   r   r   5   s
   
 r   nemo_skills   )r   r   metadatac                 C   s*   | j tv rtd| j  d| t| j < dS )zRegister an external repo to be packaged with the code in the experiment.

    Args:
        metadata (RepoMetadata): Metadata for the external repo.
    zExternal repo z is already registered.N)r   EXTERNAL_REPOSr   )r$   r   r   r   register_external_repoL   s   
r&   r   returnc                 C   s   | t vrdS t |  S )zGet the path to the registered external repo.

    Args:
        name (str): Name of the external repo.

    Returns:
        A path to the external repo if it is registered, otherwise None.
    N)r%   )r   r   r   r   get_registered_external_repoX   s   	r(   c                 C   s>   t | dg D ]}||dd r dS qtd| d)zEWill check that path_to_check is referenced inside one of the mounts.z/nemo_run/code:/nemo_run/code:r#   N
The path ''' is not mounted. Check cluster config.)get_mounts_from_config
startswithsplitr   )cluster_configpath_to_checkmountr   r   r   check_if_mountedg   s
   r2   c                 C   sj   |du rdS t | D ]"}||dd r,|dd |t|dd d    S q
td| d)z;Will return the path on the filesystem before it's mounted.Nr)   r#   r   r*   r+   )r,   r-   r.   lenr   )r/   r   r1   r   r   r   get_unmounted_patho   s   ,r4   Texpnamec                    s    fdd}t | tjr|| S ztj| }||W  d   W S 1 s'w   Y  W dS  tyt   z!tj| }||W  d   W  Y S 1 sNw   Y  W Y dS  tys   |rktd|  g  Y  Y S t	d|  dw w )a  Will return the handles of the tasks in the experiment.

    If ignore_finished=True, will only return handles for the tasks
    that are not yet finished. Useful for filtering handles to set dependencies on.

    If ignore_exp_not_exists=True, will not raise an error if the experiment does not exist.

    TODO: it's still possible that job submission fails if the tasks exist when this function
          is called, but finish before nemo-run submits a new job (which might take minutes)
    c                    sF   g }| j D ]} r|| jtjtjtjtjfv r ||j	 qq|S N)
jobsstatus_runnerr   RUNNINGPENDING	SUBMITTEDUNKNOWNappendhandle)exphandlesjobignore_finishedr   r   _get_handles   s   
 z%get_exp_handles.<locals>._get_handlesNzExperiment %s not found!zExperiment z not found!)
r   run
Experiment
from_titleFileNotFoundErrorfrom_idAssertionErrorLOGwarningr   )r5   rD   ignore_exp_not_existsrE   r@   r   rC   r   get_exp_handles{   s&   (.rO   c                 C   sr   d| vrd}|S | d |p| d  }t |dt dd }d|jd dd	|jd d
 dd	|jd
 d}|S )a2  Get the timeout value for a given partition from the cluster configuration.

    Args:
        cluster_config (dict): The cluster configuration dictionary
        partition (str): The partition name to get timeout for

    Returns:
        str: Timeout value in the format 'days:hours:minutes:seconds'
    timeouts10000:00:00:00	partitionz%H:%M:%Sz00:15:00z00:i  02dr)   <   )r   strptimeseconds)r/   rR   timeout	time_diffr   r   r   get_timeout   s   

0rY     excludestrategyc                 C   s~   | pg } t |tr|}|| v r|d7 }|| v s|S |dkr7ddl}|dd}|| v r5|dd}|| v s+|S td| d)	z$Will return a free port on the host.r#   randomr   N   i  z	Strategy z not supported.)r   intr]   randintr   )r[   r\   portr]   r   r   r   get_free_port   s   
rb   c                 C   s   d|  d|  d| }|S )a2  Construct the command for running generation tasks.

    Args:
        server_address (str): Address of the server to connect to
        generation_commands (str): Commands to run for generation

    Returns:
        str: Complete command string including environment setup and server connection check
    znexport PYTHONPATH=$PYTHONPATH:/nemo_run/code && cd /nemo_run/code && echo 'Waiting for the server to start at z' && while [ $(curl -X PUT z8 >/dev/null 2>&1; echo $?) -ne 0 ]; do sleep 3; done && r   )server_addressgeneration_commandscmdr   r   r   get_generation_command   s   
rf    server_typenum_gpus	num_nodes
model_pathr/   server_portserver_argsc                 C   s   |}| dkrt || n| dkr|drt || | dkrLtd|gd}d| d| d| d	| d
| d| d| d| d| d}	|d dkrKd}n&| dkrj|dkrXtdd| d| d| d| d	}	d}ntd|  dd|	 d}
|
|fS )a  Construct the command for starting a reward model server.

    Args:
        server_type (str): Type of server to start ('nemo', 'vllm', etc.)
        num_gpus (int): Number of GPUs to use
        num_nodes (int): Number of nodes to use
        model_path (str): Path to the model
        cluster_config (dict): Cluster configuration
        server_port (int): Port for the server
        server_args (str, optional): Additional arguments for the server

    Returns:
        tuple[str, int]: Server command string and number of tasks to run

    Raises:
        ValueError: If server type is not supported or configuration is invalid
    vllm/nemor]   )r\   r[   z[python -m nemo_skills.inference.server.serve_nemo_aligner_reward_model     ++rm_model_file=     trainer.devices=     trainer.num_nodes=z'     +model.tensor_model_parallel_size=z)     +model.pipeline_model_parallel_size=z     inference.port=     zU & python -m nemo_skills.inference.server.serve_nemo_reward_model     inference_port=z&      triton_server_address=localhost: executorlocalr#   z1VLLM server does not support multi-node execution?python3 -m nemo_skills.inference.server.serve_vllm     --model      --num_gpus      --port zServer type 'z!' not supported for reward model.Snvidia-smi && cd /nemo_run/code && export PYTHONPATH=$PYTHONPATH:/nemo_run/code && )r2   r-   rb   r   )rh   ri   rj   rk   r/   rl   rm   	num_tasksnemo_aligner_reward_model_portserver_start_cmd
server_cmdr   r   r   get_reward_server_command   sd   
	r   c                 C   s    d}d| d|  d| d}|S )zDGenerate Ray server command with head and worker node configuration.z--node-manager-port=12345 --object-manager-port=12346 --dashboard-port=8265 --dashboard-agent-grpc-port=12347 --runtime-env-agent-port=12349 --metrics-export-port=12350 --min-worker-port=14349 --max-worker-port=18349 zif [ "${SLURM_PROCID:-0}" = 0 ]; then     echo 'Starting head node' &&     export RAY_raylet_start_wait_time_s=120 &&     ray start         --head         --port=6379        z &&    z ;else     echo 'Starting worker node' &&     export RAY_raylet_start_wait_time_s=120 &&     echo "Connecting to head node at $SLURM_MASTER_NODE" &&     ray start         --block         --address=$SLURM_MASTER_NODE:6379        z ;fir   )	start_cmdportsray_start_cmdr   r   r   get_ray_server_cmd=  s   r   c                 C   s,  |}| dkrt || n| dkr|drt || | dkr?d| d| d| d| d| d	| d
| d}|d dkr>d}nM| dkrYd| d| d| d
| d	}	t|	}d}n3| dkr~|dkrdd}
nd}
d| d| d| d| d
|
 d
| d}d}nd| d| d
| d}|}d| d}||fS )z3Generate command to start a model inference server.rn   ro   rp   zEpython -m nemo_skills.inference.server.serve_nemo     gpt_model_file=rq   rr   z      tensor_model_parallel_size=z"     pipeline_model_parallel_size=z     ++port=rs   rt   ru   rv   r#   rw   rx   ry   sglangz? --dist_init_addr $SLURM_MASTER_NODE --node_rank $SLURM_PROCID rg   zApython3 -m nemo_skills.inference.server.serve_sglang     --model z     --num_nodes zcFORCE_NCCL_ALL_REDUCE_STRATEGY=1 python -m nemo_skills.inference.server.serve_trt     --model_path rz   )r2   r-   r   )rh   ri   rj   rk   r/   rl   rm   r{   r}   start_vllm_cmdmultinode_argsr~   r   r   r   get_server_command`  s   

	r   c                   C   s   dS )z}Get the command to start the sandbox environment.

    Returns:
        str: Command to initialize and start the sandbox
    z/entrypoint.sh && /start.shr   r   r   r   r   get_sandox_command  s   r   )kw_onlyc                   @   s   e Zd ZU dZdZeed< dZeed< ede	fddZ
ede	fd	d
Zede	fddZede	fddZedefddZdS )CustomJobDetailsz
    Custom job details class for handling SLURM job logs.

    Extends SlurmJobDetails to manage separate log files for sbatch and srun processes,
    with configurable prefixes for each type of log.
    mainsrun_prefixrg   sbatch_prefixr'   c                 C      t | j| j d S )z(Path to sbatch standard output log file.%j_sbatch.logr   folderr   r   r   r   r   stdout     zCustomJobDetails.stdoutc                 C   r   )z&Path to srun standard output log file.%j_srun.logr   r   r   r   r   r   r   srun_stdout  r   zCustomJobDetails.srun_stdoutc                 C   r   )z'Path to sbatch standard error log file.r   r   r   r   r   r   stderr  r   zCustomJobDetails.stderrc                 C   r   )z%Path to srun standard error log file.r   r   r   r   r   r   srun_stderr  r   zCustomJobDetails.srun_stderrc                 C   s   | j sJ tj| j dS )z~This term will be used to fetch the logs.

        The command used to list the files is ls -1 {ls_term} 2> /dev/null
        z	*srun.log)r   osr   joinr   r   r   r   ls_term  s   
zCustomJobDetails.ls_termN)r   r   r   r    r   r   r!   r   propertyr   r   r   r   r   r   r   r   r   r   r     s   
 r   c                 C   s>   t | ddd}t|}W d   |S 1 sw   Y  |S )z)Read and parse a YAML configuration file.rtzutf-8)encodingN)openyaml	safe_load)config_filefinr/   r   r   r   read_config  s   
r   c                 C   s8  | durdt | ttfr| S |ptjd}|r"tt||  d S t d |  d 	 r<tt d |  d S tt
jd d |  d 	 r\ttt
jd d |  d S td|  dtjd}|sptd	t|	 s~td| d
t|}|d dkrd|vrd|vrtdt|d  |S )aD  Trying to find an appropriate cluster config.

    Will search in the following order:
    1. config_dir parameter
    2. NEMO_SKILLS_CONFIG_DIR environment variable
    3. Current folder / cluster_configs
    4. This file folder / ../../cluster_configs

    If NEMO_SKILLS_CONFIG is provided and cluster is None,
    it will be used as a full path to the config file
    and NEMO_SKILLS_CONFIG_DIR will be ignored.

    If cluster is a python object (dict-like), then we simply
    return the cluster config, under the assumption that the
    config is prepared by the user.
    NNEMO_SKILLS_CONFIG_DIRz.yamlcluster_configs   zCluster config z+ not found in any of the supported folders.NEMO_SKILLS_CONFIGz6Either cluster or NEMO_SKILLS_CONFIG must be provided. not found.ru   slurm
ssh_tunneljob_dirzMjob_dir must be provided in the cluster config if ssh_tunnel is not provided.)r   dictr   r   environgetr   r   cwdr   __file__parentsr   r
   )cluster
config_dirr   r/   r   r   r   get_cluster_config  s,     r   r   hostuseridentityshellpre_commandc                 C   s   t j|||||| dS )zGCreate and cache an SSH tunnel connection with the given configuration.)r   r   r   r   r   r   )rF   r   r   r   r   r   r   r   r   r   r   _get_tunnel_cached#  s   
r   c                 C   s2   | j  d| j d| j d| j d| j d| j S )z>Generate a unique hash string for an SSH tunnel configuration.r)   r   )tunnelr   r   r   tunnel_hash7  s   2r   c                 C   s0   d| vrt d tjddS tdi | d S )zCreate ssh tunnel.r   zTNo ssh_tunnel configuration found, assuming we are running from the cluster already.rg   )r   Nr   )rL   inforF   LocalTunnelr   )r/   r   r   r   
get_tunnel<  s   
r   c                   @   s   e Zd ZdZdd ZdS )OutputWatcherz3Class for streaming remote tar/compression process.c                 C   s   t |dd tj  g S )z'Process and display a stream of output.)end)printsysr   flush)r   streamr   r   r   submitH  s   
zOutputWatcher.submitN)r   r   r   r    r   r   r   r   r   r   E  s    r   transferredtotalc                 C   sl   | | d }dt |d  d }tjd|dd|dd	| d
 d
 dd|d
 d
 dd	 tj  dS )zDisplay SFTP transfer progress.d   =r   >zFile Transfer Progress: [z<50z] z.1fz% (r^   zMB/zMB)N)r_   r   r   writer   )r   r   percentbarr   r   r   progress_callbackO  s   r   r   
remote_dir	local_dirremote_tar_dirverbosec                 C   s  | d}tj|\}}|r|n|}| d}tj|| }tj||}	| d| d}
t|
j }d}z| jddd}
|
j	d	k}W n t
yR   d}Y nw |rr|rrd
| d| d| d| }| j|t gd| d nd
| d| d| }| j|| d}
| jj }|j||	|rtndd td|	  tj|dd t|	d}|j|d W d   n1 sw   Y  | jd| dd t|	 dS )a  
    Downloads a directory from a remote cluster by creating a tar archive and transferring it.

    Args:
        tunnel: SSHTunnel connection
        remote_dir: Path to the directory on remote server
        local_dir: Local path to save the downloaded directory
        remote_tar_dir: Optional directory for temporary tar file creation
        verbose: Print download progress
    ro   z.tar.gzzdu -sb z
 | cut -f1Fzwhich pvT)warnr   zcd z  && tar --exclude="*.log" -cf - z	 | pv -s zB -p -t -e -b -F "Compressing Remote Directory: %b %t %p" | gzip > )watchersptyhidez && tar -czf rt   )r   Ncallbackz
Transfer complete: )exist_okzr:gzr   zrm )rstripr   r   r.   r   rF   r_   r   stripexited	Exceptionr   sessionclient	open_sftpr   r   r   makedirstarfiler   
extractallremove)r   r   r   r   r   remote_dir_parentremote_dir_nameremote_tar_filename
remote_tar	local_tarresult
total_sizestreaming_possiblecommandsftptarr   r   r   cluster_downloadZ  sH   

r   
local_filec                 C   s8   | j j }|jt|t||rtndd td dS )a  
    Uploads a file to cluster.
    TODO: extend to a folder.

    Args:
        tunnel: SSHTunnel connection
        local_file: Path to the local file to upload
        remote_dir: Cluster path where to save the file
        verbose: Print upload progress
    Nr   z
Transfer complete)r   r   r   putr   r   r   )r   r   r   r   r   r   r   r   cluster_upload  s    r   r   c              	   C   s|   t  }z3z!| rt |  tjg ddddj  }t|W W t | S  tj	y7   Y W t | dS w t | w )zCheck if the path is a git repo.

    Args:
        path: Path to the directory to check. If None, will check the current directory.

    Returns:
        Path to the repo if it is a git repo, otherwise None.
    )gitz	rev-parsez--show-toplevelT)capture_outputcheckN)
r   getcwdchdir
subprocessrF   r   decoder   r   CalledProcessError)r   original_path	repo_pathr   r   r   get_git_repo_path  s&   	

	r  extra_package_dirsc                 C   s  t dj}| rdd | D }dd | D }ng }g }ttdd }tdd}|r`t|d  sFt	d	t
|d
  |t
|d
  n	|t
|d  |t
|j tj|||d}n"t	dt
|d
  |t
|d
  |t
|j tj||d}i }ttdkr||d< t D ]6\}}	|dkrq|	j}t|rtjt
||d||< qt
t|d
 g}
t
t|jg}tj|
|d||< qtj|ddS |S )z`Will check if we are running from a git repo and use git packager or default packager otherwise.r"   c                 S   s   g | ]
}t t|d  qS )*)r   r   .0dr   r   r   
<listcomp>  s    z get_packager.<locals>.<listcomp>c                 S   s   g | ]	}t t|jqS r   )r   r   parentr  r   r   r   r	    s    -NEMO_SKILLS_DISABLE_UNCOMMITTED_CHANGES_CHECKr   Nr   zoNot running from NeMo-Skills repo, trying to upload installed package. Make sure there are no extra files in %sr  zdataset/**/*.jsonl)include_patterninclude_pattern_relative_pathcheck_uncommitted_changeszuNot running from a git repo, trying to upload installed package.             Make sure there are no extra files in %s)r  relative_pathr#   nemo_run)basepathr  T)sub_packagersextract_at_root)r(   r   boolr   getenvr  r   is_dirloggingrM   r   r>   r
  rF   GitArchivePackagerPatternPackagerr3   r%   itemsHybridPackager)r  nemo_skills_dirinclude_patternsinclude_pattern_relative_pathscheck_uncommited_changesr  root_packageextra_repos	repo_name	repo_metarepo_include_pattern"repo_include_pattern_relative_pathr   r   r   get_packager  sf   



r&  c                 C   s  i }|  dg }|D ]J}d|v r7|ddkr|d\}}ntd| | || < td|  q
|tjv rMtd| d tj| ||< q
td| dg d	}d
dd i}|  dg }|| D ]a}d|v r|ddkr~|d\}}ntd| | || < td|  qi|tjv rtd| d tj| ||< qi||v r||  ||< td| d qitd| d qi|S )a  
    Will get the environment variables from the cluster config and the user environment.

    The following items in the cluster config are supported:
    - `required_env_vars` - list of required environment variables
    - `env_vars` - list of optional environment variables

    WANDB_API_KEY, NVIDIA_API_KEY, OPENAI_API_KEY, and HF_TOKEN are always added if they exist.

    Args:
        cluster_config: cluster config dictionary

    Returns:
        dict: dictionary of environment
    required_env_varsr   r#   z.Invalid required environment variable format: z%Adding required environment variable z from environmentRequired environment variable r   )WANDB_API_KEYNVIDIA_API_KEYOPENAI_API_KEYHF_TOKENr,  c                   S   s
   t t S r6   )r   r   r   r   r   r   <lambda>I  s   
 z#get_env_variables.<locals>.<lambda>env_varsz.Invalid optional environment variable format: z%Adding optional environment variable zOptional environment variable z) not found in user environment; skipping.)	r   countr.   r   r   r  r   r   r   )r/   r.  r'  env_varvaluealways_optional_env_varsdefault_factoriesoptional_env_varsr   r   r   get_env_variables#  s@   

r5  c                 C   s   |  dg }tt|D ]l}|| }d|vrtd| d|d\}}|d dkrI|d dkrI|d	d }|tjvrDtd
| dtj| }|d dkrm|d dkrm|d	d }|tjvrhtd
| dtj| }| d| }|||< q|S )a  
    Determines if there are mount paths that are being passed via environment variables.
    Selects the key in the cluster config called `mounts` which is a list of strings.
    Each string is in the format of `<str | {env_var}>:<str | {env_var}>` where `env_var`
    is the name of the environment variable.

    Args:
        cluster_config (dict): cluster config dictionary

    Returns:
        list: updated list of mounts
    mountsr)   zInvalid mount format: zK.                              The mount path must be separated by a colon.r   {}r#   r(  zN not found                         in env variables passed in cluster configs.zP not found                           in env variables passed in cluster configs.)r   ranger3   r   r.   r   r   )r/   r6  mount_idr1   mount_sourcemount_targetresolved_mountr   r   r   r,   a  s2   







r,   r   F
log_prefixslurm_kwargsc                 C   s\  t | }t| }|p|}|durt|}t|d}| d dkr:|dkr'tdd|d< t||d	|d|d	|d
did	S |sAd|d< nd| d|d< t|D ]}d| d|d| < qM|	pa| d}	d| vrid}n| d |	 }|
durwd|
ini }| ddur| d |d< | ddur| d |d< ddddd| d| ddd d! |	 D  g}| d"d#s|dur|
d$|  | d%d&}tjd=i d'| d' d|	d(|d)|d*t| d+|d,|d-|d.|d/|d0| d"d#s|ndd1|d2t| d3d| t| ||d4 | d4 |d4 d5d6d7d8d9d:|d%|d;|d<||p+i S )>a  Create and configure an executor for running tasks locally or on a cluster.

    Args:
        cluster_config (dict): Configuration for the cluster/execution environment
        container (str): Container image to use
        num_nodes (int): Number of nodes to use
        tasks_per_node (int): Number of tasks to run per node
        gpus_per_node (int): Number of GPUs to use per node
        job_name (str): Name of the job
        log_dir (str): Directory for storing logs
        log_prefix (str, optional): Prefix for log files. Defaults to "main"
        mounts (list, optional): List of volume mounts
        partition (str, optional): Slurm partition to use
        time_min (int, optional): Minimum time allocation in minutes
        dependencies (list, optional): List of job dependencies
        extra_package_dirs (tuple[str], optional): Additional directories to package
        heterogeneous (bool, optional): Whether this is part of a heterogeneous job
        het_group (int, optional): Heterogeneous group index
        total_het_groups (int, optional): Total number of heterogeneous groups
        slurm_kwargs (dict, optional): Additional kwargs for Slurm configuration

    Returns:
        DockerExecutor | SlurmExecutor: Configured executor instance
    N)r  ru   rv   r#   z4Local executor does not support multi-node execution1PYTHONUNBUFFEREDr   
entrypointrg   )	container_imagepackageripc_modevolumesntasks_per_noderi   networkr.  additional_kwargsz9$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n1)SLURM_MASTER_NODEz8$(scontrol show hostnames $SLURM_JOB_NODELIST_HET_GROUP_z | head -n1)SLURM_MASTER_NODE_HET_GROUP_rR   rP   rQ   time_min	mail_type	mail_userz--no-container-mount-homez	--overlapz
--mpi=pmixz	--wait=10z--ntasks-per-node=z--nodes=z--container-env=,c                 S   s   g | ]}|  qS r   )r   )r  kr   r   r   r	    s    z get_executor.<locals>.<listcomp>disable_gpus_per_nodeFz--gpus-per-node=dependency_typeafteranyaccountnodesrH  r   rD  container_mountstimeadditional_parametersrE  gpus_per_node	srun_argsjob_detailsjob_name_prefix_)job_namer   r   r   wait_time_for_group_jobg{Gz?monitor_group_job_wait_time   dependenciesheterogeneousr.  r   )r5  r,   tupler&  r   r   r:  r   r   keysr>   rF   SlurmExecutorr   r   r4   )r/   	containerrj   tasks_per_noderZ  r_  log_dirr?  r6  rR   rM  rc  r  rd  	het_grouptotal_het_groupsr@  r.  config_mountsrE  grouprW   rY  r[  rS  r   r   r   get_executor  s   +




	
ro  c              	   c   sh    |  dg  }| }| D ]\}}|| d|  || d< qz
dV  W || d< dS || d< w )a  Temporarily update environment variables in cluster configuration.

    A context manager that temporarily adds or updates environment variables in the cluster
    configuration and restores the original configuration when exiting the context.

    Args:
        cluster_config (dict): The cluster configuration to modify
        updates (dict): Dictionary of environment variable updates where
                       keys are environment variable names and values are their values
    r.  r   N)r   copyr  r>   )r/   updatesoriginal_env_varsupdated_env_varskeyr1  r   r   r   temporary_env_update#  s   
ru  re   rh  r{   sandbox_portreuse_code_exp
reuse_codetask_dependencies	run_afterrd  c           .   
   C   s&  |dur:|d dkr:t |ttjfr|g}g }|D ]}t|}t|dkr+td| || qt|dkr9d}nd}|du rR|d dkrRd|	pN|	ddvrRd	}|du r[t
d
d}d}g }|dut| | }g }g }|dur|d8i |d|i\}}d|vr|d |d  } td8i d|d| d|d d|d|d d|	d|
d|d|d|ddd|d|d|d|d|}!|d dkr|d	krd | d!t| }|| ||! || |d	7 }|rt |tr|g}t |tr|g}t |tr|g}t|t|kst|t|krtd"tt|||D ]\}"\}#}$}%|d dkrC|%d	krCd |% d!t|# }#t|d#|iV ||# |td8i d|d|$d|d|%d|d|	d|
d|d|d|dt|d	kr}d$nd%|" d|d|d|d|d| || W d   n	1 sw   Y  q%|d	7 }|rNd&|i}&|	d'g  }'|'D ]}(d(|(v r|(d)r|(d*d }(|(d+ |&d(< qt||&a |t  td8i d|d|d d, d|d dkr|d jnd	dd	d|d|	d|
d-t d|d|d|dd,d|d|d|d|d|})||) || W d   n	1 sEw   Y  |d	7 }|d dkrt|}*t |*tjr|r|plt	t|*}|durt |trztj|}W n ty   t d.| d/ tj!|}Y nw t"d0|j# t$|j%d1}+|+|j&|*j' j(v r|j&|*j' j(|+ j)},|D ]}-|,|-j*_+qt"d2|+  ntd3|j# nt |*tjrt,t|*d t|d	kr| j-tj.|d d4|d d1|d5S |r||d _/| j-d6d7 |D |d1|d5S )9a  Wrapper for nemo-run exp.add to help setting up executors and dependencies.

    Note that there are two parameters that control dependencies.
        - task_dependencies: list of tasks that this task depends on **within the same experiment**
        - run_after: a string with experiment name or a list of experiment names that this task
          should run after. Will schedule dependencies on all tasks inside `run_after` experiments.
          It needs to already be launched and running.

    Example of how to set task_dependencies:

    with run.Experiment(expname) as exp:
        task1 = add_task(exp, ...)
        task2 = add_task(exp, ..., task_dependencies=[task1])

    You can use `reuse_code_exp` to reuse the code from another experiment
    (and thus avoid costly packaging/ssh uploading). You can provide either experiment
    name or the experiment object itself.

    By default we will reuse the code of the first submitted experiment.
    If you want to avoid this, set `reuse_code=False`.
    Nru   r   r   zMNo pending or running tasks found for experiment %s, cannot set dependencies.cpurR   rg   r#   r]   )r\   r/   rh  
containersrh   rj   ri  rZ  ri   rM  rc  r_  rj  r?  serverr  r@  rd  rk  rl  rv   zmpirun --allow-run-as-root -np z	 bash -c z8Number of commands, containers and num_tasks must match.NEMO_SKILLS_SANDBOX_PORTr   main_LISTEN_PORTr.  
PYTHONPATHzPYTHONPATH=   z:/appsandboxr6  z$Failed to create experiment from id z, trying to find it by titlez'Trying to reuse code from experiment %sznemo-runzSuccessfully reused code from z2Relevant packaging job not found for experiment %sinline)ru   r   rc  c                 S   s   g | ]}t j|d qS )r  )rF   Script)r  r   r   r   r   r	    s    zadd_task.<locals>.<listcomp>r   )0r   r   rF   rG   rO   r3   rL   rM   extendr   rb   r  ro  shlexquoter>   r_   r   	enumeratezipru  rp  r-   r   rV  re  r   r   REUSE_CODE_EXPr   rJ   r   debugrH   r   _titler   _idtunnelsrt  packaging_jobsdst_pathrE  symlink_from_remote_dirpopaddr  het_group_indices).r@   re   	task_namer/   rh  r{   ri   rj   rj  rR   rM  with_sandboxrv  server_configrw  rx  ry  rz  r   r  r@  rd  rc  dep_expnameexp_handlesrk  r  rl  commands	executorsr~   num_server_tasksserver_containerserver_executorcur_idxcur_cmdcur_container	cur_taskssandbox_env_updatescurrent_env_varsoverridesandbox_executorr   	reuse_key	reuse_dirru   r   r   r   add_task?  s  -
	





$
	

	



r  c                 C   s~   |d dkr| j dd|du rdn|d dS | j d|du rdn|d t|}t|t jr;t|}|tvr=| t|< dS dS dS )z|If sequential is not specified, using True locally and False otherwise.

    If it is specified, it will be used as is.
    ru   rv   FTN)detach	tail_logs
sequential)r  r  )rF   r   r   r   r   r  )r@   r/   r  r   ssh_hashr   r   r   run_exp  s    r  )TT)NrZ   )rg   )NN)NNN)NT)Tr6   )
r   NNNNNFNNN)Rr  r   r  r   r   r   
contextlibr   dataclassesr   r   	functoolsr   pathlibr   typingr   r  rF   r   huggingface_hubr   invoker	   nemo_run.configr
   nemo_run.core.execution.dockerr   nemo_run.core.execution.slurmr   r   nemo_run.core.tunnelr   	omegaconfr   torchx.specs.apir   	getLoggerr   rL   r  r   absoluter   r%   r&   r   r(   r2   r4   listrO   rY   r_   rb   rf   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   r   r  re  r&  r5  r,   ro  ru  rG   r  r  r   r   r   r   <module>   sb  
($
W*
P	*
	6	

J O>B

 	
!




 _