o
    $i                     @   s  U d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlmZmZ d dlmZmZmZmZmZ d dlmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z% d dl&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0 d dl1m2Z2 d dl3mZ4 d dl5m6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z=m>Z> erd dlm?Z? d dl@mAZA eBeCZDe
G dd dZEG dd dZFG dd dZGe6G dd dZHeI ZJeeK eLd< daMeeH eLd< deeNeOf d ee> fd!d"ZPddd$d%ZQd#eeH fd&d'ZRd(d) ZSd*d+ ZTd,ee# d#e#fd-d.ZUd/e#d#dfd0d1ZVded2efd3d4ZWe7d5d6eW ddd7d8ed9ee! d:eeN d#dfd;d<ZXe7d5d6eW d#ee! fd=d>ZYe7d?d6eW d#eeNef fd@dAZZe7d?d6eW d#eNfdBdCZ[e7d?d6eW d#eNfdDdEZ\e7d?d6eW d#eNfdFdGZ]e7dHd6eW d#eNfdIdJZ^e7d?d6eW dfdLdMZ_e7d?d6eW d#eNfdNdOZ`e7d?d6eWdPdQd#eafdRdSZbe7d?d6eWd dQd#eafdTdUZce7d?d6eWd dQd#eafdVdWZde7d?d6eWd dQd#eafdXdYZee7d?d6eWd dQd#eafdZd[Zfe7d5d6eW 	ded\eeN d#ed] fd^d_Zge6eW d#e%fd`daZhd#eifdbdcZjdS )g    N)	dataclass)datetime)TYPE_CHECKINGAnyCallableDictOptionalSetType)RunnerThreadStartTraceback)_ERROR_FETCH_TIMEOUT_RESULT_FETCH_TIMEOUTSESSION_MISUSE_LOG_ONCE_KEYTIME_THIS_ITER_S	TIMESTAMP)Dataset)
Checkpoint)Accelerator)StorageContext)CHECKPOINT_DIR_NAMEDETAILED_AUTOFILLED_KEYSRAY_CHDIR_TO_TRIAL_DIRTIME_TOTAL_SWORKER_HOSTNAMEWORKER_NODE_IP
WORKER_PID_v2_migration_warnings_enabledSessionMisuseError)_log_deprecation_warning)queue)DeveloperAPI	PublicAPI)log_once)_valid_resource_shape) PlacementGroupSchedulingStrategySchedulingStrategyT)DataIterator)PlacementGroupFactoryc                   @   sj   e Zd ZU dZeed< eed< eeef ed< eed< eed< eed< dZe	e ed	< dZ
e	e ed
< dS )	TrialInfoz3The trial information to propagate to TrainSession.nameid	resourceslogdir	driver_ipdriver_node_idNexperiment_namerun_id)__name__
__module____qualname____doc__str__annotations__r   floatr1   r   r2    r:   r:   X/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/train/_internal/session.pyr*   8   s   
 r*   c                   @   s8   e Zd ZdZdejfddZddeded fd	d
Z	dS )_FutureTrainingResultzA future that will be resolved to a `_TrainingResult`.

    This is needed for specific schedulers such as PBT that schedule saves.

    This wrapper should be removed after refactoring PBT to not schedule saves anymore.
    futurec                 C   s
   || _ d S N)r=   )selfr=   r:   r:   r;   __init__N   s   
z_FutureTrainingResult.__init__Tblockreturn_TrainingResultc              
   C   sj   |rd}nd}z	t j| j|dW S  ty   Y dS  ty4 } ztd|  W Y d}~dS d}~ww )zResolve into ``_TrainingResult``.

        This will return None for function trainables if no checkpoint has been
        saved before.
        Ng&.>timeoutzError resolving result: )raygetr=   TimeoutError	Exceptionloggererror)r?   rA   rE   excr:   r:   r;   resolveQ   s   z_FutureTrainingResult.resolveN)T)
