o
    $i_#                     @   s  d Z ddlZddlZddlZddlZddlmZ ddlZddlZddl	m
Z
 ddlmZ ddlZe
ddZdefd	d
ZefddZefdefddZdee dee fddZejddddddejddeddejdeddejddejd d!d Zed"kre  dS dS )#zSymmetric Run for Ray.    N)List)env_integer)	GcsClient&RAY_SYMMETRIC_RUN_CLUSTER_WAIT_TIMEOUT   returnc                  C   s&   dd l m  m}  |  }t|dkS )Nr   )ray._private.services_privateservicesfind_gcs_addresseslen)r
   running_gcs_addresses r   V/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/scripts/symmetric_run.pycheck_ray_already_started   s   r   c                 C   sv   t   }d}tjdd t   | |k r9t d tt }|| kr%dS td| d|  d t   | |k sdS )	z\Wait for all nodes to start.

    Raises an exception if the nodes don't start in time.
       T)ignore_reinit_error   zWaiting for nodes to start... /z nodes startedF)timerayinitsleepr   nodesclickecho)nnodestimeout
start_timecurrent_nodesr   r   r   check_cluster_ready   s   
	r    addressc                 C   s\   t   }t| d}t   | |k r,|jg ddrtd dS t d t   | |k sdS )N)r!   r   )r   zRay cluster is ready!Tr   F)r   r   check_aliver   r   r   )r!   r   r   
gcs_clientr   r   r   check_head_node_ready1   s   


r$   run_and_start_argsc                 C   st   t jd| dd}t|jd }|D ]&}|dkrtd|dkr%td|d	kr.td
|dkr7tdq|S )N_T)resilient_parsingray_args_and_entrypoint--headz*Cannot use --head option in symmetric_run.z--node-ip-addressz5Cannot use --node-ip-address option in symmetric_run.z--portz*Cannot use --port option in symmetric_run.--blockz+Cannot use --block option in symmetric_run.)symmetric_runmake_contextlistparamsr   ClickException)r%   ctxcleaned_argsargr   r   r   "curate_and_validate_ray_start_args<   s   


r3   r+   T)ignore_unknown_optionsallow_extra_argsaQ  Command to start Ray across all nodes and execute an entrypoint command.

USAGE:

    ray symmetric-run --address ADDRESS
[--min-nodes NUM_NODES] [RAY_START_OPTIONS] -- [ENTRYPOINT_COMMAND]

DESCRIPTION:

    This command (1) starts a Ray cluster across all nodes,
(2) runs a command on the head node, and (3) stops the Ray cluster.

    The '--' separator is required to distinguish between Ray start arguments
and the entrypoint command. The --min-nodes option is optional and
can be used to wait for a specific number of nodes to start.

EXAMPLES:

    # Start Ray with default settings and run a Python script

    ray symmetric-run --address 127.0.0.1:6379 -- python my_script.py

    # Start Ray with specific head node and run a command

    ray symmetric-run --address 127.0.0.1:6379 --min-nodes 4 -- python train_model.py --epochs=100

    # Start Ray and run a multi-word command

    ray symmetric-run --address 127.0.0.1:6379 --min-nodes 4 --num-cpus=4 -- python -m my_module --config=prod

RAY START OPTIONS:

    Most ray start command options are supported. Arguments that are not
supported are: --head, --node-ip-address, --port, --block.

SEPARATOR REQUIREMENT:

    The '--' separator is mandatory and must appear between Ray start
    arguments and the entrypoint command. This ensures clear separation
    between the two sets of arguments.
)namecontext_settingshelp	--addresszThe address of the Ray cluster.)requiredtyper8   z--min-nodesz4If provided, wait for this number of nodes to start.)r;   r8   r(   )nargsr;   c              
   C   s  t jdd  }|r|d dkr|dd  }z|d}W n ty(   tdw |d | ||d d  }}t|}|d u rBdn|}|sKtdt rStdtj	j
| }|d u rgtd|  d	|\}	}
zt|	|
tjtj}|d d
 d }W n tjy   td|	 w g }t  D ]\}}|D ]}|jtjjtjjfv r||j qq|dkrdd |D }||v }d }zzs|rtd dddd| d|
 g|}tj|ddd td td |dkrt|stdtd|  td t|}td n#td|  d t| s'tdddd| dg|}tj|dd W nP tjyx } z2tjd | dd! |j r]tjd"|j !  dd! |j"rntjd#|j"!  dd! W Y d }~nd }~w t#y   tjd$dd! Y nw W tdd%g |d ur|j$dkrtjd&|j$ dd! t %|j$ d S d S d S tdd%g |d ur|j$dkrtjd&|j$ dd! t %|j$ w w w )'Nr   r   zsymmetric-runz--zqNo separator '--' found in arguments. Please use '--' to separate Ray start arguments and the entrypoint command.zNo entrypoint command provided.z$Ray is already started on this node.zInvalid address format: z, should be `host:port`   zCould not resolve hostname: c                 S   s    g | ]}|d kr|dkr|qS )z	127.0.0.1z::1r   ).0ipr   r   r   
<listcomp>   s     z!symmetric_run.<locals>.<listcomp>z*On head node. Starting Ray cluster head...r   startr)   z--node-ip-address=z--port=T)checkcapture_outputzHead node started.z=======================z+Timed out waiting for other nodes to start.zRunning command on head node: z-On worker node. Connecting to Ray cluster at z...z)Timed out waiting for head node to start.r9   r*   )rC   zFailed to start Ray: )errzstdout:
zstderr:
zInterrupted by user.stopz Command failed with return code )&sysargvindex
ValueErrorr   r/   r3   r   r   _commonnetwork_utilsparse_addresssocketgetaddrinfo	AF_UNSPECSOCK_STREAMgaierrorpsutilnet_if_addrsitemsfamilyAddressFamilyAF_INETAF_INET6appendr!   r   
subprocessrunr    r$   CalledProcessErrorstdoutdecodestderrKeyboardInterrupt
returncodeexit)r!   	min_nodesr(   all_args	separatorr%   entrypoint_on_headray_start_argsgcs_host_portgcs_hostgcs_portaddrinforesolved_gcs_hostmy_ipsifaceaddrsaddris_headresultray_start_cmder   r   r   r+   P   s   7












__main__)__doc__rN   r[   rG   r   typingr   r   r   ray._private.ray_constantsr   ray._rayletr   rS   CLUSTER_WAIT_TIMEOUTboolr   r    strr$   r3   commandoptionintargumentUNPROCESSEDr+   __name__r   r   r   r   <module>   sF    
- 
