o
    $iK&                  	   @   s6  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
mZmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d d
lmZmZm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& e'e(Z)dZ*dZ+e&G dd de"Z,e&	dde
e	ge	f dee- ded fddZ.dS )    N)partial)Number)AnyCallableDictOptionalType)RunnerThreadStartTraceback)_ERROR_FETCH_TIMEOUT)_TrainingResult)	TrialInfo_TrainSessionget_sessioninit_sessionshutdown_session)PlacementGroupFactory)DEFAULT_METRICRESULT_DUPLICATESHOULD_CHECKPOINT)	Trainable)_detect_config_single)DeveloperAPIz.null_markerz.temp_markerc                   @   s   e Zd ZdZdZdd Zdeeef fddZ	dd	 Z
d
d Zdd ZddefddZdefddZdd Zdd ZdddZdS )FunctionTrainablezwTrainable that runs a user function reporting results.

    This mode of execution does not support checkpoint/restore.funcc                    sR   t  fddt j j j jjd d  jjd jdd d d d d d d d d  _d S )Nc                            jS N_trainable_funcconfig selfr    b/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/tune/trainable/function_trainable.py<lambda>,       z)FunctionTrainable.setup.<locals>.<lambda>nameid	resourceslogdir	driver_ipdriver_node_idexperiment_nameT)training_func
trial_infostoragesynchronous_result_reporting
world_rank
local_rank	node_ranklocal_world_size
world_sizedataset_shard
checkpoint)	r   r   
trial_nametrial_idtrial_resources_storagetrial_driver_staging_pathexperiment_dir_name_last_training_resultr"   r   r    r!   r#   setup*   s,   
	
zFunctionTrainable.setupr   c                 C   s   t )z7Subclasses can override this to set the trainable func.)NotImplementedErrorr@   r    r    r#   r   C   s   z!FunctionTrainable._trainable_funcc                    sN    fdd}t | jdd _ j  z j  W d S  ty&   Y d S w )Nc               
      s.   z   jW S  ty }  zt| d } ~ ww r   )r   r   	Exceptionr
   )er!   r    r#   
entrypointI   s   z,FunctionTrainable._start.<locals>.entrypointT)targeterror_queuedaemon)r	   _error_queue_runner_status_reporter_startstartRuntimeError)r"   rE   r    r!   r#   rL   H   s   
zFunctionTrainable._startc                 C   sZ   t  }|js
|  | }|std|j}t|v rd|t< || _|j	dur+d|t< |S )a=  Implements train() for a Function API.

        If the RunnerThread finishes without reporting "done",
        Tune will automatically provide a magic keyword __duplicate__
        along with a result with "done=True". The TrialRunner will handle the
        result accordingly (see tune/tune_controller.py).
        zShould not have reached here. The TuneController should not have scheduled another `train` remote call.It should have scheduled a `stop` instead after the training function exits.FNT)
r   training_startedrM   get_nextrN   metricsr   r   r?   r8   )r"   sessiontraining_resultrQ   r    r    r#   step\   s   
zFunctionTrainable.stepc                 C   s   || S r   r    )r"   fnr    r    r#   execute   s   zFunctionTrainable.execute checkpoint_dirc                 C   s   |rt d| jS )Nz4Checkpoint dir should not be used with function API.)
ValueErrorr?   )r"   rX   r    r    r#   save_checkpoint   s   z!FunctionTrainable.save_checkpointcheckpoint_resultc                 C   s   t  }|j|_d S r   )r   r8   loaded_checkpoint)r"   r[   rR   r    r    r#   load_checkpoint   s   z!FunctionTrainable.load_checkpointc                 C   s8   t  }z|jdd W |  t  d S |  t  w )Nr   timeout)r   finish_report_thread_runner_errorr   )r"   rR   r    r    r#   cleanup   s   
zFunctionTrainable.cleanupc                    sv   t  }ttjdd}|j|d |j rdS |j fddt	 j
 j j jjd d  jjd jd i  _d	S )
NTUNE_FUNCTION_THREAD_TIMEOUT_S   r^   Fc                      r   r   r   r    r!   r    r#   r$      r%   z0FunctionTrainable.reset_config.<locals>.<lambda>r&   )r.   r/   r0   T)r   intosenvirongetr`   training_threadis_aliveresetr   r9   r:   r;   r<   trial_working_directoryr>   _last_result)r"   
new_configrR   thread_timeoutr    r!   r#   reset_config   s(   

	zFunctionTrainable.reset_configFc                 C   s.   z| j j|td}t| tjy   Y d S w )N)blockr_   )rI   rh   r   r
   queueEmpty)r"   rq   rD   r    r    r#   ra      s   z-FunctionTrainable._report_thread_runner_errorN)rW   )F)__name__
__module____qualname____doc___namerA   r   strr   r   rL   rT   rV   rZ   r   r]   rb   rp   ra   r    r    r    r#   r   "   s    *	r   
train_funcr'   returnc                    sn   t f}tdrj| }tj}t}|s td|t	dd G  fdddg|R  }|S )N
__mixins__zUnknown argument found in the Trainable function. The function args must include a single 'config' positional parameter.
Found: {}
_resourcesc                       s^   e Zd Z pedrj ndZdd ZfddZedee	e
f dee ffd	d
ZdS )z#wrap_function.<locals>.ImplicitFuncrt   r   c                 S   s   | j S r   )rx   r!   r    r    r#   __repr__   s   z,wrap_function.<locals>.ImplicitFunc.__repr__c                    sX   t  |}dd }d }t r| D ]}|| qn| }|| t tdi |S )Nc                 S   sH   | sd S t | trt |  d S t | tr t t| i d S td)NzuInvalid return or yield value. Either return/yield a single number or a dictionary object in your trainable function.)
isinstancedictr   reportr   r   rY   )outputr    r    r#   handle_output   s   

zJwrap_function.<locals>.ImplicitFunc._trainable_func.<locals>.handle_outputT)r   inspectisgeneratorfunctionr   r   r   )r"   r   rU   r   r   )rz   r    r#   r      s   



z3wrap_function.<locals>.ImplicitFunc._trainable_funcr   r{   c                    s   t  tst r |S  S r   )r   r   callable)clsr   )r)   r    r#   default_resource_request  s   z<wrap_function.<locals>.ImplicitFunc.default_resource_requestN)rt   ru   rv   hasattrrx   r~   r   classmethodr   ry   r   r   r   r   r    r'   r)   rz   r    r#   ImplicitFunc   s    
r   )
r   r   r|   r   getfullargspecargsr   rY   formatgetattr)rz   r'   inherit_from	func_argsuse_config_singler   r    r   r#   wrap_function   s   

/r   r   )/r   loggingrf   rr   	functoolsr   numbersr   typingr   r   r   r   r   ray.air._internal.utilr	   r
   ray.air.constantsr   &ray.train._internal.checkpoint_managerr   ray.train._internal.sessionr   r   r   r   r   #ray.tune.execution.placement_groupsr   ray.tune.resultr   r   r   ray.tune.trainable.trainabler   ray.tune.utilsr   ray.util.annotationsr   	getLoggerrt   loggerNULL_MARKERTEMP_MARKERr   ry   r   r    r    r    r#   <module>   s>    
 $