o
    cil                     @   sx  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZ d dlmZmZmZmZmZmZmZmZmZmZmZmZmZmZmZ d dlmZ d dlm Z m!Z!m"Z" d d	l#m$Z$ d d
l%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, e-e&Z.e/e0gZ1eG dd dZ2eG dd dZ3eG dd dZ4deee  deee eeef f fddZ5G dd dZ6G dd dZ7dd Z8dd Z9d e:ddfd!d"Z;ed#d$d%Z<ed&Z=ed'Z>G d(d) d)eee<e=e>f Z?G d*d+ d+eee<e=e>f Z@ed,eee= gee> f dee=ge>f fd-d.ZAed/eee= geeeee> f f dee=geeee>f f fd0d.ZAed1e?e<e=e>f dee<e=ge>f fd2d.ZAed3e@e<e=e>f dee<e=geeee>f f fd4d.ZAe		5	6	7dId8e:d9eBd e:d:ed dd;f
d<d.ZAG d=d; d;eZCe,d>d?		5	6	7dId8e:d9eBd e:d@ee def
dAd.ZAdBe jDdCefdDdEZEdBe jDdFefdGdHZFdS )J    N)deque)	dataclass)wraps)isasyncgenfunctioniscoroutinefunction)AnyAsyncGeneratorCallable	CoroutineDictGenericIterableListLiteralOptionalProtocolSetTupleTypeVaroverload)serve)extract_signatureflatten_argsrecover_args)get_or_create_event_loop)SERVE_LOGGER_NAME)extract_self_if_method_call)RayServeException)	PublicAPIc                   @   s,   e Zd ZU eed< ee ed< ejed< dS )_SingleRequestself_argflattened_argsfutureN)__name__
__module____qualname__r   __annotations__r   asyncioFuture r)   r)   F/home/ubuntu/.local/lib/python3.10/site-packages/ray/serve/batching.pyr   +   s   
 r   c                   @   s    e Zd ZU eed< ejed< dS )_GeneratorResultresultnext_futureN)r#   r$   r%   r   r&   r'   r(   r)   r)   r)   r*   r+   2   s   
 r+   c                   @   sn   e Zd ZU ee ed< edee fddZedee fddZ	edee fddZ
edefd	d
ZdS )_RuntimeSummaryStatisticsstart_timesreturnc                 C      | j rt| j S d S N)r/   minselfr)   r)   r*   min_start_time<      z(_RuntimeSummaryStatistics.min_start_timec                 C   s   | j rt| j t| j  S d S r2   )r/   sumlenr4   r)   r)   r*   mean_start_time@   s   z)_RuntimeSummaryStatistics.mean_start_timec                 C   r1   r2   )r/   maxr4   r)   r)   r*   max_start_timeF   r7   z(_RuntimeSummaryStatistics.max_start_timec                 C   s
   t | jS r2   )r9   r/   r4   r)   r)   r*   num_requestsJ   s   
z&_RuntimeSummaryStatistics.num_requestsN)r#   r$   r%   r   floatr&   propertyr   r6   r:   r<   intr=   r)   r)   r)   r*   r.   8   s   
 r.   list_of_flattened_argsr0   c                    s|   dd | D }t |dksJ d| }g }t|D ]  d dkr-|| d    q| fdd| D  qt|S )	z@Batch a list of flatten args and returns regular args and kwargsc                 S   s   h | ]}t |qS r)   )r9   ).0argsr)   r)   r*   	<setcomp>V       z%_batch_args_kwargs.<locals>.<setcomp>   z=All batch requests should have the same number of parameters.   r   c                    s   g | ]}|  qS r)   r)   rB   itemidxr)   r*   
<listcomp>b   rE   z&_batch_args_kwargs.<locals>.<listcomp>)r9   poprangeappendr   )rA   arg_lengths
arg_lengthbatched_flattened_argsr)   rJ   r*   _batch_args_kwargsO   s   rS   c                   @   s4  e Zd Z	d-dedededee ddf
ddZd	d
 ZdeddfddZ	de
