o
    i{.                     @   s   d Z ddlZddlmZ ddlmZmZmZmZm	Z	 ddl
mZmZ ddlmZmZmZ ddlmZmZmZ ddlmZ dd	lmZ dd
lmZ eefZeG dd dZG dd deZG dd deZG dd deZG dd deZ dS )u  Observer for tracking pipeline startup timing.

This module provides an observer that measures how long each processor's
``start()`` method takes during pipeline startup. It works by tracking
when a ``StartFrame`` arrives at a processor (``on_process_frame``) versus
when it leaves (``on_push_frame``), giving the exact ``start()`` duration
for each processor in the pipeline.

It also measures transport timing — the time from ``StartFrame`` to the
first ``BotConnectedFrame`` (SFU transports only) and ``ClientConnectedFrame``
— via a separate ``on_transport_timing_report`` event.

Example::

    observer = StartupTimingObserver()

    @observer.event_handler("on_startup_timing_report")
    async def on_report(observer, report):
        for t in report.processor_timings:
            print(f"{t.processor_name}: {t.duration_secs:.3f}s")

    @observer.event_handler("on_transport_timing_report")
    async def on_transport(observer, report):
        if report.bot_connected_secs is not None:
            print(f"Bot connected in {report.bot_connected_secs:.3f}s")
        print(f"Client connected in {report.client_connected_secs:.3f}s")

    task = PipelineTask(pipeline, observers=[observer])
    N)	dataclass)DictListOptionalTupleType)	BaseModelField)BotConnectedFrameClientConnectedFrame
StartFrame)BaseObserverFrameProcessedFramePushed)BasePipeline)PipelineSource)FrameProcessorc                   @   s"   e Zd ZU dZeed< eed< dS )_ArrivalInfoz<Internal record of when a StartFrame arrived at a processor.	processorarrival_ts_nsN)__name__
__module____qualname____doc__r   __annotations__int r   r   ]/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/observers/startup_timing_observer.pyr   5   s   
 r   c                   @   s*   e Zd ZU dZeed< eed< eed< dS )ProcessorStartupTiminga.  Startup timing for a single processor.

    Parameters:
        processor_name: The name of the processor.
        start_offset_secs: Offset in seconds from the StartFrame to when this
            processor's start() began.
        duration_secs: How long the processor's start() took, in seconds.
    processor_namestart_offset_secsduration_secsN)r   r   r   r   strr   floatr   r   r   r   r   =   s
   
 	r   c                   @   s8   e Zd ZU dZeed< eed< eedZe	e
 ed< dS )StartupTimingReporta8  Report of startup timings for all measured processors.

    Parameters:
        start_time: Unix timestamp when the first processor began starting.
        total_duration_secs: Total wall-clock time from first to last processor start.
        processor_timings: Per-processor timing data, in pipeline order.
    
start_timetotal_duration_secs)default_factoryprocessor_timingsN)r   r   r   r   r#   r   r	   listr(   r   r   r   r   r   r   r$   L   s
   
 r$   c                   @   s:   e Zd ZU dZeed< dZee ed< dZee ed< dS )TransportTimingReportai  Time from pipeline start to transport connection milestones.

    Parameters:
        start_time: Unix timestamp of the StartFrame (pipeline start).
        bot_connected_secs: Seconds from StartFrame to first BotConnectedFrame
            (only set for SFU transports).
        client_connected_secs: Seconds from StartFrame to first ClientConnectedFrame.
    r%   Nbot_connected_secsclient_connected_secs)	r   r   r   r   r#   r   r+   r   r,   r   r   r   r   r*   Z   s
   
 	r*   c                       s   e Zd ZdZdddeeee df  f fddZded	e	fd
