o
    `۷i=                     @   sv  d dl Z d dlmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZmZmZmZmZ d dlZd dlZd dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZm Z m!Z! d dl"m#Z# d d	l$m%Z% d d
l&m'Z' d dl(m)Z)m*Z* ervd dl+m,Z, e -e.Z/de	e0ef de0deej1ej1f fddZ2e*dde%dddG dd de!Z3e*dde%dddG dd de!Z4e*dde%dddG dd de!Z5e*dde%dddG d d! d!e!Z6e*dde%dd"dG d#d$ d$e!Z7d%dd&d'd(d)ee0 d*ed+e8d,ee	e0e9f  f
d-d.Z:	d=d0e8d1e'defd2d3Z;d4ej<d)e0ddfd5d6Z=d7ej>d)e0ddfd8d9Z?d:ej@de8fd;d<ZAdS )>    N)Counter)partial)
TYPE_CHECKINGAnyCallableDictHashableListOptionalSetTupleUnionis_null)BlockAccessor)PreprocessorPreprocessorNotFittedExceptionSerializablePreprocessorBase)make_post_processor)SerializablePreprocessor)BatchFormat)DeveloperAPI	PublicAPI)Datasetstats	input_colreturnc                    sh   | d| d  t  tr,t  }t|}tj fdd|D t d}||fS  \}}||fS )a  Get Arrow arrays for keys and values from encoder stats.

    Args:
        stats: The encoder's stats_ dictionary.
        input_col: The name of the column to get arrays for.

    Returns:
        Tuple of (keys_array, values_array) for the column's ordinal mapping.
    unique_values()c                    s   g | ]} | qS  r   ).0k
stat_valuer   T/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/preprocessors/encoder.py
<listcomp>=   s    z2_get_unique_value_arrow_arrays.<locals>.<listcomp>type)
isinstancedictsortedkeyspaarrayint64)r   r   sorted_keys
keys_arrayvalues_arrayr   r"   r$   _get_unique_value_arrow_arrays,   s   

 r2   alpha)	stability   z$io.ray.preprocessors.ordinal_encoder)version
identifierc                	       s,  e Zd ZdZddddee dedeee  f fdd	Zd
dde	fddZ
dedeeef fddZdedeejejf fddZdedefddZdejfddZdejdejfddZdejdedejfdd Zeedefd!d"Zdeeef fd#d$Z d%eeef d&efd'd(Z!d)d* Z"  Z#S )+OrdinalEncodera  Encode values within columns as ordered integer values.

    :class:`OrdinalEncoder` encodes categorical features as integers that range from
    :math:`0` to :math:`n - 1`, where :math:`n` is the number of categories.

    If you transform a value that isn't in the fitted datset, then the value is encoded
    as ``float("nan")``.

    Columns must contain either hashable values or lists of hashable values. Also, you
    can't have both scalars and lists in the same column.

    Examples:
        Use :class:`OrdinalEncoder` to encode categorical features as integers.

        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import OrdinalEncoder
        >>> df = pd.DataFrame({
        ...     "sex": ["male", "female", "male", "female"],
        ...     "level": ["L4", "L5", "L3", "L4"],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder = OrdinalEncoder(columns=["sex", "level"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
           sex  level
        0    1      1
        1    0      2
        2    1      0
        3    0      1

        :class:`OrdinalEncoder` can also be used in append mode by providing the
        name of the output_columns that should hold the encoded values.

        >>> encoder = OrdinalEncoder(columns=["sex", "level"], output_columns=["sex_encoded", "level_encoded"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
              sex level  sex_encoded  level_encoded
        0    male    L4            1              1
        1  female    L5            0              2
        2    male    L3            1              0
        3  female    L4            0              1


        If you transform a value not present in the original dataset, then the value
        is encoded as ``float("nan")``.

        >>> df = pd.DataFrame({"sex": ["female"], "level": ["L6"]})
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder.transform(ds).to_pandas()  # doctest: +SKIP
           sex  level
        0    0    NaN

        :class:`OrdinalEncoder` can also encode categories in a list.

        >>> df = pd.DataFrame({
        ...     "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
        ...     "genre": [
        ...         ["comedy", "action", "sports"],
        ...         ["animation", "comedy",  "action"],
        ...         ["documentary"],
        ...     ],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder = OrdinalEncoder(columns=["genre"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name      genre
        0                 Shaolin Soccer  [2, 0, 4]
        1                          Moana  [1, 2, 0]
        2  The Smartest Guys in the Room        [3]

    Args:
        columns: The columns to separately encode.
        encode_lists: If ``True``, encode list elements.  If ``False``, encode
            whole lists (i.e., replace each list with an integer). ``True``
            by default.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.

    .. seealso::

        :class:`OneHotEncoder`
            Another preprocessor that encodes categorical data.
    TN)encode_listsoutput_columnscolumnsr9   r:   c                   s(   t    || _|| _t||| _d S N)super__init__r;   r9   r   #_derive_and_validate_output_columnsr:   )selfr;   r9   r:   	__class__r   r$   r>      s   

zOrdinalEncoder.__init__datasetr   r   c                    0   j j fddt dd dd jd S )Nc                    s   t  jj| dS )N)rC   r;   r9   key_gen)compute_unique_value_indicesr;   r9   rE   rC   r@   r   r$   <lambda>   s    z%OrdinalEncoder._fit.<locals>.<lambda>c                 S      d|  dS Nzunique(r   r   colr   r   r$   rI          c                 S   rJ   Nr   r   r   rL   r   r   r$   rI      rN   stat_fnpost_process_fnstat_key_fnpost_key_fnr;   stat_computation_planadd_callable_statunique_post_fnr;   r@   rC   r   rH   r$   _fit   s   zOrdinalEncoder._fitcolumn_namec                 C   s<   | j d| d }t|tr|S |\}}dd t||D S )a  Get the ordinal mapping for a column as a dict.

        Stats can be stored in either:
        - Dict format: {value: index} (from pandas-style processing)
        - Arrow format: (keys_array, values_array) tuple

        This method returns a dict in either case.
        r   r   c                 S   s   i | ]\}}|  |  qS r   )as_pyr    r!   vr   r   r$   
<dictcomp>   s    z3OrdinalEncoder._get_ordinal_map.<locals>.<dictcomp>)stats_r(   r)   zip)r@   r[   r#   r0   r1   r   r   r$   _get_ordinal_map   s
   	
zOrdinalEncoder._get_ordinal_mapr   c                 C      t | j|S z%Get Arrow arrays for keys and values.r2   r`   r@   r   r   r   r$   _get_arrow_arrays      z OrdinalEncoder._get_arrow_arrayselementc                   s0   |  | | jr fdd|D S  t|S )Nc                    s   g | ]}  |qS r   getr    xordinal_mapr   r$   r%      s    z7OrdinalEncoder._encode_list_element.<locals>.<listcomp>)rb   r9   rk   tuple)r@   ri   r[   r   rn   r$   _encode_list_element   s   
z#OrdinalEncoder._encode_list_elementdfc                    s@   t |g jR   dtjf fdd}| j || j< |S )Nsc                    s2   t  r  fddS  j} |S )Nc                    s   j |  jdS )N)r[   )rq   name)elem)rs   r@   r   r$   rI      s    zROrdinalEncoder._transform_pandas.<locals>.column_ordinal_encoder.<locals>.<lambda>)_is_series_composed_of_listsmaprb   rt   rs   s_valuesr@   )rs   r$   column_ordinal_encoder   s   
z@OrdinalEncoder._transform_pandas.<locals>.column_ordinal_encoder)_validate_dfr;   pdSeriesapplyr:   )r@   rr   r{   r   rz   r$   _transform_pandas   s   	z OrdinalEncoder._transform_pandastablec           
      C      t |g| jR   | jD ](}|j|j}tj|s!tj|r4|	 }| 
