o
    -wi                     @   s  d Z ddlZddlZddlZddlZddlZddlZddlmZm	Z	m
Z
mZmZmZmZ ddlZddlZddlmZ ddlmZ ddlmZ ddlmZ ddlmZ dd	lmZ dd
lmZ ddlm Z m!Z!m"Z"m#Z#m$Z$ ddl%m&Z&m'Z' ddl(m)Z) ddl*m+Z+m,Z, ddl-m.Z. ddl/m0Z0m1Z1m2Z2m3Z3m4Z4m5Z5 ddl6m7Z7m8Z8 e)ddd ddl9Z9ddl9m:Z: ddl;m<Z< ddl=m>Z> ddl?m@Z@ ddlAmBZB ddlCmDZD dZEdZFeGeHZIejJKdZLejJKdZMG d d! d!e7ZNG d"d# d#e7ZOG d$d% d%e8ZPd&eeQ d'ee+ d(eeR d)eSd*df
d+d,ZTd-d.d/eRd0eRd1eRd*d2f
d3d4ZUd-d.d5ed6eRd0eRd*ed2 f
d7d8ZVd9ed*e
eQ fd:d;ZWd9eeQeXf d<e	eReRf d*dfd=d>ZYd?ed*e
eQ fd@dAZZd?eeQeXf dBeRdCeRd*dfdDdEZ[d?eeQeXf dFeQd*dfdGdHZ\d?ee	eXf dIe,d*dfdJdKZ]dS )Lz:Implementation of KubernetesRunner class for wandb launch.    N)AnyDictIteratorListOptionalTupleUnion)Api)LaunchAgent)AbstractEnvironment)AbstractRegistry)AzureContainerRegistry)LocalRegistry)Status)WANDB_K8S_LABEL_AGENTWANDB_K8S_LABEL_MONITORWANDB_K8S_RUN_IDCustomResourceLaunchKubernetesMonitor)ExponentialBackoffretry_async)
get_module   )
EntryPointLaunchProject)LaunchError)CODE_MOUNT_DIR
LOG_PREFIXMAX_ENV_LENGTHSPROJECT_SYNCHRONOUSget_kube_context_and_api_clientmake_name_dns_safe   )AbstractRunAbstractRunnerkubernetes_asynciozfKubernetes runner requires the kubernetes package. Please install it with `pip install wandb[launch]`.)required)client)
BatchV1Api)	CoreV1Api)CustomObjectsApi)V1Secret)ApiException    WANDB_LAUNCH_CODE_PVC_MOUNT_PATHWANDB_LAUNCH_CODE_PVC_NAMEc                   @   s   e Zd ZdZ		dddddded	ee d
ed ddfddZedefddZdee fddZ	de
fddZdefddZdddZdddZdS )KubernetesSubmittedRunz)Wrapper for a launched run on Kubernetes.defaultN	batch_apir(   core_apir)   name	namespacesecretr+   returnc                 C   s(   || _ || _|| _|| _d| _|| _dS )a,  Initialize a KubernetesSubmittedRun.

        Other implementations of the AbstractRun interface poll on the run
        when `get_status` is called, but KubernetesSubmittedRun uses
        Kubernetes watch streams to update the run status. One thread handles
        events from the job object and another thread handles events from the
        rank 0 pod. These threads updated the `_status` attributed of the
        KubernetesSubmittedRun object. When `get_status` is called, the
        `_status` attribute is returned.

        Arguments:
            batch_api: Kubernetes BatchV1Api object.
            core_api: Kubernetes CoreV1Api object.
            name: Name of the job.
            namespace: Kubernetes namespace.
            secret: Kubernetes secret.

        Returns:
            None.
        r   N)r2   r3   r4   r5   _fail_countr6   )selfr2   r3   r4   r5   r6    r:   f/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/wandb/sdk/launch/runner/kubernetes_runner.py__init__M   s   
zKubernetesSubmittedRun.__init__c                 C      | j S )zReturn the run id.r4   r9   r:   r:   r;   idp      zKubernetesSubmittedRun.idc              
      s   zF| j jd| j | jdI d H }dd |jD }|s(td| j  W d S | j j|d | jdI d H }|r=t|W S td|  W d S  t	yd } zt
