o
    $iL                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZmZmZmZ d dlZd dlZd dlmZ d dlmZ d dlmZmZmZ d dlZdZd	Zd
dgZG dd deZG dd deZ G dd deZ!G dd dZ"G dd dZ#G dd dZ$		dHde$deee%  de%de$fddZ&	dIde$de%de$fd d!Z'de$fd"d#Z(		$dJde$d%eeee%e)f   d&e)fd'd(Z*de$d)e"fd*d+Z+dKd-ee% fd.d/Z,	0dLd1e#d)e"d2e%dee% fd3d4Z-de$d1e#d)e"fd5d6Z.de$d)e"fd7d8Z/de$d9ee# d)e"fd:d;Z0de$d9ee# d)e"fd<d=Z1d>e%deee% e%e%ee% ee% f fd?d@Z2					dMdAee% dBee% dCee% dDee% dEee% f
dFdGZ3dS )N    N)ThreadPoolExecutor)contextmanager)ListOptionalSequenceTuple)
cli_logger)_get_node_provider)NODE_KIND_HEADNODE_KIND_WORKERTAG_RAY_NODE_KIND   ubuntu~/ray_bootstrap_key.pemz%~/.ssh/ray-autoscaler_2_us-west-2.pemc                   @      e Zd ZdS )CommandFailedN__name__
__module____qualname__ r   r   a/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/autoscaler/_private/cluster_dump.pyr          r   c                   @   r   )LocalCommandFailedNr   r   r   r   r   r      r   r   c                   @   r   )RemoteCommandFailedNr   r   r   r   r   r   #   r   r   c                   @   sL   e Zd Z						ddedededededeeeeef   fd	d
ZdS )GetParametersTNlogsdebug_statepip	processesprocesses_verboseprocesses_listc                 C   s(   || _ || _|| _|| _|| _|| _d S N)r   r   r   r   r    r!   )selfr   r   r   r   r    r!   r   r   r   __init__(   s   	
zGetParameters.__init__)TTTTTN)	r   r   r   boolr   r   r   strr$   r   r   r   r   r   '   s(    r   c                   @   s<   e Zd ZdZ				ddededed	ee d
ef
ddZdS )NodezNode (as in "machine")r   r   NFhostssh_userssh_keydocker_containeris_headc                 C   s"   || _ || _|| _|| _|| _d S r"   )r(   r)   r*   r+   r,   )r#   r(   r)   r*   r+   r,   r   r   r   r$   <   s
   
zNode.__init__)r   r   NF)r   r   r   __doc__r&   r   r%   r$   r   r   r   r   r'   9   s"    r'   c                   @   sl   e Zd ZdZddee fddZedd Zdd	 Z	d
d Z
dd Zdd Zeddedee fddZdS )Archivea  Archive object to collect and compress files into a single file.

    Objects of this class can be passed around to different data collection
    functions. These functions can use the :meth:`subdir` method to add
    files to a sub directory of the archive.

    Nfilec                 C   s,   |p
t jdddd | _d | _t | _d S )N	ray_logs_.tar.gzprefixsuffix   )tempfilemkstempr/   tar	threadingLock_lock)r#   r/   r   r   r   r$   T   s   zArchive.__init__c                 C   s
   t | jS r"   )r%   r8   r#   r   r   r   is_openY   s   
zArchive.is_openc                 C   s   t | jd| _d S )Nzw:gz)tarfileopenr/   r8   r<   r   r   r   r?   ]   s   zArchive.openc                 C   s   | j   d | _ d S r"   )r8   closer<   r   r   r   r@   `   s   

zArchive.closec                 C   s   |    | S r"   )r?   r<   r   r   r   	__enter__d   s   zArchive.__enter__c                 C   s   |    d S r"   )r@   )r#   exc_typeexc_valexc_tbr   r   r   __exit__h   s   zArchive.__exit__/subdirrootc                 #   s0    t j  G  fddd}| V  dS )a  Open a context to add files to the archive.

        Example:

            .. code-block:: python

                with Archive("file.tar.gz") as archive:
                    with archive.subdir("logfiles", root="/tmp/logs") as sd:
                        # Will be added as `logfiles/nested/file.txt`
                        sd.add("/tmp/logs/nested/file.txt")

        Args:
            subdir: Subdir to which to add files to. Calling the
                ``add(path)`` command will place files into the ``subdir``
                directory of the archive.
            root: Root path. Files without an explicit ``arcname``
                will be named relatively to this path.

        Yields:
            A context object that can be used to add files to the archive.
        c                       s0   e Zd Zeddedee f fddZdS )z Archive.subdir.<locals>._ContextNpatharcnamec                    sP   t j| } |pt jt j|  }j  jj| |d j	  d S )NrJ   )
