o
    Ã¿i#&  ã                
   @   sî   d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	m
Z
mZmZmZmZmZ ddlmZmZ zddlZe dd	¡ e d
d	¡ ddlmZmZ W n  eyl Z ze de› ¡ e d¡ ede› ƒ‚dZ[ww G dd„ deƒZdS )z2GStreamer pipeline source integration for Pipecat.é    N)ÚOptional)Úlogger)Ú	BaseModel)ÚCancelFrameÚEndFrameÚFrameÚOutputAudioRawFrameÚOutputImageRawFrameÚ
StartFrameÚSystemFrame)ÚFrameDirectionÚFrameProcessorÚGstz1.0ÚGstApp)r   r   zException: zIn order to use GStreamer, you need to `pip install pipecat-ai[gstreamer]`. Also, you need to install GStreamer in your system.zMissing module: c                       sð   e Zd ZdZG dd„ deƒZddœdedee f‡ fdd	„Zd
e	de
f‡ fdd„Zd
efdd„Zd
efdd„Zd
efdd„Zdejdejfdd„Zdejdejfdd„Zdejfdd„Zdejfdd„Zd ejfd!d"„Zd ejfd#d$„Z‡  ZS )%ÚGStreamerPipelineSourcea+  A frame processor that uses GStreamer pipelines as media sources.

    This processor creates and manages GStreamer pipelines to generate audio and video
    output frames. It handles pipeline lifecycle, decoding, format conversion, and
    frame generation with configurable output parameters.
    c                   @   sR   e Zd ZU dZdZeed< dZeed< dZe	e ed< dZ
eed	< d
Zeed< dS )z$GStreamerPipelineSource.OutputParamsaÃ  Output configuration parameters for GStreamer pipeline.

        Parameters:
            video_width: Width of output video frames in pixels.
            video_height: Height of output video frames in pixels.
            audio_sample_rate: Sample rate for audio output. If None, uses frame sample rate.
            audio_channels: Number of audio channels for output.
            clock_sync: Whether to synchronize output with pipeline clock.
        i   Úvideo_widthiÐ  Úvideo_heightNÚaudio_sample_rateé   Úaudio_channelsTÚ
