o
    ciw                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlm  mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z%m&Z&m'Z'm(Z(m)Z) d dl*m+Z+ d dl,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3m4Z4m5Z5m6Z6m7Z7m8Z8m9Z9m:Z: d dl;m<Z<m=Z= edZ>e ?e@ZAG dd deBZCG dd deBZDeG dd dZEG dd dZFG dd deBZGG dd  d ZHd!eIfd"d#ZJdS )$    N)defaultdict)	dataclass)AnyCallableDictListOptionalTupleTypeTypeVar)HIP_VISIBLE_DEVICES_ENV_VAR)NEURON_RT_VISIBLE_CORES_ENV_VAR)!ASCEND_RT_VISIBLE_DEVICES_ENV_VAR)CUDA_VISIBLE_DEVICES_ENV_VAR)env_integer)Dataset)RayActorError)
Checkpoint
DataConfig)	TrialInfo_TrainingResultget_sessioninit_sessionshutdown_session)StorageContext)check_for_failure)WorkerGroup)BackendConfig)&ENABLE_DETAILED_AUTOFILLED_METRICS_ENV%ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV$ENABLE_SHARE_HIP_VISIBLE_DEVICES_ENV)ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV'ENABLE_SHARE_NPU_RT_VISIBLE_DEVICES_ENVRAY_TRAIN_ENABLE_STATE_TRACKINGTRAIN_ENABLE_WORKER_SPREAD_ENV#TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV)get_current_placement_groupremove_placement_groupTc                   @      e Zd ZdZdS )TrainBackendErrorz?Errors with BackendExecutor that should not be exposed to user.N__name__
__module____qualname____doc__ r0   r0   X/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/_internal/backend_executor.pyr*   .       r*   c                   @   r)   )TrainingWorkerErrorz)Raised if a worker fails during training.Nr+   r0   r0   r0   r1   r3   2   r2   r3   c                   @   s*   e Zd ZU dZeed< eed< eed< dS )ResourceConfiga  
    Resource configuration for resource_ids to share between workers.

    Args:
        resource_name: The name of the resource to configure
         (Example: "neuron_cores" or "gpu").
        resource_enable_sharing_env_var: The environment variable to
         check if the resource should be shared.
        share_resource_ids_env_var: The environment variable to configure for
         sharing the resources with other workers.
    resource_nameresource_enable_sharing_env_varshare_resource_ids_env_varN)r,   r-   r.   r/   str__annotations__r0   r0   r0   r1   r4   6   s
   
 r4   c                   @   s  e Zd ZdZ				dEdedee dedeee	e
f  d	ef
d
dZ				dFdeeg df  dee dee dee fddZdd Zdd Zde	de	fddZde	de	fddZdee fdd Z	dGd!eg ef d"ee	ef d#ee	ef d$ed%ed&ee ddfd'd(Zdeee  fd)d*Zd+d, Zd-d. Z 	/		dHd0e!d1ee d2ee	 fd3d4Z"d5d6 Z#dId8e!fd9d:Z$d;d< Z%d=d> Z&d?d@ Z'dAdB Z(dCdD Z)dS )JBackendExecutora  Main execution class for training backends.

    This class holds a worker group and is responsible for executing the
    training function on the workers, and collecting intermediate results
    from ``session.report()``.

    Args:
        backend_config: The configurations for this
            specific backend.
        num_workers: Number of workers to use for training.
        resources_per_worker (Optional[Dict[str, float]]):
            Dictionary specifying the resources that will be
            requested for each worker. Defaults to {"CPU": 1}.
        max_retries: Number of retries when Ray actors fail.
            Defaults to 3. Set to -1 for unlimited retries.
    N      backend_config