osrI   abspathjoinrelpathr;   acquirer8   addrelease)rI   rJ   rH   r#   rG   r   r   rQ      s
   
z$Archive.subdir.<locals>._Context.addr"   )r   r   r   staticmethodr&   r   rQ   r   rS   r   r   _Context   s    &rU   N)rL   rI   rM   )r#   rG   rH   rU   r   rS   r   rG   k   s   
zArchive.subdirr"   )rF   )r   r   r   r-   r   r&   r$   propertyr=   r?   r@   rA   rE   r   rG   r   r   r   r   r.   K   s    
r.   /tmp/ray/session_latestarchiveexcludesession_log_dirreturnc           	         s   | j s|   |p
g }tjtj|d}| jd|d9}t|D ]*\}}}|D ]"}tj||}tjj||d t	 fdd|D rGq*|
| q*q#W d   | S 1 sYw   Y  | S )a  Copy local log files into an archive.

    Args:
        archive: Archive object to add log files to.
        exclude (Sequence[str]): Sequence of regex patterns. Files that match
            any of these patterns will not be included in the archive.
        session_dir: Path to the Ray session files. Defaults to
            ``/tmp/ray/session_latest``

    Returns:
        Open archive object.

    r   rH   )startc                 3   s    | ]	}t | V  qd S r"   )rematch).0patternrel_pathr   r   	<genexpr>   s    z%get_local_ray_logs.<locals>.<genexpr>N)r=   r?   rL   rI   rN   
expanduserrG   walkrO   anyrQ   )	rX   rY   rZ   sdrH   dirsfilesr/   	file_pathr   rb   r   get_local_ray_logs   s$   


rl   session_dirc                 C   sz   | j s|   tj|}tj|d}tj|std| jd|d}|	| W d   | S 1 s6w   Y  | S )zCopy local log files into an archive.

    Args:
        archive: Archive object to add log files to.
        session_dir: Path to the Ray session files. Defaults to
            ``/tmp/ray/session_latest``

    Returns:
        Open archive object.

    zlogs/debug_state.txtz No `debug_state.txt` file found. r\   N)
r=   r?   rL   rI   re   rN   existsr   rG   rQ   )rX   rm   debug_state_filerh   r   r   r   get_local_debug_state   s   
rq   c              	   C   s   | j s|   zddlm} W n ty   ddlm} Y nw td?}| D ]	}||dg q)|	  | 
d}||jd W d   n1 sNw   Y  W d   | S W d   | S 1 sfw   Y  | S )zGet currently installed pip packages and write into an archive.

    Args:
        archive: Archive object to add meta files to.

    Returns:
        Open archive object.
    r   )freezewt
rn   zpip_packages.txtN)r=   r?   pip._internal.operationsrr   ImportErrorpip.operationsr6   NamedTemporaryFile
writelinesflushrG   rQ   name)rX   rr   fplinerh   r   r   r   get_local_pip_packages   s*   	

r~   Fr   verbosec                 C   s  |s
ddl m} |}g }tg dD ]Q}z>| 0 d| }||r(|n|ddd dd |	 |j
| d	| f W d   n1 sMw   Y  W q tyd } zt||d}~ww i }|D ]'\}	}|D ] \}
}|rz|	d
 }nt|}|
|v r|	d |vr|	||	d < qoqitdB}| D ]}|t|dg q|  | d}||j	d W d   n1 sw   Y  W d   | S W d   | S 1 sw   Y  | S )a7  Get the status of all the relevant ray processes.
    Args:
        archive: Archive object to add process info files to.
        processes: List of processes to get information on. The first
            element of the tuple is a string to filter by, and the second
            element is a boolean indicating if we should filter by command
            name (True) or command line including parameters (False)
        verbose: If True, show entire executable command line.
            If False, show just the first term.
    Returns:
        Open archive object.
    r   )RAY_PROCESSES)pidr{   cmdlinestatus z--r5   N)
