o
    `۷i@                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZ d dlZd dl	m
Z
mZ er,d dlmZ eddG dd	 d	eZed
dG dd dZe
G dd deZe
G dd deZdS )    N)Enum)TYPE_CHECKINGOptionalTuple)DeveloperAPI	PublicAPI)PathPartitionFilteralpha)	stabilityc                   @   s   e Zd ZdZdZ	 dZdS )CheckpointBackenda(  Supported backends for storing and reading checkpoint files.

    Currently, only one type of backend is supported:

    * Batch-based backends: CLOUD_OBJECT_STORAGE and FILE_STORAGE.

    Their differences are as follows:

    1. Writing checkpoints: Batch-based backends write a checkpoint file
       for each block.
    2. Loading checkpoints and filtering input data: Batch-based backends
       load all checkpoint data into memory prior to dataset execution.
       The checkpoint data is then passed to each read task to perform filtering.
    CLOUD_OBJECT_STORAGEFILE_STORAGEN)__name__
__module____qualname____doc__r   r    r   r   T/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/checkpoint/interfaces.pyr      s    r   betac                   @   s   e Zd ZdZdZdZ		dddddddddee d	ee d
eded dee	 de
de
ded fddZdefddZ		dd	eded dee	 dee	df fddZdS )CheckpointConfiga  Configuration for checkpointing.

    Args:
        id_column: Name of the ID column in the input dataset.
            ID values must be unique across all rows in the dataset and must persist
            during all operators.
        checkpoint_path: Path to store the checkpoint data. It can be a path to a cloud
            object storage (e.g. `s3://bucket/path`) or a file system path.
            If the latter, the path must be a network-mounted file system (e.g.
            `/mnt/cluster_storage/`) that is accessible to the entire cluster.
            If not set, defaults to `RAY_DATA_CHECKPOINT_PATH_BUCKET/ray_data_checkpoint`.
        delete_checkpoint_on_success: If true, automatically delete checkpoint
            data when the dataset execution succeeds. Only supported for
            batch-based backend currently.
        override_filesystem: Override the :class:`pyarrow.fs.FileSystem` object used to
            read/write checkpoint data. Use this when you want to use custom credentials.
        override_backend: Override the :class:`CheckpointBackend` object used to
            access the checkpoint backend storage.
        filter_num_threads: Number of threads used to filter checkpointed rows.
        write_num_threads: Number of threads used to write checkpoint files for
            completed rows.
        checkpoint_path_partition_filter: Filter for checkpoint files to load during
            restoration when reading from `checkpoint_path`.
    RAY_DATA_CHECKPOINT_PATH_BUCKETray_data_checkpointNT   )delete_checkpoint_on_successoverride_filesystemoverride_backendfilter_num_threadswrite_num_threads checkpoint_path_partition_filter	id_columncheckpoint_pathr   r   zpyarrow.fs.FileSystemr   r   r   r   r   c                C   s   || _ t| j trt| j dkrtd| j  |d ur$tjdtdd |p)|  | _	| 
| j	||\}	}
|
| _|	| _|| _|| _|| _|| _d S )Nr   z9Checkpoint ID column must be a non-empty string, but got zD`override_backend` is deprecated and will be removed in August 2025.   )
stacklevel)r   
isinstancestrlenInvalidCheckpointingConfigwarningswarnFutureWarning_get_default_checkpoint_pathr    _infer_backend_and_fs
filesystembackendr   r   r   r   )selfr   r    r   r   r   r   r   r   inferred_backendinferred_fsr   r   r   __init__K   s4   

zCheckpointConfig.__init__returnc                 C   s8   t j| j}|d u rtd| j d| d| j S )N`zO` env var is not set, please explicitly set `CheckpointConfig.checkpoint_path`./)osenvironget&DEFAULT_CHECKPOINT_PATH_BUCKET_ENV_VARr&   DEFAULT_CHECKPOINT_PATH_DIR)r.   artifact_storager   r   r   r*   u   s   z-CheckpointConfig._get_default_checkpoint_pathc              
   C   s   zH|d urt |tjjsJ dt| |}n	tjj|\}}|d ur6t |ts3J dt| |}nt |tjjrAtj}ntj	}||fW S  t
y] } z	td| d|d }~ww )NzLoverride_filesystem must be an instance of `pyarrow.fs.FileSystem`, but got zEoverride_backend must be an instance of `CheckpointBackend`, but got zInvalid checkpoint path: z. )r#   pyarrowfs
FileSystemtypefrom_urir   LocalFileSystemr   r   	Exceptionr&   )r.   r    r   r   r<   _r-   er   r   r   r+   ~   s8   

z&CheckpointConfig._infer_backend_and_fs)NN)r   r   r   r   r8   r9   r   r$   boolr   intr1   r*   r   r+   r   r   r   r   r   -   sT    	

*
r   c                   @      e Zd ZdZdS )r&   zNException which indicates that the checkpointing
    configuration is invalid.Nr   r   r   r   r   r   r   r   r&          r&   c                   @   rF   )InvalidCheckpointingOperatorszxException which indicates that the DAG is not eligible for checkpointing,
    due to one or more incompatible operators.NrG   r   r   r   r   rI      rH   rI   )r5   r'   enumr   typingr   r   r   r;   ray.util.annotationsr   r   ray.data.datasourcer   r   r   rA   r&   rI   r   r   r   r   <module>   s     s