o
    bi2'                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZmZmZmZ d dlmZ d dlmZ d dlmZmZmZmZ d dlmZmZmZ d dlmZ zd dlZd d	lm Z m!Z! W n e"yu   dZdZ dZ!Y nw e#e$Z%d
Z&dZ'dedee( fddZ)de
e(ef de
e(ef fddZ*de(de(fddZ+de(de	eee(   fddZ,de dedeee!f fddZ-		d'dedee( dee. dee
e(ef e
e(ef f fdd Z/ded!ede(dee fd"d#Z0ded!ed$ee( de
e(ef fd%d&Z1dS )(    N)	dataclass)AnyAsyncIteratorDictListOptionalTupleUnion)ray_constants)	GcsClient)JOB_ID_METADATA_KEYJobInfoStorageClient	JobStatusvalidate_request_type)
DriverInfo
JobDetailsJobType)
RuntimeEnv)RequestResponse
   i N  
gcs_clientreturnc                    s0   | j tjtjddI dH }|du rdS | S )z%Fetches Head node id persisted in GCS   )	namespacetimeoutN)async_internal_kv_getr
   KV_HEAD_NODE_ID_KEYKV_NAMESPACE_JOBdecode)r   head_node_id_hex_bytes r!   S/home/ubuntu/.local/lib/python3.10/site-packages/ray/dashboard/modules/job/utils.pyget_head_node_id%   s   r#   dc                 C   s   dd |   D S )z-Strip keys with value None from a dictionary.c                 S   s   i | ]\}}|d ur||qS Nr!   ).0kvr!   r!   r"   
<dictcomp>3       z.strip_keys_with_value_none.<locals>.<dictcomp>)items)r$   r!   r!   r"   strip_keys_with_value_none1   s   r,   urlc                 C   s4   t d| }t|dkr| d|d  dd} | S )zRedact any passwords in a URL.zhttps?:\/\/.*:(.*)@.*r   :@z:<redacted>@)refindalllenreplace)r-   secretr!   r!   r"   redact_url_password6   s   r5   pathc                 C  s   t | tstdt|  dtj| s(td|  d dV  tj| rd}t	| d<}g }d}d}	 t
|d
ksE|tksE||krN|pHdV  g }d}| }||krb|| |t
|7 }ntdI dH  q71 snw   Y  dS )aw  Yield lines from a file as it's written.

    Returns lines in batches of up to 10 lines or 20000 characters,
    whichever comes first. If it's a chunk of 20000 characters, then
    the last line that is yielded could be an incomplete line.
    New line characters are kept in the line string.

    Returns None until the file exists or if no new line has been written.
    zpath must be a string, got .zPath z doesn't exist yet.N rr   Tr      )
isinstancestr	TypeErrortypeosr6   existsloggerdebugopenr2   MAX_CHUNK_CHAR_LENGTHreadlineappendasynciosleep)r6   EOFflineschunk_char_count	curr_liner!   r!   r"   file_tail_iterator?   s4   



rN   reqrequest_typec              
      st   ddl }t|  I dH }zt||W S  ty9 } ztd|  tt	 |j
jjdW  Y d}~S d}~ww )a  Parse request and cast to request type.

    Remove keys with value None to allow newer client versions with new optional fields
    to work with older servers.

    If parsing failed, return a Response object with status 400 and stacktrace instead.

    Args:
        req: aiohttp request object.
        request_type: dataclass type to cast request to.

    Returns:
        Parsed request object or Response object with status 400 and stacktrace.
    r   NzGot invalid request type: )textstatus)aiohttpr,   jsonr   	ExceptionrA   infor   	traceback
