o
    bi8                     @   s   d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	m
Z
mZmZmZ d dlmZ d dlmZmZ erNd dlZd dlZd dlmZ d dlmZ edd	G d
d deZedd	G dd de jZdS )    N)Enum)TYPE_CHECKINGAnyDictListOptionalUnion)BatchFormat)DeveloperAPI	PublicAPI)DataBatchType)Datasetbeta)	stabilityc                   @   s   e Zd ZdZdS )PreprocessorNotFittedExceptionz<Error raised when the preprocessor needs to be fitted first.N)__name__
__module____qualname____doc__ r   r   I/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/preprocessor.pyr      s    r   c                   @   s  e Zd ZdZG dd deeZdZdd Zd?d	d
Z	d@ddZ
ddddddddee dee dee dee ddfddZddddddddee dee dee dee ddfddZdAd d!Zed@d"d#Zdefd$d%Z			dBdddee dee dee dee ddfd&d'Zdeeef fd(d)ZdAd*d+Zed,ee d-eee  dee fd.d/ZedCd2d3Zed4ed5eed5f f ded5eed5f f fd6d7Zeedefd8d9Z edefd:d;Z!e"ed<edd fd=d>Z#dS )DPreprocessora  Implements an ML preprocessing operation.

    Preprocessors are stateful objects that can be fitted against a Dataset and used
    to transform both local data batches and distributed data. For example, a
    Normalization preprocessor may calculate the mean and stdev of a field during
    fitting, and uses these attributes to implement its normalization transform.

    Preprocessors can also be stateless and transform data without needed to be fitted.
    For example, a preprocessor may simply remove a column, which does not require
    any state to be fitted.

    If you are implementing your own Preprocessor sub-class, you should override the
    following:

    * ``_fit`` if your preprocessor is stateful. Otherwise, set
      ``_is_fittable=False``.
    * ``_transform_pandas`` and/or ``_transform_numpy`` for best performance,
      implement both. Otherwise, the data will be converted to the match the
      implemented method.
    c                   @   s    e Zd ZdZdZdZdZdZdS )Preprocessor.FitStatuszThe fit status of preprocessor.NOT_FITTABLE
NOT_FITTEDPARTIALLY_FITTEDFITTEDN)r   r   r   r   r   r   r   r   r   r   r   r   	FitStatus2   s    r   Tc                 C   s   dd t | D }t|S )aG  Checks if the Preprocessor has fitted state.

        This is also used as an indiciation if the Preprocessor has been fit, following
        convention from Ray versions prior to 2.6.
        This allows preprocessors that have been fit in older versions of Ray to be
        used to transform data in newer versions.
        c                 S   s   g | ]	}| d r|qS )_)endswith).0vr   r   r   
<listcomp>K   s    z8Preprocessor._check_has_fitted_state.<locals>.<listcomp>)varsbool)selffitted_varsr   r   r   _check_has_fitted_stateB   s   	z$Preprocessor._check_has_fitted_statereturnr   c                 C   s6   | j stjjS t| dr| js|  rtjjS tjjS )N_fitted)	_is_fittabler   r   r   hasattrr)   r'   r   r   r%   r   r   r   
fit_statusN   s   zPreprocessor.fit_statusdsr   c                 C   sJ   |   }|tjjkr| S |tjjtjjfv rtd | |}d| _	|S )a  Fit this Preprocessor to the Dataset.

        Fitted state attributes will be directly set in the Preprocessor.

        Calling it more than once will overwrite all previously fitted state:
        ``preprocessor.fit(A).fit(B)`` is equivalent to ``preprocessor.fit(B)``.

        Args:
            ds: Input dataset.

        Returns:
            Preprocessor: The fitted Preprocessor with state attributes.
        z`fit` has already been called on the preprocessor (or at least one contained preprocessors if this is a chain). All previously fitted state will be overwritten!T)
