o
    `۷i                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZ d dlmZmZ d dlmZ d dlmZ e eZG d	d
 d
ZG dd deZdS )    N)abstractmethod)parquet)call_with_retry)BlockAccessor)CheckpointBackendCheckpointConfig)DataContext)_unwrap_protocolc                   @   sF   e Zd ZdZdefddZedefddZe	dedd fd	d
Z
dS )CheckpointWriterzAbstract class which defines the interface for writing row-level
    checkpoints based on varying backends.

    Subclasses must implement `.write_block_checkpoint()`.configc                 C   s6   || _ t| j j| _| j j| _| j j| _| j j| _d S )N)ckpt_configr	   checkpoint_pathcheckpoint_path_unwrapped	id_columnid_col
filesystemwrite_num_threadsselfr    r   [/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/checkpoint/checkpoint_writer.py__init__   s   

zCheckpointWriter.__init__blockc                 C   s   dS )Write a checkpoint for all rows in a single block to the checkpoint
        output directory given by `self.checkpoint_path`.

        Subclasses of `CheckpointWriter` must implement this method.Nr   )r   r   r   r   r   write_block_checkpoint    s   z'CheckpointWriter.write_block_checkpointreturnc                 C   s.   | j }|tjtjfv rt| S td| d)z_Factory method to create a `CheckpointWriter` based on the
        provided `CheckpointConfig`.zBackend z not implemented)backendr   CLOUD_OBJECT_STORAGEFILE_STORAGEBatchBasedCheckpointWriterNotImplementedError)r   r   r   r   r   create(   s   zCheckpointWriter.createN)__name__
__module____qualname____doc__r   r   r   r   r   staticmethodr!   r   r   r   r   r
      s    	r
   c                       s4   e Zd ZdZdef fddZdefddZ  ZS )r   z*CheckpointWriter for batch-based backends.r   c                    s"   t  | | jj| jdd d S )NT)	recursive)superr   r   
create_dirr   r   	__class__r   r   r   9   s   z#BatchBasedCheckpointWriter.__init__r   c                    s   |  dkrdS t  d}tjj||jjgd}t	
|   fdd}zt|d| t jdW S  tyL   td	|   w )
r   r   Nz.parquet)columnsc                      s   t j jd d S )N)r   )pqwrite_tabler   r   checkpoint_ids_tableckpt_file_pathr   r   r   _writeN   s
   
zABatchBasedCheckpointWriter.write_block_checkpoint.<locals>._writezWrite checkpoint file: )descriptionmatchzCheckpoint write failed: )num_rowsuuiduuid4ospathjoinr   selectr   r   	for_blockto_arrowr   r   get_currentretried_io_errors	Exceptionlogger	exception)r   r   	file_namecheckpoint_ids_blockr2   r   r/   r   r   >   s"   z1BatchBasedCheckpointWriter.write_block_checkpoint)	r"   r#   r$   r%   r   r   r   r   __classcell__r   r   r*   r   r   6   s    r   )loggingr8   r6   abcr   pyarrowr   r-   ray.data._internal.utilr   ray.data.blockr   ray.data.checkpointr   r   ray.data.contextr   ray.data.datasource.path_utilr	   	getLoggerr"   rA   r
   r   r   r   r   r   <module>   s    
%