o
    civ)                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZmZmZm	Z	m
Z
 d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZ d dlmZmZ d d	lmZm Z  d d
l!m"Z" d dl#m$Z$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z? d dl@mAZA d dlBmCZCmDZD d dlEmFZF d dlGmHZHmIZI d dlJmKZK eLeMZNeIG dd dZOdS )    N)AnyCallableDictListOptionalUnion)env_bool)	usage_lib)ActorHandle)tag_train_v2_trainer)BackendConfig
Checkpoint
DataConfigResult	RunConfigScalingConfig)+_RESUME_FROM_CHECKPOINT_DEPRECATION_WARNING$_TRAINER_RESTORE_DEPRECATION_WARNING)RAY_CHDIR_TO_TRIAL_DIRRAY_TRAIN_ENABLE_STATE_TRACKING)!_GET_METADATA_DEPRECATION_MESSAGE)AcceleratorSetupCallbackBackendSetupCallbackDatasetsSetupCallbackWorkingDirectorySetupCallback)
GenDataset)_initialize_env_callbacks)ControllerMetricsCallbackWorkerMetricsCallback)StateManagerCallback)UserCallbackHandler)DEFAULT_RUN_CONTROLLER_AS_ACTORMETRICS_ENABLED_ENV_VARRUN_CONTROLLER_AS_ACTOR_ENV_VARget_env_vars_to_propagate)RayTrainCallback)TrainRunContext)TrainController)create_failure_policy)create_scaling_policy)ObjectRefWrapperconstruct_train_func)UserCallback)
DeprecatedDeveloperAPI)NodeAffinitySchedulingStrategyc                   @   s   e Zd ZdZddddddddddeeg df eegdf f dee dee dee	 dee
 d	eeeef  d
ee dee deeeef  fddZdefddZdee fddZdefddZdee fddZeedd Zeedd ZdS )DataParallelTrainera  Base class for distributed data parallel training on Ray.

    This class supports the SPMD parallelization pattern, where a single
    training function is executed in parallel across multiple workers,
    and different shards of data are processed by each worker.
    N)train_loop_configbackend_configscaling_config
run_configdatasetsdataset_configresume_from_checkpointmetadatatrain_loop_per_workerr1   r2   r3   r4   r5   r6   r7   r8   c          
      C   s   |pt  | _|| _|| _|pt | _|pt | _|pi | _|p!t	 | _
t| j| j| j| j| j| j
d| _|d ur<tt|	d urDtttd t|  d S )N)r4   r1   r3   r2   r5   r6   train)r   r4   r9   r1   r   r3   r   r2   r5   r   data_configr&   train_run_contextDeprecationWarningr   r   r	   record_library_usager   )
selfr9   r1   r2   r3   r4   r5   r6   r7   r8    r@   Z/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/v2/api/data_parallel_trainer.py__init__D   s*   
	
zDataParallelTrainer.__init__returnc                 C   sX   t | j| j| jjdd}t|}| j|t| jt	| j
j| j|  d}|jr*|j|S )an  Launches the Ray Train controller to run training on workers.

        Returns:
            A Result object containing the training result.

        Raises:
            ray.train.v2.api.exceptions.TrainingFailedError: If any failures occur
                during training and the number of retries configured in
                `FailureConfig` is exhausted.
        r9   )configtrain_func_contextfn_arg_name)train_fn_refscaling_policyfailure_policyr<   	callbacks)r+   r9   r1   r2   rE   r*   _initialize_and_run_controllerr)   r3   r(   r4   failure_configr<   _create_default_callbackserror)r?   train_fnrG   resultr@   r@   rA   fitl   s"   
zDataParallelTrainer.fitc                 C   s   t  }t| j| j}t| j}t| j| j| jd}||||g t	t
dr.t }|| t	tdrA|t  |t| j t	tdrL|t  | jjd urV| jjng }dd |D }|t|| jd |dd |D  |S )N)r5   r;   r3   TFc                 S   s   g | ]	}t |tr|qS r@   
isinstancer,   .0cbr@   r@   rA   
<listcomp>   s
    