t d	|  W Y d }~d S d }~ww )
Nz	job-name=label_selectorr5   c                 S      g | ]}|j jqS r:   metadatar4   .0pir:   r:   r;   
<listcomp>z       z3KubernetesSubmittedRun.get_logs.<locals>.<listcomp>z"Found no pods for kubernetes job: r   r4   r5   zNo logs for kubernetes pod(s): zFailed to get pod logs: )r3   list_namespaced_podr4   r5   itemswandbtermwarnread_namespaced_pod_logstr	Exception	termerrorr   )r9   pods	pod_nameslogser:   r:   r;   get_logsu   s*   

zKubernetesSubmittedRun.get_logsc                    sf   	 |   I dH }tt d| j d|j  |jdv rn	tdI dH  q|  I dH  |jdkS )zzWait for the run to finish.

        Returns:
            True if the run finished successfully, False otherwise.
        TNJob 	 status: finishedfailed	preemptedr-   r]   )	
get_statusrO   termlogr   r4   stateasynciosleep_delete_secretr9   statusr:   r:   r;   wait   s   
zKubernetesSubmittedRun.waitc                    s(   t | j}|dv r|  I d H  |S )N)stoppedr^   r]   r_   )r   r`   r4   re   rf   r:   r:   r;   r`      s
   z!KubernetesSubmittedRun.get_statusc                    sn   z| j j| j| jdI dH  |  I dH  W dS  ty6 } ztd| j d| j dt| |d}~ww )zCancel the run.)r5   r4   Nz Failed to delete Kubernetes Job  in namespace : )r2   delete_namespaced_jobr5   r4   re   r,   r   rR   r9   rX   r:   r:   r;   cancel   s   zKubernetesSubmittedRun.cancelc                    sH   t jds | jr"| jj| jjj| jjjdI d H  d | _d S d S d S )NWANDB_RELEASE_NAMErL   )	osenvirongetr6   r3   delete_namespaced_secretrF   r4   r5   r?   r:   r:   r;   re      s   
z%KubernetesSubmittedRun._delete_secret)r1   Nr7   N)__name__
__module____qualname____doc__rR   r   r<   propertyr@   rY   boolrh   r   r`   rn   re   r:   r:   r:   r;   r0   J   s0    
#
r0   c                   @   s   e Zd ZdZdededededededed	d
fddZed	efddZ	d	e
e fddZd	efddZdddZd	efddZd
S )CrdSubmittedRunz-Run submitted to a CRD backend, e.g. Volcano.groupversionpluralr4   r5   r3   
custom_apir7   Nc                 C   s4   || _ || _|| _|| _|| _|| _|| _d| _dS )a  Create a run object for tracking the progress of a CRD.

        Arguments:
            group: The API group of the CRD.
            version: The API version of the CRD.
            plural: The plural name of the CRD.
            name: The name of the CRD instance.
            namespace: The namespace of the CRD instance.
            core_api: The Kubernetes core API client.
            custom_api: The Kubernetes custom object API client.

        Raises:
            LaunchError: If the CRD instance does not exist.
        r   N)r|   r}   r~   r4   r5   r3   r   r8   )r9   r|   r}   r~   r4   r5   r3   r   r:   r:   r;   r<      s   
zCrdSubmittedRun.__init__c                 C   r=   )z"Get the name of the custom object.r>   r?   r:   r:   r;   r@      rA   zCrdSubmittedRun.idc              
      s   i }z-| j jd| j | jdI dH }dd |jD }|D ]}| j j|| jdI dH ||< qW n! tyQ } ztd| j dt	|  W Y d}~dS d}~ww |sVdS d	d | D }d

|S )zGet logs for custom object.zwandb/run-id=rB   Nc                 S   rD   r:   rE   rG   r:   r:   r;   rJ      rK   z,CrdSubmittedRun.get_logs.<locals>.<listcomp>rL   zFailed to get logs for rk   c                 S   s    g | ]\}}d | d| qS )zPod z:
r:   )rH   pod_namelogr:   r:   r;   rJ      s     
)r3   rM   r4   r5   rN   rQ   r,   rO   rP   rR   join)r9   rW   rU   rV   r   rX   logs_as_arrayr:   r:   r;   rY      s*   
zCrdSubmittedRun.get_logsc                    s   t | jS )zGet status of custom object.)r   r`   r4   r?   r:   r:   r;   r`      s   zCrdSubmittedRun.get_statusc                    sl   z| j j| j| j| j| j| jdI dH  W dS  ty5 } ztd| j d| j dt	| |d}~ww )zCancel the custom object.)r|   r}   r5   r~   r4   NzFailed to delete CRD rj   rk   )
r   delete_namespaced_custom_objectr|   r}   r5   r~   r4   r,   r   rR   rm   r:   r:   r;   rn      s"   zCrdSubmittedRun.cancelc                    sT   	 |   I dH }tt d| j d|  |jdv r!|jdkS tdI dH  q)z.Wait for this custom object to finish running.TNrZ   r[   r\   r]   r-   )r`   rO   ra   r   r4   rb   rc   rd   rf   r:   r:   r;   rh   
  s   

