o
    ci                     @   s   d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ e eZG dd deZG d	d
 d
ZG dd deZdS )    N)defaultdict)Path)Dict)Callback)Trialc                   @   s*   e Zd ZdZ				dddZdd	 Zd
S )FailureInjectorCallbackz3Adds random failure injection to the TrialExecutor.~/ray_bootstrap_config.yaml皙?r   Fc                 C   s2   || _ t|  | _|| _|| _t | _	d S N)
probabilityr   
expanduseras_posixconfig_pathdisabletime_between_checkstime	monotoniclast_fail_check)selfr   r   r   r    r   G/home/ubuntu/.local/lib/python3.10/site-packages/ray/tune/utils/mock.py__init__   s
   z FailureInjectorCallback.__init__c                 K   s   t j| js	d S t | j| j k rd S t | _dd l}ddl	m
} d}d}t | jk rn| jspt | jk }||k rrz|| jd|d d W d S  |jjyg   |d7 }tdt|t||  Y nw ||k s=d S d S d S d S )Nr   )	kill_node   T)yeshardoverride_cluster_name   z@Killing random node failed in attempt {}. Retrying {} more times)ospathexistsr   r   r   r   r   click ray.autoscaler._private.commandsr   randomr   r   
exceptionsClickExceptionlogger	exceptionformatstr)r   infor!   r   failuresmax_failuresshould_terminater   r   r   on_step_begin!   s@   
z%FailureInjectorCallback.on_step_beginN)r   r	   r   F)__name__
__module____qualname____doc__r   r.   r   r   r   r   r      s    
r   c                   @   sJ   e Zd ZdZdd Zdeeef fddZdefdd	Z	de
fd
dZdS )TrialStatusSnapshotzvA sequence of statuses of trials as they progress.

    If all trials keep previous status, no snapshot is taken.
    c                 C   s
   g | _ d S r
   	_snapshotr   r   r   r   r   K      
zTrialStatusSnapshot.__init__new_snapshotc                 C   s0   |sdS | j r|| j d kr| j | dS dS )z*May append a new snapshot to the sequence.N)r5   append)r   r8   r   r   r   r:   N   s
   zTrialStatusSnapshot.appendreturnc                 C   sB   d}| j D ]}d}|D ]}|| tjkr|d7 }qt||}q|S )zOutputs the max number of running trials at a given time.

        Usually used to assert certain number given resource restrictions.
        r   r   )r5   r   RUNNINGmax)r   resultsnapshotcounttrial_idr   r   r   max_running_trialsV   s   
z&TrialStatusSnapshot.max_running_trialsc                    s*   | j sdS | j d  t fdd D S )z"True if all trials are terminated.Fr9   c                 3   s    | ]
} | t jkV  qd S r
   )r   
TERMINATED).0rA   last_snapshotr   r   	<genexpr>j   s    
z@TrialStatusSnapshot.all_trials_are_terminated.<locals>.<genexpr>)r5   allr6   r   rE   r   all_trials_are_terminatede   s   
z-TrialStatusSnapshot.all_trials_are_terminatedN)r/   r0   r1   r2   r   r   r)   r:   intrB   boolrI   r   r   r   r   r3   E   s    r3   c                   @   s&   e Zd ZdZdefddZdd ZdS )TrialStatusSnapshotTakerzCollects a sequence of statuses of trials as they progress.

    If all trials keep previous status, no snapshot is taken.
    r?   c                 C   s
   || _ d S r
   r4   )r   r?   r   r   r   r   u   r7   z!TrialStatusSnapshotTaker.__init__c                 K   s.   t t}|D ]}|j||j< q| j| d S r
   )r   r)   statusrA   r5   r:   )r   	iterationtrialskwargsr8   trialr   r   r   on_step_endx   s   z$TrialStatusSnapshotTaker.on_step_endN)r/   r0   r1   r2   r3   r   rR   r   r   r   r   rL   o   s    rL   )loggingr   r#   r   collectionsr   pathlibr   typingr   ray.tune.callbackr   ray.tune.experimentr   	getLoggerr/   r&   r   r3   rL   r   r   r   r   <module>   s    
6*