o
    c۷i                     @   s   d Z ddlmZmZ ddlmZmZ ddlmZmZ ddl	m
Z
mZ ddlZddlmZ ddlmZmZmZ eG d	d
 d
ZG dd deZG dd deZdS )a)  
TaskMessageQueue - FIFO queue for task-related messages.

This implements the core message queue pattern from the MCP Tasks spec.
When a handler needs to send a request (like elicitation) during a task-augmented
request, the message is enqueued instead of sent directly. Messages are delivered
to the client only through the `tasks/result` endpoint.

This pattern enables:
1. Decoupling request handling from message delivery
2. Proper bidirectional communication via the tasks/result stream
3. Automatic status management (working <-> input_required)
    )ABCabstractmethod)	dataclassfield)datetimetimezone)AnyLiteralN)Resolver)JSONRPCNotificationJSONRPCRequest	RequestIdc                   @   st   e Zd ZU dZed ed< 	 eeB ed< 	 edd dZ	e
ed< 	 d	Zeeeef  d	B ed
< 	 d	Zed	B ed< d	S )QueuedMessagez
    A message queued for delivery via tasks/result.

    Messages are stored with their type and a resolver for requests
    that expect responses.
    )requestnotificationtypemessagec                   C   s   t tjS N)r   nowr   utc r   r   a/home/ubuntu/vllm_env/lib/python3.10/site-packages/mcp/shared/experimental/tasks/message_queue.py<lambda>)   s    zQueuedMessage.<lambda>)default_factory	timestampNresolveroriginal_request_id)__name__
__module____qualname____doc__r	   __annotations__r   r   r   r   r   r   r
   dictstrr   r   r   r   r   r   r   r      s   
 r   c                   @   s   e Zd ZdZedededdfddZedededB fdd	ZedededB fd
dZ	edede
fddZededee fddZededdfddZededdfddZdS )TaskMessageQueuea"  
    Abstract interface for task message queuing.

    This is a FIFO queue that stores messages to be delivered via `tasks/result`.
    When a task-augmented handler calls elicit() or sends a notification, the
    message is enqueued here instead of being sent directly to the client.

    The `tasks/result` handler then dequeues and sends these messages through
    the transport, with `relatedRequestId` set to the tasks/result request ID
    so responses are routed correctly.

    Implementations can use in-memory storage, Redis, etc.
    task_idr   returnNc                       dS )z
        Add a message to the queue for a task.

        Args:
            task_id: The task identifier
            message: The message to enqueue
        Nr   )selfr%   r   r   r   r   enqueueB       zTaskMessageQueue.enqueuec                    r'   )z
        Remove and return the next message from the queue.

        Args:
            task_id: The task identifier

        Returns:
            The next message, or None if queue is empty
        Nr   r(   r%   r   r   r   dequeueL   r*   zTaskMessageQueue.dequeuec                    r'   )z
        Return the next message without removing it.

        Args:
            task_id: The task identifier

        Returns:
            The next message, or None if queue is empty
        Nr   r+   r   r   r   peekX   r*   zTaskMessageQueue.peekc                    r'   )z
        Check if the queue is empty for a task.

        Args:
            task_id: The task identifier

        Returns:
            True if no messages are queued
        Nr   r+   r   r   r   is_emptyd   r*   zTaskMessageQueue.is_emptyc                    r'   )a  
        Remove and return all messages from the queue.

        This is useful for cleanup when a task is cancelled or completed.

        Args:
            task_id: The task identifier

        Returns:
            All queued messages (may be empty)
        Nr   r+   r   r   r   clearp   r*   zTaskMessageQueue.clearc                    r'   )z
        Wait until a message is available in the queue.

        This blocks until either:
        1. A message is enqueued for this task
        2. The wait is cancelled

        Args:
            task_id: The task identifier
        Nr   r+   r   r   r   wait_for_message~   r*   z!TaskMessageQueue.wait_for_messagec                    r'   )z
        Signal that a message is available for a task.

        This wakes up any coroutines waiting in wait_for_message().

        Args:
            task_id: The task identifier
        Nr   r+   r   r   r   notify_message_available   r*   z)TaskMessageQueue.notify_message_available)r   r   r   r    r   r#   r   r)   r,   r-   boolr.   listr/   r0   r1   r   r   r   r   r$   3   s     	r$   c                   @   s   e Zd ZdZdddZdedee fddZded	eddfd
dZ	dededB fddZ
dededB fddZdedefddZdedee fddZdeddfddZdeddfddZddedB ddfddZdS )InMemoryTaskMessageQueueaA  
    In-memory implementation of TaskMessageQueue.

    This is suitable for single-process servers. For distributed systems,
    implement TaskMessageQueue with Redis, RabbitMQ, etc.

    Features:
    - FIFO ordering per task
    - Async wait for message availability
    - Thread-safe for single-process async use
    r&   Nc                 C   s   i | _ i | _d S r   )_queues_events)r(   r   r   r   __init__   s   
z!InMemoryTaskMessageQueue.__init__r%   c                 C   s   || j vr
g | j |< | j | S )z#Get or create the queue for a task.)r5   r+   r   r   r   
_get_queue   s   


z#InMemoryTaskMessageQueue._get_queuer   c                    s*   |  |}|| | |I dH  dS )zAdd a message to the queue.N)r8   appendr1   )r(   r%   r   queuer   r   r   r)      s   

z InMemoryTaskMessageQueue.enqueuec                    s   |  |}|s
dS |dS )z#Remove and return the next message.Nr   )r8   popr(   r%   r:   r   r   r   r,      s
   

z InMemoryTaskMessageQueue.dequeuec                    s   |  |}|s
dS |d S )z,Return the next message without removing it.Nr   )r8   r<   r   r   r   r-      s
   
zInMemoryTaskMessageQueue.peekc                    s   |  |}t|dkS )zCheck if the queue is empty.r   )r8   lenr<   r   r   r   r.      s   
z!InMemoryTaskMessageQueue.is_emptyc                    s    |  |}t|}|  |S )zRemove and return all messages.)r8   r3   r/   )r(   r%   r:   messagesr   r   r   r/      s
   
zInMemoryTaskMessageQueue.clearc                    sT   |  |I dH sdS t | j|< | j| }|  |I dH s!dS | I dH  dS )z"Wait until a message is available.N)r.   anyioEventr6   wait)r(   r%   eventr   r   r   r0      s   
z)InMemoryTaskMessageQueue.wait_for_messagec                    s"   || j v r| j |   dS dS )z#Signal that a message is available.N)r6   setr+   r   r   r   r1      s   
z1InMemoryTaskMessageQueue.notify_message_availablec                 C   s@   |dur| j |d | j|d dS | j   | j  dS )z
        Clean up queues and events.

        Args:
            task_id: If provided, clean up only this task. Otherwise clean up all.
        N)r5   r;   r6   r/   r+   r   r   r   cleanup   s
   
z InMemoryTaskMessageQueue.cleanup)r&   Nr   )r   r   r   r    r7   r#   r3   r   r8   r)   r,   r-   r2   r.   r/   r0   r1   rD   r   r   r   r   r4      s    
r4   )r    abcr   r   dataclassesr   r   r   r   typingr   r	   r?   &mcp.shared.experimental.tasks.resolverr
   	mcp.typesr   r   r   r   r$   r4   r   r   r   r   <module>   s    d