r3   r4   r5   r6   rF   	ObjectRefr@   boolr   rM   r:   r:   r:   r;   r<   F   s    r<   c                   @   s<   e Zd ZdZdee deeef fddZ	defddZ
d	S )
rC   z4A (checkpoint, metrics) result reported by the user.
checkpointmetricsc                 C   s   || _ || _d S r>   rP   rQ   )r?   rP   rQ   r:   r:   r;   r@   g   s   
z_TrainingResult.__init__rB   c                 C   s   d| j  d| j dS )NzTrainingResult(checkpoint=z
, metrics=)rR   r?   r:   r:   r;   __repr__k   s   z_TrainingResult.__repr__N)r3   r4   r5   r6   r   r   r   r7   r   r@   rU   r:   r:   r:   r;   rC   d   s    rC   c                   @   s  e Zd ZdZ							dJdedee dee dee dee d	ee d
ee deee	e
f  dee	ef dee dedee defddZde	defddZde	defddZdd Zdd Z	dKded
edefddZd d! ZdKd"ee dee fd#d$Zdee fd%d&Zd'd( Zd)edee fd*d+Zd,edefd-d.Zd,edefd/d0ZdLd1d2Zd3eddfd4d5Z dKd6edee ddfd7d8Z!e"de	fd9d:Z#e"de	fd;d<Z$e"de	fd=d>Z%e"de	fd?d@Z&e"dMdBdCZ'e"de	fdDdEZ(	dKdFee	 dedG fdHdIZ)dS )N_TrainSessionz.Holds information for training on each worker.NFtraining_func
world_rank
local_rank	node_ranklocal_world_size
world_size
trial_infodataset_shardmetadatarP   detailed_autofilled_metricsstoragesynchronous_result_reportingc                 C   s   || _ || _|	| _|| _|| _|| _|| _|| _|sJ t	d| d|  | j
||||
d || _t | _d| _d| _|  | _d | _i | _d S )Nz StorageContext on SESSION (rank=z):
)rW   r]   ra   loaded_checkpointr   g        )rb   r^   r_   rX   rY   rZ   r[   r\   rJ   debugresetr`   timelast_report_time	iteration
time_totalget_current_iplocal_ipaccelerator_state)r?   rW   rX   rY   rZ   r[   r\   r]   r^   r_   rP   r`   ra   rb   r:   r:   r;   r@   t   s.   


z_TrainSession.__init__keyrB   c                 C   s   | j |S r>   )rm   rG   )r?   rn   r:   r:   r;   	get_state   s   z_TrainSession.get_statevaluec                 C   s   || j |< d S r>   )rm   )r?   rn   rp   r:   r:   r;   	set_state   s   z_TrainSession.set_statec                 C   s   t j | _| jS r>   )rF   utilget_node_ip_addressrk   rT   r:   r:   r;   rj      s   z_TrainSession.get_current_ipc                 C   s   d| _ | j  dS )zStarts the training thread.TN)training_startedtraining_threadstartrT   r:   r:   r;   rv      s   z_TrainSession.startc                 C   s   t d| _t  | _td| _d | _td| _	t
|d| j	d| _|| _|| _|| _i | _d| _d| _d| _tj|jdd tttjtdr\td|j  t|j d S d S )	Nr      T)targetdaemonerror_queueF)exist_ok1z#Changing the working directory to: )	threading	Semaphorecontinue_lockEvent
stop_eventr!   Queueresult_queue_inter_actor_queuerz   r   ru   r]   ra   rc   rm   ignore_reportrt   _first_reportosmakedirstrial_working_directoryrO   intenvironrG   r   rJ   rd   chdir)r?   rW   r]   ra   rc   r:   r:   r;   re      s,   

z_TrainSession.resetc                 C   s
   d| _ dS )z-Ignore all future ``session.report()`` calls.TN)r   rT   r:   r:   r;   pause_reporting   s   
z_TrainSession.pause_reportingrE   c                 C   s>   | j   | j  | jjdd d}| jr| jj|d}|S )zSFinishes the training thread.

        Raises any Exception from training.
        TforceNrD   )	r   setr   releasera   persist_artifactsrt   ru   join)r?   rE   outputr:   r:   r;   finish   s   

