o
    wO iXÁ  ã                   @   sl  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlmZm	Z	 d dl
mZ d dlZd dlmZ d dlmZmZmZmZmZ ddlmZ e d¡Ze ej¡Ze e¡ e e¡Zd	e_ e !ej"¡ e #e¡ G d
d„ de$ƒZ%G dd„ de$ƒZ&dZ'dZ(dZ)dZ*dZ+dd„ Z,G dd„ deƒZ-G dd„ de.ƒZ/G dd„ deƒZ0dedej1fdd„Z2dedefdd„Z3dS )é    N)Ú	b64decodeÚ	b64encode)ÚOptional)ÚStore)ÚRendezvousClosedExceptionÚRendezvousHandlerÚRendezvousNonRetryableErrorÚRendezvousParametersÚRendezvousTimeoutExceptioné   )Ú_parse_hostname_and_portz%%(levelname)s %(asctime)s %(message)sFc                   @   ó   e Zd ZdS )ÚEtcdRendezvousRetryableFailureN©Ú__name__Ú
__module__Ú__qualname__© r   r   ú[/home/ubuntu/.local/lib/python3.10/site-packages/torchelastic/rendezvous/etcd_rendezvous.pyr   /   ó    r   c                   @   r   )ÚEtcdRendezvousRetryImmediatelyNr   r   r   r   r   r   5   r   r   é   é
   i   c                   C   s   t  t dd¡¡ d S )Nr   çš™™™™™¹?)ÚtimeÚsleepÚrandomÚuniformr   r   r   r   Ú	cas_delayK   s   r   c                   @   sj   e Zd ZdZdd„ Zdd„ Zdefdd„Zd	d
„ Zdd„ Z	dd„ Z
dd„ Zdefdd„Zdefdd„ZdS )ÚEtcdRendezvousHandleruå  
    Implements a :py:class:`torchelastic.rendezvous.RendezvousHandler`
    interface backed by
    :py:class:`torchelastic.rendezvous.etcd_rendezvous.EtcdRendezvous`.

    Torchelastic uses a URL to configure the type of rendezvous to use and
    to pass implementation specific configurations to the rendezvous module.
    The basic etcd rendezvous configuration URL looks like the following
    ::

     etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers> # noqa W605

     -- example --

     etcd://localhost:2379/1234?min_workers=1&max_workers=3

    The URL above is interpreted as follows:

    1. Use the rendezvous handler that is registered with the ``etcd``
       scheme
    2. The ``etcd`` endpoint to use is ``localhost:2379``
    3. ``job_id == 1234`` is used as the prefix in etcd (this allows one to
       share a common etcd server for multiple jobs so long as the
       ``job_ids`` are guaranteed to be unique). Note that the job id can be
       any string (e.g.Â does not need to be a number) as long as it is
       unique.
    4. ``min_workers=1`` and ``max_workers=3`` specifies a range for
       membership size - torchelastic starts running the job as long as the
       cluster size is greater than or equal to ``min_workers`` and admits
       up to ``max_workers`` into the cluster.

    Below are a full list of the parameters that can be passed to etcd
    rendezvous:

    +--------------------------------------------+--------------------------+
    | Parameter                                  | Description              |
    +============================================+==========================+
    | min_workers                                | minimum number of        |
    |                                            | workers for the          |
    |                                            | rendezvous to be valid   |
    +--------------------------------------------+--------------------------+
    | max_workers                                | maximum number of        |
    |                                            | workers to admit         |
    +--------------------------------------------+--------------------------+
    | timeout                                    | total timeout within     |
    |                                            | which next_rendezvous is |
    |                                            | expected to succeed      |
    |                                            | (default 600s)           |
    +--------------------------------------------+--------------------------+
    | last_call_timeout                          | additional wait amount   |
    |                                            | (â€œlast callâ€) after min  |
    |                                            | number of workers has    |
    |                                            | been reached (defaults   |
    |                                            | to 30s)                  |
    +--------------------------------------------+--------------------------+
    | etcd_prefix                                | path prefix (from etcd   |
    |                                            | root), inside which all  |
    |                                            | etcd nodes will be       |
    |                                            | created (defaults to     |
    |                                            | ``/torchelastic/p2p``)   |
    +--------------------------------------------+--------------------------+
    c                 C   s
   || _ d S ©N©Ú
