o
    ciu                     @   sh  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlZd dlZd dlm  m  mZ d dlm  m  mZ d dlm Z  d dl!m"Z" d dl#m$Z$ d dl%m&Z& d d	l'm(Z( d d
l)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dl1m2Z2m3Z3m4Z4m5Z5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z>m?Z? d dl@mAZA eBeCZDe(ddZEdd ZFG dd dejGZHdd ZIdeJfddZKdejLdeeeJef  fd d!ZMd1d"d#ZNd$d% ZOd2d'd(ZPd1d)d*ZQd+eeJ dee& fd,d-ZRd.d/ ZSeCd0kr2eS  dS dS )3    N)defaultdict)futures)AnyCallableDictListOptionalSetUnion)cloudpickle)ray_constants)disable_client_hook)	GcsClient)env_integer)setup_logger)%canonicalize_bootstrap_address_or_die)add_port_to_grpc_server	JobConfig)CLIENT_SERVER_MAX_THREADSGRPC_OPTIONSOBJECT_TRANSFER_CHUNK_SIZEClientServerHandleResponseCache)DataServicer)LogstreamServicer)serve_proxier)dumps_from_serverloads_from_client)current_serverTIMEOUT_FOR_SPECIFIC_SERVER_S   c                    s   t   fdd}|S )z
    Decorator for gRPC stubs. Before calling the real stubs, checks if there's
    an existing entry in the caches. If there is, then return the cached
    entry. Otherwise, call the real function and use the real cache
    c              
      s  t |  d}t fdd|D r| ||S  d } d }t d }| j| }|||}|d urMt|trK|t	j
j |t| ||S z| ||}	W n" tyw }
 z||||
 |t	j
j |t|
  d }
~
ww ||||	 |	S )N)	client_id	thread_idreq_idc                 3   s    | ]}| vV  qd S N ).0imetadatar&   Q/home/ubuntu/.local/lib/python3.10/site-packages/ray/util/client/server/server.py	<genexpr><   s    z7_use_response_cache.<locals>.wrapper.<locals>.<genexpr>r"   r#   r$   )dictinvocation_metadataanyintresponse_cachescheck_cache
isinstance	Exceptionset_codegrpc
StatusCodeFAILED_PRECONDITIONset_detailsstrupdate_cache)selfrequestcontextexpected_idsr"   r#   r$   response_cachecached_entryrespefuncr)   r+   wrapper8   s4   

z$_use_response_cache.<locals>.wrapper)	functoolswraps)rE   rF   r&   rD   r+   _use_response_cache1   s   $rI   c                   @   s  e Zd ZdefddZ	dLdejdejfddZe	dLdej
fd	d
ZdLdejfddZe	dLdejfddZdLdejfddZdLdejfddZ	dLdejfddZdLdejfddZdLdefddZdededefddZdd Zdd  Zd!d" Zd#d$ Z e	dLd%d&Z!	dLdej"ded'e#d(e$j%de&ej' f
d)d*Z(dej"fd+d,Z)dej"defd-d.Z*	dLdej+dej,fd/d0Z-	dLd1e.ee/f d2eded3efd4d5Z0dLdej1fd6d7Z2	dLd8ej3d9e4e5 d:e6ee5f dej7fd;d<Z8	dLd8ej3d9e4e5 d:e6ee5f dej7fd=d>Z9	dLd8ej3d9e4e5 d:e6ee5f dej7fd?d@Z:	dLd8ej3d9e4e5 d:e6ee5f dej7fdAdBZ;	dLd8ej3dej7fdCdDZ<dededEe&e6 de=j>j?fdFdGZ@dededEe&e6 fdHdIZAdJdK ZBdS )MRayletServicerray_connect_handlerc                 C   sV   t t| _t t| _i | _i | _t t| _i | _t | _	t
 | _|| _t t| _dS )zuConstruct a raylet service

        Args:
           ray_connect_handler: Function to connect to ray cluster
        N)r   r-   object_refsclient_side_ref_mapfunction_refs