z_TrainSession.finishc                 C   s   | j std| jr| js| j  d| _d}|du r/| j r/| jdd}|du r/| j s |du r9| jdd}|du rD| j	dd n
| j
 sNtd | jsV| j  |S )zGets the next ``_TrainingResult`` from the result queue.

        If the result queue is empty, then this function returns ``None``.
        z*Please call start before calling get_next.FNTrA   zVRunner error waiting to be raised in main thread. Logging all available results first.)rt   RuntimeErrorrb   r   r   r   ru   is_alive_get_result_from_queues_report_thread_runner_errorrz   emptyrJ   rd   )r?   resultr:   r:   r;   get_next  s*   


z_TrainSession.get_nextc                 C   s$   | j du rtjdddid| _ | j S )z$Get or create the inter-actor queue.Nrw   num_cpusr   )actor_options)r   	ray_queuer   rT   r:   r:   r;    _get_or_create_inter_actor_queueB  s   
z._TrainSession._get_or_create_inter_actor_queuerA   c                 C   s~   d}| j dur(z| j j|td}|r| j  | | W n
 tjy'   Y nw z| jj|td}W |S  t	jy>   Y |S w )zUGet result from result queue. Pass result from training actor result queue if needed.NrA   rE   )
r   rG   r   r   r   reportr   Emptyr   r!   )r?   rA   r   inter_actor_itemr:   r:   r;   r   H  s(   


z%_TrainSession._get_result_from_queuesr   c              
   C   s   t   }t }t|v r|t }n|| j }|  jd7  _|  j|7  _|| _ttt 	|
 t| jtt tt t| ji}| jsLdd | D }| }|| |S )-Add autofilled metrics and update attributes.rw   c                 S   s   i | ]\}}|t vr||qS r:   )r   .0kvr:   r:   r;   
<dictcomp>q  s
    z4_TrainSession._auto_fill_metrics.<locals>.<dictcomp>)rf   r   nowr   rg   rh   ri   r   r   mktime	timetupler   r   r   getpidr   platformnoder   rk   r`   itemscopyupdate)r?   r   current_timecurrent_datetimetime_this_iterauto_filled_metricsr:   r:   r;   _auto_fill_metrics\  s*   


z _TrainSession._auto_fill_metricsc                 C   s4   t  }ttt| i}| }|| |S )r   )	r   r   r   r   rf   r   r   r   r   )r?   r   r   r   r:   r:   r;   _auto_fill_checkpoint_metrics{  s   
z+_TrainSession._auto_fill_checkpoint_metricsc                 C   s.   z| j j|td}t| tjy   Y d S w )Nr   )rz   rG   r   r   r!   r   )r?   rA   er:   r:   r;   r     s   z)_TrainSession._report_thread_runner_errortraining_resultc                 C   sN   |j r|j | _| jj|dd | j  | j r%| j  t	
d dS dS )a  Place a training result on the result queue for the main thread to process,
        then block until the main thread signals that training should continue.

        NOTE: This is used internally to report results from Train to Tune
        without persisting checkpoints to storage 2 times.
        `report` is the public API that directly persists to storage, which
        should only be called by user code.
        Tr   r   N)rP   rc   r   putr   acquirer   is_setclearsysexit)r?   r   r:   r:   r;   _report_training_result  s   	


z%_TrainSession._report_training_resultrQ   c           
      C   s   dt jv rddlm} ||rtd| jrd S | |}d }|r4| j| | j	|}| jj
|t< nd |t< |o>| jjj}| jj|d |rf| jrf| }| j D ]\}}||vr`|||< qT|| t||d}	| |	 d S )Ntorchr   )contains_tensorzPassing objects containg Torch tensors as metrics is not supported as it will throw an exception on deserialization. You can either convert the tensors to Python objects or report a `train.Checkpoint` with `ray.train.report` to store your Torch objects.r   rR   )r   modulesray.air._internal.torch_utilsr   
ValueErrorr   r   ra   _update_checkpoint_indexpersist_current_checkpointcheckpoint_dir_namer   sync_configsync_artifacts_on_checkpointr   r_   get_metadatar   set_metadatarC   r   )
r?   rQ   rP   r   persisted_checkpointforce_artifact_syncuser_metadatar   r   r   r:   r:   r;   r     s8   



