o
    i/                     @  s   d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
 ddlmZ ddlZddlmZ dd	lmZ dd
lmZ ddlmZmZmZ ddlmZ ddlmZmZmZ ddlm Z  ddl!m"Z"m#Z#m$Z$ e%e&Z'G dd dZ(dS )z/StreamableHTTP Session Manager for MCP servers.    )annotationsN)AsyncIterator)
HTTPStatus)Any)uuid4)
TaskStatus)Request)Response)ReceiveScopeSend)Server)MCP_SESSION_ID_HEADER
EventStoreStreamableHTTPServerTransport)TransportSecuritySettings)INVALID_REQUEST	ErrorDataJSONRPCErrorc                   @  sT   e Zd ZdZ					d"d#ddZejd$ddZd%ddZd%ddZ	d%d d!Z
dS )&StreamableHTTPSessionManagera  
    Manages StreamableHTTP sessions with optional resumability via event store.

    This class abstracts away the complexity of session management, event storage,
    and request handling for StreamableHTTP transports. It handles:

    1. Session tracking for clients
    2. Resumability via an optional event store
    3. Connection management and lifecycle
    4. Request handling and transport setup

    Important: Only one StreamableHTTPSessionManager instance should be created
    per application. The instance cannot be reused after its run() context has
    completed. If you need to restart the manager, create a new instance.

    Args:
        app: The MCP server instance
        event_store: Optional event store for resumability support.
                     If provided, enables resumable connections where clients
                     can reconnect and receive missed events.
                     If None, sessions are still tracked but not resumable.
        json_response: Whether to use JSON responses instead of SSE streams
        stateless: If True, creates a completely fresh transport for each request
                   with no session tracking or state persistence between requests.
        security_settings: Optional transport security settings.
        retry_interval: Retry interval in milliseconds to suggest to clients in SSE
                       retry field. Used for SSE polling behavior.
    NFappMCPServer[Any, Any]event_storeEventStore | Nonejson_responsebool	statelesssecurity_settings TransportSecuritySettings | Noneretry_interval
int | Nonec                 C  sN   || _ || _|| _|| _|| _|| _t | _i | _	d | _
t | _d| _d S )NF)r   r   r   r   r   r   anyioLock_session_creation_lock_server_instances_task_group	_run_lock_has_started)selfr   r   r   r   r   r    r)   V/home/ubuntu/.local/lib/python3.10/site-packages/mcp/server/streamable_http_manager.py__init__<   s   	


z%StreamableHTTPSessionManager.__init__returnAsyncIterator[None]c              
   C s   | j 4 I dH  | jrtdd| _W d  I dH  n1 I dH s#w   Y  t 4 I dH ?}|| _td zdV  W td |j	  d| _| j
  ntd |j	  d| _| j
  w W d  I dH  dS 1 I dH suw   Y  dS )aw  
        Run the session manager with proper lifecycle management.

        This creates and manages the task group for all session operations.

        Important: This method can only be called once per instance. The same
        StreamableHTTPSessionManager instance cannot be reused after this
        context manager exits. Create a new instance if you need to restart.

        Use this in the lifespan context manager of your Starlette app:

        @contextlib.asynccontextmanager
        async def lifespan(app: Starlette) -> AsyncIterator[None]:
            async with session_manager.run():
                yield
        NzyStreamableHTTPSessionManager .run() can only be called once per instance. Create a new instance if you need to run again.Tz&StreamableHTTP session manager startedz,StreamableHTTP session manager shutting down)r&   r'   RuntimeErrorr!   create_task_groupr%   loggerinfocancel_scopecancelr$   clear)r(   tgr)   r)   r*   runV   s,   (




.z StreamableHTTPSessionManager.runscoper   receiver
   sendr   Nonec                   sJ   | j du r
td| jr| |||I dH  dS | |||I dH  dS )a  
        Process ASGI request with proper session handling and transport setup.

        Dispatches to the appropriate handler based on stateless mode.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        Nz6Task group is not initialized. Make sure to use run().)r%   r.   r   _handle_stateless_request_handle_stateful_request)r(   r7   r8   r9   r)   r)   r*   handle_request   s   
