o
    ci                     @   s   d dl Z d dlmZmZmZmZmZ d dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZmZ ee
eg e
f f ZG dd deZdS )	    N)AnyCallableDictListUnion)Dataset)DataContext)WorkerGroupCallback)WorkerWorkerGroupc                   @   s   e Zd ZdZdeeef dejj	dejj
fddZdejj
deeef fdd	Zd
ee deeee f fddZdefddZdS )DatasetsSetupCallbackz8The callback to setup Ray Datasets for the worker group.datasetsdata_configscaling_configc                 C   s&   || _ || _|| _tt | _d S N)	_datasets_data_config_scaling_configcopydeepcopyr   get_current_data_context)selfr   r   r    r   ]/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/v2/_internal/callbacks/datasets.py__init__   s   
zDatasetsSetupCallback.__init__returnc                 C   s   |j S )zReturn the resources reserved for training, so that Data can exclude
        these resources logically from its available pool.)total_resources)r   r   r   r   r   get_train_total_resources)   s   z/DatasetsSetupCallback.get_train_total_resourcesworkersc                 C   s   dd | j  D }dd |D }| | j}| j|dd|dd | jj|t|d |d}t|t|ks<J d	|iS )
Nc                 S   s$   i | ]\}}|t |r| n|qS r   )callable).0kvr   r   r   
<dictcomp>2   s   $ zCDatasetsSetupCallback.before_init_train_context.<locals>.<dictcomp>c                 S   s   g | ]}|j jqS r   )metadatanode_id)r!   workerr   r   r   
<listcomp>3   s    zCDatasetsSetupCallback.before_init_train_context.<locals>.<listcomp>CPUr   GPU)
world_sizeworker_handlesworker_node_idsdataset_shards)	r   itemsr   r   r   set_train_total_resourcesget	configurelen)r   r   r   node_idstotal_train_resourcesr.   r   r   r   before_init_train_context0   s   z/DatasetsSetupCallback.before_init_train_contextworker_groupc                 C   s    dt fdd}||| j d S )Nctxc                 S   s   t |  d S r   )r   _set_current)r8   r   r   r   _propagate_data_contextG   s   zODatasetsSetupCallback.after_worker_group_start.<locals>._propagate_data_context)r   executer   )r   r7   r:   r   r   r   after_worker_group_startE   s
   z.DatasetsSetupCallback.after_worker_group_startN)__name__
__module____qualname____doc__r   str
GenDatasetraytrain
DataConfigScalingConfigr   floatr   r   r
   r   r6   r   r<   r   r   r   r   r      s     



"r   )r   typingr   r   r   r   r   	ray.trainrC   ray.datar   ray.data.contextr   )ray.train.v2._internal.execution.callbackr	   :ray.train.v2._internal.execution.worker_group.worker_groupr
   r   rB   r   r   r   r   r   <module>   s    