o
    `۷i="                     @   s`  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dlZd dl	m
Z d dlmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZmZ d dlmZm Z  d dl!m"Z" e #e$Z%G dd dZ&e"ddeG dd deZ'de(de)fddZ*	d'de(de+de+de(de+f
ddZ,d(d!d"Z-d#d$ Z.G d%d& d&eZ/dS ))    N)	dataclass)	timedelta)Optional)Version)build_address)ray_constants)"register_custom_torch_dist_backend)GetTimeoutError)BaseWorkerGroup)get_address_and_port)BackendBackendConfig).DEFAULT_TORCH_PROCESS_GROUP_SHUTDOWN_TIMEOUT_S&TORCH_PROCESS_GROUP_SHUTDOWN_TIMEOUT_S)	PublicAPIc                   @   s   e Zd Zdd Zdd ZdS )TorchConfigContextManagerc                 C   s8   t j rtjj  }|jdkrt j| d S d S d S )Ncuda)torchr   is_availableraytrain
get_devicetype
set_device)selfdevice r   L/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/train/torch/config.py	__enter__   s   

z#TorchConfigContextManager.__enter__c                 C   s   dS )NFr   )r   r   value	tracebackr   r   r   __exit__$      z"TorchConfigContextManager.__exit__N)__name__
__module____qualname__r   r!   r   r   r   r   r      s    r   stable)	stabilityc                   @   sR   e Zd ZU dZdZee ed< dZeed< dZ	e
ed< edd	 Zed
d ZdS )TorchConfiga  Configuration for torch process group setup.

    See https://pytorch.org/docs/stable/distributed.html for more info.

    Args:
        backend: The backend to use for training.
            See ``torch.distributed.init_process_group`` for more info and
            valid values.
            If set to None, nccl will be used if GPUs are requested, else gloo
            will be used.
        init_method: The initialization method to use. Either "env"
            for environment variable initialization or "tcp" for TCP
            initialization. Defaults to "env".
        timeout_s: Seconds for process group operations to timeout.
    Nbackendenvinit_method  	timeout_sc                 C      t S N)_TorchBackendr   r   r   r   backend_cls@   r"   zTorchConfig.backend_clsc                 C   r.   r/   )r   r1   r   r   r   train_func_contextD   r"   zTorchConfig.train_func_context)r#   r$   r%   __doc__r)   r   str__annotations__r+   r-   intpropertyr2   r3   r   r   r   r   r(   )   s   
 
r(   r)   returnc                 C   s    | dkpt dd | dD S )Nncclc                 s   s,    | ]}| d r|dd dkV  qdS )zcuda::   r:   N)
startswithsplit).0itemr   r   r   	<genexpr>K   s    
z#_is_backend_nccl.<locals>.<genexpr>,)anyr>   )r)   r   r   r   _is_backend_ncclI   s   rD   r,   
world_rank
world_sizer+   r-   c              	   C   s   |dkrt d| d| d| d nt d| d| d| d t d|   t| r]ttjtdk r=d}d	}nd
}d}|tjvr\|tjvr\t d| d| d dtj|< n| dkret	|  t
j| |||t|dd dS )a{  Connects the distributed PyTorch backend.

    Args:
        backend: The backend (nccl, gloo, etc.) to use for training.
        world_rank: Rank of the current worker.
        world_size: Number of workers participating in the job.
        init_method: URL specifying how to initialize the process group.
        timeout_s: Seconds for process group operations to timeout.
    r   zSetting up process group for: z [rank=z, world_size=]zusing z2.2.0NCCL_ASYNC_ERROR_HANDLINGNCCL_BLOCKING_WAITTORCH_NCCL_ASYNC_ERROR_HANDLINGTORCH_NCCL_BLOCKING_WAITzSetting zn=1 to fail if NCCL collective communication operations are timing out. To override this behavior, you can set z=0.1hccl)seconds)r)   r+   rankrF   timeoutN)loggerinfodebugrD   r   r   __version__osenvironr   distinit_process_groupr   )r)   rE   rF   r+   r-   'TORCH_NCCL_ASYNC_ERROR_HANDLING_ENV_VAR TORCH_NCCL_BLOCKING_WAIT_ENV_VARr   r   r   _setup_torch_process_groupR   sH   



r[   Fc              	   C   sp   ddl m} | }| rt  tj r4|D ]}tj| tj  W d    n1 s.w   Y  qd S d S )Nr   )get_devices)	ray.air._internal.torch_utilsr\   rW   destroy_process_groupr   r   r   r   empty_cache)r^   r\   devicesr   r   r   r   _shutdown_torch   s   
ra   c                  C   s   ddl m}  tj }t| tjd< t|	 tjd< t|
 tjd< t| tjd< t| tjd< |  }t|tjd< d S )	Nr   )r   
LOCAL_RANKRANKLOCAL_WORLD_SIZE
WORLD_SIZE	NODE_RANKACCELERATE_TORCH_DEVICE)ray.train.torchr   r   r   get_contextr5   get_local_rankrU   rV   get_world_rankget_local_world_sizeget_world_sizeget_node_rank)r   contextr   r   r   r   _set_torch_distributed_env_vars   s   
rp   c                   @   sL   e Zd ZU dZeed< dedefddZdefddZ	dede
fd	d
ZdS )r0   Tshare_cuda_visible_devicesworker_groupbackend_configc                 C   s   t  rw|jd u r| }|dd}|dkrd}nd}n|j}|dt\}}|jdkr<dd }|j|||d d	}	n|jd
krJdt	|| }	n	t
d|j dg }
tt|D ]}|
|j|t||t||	|jd q[t|
 d S td)NGPUr   r:   gloor*   c                 S   s   | t jd< t|t jd< d S )NMASTER_ADDRMASTER_PORT)rU   rV   r5   addrportr   r   r   set_env_vars   s   
z,_TorchBackend.on_start.<locals>.set_env_varsrx   zenv://tcpztcp://zThe provided init_method (z2) is not supported. Must be either 'env' or 'tcp'.)r)   rE   rF   r+   r-   z#Distributed torch is not available.)rW   r   r)   get_resources_per_workergetexecute_singler   r+   executer   
ValueErrorrangelenappendexecute_single_asyncr[   r-   r   RuntimeError)r   rr   rs   	resourcesnum_gpus_per_workerr)   master_addrmaster_portr{   urlsetup_futuresir   r   r   on_start   sJ   


z_TorchBackend.on_startc                 C   s^   |j tt|dkd}ttt}z
tj||d W d S  t	y.   t
d| d Y d S w )Nr<   )r^   )rP   z-Torch process group shutdown timed out after z seconds)execute_asyncra   r   r   env_integerr   r   r   r~   r	   rQ   warning)r   rr   rs   futuresr-   r   r   r   on_shutdown   s   


z_TorchBackend.on_shutdownc                 C   s   | t d S r/   )r   rp   )r   rr   rs   r   r   r   on_training_start   s   z_TorchBackend.on_training_startN)r#   r$   r%   rq   boolr6   r
   r(   r   r   r   r   r   r   r   r   r0      s   
 3r0   )r,   )F)0loggingrU   dataclassesr   datetimer   typingr   r   torch.distributeddistributedrW   packaging.versionr   r   ray._common.network_utilsr   ray._privater    ray.air._internal.device_managerr   ray.exceptionsr	   %ray.train._internal.base_worker_groupr
   ray.train._internal.utilsr   ray.train.backendr   r   ray.train.constantsr   r   ray.utilr   	getLoggerr#   rQ   r   r(   r5   r   rD   r7   r[   ra   rp   r0   r   r   r   r   <module>   sN    


: