o
    `۷i3                  
   @   s   d dl Z d dlmZ d dlmZmZmZmZmZ d dl	Z	d dl
mZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ d dlmZ e eZdZ de!de"de#de$de!f
ddZ%eddG dd deZdS )    N)partial)AnyCallableDictOptionalUnion)Version)
Checkpoint)TRAIN_DATASET_KEY)
GenDataset)_log_deprecation_warning)RayTrainReportCallbackXGBoostConfig)XGBoostTrainer)	PublicAPIaK  Passing in `xgboost.train` kwargs such as `params`, `num_boost_round`, `label_column`, etc. to `XGBoostTrainer` is deprecated in favor of the new API which accepts a training function, similar to the other DataParallelTrainer APIs (ex: TorchTrainer). See this issue for more context: https://github.com/ray-project/ray/issues/50042configlabel_columnnum_boost_rounddataset_keysxgboost_train_kwargsc                 C   s  t j }d }|}|r$t|}| }|| }td| d|d t jt	}	|	
  }
dd |D }dd | D }|
j|dd|
| }}tj||d	}|t	fg}| D ]\}}|j|dd|| }}|tj||d	|f q\i }tj| f|||||d
| d S )Nz7Model loaded from checkpoint will train for additional zY iterations (trees) in order to achieve the target number of iterations (num_boost_round=z).c                 S   s"   i | ]}|t kr|tj|qS  )r
   raytrainget_dataset_shard).0kr   r   W/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/train/xgboost/xgboost_trainer.py
<dictcomp>6   s
    z0_xgboost_train_fn_per_worker.<locals>.<dictcomp>c                 S   s   i | ]\}}||   qS r   )materialize	to_pandas)r   r   dr   r   r   r   ;   s       )axis)label)dtrainevalsevals_resultr   	xgb_model)r   r   get_checkpointr   	get_modelnum_boosted_roundsloggerinfor   r
   r   r   itemsdropxgboostDMatrixappend)r   r   r   r   r   
checkpointstarting_modelremaining_itersstarting_itertrain_ds_itertrain_dfeval_ds_iterseval_dfstrain_Xtrain_yr$   r%   	eval_nameeval_dfeval_Xeval_yr&   r   r   r   _xgboost_train_fn_per_worker   sL   



r@   beta)	stabilityc                       sF  e Zd ZdZdZdZ	ddddddddddddddeeeg df ee	gdf f  dee	 dee
 deejj d	eejj d
ee	eef  deejj dee dee	eef  dee dee	eef  dee f fddZde	d	eejj d
ee	eef  dee dee dee	gdf fddZededejfddZ  ZS )r   a  A Trainer for distributed data-parallel XGBoost training.

    Example
    -------

    .. testcode::
        :skipif: True

        import xgboost

        import ray.data
        import ray.train
        from ray.train.xgboost import RayTrainReportCallback, XGBoostTrainer

        def train_fn_per_worker(config: dict):
            # (Optional) Add logic to resume training state from a checkpoint.
            # ray.train.get_checkpoint()

            # 1. Get the dataset shard for the worker and convert to a `xgboost.DMatrix`
            train_ds_iter, eval_ds_iter = (
                ray.train.get_dataset_shard("train"),
                ray.train.get_dataset_shard("validation"),
            )
            train_ds, eval_ds = train_ds_iter.materialize(), eval_ds_iter.materialize()

            train_df, eval_df = train_ds.to_pandas(), eval_ds.to_pandas()
            train_X, train_y = train_df.drop("y", axis=1), train_df["y"]
            eval_X, eval_y = eval_df.drop("y", axis=1), eval_df["y"]

            dtrain = xgboost.DMatrix(train_X, label=train_y)
            deval = xgboost.DMatrix(eval_X, label=eval_y)

            params = {
                "tree_method": "approx",
                "objective": "reg:squarederror",
                "eta": 1e-4,
                "subsample": 0.5,
                "max_depth": 2,
            }

            # 2. Do distributed data-parallel training.
            # Ray Train sets up the necessary coordinator processes and
            # environment variables for your workers to communicate with each other.
            bst = xgboost.train(
                params,
                dtrain=dtrain,
                evals=[(deval, "validation")],
                num_boost_round=10,
                callbacks=[RayTrainReportCallback()],
            )

        train_ds = ray.data.from_items([{"x": x, "y": x + 1} for x in range(32)])
        eval_ds = ray.data.from_items([{"x": x, "y": x + 1} for x in range(16)])
        trainer = XGBoostTrainer(
            train_fn_per_worker,
            datasets={"train": train_ds, "validation": eval_ds},
            scaling_config=ray.train.ScalingConfig(num_workers=4),
        )
        result = trainer.fit()
        booster = RayTrainReportCallback.get_model(result.checkpoint)

    Args:
        train_loop_per_worker: The training function to execute on each worker.
            This function can either take in zero arguments or a single ``Dict``
            argument which is set by defining ``train_loop_config``.
            Within this function you can use any of the
            :ref:`Ray Train Loop utilities <train-loop-api>`.
        train_loop_config: A configuration ``Dict`` to pass in as an argument to
            ``train_loop_per_worker``.
            This is typically used for specifying hyperparameters.
        xgboost_config: The configuration for setting up the distributed xgboost
            backend. Defaults to using the "rabit" backend.
            See :class:`~ray.train.xgboost.XGBoostConfig` for more info.
        datasets: The Ray Datasets to use for training and validation.
        dataset_config: The configuration for ingesting the input ``datasets``.
            By default, all the Ray Datasets are split equally across workers.
            See :class:`~ray.train.DataConfig` for more details.
        scaling_config: The configuration for how to scale data parallel training.
            ``num_workers`` determines how many Python processes are used for training,
            and ``use_gpu`` determines whether or not each process should use GPUs.
            See :class:`~ray.train.ScalingConfig` for more info.
        run_config: The configuration for the execution of the training run.
            See :class:`~ray.train.RunConfig` for more info.
        resume_from_checkpoint: A checkpoint to resume training from.
            This checkpoint can be accessed from within ``train_loop_per_worker``
            by calling ``ray.train.get_checkpoint()``.
        metadata: Dict that should be made available via
            `ray.train.get_context().get_metadata()` and in `checkpoint.get_metadata()`
            for checkpoints saved from this Trainer. Must be JSON-serializable.
        label_column: [Deprecated] Name of the label column. A column with this name
            must be present in the training dataset.
        params: [Deprecated] XGBoost training parameters.
            Refer to `XGBoost documentation <https://xgboost.readthedocs.io/>`_
            for a list of possible parameters.
        num_boost_round: [Deprecated] Target number of boosting iterations (trees in the model).
            Note that unlike in ``xgboost.train``, this is the target number
            of trees, meaning that if you set ``num_boost_round=10`` and pass a model
            that has already been trained for 5 iterations, it will be trained for 5
            iterations more, instead of 10 more.
        **train_kwargs: [Deprecated] Additional kwargs passed to ``xgboost.train()`` function.
    TN)train_loop_configxgboost_configscaling_config