_rdzv_impl)ÚselfÚ	rdzv_implr   r   r   Ú__init__   s   
zEtcdRendezvousHandler.__init__c                 C   s   | ` d S r    r!   ©r#   r   r   r   Ú__del__’   s   zEtcdRendezvousHandler.__del__Úreturnc                 C   s   dS )NÚetcdr   r&   r   r   r   Úget_backend–   s   z!EtcdRendezvousHandler.get_backendc                 C   s0   | j  ¡ \}}}t d¡ | j  |¡}|||fS )Nz4Creating EtcdStore as the c10d::Store implementation)r"   Úrendezvous_barrierÚlogÚinfoÚsetup_kv_store)r#   Úrdzv_versionÚrankÚ
world_sizeÚstorer   r   r   Únext_rendezvous™   s   

z%EtcdRendezvousHandler.next_rendezvousc                 C   s4   z| j  ¡ \}}|d dkW S  tjy   Y dS w )NÚstatusÚclosedF©r"   Úget_rdzv_stater)   ÚEtcdKeyNotFound©r#   Ú_Ústater   r   r   Ú	is_closed¡   s   þzEtcdRendezvousHandler.is_closedc                 C   s   | j  ¡  d S r    )r"   Ú
set_closedr&   r   r   r   r=   ©   s   z EtcdRendezvousHandler.set_closedc                 C   sB   z| j  ¡ \}}|d dkr|d W S W dS  tjy    Y dS w )Nr4   ÚfinalÚnum_workers_waitingr   r6   r9   r   r   r   Únum_nodes_waiting¬   s   
ÿþþz'EtcdRendezvousHandler.num_nodes_waitingc                 C   s   | j jS r    )r"   Ú_run_idr&   r   r   r   Ú
get_run_idµ   s   z EtcdRendezvousHandler.get_run_idc              
   C   sJ   z|   ¡  W dS  ty$ } zt dt|ƒ› ¡ W Y d }~dS d }~ww )NTz!Shutdown failed. Error occurred: F)r=   ÚBaseExceptionr,   ÚwarningÚstr©r#   Úer   r   r   Úshutdown¸   s   €þzEtcdRendezvousHandler.shutdownN)r   r   r   Ú__doc__r%   r'   rE   r*   r3   r<   r=   r@   rB   ÚboolrH   r   r   r   r   r   O   s    ?	r   c                   @   sÖ   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ Zd
d„ Zdd„ Z	dd„ Z
dd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zdd„ Zd d!„ Zd"d#„ Zd3d%d&„Zd'd(„ Zd3d)d*„Zd+d,„ Zd-d.„ Zd3d/d0„Zd1d2„ Zd$S )4ÚEtcdRendezvousze
    A rendezvous implementation that uses `etcd <https://etcd.io/>`__ as
    the backend store.
    c                 C   sì   || _ t dt| j jƒ ¡ || _|| _|| _|| _|| _	|| _
d | _d | _| j d¡s3|  jd7  _| jdkr>|  | j¡ | j|  d¡td | j|  d¡td| _|  |  d¡¡ z| j j|  d¡ddd	 W d S  tjyu   Y d S w )
NzEtcd machines: ú/Ú ©Úttlú/rdzvú/rdzv/version_counterÚ0F©ÚkeyÚvalueÚ	prevExist)Úclientr,   r-   rE   ÚmachinesÚ_prefixrA   Ú_num_min_workersÚ_num_max_workersÚ_timeoutÚ_last_call_timeoutÚ_lease_run_id_stopÚ_lease_this_rank_stopÚendswithÚcreate_path_if_not_existsÚget_pathÚCONST_RUNID_SUBROOT_TTLÚsetup_lease_renewalÚwriter)   ÚEtcdAlreadyExist)r#   rW   ÚprefixÚrun_idÚnum_min_workersÚnum_max_workersÚtimeoutÚlast_call_timeoutr   r   r   r%   Í   s4   


ÿÿÿzEtcdRendezvous.__init__c                 C   s0   | j d ur
| j  ¡  | jd ur| j ¡  d S d S r    )r^   Úsetr_   r&   r   r   r   r'   ý   s
   


ÿzEtcdRendezvous.__del__c              
   C   s  t   ¡ | j | _	 t   ¡ | jkrtƒ ‚t d¡ z| jdur#| j ¡  |  ¡ W S  t	y0   Y nQ t
