o
    ci                     @   s  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlZd dl	m
Z d dlmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZmZ d d
lmZ e eZG dd dZeddeG dd deZ	d dede de dede f
ddZ!d!ddZ"dd Z#G dd deZ$dS )"    N)	dataclass)	timedelta)Optional)Version)"register_custom_torch_dist_backend)get_address_and_port)WorkerGroup)BackendBackendConfig)	PublicAPIc                   @   s   e Zd Zdd Zdd ZdS )TorchConfigContextManagerc                 C   s8   t j rtjj  }|jdkrt j| d S d S d S )Ncuda)torchr   is_availableraytrain
get_devicetype
set_device)selfdevice r   J/home/ubuntu/.local/lib/python3.10/site-packages/ray/train/torch/config.py	__enter__   s   

z#TorchConfigContextManager.__enter__c                 C   s   dS )NFr   )r   r   value	tracebackr   r   r   __exit__      z"TorchConfigContextManager.__exit__N)__name__
__module____qualname__r   r   r   r   r   r   r      s    r   stable)	stabilityc                   @   sR   e Zd ZU dZdZee ed< dZeed< dZ	e
ed< edd	 Zed
d ZdS )TorchConfiga  Configuration for torch process group setup.

    See https://pytorch.org/docs/stable/distributed.html for more info.

    Args:
        backend: The backend to use for training.
            See ``torch.distributed.init_process_group`` for more info and
            valid values.
            If set to None, nccl will be used if GPUs are requested, else gloo
            will be used.
        init_method: The initialization method to use. Either "env"
            for environment variable initialization or "tcp" for TCP
            initialization. Defaults to "env".
        timeout_s: Seconds for process group operations to timeout.
    Nbackendenvinit_method  	timeout_sc                 C      t S N)_TorchBackendr   r   r   r   backend_cls9   r   zTorchConfig.backend_clsc                 C   r)   r*   )r   r,   r   r   r   train_func_context=   r   zTorchConfig.train_func_context)r   r   r    __doc__r$   r   str__annotations__r&   r(   intpropertyr-   r.   r   r   r   r   r#   "   s   
 
r#   r'   r$   
world_rank
world_sizer&   r(   c              	   C   s   |dkrt d| d| d| d nt d| d| d| d t d|   | dkr]ttjtdk r=d	}d
}nd}d}|tjvr\|tjvr\t d| d| d dtj|< n| dkret|  t	j
| |||t|dd dS )a{  Connects the distributed PyTorch backend.

    Args:
        backend: The backend (nccl, gloo, etc.) to use for training.
        world_rank: Rank of the current worker.
        world_size: Number of workers participating in the job.
        init_method: URL specifying how to initialize the process group.
        timeout_s: Seconds for process group operations to timeout.
    r   zSetting up process group for: z [rank=z, world_size=]zusing ncclz2.2.0NCCL_ASYNC_ERROR_HANDLINGNCCL_BLOCKING_WAITTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAITzSetting zn=1 to fail if NCCL collective communication operations are timing out. To override this behavior, you can set z=0.1hccl)seconds)r$   r&   rankr5   timeoutN)loggerinfodebugr   r   __version__osenvironr   distinit_process_groupr   )r$   r4   r5   r&   r(   'TORCH_NCCL_ASYNC_ERROR_HANDLING_ENV_VAR TORCH_NCCL_BLOCKING_WAIT_ENV_VARr   r   r   _setup_torch_process_groupB   sH   



rK   Fc              	   C   sp   ddl m} | }| rt  tj r4|D ]}tj| tj  W d    n1 s.w   Y  qd S d S )Nr   )get_devices)	ray.air._internal.torch_utilsrL   rG   destroy_process_groupr   r   r   r   empty_cache)rN   rL   devicesr   r   r   r   _shutdown_torch|   s   
rQ   c                  C   s   ddl m}  tj }t| tjd< t|	 tjd< t|
 tjd< t| tjd< t| tjd< |  }t|tjd< d S )	Nr   )r   
LOCAL_RANKRANKLOCAL_WORLD_SIZE
WORLD_SIZE	NODE_RANKACCELERATE_TORCH_DEVICE)ray.train.torchr   r   r   get_contextr0   get_local_rankrE   rF   get_world_rankget_local_world_sizeget_world_sizeget_node_rank)r   contextr   r   r   r   _set_torch_distributed_env_vars   s   
r`   c                   @   sP   e Zd ZU dZeed< dedefddZdedefddZ	dede
fd	d
ZdS )r+   Tshare_cuda_visible_devicesworker_groupbackend_configc           
      C   s   t  rn|jd u r|jdkrd}nd}n|j}|dt\}}|jdkr3dd }|j|||d d}n|jd	krAd
| d| }n	td|j dg }t	t
|D ]}	||j|	t||	t
|||jd qRt| d S td)Nr   r7   gloor%   c                 S   s   | t jd< t|t jd< d S )NMASTER_ADDRMASTER_PORT)rE   rF   r0   addrportr   r   r   set_env_vars   s   
z,_TorchBackend.on_start.<locals>.set_env_varsrg   zenv://tcpztcp://:zThe provided init_method (z2) is not supported. Must be either 'env' or 'tcp'.)r$   r4   r5   r&   r(   z#Distributed torch is not available.)rG   r   r$   num_gpus_per_workerexecute_singler   r&   execute
ValueErrorrangelenappendexecute_single_asyncrK   r(   r   getRuntimeError)
r   rb   rc   r$   master_addrmaster_portrj   urlsetup_futuresir   r   r   on_start   sF   



z_TorchBackend.on_startc                 C   s   |j tt|dkd d S )N   )rN   )ro   rQ   rr   r   rb   rc   r   r   r   on_shutdown   s   

z_TorchBackend.on_shutdownc                 C   s   | t d S r*   )ro   r`   r~   r   r   r   on_training_start   s   z_TorchBackend.on_training_startN)r   r   r    ra   boolr1   r   r#   r|   r   r
   r   r   r   r   r   r+      s   
 0r+   )r'   )F)%loggingrE   dataclassesr   datetimer   typingr   r   torch.distributeddistributedrG   packaging.versionr   r    ray.air._internal.device_managerr   ray.train._internal.utilsr    ray.train._internal.worker_groupr   ray.train.backendr	   r
   ray.utilr   	getLoggerr   rA   r   r#   r0   r2   rK   rQ   r`   r+   r   r   r   r   <module>   sD    
#

: