o
    i                     @   s   d Z ddlZddlZddlmZmZ ddlmZ ddlm	Z	m
Z
mZmZ ddlmZ eG dd dZG d	d
 d
eZeG dd dZG dd deZdS )zAsyncio task management.

This module provides task management functionality. Includes both abstract base
classes and concrete implementations for managing asyncio tasks with
comprehensive monitoring and cleanup capabilities.
    N)ABCabstractmethod)	dataclass)	CoroutineDictOptionalSequence)loggerc                   @      e Zd ZU dZejed< dS )TaskManagerParamszConfiguration parameters for task manager initialization.

    Parameters:
        loop: The asyncio event loop to use for task management.
    loopN)__name__
__module____qualname____doc__asyncioAbstractEventLoop__annotations__ r   r   V/home/ubuntu/.local/lib/python3.10/site-packages/pipecat/utils/asyncio/task_manager.pyr         
 r   c                   @   s   e Zd ZdZedefddZedejfddZ	ede
d	edejfd
dZeddejdee fddZedeej fddZdS )BaseTaskManagerzAbstract base class for asyncio task management.

    Provides the interface for creating, monitoring, and managing asyncio tasks.
    paramsc                 C      dS zInitialize the task manager with configuration parameters.

        Args:
            params: Configuration parameters for task management.
        Nr   selfr   r   r   r   setup(      zBaseTaskManager.setupreturnc                 C   r   )zuGet the event loop used by this task manager.

        Returns:
            The asyncio event loop instance.
        Nr   r   r   r   r   get_event_loop1   r   zBaseTaskManager.get_event_loop	coroutinenamec                 C   r   )a]  Creates and schedules a new asyncio Task that runs the given coroutine.

        The task is added to a global set of created tasks.

        Args:
            coroutine: The coroutine to be executed within the task.
            name: The name to assign to the task for identification.

        Returns:
            The created task object.
        Nr   )r   r"   r#   r   r   r   create_task:   s   zBaseTaskManager.create_taskNtasktimeoutc                    s   dS )X  Cancels the given asyncio Task and awaits its completion with an optional timeout.

        This function removes the task from the set of registered tasks upon
        completion or failure.

        Args:
            task: The task to be cancelled.
            timeout: The optional timeout in seconds to wait for the task to cancel.
        Nr   )r   r%   r&   r   r   r   cancel_taskI   s   zBaseTaskManager.cancel_taskc                 C   r   )Returns the list of currently created/registered tasks.

        Returns:
            Sequence of currently managed asyncio tasks.
        Nr   r    r   r   r   current_tasksV   r   zBaseTaskManager.current_tasksN)r   r   r   r   r   r   r   r   r   r!   r   strTaskr$   r   floatr(   r   r*   r   r   r   r   r   "   s    r   c                   @   r
   )TaskDatazwInternal data structure for tracking task metadata.

    Parameters:
        task: The asyncio Task being managed.
    r%   N)r   r   r   r   r   r-   r   r   r   r   r   r/   `   r   r/   c                   @   s   e Zd ZdZdddZdefddZdejfd	d
Z	de
dedejfddZddejdee fddZdeej fddZdefddZdejfddZdS )TaskManagerzConcrete implementation of BaseTaskManager.

    Manages asyncio tasks. Provides comprehensive task lifecycle management
    including creation, monitoring, cancellation, and cleanup.

    r   Nc                 C   s   i | _ d| _dS )z5Initialize the task manager with empty task registry.N)_tasks_paramsr    r   r   r   __init__s   s   
zTaskManager.__init__r   c                 C   s   | j s|| _ dS dS r   )r2   r   r   r   r   r   x   s   
zTaskManager.setupc                 C   s   | j std| j jS )zGet the event loop used by this task manager.

        Returns:
            The asyncio event loop instance.

        Raises:
            Exception: If the task manager is not properly set up.
        2TaskManager is not setup: unable to get event loop)r2   	Exceptionr   r    r   r   r   r!      s   	zTaskManager.get_event_loopr"   r#   c                    sf    fdd}| j std| j j| }| || j | t|d t	
 d |S )a  Creates and schedules a new asyncio Task that runs the given coroutine.

        The task is added to a global set of created tasks.

        Args:
            coroutine: The coroutine to be executed within the task.
            name: The name to assign to the task for identification.

        Returns:
            The created task object.

        Raises:
            Exception: If the task manager is not properly set up.
        c                     s   z I d H W S  t jy   t d   tyF }  z#t| j}|d }t d|j	 d|j
 d|   W Y d } ~ d S d } ~ ww )Nz: task cancelledz unexpected exception (:): )r   CancelledErrorr	   tracer5   	traceback
extract_tb__traceback__errorfilenamelineno)etblastr"   r#   r   r   run_coroutine   s   2z.TaskManager.create_task.<locals>.run_coroutiner4   r%   z: task created)r2   r5   r   r$   set_nameadd_done_callback_task_done_handler	_add_taskr/   r	   r:   )r   r"   r#   rE   r%   r   rD   r   r$      s   
zTaskManager.create_taskr%   r&   c                    s(  |  }|  z|rtj||dI dH  W dS |I dH  W dS  tjy3   t| d Y dS  tjy=   Y dS  tyk } z#t	
|j}|d }t| d|j d|j d|  W Y d}~dS d}~w ty } zt	
|j}|d }t| d|j d|j d|   d}~ww )	r'   )r&   Nz&: timed out waiting for task to cancelr6   z- unexpected exception while cancelling task (r7   r8   z- fatal base exception while cancelling task ()get_namecancelr   wait_forTimeoutErrorr	   warningr9   r5   r;   r<   r=   r>   r?   r@   BaseExceptioncritical)r   r%   r&   r#   rA   rB   rC   r   r   r   r(      s6   
zTaskManager.cancel_taskc                 C   s   dd | j  D S )r)   c                 S   s   g | ]}|j qS r   rF   ).0datar   r   r   
<listcomp>   s    z-TaskManager.current_tasks.<locals>.<listcomp>)r1   valuesr    r   r   r   r*      s   zTaskManager.current_tasks	task_datac                 C   s   |j  }|| j|< dS )zfAdd a task to the internal registry.

        Args:
            task_data: The task metadata.
        N)r%   rK   r1   )r   rV   r#   r   r   r   rJ      s   
zTaskManager._add_taskc              
   C   sR   |  }z| j|= W dS  ty( } zt| d|  W Y d}~dS d}~ww )zHandle task completion by removing the task from the registry.

        Args:
            task: The completed asyncio task.
        z1: unable to remove task data (already removed?): N)rK   r1   KeyErrorr	   r:   )r   r%   r#   rA   r   r   r   rI      s   "zTaskManager._task_done_handler)r   Nr+   )r   r   r   r   r3   r   r   r   r   r!   r   r,   r-   r$   r   r.   r(   r   r*   r/   rJ   rI   r   r   r   r   r0   k   s    
	&$	r0   )r   r   r;   abcr   r   dataclassesr   typingr   r   r   r   logurur	   r   r   r/   r0   r   r   r   r   <module>   s   
>