y=   t  d¡ Y nD tyI   t d¡ ‚  tyZ   t d| j› d¡ ‚  tya   ‚  ty€ } zt dt|ƒ ¡ t  d¡ W Y d}~nd}~ww q	)	aú  
        Main entry point for next rendezvous.
        This method is blocking until rendezvous succeeds or a timeout occurs.

        Returns:
             ``(rdzv_version, rank, world_size)``

        Raises:
            RendezvousTimeoutException - timeout waiting for rendezvous
            RendezvousNonRetryableError - other persistent errors that
             render the rendezvous non-retryable
            RendezvousClosedException - rendezvous is or was closed while
             waiting
        Tz"Attempting to join next rendezvousNr   z3Rendezvous timeout occured in EtcdRendezvousHandlerzRendezvous for run_id=z was observed to be closedz/Rendezvous attempt failed, will retry. Reason: )r   r\   Ú_rendezvous_deadliner
   r,   r-   r_   rm   Ú
init_phaser   r   r   r   rA   r   Ú	ExceptionrE   rF   r   r   r   r+     s:   




ÿ€úÞz!EtcdRendezvous.rendezvous_barrierc                 C   sÂ   z|   ¡ }t |j¡}t dt|ƒ ¡ W n tjy.   |  	¡ \}}t dt|ƒ ¡ Y nw |d dkr8t
ƒ ‚|d dkrE|  |d ¡S |d dkrU|  |d ¡ tƒ ‚| j|jd d	 tƒ ‚)
a˜  
        Initially, the rendezvous state is expected to be one of:

        1. empty (non-existent) - in this case we try to create a new one.
        2. joinable - we try to join it.
        3. final - we announce ourselves as waiting, and go into monitoring mode

        Any other state is considered transitional, and will be retried after
        a short delay.

        Returns:
            ``(rdzv_version, rank, world_size)``

        Raises:
            RendezvousClosedException - current rendezvous was/is closed
            EtcdRendezvousRetryableFailure - observed some intermediate
             state, which is best handled by retrying later
        zNew rendezvous state created: z$Observed existing rendezvous state: r4   r5   ÚjoinableÚversionr>   r   ©Ú
etcd_index)Útry_create_rendezvousÚjsonÚloadsrU   r,   r-   rE   r)   rf   r7   r   Ú
join_phaseÚhandle_existing_rendezvousr   Útry_wait_for_state_changert   r   ©r#   Úactive_versionr;   r   r   r   ro   ?  s"   üzEtcdRendezvous.init_phasec                 C   sÆ   |   |¡\}}t |j¡}t d |d ||¡¡ || jd krC|d dkrCt d |¡¡ t ¡ | j	 }|  
||¡ t d |¡¡ t d¡ |  |¡}t |j¡}|d |ks]J d	ƒ‚|  ||¡S )
z¥
        We observed a rendezvous state in 'joinable' state, and attempt to join this
        particular version, and then wait for all other peers to join.
        z7Joined rendezvous version {} as rank {}. Full state: {}rr   r   r4   rq   z*Rank {} is responsible for join last call.z Rank {} finished join last call.zWaiting for remaining peers.z/Logic error: failed to observe version mismatch)Újoin_rendezvousrv   rw   rU   r,   r-   ÚformatrZ   r   r]   Úhandle_join_last_callÚwait_for_peersÚconfirm_phase)r#   Úexpected_versionr|   Ú	this_rankr;   Úlast_call_deadliner   r   r   rx   i  s&   
ÿÿ

ÿzEtcdRendezvous.join_phasec                 C   sb   t  d¡ |  ||¡ t  d¡ |  |¡}t |j¡}t  d |d |¡¡ |d |t|d ƒfS )a7  
        Once the rendezvous state trainsitions from 'joinable' to 'frozen',
        we have every participant confirm their membership and setup per-member
        keep-alive TTL keys, and then wait for all other participants to confirm,
        which would then successfully conclude this rendezvous.
        z)All peers arrived. Confirming membership.z)Waiting for confirmations from all peers.z2Rendezvous version {} is complete. Final state: {}rr   Úparticipants)	r,   r-   Úconfirm_membershipÚwait_for_finalrv   rw   rU   r~   Úlen)r#   r‚   rƒ   r|   r;   r   r   r   r   ’  s   


ÿÿzEtcdRendezvous.confirm_phasec                 C   s4   |   |¡}t d |j¡¡ |  |¡ t d¡ dS )zÑ
        Handle the case when there's an existing (state 'final) rendezvous already
        in place, and we have to announce ourselves waiting, and wait until
        the next rendezvous opportunity.
        z5Added self to waiting list. Rendezvous full state: {}zBPreviously existing rendezvous state changed. Will re-try joining.N)Úannounce_self_waitingr,   r-   r~   rU   Úwait_for_rendezvous_to_free)r#   r‚   Úactive_stater   r   r   ry   ª  s   
