o
    ci`                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlmZmZ d dlmZmZmZmZmZ d dlmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9 d dl:m;Z;m<Z< erd dlm=Z= d dl>m?Z? e@eAZBe
G dd dZCG dd dZDG dd dZEe4G dd dZFeG ZHeeI eJd< daKeeF eJd< deeLeMf dee< fd d!ZNdad#d$ZOd"eeF fd%d&ZPd'd( ZQd)d* ZRd+ee# d"e#fd,d-ZSd.e#d"dfd/d0ZTdbd1efd2d3ZUe5d4d5eU ddd6d7ed8ee! d9eeL d"dfd:d;ZVe5d4d5eU d"ee! fd<d=ZWe5d>d5eU d"eeLef fd?d@ZXe5d>d5eU d"eLfdAdBZYe5d>d5eU d"eLfdCdDZZe5d>d5eU d"eLfdEdFZ[e5dGd5eU d"eLfdHdIZ\e5d>d5eU dcdKdLZ]e5d>d5eU d"eLfdMdNZ^e5d>d5eUdOdPd"e_fdQdRZ`e5d>d5eUd dPd"e_fdSdTZae5d>d5eUd dPd"e_fdUdVZbe5d>d5eUd dPd"e_fdWdXZce5d>d5eUd dPd"e_fdYdZZde5d4d5eU 	dbd[eeL d"ed\ fd]d^Zee4eU d"e%fd_d`ZfdS )d    N)	dataclass)datetime)TYPE_CHECKINGAnyCallableDictOptionalSetType)RunnerThreadStartTraceback)_ERROR_FETCH_TIMEOUT_RESULT_FETCH_TIMEOUTSESSION_MISUSE_LOG_ONCE_KEYTIME_THIS_ITER_S	TIMESTAMP)Dataset)
Checkpoint)Accelerator)StorageContext)CHECKPOINT_DIR_NAMEDETAILED_AUTOFILLED_KEYSRAY_CHDIR_TO_TRIAL_DIRTIME_TOTAL_SWORKER_HOSTNAMEWORKER_NODE_IP
WORKER_PID_v2_migration_warnings_enabledSessionMisuseError)_log_deprecation_warning)DeveloperAPI	PublicAPI)log_once)_valid_resource_shape) PlacementGroupSchedulingStrategySchedulingStrategyT)DataIterator)PlacementGroupFactoryc                   @   sj   e Zd ZU dZeed< eed< eeef ed< eed< eed< eed< dZe	e ed	< dZ
e	e ed
< dS )	TrialInfoz3The trial information to propagate to TrainSession.nameid	resourceslogdir	driver_ipdriver_node_idNexperiment_namerun_id)__name__
__module____qualname____doc__str__annotations__r   floatr0   r   r1    r9   r9   O/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/_internal/session.pyr)   7   s   
 r)   c                   @   s8   e Zd ZdZdejfddZddeded fd	d
Z	dS )_FutureTrainingResultzA future that will be resolved to a `_TrainingResult`.

    This is needed for specific schedulers such as PBT that schedule saves.

    This wrapper should be removed after refactoring PBT to not schedule saves anymore.
    futurec                 C   s
   || _ d S N)r<   )selfr<   r9   r9   r:   __init__M   s   
z_FutureTrainingResult.__init__Tblockreturn_TrainingResultc              
   C   sj   |rd}nd}z	t j| j|dW S  ty   Y dS  ty4 } ztd|  W Y d}~dS d}~ww )zResolve into ``_TrainingResult``.

        This will return None for function trainables if no checkpoint has been
        saved before.
        Ng&.>timeoutzError resolving result: )raygetr<   TimeoutError	Exceptionloggererror)r>   r@   rD   excr9   r9   r:   resolveP   s   z_FutureTrainingResult.resolveN)T)
r2   r3   r4   r5   rE   	ObjectRefr?   boolr   rL   r9   r9   r9   r:   r;   E   s    r;   c                   @   s<   e Zd ZdZdee deeef fddZ	defddZ
d	S )
rB   z4A (checkpoint, metrics) result reported by the user.
checkpointmetricsc                 C   s   || _ || _d S r=   rO   rP   )r>   rO   rP   r9   r9   r:   r?   f   s   
z_TrainingResult.__init__rA   c                 C   s   d| j  d| j dS )NzTrainingResult(checkpoint=z
, metrics=)rQ   r>   r9   r9   r:   __repr__j   s   z_TrainingResult.__repr__N)r2   r3   r4   r5   r   r   r   r6   r   r?   rT   r9   r9   r9   r:   rB   c   s    rB   c                   @   s  e Zd ZdZ							dEdedee dee dee dee d	ee d
ee deee	e
f  dee	ef dee dedee defddZde	defddZde	defddZdd Zdd Z	dFded
edefddZd d! ZdFd"ee dee fd#d$Zdee fd%d&Zd'edefd(d)Zd'edefd*d+ZdGd,d-Zd.eddfd/d0ZdFd1edee ddfd2d3Ze de	fd4d5Z!e de	fd6d7Z"e de	fd8d9Z#e de	fd:d;Z$e dHd=d>Z%e de	fd?d@Z&	dFdAee	 dedB fdCdDZ'dS )I_TrainSessionz.Holds information for training on each worker.NFtraining_func
world_rank
local_rank	node_ranklocal_world_size
world_size
trial_infodataset_shardmetadatarO   detailed_autofilled_metricsstoragesynchronous_result_reportingc                 C   s   || _ || _|	| _|| _|| _|| _|| _|| _|sJ t	d| d|  | j
||||
d || _t | _d| _d| _|  | _d | _i | _d S )Nz StorageContext on SESSION (rank=z):
)rV   r\   r`   loaded_checkpointr   g        )ra   r]   r^   rW   rX   rY   rZ   r[   rI   debugresetr_   timelast_report_time	iteration
time_totalget_current_iplocal_ipaccelerator_state)r>   rV   rW   rX   rY   rZ   r[   r\   r]   r^   rO   r_   r`   ra   r9   r9   r:   r?   s   s.   


z_TrainSession.__init__keyrA   c                 C   s   | j |S r=   )rl   rF   )r>   rm   r9   r9   r:   	get_state   s   z_TrainSession.get_statevaluec                 C   s   || j |< d S r=   )rl   )r>   rm   ro   r9   r9   r:   	set_state   s   z_TrainSession.set_statec                 C   s   t j | _| jS r=   )rE   utilget_node_ip_addressrj   rS   r9   r9   r:   ri      s   z_TrainSession.get_current_ipc                 C   s   d| _ | j  dS )zStarts the training thread.TN)training_startedtraining_threadstartrS   r9   r9   r:   ru      s   z_TrainSession.startc                 C   s   t d| _t  | _td| _td| _t	|d| jd| _
|| _|| _|| _i | _d| _d| _d| _tj|jdd tttjtdrYtd|j  t|j d S d S )	Nr      T)targetdaemonerror_queueF)exist_ok1z#Changing the working directory to: )	threading	Semaphorecontinue_lockEvent
stop_eventqueueQueueresult_queuery   r   rt   r\   r`   rb   rl   ignore_reportrs   _first_reportosmakedirstrial_working_directoryrN   intenvironrF   r   rI   rc   chdir)r>   rV   r\   r`   rb   r9   r9   r:   rd      s*   

z_TrainSession.resetc                 C   s
   d| _ dS )z-Ignore all future ``session.report()`` calls.TN)r   rS   r9   r9   r:   pause_reporting   s   
z_TrainSession.pause_reportingrD   c                 C   s>   | j   | j  | jjdd d}| jr| jj|d}|S )zSFinishes the training thread.

        Raises any Exception from training.
        TforceNrC   )	r   setr~   releaser`   persist_artifactsrs   rt   join)r>   rD   outputr9   r9   r:   finish   s   

