o
    `۷iQ                     @   sX  d dl mZmZmZmZmZmZ d dlZd dl	Z
d dlZd dlmZ d dlmZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d dl m!Z!m"Z" er\d d	l#m$Z$ d
Z%e"ddedddG dd deZ&e"ddedddG dd deZ'e"ddedddG dd deZ(e"ddedddG dd deZ)dS )    )TYPE_CHECKINGAnyDictListOptionalTupleN)AbsMaxApproximateQuantileMaxMeanMinStd)BlockAccessor)PreprocessorSerializablePreprocessorBase)SerializablePreprocessor)BatchFormat)DeveloperAPI	PublicAPI)Datasetg:0yE>alpha)	stability   z$io.ray.preprocessors.standard_scaler)version
identifierc                	       s   e Zd ZdZd!dee deee  f fddZddd	efd
dZ	de
jfddZedejdeded	ejfddZdejd	ejfddZeed	efddZd	eeef fddZdeeef defddZdd  Z  ZS )"StandardScalerav	  Translate and scale each column by its mean and standard deviation,
    respectively.

    The general formula is given by

    .. math::

        x' = \frac{x - \bar{x}}{s}

    where :math:`x` is the column, :math:`x'` is the transformed column,
    :math:`\bar{x}` is the column average, and :math:`s` is the column's sample
    standard deviation. If :math:`s = 0` (i.e., the column is constant-valued),
    then the transformed column will contain zeros.

    .. warning::
        :class:`StandardScaler` works best when your data is normal. If your data isn't
        approximately normal, then the transformed features won't be meaningful.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import StandardScaler
        >>>
        >>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]})
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> ds.to_pandas()  # doctest: +SKIP
           X1  X2  X3
        0  -2  -3   1
        1   0  -3   1
        2   2   3   1

        Columns are scaled separately.

        >>> preprocessor = StandardScaler(columns=["X1", "X2"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
                 X1        X2  X3
        0 -1.224745 -0.707107   1
        1  0.000000 -0.707107   1
        2  1.224745  1.414214   1

        Constant-valued columns get filled with zeros.

        >>> preprocessor = StandardScaler(columns=["X3"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2   X3
        0  -2  -3  0.0
        1   0  -3  0.0
        2   2   3  0.0

        >>> preprocessor = StandardScaler(
        ...     columns=["X1", "X2"],
        ...     output_columns=["X1_scaled", "X2_scaled"]
        ... )
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2  X3  X1_scaled  X2_scaled
        0  -2  -3   1  -1.224745  -0.707107
        1   0  -3   1   0.000000  -0.707107
        2   2   3   1   1.224745   1.414214

    Args:
        columns: The columns to separately scale.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.
    Ncolumnsoutput_columnsc                    "   t    || _t||| _d S Nsuper__init__r   r   #_derive_and_validate_output_columnsr   selfr   r   	__class__ S/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/preprocessors/scaler.pyr"   ^   
   

zStandardScaler.__init__datasetr   returnc                 C   s,   | j jt| jd | j jdd | jd | S )N)aggregator_fnr   c                 S   s   t | ddS )Nr   )ddof)r   )colr(   r(   r)   <lambda>k   s    z%StandardScaler._fit.<locals>.<lambda>)stat_computation_planadd_aggregatorr   r   )r%   r+   r(   r(   r)   _fite   s   zStandardScaler._fitdfc                    .   dt jf fdd}| j || j< |S )Nsc                    sb    j d| j d } j d| j d }|d u s|d u r%tj| d d < | S |tk r+d}| | | S )Nmean()std(r   )stats_namenpnan_EPSILON)r6   s_means_stdr%   r(   r)   column_standard_scalerq   s   z@StandardScaler._transform_pandas.<locals>.column_standard_scalerpdSeriesr   	transformr   )r%   r4   rB   r(   rA   r)   _transform_pandasp   s   z StandardScaler._transform_pandascolumnmeanstdc              	   C   s4   |t k rd}tt| tt|tt|S )Nr   )r>   pcdividesubtractpascalarfloat)rH   rI   rJ   r(   r(   r)   _scale_column   s
    zStandardScaler._scale_columntablec           
         s    fdd| j D }t| j | j|D ]C\}}}| jd| d }| jd| d }|du s1|du rEtjt||jd}t	 