ÿÿ
z)EtcdRendezvous.handle_existing_rendezvousc              	   C   sÆ   | j j|  d¡t ddi¡dtd}z| j  |  d¡¡}tt|j	ƒd ƒ|_	| j  
|¡ W n tjtjfy<   tdƒ‚w | j j|  d	 |j	¡¡d
ddd | j j|  d¡t d|j	g dœ¡|j	dS )zÔ
        Create new rendezvous state or raise an exception that indicates
        an unexpected state (e.g. already exists)

        Raises:
             RendezvousNonRetryableError - on unexpected state
        ú/rdzv/active_versionr4   ÚsetupF)rT   rU   rV   rO   rQ   r   z?Unexpected state of EtcdRendezvousHandler, worker needs to die.ú
/rdzv/v_{}NT)rT   rU   ÚdirrV   rq   )r4   rr   r…   ©rT   rU   Ú
prev_value)rW   re   rb   rv   ÚdumpsÚCONST_ETCD_SETUP_TTLÚgetrE   ÚintrU   Úupdater)   r8   ÚEtcdCompareFailedr   r~   Útest_and_set)r#   r|   Úversion_counterr   r   r   ru   ¿  s>   üÿÿ
ü
ýÿ÷z$EtcdRendezvous.try_create_rendezvousc                 C   s  	 t ƒ  |  ¡ \}}|d dkrtdƒ‚|d |krtdƒ‚t|d ƒ| jk s+J dƒ‚t|d ƒ}|d  |¡ t|d ƒ| jkrLd	|d< g |d
< t}nt|d ƒ| jkrXt	}nd}z| j
j|  d¡t |¡|j|d}||fW S  tjy   t d¡ Y nw q)ú3
        Helper method for the join phase.
        Tr4   rq   zNRendezvous state became non-joinable before we could join. Must join next one.rr   ú6Rendezvous version changed. Must try join the new one.r…   z>Logic error: joinable rendezvous should always have space leftÚfrozenÚkeep_alivesNrŒ   ©rT   rU   r‘   rO   z*Join rendezvous CAS unsuccessful, retrying)r   r7   r   r   rˆ   r[   ÚappendÚCONST_ETCD_FROZEN_TTLrZ   Ú!CONST_ETCD_JOINABLE_EPHEMERAL_TTLrW   r˜   rb   rv   r’   rU   r)   r—   r,   r-   )r#   r‚   r|   r;   rƒ   Úset_ttlr   r   r   r}   ö  sF   ÿÿÿü
ÿÔzEtcdRendezvous.join_rendezvousc                 C   ód   |   ¡ \}}	 |d dkr|d |kr|S |d dkr-|d |kr-| j|jd d\}}ntdƒ‚q)	rš   Tr4   rœ   rr   rq   r   rs   ú>Rendezvous state transition no longer possible. Must re-enter.©r7   rz   rt   r   ©r#   r‚   r|   r;   r   r   r   r€   +  ó   ÿÿózEtcdRendezvous.wait_for_peersc                 C   s  	 t ƒ  |  ¡ \}}|d dkrtdƒ‚|d |krtdƒ‚|  d ||¡¡}| jj|dtd	 |d
  |¡ t	|d
 ƒt	|d ƒkrNd|d< d|d< d}nd}z | jj
|  d¡t |¡|j|rbdntd}| j|td| _|W S  tjy   t d¡ Y nw q)ú5
        Helper method for the confirm phase
        Tr4   rœ   zDRendezvous no longer frozen, before we confirmed. Must join next onerr   r›   z/rdzv/v_{}/rank_{}N)rU   rO   r   r…   r>   r   r?   FrŒ   rž   rN   z-Confirm membership CAS unsuccessful, retrying)r   r7   r   rb   r~   rW   rm   ÚCONST_WORKER_KEEPALIVE_TTLrŸ   rˆ   r˜   rv   r’   rU   r    rd   r_   r)   r—   r,   r-   )r#   r‚   rƒ   r|   r;   Úthis_lease_keyÚfinalizer   r   r   r†   A  sH   ÿÿ