r-   r   r   r   r   r   warningswarn_fitr)   )r%   r.   r-   	fitted_dsr   r   r   fitX   s   
zPreprocessor.fitN)transform_num_cpustransform_memorytransform_batch_sizetransform_concurrencyr4   r5   r6   r7   c                C   s   |  | | j|||||dS )a$  Fit this Preprocessor to the Dataset and then transform the Dataset.

        Calling it more than once will overwrite all previously fitted state:
        ``preprocessor.fit_transform(A).fit_transform(B)``
        is equivalent to ``preprocessor.fit_transform(B)``.

        Args:
            ds: Input Dataset.
            transform_num_cpus: [experimental] The number of CPUs to reserve for each parallel map worker.
            transform_memory: [experimental] The heap memory in bytes to reserve for each parallel map worker.
            transform_batch_size: [experimental] The maximum number of rows to return.
            transform_concurrency: [experimental] The maximum number of Ray workers to use concurrently.

        Returns:
            ray.data.Dataset: The transformed Dataset.
        )num_cpusmemory
batch_sizeconcurrency)r3   	transform)r%   r.   r4   r5   r6   r7   r   r   r   fit_transformy   s   
zPreprocessor.fit_transformr:   r8   r9   r;   r:   r8   r9   r;   c                C   s<   |   }|tjjtjjfv rtd| j|||||d}|S )a  Transform the given dataset.

        Args:
            ds: Input Dataset.
            batch_size: [experimental] Advanced configuration for adjusting input size for each worker.
            num_cpus: [experimental] The number of CPUs to reserve for each parallel map worker.
            memory: [experimental] The heap memory in bytes to reserve for each parallel map worker.
            concurrency: [experimental] The maximum number of Ray workers to use concurrently.

        Returns:
            ray.data.Dataset: The transformed Dataset.

        Raises:
            PreprocessorNotFittedException: if ``fit`` is not called yet.
        zX`fit` must be called before `transform`, or simply use fit_transform() to run both stepsr>   )r-   r   r   r   r   r   
_transform)r%   r.   r:   r8   r9   r;   r-   transformed_dsr   r   r   r<      s    zPreprocessor.transformdatar   c                 C   s.   |   }|tjjtjjfv rtd| |S )a  Transform a single batch of data.

        The data will be converted to the format supported by the Preprocessor,
        based on which ``_transform_*`` methods are implemented.

        Args:
            data: Input data batch.

        Returns:
            DataBatchType:
                The transformed data batch. This may differ
                from the input type depending on which ``_transform_*`` methods
                are implemented.
        z.`fit` must be called before `transform_batch`.)r-   r   r   r   r   r   _transform_batch)r%   rA   r-   r   r   r   transform_batch   s   
zPreprocessor.transform_batchc                 C      t  )z2Sub-classes should override this instead of fit().NotImplementedError)r%   r.   r   r   r   r1         zPreprocessor._fitc                 C   sH   | j jtjk}| j jtjk}|r|r|  S |rtjS |r tjS td)af  Determine which batch format to use based on Preprocessor implementation.

        * If only `_transform_pandas` is implemented, then use ``pandas`` batch format.
        * If only `_transform_numpy` is implemented, then use ``numpy`` batch format.
        * If both are implemented, then use the Preprocessor defined preferred batch
        format.
        zNone of `_transform_numpy` or `_transform_pandas` are implemented. At least one of these transform functions must be implemented for Preprocessor transforms.)		__class___transform_pandasr   _transform_numpypreferred_batch_formatr	   NUMPYPANDASrF   )r%   has_transform_pandashas_transform_numpyr   r   r   _determine_transform_to_use   s   
z(Preprocessor._determine_transform_to_usec                 C   s   |   }|  }|d ur||d< |d ur||d< |d ur ||d< |d ur(||d< |tjkr:|j| jfdtji|S |tjkrL|j| jfdtji|S td| )Nr8   r9   r:   r;   batch_formatziInvalid transform type returned from _determine_transform_to_use; "pandas" and "numpy" allowed, but got: )	rP   _get_transform_configr	   rM   map_batchesrI   rL   rJ   
ValueError)r%   r.   r:   r8   r9   r;   transform_typekwargsr   r   r   r?      s>   