executabler{   r   r   r{   r   rs   rt   metazprocess_info.txt)!ray.autoscaler._private.constantsr   psutilprocess_iteroneshotrN   r   appendsplitr{   r   r   	Exceptionr   
subprocesslist2cmdliner6   rx   valuesry   yamldumprz   rG   rQ   )rX   r   r   r   process_infosprocessr   excrelevant_processesprocess_dictkeywordfilter_by_cmdcorpusr|   r}   rh   r   r   r   get_local_ray_processes   sb   





r   
parametersc              
   C   s.  | j s|   |jr)zt| d W n ty( } zt| W Y d}~nd}~ww |jrKzt| d W n tyJ } zt| W Y d}~nd}~ww |j	rmzt
| d W n tyl } zt| W Y d}~nd}~ww |jrzt| |j|jd W | S  ty } zt| W Y d}~| S d}~ww | S )a'  Get all local data.

    Gets:
        - The Ray logs of the latest session
        - The currently installed pip packages

    Args:
        archive: Archive object to add meta files to.
        parameters: Parameters (settings) for getting data.

    Returns:
        Open archive object.
    )rX   N)rX   r   r   )r=   r?   r   rl   r   r   errorr   rq   r   r~   r   r   r!   r    )rX   r   r   r   r   r   get_all_local_data6  sJ   r   'itemsc                 C   s   | d |  | S )Nr   )rN   )r   quotesr   r   r   _wraph  s   r   rayremote_nodescript_pathc           	      C   s  ddddd| j | j d| j g}| jr|dd| jg7 }|d	d
g}||jr(dgndg7 }||jr3dgndg7 }||jr>dgndg7 }||jrIdgndg7 }|jr[||jrWdgndg7 }|ddt	|ddg7 }| j
skdnd}td| j  tjd| d| j dddd  }t|d!-}ztj||tjd" W n tjy } ztd#d$| |d%}~ww W d%   |S 1 sw   Y  |S )&a  Create an archive containing logs on a remote node and transfer.

    This will call ``ray local-dump --stream`` on the remote
    node. The resulting file will be saved locally in a temporary file and
    returned.

    Args:
        remote_node: Remote node to gather archive from.
        script_path: Path to this script on the remote node.
        parameters: Parameters (settings) for getting data.

    Returns:
        Path to a temporary file containing the node's collected data.

    sshz-o StrictHostKeyChecking=noz-o UserKnownHostsFile=/dev/nullz-o LogLevel=ERRORz-i@dockerexecz
local-dumpz--streamz--logsz	--no-logsz--debug-statez--no-debug-statez--pipz--no-pipz--processesz--no-processesz--processes-verbosez--no-proccesses-verbosez	/bin/bashz-c")r   nodeheadz"Collecting data from remote node: ray__r1   r2   r5   wb)stdoutstderrz(Gathering logs from remote node failed: r   N)r*   r)   r(   r+   r   r   r   r   r    r   r,   r   printr6   r7   r?   r   
check_callsysr   CalledProcessErrorr   rN   )	r   r   r   cmdcollect_cmdcattmpr|   r   r   r   r   'create_and_get_archive_from_remote_nodel  sZ   

"
r   c                 C   s   t ||}| js|   |jsdnd}| jdtj|d}|j|d| d|j	 dd W d	   | S 1 s9w   Y  | S )
a%  Create and get data from remote node and add to local archive.

    Args:
        archive: Archive object to add remote data to.
        remote_node: Remote node to gather archive from.
        parameters: Parameters (settings) for getting data.

    Returns:
        Open archive object.
    r   r   rn   r\   r   r   r1   rK   N)
r   r=   r?   r,   rG   rL   rI   dirnamerQ   r(   )rX   r   r   r   r   rh   r   r   r   +create_and_add_remote_data_to_local_archive  s   
 