|}tjj|dd  S qt| j| jD ]\}}||}| ||}	t|||	}q<|S a  Transform using fast native PyArrow operations for scalar columns.

        List-type columns are preferably handled by _transform_pandas, which is selected
        via _determine_transform_to_use when a PyArrow schema is available. However,
        for pandas-backed datasets (PandasBlockSchema), we can't detect list columns
        until runtime, so we fall back to pandas here if list columns are found.
        F)preserve_index)_validate_arrowr;   schemafieldr'   r,   typesis_listis_large_list	to_pandasr   Tablefrom_pandasra   r:   column_encode_column_vectorizedr   	for_blockupsert_column
r@   r   col_namecol_typerr   	result_dfr   
output_colr   encoded_columnr   r   r$   _transform_arrow      	



zOrdinalEncoder._transform_arrowr   c                 C   s@   |  |\}}|j|jkrt||j}t||}t||S )a  Encode column using PyArrow's vectorized pc.index_in.

        Unseen categories are encoded as null in the output, which becomes NaN
        when converted to pandas. Null values should be validated before calling
        this method via _validate_arrow.
        )rg   r'   pccastindex_intake)r@   r   r   r0   r1   indicesr   r   r$   r     s
   	z(OrdinalEncoder._encode_column_vectorizedc                 C      t jS r<   r   ARROWclsr   r   r$   preferred_batch_format     z%OrdinalEncoder.preferred_batch_formatc                 C      | j | j| jt| dd dS )N_fitted)r;   r:   r9   r   )r;   r:   r9   getattrrz   r   r   r$   _get_serializable_fields  
   
z'OrdinalEncoder._get_serializable_fieldsfieldsr6   c                 C   .   |d | _ |d | _|d | _|d| _d S )Nr;   r:   r9   r   )r;   r:   r9   rk   r   r@   r   r6   r   r   r$   _set_serializable_fields%     


z'OrdinalEncoder._set_serializable_fieldsc                 C   &   | j j d| jd| jd| jdS )N	(columns=z, encode_lists=, output_columns=r   )rB   __name__r;   r9   r:   rz   r   r   r$   __repr__-     zOrdinalEncoder.__repr__)$r   
__module____qualname____doc__r	   strboolr
   r>   r   rZ   r   r   intrb   r   r,   Arrayrg   listrq   r}   	DataFramer   r   r   ChunkedArrayr   classmethodr   r   r   r   r   r   __classcell__r   r   rA   r$   r8   D   s<    Y
	