zPreprocessor._transformc                 C   s   i S )zReturns kwargs to be passed to :meth:`ray.data.Dataset.map_batches`.

        This can be implemented by subclassing preprocessors.
        r   r,   r   r   r   rR   $  s   z"Preprocessor._get_transform_configc                 C   s   dd l }dd l}ddlm}m} zdd l}W n ty!   d }Y nw t||j|j	t
jj|jfs:tdt| d|  }|tjkrJ| ||S |tjkrV| ||S d S )Nr   )_convert_batch_type_to_numpy_convert_batch_type_to_pandasz`transform_batch` is currently only implemented for Pandas DataFrames, pyarrow Tables, NumPy ndarray and dictionary of ndarray. Got .)numpypandas"ray.air.util.data_batch_conversionrW   rX   pyarrowImportError
isinstance	DataFrameTablecollectionsabcMappingndarrayrT   typerP   r	   rM   rI   rL   rJ   )r%   rA   nppdrW   rX   r]   rU   r   r   r   rB   +  s,   

zPreprocessor._transform_batchcolumnsoutput_columnsc                 C   s$   |rt |t |krtd|p|S )a(  Returns the output columns after validation.

        Checks if the columns are explicitly set, otherwise defaulting to
        the input columns.

        Raises:
            ValueError: If the length of the output columns does not match the
                length of the input columns.
        zuInvalid output_columns: Got len(columns) != len(output_columns). The length of columns and output_columns must match.)lenrT   )clsri   rj   r   r   r   #_derive_and_validate_output_columnsJ  s
   z0Preprocessor._derive_and_validate_output_columnsdfpd.DataFramec                 C   rD   )zDRun the transformation on a data batch in a Pandas DataFrame format.rE   )r%   rn   r   r   r   rI   _  rG   zPreprocessor._transform_pandasnp_dataz
np.ndarrayc                 C   rD   )zARun the transformation on a data batch in a NumPy ndarray format.rE   )r%   rp   r   r   r   rJ   d  s   zPreprocessor._transform_numpyc                 C   s   t jS )aP  Batch format hint for upstream producers to try yielding best block format.

        The preferred batch format to use if both `_transform_pandas` and
        `_transform_numpy` are implemented. Defaults to Pandas.

        Can be overriden by Preprocessor classes depending on which transform
        path is the most optimal.
        )r	   rM   )rl   r   r   r   rK   k  s   z#Preprocessor.preferred_batch_formatc                 C   s   t t| dS )zReturn this preprocessor serialized as a string.

        Note: This is not a stable serialization format as it uses `pickle`.
        ascii)base64	b64encodepickledumpsdecoder,   r   r   r   	serializex  s   zPreprocessor.serialize
serializedc                 C   s   t t| S )zALoad the original preprocessor serialized via `self.serialize()`.)rt   loadsrr   	b64decode)rx   r   r   r   deserialize  s   zPreprocessor.deserialize)r(   r   )r.   r   r(   r   )rA   r   r(   r   )NNN)rn   ro   r(   ro   )$r   r   r   r   strr   r   r*   r'   r-   r3   r   floatintr=   r<   rC   r
   r1   r	   rP   r?   r   r   rR   rB   classmethodr   rm   rI   r   rJ   rK   rw   staticmethodr{   r   r   r   r   r      s    


%
&

*!
$

	r   )rc   rr   rb   rt   r/   enumr   typingr   r   r   r   r   r   r\   r	   ray.util.annotationsr
   r   rZ   rg   r[   rh   ray.air.data_batch_typer   ray.datar   RuntimeErrorr   ABCr   r   r   r   r   <module>   s$     