o
    xiE                     @  s  U d Z ddlmZ ddlZddlZddlZddlmZ ddlZddl	Z	ddlm
Z
 ddlmZmZmZmZmZmZ ddlZddlmZ ddlmZ dd	lmZmZ dd
lmZ dZe dZe dZe dZe dZ G dd dZ!G dd dZ"ddddddddddddZ#de$d< e%e&Z'dFd$d%Z(dGd(d)Z)dHd-d.Z*dHd/d0Z+dId2d3Z,dJd7d8Z-dKd<d=Z.dLd@dAZ/G dBdC dCZ0G dDdE dEZ1dS )Mz:Monitors kubernetes resources managed by the launch agent.    )annotationsN)Any)watch)ApiException
BatchV1Api	CoreV1ApiCustomObjectsApiV1PodV1PodStatus)LaunchAgent)LaunchError)StateStatus)get_kube_context_and_api_clientzwandb.aiz/run-idz/agentz/monitorz/auxiliary-resourcec                   @  s   e Zd ZdZdZdS )	ResourcesjobspodsN)__name__
__module____qualname__JOBSPODS r   r   ^/home/ubuntu/.local/lib/python3.10/site-packages/wandb/sdk/launch/runner/kubernetes_monitor.pyr   #   s    r   c                   @  s.   e Zd ZdZddd	Zdd
dZdddZdS )CustomResourcezClass for custom resources.groupstrversionpluralreturnNonec                 C  s   || _ || _|| _dS )zInitialize the CustomResource.Nr   r   r   )selfr   r   r   r   r   r   __init__+      
zCustomResource.__init__c                 C  s   | j  d| j d| j S )z5Return a string representation of the CustomResource./r!   r"   r   r   r   __str__1   s   zCustomResource.__str__intc                 C  s   t t| S )z$Return a hash of the CustomResource.)hashr   r&   r   r   r   __hash__5   s   zCustomResource.__hash__N)r   r   r   r   r   r   r   r    )r   r   )r   r(   )r   r   r   __doc__r#   r'   r*   r   r   r   r   r   (   s
    

r   startingrunningfinishedfailedstopping)createdpendingr-   
completing	succeeded	completedr/   abortedtimeout
terminatedterminatingzdict[str, State]CRD_STATE_DICTnamer   coror   argskwargsr   asyncio.Taskc                 O  s,   t ||i |}||  |t |S )zCreate a named task.)asynciocreate_taskset_nameadd_done_callback_log_err_task_callback)r;   r<   r=   r>   taskr   r   r   create_named_taskL   s   

rF   rE   r    c                 C  sv   |   }|dur9t|tjrtd|   d dS |  }td|  |j}d	t
|}t| dS dS )z&Callback to log exceptions from tasks.NzTask z was cancelledzException in task  )	exception
isinstancer@   CancelledErrorwandbtermlogget_name	termerror__traceback__join	traceback	format_tb)rE   execr;   tbtb_strr   r   r   rD   T   s   rD   statusr
   boolc                 C  s>   t | dr| jdur| jD ]}|jdkr|jdv r dS qdS )z%Check if this pod has been preempted.
