o
    $ioJ                     @   s   d dl Z d dlmZmZmZmZmZ d dlmZm	Z	 d dl
Z
d dlm  mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ erkd dlmZ d dl m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z' e (e)Z*e*+e j, G dd deZ-dS )    N)TYPE_CHECKINGDictListOptionalTuple)RequestResponse)gcs_service_pb2_grpc)JobInfoStorageClient)find_jobs_by_job_ids)SubprocessModule)SubprocessRouteTable)get_http_session_to_module)DeveloperAPI)
JobDetails)DecoratedTrainRunDecoratedTrainRunAttemptDecoratedTrainWorker	RunStatusTrainRunTrainRunAttemptTrainWorkerc                       s,  e Zd Z fddZede ede	de
fddZded	 ded
 fddZdee deedf fddZded ded fddZded ded fddZdd	dedee f fddZede ede	de
fd d!Zd"ee fd#d$Zd%d& Z fd'd(Zd)d* Zd+d, Z  ZS )-	TrainHeadc                    s4   t  j|i | d | _d | _d | _d | _d | _d S N)super__init___train_stats_actor_train_v2_state_actor_job_info_client_gcs_actor_info_stub_node_head_http_session)selfargskwargs	__class__ c/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/dashboard/modules/train/train_head.pyr   !   s   
zTrainHead.__init__z/api/train/v2/runs/v1reqreturnc              
      s   zddl m} W n ty   td tddd Y S w |  I dH }|du r/tdddS z|j I dH }| 	|
 I dH }||d}W n tjjye } ztd	d
| dW  Y d}~S d}~ww t| ddS )z#Get all TrainRuns for Ray Train V2.r   TrainRunsResponse`Train is not installed. Please run `pip install ray[train]` when setting up Ray on your cluster.  statustextNTrain state data is not available. Please make sure Ray Train is running and that the Train state actor is enabled by setting the RAY_TRAIN_ENABLE_STATE_TRACKING environment variable to "1".
train_runs  ^Failed to get a response from the train stats actor. The GCS may be down, please retry later: application/jsonr0   content_type)#ray.train.v2._internal.state.schemar+   ImportErrorlogger	exceptionr   get_train_v2_state_actorget_train_runsremote_decorate_train_runsvaluesray
exceptionsRayTaskErrorjson)r!   r(   r+   state_actorr3   decorated_train_runsdetailser&   r&   r'   get_train_v2_runs.   sJ   
	
zTrainHead.get_train_v2_runsr3   r   r   c              	      s   ddl m} g }|  I dH }|j I dH }| dd |D I dH }|D ]5}||j  }| |I dH }	||j	 }
| 
|I dH \}}|i | |	|
||d}|| q't|dd d	d
}|S )zDecorate the train runs with run attempts, job details, status, and status details.

        Returns:
            List[DecoratedTrainRun]: The decorated train runs in reverse chronological order.
        r   )r   Nc                 S      g | ]}|j qS r&   job_id).0	train_runr&   r&   r'   
<listcomp>t       z2TrainHead._decorate_train_runs.<locals>.<listcomp>)attemptsjob_detailsr/   status_detailc                 S      | j S r   )start_time_nsrunr&   r&   r'   <lambda>       z0TrainHead._decorate_train_runs.<locals>.<lambda>Tkeyreverse)r9   r   r=   get_train_run_attemptsr?   	_get_jobsidrA   _decorate_train_run_attemptsrM   _get_run_status	parse_objdictappendsorted)r!   r3   r   rG   rF   all_train_run_attemptsjobsrO   train_run_attemptsdecorated_train_run_attemptsrS   r/   status_detailsdecorated_train_runr&   r&   r'   r@   d   s:   	

zTrainHead._decorate_train_runsjob_idsr   c                    s   t | j| j|I d H S r   )r   
gcs_clientr   )r!   rm   r&   r&   r'   r_      s   
zTrainHead._get_jobsri   r   r   c                    sV   ddl m} g }|D ]}| |jI d H }|i | d|i}|| q|S )Nr   )r   workers)r9   r   _decorate_train_workersro   rc   rd   re   )r!   ri   r   rj   train_run_attemptdecorated_train_workersdecorated_train_run_attemptr&   r&   r'   ra      s   z&TrainHead._decorate_train_run_attemptstrain_workersr   r   c           
         s   ddl m} g }dd |D }td| d | |I d H }|D ]? | jd }|rV fdd|d D } fd	d|D }|i   |d
 |d |d}	n|  }	|	|	 q#|S )Nr   )r   c                 S   rK   r&   actor_idrN   workerr&   r&   r'   rP      rQ   z5TrainHead._decorate_train_workers.<locals>.<listcomp>+Getting all actor info from GCS (actor_ids=)c                    (   g | ]} j d d |d D v r|qS )c                 S      g | ]}|d  qS pidr&   rN   processr&   r&   r'   rP          @TrainHead._decorate_train_workers.<locals>.<listcomp>.<listcomp>processesPidsr}   rN   gputrain_workerr&   r'   rP          gpusc                    2   g | ]}i |d  fdd|d D d iqS )processInfoc                       g | ]}|d   j kr|qS r}   r}   r   r   r&   r'   rP      
    r   r   r   r&   r   r   r&   r'   rP          	stateprocessStatsr/   r   r   )
r9   r   r;   info_get_actor_infosgetrv   rc   rd   re   )
r!   rt   r   rr   	actor_idstrain_run_actorsactorr   formatted_gpusdecorated_train_workerr&   r   r'   rp      s:   

		z!TrainHead._decorate_train_workersrO   r   c           	         sv   ddl m}m} | |jgI d H }||j }|r|dnd }||jkr5|j|jkr5|j	}d}||fS |j|j