format_excwebHTTPBadRequeststatus_code)rO   rP   rS   	json_dataer!   r!   r"   parse_and_validate_requestx   s   r^   job_or_submission_idr   c                    s   | j |dd|dI dH }t| dd d}i }i }|D ]^}|jjtjr(q|j	 }t
|jj}	|	t}
|
skt||jjt|jd}t|tj|jrPtjntj|j|j|j|	t|jjj ! |d	}|||< qt||jjt|jd}|||
< q||fS )	a  Returns a tuple of dictionaries related to drivers.

    The first dictionary contains all driver jobs and is keyed by the job's id.
    The second dictionary contains drivers that belong to submission jobs.
    It's keyed by the submission job's submission id.
    Only the last driver of a submission job is returned.

    An optional job_or_submission_id filter can be provided to only return
    jobs with the job id or submission id.
    T)r_   skip_submission_job_info_fieldskip_is_running_tasks_fieldr   Nc                 S   s
   | j  S r%   )job_idhex)job_table_entryr!   r!   r"   <lambda>   s   
 z!get_driver_jobs.<locals>.<lambda>)key)idnode_ip_addresspid)	rb   r>   rR   
entrypoint
start_timeend_timemetadataruntime_envdriver_info)"async_get_all_job_infosortedvaluesconfigray_namespace
startswithr
   RAY_INTERNAL_NAMESPACE_PREFIXrb   rc   dictrm   getr   r   driver_address
ip_addressr<   
driver_pidr   r   DRIVERis_deadr   	SUCCEEDEDRUNNINGrj   rk   rl   r   deserializeruntime_env_infoserialized_runtime_envto_dict)r   r_   r   	job_infossorted_job_infosjobssubmission_job_driversrd   rb   rm   job_submission_iddriverjobr!   r!   r"   get_driver_jobs   sd   



r   job_info_clientc           	         s   t |  dI dH \}}| }|r|S t fdd| D d}|s' }||I dH }|rO||}tdi t|||rD|jnd|t	j
d}|S dS )zH
    Attempts to find the job with a given submission_id or job id.
    r_   Nc                 3   s"    | ]\}}|j  kr|V  qd S r%   rg   )r&   rg   r   r   r!   r"   	<genexpr>   s    
z"find_job_by_ids.<locals>.<genexpr>submission_idrb   ro   r>   r!   )r   rx   nextr+   get_infor   dataclassesasdictrg   r   
SUBMISSION)	r   r   r_   driver_jobsr   r   r   job_infor   r!   r   r"   find_job_by_ids   s8   	

	

r   job_idsc                    s   t | I dH \} fdd| D } fdd D  }tjfdd|D  I dH }i |fddt||D S )z
    Returns a dictionary of submission jobs with the given job ids, keyed by the job id.

    This only accepts job ids and not submission ids.
    Nc                    s   i | ]\}}| v r||qS r!   r!   r&   rf   r   r   r!   r"   r)     r*   z(find_jobs_by_job_ids.<locals>.<dictcomp>c                    s    i | ]\}}|j  v r||qS r!   r   r   r   r!   r"   r)     s    c                    s   g | ]}  |qS r!   )r   )r&   r   )r   r!   r"   
<listcomp>  s    z(find_jobs_by_job_ids.<locals>.<listcomp>c                    sL   i | ]"\}}  |jtdi t||  |j  |tjd qS )r   r!   )rx   rg   r   r   r   r   r   )r&   r   r   )r   r!   r"   r)   '  s    
)r   r+   keysrG   gatherzip)r   r   r   r   job_submission_idsr   r!   )r   r   r   r"   find_jobs_by_job_ids
  s&   




r   )NN)2rG   r   loggingr?   r0   rW   r   typingr   r   r   r   r   r   r	   ray._privater
   ray._rayletr    ray.dashboard.modules.job.commonr   r   r   r   )ray.dashboard.modules.job.pydantic_modelsr   r   r   ray.runtime_envr   rS   aiohttp.webr   r   rU   	getLogger__name__rA   MAX_CHUNK_LINE_LENGTHrD   r<   r#   r,   r5   rN   r^   intr   r   r   r!   r!   r!   r"   <module>   s    $
"	9

 
G
-