conditionsNDisruptionTarget)EvictionByEvictionAPIPreemptionBySchedulerTerminationByKubeletTF)hasattrrX   typereasonrV   	conditionr   r   r   _is_preemptedb   s   
rb   c                 C  s6   | j pg D ]}|jr|jjr|jjjdkr dS qdS )z2Check if this pod has started creating containers.ContainerCreatingTF)container_statusesstatewaitingr_   )rV   container_statusr   r   r   _is_container_creatingo   s   rh   tuple[bool, str]c                 C  sF   | j sdS | j D ]}|jdkr |jdkr |jdkr d|jf  S qdS )zFReturn whether the pod is unschedulable along with the reason message.)FrG   PodScheduledFalseUnschedulableT)rX   r^   rV   r_   messager`   r   r   r   _is_pod_unschedulable{   s   



rn   objectr	   
str | Nonec                 C  s   | j j}|r|d jS d S )Nr   )metadataowner_referencesr;   )ro   refsr   r   r   _get_crd_job_name   s   
rt   rX   list[dict[str, Any]]State | Nonec                 C  sB   dd | D }dd |D }g d}|D ]
}||v r|  S qdS )z'Get the status from the pod conditions.c                 S  s*   g | ]}| d dkr| dd qS )rV   Truer^   rG   )getlower.0cr   r   r   
<listcomp>   s    $z*_state_from_conditions.<locals>.<listcomp>c                 S  s   h | ]
}|t v rt | qS r   )r:   rz   r   r   r   	<setcomp>   s    z)_state_from_conditions.<locals>.<setcomp>)r.   r/   r0   r-   r,   Nr   )rX   true_conditionsdetected_statesstates_in_orderre   r   r   r   _state_from_conditions   s   r   status_dictdict[str, int]c                 C  s4   |  dd}|  dd}|dkrdS |dkrdS dS )zInfer overall job status from replicated job status for jobsets.

    More info on jobset:
    https://github.com/kubernetes-sigs/jobset/blob/main/docs/concepts/README.md

    This is useful for detecting when jobsets are starting.
    readyr   active   r-   r,   N)rx   )r   
pods_readypods_activer   r   r   _state_from_replicated_status   s   r   c                      s   e Zd ZdZdZd6 fddZd7ddZed8ddZe	d9d:ddZ	ed;ddZ
ed<d d!Z	d9d:d"d#Zd;d$d%Zd<d&d'Zd=d*d+Zd>d-d.Zd?d/d0Zd?d1d2Zd@d4d5Z  ZS )ALaunchKubernetesMonitorzMonitors kubernetes resources managed by the launch agent.

    Note: this class is forced to be a singleton in order to prevent multiple
    threads from being created that monitor the same kubernetes resources.
    Nr=   r   r>   r   c                   s   | j du rt | | _ | j S )a  Create a new instance of the LaunchKubernetesMonitor.

        This method ensures that only one instance of the LaunchKubernetesMonitor
        is created. This is done to prevent multiple threads from being created
        that monitor the same kubernetes resources.
        N)	_instancesuper__new__)clsr=   r>   	__class__r   r   r      s   
zLaunchKubernetesMonitor.__new__core_apir   	batch_apir   
custom_apir   label_selectorr   c                 C  s,   || _ || _|| _|| _t | _t | _dS )'Initialize the LaunchKubernetesMonitor.N)	_core_api
_batch_api_custom_api_label_selectordict_monitor_tasks_job_states)r"   r   r   r   r   r   r   r   r#      s   z LaunchKubernetesMonitor.__init__r    c                   sz   | j du r;tti I dH \}}t|}t|}t|}t d}t r1|dt	 dt
  7 }| ||||d dS dS )r   Nz=true,=)r   r   r   r   )r   r   kubernetes_asyncior   r   r   WANDB_K8S_LABEL_MONITORr   initializedWANDB_K8S_LABEL_AGENTr;   )r   _
api_clientr   r   r   r   r   r   r   ensure_initialized   s$   


z*LaunchKubernetesMonitor.ensure_initialized	namespacecustom_resourceCustomResource | Nonec                 C  s&   | j du r	td| j j||d dS ),Start monitoring a namespaces for resources.NzBLaunchKubernetesMonitor not initialized, cannot monitor namespace.r   )r   r   +_LaunchKubernetesMonitor__monitor_namespace)r   r   r   r   r   r   monitor_namespace   s
   
z)LaunchKubernetesMonitor.monitor_namespacejob_namer   c                 C  s   | j du r	td| j |S )Get the status of a job.Nz;LaunchKubernetesMonitor not initialized, cannot get status.)r   r   $_LaunchKubernetesMonitor__get_status)r   r   r   r   r   
get_status  s
   
z"LaunchKubernetesMonitor.get_statusdict[State, int]c                 C  s   | j du r	td| j  S )KGet a dictionary mapping statuses to the # monitored jobs with each status.NzBLaunchKubernetesMonitor not initialized, cannot get status counts.)r   
ValueError&_LaunchKubernetesMonitor__status_count)r   r   r   r   status_count  s
   

z$LaunchKubernetesMonitor.status_countc                 C  s   |t jf| jvrtd| | j|| j|t jf< |dur;||f| jvr9td| d| | j||d| j||f< dS dS |t jf| jvrUtd| | j|| j|t jf< dS dS )r   monitor_pods_Nmonitor_r   r   monitor_jobs_)r   r   r   rF   _monitor_pods_monitor_crdr   _monitor_jobs)r"   r   r   r   r   r   __monitor_namespace  s,   z+LaunchKubernetesMonitor.__monitor_namespacec                 C  s    || j vr	tdS | j | }|S )r   unknown)r   r   r"   r   re   r   r   r   __get_status2  s   

z$LaunchKubernetesMonitor.__get_statusc                 C  sF   t  }| j D ]\}}|j}||vrd||< q||  d7  < q|S )r   r   )r   r   itemsre   )r"   countsr   rV   re   r   r   r   __status_count9  s   
z&LaunchKubernetesMonitor.__status_countre   r   c                 C  s@   || j vrt|| j |< dS | j | j|kr|| j | _dS dS )zSet the status of the run.N)r   r   re   r   r   r   r   _set_status_stateD  s
   
z)LaunchKubernetesMonitor._set_status_staterm   c                 C  sD   || j vrtd| j |< td| d|  | j | j| d S )Nr   z Warning from Kubernetes for job : )r   r   rK   termwarnmessagesappend)r"   r   rm   r   r   r   _add_status_messageK  s   
z+LaunchKubernetesMonitor._add_status_messagec                   s   t t }|j| jj|| jd2 zT3 dH W }|d}|jj	dp't
|}|du s1t|ds2q| |dv r:qt|j\}}|rI| || |jjdksTt|jr[| |d qt|jrf| |d	 q6 dS )
 Monitor a namespace for changes.r   r   Nro   zjob-namerV   )r.   r/   Runningr-   	preempted)	SafeWatchr   Watchstreamr   list_namespaced_podr   rx   rq   labelsrt   r]   r   rn   rV   r   phaserh   r   rb   )r"   r   watchereventobjr   is_unschedulabler_   r   r   r   r   Q  s,   

z%LaunchKubernetesMonitor._monitor_podsc                   s   t t }|j| jj|| jd2 zE3 dH W }|d}|jj	}|j
jdkr.| |d n|j
jdur@|j
jdkr@| |d |ddkrW| j|tdkrW| |d q6 dS )	r   r   Nro   r   r.   r/   r^   DELETED)r   r   r   r   r   list_namespaced_jobr   rx   rq   r;   rV   r4   r   r/   r   r   )r"   r   r   r   r   r   r   r   r   r   h  s*   
z%LaunchKubernetesMonitor._monitor_jobsr   c                   s  t t }|j| jj||j|j|j| j	d2 zo3 dH W }|
d}|
dt 
d}|
d}d}|du r9q|
d}	t|	trGt|	}|
d}
t|
tr`|

