o
    #i4                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZ d dlmZmZ d dl	m
Z
mZmZmZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZm Z m!Z! d d
l"m#Z#m$Z$ e%e&Z'eG dd dZ(e) Z*de(fddZ+dd Z,dddZ-dddZ.G dd de/Z0G dd dZ1zd dl2m3Z3 e3j4e1_5e1j4e3_4W n e6y   e'7d Y nw ee8e9e:e#e
f Z;ee; Z<ee; Z=ee=e<f Z>G dd deZ?dS )    N)	dataclassfield)datetimetimezone)
AnyAsyncIterable	AwaitableCallable	CoroutineIteratorMappingOptionalSetUnion)BackgroundTask)iterate_in_threadpool)MutableHeaders)Response)ReceiveScopeSendMessage)ServerSentEventensure_bytesc                   @   s6   e Zd ZU dZeedZeej	 e
d< dZee
d< dS )_ShutdownStatezPer-thread state for shutdown coordination.

    Issue #152 fix: Uses threading.local() instead of ContextVar to ensure
    one watcher per thread rather than one per async context.
    )default_factoryeventsFwatcher_startedN)__name__
__module____qualname____doc__r   setr   r   anyioEvent__annotations__r   bool r'   r'   N/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/sse_starlette/sse.pyr   !   s   
 r   returnc                  C   s$   t tdd} | du rt } | t_| S )z4Get or create shutdown state for the current thread.shutdown_stateN)getattr_thread_stater   r*   )stater'   r'   r(   _get_shutdown_state1   s
   r.   c                  C   sN   zt t j} t| dr| j}t|dr|W S W dS W dS  ty&   Y dS w )am  
    Try to get uvicorn Server instance via signal handler introspection.

    When uvicorn registers signal handlers, they're bound methods on the Server instance.
    We can retrieve the Server from the handler's __self__ attribute.

    Returns None if:
    - Not running under uvicorn
    - Signal handler isn't a bound method
    - Any introspection fails
    __self__should_exitN)signal	getsignalSIGTERMhasattrr/   	Exception)handlerserverr'   r'   r(   _get_uvicorn_server:   s   

r8   c                     st   t  } t }z.	 tjrntjr|dur|jrdt_n	tdI dH  q	t| jD ]}|	  q)W d| _
dS d| _
w )ag  
    Poll for shutdown and broadcast to all events in this context.

    One watcher runs per thread (event loop). Checks two shutdown sources:
    1. AppStatus.should_exit - set when our monkey-patch works
    2. uvicorn Server.should_exit - via signal handler introspection (Issue #132 fix)

    When either becomes True, signals all registered events.
    TNg      ?F)r.   r8   	AppStatusr0   enable_automatic_graceful_drainr#   sleeplistr   r"   r   )r-   uvicorn_servereventr'   r'   r(   _shutdown_watcherQ   s(   

r?   c                  C   sL   t  } | js$d| _zt }|t  W dS  ty#   d| _Y dS w dS )zDEnsure the shutdown watcher is running for this thread (event loop).TFN)r.   r   asyncioget_running_loopcreate_taskr?   RuntimeError)r-   loopr'   r'   r(   $_ensure_watcher_started_on_this_loopu   s   rE   c                   @   s   e Zd ZdS )SendTimeoutErrorN)r   r   r    r'   r'   r'   r(   rF      s    rF   c                   @   sN   e Zd ZU dZdZdZdZee e	d< e
dd Ze
dd	 Ze
d
d ZdS )r9   z\Helper to capture a shutdown signal from Uvicorn so we can gracefully terminate SSE streams.FTNoriginal_handlerc                   C   
   dt _dS )aJ  
        Prevent automatic SSE stream termination on server shutdown.

        WARNING: When disabled, you MUST set AppStatus.should_exit = True
        at some point during shutdown, or streams will never close and the
        server will hang indefinitely (or until uvicorn's graceful shutdown
        timeout expires).
        FNr9   r:   r'   r'   r'   r(    disable_automatic_graceful_drain   s   

z*AppStatus.disable_automatic_graceful_drainc                   C   rH   )a  
        Re-enable automatic SSE stream termination on server shutdown.

        This restores the default behavior where SIGTERM triggers immediate
        stream draining. Call this to undo a previous call to
        disable_automatic_graceful_drain().
        TNrI   r'   r'   r'   r(   $enable_automatic_graceful_drain_mode   s   
	z.AppStatus.enable_automatic_graceful_drain_modec                  O   s.   t jrdt _t jd urt j| i | d S d S )NT)r9   r:   r0   rG   )argskwargsr'   r'   r(   handle_exit   s
   
zAppStatus.handle_exit)r   r   r    r!   r0   r:   rG   r   r	   r%   staticmethodrJ   rK   rN   r'   r'   r'   r(   r9      s   
 


r9   )ServerzHUvicorn not installed. Graceful shutdown on server termination disabled.c                   @   s^  e Zd ZdZdZdZ										d+deded	ee	e
e
f  d
e
dee dee dee
 deeg ef  deeg ed f  dee deeeged f  ddfddZedeeef fddZejdeeef ddfddZd,deddfddZdeddfdd Zd!eddfd"d#Zed-d$d%Zdeddfd&d'Zd(e d!ededdfd)d*Z!dS ).EventSourceResponsezf
    Streaming response that sends data conforming to the SSE (Server-Sent Events) specification.
       
   Ntext/event-streamcontentstatus_codeheaders