z_TrainSession.finishc                 C   s   | j std| jr| js| j  d| _d}|du r>| j r>z
| jj	dt
d}W n
 tjy4   Y nw |du r>| j s |du rWz
| jj	dt
d}W n
 tjyV   Y nw |du rb| jdd n
| j sltd | jst| j  |S )zGets the next ``_TrainingResult`` from the result queue.

        If the result queue is empty, then this function returns ``None``.
        z*Please call start before calling get_next.FNTr@   rD   r@   zVRunner error waiting to be raised in main thread. Logging all available results first.)rs   RuntimeErrorra   r   r~   r   rt   is_aliver   rF   r   r   Empty_report_thread_runner_errorry   emptyrI   rc   )r>   resultr9   r9   r:   get_next	  sB   

	


z_TrainSession.get_nextr   c              
   C   s   t   }t }t|v r|t }n|| j }|  jd7  _|  j|7  _|| _ttt 	|
 t| jtt tt t| ji}| jsLdd | D }| }|| |S )-Add autofilled metrics and update attributes.rv   c                 S   s   i | ]\}}|t vr||qS r9   )r   .0kvr9   r9   r:   
<dictcomp>]  s
    z4_TrainSession._auto_fill_metrics.<locals>.<dictcomp>)re   r   nowr   rf   rg   rh   r   r   mktime	timetupler   r   r   getpidr   platformnoder   rj   r_   itemscopyupdate)r>   r   current_timecurrent_datetimetime_this_iterauto_filled_metricsr9   r9   r:   _auto_fill_metricsH  s*   