trial_infonum_workersresources_per_workermax_retriesc                 C   s   |d u r
ddi| _ n| | _ || _| | _|| _|| _| jdk r'td| _d| _d | _	d | _
d | _|| _t | _d | _ttjttttjttttjttg| _tt d | _tt d| _!d S )NCPUr;   r   inf  )"_resources_per_workercopy_backend_configbackend_cls_backend_num_workers_max_failuresfloat_num_failures_last_failure_initialization_hook_placement_group_trial_infoInactiveWorkerGroupworker_groupdataset_shardsr4   ray_constantsNEURON_CORESr!   r   NPUr"   r   GPUr    r   _resource_configsinttime_start_time_msr   r#   state_tracking_enabled)selfr=   r>   r?   r@   rA   r0   r0   r1   __init__[   sD   	



zBackendExecutor.__init__initialization_hook	train_clstrain_cls_argstrain_cls_kwargsc              
      s  |    | jpd}t| j| j||||d| _| jr| jjnd}| j| z\|r1|| _	| j
| ddlm  d f fdd}| j
|   ttt| jj}| jd	ddkra|ra|   | jD ]}	| |	j|	jrv| |	j|	j qd| j| j| j W n& ty }
 ztt |
 t!d
 | "  | #  W Y d}
~
nd}
~
ww | j$rddl%m&} ddl'm(} || d| _)dS dS )zStarts the worker group.default)r?   r@   	actor_clsactor_cls_argsactor_cls_kwargsplacement_groupNr   DataContextctxc                    s     |  d S N)_set_current)rk   ri   r0   r1   _set_driver_dataset_context      z:BackendExecutor.start.<locals>._set_driver_dataset_contextrX   zXFailure occurred during startup. Restarting all workers and attempting to startup again.)TrainRunStateManager)get_state_actor)state_actor)*_create_placement_grouprP   r   rJ   rE   rS   rQ   driver_node_id"sort_workers_by_node_id_and_gpu_idrO   executeray.datarj   get_currentboolr   r   rI   share_cuda_visible_devicesget_share_cuda_visible_devicesrY   _is_share_resources_enabledr5   r6   _share_resource_idsr7   on_startrG   r   logger	exceptionr8   warning_increment_failures_restartr]   ray.train._internal.staterp   %ray.train._internal.state.state_actorrq   state_manager)r^   r`   ra   rb   rc   rh   trial_driver_node_idrn   "share_cuda_visible_devices_enabledresource_configexcrp   rq   r0   ri   r1   start   st   

	zBackendExecutor.startc                    s   t  }tjjj}|j}|du p| }|rc fddt jD }tt	t
d}|r+dnd}tjj||d}td t	td	}	tj| g|	d
\}
}|
rStd ntdt |j| _dS dS )a  Creates a placement group if it does not exist.

        If a placement group is already detected (Tune) this will be a no-op.

        By default the placement group will be created with PACK strategy.
        This is optimized for colocating GPUs on a minimal number of nodes.
        This behavior can be overridden to use the SPREAD strategy by defining
        ``TRAIN_ENABLE_WORKER_SPREAD_ENV``

        If a placement group is created it will be stored as
        self._placement_group.
        Nc                    s   g | ]} j  qS r0   )rE   rF   ).0_r^   r0   r1   
<listcomp>   s    
z;BackendExecutor._create_placement_group.<locals>.<listcomp>r   SPREADPACK)strategyz%Waiting for placement group to start.d   )timeoutzPlacement group has started.a  Placement group creation timed out. Make sure your cluster either has enough resources or use an autoscaling cluster. If you are running on a cluster, make sure you specify an address in `ray.init()`, for example, `ray.init("auto")`. You can also increase the timeout by setting the TRAIN_PLACEMENT_GROUP_TIMEOUT_S environment variable. Current resources available: {}, resources requested by the placement group: {})r&   ray_privateworkerglobal_worker-should_capture_child_tasks_in_placement_grouprangerJ   ry   r   r$   utilrh   r   debugr%   waitreadyTimeoutErrorformatavailable_resourcesbundle_specsrP   )r^   current_placement_groupr   r   should_create_placement_groupbundles
use_spreadr   rh   r   r   r   r0   r   r1   rs      s6   





