o
    ci(                  	   @   sB  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d d
lmZ d dlm Z m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z( e)e*Z+dZ,dZ-e(G dd de$Z.e(	dde
e	ge	f dee/ ded fddZ0dS )    N)partial)Number)AnyCallableDictOptionalType)RunnerThreadStartTraceback)_ERROR_FETCH_TIMEOUT)_TrainingResult)	TrialInfo_TrainSessionget_sessioninit_sessionshutdown_session)RUN_CONTROLLER_AS_ACTOR_ENV_VAR)PlacementGroupFactory)DEFAULT_METRICRESULT_DUPLICATESHOULD_CHECKPOINT)	Trainable)_detect_config_single)DeveloperAPIz.null_markerz.temp_markerc                   @   s   e Zd ZdZdZdd Zdeeef fddZ	dd	 Z
d
d Zdd ZddefddZdefddZdd Zdd ZdddZdS )FunctionTrainablezwTrainable that runs a user function reporting results.

    This mode of execution does not support checkpoint/restore.funcc                    s\   t  fddt j j j jjd d  jjd jdd d d d d d d d d  _dt	j
t< d S )Nc                            jS N_trainable_funcconfig selfr!   Y/home/ubuntu/.local/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py<lambda>-       z)FunctionTrainable.setup.<locals>.<lambda>nameid	resourceslogdir	driver_ipdriver_node_idexperiment_nameT)training_func
trial_infostoragesynchronous_result_reporting
world_rank
local_rank	node_ranklocal_world_size
world_sizedataset_shard
checkpoint0)r   r   
trial_nametrial_idtrial_resources_storagetrial_driver_staging_pathexperiment_dir_name_last_training_resultosenvironr   r#   r    r!   r"   r$   setup+   s.   
	zFunctionTrainable.setupr    c                 C   s   t )z7Subclasses can override this to set the trainable func.)NotImplementedErrorrD   r!   r!   r$   r   O   s   z!FunctionTrainable._trainable_funcc                    sN    fdd}t | jdd _ j  z j  W d S  ty&   Y d S w )Nc               
      s.   z   jW S  ty }  zt| d } ~ ww r   )r   r    	Exceptionr
   )er"   r!   r$   
entrypointU   s   z,FunctionTrainable._start.<locals>.entrypointT)targeterror_queuedaemon)r	   _error_queue_runner_status_reporter_startstartRuntimeError)r#   rI   r!   r"   r$   rP   T   s   
zFunctionTrainable._startc                 C   sZ   t  }|js
|  | }|std|j}t|v rd|t< || _|j	dur+d|t< |S )a=  Implements train() for a Function API.

        If the RunnerThread finishes without reporting "done",
        Tune will automatically provide a magic keyword __duplicate__
        along with a result with "done=True". The TrialRunner will handle the
        result accordingly (see tune/tune_controller.py).
        zShould not have reached here. The TuneController should not have scheduled another `train` remote call.It should have scheduled a `stop` instead after the training function exits.FNT)
r   training_startedrQ   get_nextrR   metricsr   r   rA   r9   )r#   sessiontraining_resultrU   r!   r!   r$   steph   s   
zFunctionTrainable.stepc                 C   s   || S r   r!   )r#   fnr!   r!   r$   execute   s   zFunctionTrainable.execute checkpoint_dirc                 C   s   |rt d| jS )Nz4Checkpoint dir should not be used with function API.)
ValueErrorrA   )r#   r\   r!   r!   r$   save_checkpoint   s   z!FunctionTrainable.save_checkpointcheckpoint_resultc                 C   s   t  }|j|_d S r   )r   r9   loaded_checkpoint)r#   r_   rV   r!   r!   r$   load_checkpoint   s   z!FunctionTrainable.load_checkpointc                 C   s8   t  }z|jdd W |  t  d S |  t  w )Nr   timeout)r   finish_report_thread_runner_errorr   )r#   rV   r!   r!   r$   cleanup   s   
zFunctionTrainable.cleanupc                    sv   t  }ttjdd}|j|d |j rdS |j fddt	 j
 j j jjd d  jjd jd i  _d	S )
NTUNE_FUNCTION_THREAD_TIMEOUT_S   rb   Fc                      r   r   r   r!   r"   r!   r$   r%      r&   z0FunctionTrainable.reset_config.<locals>.<lambda>r'   )r/   r0   r1   T)r   intrB   rC   getrd   training_threadis_aliveresetr   r;   r<   r=   r>   trial_working_directoryr@   _last_result)r#   
new_configrV   thread_timeoutr!   r"   r$   reset_config   s(   

	zFunctionTrainable.reset_configFc                 C   s.   z| j j|td}t| tjy   Y d S w )N)blockrc   )rM   rj   r   r
   queueEmpty)r#   rs   rH   r!   r!   r$   re      s   z-FunctionTrainable._report_thread_runner_errorN)r[   )F)__name__
__module____qualname____doc___namerE   r   strr   r   rP   rX   rZ   r^   r   ra   rf   rr   re   r!   r!   r!   r$   r   #   s    $*	r   
train_funcr(   returnc                    sn   t f}tdrj| }tj}t}|s td|t	dd G  fdddg|R  }|S )N
__mixins__zUnknown argument found in the Trainable function. The function args must include a single 'config' positional parameter.
Found: {}
_resourcesc                       s^   e Zd Z pedrj ndZdd ZfddZedee	e
f dee ffd	d
ZdS )z#wrap_function.<locals>.ImplicitFuncrv   r   c                 S   s   | j S r   )rz   r"   r!   r!   r$   __repr__   s   z,wrap_function.<locals>.ImplicitFunc.__repr__c                    sX   t  |}dd }d }t r| D ]}|| qn| }|| t tdi |S )Nc                 S   sH   | sd S t | trt |  d S t | tr t t| i d S td)NzuInvalid return or yield value. Either return/yield a single number or a dictionary object in your trainable function.)
isinstancedictr   reportr   r   r]   )outputr!   r!   r$   handle_output   s   

zJwrap_function.<locals>.ImplicitFunc._trainable_func.<locals>.handle_outputT)r   inspectisgeneratorfunctionr   r   r   )r#   r    rY   r   r   )r|   r!   r$   r      s   



z3wrap_function.<locals>.ImplicitFunc._trainable_funcr    r}   c                    s   t  tst r |S  S r   )r   r   callable)clsr    )r*   r!   r$   default_resource_request  s   z<wrap_function.<locals>.ImplicitFunc.default_resource_requestN)rv   rw   rx   hasattrrz   r   r   classmethodr   r{   r   r   r   r   r!   r(   r*   r|   r!   r$   ImplicitFunc   s    
r   )
r   r   r~   r   getfullargspecargsr   r]   formatgetattr)r|   r(   inherit_from	func_argsuse_config_singler   r!   r   r$   wrap_function   s   

/r   r   )1r   loggingrB   rt   	functoolsr   numbersr   typingr   r   r   r   r   ray.air._internal.utilr	   r
   ray.air.constantsr   &ray.train._internal.checkpoint_managerr   ray.train._internal.sessionr   r   r   r   r    ray.train.v2._internal.constantsr   #ray.tune.execution.placement_groupsr   ray.tune.resultr   r   r   ray.tune.trainable.trainabler   ray.tune.utilsr   ray.util.annotationsr   	getLoggerrv   loggerNULL_MARKERTEMP_MARKERr   r{   r   r!   r!   r!   r$   <module>   s@    
 /