actor_refssetactor_ownersregistered_actor_classesnamed_actors	threadingLock
state_lockrK   r   r1   )r<   rK   r&   r&   r+   __init__a   s   



zRayletServicer.__init__Nr=   returnc           
      C   s|  |j rt|j }d|_nd }d }t S t r$tjjj	}|j
 }n<t|jp*d}z| j|fi | W n( ty_ } ztd tjdd| dW  Y d }~W  d    S d }~ww W d    n1 sjw   Y  |d u rytjddS | }|rt|jjj}||jjj t|jjj}	|	|jjj ||	krt|dkrtjdd	|jj d
|jj dS tjddS )NTz{}zRunning Ray Init failed:Fz0Call to `ray.init()` on the server failed with: )okmsgrY   r   z.Runtime environment doesn't match request one z current one )
job_configpickleloads_client_jobr   rayis_initialized_privateworkerglobal_workercore_workerget_job_configjsonray_init_kwargsrK   r4   logger	exceptionray_client_pb2InitResponse_get_proto_job_configrP   runtime_env_infourisworking_dir_uriupdatepy_modules_urislen)
r<   r=   r>   r\   current_job_configrc   extra_kwargsrC   job_uriscurrent_job_urisr&   r&   r+   Initt   sZ   

zRayletServicer.Initc              
   C   s   z$t   tjjj|j|j|j|jd}W d    n1 sw   Y  W n t	y= } zt
|| d}W Y d }~nd }~ww tj|dS )N)	overwrite	namespaceF)already_exists)r   r`   experimentalinternal_kv_internal_kv_putkeyvaluery   rz   r4   return_exception_in_contextrk   KVPutResponse)r<   r=   r>   r{   rC   r&   r&   r+   KVPut   s    
zRayletServicer.KVPutc              
   C      z t   tjjj|j|jd}W d    n1 sw   Y  W n ty9 } zt|| d}W Y d }~nd }~ww t	j
|dS )Nrz       )r   )r   r`   r|   r}   _internal_kv_getr   rz   r4   r   rk   KVGetResponse)r<   r=   r>   r   rC   r&   r&   r+   KVGet      
zRayletServicer.KVGetc              
   C   s   z"t   tjjj|j|j|jd}W d    n1 sw   Y  W n ty; } zt	|| d}W Y d }~nd }~ww t
j|dS )N)del_by_prefixrz   r   )deleted_num)r   r`   r|   r}   _internal_kv_delr   r   rz   r4   r   rk   KVDelResponse)r<   r=   r>   r   rC   r&   r&   r+   KVDel   s   
zRayletServicer.KVDelc              
   C   s   z t   tjjj|j|jd}W d    n1 sw   Y  W n ty9 } zt|| g }W Y d }~nd }~ww t	j
|dS )Nr   )keys)r   r`   r|   r}   _internal_kv_listprefixrz   r4   r   rk   KVListResponse)r<   r=   r>   r   rC   r&   r&   r+   KVList   r   zRayletServicer.KVListc              
   C   r   )Nr   F)exists)r   r`   r|   r}   _internal_kv_existsr   rz   r4   r   rk   KVExistsResponse)r<   r=   r>   r   rC   r&   r&   r+   KVExists   r   zRayletServicer.KVExistsc                 C   sH   t   tjj|jd}W d    n1 sw   Y  tjt|dS )N)all_namespaces)actors_json)	r   r`   utillist_named_actorsr   rk   ClientListNamedActorsResponserg   dumps)r<   r=   r>   actorsr&   r&   r+   ListNamedActors   s   zRayletServicer.ListNamedActorsc                 C   s  t  }|j|_|jt jjkr<t  t }W d    n1 s!w   Y  dd | D }|j	