z'BackendExecutor._create_placement_groupc                 C   s   |  tjt dS )aF  Sets CUDA_VISIBLE_DEVICES on all workers.

        For each worker, CUDA_VISIBLE_DEVICES will be set to the GPU IDs
        visible to all workers on that worker's node.

        This allows GPU workers on the same node to communicate with one
        another.

        Example:

            Setup:
            - Node1:
                - Worker1: {0, 1}
                - Worker2: {2, 3}
            - Node2:
                - Worker3: {0, 1}

            CUDA_VISIBLE_DEVICES:
            - Worker1: "0,1,2,3"
            - Worker2: "0,1,2,3"
            - Worker3: "0,1"

        N)r~   rU   rX   r   r   r0   r0   r1   r|     s   z+BackendExecutor._share_cuda_visible_devicesresourceenv_varc                    s   fdd| j jD }tt}tt}t|D ]\}\}}|| | || | qg }	| D ]%\}}t|}d	|  fdd}
|| D ]}|	
| j ||
 qJq2t|	 dS )a  Sets the given env_var on all workers.

        For each worker, the cores/devices are visible to all the
        workers on that worker's node.This allows workers on the
        same node to communicate with one another.

        Example:

            Setup:
            - Node1:
                - Worker1: {0, 1}
                - Worker2: {2, 3}
            - Node2:
                - Worker3: {0, 1}

            NEURON_RT_VISIBLE_CORES/TPU_VISIBLE_CHIPS/...:
            - Worker1: "0,1,2,3"
            - Worker2: "0,1,2,3"
            - Worker2: "0,1"

        Args:
            resource: The name of the resource/accelerator.
            env_var: The name of the environment variable to set.
        c                    s    g | ]}|j j|j j  fqS r0   )metadatanode_idresource_ids)r   w)r   r0   r1   r   M  s    
