o
    $i%                     @   s  U d dl Z d dlmZmZmZmZmZ d dlZd dl	Z
d dlmZ d dlmZmZmZ d dlmZ d dlmZ d dlmZmZ z	d dlZejZW n eyS   dZY nw ejejeeje
jej iZ!eee ef e"d< ed	d
G dd de#Z$ed	d
G dd de j%Z&dS )    N)CallableDictOptionalTypeUnion)DataBatchType)BatchFormat_convert_batch_type_to_numpy_convert_batch_type_to_pandas)Preprocessor)
Checkpoint)DeveloperAPI	PublicAPITYPE_TO_ENUMbeta)	stabilityc                   @   s   e Zd ZdZdS )!PredictorNotSerializableExceptionz;Error raised when trying to serialize a Predictor instance.N)__name__
__module____qualname____doc__ r   r   P/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/train/predictor.pyr       s    r   c                	   @   s"  e Zd ZdZd!dee fddZeej	de
dd fdd	Zed
eejgejf dd fddZdee fddZdee ddfddZeedefddZedefddZdd ZdedefddZed"ddZedeejeeejf f deejeeejf f fddZdd  Z dS )#	PredictoraV  Predictors load models from checkpoints to perform inference.

    .. note::
        The base ``Predictor`` class cannot be instantiated directly. Only one of
        its subclasses can be used.

    **How does a Predictor work?**

    Predictors expose a ``predict`` method that accepts an input batch of type
    ``DataBatchType`` and outputs predictions of the same type as the input batch.

    When the ``predict`` method is called the following occurs:

    - The input batch is converted into a pandas DataFrame. Tensor input (like a
      ``np.ndarray``) will be converted into a single column Pandas Dataframe.
    - If there is a :ref:`Preprocessor <preprocessor-ref>` saved in the provided
      :class:`Checkpoint <ray.train.Checkpoint>`, the preprocessor will be used to
      transform the DataFrame.
    - The transformed DataFrame will be passed to the model for inference (via the
      ``predictor._predict_pandas`` method).
    - The predictions will be outputted by ``predict`` in the same type as the
      original input.

    **How do I create a new Predictor?**

    To implement a new Predictor for your particular framework, you should subclass
    the base ``Predictor`` and implement the following two methods:

    1. ``_predict_pandas``: Given a pandas.DataFrame input, return a
       pandas.DataFrame containing predictions.
    2. ``from_checkpoint``: Logic for creating a Predictor from a
       :class:`Checkpoint <ray.train.Checkpoint>`.
    3. Optionally ``_predict_numpy`` for better performance when working with
       tensor data to avoid extra copies from Pandas conversions.
    Npreprocessorc                 C   s   || _ d| _dS )zBSubclasseses must call Predictor.__init__() to set a preprocessor.FN)_preprocessor_cast_tensor_columnsselfr   r   r   r   __init__M   s   
zPredictor.__init__
checkpointreturnc                 K      t )a  Create a specific predictor from a checkpoint.

        Args:
            checkpoint: Checkpoint to load predictor data from.
            kwargs: Arguments specific to predictor implementations.

        Returns:
            Predictor: Predictor object.
        NotImplementedErrorclsr    kwargsr   r   r   from_checkpointT      zPredictor.from_checkpoint
pandas_udfc                    s   G  fdddt    S )zCreate a Predictor from a Pandas UDF.

        Args:
            pandas_udf: A function that takes a pandas.DataFrame and other
                optional kwargs and returns a pandas.DataFrame.
        c                       s4   e Zd Zededdf fddZd