r8   z$io.ray.preprocessors.one_hot_encoderc                	       s&  e Zd ZdZddddee deeeef  deee  f fddZ	d	d
de
fddZeedefddZdedeeef fddZdejfddZdejdejfddZdedeejejf fddZdejdedejfddZdeeef fd d!Zd"eeef d#efd$d%Zd&d' Z   Z!S )(OneHotEncodera-  `One-hot encode <https://en.wikipedia.org/wiki/One-hot#Machine_learning_and_statistics>`_
    categorical data.

    This preprocessor transforms each specified column into a one-hot encoded vector.
    Each element in the vector corresponds to a unique category in the column, with a
    value of 1 if the category matches and 0 otherwise.

    If a category is infrequent (based on ``max_categories``) or not present in the
    fitted dataset, it is encoded as all 0s.

    Columns must contain hashable objects or lists of hashable objects.

    .. note::
        Lists are treated as categories. If you want to encode individual list
        elements, use :class:`MultiHotEncoder`.

    Example:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import OneHotEncoder
        >>>
        >>> df = pd.DataFrame({"color": ["red", "green", "red", "red", "blue", "green"]})
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder = OneHotEncoder(columns=["color"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
               color
        0  [0, 0, 1]
        1  [0, 1, 0]
        2  [0, 0, 1]
        3  [0, 0, 1]
        4  [1, 0, 0]
        5  [0, 1, 0]

        OneHotEncoder can also be used in append mode by providing the
        name of the output_columns that should hold the encoded values.

        >>> encoder = OneHotEncoder(columns=["color"], output_columns=["color_encoded"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
           color color_encoded
        0    red     [0, 0, 1]
        1  green     [0, 1, 0]
        2    red     [0, 0, 1]
        3    red     [0, 0, 1]
        4   blue     [1, 0, 0]
        5  green     [0, 1, 0]

        If you one-hot encode a value that isn't in the fitted dataset, then the
        value is encoded with zeros.

        >>> df = pd.DataFrame({"color": ["yellow"]})
        >>> batch = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder.transform(batch).to_pandas()  # doctest: +SKIP
            color color_encoded
        0  yellow     [0, 0, 0]

        Likewise, if you one-hot encode an infrequent value, then the value is encoded
        with zeros.

        >>> encoder = OneHotEncoder(columns=["color"], max_categories={"color": 2})
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
            color
        0  [1, 0]
        1  [0, 1]
        2  [1, 0]
        3  [1, 0]
        4  [0, 0]
        5  [0, 1]

    Args:
        columns: The columns to separately encode.
        max_categories: The maximum number of features to create for each column.
            If a value isn't specified for a column, then a feature is created
            for every category in that column.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.

    .. seealso::

        :class:`MultiHotEncoder`
            If you want to encode individual list elements, use
            :class:`MultiHotEncoder`.

        :class:`OrdinalEncoder`
            If your categories are ordered, you may want to use
            :class:`OrdinalEncoder`.
    Nmax_categoriesr:   r;   r   r:   c                   ,   t    || _|pi | _t||| _d S r<   r=   r>   r;   r   r   r?   r:   r@   r;   r   r:   rA   r   r$   r>        


zOneHotEncoder.__init__rC   r   r   c                    rD   )Nc                       t  jd| jdS )NFrC   r;   r9   rE   r   rF   r;   r   rG   rH   r   r$   rI         z$OneHotEncoder._fit.<locals>.<lambda>c                 S   rJ   rK   r   rL   r   r   r$   rI     rN   c                 S   rJ   rO   r   rL   r   r   r$   rI     rN   rP   rU   rY   r   rH   r$   rZ        zOneHotEncoder._fitc                 C   r   r<   r   r   r   r   r$   r     r   z$OneHotEncoder.preferred_batch_formatr^   r   c                 C   s2   t |ttjfrt|}t |tr||dS dS )N)r(   r   npndarrayrp   r   rk   )r@   r^   r   r   r   r$   safe_get  s
   