d	}|r_t
| }n|
d
}t|trot|}ntdt| d|  |du rq| || q6 dS )r   )r   r   r   r   r   Nro   rq   r;   rV   ReplicatedJobsStatusre   r   rX   zUnexpected conditions type z for CRD watcher in )r   r   r   r   r   list_namespaced_custom_objectr   r   r   r   rx   r   rI   r   r:   ry   listr   _loggerwarningr^   r   )r"   r   r   r   r   ro   r;   rV   re   replicated_jobs_status
state_dictr   rX   r   r   r   r     sJ   









z$LaunchKubernetesMonitor._monitor_crd)r=   r   r>   r   r   r   )r   r   r   r   r   r   r   r   )r   r    )N)r   r   r   r   r   r    )r   r   r   r   )r   r   )r   r   re   r   r   r    )r   r   rm   r   r   r    )r   r   r   r    )r   r   r   r   r   r    )r   r   r   r+   r   r   r#   classmethodr   r   r   r   r   r   r   r   r   r   r   r   __classcell__r   r   r   r   r      s.    

	





r   c                   @  s$   e Zd ZdZdddZdddZdS )r   zKWrapper for the kubernetes watch class that can recover in more situations.r   watch.Watchr   r    c                 C  s   || _ d| _d| _dS )zInitialize the SafeWatch.NF)_watcher_last_seen_resource_version_stopped)r"   r   r   r   r   r#     r$   zSafeWatch.__init__funcr   r=   r>   c           
      O s\  	 zI| j j|g|R i |ddi2 z.3 dH W }| jr n%|d}t|tr5|dt d| _n|jj| _| j|d< |V  q6 | jrJW dS W na t	j
jyh } ztd	| d
 W Y d}~nId}~w ty } z|jdkrz|d= d| _W Y d}~n-d}~w ty } zt|j}t }	td| d| d|	  W Y d}~nd}~ww q)zStream the watcher.

        This method will automatically resume the stream if it breaks. It will
        also save the resource version so that the stream can be resumed from
        the last seen resource version.
        Ttimeout_seconds   Nro   rq   resourceVersionresource_versionzBroken event stream: z, attempting to recoveri  z*Unknown exception in event stream of type r   z&, attempting to recover. Stack trace: )r   r   r   rx   rI   r   r   rq   r   urllib3
exceptionsProtocolErrorrK   r   r   rV   	Exceptionr^   r   rQ   
format_excrN   )
r"   r   r=   r>   r   ro   eEexc_typestack_tracer   r   r   r     s\   




zSafeWatch.streamN)r   r   r   r    )r   r   r=   r   r>   r   r   r   )r   r   r   r+   r#   r   r   r   r   r   r     s    
r   )
r;   r   r<   r   r=   r   r>   r   r   r?   )rE   r?   r   r    )rV   r
   r   rW   )rV   r
   r   ri   )ro   r	   r   rp   )rX   ru   r   rv   )r   r   r   rv   )2r+   
__future__r   r@   loggingrQ   typingr   r   r   r   kubernetes_asyncio.clientr   r   r   r   r	   r
   rK   wandb.sdk.launch.agentr   wandb.sdk.launch.errorsr    wandb.sdk.launch.runner.abstractr   r   wandb.sdk.launch.utilsr   WANDB_K8S_LABEL_NAMESPACEWANDB_K8S_RUN_IDr   r   "WANDB_K8S_LABEL_AUXILIARY_RESOURCEr   r   r:   __annotations__	getLoggerr   r   rF   rD   rb   rh   rn   rt   r   r   r   r   r   r   r   r   <module>   s\     	












 r