clock_sync)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   ÚintÚ__annotations__r   r   r   r   r   Úbool© r   r   ú`/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/processors/gstreamer/pipeline_source.pyÚOutputParams0   s   
 
r    N)Ú
out_paramsÚpipeliner!   c                   sª   t ƒ jdi |¤Ž |pt ¡ | _d| _t ¡  tj 	d¡| _
t |d¡}tj dd¡}| d| j¡ | j
 |¡ | j
 |¡ | |¡ | j
 ¡ }| ¡  | d| j¡ dS )	a*  Initialize the GStreamer pipeline source.

        Args:
            pipeline: GStreamer pipeline description string for the source.
            out_params: Output configuration parameters. If None, uses defaults.
            **kwargs: Additional arguments passed to parent FrameProcessor.
        r   ÚplayerTÚ	decodebinNz	pad-addedÚmessager   )ÚsuperÚ__init__r   r    Ú_out_paramsÚ_sample_rater   ÚinitÚPipelineÚnewÚ_playerÚparse_bin_from_descriptionÚElementFactoryÚmakeÚconnectÚ_decodebin_callbackÚaddÚlinkÚget_busÚadd_signal_watchÚ_on_gstreamer_message)Úselfr"   r!   ÚkwargsÚsourcer$   Úbus©Ú	__class__r   r   r'   A   s   

z GStreamerPipelineSource.__init__ÚframeÚ	directionc                 ƒ   sÜ   t ƒ  ||¡I dH  t|tƒr#|  ||¡I dH  |  |¡I dH  dS t|tƒr;|  |¡I dH  |  ||¡I dH  dS t|tƒrK|  ||¡I dH  dS t|t	ƒrc|  ||¡I dH  |  
|¡I dH  dS |  ||¡I dH  dS )z»Process incoming frames and manage GStreamer pipeline lifecycle.

        Args:
            frame: The frame to process.
            direction: The direction of frame processing.
        N)r&   Úprocess_frameÚ
isinstancer
   Ú
push_frameÚ_startr   Ú_cancelr   r   Ú_stop)r8   r>   r?   r<   r   r   r@   _   s   €



z%GStreamerPipelineSource.process_framec                 Ã   s&   | j jp|j| _| j tjj¡ dS )zStart the GStreamer pipeline.N)	r(   r   Úaudio_out_sample_rater)   r-   Ú	set_stater   ÚStateÚPLAYING©r8   r>   r   r   r   rC   ~   s   €zGStreamerPipelineSource._startc                 Ã   ó   | j  tjj¡ dS )zStop the GStreamer pipeline.N©r-   rG   r   rH   ÚNULLrJ   r   r   r   rE   ƒ   ó   €zGStreamerPipelineSource._stopc                 Ã   rK   )zCancel the GStreamer pipeline.NrL   rJ   r   r   r   rD   ‡   rN   zGStreamerPipelineSource._cancelr;   r%   c                 C   s<   |j }|tjjkr| ¡ \}}t | › d|› d|› ¡ dS )zHandle GStreamer bus messages.z error: z : T)Útyper   ÚMessageTypeÚERRORÚparse_errorr   Úerror)r8   r;   r%   ÚtÚerrÚdebugr   r   r   r7      s
   z-GStreamerPipelineSource._on_gstreamer_messager$   Úpadc                 C   s@   |  ¡  ¡ }| d¡r|  |¡ dS | d¡r|  |¡ dS dS )z'Handle new pads from decodebin element.ÚaudioÚvideoN)Úget_current_capsÚ	to_stringÚ
startswithÚ_decodebin_audioÚ_decodebin_video)r8   r$   rW   Úcaps_stringr   r   r   r2   —   s   

ÿz+GStreamerPipelineSource._decodebin_callbackc           	      C   s@  t j dd¡}t j dd¡}t j dd¡}t j dd¡}t j d| j› d| jj› d¡}| d	|¡ t j d
d¡}| dd¡ | d| jj	¡ | 
d| j¡ | j |¡ | j |¡ | j |¡ | j |¡ | j |¡ | ¡  | ¡  | ¡  | ¡  | ¡  | |¡ | |¡ | |¡ | |¡ | d¡}| |¡ dS )z8Set up audio processing pipeline from decoded audio pad.ÚqueueNÚaudioconvertÚaudioresampleÚ
capsfilterzaudio/x-raw,format=S16LE,rate=z
,channels=z,layout=interleavedÚcapsÚappsinkúemit-signalsTÚsyncú
new-sampleÚsink)r   r/   r0   ÚCapsÚfrom_stringr)   r(   r   Úset_propertyr   r1   Ú_appsink_audio_new_sampler-   r3   Úsync_state_with_parentr4   Úget_static_pad)	r8   rW   Úqueue_audiora   rb   ÚaudiocapsfilterÚ	audiocapsÚappsink_audioÚ	queue_padr   r   r   r]   Ÿ   s8   ÿ




z(GStreamerPipelineSource._decodebin_audioc           	      C   s@  t j dd¡}t j dd¡}t j dd¡}t j dd¡}t j d| jj› d| jj› ¡}| d|¡ t j d	d¡}| d
d¡ | d| jj	¡ | 
d| j¡ | j |¡ | j |¡ | j |¡ | j |¡ | j |¡ | ¡  | ¡  | ¡  | ¡  | ¡  | |¡ | |¡ | |¡ | |¡ | d¡}| |¡ dS )z8Set up video processing pipeline from decoded video pad.r`   NÚvideoconvertÚ
videoscalerc   zvideo/x-raw,format=RGB,width=z,height=rd   re   rf   Trg   rh   ri   )r   r/   r0   rj   rk   r(   r   r   rl   r   r1   Ú_appsink_video_new_sampler-   r3   rn   r4   ro   )	r8   rW   Úqueue_videoru   rv   ÚvideocapsfilterÚ	videocapsÚappsink_videort   r   r   r   r^   Á   s8   ÿ




z(GStreamerPipelineSource._decodebin_videore   c                 C   s\   |  ¡  ¡ }| tjj¡\}}t|j| j| j	j
d}t |  |¡|  ¡ ¡ | |¡ tjjS )z0Handle new audio samples from GStreamer appsink.)rX   Úsample_rateÚnum_channels)Úpull_sampleÚ
get_bufferÚmapr   ÚMapFlagsÚREADr   Údatar)   r(   r   ÚasyncioÚrun_coroutine_threadsaferB   Úget_event_loopÚunmapÚ
FlowReturnÚOK©r8   re   ÚbufferÚ_Úinfor>   r   r   r   rm   ä   s   ý
z1GStreamerPipelineSource._appsink_audio_new_samplec                 C   sb   |  ¡  ¡ }| tjj¡\}}t|j| jj	| jj
fdd}t |  |¡|  ¡ ¡ | |¡ tjjS )z0Handle new video samples from GStreamer appsink.ÚRGB)ÚimageÚsizeÚformat)r~   r   r€   r   r   r‚   r	   rƒ   r(   r   r   r„   r…   rB   r†   r‡   rˆ   r‰   rŠ   r   r   r   rw   ñ   s   ý
z1GStreamerPipelineSource._appsink_video_new_sample) r   r   r   r   r   r    Ústrr   r'   r   r   r@   r
   rC   r   rE   r   rD   r   ÚBusÚMessager7   ÚElementÚPadr2   r]   r^   r   ÚAppSinkrm   rw   Ú__classcell__r   r   r<   r   r   (   s     "#r   )r   r„   Útypingr   Úlogurur   Úpydanticr   Úpipecat.frames.framesr   r   r   r   r	   r
   r   Ú"pipecat.processors.frame_processorr   r   ÚgiÚrequire_versionÚgi.repositoryr   r   ÚModuleNotFoundErrorÚerS   Ú	Exceptionr   r   r   r   r   Ú<module>   s*   $	ÿ€û