o
    ^۷i0@                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZmZ d dl	m
Z
mZmZmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZm Z m!Z! d d
l"m#Z#m$Z$ e%e&Z'eG dd dZ(e) Z*de(fddZ+dd Z,dddZ-dddZ.G dd de/Z0G dd dZ1zd dl2m3Z3 e3j4e1_5e1j4e3_4W n e6y   e'7d Y nw ee8e9e:e#e
f Z;ee; Z<ee; Z=ee=e<f Z>G dd deZ?dS )    N)	dataclassfield)datetimetimezone)
AnyAsyncIterable	AwaitableCallable	CoroutineIteratorMappingOptionalSetUnion)BackgroundTask)iterate_in_threadpool)MutableHeaders)Response)ReceiveScopeSendMessage)ServerSentEventensure_bytesc                   @   s6   e Zd ZU dZeedZeej	 e
d< dZee
d< dS )_ShutdownStatezPer-thread state for shutdown coordination.

    Issue #152 fix: Uses threading.local() instead of ContextVar to ensure
    one watcher per thread rather than one per async context.
    )default_factoryeventsFwatcher_startedN)__name__
__module____qualname____doc__r   setr   r   anyioEvent__annotations__r   bool r'   r'   G/home/ubuntu/vllm_env/lib/python3.10/site-packages/sse_starlette/sse.pyr   !   s   
 r   returnc                  C   s$   t tdd} | du rt } | t_| S )z4Get or create shutdown state for the current thread.shutdown_stateN)getattr_thread_stater   r*   )stater'   r'   r(   _get_shutdown_state1   s
   r.   c                  C   sN   zt t j} t| dr| j}t|dr|W S W dS W dS  ty&   Y dS w )am  
    Try to get uvicorn Server instance via signal handler introspection.

    When uvicorn registers signal handlers, they're bound methods on the Server instance.
    We can retrieve the Server from the handler's __self__ attribute.

    Returns None if:
    - Not running under uvicorn
    - Signal handler isn't a bound method
    - Any introspection fails
    __self__should_exitN)signal	getsignalSIGTERMhasattrr/   	Exception)handlerserverr'   r'   r(   _get_uvicorn_server:   s   

r8   c                     st   t  } t }z.	 tjrntjr|dur|jrdt_n	tdI dH  q	t| jD ]}|	  q)W d| _
dS d| _
w )ag  
    Poll for shutdown and broadcast to all events in this context.

    One watcher runs per thread (event loop). Checks two shutdown sources:
    1. AppStatus.should_exit - set when our monkey-patch works
    2. uvicorn Server.should_exit - via signal handler introspection (Issue #132 fix)

    When either becomes True, signals all registered events.
    TNg      ?F)r.   r8   	AppStatusr0   enable_automatic_graceful_drainr#   sleeplistr   r"   r   )r-   uvicorn_servereventr'   r'   r(   _shutdown_watcherQ   s(   

r?   c                  C   sL   t  } | js$d| _zt }|t  W dS  ty#   d| _Y dS w dS )zDEnsure the shutdown watcher is running for this thread (event loop).TFN)r.   r   asyncioget_running_loopcreate_taskr?   RuntimeError)r-   loopr'   r'   r(   $_ensure_watcher_started_on_this_loopu   s   rE   c                   @   s   e Zd ZdS )SendTimeoutErrorN)r   r   r    r'   r'   r'   r(   rF      s    rF   c                   @   sN   e Zd ZU dZdZdZdZee e	d< e
dd Ze
dd	 Ze
d
d ZdS )r9   z\Helper to capture a shutdown signal from Uvicorn so we can gracefully terminate SSE streams.FTNoriginal_handlerc                   C   
   dt _dS )aJ  
        Prevent automatic SSE stream termination on server shutdown.

        WARNING: When disabled, you MUST set AppStatus.should_exit = True
        at some point during shutdown, or streams will never close and the
        server will hang indefinitely (or until uvicorn's graceful shutdown
        timeout expires).
        FNr9   r:   r'   r'   r'   r(    disable_automatic_graceful_drain   s   

z*AppStatus.disable_automatic_graceful_drainc                   C   rH   )a  
        Re-enable automatic SSE stream termination on server shutdown.

        This restores the default behavior where SIGTERM triggers immediate
        stream draining. Call this to undo a previous call to
        disable_automatic_graceful_drain().
        TNrI   r'   r'   r'   r(   $enable_automatic_graceful_drain_mode   s   
	z.AppStatus.enable_automatic_graceful_drain_modec                  O   s.   t jrdt _t jd urt j| i | d S d S )NT)r9   r:   r0   rG   )argskwargsr'   r'   r(   handle_exit   s
   
zAppStatus.handle_exit)r   r   r    r!   r0   r:   rG   r   r	   r%   staticmethodrJ   rK   rN   r'   r'   r'   r(   r9      s   
 


