o
    c۷in                     @   sb  d Z ddlZddlZddlmZmZmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZmZ ddlmZ ddlZddlZdd	lmZ dd
lmZmZ ddlmZmZmZ ddlmZ ddlmZm Z  ddl!m"Z"m#Z# ddl$m%Z%m&Z&m'Z'm(Z(m)Z)m*Z*m+Z+m,Z, e-e.Z/e#e0B Z1ee1 Z2ee# Z3eg e4dB f Z5dZ6dZ7dZ8dZ9dZ:dZ;dZ<dZ=dZ>e? Z@G dd de0ZAG dd deAZBe	G dd dZCG dd  d ZDedd!d"d#e4d$ejEdB d%eFd&eeGee#e0B  ee# e5f df fd'd(ZHeed)dd*d+d!e dfd#e4d,eIe4e4f dB d-eJeB d.eJeB d%eFd/ed0ejKdB d&eeGee#e0B  ee# e5f df fd1d2ZLdS )3z
StreamableHTTP Client Transport Module

This module implements the StreamableHTTP transport for MCP clients,
providing support for HTTP POST requests with optional SSE streaming responses
and session management.
    N)AsyncGenerator	AwaitableCallable)asynccontextmanager)	dataclass)	timedelta)Anyoverload)warn)	TaskGroup)MemoryObjectReceiveStreamMemoryObjectSendStream)EventSourceServerSentEventaconnect_sse)
deprecated)McpHttpClientFactorycreate_mcp_http_client)ClientMessageMetadataSessionMessage)	ErrorDataInitializeResultJSONRPCErrorJSONRPCMessageJSONRPCNotificationJSONRPCRequestJSONRPCResponse	RequestIdzmcp-session-idzmcp-protocol-versionzlast-event-idi     zcontent-typeacceptzapplication/jsonztext/event-streamc                   @      e Zd ZdZdS )StreamableHTTPErrorz3Base exception for StreamableHTTP transport errors.N__name__
__module____qualname____doc__ r'   r'   P/home/ubuntu/vllm_env/lib/python3.10/site-packages/mcp/client/streamable_http.pyr!   C       r!   c                   @   r    )ResumptionErrorz*Raised when resumption request is invalid.Nr"   r'   r'   r'   r(   r*   G   r)   r*   c                   @   sl   e Zd ZU dZejed< edB ed< eed< e	dB ed< e
ed< dZeeef dB ed< dZedB ed	< dS )
RequestContextz Context for a request operation.clientN
session_idsession_messagemetadataread_stream_writerheaderssse_read_timeout)r#   r$   r%   r&   httpxAsyncClient__annotations__strr   r   StreamWriterr1   dictr2   floatr'   r'   r'   r(   r+   K   s   
 
r+   c                   @   sn  e Zd ZdZededdfddZeed					dFded
eeef dB de	e
B de	e
B dejdB ddfddZeeeefded
ededededdfddZdeeef fddZdedefddZdedefddZdejddfddZdeddfddZ			dGdedededB d eeged f dB d!edefd"d#Zd$ejdeddfd%d&Zd'eddfd(d)Z d'eddfd*d+Z!	dHdejded!eddfd,d-Z"	dHdejd'ed!eddfd.d/Z#		0dId'ed1ed2e$dB d3e$ddf
d4d5Z%d6ededdfd7d8Z&ded9eddfd:d;Z'd$ejd<e(ded=e)e* d>eg df d?e+ddfd@dAZ,d$ejddfdBdCZ-dedB fdDdEZ.dS )JStreamableHTTPTransportz/StreamableHTTP client transport implementation.urlreturnNc                 C      d S Nr'   )selfr;   r'   r'   r(   __init__[   s   z StreamableHTTPTransport.__init__zyParameters headers, timeout, sse_read_timeout, and auth are deprecated. Configure these on the httpx.AsyncClient instead.   ,  r1   timeoutr2   authc                 C   r=   r>   r'   )r?   r;   r1   rC   r2   rD   r'   r'   r(   r@   ^   s   c                 C   s   g }|t ur|d |t ur|d |t ur|d |t ur&|d |r6tdd| dtdd	 || _d
| _d
| _d
S )aS  Initialize the StreamableHTTP transport.

        Args:
            url: The endpoint URL.
            headers: Optional headers to include in requests.
            timeout: HTTP timeout for regular operations.
            sse_read_timeout: Timeout for SSE read operations.
            auth: Optional HTTPX authentication handler.
        r1   rC   r2   rD   zParameters , zV are deprecated and will be ignored. Configure these on the httpx.AsyncClient instead.r   )