z+StreamableHTTPSessionManager.handle_requestc                   s~   t d tdjdjd tjdd	 fdd}jdus#J j|I dH   	|||I dH   
 I dH  dS )
z
        Process request in stateless mode - creating a new transport for each request.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        z7Stateless mode: Creating new transport for this requestN)mcp_session_idis_json_response_enabledr   r   task_statusrA   TaskStatus[None]c              	     s      4 I d H @}|\}}|   zjj||j ddI d H  W n ty2   td Y nw W d   I d H  d S W d   I d H  d S 1 I d H sOw   Y  d S )NTr   zStateless session crashed)connectstartedr   r6   create_initialization_options	Exceptionr0   	exception)rA   streamsread_streamwrite_streamhttp_transportr(   r)   r*   run_stateless_server   s$   .zTStreamableHTTPSessionManager._handle_stateless_request.<locals>.run_stateless_server)rA   rB   )r0   debugr   r   r   r!   TASK_STATUS_IGNOREDr%   startr=   	terminate)r(   r7   r8   r9   rN   r)   rL   r*   r;      s   
z6StreamableHTTPSessionManager._handle_stateless_requestc              	     s  t ||}|jt}|dur+|jv r+j| }td ||||I dH  dS |du rtd j4 I dH X t	 j
}t|jjjjd  jdusTJ  j j< td|  tjdd fdd}jdusuJ j|I dH   |||I dH  W d  I dH  dS 1 I dH sw   Y  dS tddttddd}	t|	jdddtjdd}
|
|||I dH  dS )z
        Process request in stateful mode - maintaining session state between requests.

        Args:
            scope: ASGI scope
            receive: ASGI receive function
            send: ASGI send function
        Nz1Session already exists, handling request directlyzCreating new transport)r>   r?   r   r   r   z'Created new transport with session ID: r@   rA   rB   r,   r:   c                   s.     4 I d H }|\}}|   zPzjj||j ddI d H  W n  tyE } ztjd j d| dd W Y d }~nd }~ww W  jrb jj	v rb j
sbtd j d j	 j= n jr~ jj	v r~ j
s~td j d j	 j= w W d   I d H  d S 1 I d H sw   Y  d S )	NFrC   zSession z
 crashed: T)exc_infozCleaning up crashed session z from active instances.)rD   rE   r   r6   rF   rG   r0   errorr>   r$   is_terminatedr1   )rA   rI   rJ   rK   erL   r)   r*   
run_server   sV   
.zIStreamableHTTPSessionManager._handle_stateful_request.<locals>.run_serverz2.0zserver-errorzSession not found)codemessage)jsonrpcidrT   T)by_aliasexclude_nonezapplication/json)contentstatus_code
media_type)rA   rB   r,   r:   )r   headersgetr   r$   r0   rO   r=   r#   r   hexr   r   r   r   r   r>   r1   r!   rP   r%   rQ   r   r   r   r	   model_dump_jsonr   	NOT_FOUND)r(   r7   r8   r9   requestrequest_mcp_session_id	transportnew_session_idrW   error_responseresponser)   rL   r*   r<      sR   



.8z5StreamableHTTPSessionManager._handle_stateful_request)NFFNN)r   r   r   r   r   r   r   r   r   r   r   r    )r,   r-   )r7   r   r8   r
   r9   r   r,   r:   )__name__
__module____qualname____doc__r+   
contextlibasynccontextmanagerr6   r=   r;   r<   r)   r)   r)   r*   r      s     
(
1r   ))ro   
__future__r   rp   loggingcollections.abcr   httpr   typingr   uuidr   r!   	anyio.abcr   starlette.requestsr   starlette.responsesr	   starlette.typesr
   r   r   mcp.server.lowlevel.serverr   	MCPServermcp.server.streamable_httpr   r   r   mcp.server.transport_securityr   	mcp.typesr   r   r   	getLoggerrl   r0   r   r)   r)   r)   r*   <module>   s&    