eejf ddfddZdee fddZdee deddfddZdedeej deddfddZdejdeej defddZdeddfd d!Zded"ee ddfd#d$Zd%ejddfd&d'Zed(ee ddfd)d*Zd+d, ZdS )._BatchQueueNmax_batch_sizebatch_wait_timeout_smax_concurrent_batcheshandle_batch_funcr0   c                 C   sv   t  | _|| _|| _|| _t || _t  | _	t
 | _i | _d| _t | _|dur5| j| || _|   dS )aO  Async queue that accepts individual items and returns batches.

        Respects max_batch_size and batch_wait_timeout_s; a batch will be returned when
        max_batch_size elements are available or the timeout has passed since
        the previous get.

        If handle_batch_func is passed in, a background coroutine will run to
        poll from the queue and call handle_batch_func on the results.

        Cannot be pickled.

        Arguments:
            max_batch_size: max number of elements to return in a batch.
            batch_wait_timeout_s: time to wait before returning an incomplete
                batch.
            max_concurrent_batches: max number of batches to run concurrently.
            handle_batch_func(Optional[Callable]): callback to run in the
                background to handle batches if provided.
        N)r'   QueuequeuerU   rV   rW   	Semaphore	semaphoreEventrequests_available_eventsettaskscurr_iteration_start_times_handle_batch_taskr   _loopcreate_task_process_batches4_warn_if_max_batch_size_exceeds_max_ongoing_requestsr5   rU   rV   rW   rX   r)   r)   r*   __init__i   s   

z_BatchQueue.__init__c              	   C   sF   t  jj}|| j| j k r!td| j d| j d| d dS dS )zHelper to check whether the max_batch_size is bounded.

        Log a warning to configure `max_ongoing_requests` if it's bounded.
        z`max_batch_size` (z) * `max_concurrent_batches` (z)) is larger than `max_ongoing_requests` (z). This means the replica will never achieve the configured `max_batch_size` concurrently. Please update `max_ongoing_requests` to be >= `max_batch_size` * `max_concurrent_batches`.N)r   get_replica_context_deployment_configmax_ongoing_requestsrU   rW   loggerwarning)r5   rk   r)   r)   r*   rf      s   

z@_BatchQueue._warn_if_max_batch_size_exceeds_max_ongoing_requestsnew_max_batch_sizec                 C   s   || _ |   dS zUpdates queue's max_batch_size.N)rU   rf   r5   rn   r)   r)   r*   set_max_batch_size   s   z_BatchQueue.set_max_batch_sizerequestc                 C   s   | j | | j  d S r2   )rZ   
put_nowaitr^   r_   )r5   rr   r)   r)   r*   put   s   z_BatchQueue.putc                    s   g }| | j I dH  | j}| j}t }	 t|t |  d}zt| j	
 |I dH  W n
 tjy<   Y nw t||k r[| j s[| | j  t||k r[| j rH| j re| j	  t | |ksst||krv	 |S q)a  Wait for batch respecting self.max_batch_size and self.timeout_s.

        Returns a batch of up to self.max_batch_size items. Waits for up to
        to self.timeout_s after receiving the first request that will be in
        the next batch. After the timeout, returns as many items as are ready.

        Always returns a batch with at least one item - will block
        indefinitely until an item comes in.
        NTr   )rO   rZ   getrU   rV   timer;   r'   wait_forr^   waitTimeoutErrorr9   empty
get_nowaitclear)r5   batchrU   rV   batch_start_timeremaining_batch_time_sr)   r)   r*   wait_for_batch   s6   

	
z_BatchQueue.wait_for_batchresultsinput_batch_lengthc                 C   s*   t ||krtd| dt | dd S )NzHBatched function doesn't preserve batch size. The input list has length z" but the returned list has length .)r9   r   )r5   r   r   r)   r)   r*   _validate_results   s   z_BatchQueue._validate_resultsfunc_generatorinitial_futuresc              
      s"  d}zlt |}t||ksJ |2 zK3 dH W }| || t|D ]:}|| |d }}	|	|u r7|| n!|tv rFt|	t || nt 	 }
t
|	t||
 ||
 |  q"q6 |D ]}	|	|urlt|	t qaW dS  ty } z|D ]}	|	|urt|	| qyW Y d}~dS d}~ww )zConsumes batch function generator.

        This function only runs if the function decorated with @serve.batch
        is a generator.
        Nr   )r   r9   r   rN   rO   USER_CODE_STREAMING_SENTINELS_set_exception_if_not_doneStopAsyncIterationr   create_future_set_result_if_not_doner+   popleft	Exception)r5   r   r   r   FINISHED_TOKENfuturesr   rK   r,   r"   r-   er)   r)   r*   _consume_func_generator   sF   






