o
    ci                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZ d dlZd dl	m
Z
 d dlmZmZmZmZmZ d dlmZ d dlmZ e eZG dd	 d	ZdS )
    N)defaultdict)AnyDictList)Dataset)ActorStatusEnumRunStatusEnumTrainDatasetInfoTrainRunInfoTrainWorkerInfo)check_for_failure)WorkerGroupc                   @   s   e Zd ZdZdddZ	ddeded	ed
ededeeef dede	de
eee	f  deddfddZded
ededefddZdedeeef ddfddZdS )TrainRunStateManagerzA class that aggregates and reports train run info to TrainStateActor.

    This manager class is created on the train controller layer for each run.
    returnNc                 C   s   || _ tt| _d S N)state_actorr   dicttrain_run_info_dict)selfr    r   [/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/_internal/state/state_manager.py__init__   s   zTrainRunStateManager.__init__ run_idjob_idrun_name
run_statuscontroller_actor_iddatasetsworker_groupstart_time_ms	resourcesstatus_detailc                    s   | j s
td dS fdd  fddttD }t|\}}|s0td|  dS t|}t	|dd	 d
}dd |
 D }t|||||||||
d
}i | j|< | || dS )z0Collect Train Run Info and report to StateActor.zDUnable to register train run since `TrainStateActor` is not started.Nc                     sX   t j } t j }t|  |  |  |	 |
 t j t  t  d tjd
S )Nr   )

world_rank
local_rank	node_rankactor_idnode_idnode_ipgpu_idspidr!   status)raytrainget_contextruntime_contextget_runtime_contextr   get_world_rankget_local_rankget_node_rankget_actor_idget_node_idutilget_node_ip_addressget_gpu_idsosgetpidr   ALIVE)train_contextcore_context)r!   r   r   collect_train_worker_info4   s   

zJTrainRunStateManager.register_train_run.<locals>.collect_train_worker_infoc                    s   g | ]} | qS r   )execute_single_async).0index)r>   r   r   r   
<listcomp>D   s    
z;TrainRunStateManager.register_train_run.<locals>.<listcomp>z>Failed to collect run information from the Ray Train workers:
c                 S   s   | j S r   )r#   )infor   r   r   <lambda>R   s    z9TrainRunStateManager.register_train_run.<locals>.<lambda>)keyc                 S   s&   g | ]\}}t ||jj|jjd qS ))namedataset_namedataset_uuid)r	   _plan_dataset_name_dataset_uuid)r@   ds_namedsr   r   r   rB   T   s    )
idr   rF   r   workersr   r    r   r"   r!   )r   loggerwarningrangelenr   errorr,   getsorteditemsr   r   _update_train_run_info)r   r   r   r   r   r   r   r   r    r!   r"   futuressuccess	exceptionworker_info_listdataset_info_listupdatesr   )r>   r!   r   r   register_train_run   sH   

	
z'TrainRunStateManager.register_train_runend_time_msc                 C   s   t |||d}| || dS )z:Update the train run status when the training is finished.)r   r"   r`   N)r   rX   )r   r   r   r"   r`   r^   r   r   r   end_train_runn   s   z"TrainRunStateManager.end_train_runr^   c                 C   sJ   || j v r#| j | | tdi | j | }t| jj| dS dS )z=Update specific fields of a registered TrainRunInfo instance.Nr   )r   updater
   r,   rU   r   r_   remote)r   r   r^   train_run_infor   r   r   rX   }   s
   
z+TrainRunStateManager._update_train_run_info)r   N)r   )__name__
__module____qualname____doc__r   strr   r   r   floatr   r_   r   intra   r   rX   r   r   r   r   r      sJ    

	

O
"r   )loggingr9   collectionsr   typingr   r   r   r,   ray.datar    ray.train._internal.state.schemar   r   r	   r
   r   ray.train._internal.utilsr    ray.train._internal.worker_groupr   	getLoggerre   rP   r   r   r   r   r   <module>   s    