t jj|d |S |jt jjkrpt  t }W d    n1 sUw   Y  dd | D }|j	
t jj|d |S |jt jjkrt j }t 1 t }tjj| |_tjj| |_|j|_|j|_|j|_| |_W d    n1 sw   Y  |j
| |S t  | |||_ W d    |S 1 sw   Y  |S )Nc                 S      i | ]	\}}|t |qS r&   floatr'   kvr&   r&   r+   
<dictcomp>       z.RayletServicer.ClusterInfo.<locals>.<dictcomp>)tablec                 S   r   r&   r   r   r&   r&   r+   r     r   )!rk   ClusterInfoResponsetypeClusterInfoTypeCLUSTER_RESOURCESr   r`   cluster_resourcesitemsresource_tableCopyFromResourceTableAVAILABLE_RESOURCESavailable_resourcesRUNTIME_CONTEXTRuntimeContextget_runtime_context_commonutilshex_to_binary
get_job_idjob_idget_node_idnode_idrz   -should_capture_child_tasks_in_placement_groupcapture_client_tasksgcs_addressget_runtime_env_stringruntime_envruntime_context_return_debug_cluster_inforg   )r<   r=   r>   rB   	resourcesfloat_resourcesctxrtcr&   r&   r+   ClusterInfo   sP   




zRayletServicer.ClusterInfoc                 C   s   d}|j tjjkrt }n6|j tjjkrt }n*|j tjjkr&t	 }n|j tjj
kr0i }n|j tjjkr@dtjj i}ntdt|S )z9Handle ClusterInfo requests that only return a json blob.Ndashboard_urlzUnsupported cluster info type)r   rk   r   NODESr`   nodesIS_INITIALIZEDra   TIMELINEtimelinePINGDASHBOARD_URLrb   rc   get_dashboard_url	TypeErrorrg   r   )r<   r=   r>   datar&   r&   r+   r     s   



z)RayletServicer._return_debug_cluster_infor"   idc                 C   s   | j o || jv r,|| j| v r,td|  d|  | j| |= 	 W d    dS || jv ri|| j| v ritd|  d|  | j| | | |r`td|   | j|= 	 W d    dS 	 W d    dS 1 suw   Y  d S )NzReleasing object z for TzReleasing actor Deleting reference to actor F)	rV   rL   ri   debughexrQ   remove_can_remove_actor_refrO   )r<   r"   r   r&   r&   r+   release(  s$   


$zRayletServicer.releasec                 C   sF   | j  | | | | W d    n1 sw   Y  t  d S r%   )rV   _release_objects_release_actorsgccollect)r<   r"   r&   r&   r+   release_all;  s
   
zRayletServicer.release_allc                    s,   t  fdd| j D  }|o | jvS )Nc                 3   s    | ]} |v V  qd S r%   r&   )r'   
actor_listactor_id_bytesr&   r+   r,   F  s    
z7RayletServicer._can_remove_actor_ref.<locals>.<genexpr>)r/   rQ   valuesrS   )r<   r   no_ownerr&   r   r+   r   E  s   
z$RayletServicer._can_remove_actor_refc                 C   sr   || j vrtd|  d S t| j | }| j |= || jv r#| j|= || jv r,| j|= td| d|  d S )Nz%Releasing client with no references: Released all z objects for client )rL   ri   r   rs   rM   r1   )r<   r"   countr&   r&   r+   r   K  s   


zRayletServicer._release_objectsc                 C   s   || j vrtd|  d S d}| j |}|D ]}|d7 }| |r2td|   | j|= qtd| d|  d S )Nz!Releasing client with no actors: r      r   r   z actors for client: )rQ   ri   r   popr   r   rO   )r<   r"   r   actors_to_removeid_bytesr&   r&   r+   r   W  s   

zRayletServicer._release_actorsc              
   C   s0  | ddkrJz+| j|j |jj }t  tj||jj|jj	d W d    n1 s,w   Y  W n_ t
