o
    `۷i&                     @   s  U d dl Z d dlZd dlmZmZmZmZmZmZ d dl	Z
d dlZd dlmZ d dlmZmZmZ d dlmZ d dlmZmZmZ z	d dlZejZW n eyU   dZY nw er^d dlmZ e
jej e!ej ej"ej#iZ$eee ef e%d< ed	d
G dd de&Z'eG dd de j(Z)dS )    N)TYPE_CHECKINGCallableDictOptionalTypeUnion)DataBatchType)BatchFormat_convert_batch_type_to_numpy_convert_batch_type_to_pandas)
Checkpoint)
DeprecatedDeveloperAPI	PublicAPI)PreprocessorTYPE_TO_ENUMbeta)	stabilityc                   @   s   e Zd ZdZdS )!PredictorNotSerializableExceptionz;Error raised when trying to serialize a Predictor instance.N)__name__
__module____qualname____doc__ r   r   I/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/train/predictor.pyr   #   s    r   c                	   @   s"  e Zd ZdZd"ded fddZeejde	dd fd	d
Z
edeejgejf dd fddZded fddZded ddfddZeedefddZedefddZdd ZdedefddZed#ddZedeejeeejf f deejeeejf f fddZd d! ZdS )$	PredictoraV  Predictors load models from checkpoints to perform inference.

    .. note::
        The base ``Predictor`` class cannot be instantiated directly. Only one of
        its subclasses can be used.

    **How does a Predictor work?**

    Predictors expose a ``predict`` method that accepts an input batch of type
    ``DataBatchType`` and outputs predictions of the same type as the input batch.

    When the ``predict`` method is called the following occurs:

    - The input batch is converted into a pandas DataFrame. Tensor input (like a
      ``np.ndarray``) will be converted into a single column Pandas Dataframe.
    - If there is a :ref:`Preprocessor <preprocessor-ref>` saved in the provided
      :class:`Checkpoint <ray.train.Checkpoint>`, the preprocessor will be used to
      transform the DataFrame.
    - The transformed DataFrame will be passed to the model for inference (via the
      ``predictor._predict_pandas`` method).
    - The predictions will be outputted by ``predict`` in the same type as the
      original input.

    **How do I create a new Predictor?**

    To implement a new Predictor for your particular framework, you should subclass
    the base ``Predictor`` and implement the following two methods:

    1. ``_predict_pandas``: Given a pandas.DataFrame input, return a
       pandas.DataFrame containing predictions.
    2. ``from_checkpoint``: Logic for creating a Predictor from a
       :class:`Checkpoint <ray.train.Checkpoint>`.
    3. Optionally ``_predict_numpy`` for better performance when working with
       tensor data to avoid extra copies from Pandas conversions.
    Npreprocessorr   c                 C   s*   t j| jj dtdd || _d| _dS )zBSubclasseses must call Predictor.__init__() to set a preprocessor.z4 is deprecated and will be removed after April 2026.   )
stacklevelFN)warningswarn	__class__r   DeprecationWarning_preprocessor_cast_tensor_columnsselfr   r   r   r   __init__P   s   
zPredictor.__init__
checkpointreturnc                 K      t )a  Create a specific predictor from a checkpoint.

        Args:
            checkpoint: Checkpoint to load predictor data from.
            kwargs: Arguments specific to predictor implementations.

        Returns:
            Predictor: Predictor object.
        NotImplementedErrorclsr(   kwargsr   r   r   from_checkpoint\      zPredictor.from_checkpoint
pandas_udfc                    s   G  fdddt    S )zCreate a Predictor from a Pandas UDF.

        Args:
            pandas_udf: A function that takes a pandas.DataFrame and other
                optional kwargs and returns a pandas.DataFrame.
        c                       s4   e Zd Zededdf fddZd
fddZd	S )z5Predictor.from_pandas_udf.<locals>.PandasUDFPredictorr(   r)   r   c                    s     S Nr   r-   )PandasUDFPredictorr   r   r0   v      zEPredictor.from_pandas_udf.<locals>.PandasUDFPredictor.from_checkpointpd.DataFramec                    s    |fi |S r3   r   )r&   dfr/   )r2   r   r   _predict_pandasz   s   zEPredictor.from_pandas_udf.<locals>.PandasUDFPredictor._predict_pandasN)r)   r6   )r   r   r   classmethodr   r0   r8   r   r4   r2   r   r   r4   u   s    r4   )r   )r.   r2   r   r:   r   from_pandas_udfj   s   zPredictor.from_pandas_udfc                 C   s   | j S )z;Get the preprocessor to use prior to executing predictions.r#   r&   r   r   r   get_preprocessor   r5   zPredictor.get_preprocessorc                 C   s
   || _ dS )z;Set the preprocessor to use prior to executing predictions.Nr<   r%   r   r   r   set_preprocessor   s   
zPredictor.set_preprocessorc                 C   s   t jS )a  Batch format hint for upstream producers to try yielding best block format.

        The preferred batch format to use if both `_predict_pandas` and
        `_predict_numpy` are implemented. Defaults to Pandas.

        Can be overridden by predictor classes depending on the framework type,
        e.g. TorchPredictor prefers Numpy and XGBoostPredictor prefers Pandas as
        native batch format.

        )r	   PANDAS)r.   r   r   r   preferred_batch_format   s   z Predictor.preferred_batch_formatc                 C   sN   | j tj k}| jtjk}|r|r|  S |rtjS |rtjS td| j d)z4Determine the batch format to use for the predictor.z
Predictor zG must implement at least one of `_predict_pandas` and `_predict_numpy`.)	r8   r   _predict_numpyrA   r	   r@   NUMPYr,   r   )r.   has_pandas_implementedhas_numpy_implementedr   r   r   _batch_format_to_use   s   zPredictor._batch_format_to_usec                 C   s
   d| _ dS )a  Enable automatic tensor column casting.

        If this is called on a predictor, the predictor will cast tensor columns to
        NumPy ndarrays in the input to the preprocessors and cast tensor columns back to
        the tensor extension type in the prediction outputs.
        TN)r$   r=   r   r   r   _set_cast_tensor_columns   s   
z"Predictor._set_cast_tensor_columnsdatac              
   K   s  t | ds	tdztt| }W n ty(   tdt| dtt  w | jr2| j	|}| 
 }|tjkr_|tjkrK| jt|fi |S |tjkr]t| jt|fi |S dS |tjkr|tjkrvt| jt|fi |S |tjkr| jt|fi |S dS dS )a  Perform inference on a batch of data.

        Args:
            data: A batch of input data of type ``DataBatchType``.
            kwargs: Arguments specific to predictor implementations. These are passed
                directly to ``_predict_numpy`` or ``_predict_pandas``.

        Returns:
            DataBatchType:
                Prediction result. The return type will be the same as the input type.
        r#   zCSubclasses of Predictor must call Predictor.__init__(preprocessor).zInvalid input data type of z, supported types: N)hasattrr,   r   typeKeyErrorRuntimeErrorlistkeysr#   transform_batchrF   r	   r@   r8   r   rC   rB   r
   )r&   rH   r/   batch_formatbatch_format_to_user   r   r   predict   sJ   







zPredictor.predictr6   c                 K   r*   )a  Perform inference on a Pandas DataFrame.

        Args:
            data: A pandas DataFrame to perform predictions on.
            kwargs: Arguments specific to the predictor implementation.

        Returns:
            A pandas DataFrame containing the prediction result.

        r+   r&   rH   r/   r   r   r   r8      r1   zPredictor._predict_pandasc                 K   r*   )a  Perform inference on a Numpy data.

        All Predictors working with tensor data (like deep learning predictors)
        should implement this method.

        Args:
            data: A Numpy ndarray or dictionary of ndarrays to perform predictions on.
            kwargs: Arguments specific to the predictor implementation.

        Returns:
            A Numpy ndarray or dictionary of ndarray containing the prediction result.

        r+   rS   r   r   r   rB      s   zPredictor._predict_numpyc                 C   s   t d)NzPredictor instances are not serializable. Instead, you may want to serialize a checkpoint and initialize the Predictor with Predictor.from_checkpoint.)r   r=   r   r   r   
__reduce__  s   zPredictor.__reduce__r3   )rH   r6   r)   r6   ) r   r   r   r   r   r'   r9   abcabstractmethodr   r0   r   pd	DataFramer;   r>   r?   r   r	   rA   rF   rG   r   rR   r8   r   npndarrayr   strrB   rT   r   r   r   r   r   *   s<    $	0r   )*rU   r   typingr   r   r   r   r   r   numpyrY   pandasrW   ray.air.data_batch_typer   "ray.air.util.data_batch_conversionr	   r
   r   	ray.trainr   ray.util.annotationsr   r   r   pyarrowTablepa_tableImportErrorray.datar   rZ   rC   dictrX   r@   r   __annotations__rL   r   ABCr   r   r   r   r   <module>   s2   
  
