o
    iV                     @   s   d Z ddlZddlZddlmZmZmZmZ ddlm	Z	 ddl
mZmZmZ ddlmZ e	G dd dZG d	d
 d
ZG dd deZdS )zTask observer for managing pipeline frame observers.

This module provides a proxy observer system that manages multiple observers
for pipeline frame events, ensuring that observer processing doesn't block
the main pipeline execution.
    N)AnyDictListOptional)	dataclass)BaseObserverFrameProcessedFramePushed)BaseTaskManagerc                   @   s.   e Zd ZU dZejed< ejed< eed< dS )Proxya{  Proxy data for managing observer tasks and queues.

    This represents is the data received from the main observer that
    is queued for later processing.

    Parameters:
        queue: Queue for frame data awaiting observer processing.
        task: Asyncio task running the observer's frame processing loop.
        observer: The actual observer instance being proxied.
    queuetaskobserverN)	__name__
__module____qualname____doc__asyncioQueue__annotations__Taskr    r   r   R/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/pipeline/task_observer.pyr      s
   
 

r   c                   @   s   e Zd ZdZdS )_PipelineStartedSignalzDInternal sentinel queued to observers when the pipeline has started.N)r   r   r   r   r   r   r   r   r   *   s    r   c                       s   e Zd ZdZdddeee  def fddZdefd	d
Z	defddZ
dd Zdd Z fddZdd ZdefddZdefddZdedefddZdee deeef fddZdefdd Zd!ejdefd"d#Z  ZS )$TaskObservera  Proxy observer that manages multiple observers without blocking the pipeline.

    This is a pipeline frame observer that is meant to be used as a proxy to
    the user provided observers. That is, this is the observer that should be
    passed to the frame processors. Then, every time a frame is pushed this
    observer will call all the observers registered to the pipeline task.

    This observer makes sure that passing frames to observers doesn't block the
    pipeline by creating a queue and a task for each user observer. When a frame
    is received, it will be put in a queue for efficiency and later processed by
    each task.
    N)	observersr   task_managerc                   s,   t  jdi | |pg | _|| _d| _dS )a  Initialize the TaskObserver.

        Args:
            observers: List of observers to manage. Defaults to empty list.
            task_manager: Task manager for creating and managing observer tasks.
            **kwargs: Additional arguments passed to the base observer.
        Nr   )super__init__
_observers_task_manager_proxies)selfr   r   kwargs	__class__r   r   r   >   s
   
zTaskObserver.__init__r   c                 C   s.   | j | | jr| |}|| j|< dS dS )zjAdd a new observer to the managed list.

        Args:
            observer: The observer to add.
        N)r   appendr!   _create_proxyr"   r   proxyr   r   r   add_observerS   s
   
zTaskObserver.add_observerc                    sV   | j r|| j v r| j | }| j |= | j|jI dH  || jv r)| j| dS dS )ztRemove an observer and clean up its resources.

        Args:
            observer: The observer to remove.
        N)r!   r    cancel_taskr   r   remover(   r   r   r   remove_observerb   s   

zTaskObserver.remove_observerc                    s   |  | j| _dS )zStart all proxy observer tasks.N)_create_proxiesr   r!   r"   r   r   r   startt   s   zTaskObserver.startc                    s4   | j sdS | j  D ]}| j|jI dH  qdS )zStop all proxy observer tasks.N)r!   valuesr    r+   r   r"   r)   r   r   r   stopx   s   zTaskObserver.stopc                    s:   t   I dH  | jsdS | jD ]	}| I dH  qdS )zCleanup all proxy observers.N)r   cleanupr!   r2   r$   r   r   r4      s   
zTaskObserver.cleanupc                    s   |  t I dH  dS )z9Forward pipeline started signal to all managed observers.N)_send_to_proxyr   r/   r   r   r   on_pipeline_started   s   z TaskObserver.on_pipeline_starteddatac                       |  |I dH  dS zQueue frame data for all managed observers.

        Args:
            data: The frame push event data to distribute to observers.
        Nr5   r"   r7   r   r   r   on_process_frame      zTaskObserver.on_process_framec                    r8   r9   r:   r;   r   r   r   on_push_frame   r=   zTaskObserver.on_push_framereturnc                 C   s8   t  }| j| ||d| d}t|||d}|S )z%Create a proxy for a single observer.zTaskObserver::z::_proxy_task_handler)r   r   r   )r   r   r    create_task_proxy_task_handlerr   )r"   r   r   r   r)   r   r   r   r'      s   

zTaskObserver._create_proxyc                 C   s$   i }|D ]}|  |}|||< q|S )z!Create proxies for all observers.)r'   )r"   r   proxiesr   r)   r   r   r   r.      s
   

zTaskObserver._create_proxiesc                    s(   | j  D ]}|j|I d H  qd S )N)r!   r1   r   put)r"   r7   r)   r   r   r   r5      s   zTaskObserver._send_to_proxyr   c                    s   d}t |j}t|jdkr5ddl}|  |d |dt	 W d   n1 s.w   Y  d}	 |
 I dH }t|trJ| I dH  n/t|trl|rc||j|j|j|j|jI dH  n||I dH  nt|try||I dH  |  q6)z.Handle frame processing for a single observer.F   r   NalwayszObserver `on_push_frame(source, destination, frame, direction, timestamp)` is deprecated, us `on_push_frame(data: FramePushed)` instead.T)inspect	signaturer>   len
parameterswarningscatch_warningssimplefilterwarnDeprecationWarningget
isinstancer   r6   r	   sourcedestinationframe	direction	timestampr   r<   	task_done)r"   r   r   on_push_frame_deprecatedrG   rJ   r7   r   r   r   rA      s6   




z TaskObserver._proxy_task_handler)r   r   r   r   r   r   r   r
   r   r*   r-   r0   r3   r4   r6   r   r<   r	   r>   r   r'   r   r.   r   r5   r   r   rA   __classcell__r   r   r$   r   r   0   s(    


r   )r   r   rF   typingr   r   r   r   attrr   pipecat.observers.base_observerr   r   r	   "pipecat.utils.asyncio.task_managerr
   r   r   r   r   r   r   r   <module>   s   