o
    i2                     @   sX   d Z ddlZddlmZmZmZmZ ddlmZm	Z	 ddl
mZmZ G dd deZdS )z:Idle frame processor for timeout-based callback execution.    N)	AwaitableCallableListOptional)Frame
StartFrame)FrameDirectionFrameProcessorc                	       sx   e Zd ZdZddded ged f dedeee	  f fddZ
d	ed
ef fddZdd Zdd Zdd Z  ZS )IdleFrameProcessora  Monitors frame activity and triggers callbacks on timeout.

    This processor waits to receive any frame or specific frame types within a
    given timeout period. If the timeout is reached before receiving the expected
    frames, the provided callback will be executed.
    N)typescallbacktimeoutr   c                   s2   t  jdi | || _|| _|pg | _d| _dS )a  Initialize the idle frame processor.

        Args:
            callback: Async callback function to execute on timeout. Receives
                this processor instance as an argument.
            timeout: Timeout duration in seconds before triggering the callback.
            types: Optional list of frame types to monitor. If None, monitors
                all frames.
            **kwargs: Additional arguments passed to parent class.
        N )super__init__	_callback_timeout_types
_idle_task)selfr   r   r   kwargs	__class__r   [/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/processors/idle_frame_processor.pyr      s
   

zIdleFrameProcessor.__init__frame	directionc                    sr   t  ||I dH  t|tr|   | ||I dH  | js'| j  dS | jD ]}t||r6| j  q*dS )zProcess incoming frames and manage idle timeout monitoring.

        Args:
            frame: The frame to process.
            direction: The direction of frame flow in the pipeline.
        N)	r   process_frame
isinstancer   _create_idle_task
push_framer   _idle_eventset)r   r   r   tr   r   r   r   1   s   



z IdleFrameProcessor.process_framec                    s"   | j r| | j I dH  dS dS )z,Clean up resources and cancel pending tasks.N)r   cancel_taskr   r   r   r   cleanupH   s   zIdleFrameProcessor.cleanupc                 C   s(   | j st | _| |  | _ dS dS )z*Create and start the idle monitoring task.N)r   asyncioEventr    create_task_idle_task_handlerr$   r   r   r   r   M   s   
z$IdleFrameProcessor._create_idle_taskc                    sh   	 z*zt j| j | jdI dH  W n t jy%   | | I dH  Y nw W | j  n| j  w q)z6Handle idle timeout monitoring and callback execution.T)r   N)r&   wait_forr    waitr   TimeoutErrorr   clearr$   r   r   r   r)   S   s    z%IdleFrameProcessor._idle_task_handler)__name__
__module____qualname____doc__r   r   floatr   r   typer   r   r   r   r%   r   r)   __classcell__r   r   r   r   r
      s    
r
   )r1   r&   typingr   r   r   r   pipecat.frames.framesr   r   "pipecat.processors.frame_processorr   r	   r
   r   r   r   r   <module>   s   