z _TrainSession._auto_fill_metricsc                 C   s4   t  }ttt| i}| }|| |S )r   )	r   r   r   r   re   r   r   r   r   )r>   r   r   r   r9   r9   r:   _auto_fill_checkpoint_metricsg  s   
z+_TrainSession._auto_fill_checkpoint_metricsc                 C   s.   z| j j|td}t| tjy   Y d S w )Nr   )ry   rF   r   r   r   r   )r>   r@   er9   r9   r:   r   r  s   z)_TrainSession._report_thread_runner_errortraining_resultc                 C   sN   |j r|j | _| jj|dd | j  | j r%| j  t	
d dS dS )a  Place a training result on the result queue for the main thread to process,
        then block until the main thread signals that training should continue.

        NOTE: This is used internally to report results from Train to Tune
        without persisting checkpoints to storage 2 times.
        `report` is the public API that directly persists to storage, which
        should only be called by user code.
        Tr   r   N)rO   rb   r   putr~   acquirer   is_setclearsysexit)r>   r   r9   r9   r:   _report_training_resulty  s   	


z%_TrainSession._report_training_resultrP   c           
      C   s   dt jv rddlm} ||rtd| jrd S | |}d }|r4| j| | j	|}| jj
|t< nd |t< |o>| jjj}| jj|d |rf| jrf| }| j D ]\}}||vr`|||< qT|| t||d}	| |	 d S )Ntorchr   )contains_tensorzPassing objects containg Torch tensors as metrics is not supported as it will throw an exception on deserialization. You can either convert the tensors to Python objects or report a `train.Checkpoint` with `ray.train.report` to store your Torch objects.r   rQ   )r   modulesray.air._internal.torch_utilsr   
ValueErrorr   r   r`   _update_checkpoint_indexpersist_current_checkpointcheckpoint_dir_namer   sync_configsync_artifacts_on_checkpointr   r^   get_metadatar   set_metadatarB   r   )
r>   rP   rO   r   persisted_checkpointforce_artifact_syncuser_metadatar   r   r   r9   r9   r:   report  s8   



z_TrainSession.reportc                 C      | j jS r=   )r\   r0   rS   r9   r9   r:   r0        z_TrainSession.experiment_namec                 C   r   r=   )r\   r*   rS   r9   r9   r:   
trial_name  r   z_TrainSession.trial_namec                 C   r   r=   )r\   r+   rS   r9   r9   r:   trial_id  r   z_TrainSession.trial_idc                 C   r   r=   )r\   r1   rS   r9   r9   r:   r1     r   z_TrainSession.run_idr(   c                 C   r   r=   )r\   r,   rS   r9   r9   r:   trial_resources  r   z_TrainSession.trial_resourcesc                 C   r   r=   )r\   r-   rS   r9   r9   r:   	trial_dir  r   z_TrainSession.trial_dirdataset_namer'   c                 C   s@   | j }|d u rtd |S t|tr|std||S |S )NziNo dataset passed in. Returning None. Make sure to pass in a Dataset to Trainer.run to use this function.zMultiple datasets were passed into ``Trainer``, but no ``dataset_name`` is passed into ``get_dataset_shard``. Please specify which dataset shard to retrieve.)r]   warningswarn
isinstancedictr   rF   )r>   r   shardr9   r9   r:   get_dataset_shard  s   