ÿ
üÿÿÖz!EtcdRendezvous.confirm_membershipc                 C   r£   )	r¨   Tr4   r>   rr   rœ   r   rs   r¤   r¥   r¦   r   r   r   r‡   t  r§   zEtcdRendezvous.wait_for_finalc                 C   sŠ   	 t ƒ  |  ¡ \}}|d dks|d |krtƒ ‚|d  d7  < z| jj|  d¡t |¡|jd}|W S  t	j
yC   t d	¡ Y nw q)
z™
        Announce this worker is waiting (via num_workers_waiting counter) to join next
        rendezvous, but only if state and version match.
        Tr4   r>   rr   r?   r   rŒ   r   z3Announce self as waiting CAS unsuccessful, retrying)r   r7   r   rW   r˜   rb   rv   r’   rU   r)   r—   r,   r-   r¦   r   r   r   r‰   Š  s"   ýÿîz$EtcdRendezvous.announce_self_waitingc              	   C   s<  |   ¡ \}}	 |d dks|d |krdS | j |  dj|d¡¡}dd	„ |jD ƒ}|d
 D ]2}||vr`t d |¡¡ t d |¡¡ t d¡ | jj|  d¡|j	d t d |¡¡  dS q.zt
| jt ¡  dƒd }| jj|  d¡|jd d|d W n tjtjfyŒ   Y nw t ¡ | jkr—tƒ ‚|   ¡ \}}q)a³  
        When there's an existing valid rendezvous in state 'final', we have to
        wait until the next opportunity to join.

        Such opportunity may come from:

        1. rendezvous state changed by someone else, in which case we unblock and retry.
        2. rendezvous becomes invalid because at least one member failed to renew their
           leased keep_alive node. We detect this, and destroy the rendezvous.
        Tr4   r>   rr   Nz/rdzv/v_{version})rr   c                 S   s   g | ]}|j ‘qS r   ©rT   )Ú.0Úchr   r   r   Ú
<listcomp>»  s    z>EtcdRendezvous.wait_for_rendezvous_to_free.<locals>.<listcomp>r   z!Keep-alive key {} is not renewed.z$Rendevous version {} is incomplete. zAttempting to destroy it.rŒ   )rT   Ú	prevValuez-Destroyed rendezvous version {} successfully.ç        ç      ð?rP   r   )rT   ÚindexÚ	recursiverk   )r7   rW   r”   rb   r~   Úchildrenr,   r-   ÚdeleterU   Úmaxrn   r   Úwatchrt   r)   ÚEtcdEventIndexClearedÚEtcdWatchTimedOutr
   )r#   r‚   r|   r;   Úalive_membersÚkeep_alive_keysrT   Úoverall_timeoutr   r   r   rŠ   ¥  sT   ÿÿ
þÿÿèÿ
üÿÈz*EtcdRendezvous.wait_for_rendezvous_to_freec                 C   sH  |   ¡ \}}	 |d dkr|d |krdS |d dks!|d |kr%tdƒ‚t ¡ |kr`d|d< g |d< z| jj|  d	¡t |¡|jt	d
}W dS  t
jy_   t d¡ tƒ  |   ¡ \}}Y qw z*| jj|  d	¡|j|jtd
}ttd |t ¡  d ƒ}| j|jd |d\}}W n t
jy¢   t d¡ tƒ  |   ¡ \}}Y nw q)a  
        After we reach min number of workers, one particular worker takes on the
        responsibility of waiting an additional timeout before closing the join window.
        If the worker responsible for this fails, the rendezvous will be destroyed due
        to expiring TTL, and the other participants will re-rendezvous.

        Here we expect to see state <joinable, expected_version>
        Exit gracefully if either:

        1. state becomes <frozen, expected_version>
        2. timeout happens (reaching deadline), in which case
           we try the tranisiton to <frozen, expected_version>

        Exit with exception otherwise.
        Tr4   rœ   rr   Nrq   r¤   r   rŒ   rž   z6Join last-call transition CAS unsuccessful. Will retryé   r²   r   )rt   rk   z7Join last-call TTL refresh CAS unsuccessful, will retry)r7   r   r   rW   r˜   rb   rv   r’   rU   r    r)   r—   r,   r-   r   r¡   Úminrz   rt   )r#   r‚   Údeadliner|   r;   rk   r   r   r   r   ë  sZ   ÿü
ü
üþ
ÿ
ýÎz$EtcdRendezvous.handle_join_last_callc                 C   st   	 |   ¡ \}}|d dkrdS d|d< z| jj|  d¡t |¡|jd W dS  tjy8   t	 
d¡ tƒ  Y nw q)zè
        Mark rendezvous 'closed' for current run_id, which is used to signal other
        participants to not attempt to perform (re-)rendezvous. This is useful
        when one of the workers decides the job is complete.
        Tr4   r5   NrŒ   r   z%Set closed CAS unsuccessful, retrying)r7   rW   r˜   rb   rv   r’   rU   r)   r—   r,   r-   r   r{   r   r   r   r=   4  s"   ý

þðzEtcdRendezvous.set_closedc                 C   s$   | j j|  d¡d}|t |j¡fS )NrŒ   r¬   )rW   r”   rb   rv   rw   rU   )r#   r|   r   r   r   r7   N  s   zEtcdRendezvous.get_rdzv_stateNc              	   C   s‚   t | jt ¡  dƒd }|d u r|nt||ƒ}z| jj|  d¡||d W n tjtj	fy2   Y nw t ¡ | jkr=t
ƒ ‚|  ¡ S )Nr±   r²   rŒ   )r³   rk   )r·   rn   r   r¿   rW   r¸   rb   r)   r¹   rº   r
   r7   )r#   rt   rk   r½   r   r   r   rz   R  s   