z_TrainSession.reportc                 C      | j jS r>   )r]   r1   rT   r:   r:   r;   r1        z_TrainSession.experiment_namec                 C   r   r>   )r]   r+   rT   r:   r:   r;   
trial_name  r   z_TrainSession.trial_namec                 C   r   r>   )r]   r,   rT   r:   r:   r;   trial_id  r   z_TrainSession.trial_idc                 C   r   r>   )r]   r2   rT   r:   r:   r;   r2     r   z_TrainSession.run_idr)   c                 C   r   r>   )r]   r-   rT   r:   r:   r;   trial_resources  r   z_TrainSession.trial_resourcesc                 C   r   r>   )r]   r.   rT   r:   r:   r;   	trial_dir  r   z_TrainSession.trial_dirdataset_namer(   c                 C   s@   | j }|d u rtd |S t|tr|std||S |S )NziNo dataset passed in. Returning None. Make sure to pass in a Dataset to Trainer.run to use this function.zMultiple datasets were passed into ``Trainer``, but no ``dataset_name`` is passed into ``get_dataset_shard``. Please specify which dataset shard to retrieve.)r^   warningswarn
isinstancedictr   rG   )r?   r   shardr:   r:   r;   get_dataset_shard  s   

z_TrainSession.get_dataset_shard)NNNNFNFr>   )FrB   r)   )*r3   r4   r5   r6   r   r   r   r*   r   r7   r   r   r   rO   r   r@   ro   rq   rj   rv   re   r   r9   r   rC   r   r   r   r   r   r   r   r   r   propertyr1   r   r   r2   r   r   r   r:   r:   r:   r;   rV   p   s    
	


>

15
3rV   _checked_resources_sessionr-   strategyc           
      C   s  t dd | D }|r|tv rdS t|tr|jdu rdS tj }|r-|jj	|j	kr/dS t
| t }|jrB|jdd }n|jdd }t||rPdS | jrcd}| jd | j d | j }n
d}| jd | j }|jd }	d	d
 | D }td| d| d|	 d| d| d)zLaunch hook to catch nested tasks that can't fit in the placement group.

    This gives users a nice warning in case they launch a nested task in a Tune trial
    without reserving resources in the trial placement group to fit it.
    c                 S   s    h | ]\}}|d kr||fqS r   r:   r   r:   r:   r;   	<setcomp>  s     z3_tune_task_and_actor_launch_hook.<locals>.<setcomp>Nr   rw   actor.taskc                 S   s"   i | ]\}}|d kr|t |qS r   )r9   r   r:   r:   r;   r   D  s   " z4_tune_task_and_actor_launch_hook.<locals>.<dictcomp>z3No trial resources are available for launching the z `zg`. To resolve this, specify the Tune option:

>  resources_per_trial=tune.PlacementGroupFactory(
>    [z] + [zB] * N
>  )

Where `N` is the number of slots to reserve for trial zs. If you are using a Ray training library, there might be a utility function to set this automatically for you. For more information, refer to https://docs.ray.io/en/latest/tune/tutorials/tune-resources.html)	frozensetr   r   r   r&   placement_grouprF   rr   get_current_placement_groupr,   addget_trial_resourceshead_bundle_is_emptybundle_specsr%   
class_namemodule_namefunction_namer   )
fnr-   r   rn   cur_pgpgfavailable_bundles	submittedr+   main_resourcesr:   r:   r;    _tune_task_and_actor_launch_hook  sB   