yI } zt|| W Y d }~nMd }~ww | ddkrz%| j|jj }t  tj||jjd W d    n1 spw   Y  W n t
y } zt|| W Y d }~n	d }~ww tdtjddS )	Nterminate_typetask_object)force	recursiveactor)
no_restartzEClient requested termination without providing a valid terminate_typeTr[   )
WhichOneofrL   r"   r   r   r   r`   cancelr   r   r4   r   rO   r   killr   RuntimeErrorrk   TerminateResponse)r<   reqr>   
object_refrC   	actor_refr&   r&   r+   	Terminatef  s<   zRayletServicer.Terminater$   result_queuec           
   
      s  t jdkrtd jd }j  |d}|s0tjdttd| d  dS z4t	
d	|  t  d
tddf fdd}|| 	 W d   W dS 1 s]w   Y  W dS  ty }	 ztjdt|	dW  Y d}	~	S d}	~	ww )zAttempts to schedule a callback to push the GetResponse to the
        main loop when the desired object is ready. If there is some failure
        in scheduling, a GetResponse will be immediately returned.
        r   z3Async get() must have exactly 1 Object ID. Actual: r   NFzClientObjectRef with id z not found for client validerrorzasync get: %sresultrX   c              
      s   zLt |  }t|}|dksJ dt|t }tj|D ])}|t }t||d t }tj	d||| |||d}tj
|d}| q W d	S  tyu }	 ztj	dt|	d}tj
|d}
|
 W Y d	}	~	d	S d	}	~	ww )
zPushes GetResponses to the main DataPath loop to send
                    to the client. This is called when the object is ready
                    on the server side.r   &Serialized object cannot be zero bytesr   Tr   r   chunk_idtotal_chunks
total_size)getr$   Fr   N)r   rs   mathceilr   rangestart_chunk_idminrk   GetResponseDataResponseputr4   r   r   )r   
serializedr  r  r  startendget_resp
chunk_respexcrB   r"   r$   r=   r   r<   r&   r+   send_get_response  s@   

z;RayletServicer._async_get_object.<locals>.send_get_response)rs   ids
ValueErrorrL   r  rk   r  r   r   ri   r   r   r   _on_completedr4   )
r<   r=   r"   r$   r   r>   ridrefr  rC   r&   r  r+   _async_get_object  s8   
	
"(' z RayletServicer._async_get_objectc                 c   sT    t | }|d}|d u rtjdttddV  d S | ||E d H  d S )Nr"   Fz.client_id is not specified in request metadatar   )	r-   r.   r  rk   r  r   r   r  _get_object)r<   r=   r>   r*   r"   r&   r&   r+   	GetObject  s   
zRayletServicer.GetObjectc              
   c   sd   g }|j D ](}| j| |d }|r|| qtjdttd| d| dV   d S z$t	
d|  t  tj||jd}W d    n1 sMw   Y  W n tyq } ztjdt|dV  W Y d }~d S d }~ww t||| }t|}	|	dksJ dt|	t }
t|j|
D ]}|t }t|	|d	 t }tjd
||| ||
|	dV  qd S )NFzClientObjectRef z is not found for client r   zget: %s)timeoutr   r  r   Tr  )r  rL   r  appendrk   r  r   r   r  ri   r   r   r`   r  r4   r   rs   r  r  r   r	  r
  r  )r<   r=   r"   
objectrefsr  r  r   rC   r  r  r  r  r  r  r&   r&   r+   r    sV   
	

zRayletServicer._get_objectc                 C   s   |  |j|jd|j|S )z#gRPC entrypoint for unary PutObject )_put_objectr   client_ref_idowner_id)r<   r=   r>   r&   r&   r+   	PutObject  s   zRayletServicer.PutObjectr   r$  r%  c           
   
   C   s   z+t || }|r| j| }nd}t  tj||d}W d   n1 s%w   Y  W n" tyM }	 ztd tj	ddt
|	dW  Y d}	~	S d}	~	ww || j| | < t|dkrf| | j| |< td|  tj	| d	d
S )a  Put an object in the cluster with ray.put() via gRPC.

        Args:
            data: Pickled data. Can either be bytearray if this is called
              from the dataservicer, or bytes if called from PutObject.
            client_ref_id: The id associated with this object on the client.
            client_id: The client who owns this data, for tracking when to
              delete this reference.
            owner_id: The owner id of the object.
            context: gRPC context.
        N)_ownerzPut failed:r   F)r   r   r   r   zput: %sT)r   r   )r   rO   r   r`   r  r4   ri   rj   rk   PutResponser   r   rL   binaryrs   rM   r   )