zOneHotEncoder.safe_getrr   c           	         s   t |g jR   t j jD ]D\}} jd| d t}tjt||ftjd}|| 	 fdd
 }|dk}t|d }d|||| f< | ||< q|S )	Nr   r   dtypec                    s     | S r<   )r   )r^   r@   r   r   r$   rI     rN   z1OneHotEncoder._transform_pandas.<locals>.<lambda>r   r   r5   )r|   r;   ra   r:   r`   lenr   zerosuint8r   to_numpynonzerotolist)	r@   rr   r   output_columnnum_categoriesone_hotcodesvalid_category_masknon_zero_indicesr   r   r$   r     s    zOneHotEncoder._transform_pandasr   c           
      C   r   r   )r   r;   r   r   r'   r,   r   r   r   r   r   r   r   ra   r:   r   _encode_column_one_hotr   r   r   r   r   r   r$   r     r   zOneHotEncoder._transform_arrowr   c                 C   rc   rd   re   rf   r   r   r$   rg     rh   zOneHotEncoder._get_arrow_arraysr   c                 C   s   |  |\}}t|}|j|jkrt||j}t||}t|d}t|}| }	tj	||ftj
d}
|	dk}t|d }t|dkrOd|
||	| f< tj|
 |S )zEncode a column to one-hot vectors using Arrow arrays.

        Unseen categories are encoded as all-zeros vectors, matching the pandas
        behavior. Null values should be validated before calling this method
        via _validate_arrow.
        r   r   r   r5   )rg   r   r'   r   r   r   	fill_nullr   r   r   r   r   r,   FixedSizeListArrayfrom_arraysravel)r@   r   r   r0   _r   r   indices_fillednum_rows
indices_npone_hot_matrix
valid_maskvalid_indicesr   r   r$   r     s   	z$OneHotEncoder._encode_column_one_hotc                 C   r   Nr   )r;   r:   r   r   r;   r:   r   r   rz   r   r   r$   r     r   z&OneHotEncoder._get_serializable_fieldsr   r6   c                 C   r   Nr;   r:   r   r   r;   r:   r   rk   r   r   r   r   r$   r   $  r   z&OneHotEncoder._set_serializable_fieldsc                 C   r   Nr   z, max_categories=r   r   rB   r   r;   r   r:   rz   r   r   r$   r   ,  r   zOneHotEncoder.__repr__)"r   r   r   r   r	   r   r
   r   r   r>   r   rZ   r   r   r   r   r   r   r}   r   r   r,   r   r   r   r   rg   r   r   r   r   r   r   r   r   r   rA   r$   r   5  s:    ]

&r   z&io.ray.preprocessors.multi_hot_encoderc                	       s   e Zd ZdZddddee deeeef  deee  f fddZ	d	d
de
fddZdejfddZdeeef fddZdeeef defddZdd Z  ZS )MultiHotEncodera  Multi-hot encode categorical data.

    This preprocessor replaces each list of categories with an :math:`m`-length binary
    list, where :math:`m` is the number of unique categories in the column or the value
    specified in ``max_categories``. The :math:`i\\text{-th}` element of the binary list
    is :math:`1` if category :math:`i` is in the input list and :math:`0` otherwise.

    Columns must contain hashable objects or lists of hashable objects.
    Also, you can't have both types in the same column.

    .. note::
        The logic is similar to scikit-learn's [MultiLabelBinarizer][1]

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import MultiHotEncoder
        >>>
        >>> df = pd.DataFrame({
        ...     "name": ["Shaolin Soccer", "Moana", "The Smartest Guys in the Room"],
        ...     "genre": [
        ...         ["comedy", "action", "sports"],
        ...         ["animation", "comedy",  "action"],
        ...         ["documentary"],
        ...     ],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>>
        >>> encoder = MultiHotEncoder(columns=["genre"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name            genre
        0                 Shaolin Soccer  [1, 0, 1, 0, 1]
        1                          Moana  [1, 1, 1, 0, 0]
        2  The Smartest Guys in the Room  [0, 0, 0, 1, 0]

        :class:`MultiHotEncoder` can also be used in append mode by providing the
        name of the output_columns that should hold the encoded values.

        >>> encoder = MultiHotEncoder(columns=["genre"], output_columns=["genre_encoded"])
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name                        genre    genre_encoded
        0                 Shaolin Soccer     [comedy, action, sports]  [1, 0, 1, 0, 1]
        1                          Moana  [animation, comedy, action]  [1, 1, 1, 0, 0]
        2  The Smartest Guys in the Room                [documentary]  [0, 0, 0, 1, 0]

        If you specify ``max_categories``, then :class:`MultiHotEncoder`
        creates features for only the most frequent categories.

        >>> encoder = MultiHotEncoder(columns=["genre"], max_categories={"genre": 3})
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
                                    name      genre
        0                 Shaolin Soccer  [1, 1, 1]
        1                          Moana  [1, 1, 0]
        2  The Smartest Guys in the Room  [0, 0, 0]
        >>> encoder.stats_  # doctest: +SKIP
        OrderedDict([('unique_values(genre)', {'comedy': 0, 'action': 1, 'sports': 2})])

    Args:
        columns: The columns to separately encode.
        max_categories: The maximum number of features to create for each column.
            If a value isn't specified for a column, then a feature is created
            for every unique category in that column.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.

    .. seealso::

        :class:`OneHotEncoder`
            If you're encoding individual categories instead of lists of
            categories, use :class:`OneHotEncoder`.

        :class:`OrdinalEncoder`
            If your categories are ordered, you may want to use
            :class:`OrdinalEncoder`.

    [1]: https://scikit-learn.org/stable/modules/generated/sklearn.preprocessing.MultiLabelBinarizer.html
    Nr   r;   r   r:   c                   r   r<   r   r   rA   r   r$   r>     r   zMultiHotEncoder.__init__rC   r   r   c                    rD   )Nc                    r   )NTr   r   rG   rH   r   r$   rI     r   z&MultiHotEncoder._fit.<locals>.<lambda>c                 S   rJ   rK   r   rL   r   r   r$   rI     rN   c                 S   rJ   rO   r   rL   r   r   r$   rI     rN   rP   rU   rY   r   rH   r$   rZ     r   zMultiHotEncoder._fitrr   c                    s^   t |g jR   dtdtf fdd}t j jD ]\}}|| t||d||< q|S )Nri   rt   c                   sR   t | tjr|  } nt | ts| g} jd| d }t|   fdd|D S )Nr   r   c                    s   g | ]}  |d qS )r   rj   rl   counterr   r$   r%         zJMultiHotEncoder._transform_pandas.<locals>.encode_list.<locals>.<listcomp>)r(   r   r   r   r   r`   r   )ri   rt   r   rz   r   r$   encode_list  s   

