o
    $i%                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlZd dlmZmZ d dl	m
Z
 d dlmZmZmZmZ d dlZd dlmZ d dlmZmZ d dlmZ d dlmZmZ d d	lmZ eeZG d
d deZG dd deZ dS )    N)ABCabstractmethod)wraps)Callable	CoroutineOptionalUnion)TaskCancelledError)ReplicaQueueLengthInfoRequestMetadata)SERVE_LOGGER_NAME)calculate_remaining_timeoutgenerate_request_id)RequestCancelledErrorc                   @   s   e Zd Zedee fddZedee fddZedd Z	ed	d
 Z
edd ZedefddZedd Zedee dejfddZedejfddZedejfddZdS )ReplicaResultreturnc                       t NNotImplementedErrorself r   ^/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/serve/_private/replica_result.pyget_rejection_response      z$ReplicaResult.get_rejection_response	timeout_sc                 C      t r   r   r   r   r   r   r   get      zReplicaResult.getc                    r   r   r   r   r   r   r   	get_async   r   zReplicaResult.get_asyncc                 C   r   r   r   r   r   r   r   __next__"   r    zReplicaResult.__next__c                    r   r   r   r   r   r   r   	__anext__&   r   zReplicaResult.__anext__callbackc                 C   r   r   r   r   r$   r   r   r   add_done_callback*   r    zReplicaResult.add_done_callbackc                 C   r   r   r   r   r   r   r   cancel.   r    zReplicaResult.cancelc                 C   r   r   r   r   r   r   r   to_object_ref2   r    zReplicaResult.to_object_refc                    r   r   r   r   r   r   r   to_object_ref_async6   r   z!ReplicaResult.to_object_ref_asyncc                 C   r   r   r   r   r   r   r   to_object_ref_gen:   s   zReplicaResult.to_object_ref_genN)__name__
__module____qualname__r   r   r
   r   floatr   r!   r"   r#   r   r&   r'   ray	ObjectRefr(   r)   ObjectRefGeneratorr*   r   r   r   r   r      s*    



r   c                   @   s   e Zd Zdddeejejf dedefddZ	dee
ef fd	d
Zedee fddZedee fddZedd Zedd Zedd Zde
fddZdd Zdddee dejfddZdejfd d!Zdejfd"d#ZdS )$ActorReplicaResultF)with_rejectionobj_ref_or_genmetadatar3   c                   s   d _ d _|j_|j_t _|_	d _
t|tjr#|_n|_ jr2jd us2J dtjj   jrVt _tjj jj  fdd d S d S )Nz<An ObjectRefGenerator must be passed for streaming requests.c                    s   t jj jjS r   )r/   servecontext_remove_in_flight_request_internal_request_id_response_id)_request_contextr   r   r   <lambda>c   s    z-ActorReplicaResult.__init__.<locals>.<lambda>)_obj_ref_obj_ref_genis_streaming_is_streaming
request_id_request_id	threadingLock_object_ref_or_gen_sync_lock_with_rejection_rejection_response
isinstancer/   r1   r6   r7   _get_serve_request_contextcancel_on_parent_request_cancelr   r:   _add_in_flight_requestr9   r&   )r   r4   r5   r3   r   r<   r   __init__B   s0   

zActorReplicaResult.__init__fc                    s:   t   fdd}t   fdd}t r|S |S )Nc                    s6   z | g|R i |W S  t jjy   t| jw r   )r/   
exceptionsr	   r   rD   r   argskwargsrO   r   r   wrapperi   s
   
z5ActorReplicaResult._process_response.<locals>.wrapperc                    s<   z | g|R i |I d H W S  t jjy   t w r   )r/   rP   r	   asyncioCancelledErrorrQ   rT   r   r   async_wrapperp   s   z;ActorReplicaResult._process_response.<locals>.async_wrapper)r   inspectiscoroutinefunction)rO   rU   rX   r   rT   r   _process_responseh   s   
z$ActorReplicaResult._process_responser   c              
      s   | j r	| jdusJ dz| jdu r$| j I dH I dH }t|| _| jW S  tjy@ } zt	d | 
  |dd}~w tyJ   t w )z?Get the queue length info from the replica to handle rejection.NzNget_rejection_response() can only be called when request rejection is enabled.z?Cancelling request that has already been assigned to a replica.)rH   r@   rI   r#   pickleloadsrV   rW   loggerinfor'   r	   )r   responseer   r   r   r   |   s*   
z)ActorReplicaResult.get_rejection_responser   c                 C   sB   | j rJ dt }| j|d}t||t d}tj||dS )Nz7get() can only be called on a unary ActorReplicaResult.r   )r   start_time_scurr_time_s)timeout)rB   timer(   r   r/   r   )r   r   rc   
object_refremaining_timeout_sr   r   r   r      s   zActorReplicaResult.getc                    s$   | j rJ d|  I d H I d H S )Nz=get_async() can only be called on a unary ActorReplicaResult.)rB   r)   r   r   r   r   r!      s   zActorReplicaResult.get_asyncc                 C   s"   | j sJ d| j }t|S )Nz<next() can only be called on a streaming ActorReplicaResult.)rB   r@   r"   r/   r   r   next_obj_refr   r   r   r"      s   

zActorReplicaResult.__next__c                    s*   | j sJ d| j I d H }|I d H S )NzA__anext__() can only be called on a streaming ActorReplicaResult.)rB   r@   r#   ri   r   r   r   r#      s   
zActorReplicaResult.__anext__r$   c                 C   s.   | j d ur| j  | d S | j| d S r   )r@   	completed_on_completedr?   r%   r   r   r   r&      s   
z$ActorReplicaResult.add_done_callbackc                 C   s*   | j d urt| j  d S t| j d S r   )r@   r/   r'   r?   r   r   r   r   r'      s   
zActorReplicaResult.cancelNrb   c                C   s~   | j rJ d| j* | jd u r+| jj|d}| rtd|| _W d    | jS W d    | jS 1 s7w   Y  | jS )Nz?to_object_ref can only be called on a unary ReplicaActorResult.rb   z!Timed out resolving to ObjectRef.)rB   rG   r?   r@   
_next_syncis_nilTimeoutError)r   r   obj_refr   r   r   r(      s"   


z ActorReplicaResult.to_object_refc                    s   | j rJ d	 | jd ur| jS | jjdd}|r8z| jd u r)| j I d H | _| jW | j  S | j  w tdI d H  q	)NzEto_object_ref_async can only be called on a unary ReplicaActorResult.TF)blockingr   )	rB   r?   rG   acquirer@   r#   releaserV   sleep)r   acquiredr   r   r   r)      s"   

z&ActorReplicaResult.to_object_ref_asyncc                 C   s   | j sJ d| jS )NzGto_object_ref_gen can only be called on a streaming ReplicaActorResult.)rB   r@   r   r   r   r   r*     s
   z$ActorReplicaResult.to_object_ref_gen)r+   r,   r-   r   r/   r0   r1   r   boolrN   r   r   r[   r   r
   r   r.   r   r!   r"   r#   r&   r'   r(   r)   r*   r   r   r   r   r2   A   s2    
&


'r2   )!rV   rY   loggingr\   rE   rf   abcr   r   	functoolsr   typingr   r   r   r   r/   ray.exceptionsr	   ray.serve._private.commonr
   r   ray.serve._private.constantsr   ray.serve._private.utilsr   r   ray.serve.exceptionsr   	getLoggerr^   r   r2   r   r   r   r   <module>   s$    
,