dZ
dd ZdefddZdefddZdefddZdefddZdd Z  ZS )StartupTimingObservera:  Observer that measures processor startup times during pipeline initialization.

    Tracks how long each processor's ``start()`` method takes by measuring the
    time between when a ``StartFrame`` arrives at a processor and when it is
    pushed downstream. This captures WebSocket connections, API authentication,
    model loading, and other initialization work.

    Also measures transport timing, the time from ``StartFrame`` to connection
    milestones:

    - ``bot_connected_secs``: When the bot joins the transport room
      (SFU transports only, triggered by ``BotConnectedFrame``).
    - ``client_connected_secs``: When a remote participant connects
      (triggered by ``ClientConnectedFrame``).

    By default, internal pipeline processors (``PipelineSource``, ``Pipeline``)
    are excluded from the report. Pass ``processor_types`` to measure only
    specific types.

    Event handlers available:

    - on_startup_timing_report: Called once after startup completes with the full
      timing report.
    - on_transport_timing_report: Called once when the first client connects with a
      TransportTimingReport containing client_connected_secs and bot_connected_secs
      (if available).

    Example::

        observer = StartupTimingObserver(
            processor_types=(STTService, TTSService)
        )

        @observer.event_handler("on_startup_timing_report")
        async def on_report(observer, report):
            for t in report.processor_timings:
                logger.info(f"{t.processor_name}: {t.duration_secs:.3f}s")

        @observer.event_handler("on_transport_timing_report")
        async def on_transport(observer, report):
            if report.bot_connected_secs is not None:
                logger.info(f"Bot connected in {report.bot_connected_secs:.3f}s")
            logger.info(f"Client connected in {report.client_connected_secs:.3f}s")

        task = PipelineTask(pipeline, observers=[observer])

    Args:
        processor_types: Optional tuple of processor types to measure. If None,
            all non-internal processors are measured.
    N)processor_typesr.   .c                   s`   t  jdi | || _i | _g | _d| _d| _d| _d| _d| _	d| _
| d | d dS )a  Initialize the startup timing observer.

        Args:
            processor_types: Optional tuple of processor types to measure.
                If None, all non-internal processors are measured.
            **kwargs: Additional arguments passed to parent class.
        NFon_startup_timing_reporton_transport_timing_reportr   )super__init___processor_types	_arrivals_timings_start_frame_id_startup_timing_reported_transport_timing_reported_start_frame_arrival_ns_bot_connected_secs_start_wall_clock_register_event_handler)selfr.   kwargs	__class__r   r   r2      s   
zStartupTimingObserver.__init__r   returnc                 C   s"   | j durt|| j S t|t S )zCheck if a processor should be tracked for timing.

        Args:
            processor: The processor to check.

        Returns:
            True if the processor matches the filter or no filter is set.
        N)r3   
isinstance_INTERNAL_TYPES)r=   r   r   r   r   _should_track   s   
	z#StartupTimingObserver._should_trackc                    s   | j r|  I dH  dS dS )zEmit the startup timing report when the pipeline has fully started.

        Called by the ``PipelineTask`` after the ``StartFrame`` has been
        processed by all processors, including nested ``ParallelPipeline``
        branches.
        N)r5   _emit_report)r=   r   r   r   on_pipeline_started   s   z)StartupTimingObserver.on_pipeline_starteddatac                    s   | j rdS t|jtsdS | jdu r"|jj| _|j| _t | _	n	|jj| jkr+dS | 
|jr@t|j|jd| j|jj< dS dS )z{Record when a StartFrame arrives at a processor.

        Args:
            data: The frame processing event data.
        N)r   r   )r7   rB   framer   r6   id	timestampr9   timer;   rD   r   r   r4   )r=   rG   r   r   r   on_process_frame   s    

z&StartupTimingObserver.on_process_framec                    s   t |jtr| | dS t |jtr| |I dH  dS | jr#dS t |jts+dS | jdur9|jj	| jkr9dS | j
|jj	d}|du rHdS |j|j }|d }|j| j d }| jt|jj||d dS )zRecord when a StartFrame leaves a processor and compute the delta.

        Also handles ``BotConnectedFrame`` and ``ClientConnectedFrame`` to
        measure transport timing.

        Args:
            data: The frame push event data.
        N    eA)r   r    r!   )rB   rH   r
   _handle_bot_connectedr   _handle_client_connectedr7   r   r6   rI   r4   popsourcerJ   r   r9   r5   appendr   r   name)r=   rG   arrivalduration_nsr!   r    r   r   r   on_push_frame   s4   	
z#StartupTimingObserver.on_push_framec                 C   s2   | j dus
| jdu rdS |j| j }|d | _ dS )z7Record bot connected timing on first BotConnectedFrame.NrM   )r:   r9   rJ   )r=   rG   delta_nsr   r   r   rN   #  s   z+StartupTimingObserver._handle_bot_connectedc                    s\   | j s	| jdu rdS d| _ |j| j }|d }t| jpd| j|d}| d|I dH  dS )z;Emit transport timing report on first ClientConnectedFrame.NTrM           )r%   r+   r,   r0   )r8   r9   rJ   r*   r;   r:   _call_event_handler)r=   rG   rW   r,   reportr   r   r   rO   +  s   z.StartupTimingObserver._handle_client_connectedc                    sR   | j rdS d| _ tdd | jD }t| jpd|| jd}| d|I dH  dS )z)Build and emit the startup timing report.NTc                 s   s    | ]}|j V  qd S )N)r!   ).0tr   r   r   	<genexpr>@  s    z5StartupTimingObserver._emit_report.<locals>.<genexpr>rX   )r%   r&   r(   r/   )r7   sumr5   r$   r;   rY   )r=   totalrZ   r   r   r   rE   :  s   z"StartupTimingObserver._emit_report)r   r   r   r   r   r   r   r   r2   boolrD   rF   r   rL   r   rV   rN   rO   rE   __classcell__r   r   r?   r   r-   i   s    6+
*r-   )!r   rK   dataclassesr   typingr   r   r   r   r   pydanticr   r	   pipecat.frames.framesr
   r   r   pipecat.observers.base_observerr   r   r   pipecat.pipeline.base_pipeliner   pipecat.pipeline.pipeliner   "pipecat.processors.frame_processorr   rC   r   r   r$   r*   r-   r   r   r   r   <module>   s"   