r   c                 C   s   t  }t|| W d   n1 sw   Y  | js|   | jdtj|jd}|j	|jdd W d   n1 s>w   Y  t
|j | S )zCreate and get data from this node and add to archive.

    Args:
        archive: Archive object to add remote data to.
        parameters: Parameters (settings) for getting data.

    Returns:
        Open archive object.
    Nrn   r\   zlocal_node.tar.gzrK   )r.   r   r=   r?   rG   rL   rI   r   r/   rQ   remove)rX   r   local_data_archiverh   r   r   r   *create_and_add_local_data_to_local_archive  s   r   remote_nodesc                 C   sZ   | j s|   ttd}|D ]}|jt| ||d qW d   | S 1 s&w   Y  | S )ag  Create an archive combining data from the remote nodes.

    This will parallelize calls to get data from remote nodes.

    Args:
        archive: Archive object to add remote data to.
        remote_nodes (Sequence[Node]): Sequence of remote nodes.
        parameters: Parameters (settings) for getting data.

    Returns:
        Open archive object.

    )max_workers)rX   r   r   N)r=   r?   r   MAX_PARALLEL_SSH_WORKERSsubmitr   )rX   r   r   executorr   r   r   r   create_archive_for_remote_nodes  s   
		r   c              
   C   sr   | j s|   zt| | W n ty% } zt| W Y d}~nd}~ww t| || tdt| d | S )aj  Create an archive combining data from the local and remote nodes.

    This will parallelize calls to get data from remote nodes.

    Args:
        archive: Archive object to add data to.
        remote_nodes (Sequence[Node]): Sequence of remote nodes.
        parameters: Parameters (settings) for getting data.

    Returns:
        Open archive object.

    Nz#Collected data from local node and z remote nodes.)	r=   r?   r   r   r   r   r   r   len)rX   r   r   r   r   r   r   )create_archive_for_local_and_remote_nodes  s   r   cluster_configc                    s   ddl m} td|   tj| } tt	| 
 }||dd}t|d |d   tti} tti} fdd	|| D }|d
 d }|d
 d }d}|dd}	|	ra|	dd}|dd}
|||||
fS )a[  Get information from Ray cluster config.

    Return list of host IPs, ssh user, ssh key file, and optional docker
    container.

    Args:
        cluster_config: Path to ray cluster config.

    Returns:
        Tuple of list of host IPs, ssh user name, ssh key file path,
            optional docker container name, optional cluster name.
    r   )_bootstrap_configz6Retrieving cluster information from ray cluster file: T)no_config_cacheprovidercluster_namec                    s   g | ]}  |qS r   )external_ip)r`   r   r   r   r   
<listcomp>@  s    z4get_info_from_ray_cluster_config.<locals>.<listcomp>authr)   ssh_private_keyNr   container_name) ray.autoscaler._private.commandsr   r   r   rL   rI   re   r   	safe_loadr?   readr	   non_terminated_nodesr   r
   r   get)r   r   config
head_nodesworker_nodeshostsr)   r*   r   docker_configr   r   r   r    get_info_from_ray_cluster_config"  s&   r   clusterr(   r)   r*   r   c                 C   s"  |s| st jd}t j|r|} td|  d n| r$t j| } d}| rPt| \}}}	}
}|p4|}|p8|	}|p<|
}|rD|dn|}|sOtd|  n|rX|d}ntd|sit	}td| d	 |st
D ]}t j|}t j|r|}td
| d  nqm| |||||fS )ziParse command line arguments.

    Note: This returns a list of hosts, not a comma separated string!
    z~/ray_bootstrap_config.yamlz Detected cluster config file at z@. If this is incorrect, specify with `ray cluster-dump <config>`N,z6Invalid cluster file or cluster has no running nodes: z<You need to either specify a `<cluster_config>` or `--host`.zUsing default SSH user `z9`. If this is incorrect, specify with `--ssh-user <user>`zAuto detected SSH key file: z6. If this is incorrect, specify with `--ssh-key <key>`)rL   rI   re   ro   r   warningr   r   r   DEFAULT_SSH_USERDEFAULT_SSH_KEYS)r   r(   r)   r*   r   bootstrap_configr   hukdr   cand_keycand_key_filer   r   r   _info_from_paramsN  sV   


r   )NrW   )rW   )NF)r   )r   )NNNNN)4rL   r^   r   r   r>   r6   r9   concurrent.futuresr   
contextlibr   typingr   r   r   r   r   r   "ray.autoscaler._private.cli_loggerr   !ray.autoscaler._private.providersr	   ray.autoscaler.tagsr
   r   r   r   r   r   r   RuntimeErrorr   r   r   r   r'   r.   r&   rl   rq   r~   r%   r   r   r   r   r   r   r   r   r   r   r   r   r   r   <module>   s    M

'

A2
@



#
-