fS )Nr   )ActorStatusr   r   6Terminated due to system errors or killed by the user.)r9   r   r   r   controller_actor_idr   DEADr/   RUNNINGABORTEDrT   )	r!   rO   r   r   actor_infoscontroller_actor_infocontroller_actor_status
run_statusrT   r&   r&   r'   rb      s   


zTrainHead._get_run_statusz/api/train/v2/runsc           
   
      s"  zddl m} W n ty   td tddd Y S w |  I dH }|du r/tdddS z<|j I dH }| 	|I dH }t
|dd	 d
d}t| j| jdd |D I dH }|D ]	}||j|_q[||d}W n tjjy }	 ztdd|	 dW  Y d}	~	S d}	~	ww t| ddS )z'Get all TrainRunInfos for Ray Train V1.r   r*   r,   r-   r.   Nr1   c                 S   rU   r   )start_time_msrW   r&   r&   r'   rY   1  rZ   z*TrainHead.get_train_runs.<locals>.<lambda>Tr[   c                 S   rK   r&   rL   )rN   rX   r&   r&   r'   rP   7  rQ   z,TrainHead.get_train_runs.<locals>.<listcomp>r2   r4   r5   r6   r7   ) ray.train._internal.state.schemar+   r:   r;   r<   r   get_train_stats_actorget_all_train_runsr?   '_add_actor_status_and_update_run_statusrf   r   rn   r   r   rM   rS   rB   rC   rD   rE   )
r!   r(   r+   stats_actorr3   train_runs_with_detailsrS   rX   rH   rI   r&   r&   r'   r>     s`   
	
zTrainHead.get_train_runsr   c              	      s   | j d u rtd| jj| jj| _ d|}d| d}| j |4 I d H }|  | I d H }W d   I d H  n1 I d H sBw   Y  |d d S )NNodeHead,z$http://localhost/logical/actors?ids=z
&nocache=1dataactors)	r    r   _config
socket_dirsession_namejoinr   raise_for_statusrE   )r!   r   actor_ids_qs_strurlresp	resp_jsonr&   r&   r'   r   M  s   

(zTrainHead._get_actor_infosc              	      sR  ddl m}m}m}m} g }| D ]}g }dd |jD }	td|	 d | 	|	I d H }
|jD ]? |

 jd }|rf fdd|d D } fd	d|D }|i   |d
 |d |d}n|  }|| q3|i | d|i}|

|j}|r|
d
nd }||jkr|j|jkr|j|_d|_|| q|S )Nr   )ActorStatusEnumRunStatusEnumTrainRunInfoWithDetailsTrainWorkerInfoWithDetailsc                 S   rK   r&   ru   rw   r&   r&   r'   rP   f  rQ   zETrainHead._add_actor_status_and_update_run_status.<locals>.<listcomp>ry   rz   c                    r{   )c                 S   r|   r}   r&   r   r&   r&   r'   rP   t  r   PTrainHead._add_actor_status_and_update_run_status.<locals>.<listcomp>.<listcomp>r   r}   r   worker_infor&   r'   rP   p  r   r   c                    r   )r   c                    r   r}   r}   r   r   r&   r'   rP   ~  r   r   r   r   r&   r   r   r&   r'   rP   {  r   r   r   r   ro   r   )r   r   r   r   r   rA   ro   r;   r   r   r   rv   rc   rd   re   r   r   r   r   r   rT   )r!   r3   r   r   r   r   r   rO   worker_infos_with_detailsr   r   r   r   r   worker_info_with_detailstrain_run_with_detailsr   r&   r   r'   r   Y  sT   


			
z1TrainHead._add_actor_status_and_update_run_statusc                    s:   t   I d H  | jst| j| _| j}t|| _d S r   )	r   rX   r   r
   rn   aiogrpc_gcs_channelr	   ActorInfoGcsServiceStubr   )r!   gcs_channelr$   r&   r'   rX     s   
zTrainHead.runc                    H   zddl m} | jdu r| | _| jW S  ty#   td Y dS w )zS
        Gets the train stats actor and caches it as an instance variable.
        r   get_state_actorNr,   )%ray.train._internal.state.state_actorr   r   r:   r;   r<   r!   r   r&   r&   r'   r        
zTrainHead.get_train_stats_actorc                    r   )zS
        Gets the Train state actor and caches it as an instance variable.
        r   r   Nr,   )(ray.train.v2._internal.state.state_actorr   r   r:   r;   r<   r   r&   r&   r'   r=     r   z"TrainHead.get_train_v2_state_actor)__name__
__module____qualname__r   routesr   dashboard_optional_utilsinit_ray_and_catch_exceptionsr   r   r   rJ   r   r@   strr   r_   ra   rp   r   r   rb   r>   r   r   rX   r   r=   __classcell__r&   r&   r$   r'   r       sH    3
2

8
!?U
r   ).loggingtypingr   r   r   r   r   aiohttp.webr   r   rB   ray.dashboard.optional_utils	dashboardoptional_utilsr   ray.core.generatedr	    ray.dashboard.modules.job.commonr
   ray.dashboard.modules.job.utilsr   !ray.dashboard.subprocesses.moduler   !ray.dashboard.subprocesses.routesr   r    ray.dashboard.subprocesses.utilsr   ray.util.annotationsr   )ray.dashboard.modules.job.pydantic_modelsr   r9   r   r   r   r   r   r   r   	getLoggerr   r;   setLevelINFOr   r&   r&   r&   r'   <module>   s$    $