r<   r   r$  r"   r%  r>   objowner	objectrefrC   r&   r&   r+   r#    s*   

zRayletServicer._put_objectc              
   C   s$  g }|j D ]}|| j|j vrtdt| || j|j |  q|j}|j}z%t  t	j
|||dkr8|nd d\}}W d    n1 sHw   Y  W n  tyn }	 ztd|	  tjddW  Y d }	~	S d }	~	ww tdt|t|f  dd	 |D }
d
d	 |D }tjd|
|dS )Nz4Asking for a ref not associated with this client: %s)num_returnsr  z
Exception F)r   zwait: %s %sc                 S      g | ]}|  qS r&   r)  )r'   ready_object_refr&   r&   r+   
<listcomp>E  s    z-RayletServicer.WaitObject.<locals>.<listcomp>c                 S   r/  r&   r0  )r'   remaining_object_refr&   r&   r+   r2  H  s    T)r   ready_object_idsremaining_object_ids)
object_idsrL   r"   r4   r:   r   r.  r  r   r`   waitri   r   rk   WaitResponser   )r<   r=   r>   rL   r  r.  r  ready_object_refsremaining_object_refsrC   r4  r5  r&   r&   r+   
WaitObject-  sJ   

zRayletServicer.WaitObjecttaskarglistkwargsc              
   C   s4  t d|jtjj|jf  zft X |jtjj	kr%| 
||||}n:|jtjjkr5| ||||}n*|jtjjkrE| ||||}n|jtjjkrS| ||}ntdtjj|j d|_|W  d    W S 1 sow   Y  W d S  ty } zt jddd tjdt|dW  Y d }~S d }~ww )Nzschedule: %s %sz$Unimplemented Schedule task type: %sTzCaught schedule exception)exc_infoFr   )ri   r   namerk   
ClientTaskRemoteExecTypeNamer   r   FUNCTION_schedule_functionACTOR_schedule_actorMETHOD_schedule_methodNAMED_ACTOR_schedule_named_actorNotImplementedErrorr   r4   ClientTaskTicketr   r   )r<   r<  r=  r>  r>   r   rC   r&   r&   r+   ScheduleR  s<   (
zRayletServicer.Schedulec           
      C   sv   | j |j}|d u rtdt||j}t|j}|d ur&|jdi |}|j|i |}| 	||j
}	tj|	dS )Nz7Can't run an actor the server doesn't have a handle for
return_idsr&   )rO   r  
payload_idr4   getattrr@  decode_optionsoptionsremoteunify_and_track_outputsr"   rk   rM  )
r<   r<  r=  r>  r>   actor_handlemethodoptsoutputr  r&   r&   r+   rI  t  s   
zRayletServicer._schedule_methodc                 C   s   |  |j|jt|j}t|j}|d ur|jdi |}t|  |j|i |}W d    n1 s4w   Y  || j|j	
 < | j|j |j	
  tj|j	
 gdS NrO  r&   )lookup_or_register_actorrQ  r"   rS  baseline_optionsrT  r   rU  rO   	_actor_idr)  rQ   addrk   rM  )r<   r<  r=  r>  r>   remote_classrY  r   r&   r&   r+   rG    s   

zRayletServicer._schedule_actorc           	      C   s   |  |j|jt|j}t|j}|d ur|jdi |}t|  |j|i |}W d    n1 s4w   Y  | ||j}t	j
|dS r[  )lookup_or_register_funcrQ  r"   rS  r]  rT  r   rU  rV  rk   rM  )	r<   r<  r=  r>  r>   remote_funcrY  rZ  r  r&   r&   r+   rE    s   