z_TrainSession.get_dataset_shard)NNNNFNFr=   )FrA   r(   )(r2   r3   r4   r5   r   r   r   r)   r   r6   r   r   r   rN   r   r?   rn   rp   ri   ru   rd   r   r8   r   rB   r   r   r   r   r   r   r   propertyr0   r   r   r1   r   r   r   r9   r9   r9   r:   rU   o   s    
	


>

.?
3rU   _checked_resources_sessionr,   strategyc           
      C   s  t dd | D }|r|tv rdS t|tr|jdu rdS tj }|r-|jj	|j	kr/dS t
| t }|jrB|jdd }n|jdd }t||rPdS | jrcd}| jd | j d | j }n
d}| jd | j }|jd }	d	d
 | D }td| d| d|	 d| d| d)zLaunch hook to catch nested tasks that can't fit in the placement group.

    This gives users a nice warning in case they launch a nested task in a Tune trial
    without reserving resources in the trial placement group to fit it.
    c                 S   s    h | ]\}}|d kr||fqS r   r9   r   r9   r9   r:   	<setcomp>	  s     z3_tune_task_and_actor_launch_hook.<locals>.<setcomp>Nr   rv   actor.taskc                 S   s"   i | ]\}}|d kr|t |qS r   )r8   r   r9   r9   r:   r   0  s   " z4_tune_task_and_actor_launch_hook.<locals>.<dictcomp>z3No trial resources are available for launching the z `zg`. To resolve this, specify the Tune option:

>  resources_per_trial=tune.PlacementGroupFactory(
>    [z] + [zB] * N
>  )

Where `N` is the number of slots to reserve for trial zs. If you are using a Ray training library, there might be a utility function to set this automatically for you. For more information, refer to https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html)	frozensetr   r   r   r%   placement_grouprE   rq   get_current_placement_groupr+   addget_trial_resourceshead_bundle_is_emptybundle_specsr$   
class_namemodule_namefunction_namer   )
fnr,   r   rm   cur_pgpgfavailable_bundles	submittedr*   main_resourcesr9   r9   r:    _tune_task_and_actor_launch_hook  sB   





r   rA   c                  O   sD   t rtdddlm}m} dtjvrt|_t|_	t
| i |a d S )NzIA Train session is already in use. Do not call `init_session()` manually.r   )r   remote_functionTUNE_DISABLE_RESOURCE_CHECKS)r   r   rE   r   r   r   r   r   _actor_launch_hook_task_launch_hookrU   )argskwargsr   r   r9   r9   r:   init_session?  s   
r   c                   C   s   t S r=   r   r9   r9   r9   r:   get_sessionQ  s   r   c                   C   s   da dS )z#Shuts down the initialized session.Nr   r9   r9   r9   r:   shutdown_sessionU  s   r   c                   C   s   t d)zKRaises a SessionMisuseError because a utility function was used improperly.zjprepare/accelerate utility functions should be called inside a training function executed by `Trainer.run`r   r9   r9   r9   r:   !_raise_accelerator_session_misuse[  s   r   default_accelerator_clsc                 C   s,   t  }|du r
t  |jdu r|  |_|jS )zThe accelerator for this training session.

    If an accelerator has not been set, then this method will construct an
    accelerator using the provided accelerator class.

    Raises:
        SessionMisuseError: if the session is uninitialized.
    N)r   r   rk   )r   sessionr9   r9   r:   get_acceleratorc  s   	
r  rk   c                 C   s0   t  }|du r
t  |jdurtd| |_dS )a   Sets the accelerator for this training session.

    Args:
        accelerator: The accelerator to use for training.

    Raises:
        SessionMisuseError: if the session is unitialized.
        RuntimeError: if the accelerator has already been set.
    Nz#Cannot change accelerator once set.)r   r   rk   r   )rk   r   r9   r9   r:   set_acceleratort  s   