run_configdatasetsdataset_configresume_from_checkpointmetadatar   paramsr   train_loop_per_workerrC   rD   rE   rF   rG   rH   rI   rJ   r   rK   r   c                   s|   t tjt dk rtd|d u }|r"| j|||
||d}|p i }n	|r+tdt  tt| j	|||||||||	d	 d S )Nz1.7.0zm`XGBoostTrainer` requires the `xgboost` version to be >= 1.7.0. Upgrade with: `pip install -U "xgboost>=1.7"`)r   rF   r   r   rG   zPassing `xgboost.train` kwargs to `XGBoostTrainer` is deprecated. In your training function, you can call `xgboost.train(**kwargs)` with arbitrary arguments. )	rL   rC   rD   rE   rF   rG   rH   rI   rJ   )
r   r/   __version__ImportError_get_legacy_train_fn_per_workerr   *LEGACY_XGBOOST_TRAINER_DEPRECATION_MESSAGEsuperr   __init__)selfrL   rC   rD   rE   rF   rG   rH   rI   rJ   r   rK   r   train_kwargs
legacy_api	__class__r   r   rR      s>   


zXGBoostTrainer.__init__r   returnc                 C   s   |pi }| tstdt dt|  |std|p d}tt | dg }tdd |D }i }|rN|jj	}	|jj
}
|	|d< |
d	urJ|
nd
|d< |sZ|tdi | ||d< tt||t||d}|S )z<Get the training function for the legacy XGBoostTrainer API.z`datasets` must be provided for the XGBoostTrainer API if `train_loop_per_worker` is not provided. This dict must contain the training dataset under the key: 'z'. Got keys: z`label_column` must be provided for the XGBoostTrainer API if `train_loop_per_worker` is not provided. This is the column name of the label in the dataset.
   	callbacksc                 s   s    | ]}t |tV  qd S N)
isinstancer   )r   callbackr   r   r   	<genexpr>  s    

zAXGBoostTrainer._get_legacy_train_fn_per_worker.<locals>.<genexpr>	frequencyNTcheckpoint_at_end)r   r   r   r   r   )getr
   
ValueErrorlistkeysr   rP   anycheckpoint_configcheckpoint_frequencyr`   r1   r   r   r@   set)rS   r   rF   rG   r   r   rZ   user_supplied_callbackcallback_kwargsrg   r`   train_fn_per_workerr   r   r   rO      sJ   


z.XGBoostTrainer._get_legacy_train_fn_per_workerr2   c                 C   s
   t |S )z5Retrieve the XGBoost model stored in this checkpoint.)r   r)   )clsr2   r   r   r   r)   3  s   
zXGBoostTrainer.get_modelr[   )__name__
__module____qualname____doc___handles_checkpoint_freq_handles_checkpoint_at_endr   r   r   r   r   r   r   ScalingConfig	RunConfigstrr   
DataConfigr	   r   intrR   rO   classmethodr/   Boosterr)   __classcell__r   r   rV   r   r   T   s~    f

	

9

;r   )&logging	functoolsr   typingr   r   r   r   r   r/   packaging.versionr   	ray.trainr   r	   ray.train.constantsr
   ray.train.trainerr   ray.train.utilsr   ray.train.xgboostr   r   ray.train.xgboost.v2r   SimpleXGBoostTrainerray.util.annotationsr   	getLoggerrm   r+   rP   dictru   rw   rh   r@   r   r   r   r   <module>   s:    


6