z6MultiHotEncoder._transform_pandas.<locals>.encode_list)rt   )r|   r;   r   r   ra   r:   rw   r   )r@   rr   r   r   r   r   rz   r$   r     s
   	z!MultiHotEncoder._transform_pandasc                 C   r   r   r   rz   r   r   r$   r     r   z(MultiHotEncoder._get_serializable_fieldsr   r6   c                 C   r   r   r   r   r   r   r$   r     r   z(MultiHotEncoder._set_serializable_fieldsc                 C   s&   | j j d| jd| jd| j dS r   r   rz   r   r   r$   r     r   zMultiHotEncoder.__repr__)r   r   r   r   r	   r   r
   r   r   r>   r   rZ   r}   r   r   r   r   r   r   r   r   r   rA   r$   r   4  s     T
r   z"io.ray.preprocessors.label_encoderc                       s   e Zd ZdZdddedee f fddZdd	d
efddZde	j
fddZd!ddZde	j
fddZd
ee fddZd
ee fddZd
eeef fddZdeeef defddZdd  Z  ZS )"LabelEncodera
  Encode labels as integer targets.

    :class:`LabelEncoder` encodes labels as integer targets that range from
    :math:`0` to :math:`n - 1`, where :math:`n` is the number of unique labels.

    If you transform a label that isn't in the fitted datset, then the label is encoded
    as ``float("nan")``.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> df = pd.DataFrame({
        ...     "sepal_width": [5.1, 7, 4.9, 6.2],
        ...     "sepal_height": [3.5, 3.2, 3, 3.4],
        ...     "species": ["setosa", "versicolor", "setosa", "virginica"]
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>>
        >>> from ray.data.preprocessors import LabelEncoder
        >>> encoder = LabelEncoder(label_column="species")
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
           sepal_width  sepal_height  species
        0          5.1           3.5        0
        1          7.0           3.2        1
        2          4.9           3.0        0
        3          6.2           3.4        2

        You can also provide the name of the output column that should hold the encoded
        labels if you want to use :class:`LabelEncoder` in append mode.

        >>> encoder = LabelEncoder(label_column="species", output_column="species_encoded")
        >>> encoder.fit_transform(ds).to_pandas()  # doctest: +SKIP
           sepal_width  sepal_height     species  species_encoded
        0          5.1           3.5      setosa                0
        1          7.0           3.2  versicolor                1
        2          4.9           3.0      setosa                0
        3          6.2           3.4   virginica                2

        If you transform a label not present in the original dataset, then the new
        label is encoded as ``float("nan")``.

        >>> df = pd.DataFrame({
        ...     "sepal_width": [4.2],
        ...     "sepal_height": [2.7],
        ...     "species": ["bracteata"]
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> encoder.transform(ds).to_pandas()  # doctest: +SKIP
           sepal_width  sepal_height  species
        0          4.2           2.7      NaN

    Args:
        label_column: A column containing labels that you want to encode.
        output_column: The name of the column that will contain the encoded
            labels. If None, the output column will have the same name as the
            input column.

    .. seealso::

        :class:`OrdinalEncoder`
            If you're encoding ordered features, use :class:`OrdinalEncoder` instead of
            :class:`LabelEncoder`.
    Nr   label_columnr   c                   s   t    || _|p|| _d S r<   )r=   r>   r   r   )r@   r   r   rA   r   r$   r>     s   
zLabelEncoder.__init__rC   r   r   c                    s2   j j fddt dd dd jgd S )Nc                    s   t  jg| dS N)rC   r;   rE   )rF   r   rG   rH   r   r$   rI     s
    z#LabelEncoder._fit.<locals>.<lambda>c                 S   rJ   rK   r   rL   r   r   r$   rI   !  rN   c                 S   rJ   rO   r   rL   r   r   r$   rI   "  rN   rP   )rV   rW   rX   r   rY   r   rH   r$   rZ     s   zLabelEncoder._fitrr   c                    s:   t | j dtjf fdd}| j || j< |S )Nrs   c                    s    j d| j d }| |S rO   )r`   rt   rw   rx   rz   r   r$   column_label_encoder*  s   