r9   )ServerzHUvicorn not installed. Graceful shutdown on server termination disabled.c                   @   sz  e Zd ZdZdZdZ												d0ded	ed
ee	e
e
f  de
dee dee dee
 deeg ef  deeg ed f  dee deeeged f  deej deddfddZedeeef fddZejdeeef ddfddZd1deddfdd Zd!eddfd"d#Zd$eddfd%d&Zed2d'd(Z d2d)d*Z!d!eddfd+d,Z"d-e#d$ed!eddfd.d/Z$dS )3EventSourceResponseag  Streaming response implementing the SSE (Server-Sent Events) specification.

    Args:
        content: Async iterable or sync iterator yielding SSE event data.
        status_code: HTTP status code. Default: 200.
        headers: Additional HTTP headers.
        media_type: Response media type. Default: "text/event-stream".
        background: Background task to run after response completes.
        ping: Ping interval in seconds (0 to disable). Default: 15.
        sep: Line separator for SSE messages ("\r\n", "\r", or "\n").
        ping_message_factory: Callable returning custom ping ServerSentEvent.
        data_sender_callable: Async callable for push-based data sending.
        send_timeout: Timeout in seconds for individual send operations.
        client_close_handler_callable: Async callback on client disconnect.
        shutdown_event: Optional ``anyio.Event`` set by the library when server
            shutdown is detected. Generators can watch this event to send farewell
            messages and exit cooperatively instead of receiving CancelledError.
        shutdown_grace_period: Seconds to wait after setting ``shutdown_event``
            before force-cancelling the generator. Must be >= 0. Should be less
            than your ASGI server's graceful shutdown timeout. Default: 0
            (immediate cancel, identical to pre-v3.3.0 behavior).
       
   Ntext/event-streamr   contentstatus_codeheaders
media_type
backgroundpingsepping_message_factorydata_sender_callable)NNNsend_timeoutclient_close_handler_callableshutdown_eventshutdown_grace_periodr)   c                 C   s   |dvrt d| |p| j| _t|tr|| _nt|| _|| _|d u r)| jn|| _|| _	|	| _
|
| _t }|d urA|| |dd d|d< d|d< | | |d u r[| jn|| _|| _|| _|d	k rlt d
|| _|| _d| _t | _d S )N)NrS   
z'sep must be one of: \r\n, \r, \n, got: zCache-Controlzno-storez
keep-alive
ConnectionnozX-Accel-Bufferingr   z"shutdown_grace_period must be >= 0T)
ValueErrorDEFAULT_SEPARATORr\   
isinstancer   body_iteratorr   rW   rY   rZ   r^   r_   r   update
setdefaultinit_headersDEFAULT_PING_INTERVALping_intervalr]   r`   _shutdown_event_shutdown_grace_periodactiver#   Lock
_send_lock)selfrV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   ra   rb   _headersr'   r'   r(   __init__   s6   



zEventSourceResponse.__init__c                 C   s   | j S N)_ping_intervalru   r'   r'   r(   ro      s   z!EventSourceResponse.ping_intervalvaluec                 C   s0   t |ttfstd|dk rtd|| _d S )Nzping interval must be intr   z$ping interval must be greater than 0)ri   intfloat	TypeErrorrg   ry   )ru   r{   r'   r'   r(   ro   $  s
   
Fforcec                 C   s   t d)Nz-Compression is not supported for SSE streams.)NotImplementedError)ru   r   r'   r'   r(   enable_compression,  s   z&EventSourceResponse.enable_compressionsendc              	      s  |d| j | jdI dH  | j2 zL3 dH W }t|| j}td| t| j	}|d|ddI dH  W d   n1 s?w   Y  |r]|j
r]t| jdd}|durZ| I dH  t q6 | j4 I dH  d	| _|dd
d	dI dH  W d  I dH  dS 1 I dH sw   Y  dS )zHSend out SSE data to the client as it becomes available in the iterator.zhttp.response.start)typestatusrX   Nz	chunk: %shttp.response.bodyTr   body	more_bodyacloseF    )rW   raw_headersrj   r   r\   loggerdebugr#   move_on_afterr_   cancel_calledr+   rF   rt   rr   )ru   r   datachunkcancel_scoper   r'   r'   r(   _stream_response/  s4   


.z$EventSourceResponse._stream_responsereceivec                    sX   | j r*| I dH }|d dkr%d| _ td | jr#| |I dH  dS | j sdS dS )z/Watch for a disconnect message from the client.Nr   zhttp.disconnectFz+Got event: http.disconnect. Stop streaming.)rr   r   r   r`   )ru   r   messager'   r'   r(   _listen_for_disconnectK  s   
z*EventSourceResponse._listen_for_disconnectc               	      st   t jrdS t  t } t }| j| zt jr#W | j| dS |	 I dH  W | j| dS | j| w )z0Wait for shutdown signal via the shared watcher.N)
r9   r0   rE   r.   r#   r$   r   adddiscardwait)r-   r>   r'   r'   r(   _listen_for_exit_signalV  s   z+EventSourceResponse._listen_for_exit_signalc                    s   |   I dH  | jr| j  | jdkrDt| j | jr2tdI dH  | jsW d   dS W d   dS 1 s=w   Y  dS dS )a~  Wait for shutdown signal, then optionally give generator a grace period.

        Issue #167: When a shutdown_event is provided, the library sets it before
        returning, giving the generator a chance to send farewell events and exit
        cooperatively. The shutdown_grace_period controls how long to wait before
        force-cancelling via task group cancellation.
        Nr   g?)r   rp   r"   rq   r#   r   rr   r;   rz   r'   r'   r(   "_listen_for_exit_signal_with_gracej  s   