stacklevelN)_UNSETappendr
   joinDeprecationWarningr;   r-   protocol_version)r?   r;   r1   rC   r2   rD   deprecated_paramsr'   r'   r(   r@   l   s$   




c                 C   sB   i }t  dt |t< t |t< | jr| j|t< | jr| j|t< |S )zBuild MCP-specific request headers.

        These headers will be merged with the httpx.AsyncClient's default headers,
        with these MCP-specific headers taking precedence.
        rE   )JSONSSEACCEPTCONTENT_TYPEr-   MCP_SESSION_IDrK   MCP_PROTOCOL_VERSION)r?   r1   r'   r'   r(   _prepare_headers   s   

z(StreamableHTTPTransport._prepare_headersmessagec                 C      t |jto|jjdkS )z2Check if the message is an initialization request.
initialize)
isinstancerootr   methodr?   rT   r'   r'   r(   _is_initialization_request      z2StreamableHTTPTransport._is_initialization_requestc                 C   rU   )z4Check if the message is an initialized notification.znotifications/initialized)rW   rX   r   rY   rZ   r'   r'   r(   _is_initialized_notification   r\   z4StreamableHTTPTransport._is_initialized_notificationresponsec                 C   s0   |j t}|r|| _td| j  dS dS )z3Extract and store session ID from response headers.zReceived session ID: N)r1   getrQ   r-   loggerinfo)r?   r^   new_session_idr'   r'   r(   '_maybe_extract_session_id_from_response   s
   z?StreamableHTTPTransport._maybe_extract_session_id_from_responsec              
   C   s   t |jtrI|jjrKzt|jj}t|j| _t	
d| j  W dS  tyH } zt	d|  t	d|jj  W Y d}~dS d}~ww dS dS )z>Extract protocol version from initialization response message.zNegotiated protocol version: z=Failed to parse initialization response as InitializeResult: zRaw result: N)rW   rX   r   resultr   model_validater6   protocolVersionrK   r`   ra   	Exceptionwarning)r?   rT   init_resultexcr'   r'   r(   ,_maybe_extract_protocol_version_from_message   s   "zDStreamableHTTPTransport._maybe_extract_protocol_version_from_messageFsser0   original_request_idresumption_callbackis_initializationc           	   
      s  |j dkr|js|jr|r||jI dH  dS zGt|j}td|  |r.| | |dur>t|j	t
tB r>||j	_t|}||I dH  |jrW|rW||jI dH  t|j	t
tB W S  ty } ztd ||I dH  W Y d}~dS d}~ww td|j   dS )z@Handle an SSE event, returning True if the response is complete.rT   NFzSSE message: zError parsing SSE messagezUnknown SSE event: )eventdataidr   model_validate_jsonr`   debugrk   rW   rX   r   r   r   sendrg   	exceptionrh   )	r?   rl   r0   rm   rn   ro   rT   r.   rj   r'   r'   r(   _handle_sse_event   s4   
	



z)StreamableHTTPTransport._handle_sse_eventr,   c              
      s  d}d}d}|t k rzd| jsW dS |  }|r||t< t|d| j|d4 I dH :}|j  t	d |
 2 z3 dH W }|jrD|j}|jdurL|j}| ||I dH  q86 d}W d  I dH  n1 I dH siw   Y  W n ty }	 zt	d|	  |d7 }W Y d}	~	nd}	~	ww |t krt	dt  d	 dS |dur|nt}
td
|
 d t|
