o
    `۷iX                     @   sT  d dl Z d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
 d dl mZ d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZmZ d dlmZmZmZ d dl m!Z! d d	l"m#Z# d d
l$m%Z% d dl&m'Z'm(Z( d dl)m*Z* d dl+m,Z, e-e!Z.de/fddZ0G dd de	Z1G dd de1Z2G dd de1Z3dS )    N)ABCabstractmethod)run_coroutine_threadsafe)wraps)AnyAsyncIteratorCallable	CoroutineIteratorOptionalUnion)ActorUnavailableErrorRayTaskErrorTaskCancelledError)OBJ_REF_NOT_SUPPORTED_ERRORReplicaQueueLengthInfoRequestMetadata)SERVE_LOGGER_NAME)MessageQueue)RPCSerializer)calculate_remaining_timeoutgenerate_request_id)RequestCancelledError)ASGIResponsereturnc                   C   s$   zt   W dS  ty   Y dS w )NTF)asyncioget_running_loopRuntimeError r   r   W/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/serve/_private/replica_result.pyis_running_in_asyncio_loop    s   r    c                   @   s   e Zd Zedee fddZedee fddZedd Z	ed	d
 Z
edd ZedefddZedd Zedee dejfddZedejfddZedejfddZdS )ReplicaResultr   c                       t NNotImplementedErrorselfr   r   r   get_rejection_response)      z$ReplicaResult.get_rejection_response	timeout_sc                 C      t r#   r$   r'   r*   r   r   r   get-      zReplicaResult.getc                    r"   r#   r$   r&   r   r   r   	get_async1   r)   zReplicaResult.get_asyncc                 C   r+   r#   r$   r&   r   r   r   __next__5   r.   zReplicaResult.__next__c                    r"   r#   r$   r&   r   r   r   	__anext__9   r)   zReplicaResult.__anext__callbackc                 C   r+   r#   r$   r'   r2   r   r   r   add_done_callback=   r.   zReplicaResult.add_done_callbackc                 C   r+   r#   r$   r&   r   r   r   cancelA   r.   zReplicaResult.cancelc                 C   r+   r#   r$   r,   r   r   r   to_object_refE   r.   zReplicaResult.to_object_refc                    r"   r#   r$   r&   r   r   r   to_object_ref_asyncI   r)   z!ReplicaResult.to_object_ref_asyncc                 C   r+   r#   r$   r&   r   r   r   to_object_ref_genM   s   zReplicaResult.to_object_ref_genN)__name__
__module____qualname__r   r   r   r(   floatr-   r/   r0   r1   r   r4   r5   ray	ObjectRefr6   r7   ObjectRefGeneratorr8   r   r   r   r   r!   (   s*    



r!   c                   @   s   e Zd Zdddeejejf dedefddZ	dee
ef fd	d
Zedee fddZedee fddZedd Zedd Zedd Zde
fddZdd Zdddee dejfddZdejfd d!Zdejfd"d#ZdS )$ActorReplicaResultFwith_rejectionobj_ref_or_genmetadatarB   c                   s   d _ d _|j_|j_t _|_	d _
t|tjr#|_n|_ jr2jd us2J dtjj   jrVt _tjj jj  fdd d S d S )Nz<An ObjectRefGenerator must be passed for streaming requests.c                       t jj jjS r#   r=   servecontext_remove_in_flight_request_internal_request_id_response_id_request_contextr'   r   r   <lambda>v       z-ActorReplicaResult.__init__.<locals>.<lambda>)_obj_ref_obj_ref_genis_streaming_is_streaming
request_id_request_id	threadingLock_object_ref_or_gen_sync_lock_with_rejection_rejection_response
isinstancer=   r?   rG   rH   _get_serve_request_contextcancel_on_parent_request_cancelr   rK   _add_in_flight_requestrJ   r4   )r'   rC   rD   rB   r   rN   r   __init__U   s0   

zActorReplicaResult.__init__fc                    s:   t   fdd}t   fdd}t r|S |S )Nc                    s6   z | g|R i |W S  t jjy   t| jw r#   )r=   
exceptionsr   r   rW   r'   argskwargsrb   r   r   wrapper|   s
   
z5ActorReplicaResult._process_response.<locals>.wrapperc                    s<   z | g|R i |I d H W S  t jjy   t w r#   )r=   rc   r   r   CancelledErrorrd   rg   r   r   async_wrapper   s   z;ActorReplicaResult._process_response.<locals>.async_wrapper)r   inspectiscoroutinefunctionrb   rh   rj   r   rg   r   _process_response{   s   
z$ActorReplicaResult._process_responser   c              
      s   | j r	| jdusJ dz| jdu r$| j I dH I dH }t|| _| jW S  tjy@ } zt	d | 
  |dd}~w tyJ   t w )?Get the queue length info from the replica to handle rejection.NNget_rejection_response() can only be called when request rejection is enabled.?Cancelling request that has already been assigned to a replica.)r[   rS   r\   r1   pickleloadsr   ri   loggerinfor5   r   )r'   responseer   r   r   r(      s*   
z)ActorReplicaResult.get_rejection_responser*   c                 C   sB   | j rJ dt }| j|d}t||t d}tj||dS )Nz7get() can only be called on a unary ActorReplicaResult.r*   )r*   start_time_scurr_time_stimeout)rU   timer6   r   r=   r-   )r'   r*   ry   
object_refremaining_timeout_sr   r   r   r-      s   zActorReplicaResult.getc                    s$   | j rJ d|  I d H I d H S )Nz=get_async() can only be called on a unary ActorReplicaResult.)rU   r7   r&   r   r   r   r/      s   zActorReplicaResult.get_asyncc                 C   s"   | j sJ d| j }t|S )Nz<next() can only be called on a streaming ActorReplicaResult.)rU   rS   r0   r=   r-   r'   next_obj_refr   r   r   r0      s   

zActorReplicaResult.__next__c                    s*   | j sJ d| j I d H }|I d H S )NzA__anext__() can only be called on a streaming ActorReplicaResult.)rU   rS   r1   r   r   r   r   r1      s   
zActorReplicaResult.__anext__r2   c                 C   s.   | j d ur| j  | d S | j| d S r#   )rS   	completed_on_completedrR   r3   r   r   r   r4      s   
z$ActorReplicaResult.add_done_callbackc                 C   s*   | j d urt| j  d S t| j d S r#   )rS   r=   r5   rR   r&   r   r   r   r5      s   
zActorReplicaResult.cancelNrx   c                C   s~   | j rJ d| j* | jd u r+| jj|d}| rtd|| _W d    | jS W d    | jS 1 s7w   Y  | jS )Nz?to_object_ref can only be called on a unary ReplicaActorResult.rx   z!Timed out resolving to ObjectRef.)rU   rZ   rR   rS   
_next_syncis_nilTimeoutError)r'   r*   obj_refr   r   r   r6      s"   


z ActorReplicaResult.to_object_refc                    s   | j rJ d	 | jd ur| jS | jjdd}|r8z| jd u r)| j I d H | _| jW | j  S | j  w tdI d H  q	)NzEto_object_ref_async can only be called on a unary ReplicaActorResult.TF)blockingr   )	rU   rR   rZ   acquirerS   r1   releaser   sleep)r'   acquiredr   r   r   r7      s"   

z&ActorReplicaResult.to_object_ref_asyncc                 C   s   | j sJ d| jS )NzGto_object_ref_gen can only be called on a streaming ReplicaActorResult.)rU   rS   r&   r   r   r   r8     s
   z$ActorReplicaResult.to_object_ref_gen)r9   r:   r;   r   r=   r>   r?   r   boolra   r   r	   rn   r   r   r(   r<   r-   r/   r0   r1   r4   r5   r6   r7   r8   r   r   r   r   r@   T   s2    
&


'r@   c                   @   s   e Zd Z	d-dddejjdedejde	j
def
d	d
Zdeeef fddZdee fddZdee fddZdd Zdd Zdee fddZedee fddZedd Zedd Zed d! Zd"efd#d$Z d%d& Z!dee dej"fd'd(Z#dej"fd)d*Z$dej%fd+d,Z&dS ).gRPCReplicaResultNFrA   callrD   actor_idlooprB   c                   s   |_ |_|_t _|pt _|j_	|_
d _d _d _|j _tj dr:j  _|j_nd_d _jrLj _t _tjj  tjj jj  fdd d S )N	__aiter__Fc                    rE   r#   rF   rL   rN   r   r   rP   S  rQ   z,gRPCReplicaResult.__init__.<locals>.<lambda>)_call	_actor_id	_metadatar   _result_queuer   _get_running_loop_grpc_call_looprT   rU   r[   r\   _gen_fut_on_separate_loop_calling_from_same_loophasattrr   
_use_queue_consume_taskcreate_taskconsume_messages_from_genr   rK   r=   rG   rH   r^   r`   rJ   r4   )r'   r   rD   r   r   rB   r   rN   r   ra     s8   	


zgRPCReplicaResult.__init__rb   c                    sP   dt dtfdd t fdd}t fdd}tr&|S |S )	Ngrpc_responserD   c                 S   sV   t |j|j}| jr|| j}t|tr| |t	j
j jr%| jS || jS r#   )r   request_serializationresponse_serializationis_errorloads_responseserialized_messager]   r   as_instanceof_causer=   rG   rH   r^   is_http_request)r   rD   
serializererrr   r   r   deserialize_or_raise_errorY  s   
zLgRPCReplicaResult._process_grpc_response.<locals>.deserialize_or_raise_errorc              
      s|   z| g|R i |}W n* t jjy, } z| t jjkr'td| j  d }~w t	j
jy7   td w  || jS NActor is unavailable.)grpcaioAioRpcErrorcode
StatusCodeUNAVAILABLEr   r   binary
concurrentfuturesri   r   r   r'   re   rf   r   rw   r   rb   r   r   rh   r  s   z9gRPCReplicaResult._process_grpc_response.<locals>.wrapperc              
      sn   z| g|R i |I d H }W n t jjy0 } z| t jjkr+td| j  d }~ww  || j	S r   )
r   r   r   r   r   r   r   r   r   r   r   r   r   r   rj     s    z?gRPCReplicaResult._process_grpc_response.<locals>.async_wrapper)r   r   r   rk   rl   rm   r   r   r   _process_grpc_responseX  s   

z(gRPCReplicaResult._process_grpc_responser   c                 C      | S r#   r   r&   r   r   r   r        zgRPCReplicaResult.__aiter__c                 C   r   r#   r   r&   r   r   r   __iter__  r   zgRPCReplicaResult.__iter__c              
      s   z<z| j 2 z3 d H W }| j| q6 W n ty- } z| j| W Y d }~nd }~ww W | j  d S W | j  d S | j  w r#   )r   r   
put_nowaitBaseException	set_errorclose)r'   resprw   r   r   r   r     s   z+gRPCReplicaResult.consume_messages_from_genc                    s>   | j du r| jI dH S | jr| j I dH S | j  I dH S )a  Gets the result from the gRPC call object.

        If the call object is a UnaryUnaryCall, we await the call.
        Otherwise the call object is a UnaryStreamCall.
          - If the request was sent on a separate loop, then the
          streamed results are being consumed and put onto the in-memory
          queue, so we read from that queue.
          - Otherwise the request was sent on the current loop, so we
          fetch the next object from the async generator.
        N)r   r   r   r   get_one_messager1   r&   r   r   r   _get_internal  s   
zgRPCReplicaResult._get_internalc              
      s  | j sJ dzU| jdu rZ| j I dH  | j I dH }|dd}|dd}|du s2|du rM| j I dH }| j I dH }td| d| dt	t
t|t|d| _| jW S  tjyv } ztd	 |   |dd}~w tjjy } zQ| js| }|dd}|durt
t|r|dd}|du rtd|  d|  dt	d
t|dW  Y d}~S | tjjkrtd| j |dd}~ww )ro   rp   Nacceptednum_ongoing_requestszUnexpected error (z): .)r   r   rq   Tr   )r[   r\   r   wait_for_connectioninitial_metadatar-   r   detailsr   r   r   intr   ri   rt   ru   r5   r   r   r   rU   r   r   r   r   r   )r'   rD   r   r   r   r   rw   r   r   r   r(     sb   



z(gRPCReplicaResult.get_rejection_responser*   c                 C   sX   t  rtd| jd u rt|  | j| _z| jj|dW S  tjj	y+   t	dd w )NzhSync method `get()` should not be called from within an `asyncio` event loop. Use `get_async()` instead.r{   zTimed out waiting for result.)
r    r   r   r   r   r   resultr   r   r   r,   r   r   r   r-     s   


zgRPCReplicaResult.getc                    sD   | j d u r| jr|  I d H S t|  | j| _ t| j I d H S r#   )r   r   r   r   r   r   wrap_futurer&   r   r   r   r/     s   

zgRPCReplicaResult.get_asyncc                 C   s>   t  rtdt|  | jd}z| W S  ty   tw )NzmSync method `__next__()` should not be called from within an `asyncio` event loop. Use `__anext__()` instead.r   )r    r   r   r   r   r   StopAsyncIterationStopIterationr'   futr   r   r   r0      s   
zgRPCReplicaResult.__next__c                    s8   | j r|  I d H S t|  | jd}t|I d H S )Nr   )r   r   r   r   r   r   r   r   r   r   r1   /  s   
zgRPCReplicaResult.__anext__r2   c                 C   s   | j | d S r#   )r   r4   r3   r   r   r   r4   9  s   z#gRPCReplicaResult.add_done_callbackc                 C   s   | j   d S r#   )r   r5   r&   r   r   r   r5   <  s   zgRPCReplicaResult.cancelc                 C   r+   r#   r   r,   r   r   r   r6   ?  r   zgRPCReplicaResult.to_object_refc                    r"   r#   r   r&   r   r   r   r7   B  s   z%gRPCReplicaResult.to_object_ref_asyncc                 C   r+   r#   r   r&   r   r   r   r8   E  r   z#gRPCReplicaResult.to_object_ref_genr#   )'r9   r:   r;   r   r   Callr   r=   ActorIDr   AbstractEventLoopr   ra   r   r   r	   r   r   r   r   r
   r   r   r   r   r   r(   r<   r-   r/   r0   r1   r4   r5   r>   r6   r7   r?   r8   r   r   r   r   r     sD    
:=	K


	r   )4r   concurrent.futuresr   rk   loggingrr   rX   r}   abcr   r   r   	functoolsr   typingr   r   r   r	   r
   r   r   r   r=   ray.exceptionsr   r   r   ray.serve._private.commonr   r   r   ray.serve._private.constantsr   ray.serve._private.http_utilr    ray.serve._private.serializationr   ray.serve._private.utilsr   r   ray.serve.exceptionsr   ray.serve.generated.serve_pb2r   	getLoggerrt   r   r    r!   r@   r   r   r   r   r   <module>   s6    $
, J