o
    wOi                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
Z
d dlmZ e Zdd Zdde	e fddZG d	d
 d
ZdS )    N)Optional)
get_loggerc                  C   s   t jddt jt jd} | D ];}|\}}}}}zt  |||}|d |d |W   S  tyH } z|  td|  W Y d}~qd}~ww t	d)a  
    Finds a free port and binds a temporary socket to it so that
    the port can be "reserved" until used.

    .. note:: the returned socket must be closed before using the port,
              otherwise a ``address already in use`` error will happen.
              The socket should be held and closed as close to the
              consumer of the port as possible since otherwise, there
              is a greater chance of race-condition where a different
              process may see the port as being free and take it.

    Returns: a socket binded to the reserved free port

    Usage::

    sock = find_free_port()
    port = sock.getsockname()[1]
    sock.close()
    use_port(port)
    	localhostN)hostportfamilytype)r   r   r   z Socket creation attempt failed: zFailed to create a socket)
socketgetaddrinfo	AF_UNSPECSOCK_STREAMbindlistenOSErrorcloseprintRuntimeError)addrsaddrr   r   proto_se r   W/home/ubuntu/.local/lib/python3.10/site-packages/torchelastic/rendezvous/etcd_server.pyfind_free_port   s    


r   data_dirc                 C   sT   | r|   d u rtd |   |   |r(td|  tj|dd d S d S )Nzstopping etcd serverzdeleting etcd data dir: Tignore_errors)pollloginfo	terminatewaitshutilrmtree)
subprocessr   r   r   r   	stop_etcd?   s   
r'   c                   @   s   e Zd ZdZddee fddZdefddZdefd	d
Z	defddZ
ddededdfddZddededdfddZdejfddZddeddfddZdddZdS )
EtcdServera  
    .. note:: tested on etcd server v3.4.3

    Starts and stops a local standalone etcd server on a random free
    port. Useful for single node, multi-worker launches or testing,
    where a sidecar etcd server is more convenient than having to
    separately setup an etcd server.

    This class registers a termination handler to shutdown the etcd
    subprocess on exit. This termination handler is NOT a substitute for
    calling the ``stop()`` method.

    The following fallback mechanism is used to find the etcd binary:

    1. Uses env var TORCHELASTIC_ETCD_BINARY_PATH
    2. Uses ``<this file root>/bin/etcd`` if one exists
    3. Uses ``etcd`` from ``PATH``

    Usage
    ::

     server = EtcdServer("/usr/bin/etcd", 2379, "/tmp/default.etcd")
     server.start()
     client = server.get_client()
     # use client
     server.stop()

    Args:
        etcd_binary_path: path of etcd server binary (see above for fallback path)
    Nr   c                 C   sp   d| _ d| _tjt}tj|d}tjd|| _	tj
| j	s%d| _	|r)|ntjdd| _d | _d | _d S )Nr   zbin/etcdTORCHELASTIC_ETCD_BINARY_PATHetcdtorchelastic_etcd_data)prefix)_port_hostospathdirname__file__joinenvironget_etcd_binary_pathisfiletempfilemkdtemp_base_data_dir	_etcd_cmd
_etcd_proc)selfr   rootdefault_etcd_binr   r   r   __init__j   s   
zEtcdServer.__init__returnc                 C      | j S )zI
        Returns:
            the port the server is running on.
        )r.   r>   r   r   r   get_port|      zEtcdServer.get_portc                 C   rC   )zI
        Returns:
            the host the server is running on.
        )r/   rD   r   r   r   get_host   rF   zEtcdServer.get_hostc                 C   s   | j  d| j S )zK
        Returns:
            the etcd server endpoint (host:port)
        :)r/   r.   rD   r   r   r   get_endpoint   s   zEtcdServer.get_endpoint<      timeoutnum_retriesc              
   C   s   d}	 zt j| jt|}t j|dd | ||W S  tyN } z'|d7 }t| j	 t
dt| d ||krDtj| jdd  W Y d}~nd}~ww q)	a  
        Starts the server, and waits for it to be ready. When this function
        returns the sever is ready to take requests.

        Args:
            timeout: time (in seconds) to wait for the server to be ready
                before giving up.
            num_retries: number of retries to start the server. Each retry
                will wait for max ``timeout`` before considering it as failed.

        Raises:
            TimeoutError: if the server is not ready within the specified timeout
        r   T)exist_ok   z(Failed to start etcd server, got error: z
, retryingr   N)r0   r1   r4   r;   strmakedirs_start	Exceptionr'   r=   r    warningr$   r%   atexitregister)r>   rL   rM   curr_retriesr   r   r   r   r   start   s&   
zEtcdServer.startc                 C   s   t  }t  }| d | _| d }td| jdd|dd| j d| j dd| j d| j d	d| j d| g
}t	d
| d |
  |
  tj|dd| _| | d S )NrO    z--enable-v2z
--data-dirz--listen-client-urlszhttp://rH   z--advertise-client-urlsz--listen-peer-urlszStarting etcd server: []T)	close_fds)r   getsocknamer.   shlexsplitr4   r7   r/   r    r!   r   r&   Popenr=   _wait_for_ready)r>   r   rL   sock	sock_peer	peer_portetcd_cmdr   r   r   rR      s0   zEtcdServer._startc                 C   s   t j| j| jdddS )zx
        Returns:
           An etcd client object that can be used to make requests to
           this server.
        /v2
   r   r   version_prefixread_timeout)r+   Clientr/   r.   rD   r   r   r   
get_client   s   zEtcdServer.get_clientc                 C   s   t j| j | jddd}t | }t |k rI| j d ur(td| jj zt	
d|j  W d S  tyB   td Y nw t |k std)Nre      rg   z*Etcd server process exited with the code: zetcd server ready. version: rO   z.Timed out waiting for etcd server to be ready!)r+   rj   r/   r.   timer=   r   r   
returncoder    r!   versionrS   sleepTimeoutError)r>   rL   clientmax_timer   r   r   r`      s"   zEtcdServer._wait_for_readyc                 C   s   t d t| j| j dS )zY
        Stops the server and cleans up auto generated resources (e.g. data dir)
        zEtcdServer stop method calledN)r    r!   r'   r=   r;   rD   r   r   r   stop   s   
zEtcdServer.stopN)rJ   rK   )rJ   )rB   N)__name__
__module____qualname____doc__r   rP   rA   intrE   rG   rI   rX   rR   r+   rj   rk   r`   rt   r   r   r   r   r(   J   s    
r(   ru   )rU   r0   r]   r$   r	   r&   r9   rm   typingr   r+   torchelastic.utils.loggingr   r    r   rP   r'   r(   r   r   r   r   <module>   s   &