z#_BatchQueue._consume_func_generatorfunc_futurer   c              
      sx   z|I dH }|  || t||D ]	\}}t|| qW dS  ty; } z|D ]}t|| q(W Y d}~dS d}~ww )z.Assigns func's results to the list of futures.N)r   zipr   r   r   )r5   r   r   r   r   r,   r"   r   r)   r)   r*   _assign_func_results"  s   
z _BatchQueue._assign_func_resultsfuncc                    sh   | j  s2|  I dH }| ||}t|}| j| t | j	|< |
| j | j  rdS dS )z6Loops infinitely and processes queued request batches.N)rc   	is_closedr   _process_batchr'   rd   r`   addrv   ra   add_done_callback_handle_completed_task)r5   r   r}   promisetaskr)   r)   r*   re   3  s   

z_BatchQueue._process_batchesr}   c                    sp  | j 4 I dH  dd |D }t|dkr"	 W d  I dH  dS dd |D }zJ|d j}tdd |D \}}|durJ||g|R i |}n||i |}t|rd|}| ||t|I dH  n|}	| |	|t|I dH  W n! ty }
 zt	d |D ]}t
||
 qW Y d}
~
nd}
~
ww W d  I dH  dS W d  I dH  dS 1 I dH sw   Y  dS )zProcesses queued request batch.Nc                 S   s   g | ]	}|j  s|qS r)   )r"   	cancelled)rB   reqr)   r)   r*   rL   D  s    z._BatchQueue._process_batch.<locals>.<listcomp>r   c                 S      g | ]}|j qS r)   )r"   rH   r)   r)   r*   rL   H      c                 S   r   r)   )r!   rH   r)   r)   r*   rL   P  r   z0_process_batch ran into an unexpected exception.)r\   r9   r    rS   r   r   r   r   rl   	exceptionr   )r5   r   r}   r   r    rC   kwargsfunc_future_or_generatorr   r   r   r"   r)   r)   r*   r   =  sD   


