o
    $i                     @   s  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZ d dlmZmZmZ d dlmZ d dlmZmZ d dlmZmZmZmZ d dlmZmZmZ d d	lmZ d d
l m!Z! d dl"m#Z#m$Z$m%Z% d dl&m'Z' e
dZ(e
dZ)e *e+Z,e'G dd dZ-dS )    N)Path)AnyCallableDictListOptionalTypeVarUnion)StartTracebackStartTracebackWithWorkerRankskip_exceptions)Dataset)
Checkpoint
DataConfig)BackendExecutorInactiveWorkerGroupErrorTrainBackendErrorTrainingWorkerError)_TrainingResult_TrainSessionget_session)ActorWrapper)BackendConfig)BaseTrainer
GenDatasetTrainingFailedError)DeveloperAPITSc                   @   s   e Zd ZdZdeeef dedeeg e	f ee
eef ge	f f de
eef de
eef dedeee
eeef  fd	d
Zdd Z	ddee fddZdefddZdd Zdeee
  fddZdd ZdefddZdS )TrainingIteratorzEAn iterator over Train results. Returned by ``trainer.run_iterator``.backend_executorbackend_config
train_funcdatasetsmetadatadata_config
checkpointc                 C   sL   || _ | | _|| _|| _|| _|| _| j|| j| j| j|d d| _d S )N)r"   r#   r$   r%   r&   F)	_backend_executorbackend_cls_backend_train_func	_datasets	_metadata_data_config_start_training_finished_training)selfr    r!   r"   r#   r$   r%   r&    r1   N/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/train/trainer.py__init__'   s   


zTrainingIterator.__init__c                 C   s   | S Nr1   r0   r1   r1   r2   __iter__B   s   zTrainingIterator.__iter__Nc              	      s:   t  }|s	J d|j fdd d S )Nz8`_start_training` should only be called from within Tunec                      s   j j dS )N)r"   r#   r$   r%   storager&   )r'   start_trainingr1   r&   r%   r#   r$   r0   r7   r"   r1   r2   <lambda>R   s    z2TrainingIterator._start_training.<locals>.<lambda>)r   r7   _run_with_error_handling)r0   r"   r#   r$   r%   r&   tune_sessionr1   r9   r2   r.   E   s   z TrainingIterator._start_trainingfuncc              	   C   st   z| W S  t y#   td | | j| j| j| j | | Y S  t	y.   t
dd  ty9   t
dd w )NzSWorkers have been successfully restarted. Resuming training from latest checkpoint.zThis Trainer is not active. It is either shutdown already or never started in the first place. Either create a new Trainer or start this one.zTraining failed. You should not be seeing this error and this is a bug. Please create a new issue at https://github.com/ray-project/ray.)r   loggerinfor.   r*   r+   r,   r-   r;   r   RuntimeErrorr   )r0   r=   r1   r1   r2   r;   \   s4   z)TrainingIterator._run_with_error_handlingc                 C   s   |   r| jjdd tz| | j}|d u r*| jjdd | | j d| _t|W S  tys } z;t	|t
r=|j}nd }t|}dtt|||j}| jjd||d z| jjdd d| _W   tyn   Y  w d }~ww )NF)erroredT )rA   stack_tracefailed_rank)graceful_termination)is_finishedr'   report_final_run_statusStopIterationr;   _fetch_next_result_finish_trainingr/   r
   
isinstancer   worker_rankr   join	tracebackformat_exceptiontype__traceback__shutdown	Exception)r0   next_resultserD   rC   r1   r1   r2   __next__|   s@   
zTrainingIterator.__next__returnc                 C   s0   | j  }|du rdS tdd |D sJ |S )a  Fetch next results produced by ``session.report()`` from each worker.

        Assumes ``start_training`` has already been called.

        Returns:
            A list of dictionaries of values passed to ``session.report()`` from
                each worker. Each item corresponds to an intermediate result
                a single worker. If there are no more items to fetch,
                returns None.
        Nc                 s   s    | ]}t |tV  qd S r4   )rK   r   ).0resultr1   r1   r2   	<genexpr>   s    z6TrainingIterator._fetch_next_result.<locals>.<genexpr>)r'   get_next_resultsall)r0   resultsr1   r1   r2   rI      s
   
z#TrainingIterator._fetch_next_resultc                 C   s
   | j  S )at  Finish training and return final results. Propagate any exceptions.

        Blocks until training is finished on all workers.

        Assumes `start_training` has already been called.

        Returns:
            A list of return values from calling ``train_func`` on each worker.
                Each item corresponds to the return value from a single worker.
        )r'   finish_trainingr5   r1   r1   r2   rJ      s   
z!TrainingIterator._finish_trainingc                 C   s   | j S r4   )r/   r5   r1   r1   r2   rF      s   zTrainingIterator.is_finishedr4   )__name__
__module____qualname____doc__r	   r   r   r   r   r   r   strr   r   r   r   r   r   r3   r6   r.   r;   rV   r   rI   rJ   boolrF   r1   r1   r1   r2   r   #   s6    
$


	
 'r   ).loggingrN   pathlibr   typingr   r   r   r   r   r   r	   ray.air._internal.utilr
   r   r   ray.datar   	ray.trainr   r   $ray.train._internal.backend_executorr   r   r   r   ray.train._internal.sessionr   r   r   ray.train._internal.utilsr   ray.train.backendr   ray.train.base_trainerr   r   r   ray.util.annotationsr   r   r   	getLoggerr_   r>   r   r1   r1   r1   r2   <module>   s$    $