z7BackendExecutor._share_resource_ids.<locals>.<listcomp>,c                      s    t j< d S rl   )osenvironr0   )all_resource_idsr   r0   r1   set_resource_ids`  ro   z=BackendExecutor._share_resource_ids.<locals>.set_resource_idsN)rS   workersr   set	enumerateaddupdateitemssortedjoinappendexecute_single_asyncr   r{   )r^   r   r   node_ids_and_resource_idsnode_id_to_worker_idnode_id_to_resource_ids	worker_idr   r   futuresr   r0   )r   r   r   r1   r~   4  s&   

z#BackendExecutor._share_resource_idsr5   enable_sharing_envc                 C   s"   | j |ddk}|ot|dS )a  Whether to share resource IDs on all workers
        based on enable_sharing_env.

        This will return true if resources are requested and greater than 0.
        Also, user can disable by configuring the `enable_sharing_env` to "0".

        Args:
            resource_name: The name of the resource/accelerator.
            enable_sharing_env: The name of the environment variable
                to check.
        r   T)rE   r{   rU   env_bool)r^   r5   r   has_resource_requestedr0   r0   r1   r}   i  s   z+BackendExecutor._is_share_resources_enabledreturnc           	         s   i  i }i i }d}t t}tt| jD ],}| jj| }|jj}||  |< ||  d7  < ||vr;|||< |d7 }|| |< qtt| jD ]}| jj| }|jj}|| ||< qId fddt	| jjD }t
d|   |fS )ao  Create rank and world size mappings for workers.
        There are three maps returned:
            - local_rank_map, which maps from worker world_rank to local_rank.
            - local_world_size_map, which maps from world_rank to local_world_size
            - node_rank_map, which maps from world rank to node rank

        Example:
            Worker 0: node 0
            Worker 1: node 0
            Worker 2: node 1
            Worker 3: node 0
            Worker 4: node 1

            Workers 0, 1, 3 are on node 0.
            Workers 2, 4 are on node 1.

            Expected local_rank_map:
            {
                0 -> 0,
                1 -> 1,
                2 -> 0,
                3 -> 2,
                4 -> 1
            }

            Expected local_world_size_map:
            {
                0 -> 3,
                1 -> 3,
                2 -> 2,
                3 -> 3,
                4 -> 2
            }

            Expected node_rank_map:
            {
                0 -> 0,
                1 -> 0,
                2 -> 1,
                3 -> 0,
                4 -> 1
            }

        r   r;   
c                    sL   g | ]"\}}d |j j d|j j d|j j d| d |  d|  qS )z- (node_id=z, ip=z, pid=z) world_rank=z, local_rank=z, node_rank=)r   r   node_ippid)r   ir   local_rank_mapnode_rank_mapr0   r1   r     s    zDBackendExecutor._create_rank_world_size_mappings.<locals>.<listcomp>z'Started distributed worker processes: 
)r   rZ   r   lenrS   r   r   r   r   r   r   info)	r^   local_world_size_mapnode_idsnode_cntnode_id_dict
world_rankr   r   workers_infor0   r   r1    _create_rank_world_size_mappingsz  s8   -

z0BackendExecutor._create_rank_world_size_mappings
train_funcdatasetsr   data_configstorage
checkpointc                    sX  t td  fdd}| jdu r/dd | jjD }dd | jjD }	|j|t| j||	d| _|  \}
}}g }tt| jD ]%}|	| jj
||||
| || || t| j| j|| j| |||d	 q?| j| j| j | | | jrdd
lm} tj }| jj| jj| jj| | || j| j|j| jg| j d	 dd }| j | dS )aW  Executes a training function on all workers in a separate thread.

        ``finish_training`` should be called after this.

        Args:
            train_func: The training function to run on each worker.
            datasets: The base datasets.
            data_config: The config object for creating dataset shards for workers.
            checkpoint: The checkpoint data that
                should be loaded onto each worker and accessed by the
                training function via ``session.get_checkpoint()``. If this
                is ``None`` then no checkpoint will be loaded.
        r   c                    s>   zt | ||||||||	| |
d W d S  ty   tdw )N)training_funcr   
local_rank	node_ranklocal_world_size
world_sizer>   dataset_shardr   r   detailed_autofilled_metricsr   zAttempting to start training but a previous training run is still ongoing. You must call `finish_training` before calling `start_training` again.)r   
ValueErrorr*   )r   r   r   r   r   r   r>   r   r   r   r   use_detailed_autofilled_metricsr0   r1   initialize_session  s(   z:BackendExecutor.start_training.<locals>.initialize_sessionNc                 S   s   g | ]}|j qS r0   )actorr   r   r0   r0   r1   r     s    z2BackendExecutor.start_training.<locals>.<listcomp>c                 S   s   g | ]}|j jqS r0   )r   r   r   r0   r0   r1   r     s    )r   worker_handlesworker_node_ids)r   r   r   r   r   r>   r   r   r   r   r   )RunStatusEnum)	run_idrun_namejob_idcontroller_actor_idr   rS   start_time_ms
run_status	resourcesc                  S   s   t  } |   d S rl   )r   r   sessionr0   r0   r1   train_asyncF  s   z3BackendExecutor.start_training.<locals>.train_async)!r   r   rT   rS   r   	configurer   r   r   r   r   rQ   rI   on_training_startrG   get_with_failure_handlingr]    ray.train._internal.state.schemar   r   runtime_contextget_runtime_contextr   register_train_runr   experiment_name
get_job_idget_actor_idr\   RUNNINGrE   rJ   execute_async)r^   r   r   r   r   r   r   r   actorsr   r   r   r   r   indexr   core_contextr   r0   r   r1   start_training  sn   
$

zBackendExecutor.start_trainingc                 C   sR   dd }| j |}| |}tdd |D r'tdd |D s%tddS |S )az  Fetches the next ``_TrainingResult`` from each worker.

        Each ``_TrainingResult`` is expected to correspond to the same step from
        each worker (e.g. the same call to ``train.report()``).

        Returns:
            A list of ``_TrainingResult``s or ``None`` if there are no more results
            since the training function has exited on all workers.
        c                  S   s.   t d} z|  }W |S  ty   tdw )Nget_next_resultszs`get_next_results` has been called before `start_training`. Please call `start_training` before `get_next_results`.)_get_sessionget_nextRuntimeErrorr*   )r   resultr0   r0   r1   r  W  s   

z2BackendExecutor.get_next_results.<locals>.get_nextc                 s       | ]}|d u V  qd S rl   r0   r   rr0   r0   r1   	<genexpr>k      z3BackendExecutor.get_next_results.<locals>.<genexpr>c                 s   r
  rl   r0   r  r0   r0   r1   r  m  r  zSome workers returned results while others didn't. Make sure that `session.report()` are called the same number of times on all workers.N)rS   r   r   anyallr  )r^   r  r   resultsr0   r0   r1   r  L  s   
z BackendExecutor.get_next_resultsc                 C   s"   dd }| j |}| | dS )zDisable workers from enqueuing results from ``session.report()``.

        Note: Already reported results may still be enqueued at this point,
              and should be handled appropriately.
        c                  S   s   t d} |  S )Npause_reporting)r  r  r   r0   r0   r1   pause_session_reporting  s   z@BackendExecutor.pause_reporting.<locals>.pause_session_reportingNrS   r   r   )r^   r  r   r0   r0   r1   r  z  s   zBackendExecutor.pause_reportingc                 C   s"   dd }| j |}| |}|S )at  Finish training and return final results. Propagate any exceptions.

        Blocks until training is finished on all workers.

        Assumes `start_training` has already been called.

        Returns:
            A list of return values from calling ``train_func`` on each worker.
                Each item corresponds to the return value from a single worker.
        c                  S   s&   t d} z
|  }W t  |S t  w )Nfinish_training)r  finishr   )r   outputr0   r0   r1   end_training  s   
z5BackendExecutor.finish_training.<locals>.end_trainingr  )r^   r  r   r  r0   r0   r1   r    s   
zBackendExecutor.finish_trainingFerroredfailed_rankstack_tracec                 C   s   | j rEddlm}m} |r,|j}d}|dur|d| d7 }|dur+||| d 7 }n|j}d}| jj| jj	||t
t d d dS dS )	zJReport the final train run status, error, and end time to TrainStateActor.r   )MAX_ERROR_STACK_TRACE_LENGTHr    NzRank z worker raised an error. 
rD   )r   r   status_detailend_time_ms)r]   r   r  r   ERROREDFINISHEDr   end_train_runrQ   r   rZ   r[   )r^   r  r  r  r  r   r   r  r0   r0   r1   report_final_run_status  s&   
z'BackendExecutor.report_final_run_statusc                 C   s>   t |\}}|rt|S || _|   td |   t)a  Gets the remote values while handling for worker failures.

        This method should be called instead of ``ray.get()`` directly in
        order to handle worker failures.

        If a worker failure is identified, backend specific failure handling
        is executed and a ``TrainingWorkerError`` is raised.

        Args:
            remote_values: List of object refs representing functions
                that may fail in the middle of execution. For example, running
                a Train training loop in multiple parallel actor calls.
        Returns:
            The resolved objects represented by the passed in ObjectRefs.
        zjFailure identified during training. Restarting all workers and continuing training from latest checkpoint.)	r   r   r{   rN   r   r   r   r   r3   )r^   remote_valuessuccessr   r0   r0   r1   r     s   
z)BackendExecutor.get_with_failure_handlingTgraceful_terminationc                 C   s~   |rz| j | j| j W n ty   td Y nw |r$| j  n| jjdd t | _| j	r:t
| j	 d| _	d| _dS )zShuts down the workers in the worker group.

        Args:
            graceful_termination: If set to True, attempt to clean up the backend
                before terminating the Ray actors.

        zXGraceful shutdown of backend failed. This is expected if one of the workers has crashed.r   )
patience_sN)rI   on_shutdownrS   rG   r   r   r   shutdownrR   rP   r'   rT   )r^   r&  r0   r0   r1   r)    s    

zBackendExecutor.shutdownc                 C   s   t | jt S rl   )
isinstancerS   rR   r   r0   r0   r1   
is_started  ro   zBackendExecutor.is_startedc                 C   sF   | j   | jd ur| j}nd }| jrt| j d | _| j|d d S )N)r`   )rS   r)  rO   rP   r'   r   )r^   r`   r0   r0   r1   r     s   