fddZd	S )z5Predictor.from_pandas_udf.<locals>.PandasUDFPredictorr    r!   r   c                    s     S Nr   r%   )PandasUDFPredictorr   r   r(   n      zEPredictor.from_pandas_udf.<locals>.PandasUDFPredictor.from_checkpointpd.DataFramec                    s    |fi |S r+   r   )r   dfr'   )r*   r   r   _predict_pandasr   s   zEPredictor.from_pandas_udf.<locals>.PandasUDFPredictor._predict_pandasN)r!   r.   )r   r   r   classmethodr   r(   r0   r   r,   r*   r   r   r,   m   s    r,   )r   )r&   r*   r   r2   r   from_pandas_udfb   s   zPredictor.from_pandas_udfc                 C   s   | j S )z;Get the preprocessor to use prior to executing predictions.r   r   r   r   r   get_preprocessorw   r-   zPredictor.get_preprocessorc                 C   s
   || _ dS )z;Set the preprocessor to use prior to executing predictions.Nr4   r   r   r   r   set_preprocessor{   s   
zPredictor.set_preprocessorc                 C   s   t jS )a  Batch format hint for upstream producers to try yielding best block format.

        The preferred batch format to use if both `_predict_pandas` and
        `_predict_numpy` are implemented. Defaults to Pandas.

        Can be overriden by predictor classes depending on the framework type,
        e.g. TorchPredictor prefers Numpy and XGBoostPredictor prefers Pandas as
        native batch format.

        )r   PANDAS)r&   r   r   r   preferred_batch_format   s   z Predictor.preferred_batch_formatc                 C   sN   | j tj k}| jtjk}|r|r|  S |rtjS |rtjS td| j d)z4Determine the batch format to use for the predictor.z
Predictor zG must implement at least one of `_predict_pandas` and `_predict_numpy`.)	r0   r   _predict_numpyr9   r   r8   NUMPYr$   r   )r&   has_pandas_implementedhas_numpy_implementedr   r   r   _batch_format_to_use   s   zPredictor._batch_format_to_usec                 C   s
   d| _ dS )a  Enable automatic tensor column casting.

        If this is called on a predictor, the predictor will cast tensor columns to
        NumPy ndarrays in the input to the preprocessors and cast tensor columns back to
        the tensor extension type in the prediction outputs.
        TN)r   r5   r   r   r   _set_cast_tensor_columns   s   
z"Predictor._set_cast_tensor_columnsdatac              
   K   s  t | ds	tdztt| }W n ty(   tdt| dtt  w | jr2| j	|}| 
 }|tjkr_|tjkrK| jt|fi |S |tjkr]t| jt|fi |S dS |tjkr|tjkrvt| jt|fi |S |tjkr| jt|fi |S dS dS )a  Perform inference on a batch of data.

        Args:
            data: A batch of input data of type ``DataBatchType``.
            kwargs: Arguments specific to predictor implementations. These are passed
                directly to ``_predict_numpy`` or ``_predict_pandas``.

        Returns:
            DataBatchType:
                Prediction result. The return type will be the same as the input type.
        r   zCSubclasses of Predictor must call Predictor.__init__(preprocessor).zInvalid input data type of z, supported types: N)hasattrr$   r   typeKeyErrorRuntimeErrorlistkeysr   transform_batchr>   r   r8   r0   r
   r;   r:   r	   )r   r@   r'   batch_formatbatch_format_to_user   r   r   predict   sJ   







zPredictor.predictr.   c                 K   r"   )a  Perform inference on a Pandas DataFrame.

        Args:
            data: A pandas DataFrame to perform predictions on.
            kwargs: Arguments specific to the predictor implementation.

        Returns:
            A pandas DataFrame containing the prediction result.

        r#   r   r@   r'   r   r   r   r0      r)   zPredictor._predict_pandasc                 K   r"   )a  Perform inference on a Numpy data.

        All Predictors working with tensor data (like deep learning predictors)
        should implement this method.

        Args:
            data: A Numpy ndarray or dictionary of ndarrays to perform predictions on.
            kwargs: Arguments specific to the predictor implementation.

        Returns:
            A Numpy ndarray or dictionary of ndarray containing the prediction result.

        r#   rK   r   r   r   r:      s   zPredictor._predict_numpyc                 C   s   t d)NzPredictor instances are not serializable. Instead, you may want to serialize a checkpoint and initialize the Predictor with Predictor.from_checkpoint.)r   r5   r   r   r   
__reduce__   s   zPredictor.__reduce__r+   )r@   r.   r!   r.   )!r   r   r   r   r   r   r   r1   abcabstractmethodr   r(   r   pd	DataFramer3   r6   r7   r   r   r9   r>   r?   r   rJ   r0   r   npndarrayr   strr:   rL   r   r   r   r   r   '   s<    $	0r   )'rM   typingr   r   r   r   r   numpyrQ   pandasrO   ray.air.data_batch_typer   "ray.air.util.data_batch_conversionr   r	   r
   ray.datar   	ray.trainr   ray.util.annotationsr   r   pyarrowTablepa_tableImportErrorrR   r;   dictrP   r8   r   __annotations__rD   r   ABCr   r   r   r   r   <module>   s.   
 
