o
    ci}I                     @   s\  d dl mZ d dlmZ d dlZd dlZd dlZd dlmZ d dl	Z	d dl
mZmZmZmZmZ d dlmZmZmZ d dlZd dlm  m  mZ d dlm  m  mZ d dlmZmZmZ d dlm Z  d d	l!m"Z" ertd d
l#m$Z$ e%e&Z'dZ(dede)fddZ*dej+de)fddZ,deej+ ddddfddZ-G dd dZ.G dd dej/Z0dS )    )defaultdict)loads_from_clientN)Queue)AnyDictIteratorTYPE_CHECKINGUnion)EventLockThread)CLIENT_SERVER_MAX_THREADS_propagate_error_in_contextOrderedResponseCache)log_once)disable_client_hook)RayletServicer
   contextreturnc                 C   sD   t |  }|d}|du s|dvrtd| d dS |dkS )zE
    Get `reconnecting` from gRPC metadata, or False if missing.
    reconnectingN)TrueFalsez9Client connecting with invalid value for "reconnecting": zF, This may be because you have a mismatched client and server version.Fr   )dictinvocation_metadatagetloggererror)r   metadataval r    W/home/ubuntu/.local/lib/python3.10/site-packages/ray/util/client/server/dataservicer.py_get_reconnecting_from_context   s   

r"   reqc                 C   s^   |  d}|dkr| jjrdS |dkr| jj| jjd kS |dkr+| jj| jjd kS |dvS )a  
    Returns True if the response should to the given request should be cached,
    false otherwise. At the moment the only requests we do not cache are:
        - asynchronous gets: These arrive out of order. Skipping caching here
            is fine, since repeating an async get is idempotent
        - acks: Repeating acks is idempotent
        - clean up requests: Also idempotent, and client has likely already
             wrapped up the data connection by this point.
        - puts: We should only cache when we receive the final chunk, since
             any earlier chunks won't generate a response
        - tasks: We should only cache when we receive the final chunk,
             since any earlier chunks won't generate a response
    typer   Fput   task)acknowledgeconnection_cleanup)
WhichOneofr   asynchronousr%   chunk_idtotal_chunksr'   )r#   req_typer    r    r!   _should_cache/   s   
r/   grpc_input_generatoroutput_queuezEQueue[Union[ray_client_pb2.DataRequest, ray_client_pb2.DataResponse]]c              
   C   s~   z8z| D ]}| | qW n tjy( } ztd|  W Y d}~nd}~ww W | d dS W | d dS | d w )z<
    Pushes incoming requests to a shared output_queue.
    zHclosing dataservicer reader thread grpc error reading request_iterator: N)r%   grpcRpcErrorr   debug)r0   r1   r#   er    r    r!   