z<LabelEncoder._transform_pandas.<locals>.column_label_encoder)r|   r   r}   r~   	transformr   )r@   rr   r   r   rz   r$   r   '  s   zLabelEncoder._transform_pandasdsc                 C   sF   |   }|tjjtjjfv rtd|  }|j| jfdt	j
i|S )a/  Inverse transform the given dataset.

        Args:
            ds: Input Dataset that has been fitted and/or transformed.

        Returns:
            ray.data.Dataset: The inverse transformed Dataset.

        Raises:
            PreprocessorNotFittedException: if ``fit`` is not called yet.
        z1`fit` must be called before `inverse_transform`, batch_format)
fit_statusr   	FitStatusPARTIALLY_FITTED
NOT_FITTEDr   _get_transform_configmap_batches_inverse_transform_pandasr   PANDAS)r@   r   r   kwargsr   r   r$   inverse_transform1  s    zLabelEncoder.inverse_transformc                    s.   dt jf fdd}| j || j< |S )Nrs   c                    s,   dd  j d j d  D }| |S )Nc                 S   s   i | ]\}}||qS r   r   )r    keyvaluer   r   r$   r_   P  s    zXLabelEncoder._inverse_transform_pandas.<locals>.column_label_decoder.<locals>.<dictcomp>r   r   )r`   r   itemsrw   )rs   inverse_valuesrz   r   r$   column_label_decoderO  s   
zDLabelEncoder._inverse_transform_pandas.<locals>.column_label_decoder)r}   r~   r   r   r   )r@   rr   r  r   rz   r$   r  N  s   	z&LabelEncoder._inverse_transform_pandasc                 C      | j gS r<   )r   rz   r   r   r$   get_input_columns[     zLabelEncoder.get_input_columnsc                 C   r  r<   r   rz   r   r   r$   get_output_columns^  r  zLabelEncoder.get_output_columnsc                 C   s   | j | jt| dd dS )Nr   )r   r   r   )r   r   r   rz   r   r   r$   r   a  s   
z%LabelEncoder._get_serializable_fieldsr   r6   c                 C   s$   |d | _ |d | _|d| _d S )Nr   r   r   )r   r   rk   r   r   r   r   r$   r   h  s   

z%LabelEncoder._set_serializable_fieldsc                 C   s   | j j d| jd| jdS )Nz(label_column=z, output_column=r   )rB   r   r   r   rz   r   r   r$   r   o  s   zLabelEncoder.__repr__)r   r   r   r   )r   r   r   r   r   r
   r>   r   rZ   r}   r   r   r	  r  r	   r  r  r   r   r   r   r   r   r   r   r   rA   r$   r     s     @

r   z io.ray.preprocessors.categorizerc                	       s   e Zd ZdZ		ddee deeeej	f  deee  f fddZ
dd	d
efddZdejfddZd
eeef fddZdeeef defddZdd Z  ZS )Categorizera^
  Convert columns to ``pd.CategoricalDtype``.

    Use this preprocessor with frameworks that have built-in support for
    ``pd.CategoricalDtype`` like LightGBM.

    .. warning::

        If you don't specify ``dtypes``, fit this preprocessor before splitting
        your dataset into train and test splits. This ensures categories are
        consistent across splits.

    Examples:
        >>> import pandas as pd
        >>> import ray
        >>> from ray.data.preprocessors import Categorizer
        >>>
        >>> df = pd.DataFrame(
        ... {
        ...     "sex": ["male", "female", "male", "female"],
        ...     "level": ["L4", "L5", "L3", "L4"],
        ... })
        >>> ds = ray.data.from_pandas(df)  # doctest: +SKIP
        >>> categorizer = Categorizer(columns=["sex", "level"])
        >>> categorizer.fit_transform(ds).schema().types  # doctest: +SKIP
        [CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5'], ordered=False)]

        :class:`Categorizer` can also be used in append mode by providing the
        name of the output_columns that should hold the categorized values.

        >>> categorizer = Categorizer(columns=["sex", "level"], output_columns=["sex_cat", "level_cat"])
        >>> categorizer.fit_transform(ds).to_pandas()  # doctest: +SKIP
              sex level sex_cat level_cat
        0    male    L4    male        L4
        1  female    L5  female        L5
        2    male    L3    male        L3
        3  female    L4  female        L4

        If you know the categories in advance, you can specify the categories with the
        ``dtypes`` parameter.

        >>> categorizer = Categorizer(
        ...     columns=["sex", "level"],
        ...     dtypes={"level": pd.CategoricalDtype(["L3", "L4", "L5", "L6"], ordered=True)},
        ... )
        >>> categorizer.fit_transform(ds).schema().types  # doctest: +SKIP
        [CategoricalDtype(categories=['female', 'male'], ordered=False), CategoricalDtype(categories=['L3', 'L4', 'L5', 'L6'], ordered=True)]

    Args:
        columns: The columns to convert to ``pd.CategoricalDtype``.
        dtypes: An optional dictionary that maps columns to ``pd.CategoricalDtype``
            objects. If you don't include a column in ``dtypes``, the categories
            are inferred.
        output_columns: The names of the transformed columns. If None, the transformed
            columns will be the same as the input columns. If not None, the length of
            ``output_columns`` must match the length of ``columns``, othwerwise an error
            will be raised.

    Nr;   dtypesr:   c                    s0   t    |s	i }|| _|| _t||| _d S r<   )r=   r>   r;   r  r   r?   r:   )r@   r;   r  r:   rA   r   r$   r>     s   

zCategorizer.__init__rC   r   r   c                    s   fddj D   jjO  _ sS dtttf dtjfdd}jj fddt	t
d	d
|gddd dd  d S )Nc                    s   g | ]	}| j vr|qS r   )r  r    r   rz   r   r$   r%     s    z$Categorizer._fit.<locals>.<listcomp>unique_indicesr   c                 S   s   t |  S r<   )r}   CategoricalDtyper+   )r  r   r   r$   callback  s   z"Categorizer._fit.<locals>.callbackc                    s   t  | dS r   )rF   rG   )columns_to_getrC   r   r$   rI     s
    z"Categorizer._fit.<locals>.<lambda>Tdrop_na_values)base_fn	callbacksc                 S   rJ   rK   r   rL   r   r   r$   rI     rN   c                 S   s   | S r<   r   rL   r   r   r$   rI     s    rP   )r;   r`   r  r   r   r}   r  rV   rW   r   rX   )r@   rC   r  r   )r  rC   r@   r$   rZ     s$   
