o
    i                     @   sl   d Z ddlZddlmZmZmZ ddlmZmZm	Z	m
Z
 ddlmZmZ ddlmZmZ G dd deZdS )	zFConsumer processor for consuming frames from ProducerProcessor queues.    N)	AwaitableCallableOptional)CancelFrameEndFrameFrame
StartFrame)FrameDirectionFrameProcessor)ProducerProcessoridentity_transformerc                       s   e Zd ZdZeejddedee	ge
e	 f def fddZde	def fd	d
ZdefddZdefddZdefddZdd Z  ZS )ConsumerProcessora=  Frame processor that consumes frames from a ProducerProcessor's queue.

    This processor passes through frames normally while also consuming frames
    from a ProducerProcessor's queue. When frames are received from the producer
    queue, they are optionally transformed and pushed in the specified direction.
    )transformer	directionproducerr   r   c                   s.   t  jdi | || _|| _|| _d| _dS )a|  Initialize the consumer processor.

        Args:
            producer: The producer processor to consume frames from.
            transformer: Function to transform frames before pushing. Defaults to identity_transformer.
            direction: Direction to push consumed frames. Defaults to DOWNSTREAM.
            **kwargs: Additional arguments passed to parent class.
        N )super__init___transformer
_direction	_producer_consumer_task)selfr   r   r   kwargs	__class__r   Y/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/processors/consumer_processor.pyr      s
   
zConsumerProcessor.__init__framec                    s~   t  ||I dH  t|tr| |I dH  nt|tr'| |I dH  nt|tr4| |I dH  | 	||I dH  dS )zProcess incoming frames and handle lifecycle events.

        Args:
            frame: The frame to process.
            direction: The direction the frame is traveling.
        N)
r   process_frame
isinstancer   _startr   _stopr   _cancel
push_frame)r   r   r   r   r   r   r   /   s   


zConsumerProcessor.process_frame_c                    s,   | j s| j | _| |  | _ dS dS )z7Start the consumer task and register with the producer.N)r   r   add_consumer_queuecreate_task_consumer_task_handlerr   r$   r   r   r   r    A   s
   zConsumerProcessor._startc                    "   | j r| | j I dH  dS dS )zStop the consumer task.Nr   cancel_taskr)   r   r   r   r!   G      zConsumerProcessor._stopc                    r*   )zCancel the consumer task.Nr+   r)   r   r   r   r"   L   r-   zConsumerProcessor._cancelc                    s:   	 | j  I dH }| |I dH }| || jI dH  q)z0Handle consuming frames from the producer queue.TN)r&   getr   queue_framer   )r   r   	new_framer   r   r   r(   Q   s   z(ConsumerProcessor._consumer_task_handler)__name__
__module____qualname____doc__r   r	   
DOWNSTREAMr   r   r   r   r   r   r   r    r   r!   r   r"   r(   __classcell__r   r   r   r   r      s     r   )r4   asynciotypingr   r   r   pipecat.frames.framesr   r   r   r   "pipecat.processors.frame_processorr	   r
   %pipecat.processors.producer_processorr   r   r   r   r   r   r   <module>   s   