o
    ci(                     @   s  U d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
mZmZmZmZ d dlZd dlmZmZ d dlmZmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlm Z m!Z! d dl"m#Z#m$Z$ e
ryd dl%m&Z& d dl'm(Z( e )e*Z+eddG dd dZ,eddG dd dZ-eddG dd dZ.eG dd dZ/da0ee/ e1d< e2 Z3de/fddZ4dddZ5dS )     N)	dataclassfield)Queue)TYPE_CHECKINGAnyDictListOptional)DataIteratorDataset)BackendConfig
Checkpoint
DataConfig)session)_TrainingResult)SynchronizationActor)StorageContext)	_copy_docinvoke_context_managers)	RunConfigScalingConfig)TrainContextCallback)ThreadRunnerT)frozenc                   @   s|   e Zd ZU dZeddd dZeed< eed< e	e
eef  ed< eed	< eed
< e
eef ed< eed< defddZdS )TrainRunContextz<Holds the metadata and context for the current training run.Fc                   C   s
   t  jS N)uuiduuid4hex r   r   \/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/v2/_internal/execution/context.py<lambda>    s   
 zTrainRunContext.<lambda>)initdefault_factoryrun_id
run_configtrain_loop_configscaling_configbackend_configdatasetsdataset_configreturnc                 C      | j S )z3Returns the run config of the current training run.)r%   selfr   r   r    get_run_config4      zTrainRunContext.get_run_configN)__name__
__module____qualname____doc__r   r$   str__annotations__r   r	   r   r   r   r   r   r   r/   r   r   r   r    r      s   
 r   c                   @   s6   e Zd ZU eed< eed< eed< eed< eed< dS )DistributedContext
world_rank
world_size
local_ranklocal_world_size	node_rankN)r1   r2   r3   intr6   r   r   r   r    r7   9   s   
 r7   c                   @   s6   e Zd ZU dZeed< eed< ded< ed ed< dS )	ExecutionContextzHolds the execution context for the current worker process.

    Every worker process has a single execution context accessed via the
    `TrainContext`, which includes the training thread that is actually
    running the user code.
    synchronization_actorresult_queuer   training_thread_runnerr   train_context_callbacksN)r1   r2   r3   r4   r   r6   r   r   r   r   r   r    r>   B   s   
 r>   c                	   @   s  e Zd ZU eed< eed< eed< eed< ee	e
f ed< dZee ed< eejde	fd	d
ZeejdefddZeejdefddZeejdefddZeejdefddZeejdefddZeejdd Zdd Zdd Zdd Zde	de
fddZded  fd!d"Z	d+d#ee	 de	fd$d%Z	d+d#e	d&ee	ef dee de fd'd(Z!		d,d&ee	ef dee d#ee	 fd)d*Z"dS )-TrainContexttrain_run_contextdistributed_contextexecution_contextstorage_contextdataset_shardsN
checkpointr+   c                 C   s
   | j jjS r   )rD   r%   namer-   r   r   r    get_experiment_nameb   s   
z TrainContext.get_experiment_namec                 C      | j jS r   )rE   r9   r-   r   r   r    get_world_sizef      zTrainContext.get_world_sizec                 C   rL   r   )rE   r8   r-   r   r   r    get_world_rankj   rN   zTrainContext.get_world_rankc                 C   rL   r   )rE   r:   r-   r   r   r    get_local_rankn   rN   zTrainContext.get_local_rankc                 C   rL   r   )rE   r;   r-   r   r   r    get_local_world_sizer   rN   z!TrainContext.get_local_world_sizec                 C   rL   r   )rE   r<   r-   r   r   r    get_node_rankv   rN   zTrainContext.get_node_rankc                 C   r,   r   )rG   r-   r   r   r    get_storagez   r0   zTrainContext.get_storagec                 C   rL   r   )rF   r@   r-   r   r   r    get_result_queue      zTrainContext.get_result_queuec                 C   rL   r   )rF   r?   r-   r   r   r    get_synchronization_actor   rU   z&TrainContext.get_synchronization_actorc                 C   r,   r   )rI   r-   r   r   r    get_checkpoint   s   zTrainContext.get_checkpointdataset_namec              
   C   s<   z| j | W S  ty   td| dt| j   dw )a  Returns the :class:`ray.data.DataIterator` shard for this worker.

        Call :meth:`~ray.data.DataIterator.iter_torch_batches` or
        :meth:`~ray.data.DataIterator.to_tf` on this shard to convert it to the
        appropriate framework-specific data type.

        Args:
            dataset_name: Name of the dataset shard.
        Returns:
            The ``DataIterator`` shard with the given name for this worker.
        Raises:
            KeyError: If the dataset shard with the given name is not found.
        zDataset z  not found. Available datasets: .)rH   KeyErrorlistkeys)r.   rX   r   r   r    get_dataset_shard   s   zTrainContext.get_dataset_shardr   c                 C   rL   r   )rF   rB   r-   r   r   r    get_context_callbacks   rU   z"TrainContext.get_context_callbackscheckpoint_dir_namec                 C   s8   |p| j  }|  }t|jj| jj| jj	|ddS )zSync the checkpoint dir name across ranks.

        Args:
            checkpoint_dir_name: The checkpoint dir name to sync.

        Returns:
            The synced checkpoint dir name.
        zray.train.report)r8   r9   datacaller_method_name)