ÿÿz(EtcdRendezvous.try_wait_for_state_changec                 C   s&   |  d¡s	d| }dj| j| j|dS )NrL   z{prefix}run_{run_id}{path})rg   rh   Úpath)Ú
startswithr~   rY   rA   )r#   rÁ   r   r   r   rb   d  s
   

ÿzEtcdRendezvous.get_pathc                 C   s4   z| j j|d dd|d W d S  tjy   Y d S w )NTF)rT   rU   r   rV   rO   )rW   re   r)   rf   )r#   Ú	full_pathrO   r   r   r   ra   l  s   
ÿÿz(EtcdRendezvous.create_path_if_not_existsc                 C   s:   dd„ }t  ¡ }t j|| j|||fd}d|_| ¡  |S )Nc                 S   sT   	 z	| j ||d W n tjy   Y d S  ty   Y d S w |j|d dr)d S q)NTrN   r¾   ©rk   )Úrefreshr)   r8   ÚConnectionRefusedErrorÚwait)rW   rÁ   rO   Ú
stop_eventr   r   r   Úlease_workerz  s   ýõz8EtcdRendezvous.setup_lease_renewal.<locals>.lease_worker)ÚtargetÚargsT)Ú	threadingÚEventÚThreadrW   ÚdaemonÚstart)r#   rÃ   rO   rÉ   Úlease_stop_eventÚlease_threadr   r   r   rd   t  s   ÿz"EtcdRendezvous.setup_lease_renewalc                 C   s¶   |   d |¡¡}z| jj|t ||i¡dd}W d S  tjy#   Y nw 	 | j |¡}t 	|j
¡}|||< z| jj|t |¡|j
d}W d S  tjyY   t d¡ t d¡ Y nw q%)Nú/rdzv/v_{}/extra_dataFrS   Tr   z+Store extra_data CAS unsuccessful, retryingr   )rb   r~   rW   re   rv   r’   r)   rf   r”   rw   rU   r˜   r—   r,   r-   r   r   )r#   r/   rT   rU   ÚnodeÚ
extra_dataÚnew_extra_data_valuer   r   r   Ústore_extra_data’  s2   ÿÿý
þòzEtcdRendezvous.store_extra_datac              	      s¶   |   d |¡¡‰ |   d |¡¡}	 | j |¡}‡ fdd„|jD ƒ}t|ƒdks)J ‚t|ƒdkr?t |d j¡}||v r?|| S z| jj	ˆ |j
d d W n tjtjfyY   Y nw q)	NrÓ   rŽ   Tc                    s   g | ]	}|j ˆ kr|‘qS r   r¬   )r­   Ún©rÔ   r   r   r¯   ¼  s    z2EtcdRendezvous.load_extra_data.<locals>.<listcomp>r   r   )r³   )rb   r~   rW   r”   rµ   rˆ   rv   rw   rU   r¸   rt   r)   r¹   rº   )r#   r/   rT   rk   Únode_dirÚrootrÕ   Úextra_data_dictr   rÙ   r   Úload_extra_data°  s    ÿîzEtcdRendezvous.load_extra_datac                 C   s*   |   d|› d¡}|  |¡ t| j|dS )Nz/rdzv/v_z/kv)Úetcd_clientÚetcd_store_prefix)rb   ra   Ú	EtcdStorerW   )r#   r/   Ú
store_pathr   r   r   r.   Ì  s   
zEtcdRendezvous.setup_kv_storer    )r   r   r   rI   r%   r'   r+   ro   rx   r   ry   ru   r}   r€   r†   r‡   r‰   rŠ   r   r=   r7   rz   rb   ra   rd   r×   rÝ   r.   r   r   r   r   rK   Ç   s4    0:*)753FI