zCategorizer._fitrr   c                 C   s   || j  | j|| j< |S r<   )r;   astyper`   r:   )r@   rr   r   r   r$   r     s   zCategorizer._transform_pandasc                 C   sB   | j | jt| dd t| dr| jrdd | j D dS d dS )Nr   r  c                 S   s$   i | ]\}}|t |j|jd qS )
categoriesordered)r   r   r!  )r    rM   r   r   r   r$   r_     s    z8Categorizer._get_serializable_fields.<locals>.<dictcomp>)r;   r:   r   r  )r;   r:   r   hasattrr  r  rz   r   r   r$   r     s   
	z$Categorizer._get_serializable_fieldsr   r6   c                 C   sJ   | drdd |d  D ni | _|d | _|d | _| d| _d S )Nr  c                 S   s(   i | ]\}}|t j|d  |d dqS )r   r!  r  )r}   r  )r    rM   
dtype_datar   r   r$   r_     s    z8Categorizer._set_serializable_fields.<locals>.<dictcomp>r;   r:   r   )rk   r  r  r;   r:   r   r   r   r   r$   r     s   



z$Categorizer._set_serializable_fieldsc                 C   r   )Nr   z	, dtypes=r   r   )rB   r   r;   r  r:   rz   r   r   r$   r     s   zCategorizer.__repr__)NN)r   r   r   r   r	   r   r
   r   r}   r  r>   r   rZ   r   r   r   r   r   r   r   r   r   r   rA   r$   r  s  s     >
r  T)r9   r   rC   r   r;   rE   r9   r   c                    s  |d u ri }t  }|D ]}||vrtd| d  dqdtjdtffdddtjdtttt f f fd	d
}| j|dd}fdd D }	|j	d dD ]3}
|

 D ],\}}|D ]%}dd |
 D }||v rytt||| }|	| |  q_qYqS|	S )NzYou set `max_categories` for z, which is not present in .rM   r   c                    sN   t | rrt   fdd}| |  S | dd } t| jdd S )Nc                    s     |  | S r<   )update)ri   r   r   r$   update_counter!  s   
z\compute_unique_value_indices.<locals>.get_pd_value_counts_per_column.<locals>.update_counterc                 S   s   t | S r<   )rp   )rm   r   r   r$   rI   )  s    zVcompute_unique_value_indices.<locals>.get_pd_value_counts_per_column.<locals>.<lambda>F)dropna)rv   r   rw   value_countsto_dict)rM   r&  )r9   r   r$   get_pd_value_counts_per_column  s   
zDcompute_unique_value_indices.<locals>.get_pd_value_counts_per_columnrr   c                    sJ   | j  }i } D ]}||v r| | g||< q	td| d| |S )NzColumn 'z2' does not exist in DataFrame, which has columns: )r;   r   
ValueError)rr   
df_columnsresultrM   )r;   r*  r   r$   get_pd_value_counts,  s   
z9compute_unique_value_indices.<locals>.get_pd_value_countspandas)r   c                    s   i | ]} |t  qS r   )set)r    rM   rG   r   r$   r_   :  r   z0compute_unique_value_indices.<locals>.<dictcomp>)
batch_sizec                 S   s   i | ]\}}|d ur||qS r<   r   r]   r   r   r$   r_   >  s    )r0  r+  r}   r~   r   r   r   r	   r  iter_batchesr  r)   r   most_commonr%  r+   )rC   r;   rE   r9   r   columns_setr   r.  value_counts_dsunique_values_by_colbatchrM   countersr   r   )r;   r9   r*  rE   r$   rF     s:   &rF   Fr  r   c                    s`   dt dtttf f fdddtd dttd tttf f f fdd}|tjkr.|S S )	a  
    Returns a post-processing function that generates an encoding map by
    sorting the unique values produced during aggregation or stats computation.

    Args:
        drop_na_values: If True, NA/null values will be silently dropped from the
            encoding map. If False, raises an error if any NA/null values are present.
        batch_format: Determines the output format of the encoding map.
            - If BatchFormat.ARROW: Returns Arrow format (tuple of arrays) for scalar
              types, or dict format for list types that PyArrow can't sort.
            - Otherwise: Returns pandas dict format {value: index}.

    Returns:
        A callable that takes unique values and returns an encoding map.
        The map format depends on batch_format and input types:
        - Dict format: {value: int} - used for pandas path or list-type data
        - Arrow format: (keys_array, values_array) - used for Arrow path with scalar data
    valuesr   c                    sB   t dd | D r stddd | D }dd tt|D S )a  
        Generate an encoding map from a list of unique values using Python sorting.

        Args:
            values: List of unique values to encode (can include lists/tuples).

        Returns:
            Dict mapping each value to a unique integer index.
            List values are converted to tuples for hashability.

        Raises:
            ValueError: If null values are present and drop_na_values is False.
        c                 s   s    | ]}t |V  qd S r<   r   r    r^   r   r   r$   	<genexpr>r  s    z:unique_post_fn.<locals>.gen_value_index.<locals>.<genexpr>]Unable to fit column because it contains null values. Consider imputing missing values first.c                 S   s   g | ]}t |s|qS r   r   r:  r   r   r$   r%   x  r   z;unique_post_fn.<locals>.gen_value_index.<locals>.<listcomp>c                 S   s(   i | ]\}}t |ts|nt||qS r   )r(   r   rp   )r    ir^   r   r   r$   r_   z  s    z;unique_post_fn.<locals>.gen_value_index.<locals>.<dictcomp>)anyr+  	enumerater*   )r9  non_null_valuesr  r   r$   gen_value_indexb  s   