rG    make_default_checkpoint_dir_namerV   raygetbroadcast_from_rank_zeroremoterE   r8   r9   )r.   r_   
sync_actorr   r   r    &_sync_checkpoint_dir_name_across_ranks   s   z3TrainContext._sync_checkpoint_dir_name_across_ranksmetricsc                 C   s0   |st d|dS | j||}|| _t ||dS )zSave the checkpoint to remote storage.

        Returns:
            The training result object containing the persisted checkpoint.
        N)rI   ri   )r   rG   persist_current_checkpointrI   )r.   r_   ri   rI   persisted_checkpointr   r   r    _save_checkpoint   s   zTrainContext._save_checkpointc                 C   s   dt jv rddlm} ||rtdtdd | jjD  | |}| 	|||}| 
 | W d   dS 1 s=w   Y  dS )a3  
        Upload checkpoint to remote storage and put a training
        result on the result queue of this worker process.

        Args:
            metrics: The metrics to report.
            checkpoint: The checkpoint to report.
            checkpoint_dir_name: The name of the checkpoint dir
                in this iteration. Note: If not set, the checkpoint will
                be stored in the default storage path. If set, make sure
                this value is unique for each iteration.

        TODO: the report function should be implemented in the worker instead
        of in the train context. The train context should only keep the train
        related information and not the worker related actions. This refactor
        would also require the `TrainContextCallback` to be updated as well.
        torchr   )contains_tensora  Passing objects containg Torch tensors as metrics is not supported as it will throw an exception on deserialization. You can either convert the tensors to Python objects (ex: `.numpy()`, `.item()`, etc.) or save tensors as part of the checkpoint files instead.c                 S   s   g | ]}|j qS r   )	on_report).0callbackr   r   r    
<listcomp>   s    z'TrainContext.report.<locals>.<listcomp>N)sysmodulesray.air._internal.torch_utilsrn   
ValueErrorr   rF   rB   rh   rl   rT   put)r.   ri   rI   r_   rn   training_resultr   r   r    report   s&   
	"zTrainContext.reportr   )NN)#r1   r2   r3   r   r6   r7   r>   r   r   r5   r
   rI   r	   r   r   r   rK   rM   r=   rO   rP   rQ   rR   rS   rT   rV   rW   r]   r   r^   rh   r   r   rl   ry   r   r   r   r    rC   Y   sf   
 

!


rC   _train_contextr+   c                   C   s<   t  td u rtdtW  d    S 1 sw   Y  d S )Nz&TrainContext has not been initialized.)_context_lockrz   RuntimeErrorr   r   r   r    get_train_context  s
   $r}   c                 C   s.   t  | aW d    d S 1 sw   Y  d S r   )r{   rz   )contextr   r   r    set_train_context  s   "r   )r+   N)6loggingrs   	threadingr   dataclassesr   r   queuer   typingr   r   r   r   r	   rc   ray.datar
   r   	ray.trainr   r   r   ray.train._internalr   ray.train._internal.sessionr   6ray.train.v2._internal.execution.checkpoint.sync_actorr   (ray.train.v2._internal.execution.storager   ray.train.v2._internal.utilr   r   ray.train.v2.api.configr   r   )ray.train.v2._internal.execution.callbackr   ;ray.train.v2._internal.execution.worker_group.thread_runnerr   	getLogger__file__loggerr   r7   r>   rC   rz   r6   Lockr{   r}   r   r   r   r   r    <module>   sB   
 
 8