zCrdSubmittedRun.waitrt   )ru   rv   rw   rx   rR   r)   r*   r<   ry   r@   r   rY   r   r`   rn   rz   rh   r:   r:   r:   r;   r{      s2    	
!
r{   c                       s   e Zd ZdZdedeeef dede	ddf
 fdd	Z
d
eeef deeef defddZd
eeef dedededddeeeef ed f fddZdededee fddZ  ZS )KubernetesRunnerzLaunches runs onto kubernetes.apibackend_configenvironmentregistryr7   Nc                    s   t  || || _|| _dS )a+  Create a Kubernetes runner.

        Arguments:
            api: The API client object.
            backend_config: The backend configuration.
            environment: The environment to launch runs into.

        Raises:
            LaunchError: If the Kubernetes configuration is invalid.
        N)superr<   r   r   )r9   r   r   r   r   	__class__r:   r;   r<     s   
zKubernetesRunner.__init__resource_argscontextc                 C   sL   |r
|d  ddnd}| di  dp%| dp%| j di  dp%|S )zGet the namespace to launch into.

        Arguments:
            resource_args: The resource args to launch.
            context: The k8s config context.

        Returns:
            The namespace to launch into.
        r   r5   r1   rF   runner)rr   r   )r9   r   r   default_namespacer:   r:   r;   get_namespace,  s   zKubernetesRunner.get_namespacelaunch_project	image_urir5   r3   r)   r+   c                    s>  ddd}| | |di }ddd}| |di  |d	i }	d
di}
|
 |	di  |
di g}|di  |j|d t< d|d t< t rXt |d t	< |dslt
d|j d|j d|d< t|D ]%\}}d|vr|ddt| |d< d|vrdddgiddid|d< qp|jp| }|jr||d d< ntdd |D s|dusJ ||d d< t|| j|j|I dH }|durdd|j ig|
d < t|||j|jdu || jt| jj }d}|D ]y}|d!pg }| D ]g\}}|d"kr]|r]t s| jt r]tj d#}d$}|r(|d| 7 }n|d|j 7 }d%d& }t!t"t#j$d'd(t#j$d'd)t%d*t&|||||d+I dH }|'|d,|d-d.id/ q|'||d0 q||d!< q||
d< |
|	d< |	|d	< ||d< ||d< t(|td |j)rt*|| t rt(|t	t  ||fS )1a  Apply our default values, return job dict and api key secret.

        Arguments:
            resource_args (Dict[str, Any]): The resource args to launch.
            launch_project (LaunchProject): The launch project.
            builder (Optional[AbstractBuilder]): The builder.
            namespace (str): The namespace.
            core_api (CoreV1Api): The core api.

        Returns:
            Tuple[Dict[str, Any], Optional["V1Secret"]]: The resource args and api key secret.
        batch/v1Job)
apiVersionkindrF   r   <   )backoffLimitttlSecondsAfterFinishedspectemplaterestartPolicyNever
containerslabelstruer4   zlaunch--generateNamelaunchsecurityContextFdropALLtypeRuntimeDefault)allowPrivilegeEscalationcapabilitiesseccompProfileimagec                 S   s   g | ]}d |v qS )r   r:   )rH   contr:   r:   r;   rJ     rK   z5KubernetesRunner._inject_defaults.<locals>.<listcomp>Nregcred-imagePullSecretsenvWANDB_API_KEYro   zwandb-api-keyc                 S   s   t d|  d d S )N3Exception when ensuring Kubernetes API key secret: z. Retrying...)rO   rP   )rX   r:   r:   r;   handle_exception  s   
z;KubernetesRunner._inject_defaults.<locals>.handle_exceptionr"   )seconds)minutes)initial_sleep	max_sleepmax_retries)backofffnon_excr3   secret_namer5   api_keysecretKeyRefpassword)r4   key)r4   	valueFromr4   value)+updaterr   
setdefaultrun_idr   r   r
   initializedr4   r   r!   target_entitytarget_project	enumeraterR   override_entrypointget_job_entry_pointdocker_imageanymaybe_create_imagepull_secretr   inject_entrypoint_and_argsoverride_argsget_env_vars_dict_apir   r   ru   rN   r   r   rp   rq   r   r   datetime	timedeltaAPI_KEY_SECRET_MAX_RETRIESensure_api_key_secretappendadd_label_to_podsjob_base_imageapply_code_mount_configuration)r9   r   r   r   r5   r3   jobjob_metadatajob_specpod_templatepod_specr   ir   entry_pointr6   env_varsapi_key_secretr   r   r   release_namer   r   r:   r:   r;   _inject_defaultsD  s   









z!KubernetesRunner._inject_defaultsc           #         sF  t  I dH  ||di }|stt d td|  t	t
|I dH \}}|jdurLtdu s:tdu r>td| }|tjt| |dd}|dvri|| jt| jj }t|| |d	i |d	< |d	 d
i |d	 d
< d|d	 d
 t< t|td t rt|tt  t |d	 d
 t< |jrt || i }	|j!r|j!|	d< |j"r|j"j#|	d< t$||	 t%&|}
| '||}|(d^}}}|d|}|d|}|d|}|)  d}t*|||d}t j+||d z|
j,|||||dI dH }W n+ t-y3 } zt./|j0}t12|}td| d|j3 d|j4 d| |d}~ww |d	i d}td| d|d	 d   t5|||||t%6||
d}| j7t8 rg|9 I dH  |S t
j%:|}t
j%6|}| '||}| ;|||||I dH \}}d}d|v r|d|d  7 }t| zt
j<j=|||dI dH }W nR t
j<j>y } z-|j?D ]#}t./|j0}|d}|d }td!|j@ d"| d|j4 d#| W Y d}~nd}~w tAy } z
td$tB| dd}~ww |d% } | jCj}!t +| tD|||!||}"| j7t8 r!|"9 I dH  |"S )&a  Execute a launch project on Kubernetes.

        Arguments:
            launch_project: The launch project to execute.
            builder: The builder to use to build the image.

        Returns:
            The run object if the run was successful, otherwise None.
        N
kuberneteszyNote: no resource args specified. Add a Kubernetes yaml spec or other options in a json file with --resource-args <json>.z+Running Kubernetes job with resource args: a0  WANDB_LAUNCH_SOURCE_CODE_PVC_ environment variables not set. Unable to mount source code PVC into base image. Use the `codeMountPvcName` variable in the agent helm chart to enable base image jobs for this agent. See https://github.com/wandb/helm-charts/tree/main/charts/launch-agent for more information.r   r   )r   zbatch/v1beta1rF   r   r   argscommand/r|   r}   r   s)r|   r}   r~   )custom_resource)r|   r}   r5   r~   bodyzError creating CRD of kind rk    r   r4   zCreated )r4   r|   r}   r5   r~   r3   r   zCreating Kubernetes job)r5   messagecodez(Failed to create Kubernetes job for run z (z): z3Unexpected exception when creating Kubernetes job: r   )Er   ensure_initializedfill_macrosrr   rO   ra   r   _loggerinfor    r%   r   SOURCE_CODE_PVC_NAMESOURCE_CODE_PVC_MOUNT_PATHr   get_image_source_stringchange_project_dirrp   pathr   r   r   r   r   ru   add_wandb_envr   r   r
   r   r   r4   r   r   r   r   add_entrypoint_args_overridesr'   r*   r   splitlowerr   monitor_namespacecreate_namespaced_custom_objectr,   jsonloadsr   yamldumprg   reasonr{   r)   r   r   rh   r(   r   utilscreate_from_dictFailToCreateErrorapi_exceptionsr   rS   rR   rF   r0   )#r9   r   r   r   r   
api_clientcode_subdirapi_versionr   	overridesr   r5   r|   r}   _r   r~   r   responserX   r   	body_yamlr4   submitted_runr2   r3   r   r6   msgexcrespr   job_responsejob_namesubmitted_jobr:   r:   r;   run  s  






	







zKubernetesRunner.run)ru   rv   rw   rx   r	   r   rR   r   r   r   r<   r   r   r   r   r   r#   r  __classcell__r:   r:   r   r;   r     sR    





 )r   r   r   r   should_override_entrypointr7   c                 C   sJ   t t| D ]}|r|| | d< |r"| | dr|r"|j| | d< qdS )aP  Inject the entrypoint and args into the containers.

    Arguments:
        containers: The containers to inject the entrypoint and args into.
        entry_point: The entrypoint to inject.
        override_args: The args to inject.
        should_override_entrypoint: Whether to override the entrypoint.

    Returns:
        None
    r   r   N)rangelenrr   r   )r   r   r   r  r   r:   r:   r;   r     s   r   r3   r)   r   r5   r   r+   c           	   
      s2  dt |  i}ddi}tj|tj|||dddd}zcz| ||I dH W W S  ty } zK|j	d	kr}| j
||d
I dH }|j|krt|jjddkrj| j||d
I dH  | ||I dH W  Y d}~W S td| d| |W  Y d}~W S  d}~ww  ty } z
tdt| dd}~ww )a<  Create a secret containing a user's wandb API key.

    Arguments:
        core_api: The Kubernetes CoreV1Api object.
        secret_name: The name to use for the secret.
        namespace: The namespace to create the secret in.
        api_key: The user's wandb API key

    Returns:
        The created secret
    r   zwandb.ai/created-byzlaunch-agent)r4   r5   r   Secretzkubernetes.io/basic-authdatarF   r   r   N  rL   z.Kubernetes secret already exists in namespace z with incorrect data: r   r   )base64	b64encodeencodedecoder'   r+   V1ObjectMetacreate_namespaced_secretr,   rg   read_namespaced_secretr   rF   r   rr   rs   r   rS   rR   )	r3   r   r5   r   secret_datar   r6   rX   existing_secretr:   r:   r;   r     sT   	

r   r   r   c           
   
      s,  d}t |tst |trdS | I dH \}}d|jt| d|   ddii}dtt	
|  i}tj|tjd| |dd	d
d}z2z| ||I dH W W S  ty }	 z|	jdkrz| jd| |dI dH W  Y d}	~	W S  d}	~	ww  ty }	 z
tdt|	 dd}	~	ww )aI  Create a secret for pulling images from a private registry.

    Arguments:
        core_api: The Kubernetes CoreV1Api object.
        registry: The registry to pull from.
        run_id: The run id.
        namespace: The namespace to create the secret in.

    Returns:
        A secret if one was created, otherwise None.
    Nauths:zdeprecated@wandblaunch.com)authemailz.dockerconfigjsonr   rL   r  zkubernetes.io/dockerconfigjsonr  r!  z+Exception when creating Kubernetes secret: r   )
isinstancer   r   get_username_passwordurir"  r#  r$  r%  r  dumpsr'   r+   r&  r'  r,   rg   r(  rS   r   rR   )
r3   r   r   r5   r6   unametoken
creds_infor)  rX   r:   r:   r;   r     sJ   


r   rootc                 c   s    t | tr.|  D ]!\}}|dkrt |tr|E dH  q
t |ttfr+t|E dH  q
dS t | tr?| D ]}t|E dH  q5dS dS )zYield all container specs in a manifest.

    Recursively traverses the manifest and yields all container specs. Container
    specs are identified by the presence of a "containers" key in the value.
    r   N)r/  dictrN   listyield_containers)r6  kvitemr:   r:   r;   r9  ,  s    



r9  r   c                 C   sP   t | D ]!}|dg }|dd | D  ||d< d|v r%|d qdS )a  Injects wandb environment variables into specs.

    Recursively walks the spec and injects the environment variables into
    every container spec. Containers are identified by the "containers" key.

    This function treats the WANDB_RUN_ID and WANDB_GROUP_ID environment variables
    specially. If they are present in the spec, they will be overwritten. If a setting
    for WANDB_RUN_ID is provided in env_vars, then that environment variable will only be
    set in the first container modified by this function.

    Arguments:
        root: The spec to modify.
        env_vars: The environment variables to inject.

    Returns: None.
    r   c                 S   s   g | ]	\}}||d qS )r   r:   )rH   r   r   r:   r:   r;   rJ   Q  s    z!add_wandb_env.<locals>.<listcomp>WANDB_RUN_IDN)r9  r   extendrN   pop)r6  r   r   r   r:   r:   r;   r   >  s   
r   manifestc                 c   s~    t | tr| D ]	}t|E dH  qdS t | tr;d| v r&d| d v r&| V  |  D ]}t |ttfr:t|E dH  q*dS dS )zYield all pod specs in a manifest.

    Recursively traverses the manifest and yields all pod specs. Pod specs are
    identified by the presence of a "spec" key with a "containers" key in the
    value.
    Nr   r   )r/  r8  
yield_podsr7  values)r@  r<  r   r:   r:   r;   rA  X  s   

rA  	label_keylabel_valuec                 C   s2   t | D ]}|di }|di }|||< qdS )a  Add a label to all pod specs in a manifest.

    Recursively traverses the manifest and adds the label to all pod specs.
    Pod specs are identified by the presence of a "spec" key with a "containers"
    key in the value.

    Arguments:
        manifest: The manifest to modify.
        label_key: The label key to add.
        label_value: The label value to add.

    Returns: None.
    rF   r   N)rA  r   )r@  rC  rD  podrF   r   r:   r:   r;   r   j  s
   
r   r  c                 C   s   t | tr| D ]}t|| qdS t | trKd| v r?d| d v r?| d d }|D ]}d|v r4|d |d< d|v r>|d |d< q(|  D ]	}t|| qCdS dS )a  Add entrypoint and args overrides to all containers in a manifest.

    Recursively traverses the manifest and adds the entrypoint and args overrides
    to all containers. Containers are identified by the presence of a "spec" key
    with a "containers" key in the value.

    Arguments:
        manifest: The manifest to modify.
        overrides: Dictionary with args and entrypoint keys.

    Returns: None.
    r   r   r   r   N)r/  r8  r   r7  rB  )r@  r  r<  r   	containerr   r:   r:   r;   r     s"   

r   projectc                 C   s   t dusJ | }t| D ]8}t|D ]}d|vrg |d< |d dt|d t|d< q|d }d|vr:g |d< |d ddt id	 qdS )
a  Apply code mount configuration to all containers in a manifest.

    Recursively traverses the manifest and adds the code mount configuration to
    all containers. Containers are identified by the presence of a "spec" key
    with a "containers" key in the value.

    Arguments:
        manifest: The manifest to modify.
        project: The launch project.

    Returns: None.
    NvolumeMountszwandb-source-code-volume)r4   	mountPathsubPath
workingDirr   volumes	claimName)r4   persistentVolumeClaim)r   r   rA  r9  r   r   )r@  rG  
source_dirrE  rF  r   r:   r:   r;   r     s.   
r   )^rx   rc   r"  r   r  loggingrp   typingr   r   r   r   r   r   r   r  rO   wandb.apis.internalr	   wandb.sdk.launch.agent.agentr
   %wandb.sdk.launch.environment.abstractr   "wandb.sdk.launch.registry.abstractr   2wandb.sdk.launch.registry.azure_container_registryr   (wandb.sdk.launch.registry.local_registryr    wandb.sdk.launch.runner.abstractr   *wandb.sdk.launch.runner.kubernetes_monitorr   r   r   r   r   wandb.sdk.lib.retryr   r   
wandb.utilr   _project_specr   r   errorsr   r  r   r   r   r   r    r!   abstractr#   r$   r%   r'   *kubernetes_asyncio.client.api.batch_v1_apir(   )kubernetes_asyncio.client.api.core_v1_apir)   0kubernetes_asyncio.client.api.custom_objects_apir*   *kubernetes_asyncio.client.models.v1_secretr+   kubernetes_asyncio.client.restr,   TIMEOUTr   	getLoggerru   r   rq   rr   r   r   r0   r{   r   r7  rR   rz   r   r   r   r9  r8  r   rA  r   r   r   r:   r:   r:   r;   <module>   s    $ 
o[   

=
8&


