o
    i                     @   sl   d Z ddlZddlZddlZddlmZ ddlmZ ddlm	Z	 ddl
mZ ddlmZ G dd	 d	eZdS )
zPipeline runner for managing pipeline task execution.

This module provides the PipelineRunner class that handles the execution
of pipeline tasks with signal handling, garbage collection, and lifecycle
management.
    N)Optional)logger)PipelineTaskParams)PipelineTask)
BaseObjectc                       s   e Zd ZdZdddddddee deded	ed
eej f
 fddZ	de
fddZdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Z  ZS ) PipelineRunnera  Manages the execution of pipeline tasks with lifecycle and signal handling.

    Provides a high-level interface for running pipeline tasks with automatic
    signal handling (SIGINT/SIGTERM), optional garbage collection, and proper
    cleanup of resources.
    NTF)namehandle_siginthandle_sigtermforce_gcloopr   r	   r
   r   r   c                   sN   t  j|d i | _d| _|| _|pt | _|r|   |r%| 	  dS dS )a  Initialize the pipeline runner.

        Args:
            name: Optional name for the runner instance.
            handle_sigint: Whether to automatically handle SIGINT signals.
            handle_sigterm: Whether to automatically handle SIGTERM signals.
            force_gc: Whether to force garbage collection after task completion.
            loop: Event loop to use. If None, uses the current running loop.
        )r   N)
super__init___tasks	_sig_task	_force_gcasyncioget_running_loop_loop_setup_sigint_setup_sigterm)selfr   r	   r
   r   r   	__class__ K/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/pipeline/runner.pyr   "   s   zPipelineRunner.__init__taskc                    s   t d|  d|  || j|j< zt| jd}||I dH  W n
 tjy,   Y nw | j|j= | 	 I dH  | j
rB| j
I dH  | jrI|   t d|  d|  dS )zjRun a pipeline task to completion.

        Args:
            task: The pipeline task to execute.
        Runner z started running )r   Nz finished running )r   debugr   r   r   r   runr   CancelledErrorcleanupr   r   _gc_collect)r   r   paramsr   r   r   r   A   s    
zPipelineRunner.runc                    s8   t d|  d tjdd | j D  I dH  dS )zMSchedule all running tasks to stop when their current processing is complete.r   z* scheduled to stop when all tasks are donec                 S      g | ]}|  qS r   )stop_when_done.0tr   r   r   
<listcomp>d       z1PipelineRunner.stop_when_done.<locals>.<listcomp>N)r   r   r   gatherr   valuesr   r   r   r   r%   a   s   $zPipelineRunner.stop_when_donec                    $   t d|   |  I dH  dS )%Cancel all running tasks immediately.zCancelling runner N)r   r   _cancelr-   r   r   r   cancelf      zPipelineRunner.cancelc                    s&   t jdd | j D  I dH  dS )r/   c                 S   r$   r   )r1   r&   r   r   r   r)   m   r*   z*PipelineRunner._cancel.<locals>.<listcomp>N)r   r+   r   r,   r-   r   r   r   r0   k   s   $zPipelineRunner._cancelc              	      P   zt  }|tj fdd W dS  ty'   ttj fdd Y dS w )-Set up signal handlers for graceful shutdown.c                           S N_sig_handlerargsr-   r   r   <lambda>s       z.PipelineRunner._setup_sigint.<locals>.<lambda>c                    r5   r6   r7   sfr-   r   r   r;   v   r<   N)r   r   add_signal_handlersignalSIGINTNotImplementedErrorr   r   r   r-   r   r   o      zPipelineRunner._setup_sigintc              	      r3   )r4   c                     r5   r6   r7   r9   r-   r   r   r;   |   r<   z/PipelineRunner._setup_sigterm.<locals>.<lambda>c                    r5   r6   r7   r=   r-   r   r   r;      r<   N)r   r   r@   rA   SIGTERMrC   rD   r   r-   r   r   x   rE   zPipelineRunner._setup_sigtermc                 C   s   | j st|  | _ dS dS )z1Handle interrupt signals by cancelling all tasks.N)r   r   create_task_sig_cancelr-   r   r   r   r8      s   zPipelineRunner._sig_handlerc                    r.   )z4Cancel all running tasks due to signal interruption.z)Interruption detected. Cancelling runner N)r   warningr1   r-   r   r   r   rH      r2   zPipelineRunner._sig_cancelc                 C   s0   t  }td| d tdt j  dS )z)Force garbage collection and log results.zGarbage collector: collected z	 objects.z)Garbage collector: uncollectable objects N)gccollectr   r   garbage)r   	collectedr   r   r   r"      s   zPipelineRunner._gc_collect)__name__
__module____qualname____doc__r   strboolr   AbstractEventLoopr   r   r   r%   r1   r0   r   r   r8   rH   r"   __classcell__r   r   r   r   r      s6    
 		r   )rQ   r   rJ   rA   typingr   logurur   pipecat.pipeline.base_taskr   pipecat.pipeline.taskr   pipecat.utils.base_objectr   r   r   r   r   r   <module>   s   