&.z_BatchQueue._process_batchr   c                 C   s&   | j | | j|= | |  d S r2   )r`   removera   _log_if_exceptionr   )r5   r   r)   r)   r*   r   i  s   z"_BatchQueue._handle_completed_taskexception_maybec                 C   s4   | d urt | tjrtd d S td d S d S )NzTask was cancelledzTask failed unexpectedly)
isinstancer'   CancelledErrorrl   debugr   )r   r)   r)   r*   r   n  s
   z_BatchQueue._log_if_exceptionc                 C   s&   | j d u s
t  sd S | j   d S r2   )rb   r   
is_runningcancelr4   r)   r)   r*   __del__v  s
   
z_BatchQueue.__del__r2   )r#   r$   r%   r@   r>   r   r	   rh   rf   rq   r   r   r'   r(   rt   r   r   r   r   r   r   r   r   re   r   Taskr   staticmethodBaseExceptionr   r   r)   r)   r)   r*   rT   h   s\    
-5


3

,rT   c                
   @   s   e Zd ZdZ				ddededed	ee fd
dZe	de
fddZdeddfddZdeddfddZdefddZdefddZdefddZdefddZdee fddZdS ) _LazyBatchQueueWrapperzStores a _BatchQueue and updates its settings.

    _BatchQueue cannot be pickled, you must construct it lazily
    at runtime inside a replica. This class initializes a queue only upon
    first access.
    
           rF   NrU   rV   rW   rX   c                 C   s"   d | _ || _|| _|| _|| _d S r2   )_queuerU   rV   rW   rX   rg   r)   r)   r*   rh     s
   
z_LazyBatchQueueWrapper.__init__r0   c                 C   s(   | j du rt| j| j| j| j| _ | j S )zXReturns _BatchQueue.

        Initializes queue when called for the first time.
        N)r   rT   rU   rV   rW   rX   r4   r)   r)   r*   rZ     s   
z_LazyBatchQueueWrapper.queuern   c                 C   s$   || _ | jdur| j| dS dS ro   )rU   r   rq   rp   r)   r)   r*   rq     s   
z)_LazyBatchQueueWrapper.set_max_batch_sizenew_batch_wait_timeout_sc                 C   s    || _ | jd ur|| j_ d S d S r2   )rV   r   )r5   r   r)   r)   r*   set_batch_wait_timeout_s  s   
z/_LazyBatchQueueWrapper.set_batch_wait_timeout_sc                 C      | j S r2   rU   r4   r)   r)   r*   get_max_batch_size     z)_LazyBatchQueueWrapper.get_max_batch_sizec                 C   r   r2   rV   r4   r)   r)   r*   get_batch_wait_timeout_s  r   z/_LazyBatchQueueWrapper.get_batch_wait_timeout_sc                 C   s   t t| jj S )z;Gets summary statistics of current iteration's start times.)r.   listrZ   ra   valuesr4   r)   r)   r*   _get_curr_iteration_start_times  s   z6_LazyBatchQueueWrapper._get_curr_iteration_start_timesc                    s    t | jdr| jj  S dS )zGets whether default _BatchQueue's background task is alive.

        Returns False if the batch handler doesn't use a default _BatchQueue.
        rb   F)hasattrrZ   rb   doner4   r)   r)   r*   _is_batching_task_alive  s   z._LazyBatchQueueWrapper._is_batching_task_alivec                    s2   t | jdrt }| jjj|d | S dS )zGets the stack for the default _BatchQueue's background task.

        Returns empty string if the batch handler doesn't use a default _BatchQueue.
        rb   )fileN)r   rZ   ioStringIOrb   print_stackgetvalue)r5   
str_bufferr)   r)   r*   _get_handling_task_stack  s   z/_LazyBatchQueueWrapper._get_handling_task_stack)r   r   rF   N)r#   r$   r%   __doc__r@   r>   r   r	   rh   r?   rT   rZ   rq   r   r   r   r.   r   boolr   strr   r)   r)   r)   r*   r     s0    	
r   c                 C   sN   t | tst | tr|  rt| } ntd|  | dk r%td|  d S )Nz)max_batch_size must be integer >= 1, got rF   z,max_batch_size must be an integer >= 1, got )r   r@   r>   
is_integer	TypeError
ValueErrorr   r)   r)   r*   _validate_max_batch_size  s   

r   c                 C   s6   t | ttfstd|  | dk rtd|  d S )Nz/batch_wait_timeout_s must be a float >= 0, got r   )r   r>   r@   r   r   r   r)   r)   r*   _validate_batch_wait_timeout_s  s   r   rW   c                 C   s$   t | tr	| dk rtd|  d S )NrF   z4max_concurrent_batches must be an integer >= 1, got )r   r@   r   )rW   r)   r)   r*    _validate_max_concurrent_batches  s
   r   SelfTypeT)contravariantTRc                   @   *   e Zd Zdedee dee fddZdS )_SyncBatchingMethodself__SyncBatchingMethod__batchr0   c                C      d S r2   r)   )r5   r   r   r)   r)   r*   __call__  s   z_SyncBatchingMethod.__call__Nr#   r$   r%   r   r   r   r   r   r)   r)   r)   r*   r         "r   c                   @   r   )_AsyncBatchingMethodr   _AsyncBatchingMethod__batchr0   c                   s   d S r2   r)   )r5   r   r   r)   r)   r*   r     s   z_AsyncBatchingMethod.__call__Nr   r)   r)   r)   r*   r     r   r   
_sync_funcc                C   r   r2   r)   )r   r)   r)   r*   r}        r}   _async_funcc                C   r   r2   r)   )r   r)   r)   r*   r}        
_sync_methc                C   r   r2   r)   )r   r)   r)   r*   r}     r   _async_methc                C   r   r2   r)   )r   r)   r)   r*   r}     r   r   r   rF   rU   rV   __BatchDecoratorc                C   r   r2   r)   )r   rU   rV   rW   r)   r)   r*   r}   %  s   c                
   @   s   e Zd ZdZedeee gee f deegef fddZ	edeee ge
eeee f f deege
eeef f fddZ	edeeeef deeegef fd	dZ	ed
eeeef deeege
eeef f fddZ	dS )r   zJDescibes behaviour of decorator produced by calling `batch` with argumentsr   r0   c                C   r   r2   r)   )r5   r   r)   r)   r*   r   3  r   z_BatchDecorator.__call__r   c                C   r   r2   r)   )r5   r   r)   r)   r*   r   7  r   r   c                C   r   r2   r)   )r5   r   r)   r)   r*   r   =  r   r   c                C   r   r2   r)   )r5   r   r)   r)   r*   r   C  r   N)r#   r$   r%   r   r   r	   r   r   r   r   r
   r   r   r   r   r)   r)   r)   r*   r   0  s,    0stable)	stability_funcc                   sd   | durt | stdt| stdt t  t  fdd}t | r0|| S |S )a  Converts a function to asynchronously handle batches.

    The function can be a standalone function or a class method. In both
    cases, the function must be `async def` and take a list of objects as
    its sole argument and return a list of the same length as a result.

    When invoked, the caller passes a single object. These will be batched
    and executed asynchronously once there is a batch of `max_batch_size`
    or `batch_wait_timeout_s` has elapsed, whichever occurs first.

    `max_batch_size` and `batch_wait_timeout_s` can be updated using setter
    methods from the batch_handler (`set_max_batch_size` and
    `set_batch_wait_timeout_s`).

    Example:

    .. code-block:: python

            from ray import serve
            from starlette.requests import Request

            @serve.deployment
            class BatchedDeployment:
                @serve.batch(max_batch_size=10, batch_wait_timeout_s=0.1)
                async def batch_handler(self, requests: List[Request]) -> List[str]:
                    response_batch = []
                    for r in requests:
                        name = (await requests.json())["name"]
                        response_batch.append(f"Hello {name}!")

                    return response_batch

                def update_batch_params(self, max_batch_size, batch_wait_timeout_s):
                    self.batch_handler.set_max_batch_size(max_batch_size)
                    self.batch_handler.set_batch_wait_timeout_s(batch_wait_timeout_s)

                async def __call__(self, request: Request):
                    return await self.batch_handler(request)

            app = BatchedDeployment.bind()

    Arguments:
        max_batch_size: the maximum batch size that will be executed in
            one call to the underlying function.
        batch_wait_timeout_s: the maximum duration to wait for
            `max_batch_size` elements before running the current batch.
        max_concurrent_batches: the maximum number of batches that can be
            executed concurrently. If the number of concurrent batches exceeds
            this limit, the batch handler will wait for a batch to complete
            before sending the next batch to the underlying function.
    Nz?@serve.batch can only be used to decorate functions or methods.z9Functions decorated with @serve.batch must be 'async def'c                    s   t  dtjdtfdddtjf fddt fdd}t fd	d
}t r8|}n|}j|_j|_	j
|_
j|_j|_j|_j|_|S )Nfirst_futurer0   c                 S  s<   | }	 z|I dH }|j }|jV  W n
 ty   Y dS w q)z1Generator that handles generator batch functions.TN)r-   r,   r   )r   r"   async_responser)   r)   r*   batch_handler_generator  s   
z@batch.<locals>._batch_decorator.<locals>.batch_handler_generatorc                    sT   t t | |}t|  }|d ur|dd  }j}t  }|t||| |S )NrG   )r   r   r   rZ   r   r   rt   r   )rC   r   r!   r5   batch_queuer"   )r   lazy_batch_queue_wrapperr)   r*   enqueue_request  s   

z8batch.<locals>._batch_decorator.<locals>.enqueue_requestc                     s   | |} |S r2   r)   )rC   r   r   )r   r   r)   r*   generator_batch_wrapper  s   
z@batch.<locals>._batch_decorator.<locals>.generator_batch_wrapperc                     s    | |I d H S r2   r)   )rC   r   )r   r)   r*   batch_wrapper  s   z6batch.<locals>._batch_decorator.<locals>.batch_wrapper)r   r'   r(   r   r   r   r   _get_max_batch_sizer   _get_batch_wait_timeout_srq   r   r   r   r   )r   r   r   wrapperrV   rU   rW   )r   r   r   r   r*   _batch_decorator  s@   
zbatch.<locals>._batch_decorator)callabler   r   r   r   r   )r   rU   rV   rW   r   r)   r   r*   r}   J  s   =Rr"   r,   c                 C      |   s| | dS dS )z3Sets the future's result if the future is not done.N)r   
set_result)r"   r,   r)   r)   r*   r        r   r   c                 C   r   )z6Sets the future's exception if the future is not done.N)r   set_exception)r"   r   r)   r)   r*   r     r   r   )Nr   r   rF   )Gr'   r   loggingrv   collectionsr   dataclassesr   	functoolsr   inspectr   r   typingr   r   r	   r
   r   r   r   r   r   r   r   r   r   r   r   rayr   ray._common.signaturer   r   r   ray._common.utilsr   ray.serve._private.constantsr   ray.serve._private.utilsr   ray.serve.exceptionsr   ray.util.annotationsr   	getLoggerrl   StopIterationr   r   r   r+   r.   rS   rT   r   r   r   r@   r   r   r   r   r   r   r}   r>   r   r(   r   r   r)   r)   r)   r*   <module>   s    D


  W0
 