o
    $iD                     @   s  U d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
Z
d dlmZ d dl
mZ d dlmZ d dlZd dlmZ d d	lmZmZ eeZeG d
d deZG dd deZG dd deZi aeed< e Z dd Z!defddZ"dS )    Ncontextmanager)	dataclass)Optional)Version)RabitTracker)CommunicatorContext)BaseWorkerGroup)BackendBackendConfigc                   @   s6   e Zd ZU dZdZeed< edd Zedd Z	dS )	XGBoostConfiga  Configuration for xgboost collective communication setup.

    Ray Train will set up the necessary coordinator processes and environment
    variables for your workers to communicate with each other.
    Additional configuration options can be passed into the
    `xgboost.collective.CommunicatorContext` that wraps your own `xgboost.train` code.

    See the `xgboost.collective` module for more information:
    https://github.com/dmlc/xgboost/blob/master/python-package/xgboost/collective.py

    Args:
        xgboost_communicator: The backend to use for collective communication for
            distributed xgboost training. For now, only "rabit" is supported.
    rabitxgboost_communicatorc                 C   s   t dd }|S )Nc                   s   s>    t di t  d V  W d    d S 1 sw   Y  d S )N )r   _get_xgboost_argsr   r   r   U/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/train/xgboost/config.py collective_communication_context*   s   "zJXGBoostConfig.train_func_context.<locals>.collective_communication_contextr   )selfr   r   r   r   train_func_context(   s   
z XGBoostConfig.train_func_contextc                 C   s4   | j dkrttjtdkrtS tS td| j  )Nr   z2.1.0zUnsupported backend: )r   r   xgboost__version___XGBoostRabitBackend_XGBoostRabitBackend_pre_xgb210NotImplementedErrorr   r   r   r   backend_cls1   s   
zXGBoostConfig.backend_clsN)
__name__
__module____qualname____doc__r   str__annotations__propertyr   r   r   r   r   r   r      s   
 
r   c                   @   F   e Zd Zdd ZdefddZdedefddZdedefd	d
ZdS )r   c                 C   s   d | _ d | _d S N)_tracker_wait_threadr   r   r   r   __init__>   s   
z_XGBoostRabitBackend.__init__worker_groupc                 C   s   t |}d|i}tj }t||dd| _| j  tj| jj	dd| _
| j
  || j  dtj|dd }t| d	d
 }||| d S )N	n_workerstaskr)   host_ipsortbyT)targetdaemon2RabitTracker coordinator started with parameters:
   indentc                 S   s<   dd l }d|j  dd|   | d< t|  d S )Nr   [xgboost.ray-rank=08]:dmlc_task_id)	ray.traintrainget_contextget_world_rankget_runtime_contextget_actor_id_set_xgboost_args)argsrayr   r   r   set_xgboost_communicator_args^   s   
z^_XGBoostRabitBackend._setup_xgboost_distributed_backend.<locals>.set_xgboost_communicator_args)lenr@   utilget_node_ip_addressr   r%   start	threadingThreadwait_forr&   updateworker_argsjsondumpsloggerdebugexecute)r   r(   num_workers
rabit_argstrain_driver_ip	start_logrA   r   r   r   "_setup_xgboost_distributed_backendB   s"   




z7_XGBoostRabitBackend._setup_xgboost_distributed_backendbackend_configc                 C      |j dksJ | | d S Nr   r   rT   r   r(   rU   r   r   r   on_training_startj      z&_XGBoostRabitBackend.on_training_startc                 C   sD   d}| j d ur| j j|d | j  r td| d d S d S d S N   )timeoutz?During shutdown, the RabitTracker thread failed to join within zL seconds. The process will still be terminated as part of Ray actor cleanup.)r&   joinis_aliverM   warningr   r(   rU   r^   r   r   r   on_shutdownp   s   

z _XGBoostRabitBackend.on_shutdownN	r   r   r   r'   r	   rT   r   rZ   rc   r   r   r   r   r   =   s    (
r   c                   @   r#   )r   c                 C   s
   d | _ d S r$   )r%   r   r   r   r   r'      s   
z(_XGBoostRabitBackend_pre_xgb210.__init__r(   c                    s   t |}d|i tj }t||dd| _| jj|d | j } | dt	j
 dd }t|  fdd	}|| d S )
NDMLC_NUM_WORKERr*   r+   )r)   r0   r1   r2   c                     sV   dd l }   D ]\}}t|tj|< qd| j  dd|  	  tjd< d S )Nr   r4   r5   r6   DMLC_TASK_ID)
r8   itemsr    osenvironr9   r:   r;   r<   r=   )r@   kvrQ   r   r   set_xgboost_env_vars   s   
z`_XGBoostRabitBackend_pre_xgb210._setup_xgboost_distributed_backend.<locals>.set_xgboost_env_vars)rB   r@   rC   rD   r   r%   rE   worker_envsrI   rK   rL   rM   rN   rO   )r   r(   rP   rR   rJ   rS   rm   r   rl   r   rT      s    



zB_XGBoostRabitBackend_pre_xgb210._setup_xgboost_distributed_backendrU   c                 C   rV   rW   rX   rY   r   r   r   rZ      r[   z1_XGBoostRabitBackend_pre_xgb210.on_training_startc                 C   sD   | j sd S d}| j jj|d | j j r td| d d S d S r\   )r%   threadr_   r`   rM   ra   rb   r   r   r   rc      s   z+_XGBoostRabitBackend_pre_xgb210.on_shutdownNrd   r   r   r   r   r   ~   s    '
r   _xgboost_argsc                 C   s.   t  | aW d    d S 1 sw   Y  d S r$   _xgboost_args_lockrp   )r?   r   r   r   r>      s   "r>   returnc                   C   s,   t 
 tW  d    S 1 sw   Y  d S r$   rq   r   r   r   r   r      s   $r   )#rK   loggingrh   rF   
contextlibr   dataclassesr   typingr   r   packaging.versionr   r   xgboost.collectiver   r@   %ray.train._internal.base_worker_groupr	   ray.train.backendr
   r   	getLoggerr   rM   r   r   r   rp   dictr!   Lockrr   r>   r   r   r   r   r   <module>   s.   
 
'A@