o
    `۷i	                     @   s~   d dl Z d dlmZ d dlmZmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ e eZG d	d
 d
eZdS )    N)Optional)ExecutionCallbackremove_execution_callback)StreamingExecutor)Block)CheckpointConfigBatchBasedCheckpointFilter)	ObjectRefc                   @   sp   e Zd ZdZdefddZdedefddZdefd	d
Z	defddZ
dedefddZdee fddZdS )LoadCheckpointCallbackz+ExecutionCallback that handles checkpoints.configc                 C   s(   |d usJ || _ | || _d | _d S N)_config_create_checkpoint_filter_ckpt_filter_checkpoint_refselfr    r   b/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/checkpoint/load_checkpoint_callback.py__init__   s   
zLoadCheckpointCallback.__init__returnc                 C   s   t |S )zFactory method to create the checkpoint filter.

        Subclasses can override this to use a different filter implementation.
        r   r   r   r   r   r      s   z0LoadCheckpointCallback._create_checkpoint_filterexecutorc                 C   s"   | j |jju s	J | j | _d S r   )r   _data_contextcheckpoint_configr   load_checkpointr   r   r   r   r   r   before_execution_starts$      z.LoadCheckpointCallback.before_execution_startsc                 C   s`   | j |jju s	J t| |j z| j jr| j  W d S W d S  ty/   tj	ddd Y d S w )Nz!Failed to delete checkpoint data.T)exc_info)
r   r   r   r   delete_checkpoint_on_successr   delete_checkpoint	Exceptionloggerwarningr   r   r   r   after_execution_succeeds*   s   z/LoadCheckpointCallback.after_execution_succeedserrorc                 C   s"   | j |jju s	J t| |j d S r   )r   r   r   r   )r   r   r&   r   r   r   after_execution_fails6   r   z,LoadCheckpointCallback.after_execution_failsc                 C   s   | j d usJ | j S r   )r   )r   r   r   r   r   <   s   z&LoadCheckpointCallback.load_checkpointN)__name__
__module____qualname____doc__r   r   r	   r   r   r   r%   r"   r'   r
   r   r   r   r   r   r   r      s    
	r   )loggingtypingr   /ray.data._internal.execution.execution_callbackr   r   /ray.data._internal.execution.streaming_executorr   ray.data.blockr   ray.data.checkpointr   %ray.data.checkpoint.checkpoint_filterr	   	ray.typesr
   	getLoggerr(   r#   r   r   r   r   r   <module>   s    