z'unique_post_fn.<locals>.gen_value_index)zpa.ListScalarpa.Array)rB  rB  c                    s   t | tjr	| j} tj| jstj| jr|  S  r%t	
| } nt	t	|  r3tdt	| }t	| |}tjtt|t d}||fS )a  Generate an encoding map from unique values using Arrow-native operations.

        Args:
            values: The aggregation result as a pa.ListScalar (list of unique values)
                or a pa.Array of values directly.

        Returns:
            For scalar types that PyArrow can sort natively, returns a tuple of
            (sorted_keys, indices) as pa.Array. For list types that require fallback,
            returns a dict mapping {value: index}.

        Note:
            PyArrow's sort_indices doesn't support list types, so we fall back to
            dict format for columns containing lists. The _transform_arrow method
            handles this by detecting dict-format stats and converting as needed.
        r<  r&   )r(   r,   
ListScalarr9  r   r   r'   r   	to_pylistr   	drop_nullr>  r   r\   r+  sort_indicesr   r-   ranger   r.   )r9  sorted_indicessorted_valuesr1   r  rA  r   r$    gen_value_index_arrow_from_arrow  s   
z8unique_post_fn.<locals>.gen_value_index_arrow_from_arrow)r	   r   r   r   r   r   r   r   )r  r   rK  r   rJ  r$   rX   L  s   
1rX   rr   c                    s*    fdd|D }|rt d| dd S )Nc                    s"   g | ]} |   j r|qS r   )isnullr9  r>  r  rr   r   r$   r%     s   " z _validate_df.<locals>.<listcomp>Unable to transform columns J because they contain null values. Consider imputing missing values first.r+  )rr   r;   null_columnsr   rM  r$   r|     s   
r|   r   c                    s*    fdd|D }|rt d| ddS )a  Validate that specified columns in an Arrow table do not contain null values.

    Args:
        table: The Arrow table to validate.
        *columns: Column names to check for null values.

    Raises:
        ValueError: If any of the specified columns contain null values.
    c              	      s*   g | ]}t t  | r|qS r   )r   r>  r   r   r\   r  r   r   r$   r%     s
    z#_validate_arrow.<locals>.<listcomp>rN  rO  NrP  )r   r;   rQ  r   rR  r$   r     s   


r   seriesc                 C   s4   t dd | D d }tjj| jot|ttj	fS )Nc                 s   s    | ]	}|d ur|V  qd S r<   r   )r    ri   r   r   r$   r;    s    z/_is_series_composed_of_lists.<locals>.<genexpr>)
nextr/  apir   is_object_dtyper   r(   r   r   r   )rS  first_not_none_elementr   r   r$   rv     s   
rv   )FN)Bloggingcollectionsr   	functoolsr   typingr   r   r   r   r   r	   r
   r   r   r   numpyr   r/  r}   pandas.api.typespyarrowr,   pyarrow.computecomputer   ray.data._internal.utilr   ray.data.blockr   ray.data.preprocessorr   r   r   ray.data.preprocessors.utilsr   &ray.data.preprocessors.version_supportr   #ray.data.util.data_batch_conversionr   ray.util.annotationsr   r   ray.data.datasetr   	getLoggerr   loggerr   r   r2   r8   r   r   r   r  r   r   rF   rX   r   r|   r   r   r~   rv   r   r   r   r$   <module>   s    0



 p
 ~ 
 !
 
E
j	