zADataParallelTrainer._create_default_callbacks.<locals>.<listcomp>)user_callbacksr<   c                 S   s   g | ]	}t |ts|qS r@   rR   rT   r@   r@   rA   rW      s    )r   r   r2   r3   r   r   r5   r;   extendr   r   r   appendr"   r   r   r<   r   r   r4   rJ   r    )r?   rJ   accelerator_setup_callbackbackend_setup_callbackdatasets_setup_callback working_directory_setup_callbackrun_config_callbacksrX   r@   r@   rA   rM      sL   




z-DataParallelTrainer._create_default_callbacksc                 K   s   t tt}|r9tjdtt  dddt idt	}|jdi |}| 
| t|j  t|j S t	di |}t|  | S )Nr   F)node_idsoftenv_vars)num_cpusscheduling_strategyruntime_envr@   )r   r#   r!   rayremoter/   get_runtime_contextget_node_idr$   r'   _register_sigint_handlergetrun
get_resultasyncio)r?   controller_init_kwargsrun_controller_as_actorcontroller_actor_cls
controllerr@   r@   rA   rK      s(   
z2DataParallelTrainer._initialize_and_run_controllerrr   c                    s$   d fdd}t  t j| dS )z=Register SIGINT handler so user Ctrl C gracefully aborts run.r   c                    st   t d d7 dkrt d td dkr8zt j  W d S  tjj	y7   td Y d S w d S )Nu   Received SIGINT. Gracefully aborting the training run — this may take a few seconds. To forcefully abort immediately, you can send a different signal, such as SIGKILL.      zGReceived SIGINT at least 3 times. Forcefully aborting the training run.r   )
loggerinfosysexitrf   rk   abortrg   
exceptionsActorDiedError)signumframerr   sigint_countr@   rA   sigint_handler   s    
zDDataParallelTrainer._register_sigint_handler.<locals>.sigint_handlerN)signalSIGINT)r?   rr   r   r@   r~   rA   rj      s   z,DataParallelTrainer._register_sigint_handlerc                 O      t t)z[Deprecated] Restores a Train experiment from a previously
        interrupted/failed run.

        This method is deprecated and will be removed in a future release.
        r=   r   clsargskwargsr@   r@   rA   restore      zDataParallelTrainer.restorec                 O   r   )z[Deprecated] Checks if a Train experiment can be restored from
        a previously interrupted/failed run.

        This method is deprecated and will be removed in a future release.
        r   r   r@   r@   rA   can_restore  r   zDataParallelTrainer.can_restore)__name__
__module____qualname____doc__r   r   r   r   r   r   r   strr   r   r   r   rB   r   rQ   r   r%   rM   rK   r
   r'   rj   classmethodr-   r   r   r@   r@   r@   rA   r0   ;   sN    	
(%4r0   )Prn   loggingr   rw   typingr   r   r   r   r   r   rf   ray._private.ray_constantsr   ray._private.usager	   	ray.actorr
   ray.air._internal.usager   	ray.trainr   r   r   r   r   r   ray.train.base_trainerr   r   ray.train.constantsr   r   ray.train.contextr    ray.train.v2._internal.callbacksr   r   r   r   )ray.train.v2._internal.callbacks.datasetsr   -ray.train.v2._internal.callbacks.env_callbackr   (ray.train.v2._internal.callbacks.metricsr   r   .ray.train.v2._internal.callbacks.state_managerr   .ray.train.v2._internal.callbacks.user_callbackr     ray.train.v2._internal.constantsr!   r"   r#   r$   )ray.train.v2._internal.execution.callbackr%   (ray.train.v2._internal.execution.contextr&   +ray.train.v2._internal.execution.controllerr'   1ray.train.v2._internal.execution.failure_handlingr(   /ray.train.v2._internal.execution.scaling_policyr)   ray.train.v2._internal.utilr*   r+   ray.train.v2.api.callbackr,   ray.util.annotationsr-   r.   ray.util.scheduling_strategiesr/   	getLoggerr   ru   r0   r@   r@   r@   rA   <module>   sB      