r  default_valuec                    s   dt f fdd}|S )zKWarns if fn is being used outside of session and returns ``default_value``.r   c                    s$    j t  fdd}|S )Nc                     sF   t  }|stt d rtd d  d  S | i |S )N-`zb` is meant to only be called inside a function that is executed by a Tuner or Trainer. Returning `z`.)r   r#   r   r   r   )r   r   r   )r  r   fn_namer9   r:   wrapper  s   z4_warn_session_misuse.<locals>.inner.<locals>.wrapper)r2   	functoolswraps)r   r  r  )r   r  r:   inner  s   z#_warn_session_misuse.<locals>.inner)r   )r  r  r9   r
  r:   _warn_session_misuse  s   r  stable)	stability)rO   r   rP   rO   r   c                C   s^   |dur	t d ddlm} | r%ddl}t rtd |jj| |dS t	 j| |d dS )a  Report metrics and optionally save a checkpoint.

    If a checkpoint is provided, it will be
    :ref:`persisted to storage <persistent-storage-guide>`.

    If this is called in multiple distributed training workers:

    - Only the metrics reported by the rank 0 worker will be tracked by Ray Train.
      See :ref:`the metrics logging guide <train-monitoring-and-logging>`.
    - A checkpoint will be registered as long as one or more workers reports
      checkpoint that is not None.
      See the :ref:`checkpointing guide <train-dl-saving-checkpoints>`.
    - Checkpoints from multiple workers will be merged into one directory
      in persistent storage.
      See :ref:`the distributed checkpointing guide <train-distributed-checkpointing>`.

    .. note::

        Each invocation of this method will automatically increment the underlying
        ``training_iteration`` number. The physical meaning of this "iteration" is
        defined by user depending on how often they call ``report``.
        It does not necessarily map to one epoch.

    .. warning::

        All workers must call `ray.train.report` the same number of times
        so that Ray Train can properly synchronize the training state across
        workers. Otherwise, your training will hang.

    .. warning::

        This method does NOT act as a barrier for distributed training workers.
        Workers will upload their checkpoint, then continue training immediately.
        If you need to synchronize workers, you can use a framework-native barrier
        such as `torch.distributed.barrier()`.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...
                       # torch.save(...)

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)

                        # Example: Only the rank 0 worker uploads the checkpoint.
                        if ray.train.get_context().get_world_rank() == 0:
                            train.report(metrics, checkpoint=checkpoint)
                        else:
                            train.report(metrics, checkpoint=None)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Args:
        metrics: The metrics you want to report.
        checkpoint: The optional checkpoint you want to report.
    Nz`checkpoint_dir_name` is only supported in the new Ray Train implementation, which can be enabled with `RAY_TRAIN_V2_ENABLED=1`. This argument will be ignored.r   _in_tune_sessionz`ray.train.report` should be switched to `ray.tune.report` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)rO   )
rI   warning%ray.tune.trainable.trainable_fn_utilsr  ray.tuner   r    tuner   r   )rP   rO   r   r  rE   r9   r9   r:   r     s   Xr   c                  C   s:   ddl m}  |  rddl}t rtd |j S t jS )a  Access the latest reported checkpoint to resume from if one exists.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
                        train.report(metrics, checkpoint=checkpoint)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Returns:
        Checkpoint object if the session is currently being resumed.
            Otherwise, return None.
    r   r  Nz`ray.train.get_checkpoint` should be switched to `ray.tune.get_checkpoint` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)	r  r  r  r   r    r  get_checkpointr   rb   )r  rE   r9   r9   r:   r    s   ,