r   rB   c                  O   sD   t rtdddlm}m} dtjvrt|_t|_	t
| i |a d S )NzIA Train session is already in use. Do not call `init_session()` manually.r   )r   remote_functionTUNE_DISABLE_RESOURCE_CHECKS)r   r   rF   r   r   r   r   r   _actor_launch_hook_task_launch_hookrV   )argskwargsr   r   r:   r:   r;   init_sessionS  s   
r  c                   C   s   t S r>   r   r:   r:   r:   r;   get_sessione  s   r  c                   C   s   da dS )z#Shuts down the initialized session.Nr  r:   r:   r:   r;   shutdown_sessioni  s   r  c                   C   s   t d)zKRaises a SessionMisuseError because a utility function was used improperly.zjprepare/accelerate utility functions should be called inside a training function executed by `Trainer.run`r   r:   r:   r:   r;   !_raise_accelerator_session_misuseo  s   r  default_accelerator_clsc                 C   s,   t  }|du r
t  |jdu r|  |_|jS )zThe accelerator for this training session.

    If an accelerator has not been set, then this method will construct an
    accelerator using the provided accelerator class.

    Raises:
        SessionMisuseError: if the session is uninitialized.
    N)r  r  rl   )r  sessionr:   r:   r;   get_acceleratorw  s   	
r  rl   c                 C   s0   t  }|du r
t  |jdurtd| |_dS )a   Sets the accelerator for this training session.

    Args:
        accelerator: The accelerator to use for training.

    Raises:
        SessionMisuseError: if the session is unitialized.
        RuntimeError: if the accelerator has already been set.
    Nz#Cannot change accelerator once set.)r  r  rl   r   )rl   r  r:   r:   r;   set_accelerator  s   


r	  default_valuec                    s   dt f fdd}|S )zKWarns if fn is being used outside of session and returns ``default_value``.r   c                    s$    j t  fdd}|S )Nc                     sF   t  }|stt d rtd d  d  S | i |S )N-`zb` is meant to only be called inside a function that is executed by a Tuner or Trainer. Returning `z`.)r  r$   r   r   r   )r   r   r  )r
  r   fn_namer:   r;   wrapper  s   z4_warn_session_misuse.<locals>.inner.<locals>.wrapper)r3   	functoolswraps)r   r  r
  )r   r  r;   inner  s   z#_warn_session_misuse.<locals>.inner)r   )r
  r  r:   r  r;   _warn_session_misuse  s   r  stable)	stability)rP   r   rQ   rP   r   c                C   s^   |dur	t d ddlm} | r%ddl}t rtd |jj| |dS t	 j| |d dS )a  Report metrics and optionally save a checkpoint.

    If a checkpoint is provided, it will be
    :ref:`persisted to storage <persistent-storage-guide>`.

    If this is called in multiple distributed training workers:

    - Only the metrics reported by the rank 0 worker will be tracked by Ray Train.
      See :ref:`the metrics logging guide <train-monitoring-and-logging>`.
    - A checkpoint will be registered as long as one or more workers reports
      checkpoint that is not None.
      See the :ref:`checkpointing guide <train-dl-saving-checkpoints>`.
    - Checkpoints from multiple workers will be merged into one directory
      in persistent storage.
      See :ref:`the distributed checkpointing guide <train-distributed-checkpointing>`.

    .. note::

        Each invocation of this method will automatically increment the underlying
        ``training_iteration`` number. The physical meaning of this "iteration" is
        defined by user depending on how often they call ``report``.
        It does not necessarily map to one epoch.

    .. warning::

        All workers must call `ray.train.report` the same number of times
        so that Ray Train can properly synchronize the training state across
        workers. Otherwise, your training will hang.

    .. warning::

        This method does NOT act as a barrier for distributed training workers.
        Workers will upload their checkpoint, then continue training immediately.
        If you need to synchronize workers, you can use a framework-native barrier
        such as `torch.distributed.barrier()`.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...
                       # torch.save(...)

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)

                        # Example: Only the rank 0 worker uploads the checkpoint.
                        if ray.train.get_context().get_world_rank() == 0:
                            train.report(metrics, checkpoint=checkpoint)
                        else:
                            train.report(metrics, checkpoint=None)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Args:
        metrics: The metrics you want to report.
        checkpoint: The optional checkpoint you want to report.
    Nz`checkpoint_dir_name` is only supported in the new Ray Train implementation, which can be enabled with `RAY_TRAIN_V2_ENABLED=1`. This argument will be ignored.r   _in_tune_sessionz`ray.train.report` should be switched to `ray.tune.report` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)rP   )