rK   c                       s²   e Zd ZdZ	ddeej f‡ fdd„Zdd„ Zde	fd	d
„Z
dedefdd„Zddeej fdd„Zdefdd„Zdejfdd„Zdefdd„Zde	fdd„Zddd„Z‡  ZS )rà   z—
    Implements a c10 Store interface by piggybacking on the rendezvous etcd
    instance. This is the store object returned by ``EtcdRendezvous``
    Nrk   c                    sR   t ƒ  ¡  || _|| _|d ur|ntjdd| _| j d¡s'|  jd7  _d S d S )Ni,  )ÚsecondsrL   )Úsuperr%   rW   rg   ÚdatetimeÚ	timedeltark   r`   )r#   rÞ   rß   rk   ©Ú	__class__r   r   r%   Ù  s   
ÿÿzEtcdStore.__init__c                 C   s&   | j j| j|  |¡ |  |¡d dS )z‚
        Write a key/value pair into ``EtcdStore``.
        Both key and value may be either Python ``str`` or ``bytes``.
        ©rT   rU   N)rW   rm   rg   Ú_encode)r#   rT   rU   r   r   r   rm   ë  s   &zEtcdStore.setr(   c                 C   sB   | j |  |¡ }|  |g¡}|du rtd|› dƒ‚|  || ¡S )aV  
        Get a value by key, possibly doing a blocking wait.

        If key is not immediately present, will do a blocking wait
        for at most ``timeout`` duration or until the key is published.


        Returns:
            value ``(bytes)``

        Raises:
            LookupError - If key still not published after timeout
        NzKey z not found in EtcdStore)rg   ré   Ú_try_wait_getÚLookupErrorÚ_decode)r#   rT   Úb64_keyÚkvsr   r   r   r”   ò  s
   zEtcdStore.getÚnumc                 C   sÎ   |   |¡}z| jj| j| |   t|ƒ¡dd}t|  |j¡ƒW S  tj	y)   Y nw 	 | jj
| j| d}|   tt|  |j¡ƒ| ƒ¡}z| jj|j||jd}t|  |j¡ƒW S  tjye   tƒ  Y nw q+)a  
        Atomically increment a value by an integer amount. The integer is
        represented as a string using base 10. If key is not present,
        a default value of ``0`` will be assumed.

        Returns:
             the new (incremented) value


        FrS   Tr¬   r   )ré   rW   re   rg   rE   r•   rì   rU   r)   rf   r”   r˜   rT   r—   r   )r#   rT   rï   rí   rÔ   Ú	new_valuer   r   r   Úadd  s.   
ýÿ
ÿ
ÿözEtcdStore.addÚoverride_timeoutc                    s2   ‡ fdd„|D ƒ}ˆ   ||¡}|du rtdƒ‚dS )z‹
        Waits until all of the keys are published, or until timeout.

        Raises:
            LookupError - if timeout occurs
        c                    ó   g | ]
}ˆ j ˆ  |¡ ‘qS r   ©rg   ré   ©r­   rT   r&   r   r   r¯   4  ó    z"EtcdStore.wait.<locals>.<listcomp>Nz+Timeout while waiting for keys in EtcdStore)rê   rë   )r#   Úkeysrò   Úb64_keysrî   r   r&   r   rÇ   -  s
   ÿzEtcdStore.waitc                    s0   ‡ fdd„|D ƒ}ˆ j |tjddd}|duS )zU
        Check if all of the keys are immediately present (without waiting).
        c                    ró   r   rô   rõ   r&   r   r   r¯   >  rö   z#EtcdStore.check.<locals>.<listcomp>r   )Úmicroseconds)rò   N)rê   rä   rå   )r#   r÷   rø   rî   r   r&   r   Úcheck:  s   
þzEtcdStore.checkc                 C   s
   || _ dS )zD
        Change the timeout used for all future operations.
        NrÄ   )r#   rk   r   r   r   Úset_timeoutE  s   
zEtcdStore.set_timeoutc                 C   s<   t |ƒtkrt|ƒ ¡ S t |ƒtkrt| ¡ ƒ ¡ S tdƒ‚©Nz"Value must be of type str or bytes)ÚtypeÚbytesr   ÚdecoderE   ÚencodeÚ
ValueError©r#   rU   r   r   r   ré   P  s
   zEtcdStore._encodec                 C   s4   t |ƒtkr
t|ƒS t |ƒtkrt| ¡ ƒS tdƒ‚rü   )rý   rþ   r   rE   r   r  r  r   r   r   rì   [  s
   zEtcdStore._decodec                    sÐ   |d u r| j n|}t ¡ | ¡  }	 | jj| jd}‡ fdd„|jD ƒ}t|ƒtˆ ƒkr.|S |t ¡  }|dkr:d S z| jj| jd||j	d d W n t