zBackendExecutor._restartc                 C   sV   |  j d7  _ | j | jkr)| j}d | _| jdkr'td| j  d}|d ||d S )Nr;   r   zTraining has failed after z
 attempts.)rM   rK   rN   r  with_traceback)r^   failurer   r0   r0   r1   r     s   
z#BackendExecutor._increment_failuresc                 C      | j S rl   )rS   r   r0   r0   r1   get_worker_group     z BackendExecutor.get_worker_groupc                 C   r.  rl   )rM   r   r0   r0   r1   _get_num_failures  r0  z!BackendExecutor._get_num_failures)Nr;   Nr<   )NNNNrl   )FNN)T)*r,   r-   r.   r/   r   r   r   rZ   r   r8   rL   r_   r   r
   r	   r   rs   r|   r~   r}   r   r   r(   r   r   r   r   r   r  r   r  r  r  ry   r#  r   r)  r+  r   r   r/  r1  r0   r0   r0   r1   r:   I   s    
9
T45Y



 .
 r:   c                   @   r)   )InactiveWorkerGroupErrorz0Raised when underlying worker group is inactive.Nr+   r0   r0   r0   r1   r2     r2   r2  c                   @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
rR   c                 C   s   t | S rl   )varsr   r0   r0   r1   __getstate__)  s   z InactiveWorkerGroup.__getstate__c                 C   s   t | | d S rl   )r3  r   )r^   stater0   r0   r1   __setstate__,  s   z InactiveWorkerGroup.__setstate__c                 C      t  rl   r2  )r^   namer0   r0   r1   __getattr__/  r0  zInactiveWorkerGroup.__getattr__c                 C   r7  rl   r8  r   r0   r0   r1   __len__2  r0  zInactiveWorkerGroup.__len__N)r,   r-   r.   r4  r6  r:  r;  r0   r0   r0   r1   rR   $  s
    rR   method_namec                 C   s$   t  }|std|  d|  d|S )N`zP` has been called before `start_training`. Please call `start_training` before `z`.)r   r*   )r<  r   r0   r0   r1   r  6  s   r  )Kloggingr   r[   collectionsr   dataclassesr   typingr   r   r   r   r   r	   r
   r   r   ray._private.ray_constantsr   rU   !ray._private.accelerators.amd_gpur    ray._private.accelerators.neuronr   ray._private.accelerators.npur   $ray._private.accelerators.nvidia_gpur   r   rw   r   ray.exceptionsr   	ray.trainr   r   ray.train._internal.sessionr   r   r   r   r   ray.train._internal.storager   ray.train._internal.utilsr    ray.train._internal.worker_groupr   ray.train.backendr   ray.train.constantsr   r   r    r!   r"   r#   r$   r%   ray.util.placement_groupr&   r'   r(   	getLoggerr,   r   	Exceptionr*   r3   r4   r:   r2  rR   r8   r  r0   r0   r0   r1   <module>   sL    ((

     \