o
    bi                     @   s   d dl Z d dlZd dlmZmZ d dlmZ erd dlmZ dZ	dZ
dZG dd	 d	Zd
eddfddZd
edee fddZded
efddZded
efddZdS )    N)TYPE_CHECKINGList)DataContext)StreamingExecutorexecution_callbacksRAY_DATA_EXECUTION_CALLBACKS_env_callbacks_initializedc                   @   s@   e Zd ZdZdddZdddZddd	Zddd
efddZdS )ExecutionCallbackz(Callback interface for execution events.executorr   c                 C      dS )z+Called before the Dataset execution starts.N selfr
   r   r   c/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/execution/execution_callback.pybefore_execution_starts      z)ExecutionCallback.before_execution_startsc                 C   r   )z2Called at each step of the Dataset execution loop.Nr   r   r   r   r   on_execution_step   r   z#ExecutionCallback.on_execution_stepc                 C   r   )z,Called after the Dataset execution succeeds.Nr   r   r   r   r   after_execution_succeeds   r   z*ExecutionCallback.after_execution_succeedserrorc                 C   r   )z)Called after the Dataset execution fails.Nr   )r   r
   r   r   r   r   after_execution_fails   r   z'ExecutionCallback.after_execution_failsN)r
   r   )	__name__
__module____qualname____doc__r   r   r   	Exceptionr   r   r   r   r   r	      s    


r	   contextreturnc           	      C   s   t jtd}|sdS |dD ]>}| }|sqz|dd\}}t|}t	||}| }t
||  W q tttfyN } z
td| d| d}~ww dS )zKInitialize callbacks from environment variable and add them to the context. N,.   z Failed to import callback from 'z': )osenvirongetEXECUTION_CALLBACKS_ENV_VARsplitstriprsplit	importlibimport_modulegetattradd_execution_callbackImportErrorAttributeError
ValueError)	r   callbacks_strcallback_pathmodule_path
class_namemodulecallback_clscallbacker   r   r   _initialize_env_callbacks#   s$   

r7   c                 C   s,   |  tdst|  | td |  tg S )z0Get all ExecutionCallbacks from the DataContext.FT)
get_configENV_CALLBACKS_INITIALIZED_KEYr7   
set_configEXECUTION_CALLBACKS_CONFIG_KEY)r   r   r   r   get_execution_callbacks8   s   r<   r5   c                 C   &   | tg }||  |t| dS )z,Add an ExecutionCallback to the DataContext.N)r8   r;   appendr:   r5   r   r   r   r   r   r+   B      
r+   c                 C   r=   )z1Remove an ExecutionCallback from the DataContext.N)r8   r;   remover:   r?   r   r   r   remove_execution_callbackI   r@   rB   )r(   r!   typingr   r   ray.data.contextr   /ray.data._internal.execution.streaming_executorr   r;   r$   r9   r	   r7   r<   r+   rB   r   r   r   r   <module>   s    