"z6EventSourceResponse._listen_for_exit_signal_with_gracec              	      s   | j r^t| jI dH  | jr|  ntdttj	 | j
d}t|| j
}td| | j4 I dH  | j rD|d|ddI dH  W d  I dH  n1 I dH sTw   Y  | j sdS dS )zPeriodically send ping messages to keep the connection alive on proxies.
        - frequenccy ca every 15 seconds.
        - Alternatively one can send periodically a comment line (one starting with a ':' character)
        Nzping - )commentr\   zping: %sr   Tr   )rr   r#   r;   ry   r]   r   r   nowr   utcr\   r   r   r   rt   )ru   r   sse_ping
ping_bytesr'   r'   r(   _ping~  s,   

(zEventSourceResponse._pingscopec              	      s   t  4 I dH Kdtg td f ffdd}|fdd |fdd |j jr?j | fdd W d  I dH  n1 I dH sZw   Y  jdurm I dH  dS dS )	a  Entrypoint for Starlette's ASGI contract. We spin up tasks:
        - _stream_response to push events
        - _ping to keep the connection alive
        - _listen_for_exit_signal to respond to server shutdown
        - _listen_for_disconnect to respond to client disconnect
        Ncoroc                    s   |  I d H   j   d S rx   )r   cancel)r   )
task_groupr'   r(   cancel_on_finish  s   z6EventSourceResponse.__call__.<locals>.cancel_on_finishc                      
     S rx   )r   r'   ru   r   r'   r(   <lambda>     
 z.EventSourceResponse.__call__.<locals>.<lambda>c                      r   rx   )r   r'   r   r'   r(   r     r   c                      s
     S rx   )r   r'   )r   ru   r'   r(   r     r   )r#   create_task_groupr	   r   
start_soonr   r^   rZ   )ru   r   r   r   r   r'   )r   ru   r   r   r(   __call__  s"   (
zEventSourceResponse.__call__)rT   NrU   NNNNNNNNr   )Fr)   N)%r   r   r    r!   rn   rh   ContentStreamr|   r   r   strr   r	   r   r
   r}   r   r   r#   r$   rw   propertyr   ro   setterr&   r   r   r   r   r   rO   r   r   r   r   r   r'   r'   r'   r(   rQ      sz    	

I
rQ   r   )@r@   loggingr1   	threadingdataclassesr   r   r   r   typingr   r   r   r	   r
   r   r   r   r   r   r#   starlette.backgroundr   starlette.concurrencyr   starlette.datastructuresr   starlette.responsesr   starlette.typesr   r   r   r   sse_starlette.eventr   r   	getLoggerr   r   r   localr,   r.   r8   r?   rE   TimeoutErrorrF   r9   uvicorn.mainrP   rN   rG   ImportErrorr   r   bytesdictContentSyncContentStreamAsyncContentStreamr   rQ   r'   r'   r'   r(   <module>   sL    0
	

$&