r  betac                   C      t  jS )z5User metadata dict passed to the Trainer constructor.)r   r^   r9   r9   r9   r:   r   N     r   c                   C   r  )z,Experiment name for the corresponding trial.)r   r0   r9   r9   r9   r:   get_experiment_nameU  r  r  c                   C   r  )z'Trial name for the corresponding trial.)r   r   r9   r9   r9   r:   get_trial_name\  r  r  c                   C   r  )z%Trial id for the corresponding trial.)r   r   r9   r9   r9   r:   get_trial_idc  r  r  alphac                   C   r  )z0Unique Train Run id for the corresponding trial.)r   r1   r9   r9   r9   r:   
get_run_idj  r  r  r(   c                   C   r  )z,Trial resources for the corresponding trial.)r   r   r9   r9   r9   r:   r   q  r  r   c                   C   r  )a  Log directory corresponding to the trial directory for a Tune session.
    If calling from a Train session, this will give the trial directory of its parent
    Tune session.

    .. testcode::

        import ray.tune

        def train_func(config):
            print(ray.tune.get_context().get_trial_dir())

        tuner = ray.tune.Tuner(train_func)
        tuner.fit()

    .. testoutput::
        :options: +MOCK

        /Users/root/ray_results/train_func_2023-07-19_15-01-37/train_func_d620c_00000_0_2023-07-19_15-01-40
    )r   r   r9   r9   r9   r:   get_trial_dirx  s   r  rv   r
  c                  C      t  } t| dstd| jS )a  Get the current world size (i.e. total number of workers) for this run.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        NUM_WORKERS = 2

        def train_loop_per_worker(config):
            assert train.get_context().get_world_size() == NUM_WORKERS

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=NUM_WORKERS),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    r[   z`get_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   hasattrr   r[   r   r9   r9   r:   get_world_size  s   
r"  c                  C   r  )a  Get the world rank of this worker.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        def train_loop_per_worker(config):
            if train.get_context().get_world_rank() == 0:
                print("Worker 0")

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rW   z`get_world_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r   r   rW   r!  r9   r9   r:   get_world_rank     
r#  c                  C   r  )a  Get the local rank of this worker (rank of the worker on its node).

    .. testcode::

        import torch

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            if torch.cuda.is_available():
                torch.cuda.set_device(train.get_context().get_local_rank())
            ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rX   z`get_local_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r   r   rX   r!  r9   r9   r:   get_local_rank  s    
r%  c                  C   r  )a  Get the local world size of this node (i.e. number of workers on this node).

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_local_world_size())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    rZ   z`get_local_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r   r   rZ   r!  r9   r9   r:   get_local_world_size
  r$  r&  c                  C   r  )a  Get the rank of this node.

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_node_rank())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    rY   z`get_node_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r   r   rY   r!  r9   r9   r:   get_node_rank1  r$  r'  r   r'   c                 C   s"   t  }t|dstd|| S )a?  Returns the :class:`ray.data.DataIterator` shard for this worker.

    Call :meth:`~ray.data.DataIterator.iter_torch_batches` or
    :meth:`~ray.data.DataIterator.to_tf` on this shard to convert it to the
    appropriate framework-specific data type.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            ...
            for epoch in range(2):
                # Trainer will automatically handle sharding.
                data_shard = train.get_dataset_shard("train")
                for batch in data_shard.iter_torch_batches():
                    ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...

    Args:
        dataset_name: If a Dictionary of Datasets was passed to ``Trainer``, then
            specifies which dataset shard to return.

    Returns:
        The ``DataIterator`` shard to use for this worker.
        If no dataset is passed into Trainer, then return None.
    r   z`get_dataset_shard` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r   r   r   r   )r   r   r9   r9   r:   r   X  s   /

r   c                   C   r  )a2  Returns the :class:`~ray.train._internal.storage.StorageContext` storage
    context which gives advanced access to the filesystem and paths
    configured through `RunConfig`.

    NOTE: This is a developer API, and the `StorageContext` interface may change
    without notice between minor versions.
    )r   r`   r9   r9   r9   r:   get_storage  s   
r(  )rA   Nr=   r   )gr  loggingr   r   r   r   r|   re   r   dataclassesr   r   typingr   r   r   r   r   r	   r
   rE   ray.air._internal.utilr   r   ray.air.constantsr   r   r   r   r   ray.datar   	ray.trainr   ray.train._internal.acceleratorr   ray.train._internal.storager   ray.train.constantsr   r   r   r   r   r   r   r   ray.train.errorr   ray.train.utilsr    ray.util.annotationsr!   r"   ray.util.debugr#   ray.util.placement_groupr$   ray.util.scheduling_strategiesr%   r&   r'   #ray.tune.execution.placement_groupsr(   	getLoggerr2   rI   r)   r;   rB   rU   r   r   r   r7   r   r6   r8   r   r   r   r   r   r  r  r  r   r  r   r  r  r  r  r   r  r   r"  r#  r%  r&  r'  r   r(  r9   r9   r9   r:   <module>   s   
 $(

   


@p<&%(%%7