z!RayletServicer._schedule_functionc                 C   sv   t |jdks	J t|j|jpd }|j }|| jvr"|| j|< | j	|j
 | | j| tj|j gdS )Nr   rO  )rs   rQ  r`   	get_actorr@  rz   r^  r)  rO   rQ   r"   r_  rS   rk   rM  )r<   r<  r>   r   bin_actor_idr&   r&   r+   rK    s   


z$RayletServicer._schedule_named_actorrT  c                 C   s   t  B || jvr>| j| | }t|}t|std|d u s(t|dkr1t	|| j|< ntj	di ||| j|< W d    n1 sHw   Y  | j| S )Nz6Attempting to register function that isn't a function.r   r&   )
r   rN   rL   r`   r  inspect
isfunctionr4   rs   rU  )r<   r   r"   rT  funcrefrE   r&   r&   r+   ra    s   



z&RayletServicer.lookup_or_register_funcc                 C   s   t  A || jvr=| j| | }t|}t|std|d u s(t|dkr.t	|}n
tj	di ||}|| j|< W d    n1 sGw   Y  | j| S )Nz0Attempting to schedule actor that isn't a class.r   r&   )
r   rR   rL   r`   r  re  isclassr4   rs   rU  )r<   r   r"   rT  actor_class_refactor_class	reg_classr&   r&   r+   r\    s   




z'RayletServicer.lookup_or_register_actorc                 C   sp   |d u rg }nt |tr|}n|g}|D ]}| | j| v r'td|  || j| | < qdd |D S )NzAlready saw object_ref c                 S   r/  r&   r0  )r'   outr&   r&   r+   r2    s    z:RayletServicer.unify_and_track_outputs.<locals>.<listcomp>)r3   listr)  rL   ri   warning)r<   rZ  r"   outputsrl  r&   r&   r+   rV    s   
z&RayletServicer.unify_and_track_outputsr%   )C__name__
__module____qualname__r   rW   rk   InitRequestrl   rx   rI   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r:   r   bytesboolr   r   r   r   r   r   
GetRequestr0   queueQueuer   r  r  r  r  
PutRequestr(  r&  r
   	bytearrayr#  r8  r;  rA  r   r   r   rM  rN  rI  rG  rE  rK  r`   remote_functionRemoteFunctionra  r\  rV  r&   r&   r&   r+   rJ   `   s    
4

'
 
H(


(*

'








rJ   c                 C   s,   |d ur| t|  |tjj d S d S r%   )r9   encode_exceptionr5   r6   r7   ABORTED)errr>   r&   r&   r+   r     s   r   rX   c                 C   s   t | }t| S r%   )r   r   base64standard_b64encodedecode)rj   r   r&   r&   r+   r}    s   
r}  rT  c                 C   s(   | j sd S t| j }t|tsJ |S r%   )pickled_optionsr]   r^   r3   r-   )rT  rY  r&   r&   r+   rS    s
   rS  c                 C   s   	 d	dt dtttf fdd}|p|}tjtjtddt	d}t
|}t|}t }t|| t|| t|| t||  t||||d}|  |S )
Nr\   rh   c                 [   sV   t   t stjdd| i|W  d    S W d    d S 1 s$w   Y  d S )Nr\   r&   )r   r`   ra   initr\   rh   r&   r&   r+   default_connect_handler   s   "z&serve.<locals>.default_connect_handlerray_client_server)max_workersthread_name_prefix)rT  )task_servicerdata_servicerlogs_servicergrpc_serverr%   )r   r   r:   r   r6   serverr   ThreadPoolExecutorr   r   rJ   r   r   ray_client_pb2_grpc"add_RayletDriverServicer_to_server(add_RayletDataStreamerServicer_to_server'add_RayletLogStreamerServicer_to_serverr   r   r  )connection_strrK   r  r  r  r  r  current_handler&   r&   r+   serve  s:   


r  c                    s\   t   tj i W d    n1 sw   Y  d fdd	}t| |d}|fS )Nc                    s    t  rS t j d| iS )Nr\   )r`   ra   r  r  argsinfor>  r&   r+   rK   %  s   z+init_and_serve.<locals>.ray_connect_handler)rK   r%   )r   r`   r  r  )r  r  r>  rK   server_handler&   r  r+   init_and_serve   s   r  Fc                 C   s@   |  d t  t| W d    d S 1 sw   Y  d S )Nr   )stopr   r`   shutdown)r  _exiting_interpreterr&   r&   r+   shutdown_with_server1  s   
"r  c                    s   ddt f fdd}|S )Nr\   c                    sX    r rt jd | d| d S t jd | d| d S t jdd| i| d S )N)address_redis_username_redis_passwordr\   )r  r\   r\   r&   )r`   r  r  r  redis_passwordredis_usernamer&   r+   rK   8  s   
z/create_ray_handler.<locals>.ray_connect_handlerr%   r   )r  r  r  rK   r&   r  r+   create_ray_handler7  s   r  r  c                 C   s   t | } t| dS )zr
    Try to create a gcs client based on the command line args or by
    autodetecting a running Ray cluster.
    r  )r   r   r  r&   r&   r+   try_create_gcs_clientJ  s   