|| q| |||}	t	 
||	 q S )z/Transform using fast native PyArrow operations.c                    s   g | ]}  |qS r(   )rH   ).0	input_colrR   r(   r)   
<listcomp>   s    z3StandardScaler._transform_arrow.<locals>.<listcomp>r7   r8   r9   N)type)r   zipr   r:   rN   nullslenrW   r   	for_blockupsert_columnrQ   )
r%   rR   input_columnsrT   
output_colrH   r?   r@   
null_arrayscaled_columnr(   rU   r)   _transform_arrow   s"   


zStandardScaler._transform_arrowc                 C   s   t jS r   )r   ARROW)clsr(   r(   r)   preferred_batch_format   s   z%StandardScaler.preferred_batch_formatc                 C      | j | jt| dd dS N_fitted)r   r   rg   r   r   getattrrA   r(   r(   r)   _get_serializable_fields      
z'StandardScaler._get_serializable_fieldsfieldsr   c                 C   $   |d | _ |d | _|d| _d S Nr   r   rg   r   r   getrg   r%   rl   r   r(   r(   r)   _set_serializable_fields      

z'StandardScaler._set_serializable_fieldsc                 C      | j j d| jd| jdS N	(columns=z, output_columns=r8   r'   __name__r   r   rA   r(   r(   r)   __repr__      zStandardScaler.__repr__r   )rx   
__module____qualname____doc__r   strr   r"   r   r3   rD   	DataFramerG   staticmethodrN   ArrayrP   rQ   Tablera   classmethodr   r   rd   r   r   rj   intrr   ry   __classcell__r(   r(   r&   r)   r      s    $C 	r   z#io.ray.preprocessors.min_max_scalerc                          e Zd ZdZddee deee  f fddZddd	efd
dZ	de
jfddZd	eeef fddZdeeef defddZdd Z  ZS )MinMaxScalera  Scale each column by its range.

    The general formula is given by

    .. math::

        x' = \frac{x - \min(x)}{\max{x} - \min{x}}

    where :math:`x` is the column and :math:`x'` is the transformed column. If
    :math:`\max{x} - \min{x} = 0` (i.e., the column is constant-valued), then the
    transformed column will get filled with zeros.

    Transformed values are always in the range :math:`[0, 1]`.

    .. tip::
        This can be used as an alternative to :py:class:`StandardScaler`.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import MinMaxScaler
        >>>
        >>> df = pd.DataFrame({"X1": [-2, 0, 2], "X2": [-3, -3, 3], "X3": [1, 1, 1]})   # noqa: E501
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> ds.to_pandas()  # doctest: +SKIP
           X1  X2  X3
        0  -2  -3   1
        1   0  -3   1
        2   2   3   1

        Columns are scaled separately.

        >>> preprocessor = MinMaxScaler(columns=["X1", "X2"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
            X1   X2  X3
        0  0.0  0.0   1
        1  0.5  0.0   1
        2  1.0  1.0   1

        Constant-valued columns get filled with zeros.

        >>> preprocessor = MinMaxScaler(columns=["X3"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2   X3
        0  -2  -3  0.0
        1   0  -3  0.0
        2   2   3  0.0

        >>> preprocessor = MinMaxScaler(columns=["X1", "X2"], output_columns=["X1_scaled", "X2_scaled"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2  X3  X1_scaled  X2_scaled
        0  -2  -3   1        0.0        0.0
        1   0  -3   1        0.5        0.0
        2   2   3   1        1.0        1.0

    Args:
        columns: The columns to separately scale.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.
    Nr   r   c                    r   r   r    r$   r&   r(   r)   r"     r*   zMinMaxScaler.__init__r+   r   r,   c                    s&    fddt tfD }|j|  _ S )Nc                    s    g | ]} j D ]}||qqS r(   )r   )rS   Aggr/   rA   r(   r)   rV     s     z%MinMaxScaler._fit.<locals>.<listcomp>)r   r
   	aggregater:   r%   r+   
aggregatesr(   rA   r)   r3   
  s   zMinMaxScaler._fitr4   c                    r5   )Nr6   c                    sH    j d| j d } j d| j d }|| }|tk rd}| | | S )Nzmin(r8   zmax(r   )r:   r;   r>   )r6   s_mins_maxdiffrA   r(   r)   column_min_max_scaler  s   z=MinMaxScaler._transform_pandas.<locals>.column_min_max_scalerrC   )r%   r4   r   r(   rA   r)   rG        zMinMaxScaler._transform_pandasc                 C   re   rf   rh   rA   r(   r(   r)   rj      rk   z%MinMaxScaler._get_serializable_fieldsrl   r   c                 C   rm   rn   ro   rq   r(   r(   r)   rr   '  rs   z%MinMaxScaler._set_serializable_fieldsc                 C   rt   ru   rw   rA   r(   r(   r)   ry   .  rz   zMinMaxScaler.__repr__r   rx   r{   r|   r}   r   r~   r   r"   r   r3   rD   r   rG   r   r   rj   r   rr   ry   r   r(   r(   r&   r)   r      s    $?r   z#io.ray.preprocessors.max_abs_scalerc                       r   )MaxAbsScalera@  Scale each column by its absolute max value.

    The general formula is given by

    .. math::

        x' = \frac{x}{\max{\vert x \vert}}

    where :math:`x` is the column and :math:`x'` is the transformed column. If
    :math:`\max{\vert x \vert} = 0` (i.e., the column contains all zeros), then the
    column is unmodified.

    .. tip::
        This is the recommended way to scale sparse data. If you data isn't sparse,
        you can use :class:`MinMaxScaler` or :class:`StandardScaler` instead.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import MaxAbsScaler
        >>>
        >>> df = pd.DataFrame({"X1": [-6, 3], "X2": [2, -4], "X3": [0, 0]})   # noqa: E501
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> ds.to_pandas()  # doctest: +SKIP
           X1  X2  X3
        0  -6   2   0
        1   3  -4   0

        Columns are scaled separately.

        >>> preprocessor = MaxAbsScaler(columns=["X1", "X2"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
            X1   X2  X3
        0 -1.0  0.5   0
        1  0.5 -1.0   0

        Zero-valued columns aren't scaled.

        >>> preprocessor = MaxAbsScaler(columns=["X3"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2   X3
        0  -6   2  0.0
        1   3  -4  0.0

        >>> preprocessor = MaxAbsScaler(columns=["X1", "X2"], output_columns=["X1_scaled", "X2_scaled"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2  X3  X1_scaled  X2_scaled
        0  -2  -3   1       -1.0       -1.0
        1   0  -3   1        0.0       -1.0
        2   2   3   1        1.0        1.0

    Args:
        columns: The columns to separately scale.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.
    Nr   r   c                    r   r   r    r$   r&   r(   r)   r"   p  r*   zMaxAbsScaler.__init__r+   r   r,   c                 C   s    dd | j D }|j| | _| S )Nc                 S   s   g | ]}t |qS r(   )r   rS   r/   r(   r(   r)   rV   x  s    z%MaxAbsScaler._fit.<locals>.<listcomp>)r   r   r:   r   r(   r(   r)   r3   w  s   zMaxAbsScaler._fitr4   c                    r5   )Nr6   c                    s(    j d| j d }|dkrd}| | S )Nzabs_max(r8   r   r   )r:   r;   )r6   	s_abs_maxrA   r(   r)   column_abs_max_scaler}  s   z=MaxAbsScaler._transform_pandas.<locals>.column_abs_max_scalerrC   )r%   r4   r   r(   rA   r)   rG   |  s   
zMaxAbsScaler._transform_pandasc                 C   re   rf   rh   rA   r(   r(   r)   rj     rk   z%MaxAbsScaler._get_serializable_fieldsrl   r   c                 C   rm   rn   ro   rq   r(   r(   r)   rr     rs   z%MaxAbsScaler._set_serializable_fieldsc                 C   rt   ru   rw   rA   r(   r(   r)   ry     rz   zMaxAbsScaler.__repr__r   r   r(   r(   r&   r)   r   2  s    $;r   z"io.ray.preprocessors.robust_scalerc                	       s   e Zd ZdZdZddefdee deeef de	ee  de
f fd	d
ZdddefddZdejfddZdeeef fddZdeeef de
fddZdd Z  ZS )RobustScaleraI  Scale and translate each column using approximate quantiles.

    The general formula is given by

    .. math::
        x' = \frac{x - \mu_{1/2}}{\mu_h - \mu_l}

    where :math:`x` is the column, :math:`x'` is the transformed column,
    :math:`\mu_{1/2}` is the column median. :math:`\mu_{h}` and :math:`\mu_{l}` are the
    high and low quantiles, respectively. By default, :math:`\mu_{h}` is the third
    quartile and :math:`\mu_{l}` is the first quartile.

    Internally, the `ApproximateQuantile` aggregator is used to calculate the
    approximate quantiles.

    .. tip::
        This scaler works well when your data contains many outliers.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import RobustScaler
        >>>
        >>> df = pd.DataFrame({
        ...     "X1": [1, 2, 3, 4, 5],
        ...     "X2": [13, 5, 14, 2, 8],
        ...     "X3": [1, 2, 2, 2, 3],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> ds.to_pandas()  # doctest: +SKIP
           X1  X2  X3
        0   1  13   1
        1   2   5   2
        2   3  14   2
        3   4   2   2
        4   5   8   3

        :class:`RobustScaler` separately scales each column.

        >>> preprocessor = RobustScaler(columns=["X1", "X2"])
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
            X1     X2  X3
        0 -1.0  0.625   1
        1 -0.5 -0.375   2
        2  0.0  0.750   2
        3  0.5 -0.750   2
        4  1.0  0.000   3

        >>> preprocessor = RobustScaler(
        ...    columns=["X1", "X2"],
        ...    output_columns=["X1_scaled", "X2_scaled"]
        ... )
        >>> preprocessor.fit_transform(ds).to_pandas()  # doctest: +SKIP
           X1  X2  X3  X1_scaled  X2_scaled
        0   1  13   1       -1.0      0.625
        1   2   5   2       -0.5     -0.375
        2   3  14   2        0.0      0.750
        3   4   2   2        0.5     -0.750
        4   5   8   3        1.0      0.000

    Args:
        columns: The columns to separately scale.
        quantile_range: A tuple that defines the lower and upper quantiles. Values
            must be between 0 and 1. Defaults to the 1st and 3rd quartiles:
            ``(0.25, 0.75)``.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.
        quantile_precision: Controls the accuracy and memory footprint of the sketch (K in KLL);
            higher values yield lower error but use more memory. Defaults to 800. See
            https://datasketches.apache.org/docs/KLL/KLLAccuracyAndSize.html
            for details on accuracy and size.
    i   )g      ?g      ?Nr   quantile_ranger   quantile_precisionc                    s.   t    || _|| _|| _t||| _d S r   )r!   r"   r   r   r   r   r#   r   )r%   r   r   r   r   r&   r(   r)   r"     s   

zRobustScaler.__init__r+   r   r,   c                    s   j d dj d g  fddjD }|j| }i _jD ](}|d| d \}}}|jd| d< |jd	| d< |jd
| d< q!S )Nr   g      ?r   c                    s   g | ]
}t | jd qS ))on	quantilesr   )r	   r   r   r   r%   r(   r)   rV     s    z%RobustScaler._fit.<locals>.<listcomp>zapprox_quantile(r8   low_quantile(median(high_quantile()r   r   r   r:   )r%   r+   r   
aggregatedr/   low_qmed_qhigh_qr(   r   r)   r3     s   

zRobustScaler._fitr4   c                    r5   )Nr6   c                    sb    j d| j d } j d| j d } j d| j d }|| }|dkr+t| S | | | S )Nr   r8   r   r   r   )r:   r;   r<   
zeros_like)r6   s_low_qs_medians_high_qr   rA   r(   r)   column_robust_scaler  s   
z<RobustScaler._transform_pandas.<locals>.column_robust_scalerrC   )r%   r4   r   r(   rA   r)   rG     r   zRobustScaler._transform_pandasc                 C   s    | j | j| j| jt| dd dS )Nrg   )r   r   r   r   rg   )r   r   r   r   ri   rA   r(   r(   r)   rj   &  s   
z%RobustScaler._get_serializable_fieldsrl   r   c                 C   s8   |d | _ |d | _|d | _|d | _|d| _d S )Nr   r   r   r   rg   )r   r   r   r   rp   rg   rq   r(   r(   r)   rr   /  s
   



z%RobustScaler._set_serializable_fieldsc                 C   s&   | j j d| jd| jd| jdS )Nrv   z, quantile_range=z), output_columns=r8   )r'   rx   r   r   r   rA   r(   r(   r)   ry   8  s   zRobustScaler.__repr__)rx   r{   r|   r}   DEFAULT_QUANTILE_PRECISIONr   r~   r   rP   r   r   r"   r   r3   rD   r   rG   r   r   rj   rr   ry   r   r(   r(   r&   r)   r     s(    K

		r   )*typingr   r   r   r   r   r   numpyr<   pandasrD   pyarrowrN   pyarrow.computecomputerK   ray.data.aggregater   r	   r
   r   r   r   ray.data.blockr   ray.data.preprocessorr   r   &ray.data.preprocessors.version_supportr   #ray.data.util.data_batch_conversionr   ray.util.annotationsr   r   ray.data.datasetr   r>   r   r   r   r   r(   r(   r(   r)   <module>   s6      
 (
o
h