media_type
backgroundpingsepping_message_factorydata_sender_callable)NNNsend_timeoutclient_close_handler_callabler)   c                 C   s   |dvrt d| |p| j| _t|tr|| _nt|| _|| _|d u r)| jn|| _|| _	|	| _
|
| _t }|d urA|| |dd d|d< d|d< | | |d u r[| jn|| _|| _|| _d	| _t | _d S )
N)NrS   
z'sep must be one of: \r\n, \r, \n, got: zCache-Controlzno-storez
keep-alive
ConnectionnozX-Accel-BufferingT)
ValueErrorDEFAULT_SEPARATORr\   
isinstancer   body_iteratorr   rW   rY   rZ   r^   r_   r   update
setdefaultinit_headersDEFAULT_PING_INTERVALping_intervalr]   r`   activer#   Lock
_send_lock)selfrV   rW   rX   rY   rZ   r[   r\   r]   r^   r_   r`   _headersr'   r'   r(   __init__   s.   



zEventSourceResponse.__init__c                 C   s   | j S N)_ping_interval)rq   r'   r'   r(   rm     s   z!EventSourceResponse.ping_intervalvaluec                 C   s0   t |ttfstd|dk rtd|| _d S )Nzping interval must be intr   z$ping interval must be greater than 0)rg   intfloat	TypeErrorre   ru   )rq   rv   r'   r'   r(   rm     s
   
Fforcec                 C   s   t d)Nz-Compression is not supported for SSE streams.)NotImplementedError)rq   rz   r'   r'   r(   enable_compression  s   z&EventSourceResponse.enable_compressionsendc              	      s  |d| j | jdI dH  | j2 zI3 dH W }t|| j}td| t| j	}|d|ddI dH  W d   n1 s?w   Y  |rZ|j
rZt| jdrW| j I dH  t q6 | j4 I dH  d	| _|dd
d	dI dH  W d  I dH  dS 1 I dH sw   Y  dS )zHSend out SSE data to the client as it becomes available in the iterator.zhttp.response.start)typestatusrX   Nz	chunk: %shttp.response.bodyTr~   body	more_bodyacloseF    )rW   raw_headersrh   r   r\   loggerdebugr#   move_on_afterr_   cancel_calledr4   r   rF   rp   rn   )rq   r}   datachunkcancel_scoper'   r'   r(   _stream_response  s2   


.z$EventSourceResponse._stream_responsereceivec                    sX   | j r*| I dH }|d dkr%d| _ td | jr#| |I dH  dS | j sdS dS )z/Watch for a disconnect message from the client.Nr~   zhttp.disconnectFz+Got event: http.disconnect. Stop streaming.)rn   r   r   r`   )rq   r   messager'   r'   r(   _listen_for_disconnect+  s   
z*EventSourceResponse._listen_for_disconnectc               	      st   t jrdS t  t } t }| j| zt jr#W | j| dS |	 I dH  W | j| dS | j| w )z0Wait for shutdown signal via the shared watcher.N)
r9   r0   rE   r.   r#   r$   r   adddiscardwait)r-   r>   r'   r'   r(   _listen_for_exit_signal6  s   z+EventSourceResponse._listen_for_exit_signalc              	      s   | j r^t| jI dH  | jr|  ntdttj	 | j
d}t|| j
}td| | j4 I dH  | j rD|d|ddI dH  W d  I dH  n1 I dH sTw   Y  | j sdS dS )zPeriodically send ping messages to keep the connection alive on proxies.
        - frequenccy ca every 15 seconds.
        - Alternatively one can send periodically a comment line (one starting with a ':' character)
        Nzping - )commentr\   zping: %sr   Tr   )rn   r#   r;   ru   r]   r   r   nowr   utcr\   r   r   r   rp   )rq   r}   sse_ping
ping_bytesr'   r'   r(   _pingJ  s,   

(zEventSourceResponse._pingscopec              	      s   t  4 I dH Kdtg td f ffdd}|fdd |fdd |j jr?j | fdd W d  I dH  n1 I dH sZw   Y  jdurm I dH  dS dS )	a  Entrypoint for Starlette's ASGI contract. We spin up tasks:
        - _stream_response to push events
        - _ping to keep the connection alive
        - _listen_for_exit_signal to respond to server shutdown
        - _listen_for_disconnect to respond to client disconnect
        Ncoroc                    s   |  I d H   j   d S rt   )r   cancel)r   )
task_groupr'   r(   cancel_on_finishn  s   z6EventSourceResponse.__call__.<locals>.cancel_on_finishc                      
     S rt   )r   r'   rq   r}   r'   r(   <lambda>r     
 z.EventSourceResponse.__call__.<locals>.<lambda>c                      r   rt   )r   r'   r   r'   r(   r   s  r   c                      s
     S rt   )r   r'   )r   rq   r'   r(   r   {  r   )r#   create_task_groupr	   r   
start_soonr   r^   rZ   )rq   r   r   r}   r   r'   )r   rq   r}   r   r(   __call__e  s   (
zEventSourceResponse.__call__)
rT   NrU   NNNNNNN)Fr)   N)"r   r   r    r!   rl   rf   ContentStreamrw   r   r   strr   r	   r   r
   rx   r   r   rs   propertyr   rm   setterr&   r|   r   r   r   r   rO   r   r   r   r   r'   r'   r'   r(   rQ      sl    	

=rQ   r   )@r@   loggingr1   	threadingdataclassesr   r   r   r   typingr   r   r   r	   r
   r   r   r   r   r   r#   starlette.backgroundr   starlette.concurrencyr   starlette.datastructuresr   starlette.responsesr   starlette.typesr   r   r   r   sse_starlette.eventr   r   	getLoggerr   r   r   localr,   r.   r8   r?   rE   TimeoutErrorrF   r9   uvicorn.mainrP   rN   rG   ImportErrorr   r   bytesdictContentSyncContentStreamAsyncContentStreamr   rQ   r'   r'   r'   r(   <module>   sL    0
	

$&