rJ   warning%ray.tune.trainable.trainable_fn_utilsr  ray.tuner   r    tuner   r  )rQ   rP   r   r  rF   r:   r:   r;   r     s   Xr   c                  C   s:   ddl m}  |  rddl}t rtd |j S t jS )a  Access the latest reported checkpoint to resume from if one exists.

    Example:

        .. testcode::

            import tempfile

            from ray import train
            from ray.train import Checkpoint
            from ray.train.torch import TorchTrainer


            def train_func(config):
                start_epoch = 0
                checkpoint = train.get_checkpoint()
                if checkpoint:
                    with checkpoint.as_directory() as checkpoint_dir:
                        # Load back training state
                        ...

                for epoch in range(start_epoch, config.get("num_epochs", 10)):
                    # Do training...

                    metrics = {"loss": ...}

                    with tempfile.TemporaryDirectory() as temp_checkpoint_dir:
                       # Save the checkpoint...

                        checkpoint = Checkpoint.from_directory(temp_checkpoint_dir)
                        train.report(metrics, checkpoint=checkpoint)

            trainer = TorchTrainer(
                train_func, scaling_config=train.ScalingConfig(num_workers=2)
            )

    Returns:
        Checkpoint object if the session is currently being resumed.
            Otherwise, return None.
    r   r  Nz`ray.train.get_checkpoint` should be switched to `ray.tune.get_checkpoint` when running in a function passed to Ray Tune. This will be an error in the future. See this issue for more context: https://github.com/ray-project/ray/issues/49454)	r  r  r  r   r    r  get_checkpointr  rc   )r  rF   r:   r:   r;   r  $  s   ,
