o
    `۷i                     @   s  d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	m
Z
mZmZ d dlmZmZmZ d dlmZmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ d d	lm Z  d d
l!m"Z"m#Z#m$Z$ d dl%m&Z& ernd dl'm(Z( edZ)edZ*e +e,Z-e&G dd dZ.dS )    N)Path)TYPE_CHECKINGAnyCallableDictListOptionalTypeVarUnion)StartTracebackStartTracebackWithWorkerRankskip_exceptions)
Checkpoint
DataConfig)BackendExecutorInactiveWorkerGroupErrorTrainBackendErrorTrainingWorkerError)_TrainingResult_TrainSessionget_session)ActorWrapper)BackendConfig)BaseTrainer
GenDatasetTrainingFailedError)DeveloperAPI)DatasetTSc                   @   s   e Zd ZdZdeeef dedeeg e	f ee
eef ge	f f de
edf de
eef ded	eee
eeef  fd
dZdd Z	dd	ee fddZdefddZdd Zdeee
  fddZdd ZdefddZdS )TrainingIteratorzEAn iterator over Train results. Returned by ``trainer.run_iterator``.backend_executorbackend_config
train_funcdatasetsr   metadatadata_config
checkpointc                 C   sL   || _ | | _|| _|| _|| _|| _| j|| j| j| j|d d| _d S )N)r#   r$   r%   r&   r'   F)	_backend_executorbackend_cls_backend_train_func	_datasets	_metadata_data_config_start_training_finished_training)selfr!   r"   r#   r$   r%   r&   r'    r2   G/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/train/trainer.py__init__)   s   


zTrainingIterator.__init__c                 C   s   | S Nr2   r1   r2   r2   r3   __iter__D   s   zTrainingIterator.__iter__Nc              	      s:   t  }|s	J d|j fdd d S )Nz8`_start_training` should only be called from within Tunec                      s   j j dS )N)r#   r$   r%   r&   storager'   )r(   start_trainingr2   r'   r&   r$   r%   r1   r8   r#   r2   r3   <lambda>T   s    z2TrainingIterator._start_training.<locals>.<lambda>)r   r8   _run_with_error_handling)r1   r#   r$   r%   r&   r'   tune_sessionr2   r:   r3   r/   G   s   z TrainingIterator._start_trainingfuncc              	   C   st   z| W S  t y#   td | | j| j| j| j | | Y S  t	y.   t
dd  ty9   t
dd w )NzSWorkers have been successfully restarted. Resuming training from latest checkpoint.zThis Trainer is not active. It is either shutdown already or never started in the first place. Either create a new Trainer or start this one.zTraining failed. You should not be seeing this error and this is a bug. Please create a new issue at https://github.com/ray-project/ray.)r   loggerinfor/   r+   r,   r-   r.   r<   r   RuntimeErrorr   )r1   r>   r2   r2   r3   r<   ^   s4   z)TrainingIterator._run_with_error_handlingc                 C   s   |   r| jjdd tz| | j}|d u r*| jjdd | | j d| _t|W S  tys } z;t	|t
r=|j}nd }t|}dtt|||j}| jjd||d z| jjdd d| _W   tyn   Y  w d }~ww )NF)erroredT )rB   stack_tracefailed_rank)graceful_termination)is_finishedr(   report_final_run_statusStopIterationr<   _fetch_next_result_finish_trainingr0   r   
isinstancer   worker_rankr   join	tracebackformat_exceptiontype__traceback__shutdown	Exception)r1   next_resultserE   rD   r2   r2   r3   __next__~   s@   
zTrainingIterator.__next__returnc                 C   s0   | j  }|du rdS tdd |D sJ |S )a  Fetch next results produced by ``session.report()`` from each worker.

        Assumes ``start_training`` has already been called.

        Returns:
            A list of dictionaries of values passed to ``session.report()`` from
                each worker. Each item corresponds to an intermediate result
                a single worker. If there are no more items to fetch,
                returns None.
        Nc                 s   s    | ]}t |tV  qd S r5   )rL   r   ).0resultr2   r2   r3   	<genexpr>   s    z6TrainingIterator._fetch_next_result.<locals>.<genexpr>)r(   get_next_resultsall)r1   resultsr2   r2   r3   rJ      s
   
z#TrainingIterator._fetch_next_resultc                 C   s
   | j  S )at  Finish training and return final results. Propagate any exceptions.

        Blocks until training is finished on all workers.

        Assumes `start_training` has already been called.

        Returns:
            A list of return values from calling ``train_func`` on each worker.
                Each item corresponds to the return value from a single worker.
        )r(   finish_trainingr6   r2   r2   r3   rK      s   
z!TrainingIterator._finish_trainingc                 C   s   | j S r5   )r0   r6   r2   r2   r3   rG      s   zTrainingIterator.is_finishedr5   )__name__
__module____qualname____doc__r
   r   r   r   r   r   r   strr   r   r   r   r   r4   r7   r/   r<   rW   r   rJ   rK   boolrG   r2   r2   r2   r3   r    %   s6    
$


	
 'r    )/loggingrO   pathlibr   typingr   r   r   r   r   r   r	   r
   ray.air._internal.utilr   r   r   	ray.trainr   r   $ray.train._internal.backend_executorr   r   r   r   ray.train._internal.sessionr   r   r   ray.train._internal.utilsr   ray.train.backendr   ray.train.base_trainerr   r   r   ray.util.annotationsr   ray.datar   r   r   	getLoggerr`   r?   r    r2   r2   r2   r3   <module>   s&    (