r  c               
   C   s`  dd l } |  }|jdtddd |jddtdd	d |jd
tg ddd |jddtdd |jddtdd |jddtdd |jddtd dd | \}}ttjtj	 t
|j|j|j}d|j|jf }t|}|jrw||jd}td| d|  |jdkrt||j|j|j|jd}nt||}zt}	 dt i}	z"tjj st|j}
tjj|
 tjjjd t !|	tj"d! W n$ t#y } zt$d"|j d#|j  t%| W Y d }~nd }~ww t&d$ |jd%kr|j'j(dkrt}n|d$8 }|dkrt) |d& dkr|tkrt| d' q t)y/   |*d Y d S w )(Nr   z--hostz0.0.0.0zHost IP to bind to)r   defaulthelpz-pz--porti'  zPort to bind toz--mode)proxylegacyspecific-serverr  )r   choicesr  z	--addressFz Address to use to connect to Ray)requiredr   r  z--redis-usernamez username for connecting to Redisz--redis-passwordz Password for connecting to Redisz--runtime-env-agent-addressz8The port to use for connecting to the runtime_env_agent.)r  r   r  r  z%s:%dz****zStarting Ray Client server on z, args )r  r  runtime_env_agent_addressTtimer  r   [z ] Failed to put health check on r   r     z idle checks before shutdown.)+argparseArgumentParseradd_argumentr:   r0   parse_known_argsr   r   LOGGER_LEVELLOGGER_FORMATr  r  r  r  hostportreplaceri   r  moder   r  r  r    r  r`   r|   r}   _internal_kv_initializedr  _initialize_internal_kvr~   rg   r   KV_NAMESPACE_HEALTHCHECKr4   r   rj   sleepr  num_clientsKeyboardInterruptr  )r  parserr  _rK   hostportargs_strr  idle_checks_remaininghealth_report
gcs_clientrC   r&   r&   r+   mainS  s   






"r  __main__r%   )F)Tr  rG   r   re  rg   loggingr  r]   rw  rT   r  collectionsr   
concurrentr   typingr   r   r   r   r   r	   r
   r6   r`   ray._private.state!ray.core.generated.ray_client_pb2core	generatedrk   &ray.core.generated.ray_client_pb2_grpcr  r   ray._privater   ray._private.client_mode_hookr   ray._rayletr   ray._private.ray_constantsr   ray._private.ray_loggingr   ray._private.servicesr   ray._private.tls_utilsr   ray.job_configr   ray.util.client.commonr   r   r   r   r   #ray.util.client.server.dataservicerr   "ray.util.client.server.logservicerr   ray.util.client.server.proxierr   %ray.util.client.server.server_picklerr   r   #ray.util.client.server.server_stubsr   	getLoggerrp  ri   r    rI   RayletDriverServicerrJ   r   r:   r}  TaskOptionsrS  r  r  r  r  r  r  r&   r&   r&   r+   <module>   sn    $

/     	 
	!

	
c