r  betac                   C      t  jS )z5User metadata dict passed to the Trainer constructor.)r  r_   r:   r:   r:   r;   r   b     r   c                   C   r  )z,Experiment name for the corresponding trial.)r  r1   r:   r:   r:   r;   get_experiment_namei  r  r   c                   C   r  )z'Trial name for the corresponding trial.)r  r   r:   r:   r:   r;   get_trial_namep  r  r!  c                   C   r  )z%Trial id for the corresponding trial.)r  r   r:   r:   r:   r;   get_trial_idw  r  r"  alphac                   C   r  )z0Unique Train Run id for the corresponding trial.)r  r2   r:   r:   r:   r;   
get_run_id~  r  r$  r)   c                   C   r  )z,Trial resources for the corresponding trial.)r  r   r:   r:   r:   r;   r     r  r   c                   C   r  )a  Log directory corresponding to the trial directory for a Tune session.
    If calling from a Train session, this will give the trial directory of its parent
    Tune session.

    .. testcode::

        import ray.tune

        def train_func(config):
            print(ray.tune.get_context().get_trial_dir())

        tuner = ray.tune.Tuner(train_func)
        tuner.fit()

    .. testoutput::
        :options: +MOCK

        /Users/root/ray_results/train_func_2023-07-19_15-01-37/train_func_d620c_00000_0_2023-07-19_15-01-40
    )r  r   r:   r:   r:   r;   get_trial_dir  s   r%  rw   r  c                  C      t  } t| dstd| jS )a  Get the current world size (i.e. total number of workers) for this run.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        NUM_WORKERS = 2

        def train_loop_per_worker(config):
            assert train.get_context().get_world_size() == NUM_WORKERS

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=NUM_WORKERS),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    r\   z`get_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r  hasattrr   r\   r  r:   r:   r;   get_world_size  s   
r)  c                  C   r&  )a  Get the world rank of this worker.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.tensorflow import TensorflowTrainer

        def train_loop_per_worker(config):
            if train.get_context().get_world_rank() == 0:
                print("Worker 0")

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TensorflowTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rX   z`get_world_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r  r'  r   rX   r(  r:   r:   r;   get_world_rank     
r*  c                  C   r&  )a  Get the local rank of this worker (rank of the worker on its node).

    .. testcode::

        import torch

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            if torch.cuda.is_available():
                torch.cuda.set_device(train.get_context().get_local_rank())
            ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2, use_gpu=True),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...
    rY   z`get_local_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r  r'  r   rY   r(  r:   r:   r;   get_local_rank  s    
r,  c                  C   r&  )a  Get the local world size of this node (i.e. number of workers on this node).

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_local_world_size())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    r[   z`get_local_world_size` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r  r'  r   r[   r(  r:   r:   r;   get_local_world_size  r+  r-  c                  C   r&  )a  Get the rank of this node.

    Example:

        .. testcode::

            import ray
            from ray import train
            from ray.train import ScalingConfig
            from ray.train.torch import TorchTrainer

            def train_loop_per_worker():
                print(train.get_context().get_node_rank())

            train_dataset = ray.data.from_items(
                [{"x": x, "y": x + 1} for x in range(32)])
            trainer = TorchTrainer(train_loop_per_worker,
                scaling_config=ScalingConfig(num_workers=1),
                datasets={"train": train_dataset})
            trainer.fit()

        .. testoutput::
            :hide:

            ...
    rZ   z`get_node_rank` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r  r'  r   rZ   r(  r:   r:   r;   get_node_rankE  r+  r.  r   r(   c                 C   s"   t  }t|dstd|| S )a?  Returns the :class:`ray.data.DataIterator` shard for this worker.

    Call :meth:`~ray.data.DataIterator.iter_torch_batches` or
    :meth:`~ray.data.DataIterator.to_tf` on this shard to convert it to the
    appropriate framework-specific data type.

    .. testcode::

        import ray
        from ray import train
        from ray.train import ScalingConfig
        from ray.train.torch import TorchTrainer

        def train_loop_per_worker(config):
            ...
            for epoch in range(2):
                # Trainer will automatically handle sharding.
                data_shard = train.get_dataset_shard("train")
                for batch in data_shard.iter_torch_batches():
                    ...

        train_dataset = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
        trainer = TorchTrainer(
            train_loop_per_worker,
            scaling_config=ScalingConfig(num_workers=2),
            datasets={"train": train_dataset}
        )
        trainer.fit()

    .. testoutput::
        :hide:

        ...

    Args:
        dataset_name: If a Dictionary of Datasets was passed to ``Trainer``, then
            specifies which dataset shard to return.

    Returns:
        The ``DataIterator`` shard to use for this worker.
        If no dataset is passed into Trainer, then return None.
    r   z`get_dataset_shard` can only be called for TrainSession! Make sure you only use that in `train_loop_per_worker` functionthat is passed into `DataParallelTrainer`.)r  r'  r   r   )r   r  r:   r:   r;   r   l  s   /

r   c                   C   r  )a2  Returns the :class:`~ray.train._internal.storage.StorageContext` storage
    context which gives advanced access to the filesystem and paths
    configured through `RunConfig`.

    NOTE: This is a developer API, and the `StorageContext` interface may change
    without notice between minor versions.
    )r  ra   r:   r:   r:   r;   get_storage  s   
r/  c                   C   s   t t o
t jduS )z6Check if the current process is a Ray Train V1 worker.N)rO   r  rX   r:   r:   r:   r;   _in_ray_train_worker  s   r0  )rB   Nr>   r   )kr  loggingr   r   r!   r   r}   rf   r   dataclassesr   r   typingr   r   r   r   r   r	   r
   rF   ray.air._internal.utilr   r   ray.air.constantsr   r   r   r   r   ray.datar   	ray.trainr   ray.train._internal.acceleratorr   ray.train._internal.storager   ray.train.constantsr   r   r   r   r   r   r   r   ray.train.errorr   ray.train.utilsr    ray.utilr   ray.util.annotationsr"   r#   ray.util.debugr$   ray.util.placement_groupr%   ray.util.scheduling_strategiesr&   r'   r(   #ray.tune.execution.placement_groupsr)   	getLoggerr3   rJ   r*   r<   rC   rV   r   r   r   r8   r   r7   r9   r   r  r  r  r  r  r	  r  r   r  r   r   r!  r"  r$  r   r%  r   r)  r*  r,  r-  r.  r   r/  rO   r0  r:   r:   r:   r;   <module>   s   
 $(

   


@p<&%(%%7