jy]   t ¡ |kr[Y d S Y q t
jyf   Y qw q)NTr¬   c                    s    i | ]}|j ˆ v r|j |j“qS r   rè   )r­   rÔ   ©rø   r   r   Ú
<dictcomp>q  s
    
þz+EtcdStore._try_wait_get.<locals>.<dictcomp>r   r   )rT   r´   rk   r³   )rk   r   Útotal_secondsrW   r”   rg   rµ   rˆ   r¸   rt   r)   rº   r¹   )r#   rø   rò   rk   rÀ   Ú	all_nodesÚ	req_nodesÚwatch_timeoutr   r  r   rê   j  s6   
þ
üÿãzEtcdStore._try_wait_getr    )r   r   r   rI   r   rä   rå   r%   rm   rþ   r”   r•   rñ   rÇ   rJ   rú   rû   rE   ré   rì   rê   Ú__classcell__r   r   ræ   r   rà   Ó  s    	üü%rà   Úparamsr(   c                 C   s”   t | jdƒ\}}| j d¡}|du rd}n|dkr!|dkr!tdƒ‚| j d¡}|dur9| j d¡}|dur9||f}| j d	¡}tj|||||d
dS )zT
    Creates a new ``etcd.Client`` from the specified ``RendezvousParameters``.
    iK	  ÚprotocolNÚhttpÚhttpsz(The etcd protocol must be HTTP or HTTPS.ÚcertrT   ÚcacertT)r  r  Úca_certÚallow_reconnect)r   ÚendpointÚconfigr”   r  r)   ÚClient)r
  ÚhostnameÚportr  Ússl_certÚcert_keyr  r   r   r   Ú_create_etcd_client  s(   úr  c              	   C   s>   t | ƒ}|  dd¡}t||| j| j| j| j| jd}t|dS )a6  
    Usage:

    ::

    rdzv_params = RendezvousParameters(
                        backend="etcd",
                        endpoint="192.168.0.42:2379",
                        run_id="123",
                        min_nodes=4,
                        max_nodes=8,
                        timeout=300,
                        last_call_timeout=30,
                        etcd_prefix="custom_prefix",
                        protocol="https",
                        cacert="/etc/kubernetes/certs/ca.crt",
                        cert="/etc/kubernetes/certs/client.crt",
                        key="/etc/kubernetes/certs/client.key")
    # -- or --
    rdzv_params = RendezvousParameters(
                        backend="etcd",
                        endpoint="192.168.0.42:2379",
                        run_id="123",
                        min_nodes=4,
                        max_nodes=8)

    etcd_rdzv_handler = create_etcd_rendezvous_handler(rdzv_params)


    Where:
        run_id - unique id for this training job instance,
        min_nodes - min number of workers expected to join the rendezvous,
        max_nodes - max number of workers allowed to join the rendezvous,
                        defaults to min_workers is not specified.
        timeout - total timeout within which next_rendezvous is expected to
                      succeed; a RendezvousTimeoutException is raised otherwise;
                      Defaults is 600 (10 minutes).
        last_call_timeout - additional wait amount ("last call") after
                            min number of workers has been reached.
                            Defaults to 30 seconds.
        etcd_prefix - path prefix (from etcd root), inside which all
                      etcd nodes will be created.
                      Default is "/torchelastic/p2p".
        protocol - http (default) or https to access etcd.
        cacert - CA cert to access etcd, only makes sense with https.
        cert - client cert to access etcd, only makes sense with https.
        key - client key to access etcd, only makes sense with https.
    Úetcd_prefixz/torchelastic/p2p)rW   rg   rh   ri   rj   rk   rl   )r$   )	r  r”   rK   rh   Ú	min_nodesÚ	max_nodesrk   rl   r   )r
  rW   r  Úrdzvr   r   r   Úcreate_rdzv_handler´  s   1ù
	r  )4rä   rv   Úloggingr   ÚsysrÌ   r   Úbase64r   r   Útypingr   r)   Útorch.distributedr   Útorchelastic.rendezvousr   r   r   r	   r
   Úutilsr   Ú	FormatterÚ_log_fmtÚStreamHandlerÚstderrÚ_log_handlerÚsetFormatterÚ	getLoggerr   r,   Ú	propagateÚsetLevelÚINFOÚ
addHandlerrp   r   r   r“   r    r¡   r©   rc   r   r   ÚobjectrK   rà   r  r  r  r   r   r   r   Ú<module>   sP   



x       =%