o
    bib                     @   s  d Z ddlZddlmZ ddlmZ ddlmZ ddlm	Z	m
Z
mZ ddlZddlmZ ddlZddlmZ ddlmZ dd	lmZ G d
d deZdedededededededede
e fddZdd Z		d#de
e dedede
e fddZdd  Zde
e fd!d"ZdS )$a  This file is modeled after ray/python/ray/train/torch/config.py

The logics are duplicated right now to allow maximum flexibility for
setting up PyTorch DDP process groups outside the context of Ray Train.
Eventually, these use cases should be consolidated.
    N)ABC)defaultdict)	timedelta)CallableListT)ActorHandle)get_devices)get_address_and_portc                   @   s*   e Zd ZdZdedef defddZdS )TorchDistributedWorkerzDefines the interfaces required by the init_torch_dist_process_group().

    This is modeled after RayTrainerWorker, which allows arbitrary functions
    to be executed on a remote DDP worker.
    func.returnc                 O   s   ||i |S )zExecutes the input function and returns the output.

        Args:
            func: The function to execute.
            args, kwargs: The arguments to pass into func.
         )selfr   argskwargsr   r   K/home/ubuntu/.local/lib/python3.10/site-packages/ray/air/util/torch_dist.pyexecute   s   zTorchDistributedWorker.executeN)__name__
__module____qualname____doc__r   r   r   r   r   r   r   r      s    r   init_methodbackendrank
world_size
local_ranklocal_world_sizemaster_addrmaster_portgpu_idsc	                 K   s   | dkrt |tjd< t |tjd< d}
n| dkr"d| d| }
ntd|  d	|d
kr@dtjd< ddd |D tjd< |	t||
||d |	dtdd t	j
di |	 t |tjd< t |tjd< t |tjd< t |tjd< dS )z$Initialize torch distributed backendenvMASTER_ADDRMASTER_PORTzenv://tcpztcp://:zThe provided init_method (z2) is not supported. Must be either 'env' or 'tcp'.nccl1NCCL_ASYNC_ERROR_HANDLING,c                 s   s    | ]}t |V  qd S )N)str).0gidr   r   r   	<genexpr>G   s    z*_init_torch_distributed.<locals>.<genexpr>CUDA_VISIBLE_DEVICES)r   r   r   r   timeouti  )secondsRANK
LOCAL_RANK
WORLD_SIZELOCAL_WORLD_SIZENr   )r*   osenviron
ValueErrorjoinupdatedict
setdefaultr   distinit_process_group)r   r   r   r   r   r   r   r   r    init_process_group_kwargsurlr   r   r   _init_torch_distributed(   s8   
r@   c                  C   s   t   } t  }| |fS )z0Returns the node_id and gpu_ids for this worker.)rayget_runtime_contextget_node_idget_gpu_ids)node_idr    r   r   r   _get_node_and_gpu_ids[   s   rF   gloor!   workersr   c                 K   s0  t  stdtdd | D }tt}tt}t|D ]!\}\}}	|| 	| t
|	ts3|	g}	|	D ]	}
|| |
 q5qt| d jt\}}g }t| }g }t| D ]7\}}|| d }|| |}t|| }|	|jjtf||||||||t|| d	| |	| qYt| |S )a  Initialize a torch distributed process group.

    Note: this util assumes that the order of the workers passed in
    are their global ranks.

    Args:
        workers: A list of TorchDistributedWorker actors.
        backend: The torch distributed backend to use,
            possible choices are "gloo" or "nccl".
        init_method: The initialization method to use,
            possible choices are "env" or "tcp".
        init_process_group_kwargs: Additional kwargs to pass to the call to
            :meth:`torch.distributed.init_process_group`.

    Returns:
        Local ranks on their respective nodes for the list of workers.
    z#Distributed torch is not available.c                 S      g | ]}|j tqS r   )r   remoterF   r+   wr   r   r   
<listcomp>~       z1init_torch_dist_process_group.<locals>.<listcomp>r   )	r   r   r   r   r   r   r   r   r    )r<   is_availableRuntimeErrorrA   getr   listset	enumerateappend
isinstanceaddr   rJ   r
   lenindexr@   )rH   r   r   r>   node_and_gpu_idsnode_to_workersnode_to_gpu_idsirE   r    gpu_idr   r   setup_futuresr   local_ranksr   workerr   r   r   r   r   init_torch_dist_process_groupb   sT   


rb   c               	   C   s`   t   tj sdS t } | D ]}tj| tj  W d   n1 s(w   Y  qdS )z"Shutdown torch distributed backendN)r<   destroy_process_grouptorchcudarO   r	   deviceempty_cache)devicesrf   r   r   r   _shutdown_torch_distributed   s   
ri   c                 C   s   t dd | D  d S )Nc                 S   rI   r   )r   rJ   ri   rK   r   r   r   rM      rN   z5shutdown_torch_dist_process_group.<locals>.<listcomp>)rA   rQ   )rH   r   r   r   !shutdown_torch_dist_process_group   s   rj   )rG   r!   )r   r5   abcr   collectionsr   datetimer   typingr   r   r   rd   torch.distributeddistributedr<   rA   	ray.actorr   ray.air._internal.torch_utilsr	   ray.train._internal.utilsr
   r   r*   intr@   rF   rb   ri   rj   r   r   r   r   <module>   s\    	
3	
N