d I dH  |t k sdS dS )zDHandle GET stream for server-initiated messages with auto-reconnect.Nr   GETr1   zGET SSE connection establishedzGET stream error:    z&GET stream max reconnection attempts (
) exceededz)GET stream disconnected, reconnecting in zms...     @@)MAX_RECONNECTION_ATTEMPTSr-   rS   LAST_EVENT_IDr   r;   r^   raise_for_statusr`   rt   	aiter_sserr   retryrw   rg   DEFAULT_RECONNECTION_DELAY_MSra   anyiosleep)r?   r,   r0   last_event_idretry_interval_msattemptr1   event_sourcerl   rj   delay_msr'   r'   r(   handle_get_stream   sT   


(z)StreamableHTTPTransport.handle_get_streamctxc              	      s  |   }|jr|jjr|jj|t< ntdd}t|jjjt	r'|jjjj
}t|jd| j|d4 I dH A}|j  td | 2 z%3 dH W }| ||j||jrW|jjndI dH }|ri|j I dH   nqD6 W d  I dH  dS 1 I dH s|w   Y  dS )z/Handle a resumption request using GET with SSE.z.Resumption request requires a resumption tokenNrx   ry   z)Resumption GET SSE connection established)rS   r/   resumption_tokenr~   r*   rW   r.   rT   rX   r   rr   r   r,   r;   r^   r   r`   rt   r   rw   r0   on_resumption_token_updateaclose)r?   r   r1   rm   r   rl   is_completer'   r'   r(   _handle_resumption_request-  s<   


.z2StreamableHTTPTransport._handle_resumption_requestc              	      s  |   }|jj}| |}|jjd| j|jdddd|d4 I dH }|jdkr:t	
d 	 W d  I dH  dS |jd	kr]t|jtrQ| |j|jjI dH  	 W d  I dH  dS |  |rh| | t|jtr|jtd
 }|tr| ||j|I dH  n0|tr| |||I dH  n+| ||jI dH  W d  I dH  dS W d  I dH  dS W d  I dH  dS W d  I dH  dS 1 I dH sw   Y  dS )z/Handle a POST request with response processing.POSTTjson)by_aliasmodeexclude_none)r   r1   N   zReceived 202 Acceptedi   )rS   r.   rT   r[   r,   streamr;   
model_dumpstatus_coder`   rt   rW   rX   r   _send_session_terminated_errorr0   rr   r   rc   r1   r_   rP   lower
startswithrM   _handle_json_responserN   _handle_sse_response_handle_unexpected_content_type)r?   r   r1   rT   ro   r^   content_typer'   r'   r(   _handle_post_requestN  sV   








.z,StreamableHTTPTransport._handle_post_requestc              
      s   z"|  I dH }t|}|r| | t|}||I dH  W dS  tyC } ztd ||I dH  W Y d}~dS d}~ww )z%Handle JSON response from the server.NzError parsing JSON response)	areadr   rs   rk   r   ru   rg   r`   rv   )r?   r^   r0   ro   contentrT   r.   rj   r'   r'   r(   r   x  s   


z-StreamableHTTPTransport._handle_json_responsec           
   
      s   d}d}zAt |}| 2 z53 dH W }|jr|j}|jdur"|j}| j||j|jr.|jjnd|dI dH }|rC| I dH   W dS q6 W n t	y` }	 zt