fill_queueG   s    	r6   c                   @   s@   e Zd ZdZdd Zdejdeejej	f fddZ
dd	 Zd
S )ChunkCollectorzR
    Helper class for collecting chunks from PutObject or ClientTask messages
    c                 C      d | _ d| _t | _d S Ncurr_req_idlast_seen_chunk_id	bytearraydataselfr    r    r!   __init__`      zChunkCollector.__init__r#   chunkc                 C   s   | j d ur| j |jkrtd| j  d|j d|j| _ | jd }|j|k r(d S |j|kr:td|j d|j d|j| jd krM| j|j |j| _|jd |jkS )Nz1Expected to receive a chunk from request with id z, but found z	 instead.r&   zA chunk z of request z was received out of order.)r<   req_idRuntimeErrorr=   r,   r?   extendr-   )rA   r#   rD   
next_chunkr    r    r!   	add_chunke   s(   


zChunkCollector.add_chunkc                 C   r8   r9   r;   r@   r    r    r!   reset~   rC   zChunkCollector.resetN)__name__
__module____qualname____doc__rB   ray_client_pb2DataRequestr	   
PutRequest
ClientTaskrI   rJ   r    r    r    r!   r7   [   s    
r7   c                   @   s<   e Zd ZdddZdd Zdeded	efd
dZdd Z	dS )DataServicerbasic_servicer   c                 C   sF   || _ t | _d| _i | _i | _tt| _t	 | _
t | _t | _d S )Nr   )rT   r   clients_locknum_clientsclient_last_seenreconnect_grace_periodsr   r   response_cachesr
   stoppedr7   put_request_chunk_collectorclient_task_chunk_collector)rA   rT   r    r    r!   rB      s   zDataServicer.__init__c           "      c   sv   t   }d}t| }|d}|d u rtd d S td| d | |||}| j| }d}	|s8d S zzt	 }
t
td||
fd}|  	 t|
jd D ]}t|tjr`|V  qSt|tjshJ t|r|	r||j}t|tr{||d ur|V  qSd }|d}|d	kr| j|j}tj|d
}| j |jj| j|< |jjdkrd}	W d    n1 sw   Y  nr|dkr|jjr| j|j||j|
}|d u rqSn| j|j|}tj|d}nI|dkr| j !||j"sqS| j#| j j$|j"j%||j"j&}| j '  tj|d}n|dkr8g }|j(j)D ]}| j(||}|*| qtjtj+|dd}n|dkrFtj| , d}n|dkrm| j | j-|j.}tj|d}W d    n	1 sgw   Y  n|dkrd}t/ }tj|d}n|dkr|0|j1j qS|dkr| j< |j2}| j3!||s	 W d    qSt4| j3j$| j\}}| j3'  | j5|j2|||}tj|d}~~W d    n	1 sw   Y  nW|dkr| j | j6|j7|}tj|d}W d    n	1 sw   Y  n/|dkr)| j | j8|j9}tj|d}W d    n	1 s#w   Y  ntd | d!|j|_t|rD|	rD|:|j| |V  qSW n1 tyz } z$t;d" t<||}|=|}|rg|rp|>t?j@jA d}W Y d }~nd }~ww W td#|  |BtC |D rtd$EtC | j|} |s| d urtd%|  d& | jFjG| d' ntd( | j || jHvrtd) 	 W d    d S | jH| }!|!|krtd* 	 W d    d S | jI| | jH|= || jv r| j|= || jv r| j|= |  jJd+8  _Jtd,| d-| jJ  tK  | jJdkr.td. tLM  W d    n1 s9w   Y  W d    d S W d    d S 1 sRw   Y  d S td#|  |BtC |D rstd$EtC | j|} |s| d urtd%|  d& | jFjG| d' ntd( | j || jHvrtd) 	 W d       Y d S | jH| }!|!|krtd* 	 W d       Y d S | jI| | jH|= || jv r| j|= || jv r| j|= |  jJd+8  _Jtd,| d-| jJ  tK  | jJdkrtd. tLM  W d    n1 sw   Y  W d    w W d    w 1 s5w   Y  w )/NF	client_idz#Client connecting with no client_idz New data connection from client z: T)targetdaemonargsr$   init)ra   r   r   )r   r%   )r%   release)ok)rb   connection_info)rd   prep_runtime_env)re   r)   )r)   r(   r'   )task_ticket	terminate)rg   list_named_actors)rh   zUnreachable code: Request type z not handled in DatapathzError in data channel:zStream is broken with client z5Queue filler thread failed to join before timeout: {}z-Cleanup wasn't requested, delaying cleanup byz	 seconds.)timeoutz/Cleanup was requested, cleaning up immediately.zConnection already cleaned up.z$Client reconnected, skipping cleanupr&   zRemoved client z, remaining=zShutting down ray.)Ntimer   r   r   r   r   r4   _initrY   r   r   r6   startiter
isinstancerO   DataResponserP   r/   check_cacherE   	Exceptionr*   rT   Initra   rU   reconnect_grace_periodrX   r+   _async_get_object_get_objectr[   rI   r%   _put_objectr?   client_ref_idowner_idrJ   rb   idsappendReleaseResponse_build_connection_responsePrepRuntimeEnvre   ConnectionCleanupResponsecleanupr(   r'   r\   r   Schedule	Terminaterg   ListNamedActorsrh   update_cache	exceptionr   
invalidateset_coder2   
StatusCodeFAILED_PRECONDITIONjoinQUEUE_JOIN_SECONDSis_aliveformatrZ   waitrW   release_allrV   r   rayshutdown)"rA   request_iteratorr   
start_timecleanup_requestedr   r]   accepted_connectionresponse_cachereconnect_enabledrequest_queuequeue_filler_threadr#   cached_resprespr.   	resp_initget_respput_respreleasedrel_idrel	resp_prepcleanup_respr'   arglistkwargsresp_ticketresponser5   recoverableinvalid_cachecleanup_delay	last_seenr    r    r!   Datapath   s  



















u


	







 $







  zDataServicer.Datapathr]   r   r   c              
   C   s,  | j  t|}ttd }| j|kr@td| j d| d| d tdr0tdt d |t	j
j 	 W d	   d
S |r\|| jvr\|t	j
j |d 	 W d	   d
S || jv rktd| d n|  jd7  _td| d| j  || j|< 	 W d	   dS 1 sw   Y  d	S )z
        Checks if resources allow for another client.
        Returns a boolean indicating if initialization was successful.
           z[Data Servicer]: Num clients z has reached the threshold z. Rejecting client: z. client_thresholdzyYou can configure the client connection threshold by setting the RAY_CLIENT_SERVER_MAX_THREADS env var (currently set to z).NFzEAttempted to reconnect to a session that has already been cleaned up.zClient z has reconnected.r&   zAccepted data connection from z. Total clients: T)rU   r"   intr   rV   r   warningr   r   r2   r   RESOURCE_EXHAUSTEDrW   	NOT_FOUNDset_detailsr4   )rA   r]   r   r   r   	thresholdr    r    r!   rk   k  sL   



$zDataServicer._initc                 C   s^   | j  | j}W d    n1 sw   Y  tj|dtjd tjd tjd tjtj	dS )Nz{}.{}.{}r   r&   r   )rV   python_versionray_version
ray_commit)
rU   rV   rO   ConnectionInfoResponser   sysversion_infor   __version__
__commit__)rA   cur_num_clientsr    r    r!   r|     s   z'DataServicer._build_connection_responseN)rT   r   )
rK   rL   rM   rB   r   strr   floatrk   r|   r    r    r    r!   rS      s    
 R+rS   )1collectionsr   %ray.util.client.server.server_picklerr   r   loggingr2   queuer   r   typingr   r   r   r   r	   	threadingr
   r   r   rj   !ray.core.generated.ray_client_pb2core	generatedrO   &ray.core.generated.ray_client_pb2_grpcray_client_pb2_grpcray.util.client.commonr   r   r   ray.util.debugr   ray._private.client_mode_hookr   ray.util.client.server.serverr   	getLoggerrK   r   r   boolr"   rP   r/   r6   r7   RayletDataStreamerServicerrS   r    r    r    r!   <module>   s<    

)