d|	  W Y d}	~	nd}	~	ww |durvt
d | |||I dH  dS dS )z$Handle SSE response from the server.N)rn   ro   zSSE stream ended: (SSE stream disconnected, reconnecting...)r   r   rr   r   rw   r0   r/   r   r   rg   r`   rt   ra   _handle_reconnection)
r?   r^   r   ro   r   r   r   rl   r   er'   r'   r(   r     s:   

z,StreamableHTTPTransport._handle_sse_responser   r   r   r   c              
      s  |t krtdt  d dS |dur|nt}t|d I dH  |  }||t< d}t|j	j
jtr:|j	j
jj}zt|jd| j|d4 I dH o}|j  td |}	|}
| 2 z>3 dH W }|jrh|j}	|jdurp|j}
| ||j||jr}|jjndI dH }|r|j I dH   W d  I dH  W dS q\6 td | ||	|
d	I dH  W d  I dH  W dS 1 I dH sw   Y  W dS  ty } ztd
|  | ||||d I dH  W Y d}~dS d}~ww )zFReconnect with Last-Event-ID to resume stream after server disconnect.zMax reconnection attempts (r{   Nr|   rx   ry   zReconnected to SSE streamr   r   zReconnection failed: rz   )r}   r`   rt   r   r   r   rS   r~   rW   r.   rT   rX   r   rr   r   r,   r;   r^   r   ra   r   r   rw   r0   r/   r   r   r   rg   )r?   r   r   r   r   r   r1   rm   r   reconnect_last_event_idreconnect_retry_msrl   r   r   r'   r'   r(   r     s`   	




2 (z,StreamableHTTPTransport._handle_reconnectionr   c                    s.   d| }t | |t|I dH  dS )z+Handle unexpected content type in response.zUnexpected content type: N)r`   errorru   
ValueError)r?   r   r0   	error_msgr'   r'   r(   r     s   

z7StreamableHTTPTransport._handle_unexpected_content_type
request_idc                    s8   t d|tdddd}tt|}||I dH  dS )z)Send a session terminated error response.z2.0iX  zSession terminated)coderT   )jsonrpcrr   r   N)r   r   r   r   ru   )r?   r0   r   jsonrpc_errorr.   r'   r'   r(   r     s   
z6StreamableHTTPTransport._send_session_terminated_errorwrite_stream_readerwrite_streamstart_get_streamtgc              	      sf  zzq|4 I dH ^ |2 zO3 dH W }|j }t|jtr|jnd}	t|	o%|	jtd|  |r7|  t	|j
||	|d  fdd}
t|jtrU||
 q|
 I dH  q6 W d  I dH  n1 I dH smw   Y  W n ty   td Y nw W | I dH  | I dH  dS W | I dH  | I dH  dS | I dH  | I dH  w )z&Handle writing requests to the server.NzSending client message: )r,   r-   r.   r/   r0   c                      s.   r  I d H  d S  I d H  d S r>   )r   r   r'   r   is_resumptionr?   r'   r(   handle_request_async1  s   zAStreamableHTTPTransport.post_writer.<locals>.handle_request_asynczError in post_writer)rT   rW   r/   r   boolr   r`   rt   r]   r+   r-   rX   r   
start_soonrg   rv   r   )r?   r,   r   r0   r   r   r   r.   rT   r/   r   r'   r   r(   post_writer  sL   


(&z#StreamableHTTPTransport.post_writerc              
      s   | j sdS z0|  }|j| j|dI dH }|jdkr#td W dS |jdvr4td|j  W dS W dS  tyQ } ztd|  W Y d}~dS d}~ww )z2Terminate the session by sending a DELETE request.Nry   i  z)Server does not allow session termination)      zSession termination failed: )	r-   rS   deleter;   r   r`   rt   rh   rg   )r?   r,   r1   r^   rj   r'   r'   r(   terminate_sessionC  s   

z)StreamableHTTPTransport.terminate_sessionc                 C   s   | j S )zGet the current session ID.)r-   )r?   r'   r'   r(   get_session_idS  s   z&StreamableHTTPTransport.get_session_id)NrA   rB   N)NNF)F)Nr   )/r#   r$   r%   r&   r	   r6   r@   r   r8   r9   r   r3   AuthrG   r   rS   r   r   r[   r]   Responserc   rk   r   r7   r   r   r   rw   r4   r   r+   r   r   r   r   intr   r   r   StreamReaderr   r   r   r   r   r   r'   r'   r'   r(   r:   X   s    
(



/
6!.

,
?




7r:   Thttp_clientterminate_on_closer;   r   r   r<   c          	        s  t jttB  d\}t jt d\}}|du}|  du r"t  t| t  4 I dH ztd|   t	
 4 I dH Z}|sK| I dH  d fdd}j ||| z||jfV  W jrw|rw I dH  j  njr|r I dH  j  w W d  I dH  n1 I dH sw   Y  W  I dH  | I dH  n I dH  | I dH  w W d  I dH  dS 1 I dH sw   Y  dS )a8  
    Client transport for StreamableHTTP.

    Args:
        url: The MCP server endpoint URL.
        http_client: Optional pre-configured httpx.AsyncClient. If None, a default
            client with recommended MCP timeouts will be created. To configure headers,
            authentication, or other HTTP settings, create an httpx.AsyncClient and pass it here.
        terminate_on_close: If True, send a DELETE request to terminate the session
            when the context exits.

    Yields:
        Tuple containing:
            - read_stream: Stream for reading messages from the server
            - write_stream: Stream for sending messages to the server
            - get_session_id_callback: Function to retrieve the current session ID

    Example:
        See examples/snippets/clients/ for usage patterns.
    r   Nz'Connecting to StreamableHTTP endpoint: r<   c                      s    j  d S r>   )r   r   r'   r,   r0   r   	transportr'   r(   r     s   z0streamable_http_client.<locals>.start_get_stream)r<   N)r   create_memory_object_streamr   rg   r   r:   create_task_groupr`   rt   
contextlibAsyncExitStackenter_async_contextr   r   r   r-   r   cancel_scopecancelr   )	r;   r   r   read_streamr   r   client_providedstackr   r'   r   r(   streamable_http_clientX  sT   "


(.r   z%Use `streamable_http_client` instead.rA   rB   r1   rC   r2   httpx_client_factoryrD   c              
   C  s   t |tr
| n|}t |tr| n|}||tj||d|d}	|	4 I d H 0 t| |	|d4 I d H }
|
V  W d   I d H  n1 I d H sIw   Y  W d   I d H  d S 1 I d H s_w   Y  d S )N)read)r1   rC   rD   r   )rW   r   total_secondsr3   Timeoutr   )r;   r1   rC   r2   r   r   rD   timeout_secondssse_read_timeout_secondsr,   streamsr'   r'   r(   streamablehttp_client  s&   *.r   )Mr&   r   loggingcollections.abcr   r   r   r   dataclassesr   datetimer   typingr   r	   warningsr
   r   r3   	anyio.abcr   anyio.streams.memoryr   r   	httpx_sser   r   r   typing_extensionsr   mcp.shared._httpx_utilsr   r   mcp.shared.messager   r   	mcp.typesr   r   r   r   r   r   r   r   	getLoggerr#   r`   rg   SessionMessageOrErrorr7   r   r6   GetSessionIdCallbackrQ   rR   r~   r   r}   rP   rO   rM   rN   objectrG   r!   r*   r+   r:   r4   r   tupler   r8   r9   r   r   r'   r'   r'   r(   <module>   s    (
    
S
