o
    cig                  	   @   s~  d Z ddlZddlZddlmZ ddlZddlZddlm	Z	 e
eZzddlmZ dZdaW n ey<   dZdaY nw z
ddlmZ dZW n eyR   dZY nw z
dd	lmZ dZW n eyh   dZY nw d
d Zdd Zdd ZG dd deZe add Ze	jj ddfde!de!de"de!fddZ#e	jj ddfde!dee! de"de!fddZ$dbde"ddfd d!Z%dbde"de!fd"d#Z&dbde"de!fd$d%Z'de	j(j)fde"fd&d'Z*de	j(j)fd(e+de"fd)d*Z,dbde"fd+d,Z-dde	j(j)fd-e!de"fd.d/Z.ddde	j(j)fd(e+d-e!d0e!de"fd1d2Z/dcd3e!de"fd4d5Z0	ddd3e!d6e!de"fd7d8Z1dbd(e+de"fd9d:Z2	dbd;e+d<e+de"fd=d>Z3de	j(j)fd(e+de"fd?d@Z4de	j(j)fde"fdAdBZ5dbd-e!de"fdCdDZ6		ded-e!dEe!de"dFe!fdGdHZ7dbd3e!de"fdIdJZ8		ded3e!dKe!de"dFe!fdLdMZ9dNe!fdOdPZ:dbde"fdQdRZ;dSdT Z<dUe	jfdVdWZ=dXdY Z>de!fdZd[Z?d\d] Z@d^d_ ZAd`da ZBdS )fz5APIs exposed under the namespace ray.util.collective.    N)List)types)	NCCLGroupTF)	GLOOGroup)TorchGLOOGroupc                   C   s   t  rtrtd datS )NzqNCCL seems unavailable. Please install Cupy following the guide at: https://docs.cupy.dev/en/stable/install.html.F)rayget_gpu_ids_LOG_NCCL_WARNINGloggerwarning_NCCL_AVAILABLE r   r   R/home/ubuntu/.local/lib/python3.10/site-packages/ray/util/collective/collective.pynccl_available(   s   r   c                   C      t S N)_GLOO_AVAILABLEr   r   r   r   gloo_available4      r   c                   C   r   r   )_TORCH_DISTRIBUTED_AVAILABLEr   r   r   r   torch_distributed_available8   r   r   c                   @   s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )GroupManagera  Use this class to manage the collective groups we created so far.

    Each process will have an instance of `GroupManager`. Each process
    could belong to multiple collective groups. The membership information
    and other metadata are stored in the global `_group_mgr` object.
    c                 C   s   i | _ i | _d S r   )_name_group_map_group_name_map)selfr   r   r   __init__D   s   
zGroupManager.__init__c                 C   s   t |}|t jjkrtd|t jjkr(td| t|||dd|d}n1|t jj	kr=td| t
|||}n|t jjkrRtd| t|||}ntd| || j|< || j|< | j| S )	zThe entry to create new collective groups in the manager.

        Put the registration and the group information into the manager
        metadata as well.
        zRay does not support MPI.zCreating GLOO group: '{}'...ray_internal_kvtcp)
store_typedevice_typegloo_timeoutzCreating NCCL group: '{}'...z.Creating torch.distributed GLOO group: '{}'...zUnexpected backend: )r   BackendMPIRuntimeErrorGLOOr
   debugformatr   NCCLr   
TORCH_GLOOr   r   r   )r   backend
world_sizerank
group_namer    gr   r   r   create_collective_groupH   s2   



z$GroupManager.create_collective_groupc                 C   s
   || j v S r   )r   r   r,   r   r   r   is_group_existm   s   
zGroupManager.is_group_existc                 C   s(   |  |std| dS | j| S )z,Get the collective group handle by its name.z"The group '{}' is not initialized.N)r0   r
   r   r&   r   r/   r   r   r   get_group_by_namep   s   

zGroupManager.get_group_by_namec                 C   sx   |  |std| dS | j| }| j|= | j|= |  d| }zt|}t	| W dS  t
y;   Y dS w )zGroup destructor.zThe group '{}' does not exist.Ninfo_)r0   r
   r   r&   r   r   destroy_groupr   	get_actorkill
ValueError)r   r,   r-   namestorer   r   r   destroy_collective_groupw   s   


z%GroupManager.destroy_collective_groupN)	__name__
__module____qualname____doc__r   r.   r0   r1   r9   r   r   r   r   r   <   s    %r   c                 C   s
   t | S )zDCheck if the group is initialized in this process by the group name.)
_group_mgrr0   r,   r   r   r   is_group_initialized   s   
r@   default0u  r*   r+   r,   r    c                 C   sv   t   t|}t| |std|t|rtd| dks$J |dks*J || k s0J t	|| ||| dS )a=  Initialize a collective group inside an actor process.

    Args:
        world_size: the total number of processes in the group.
        rank: the rank of the current process.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.

    Returns:
        None
    z%group_name '{}' needs to be a string.#Trying to initialize a group twice.r   N)
_check_inside_actorr   r!   _check_backend_availabilityr6   r&   r>   r0   r#   r.   )r*   r+   r)   r,   r    r   r   r   init_collective_group   s   


rF   ranksc           
   	   C   s6  t |}t| d| }z	t| td ty   Y nw t|t| kr4tdt|t| t	|t	t
t|krRtdt|ddd |D |dkr]td	|t|dksgtd
t||k sqtdddlm} d| }dd | D }|j|dd }	t|	j|||||g dS )a  Declare a list of actors as a collective group.

    Note: This function should be called in a driver process.

    Args:
        actors: a list of actors to be set in a collective group.
        world_size: the total number of processes in the group.
        ranks (List[int]): the rank of each actor.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.

    Returns:
        None
    r2   rC   zHEach actor should correspond to one rank. Got '{}' ranks but '{}' actorsz5Ranks must be a permutation from 0 to '{}'. Got '{}'. c                 S   s   g | ]}t |qS r   )str).0rr   r   r   
<listcomp>   s    z+create_collective_group.<locals>.<listcomp>r   z/World size must be greater than zero. Got '{}'.zRanks must be non-negative.z(Ranks cannot be greater than world_size.)Infoc                 S   s   g | ]}|j qS r   )_ray_actor_id)rJ   ar   r   r   rL      s    detached)r7   lifetimeN)r   r!   rE   r   r4   r#   r6   lenr&   setrangejoinallray.util.collective.utilrM   optionsremotegetset_info)
actorsr*   rG   r)   r,   r    r7   rM   	actors_idinfor   r   r   r.      sB   

 r.   returnc                 C   s   t   t|  dS )z0Destroy a collective group given its group name.N)rD   r>   r9   r?   r   r   r   r9      s   r9   c                 C   "   t   t| s	dS t| }|jS )a  Return the rank of this process in the given group.

    Args:
        group_name: the name of the group to query

    Returns:
        the rank of this process in the named group,
        -1 if the group does not exist or the process does
        not belong to the group.
    )rD   r@   r>   r1   r+   r,   r-   r   r   r   get_rank  s
   
rc   c                 C   r`   )a  Return the size of the collective group with the given name.

    Args:
        group_name: the name of the group to query

    Returns:
        The world size of the collective group, -1 if the group does
            not exist or the process does not belong to the group.
    ra   )rD   r@   r>   r1   r*   rb   r   r   r   get_collective_group_size  s
   

rd   c                 C   s.   t |  t|}tj}||_|| g| dS )a   Collective allreduce the tensor across the group.

    Args:
        tensor: the tensor to be all-reduced on this process.
        group_name: the collective group name to perform allreduce.
        op: The reduce operation.

    Returns:
        None
    N)_check_single_tensor_inputget_group_handler   AllReduceOptionsreduceOp	allreduce)tensorr,   opr-   optsr   r   r   ri   '  s
   ri   tensor_listc                 C   s<   t  stdt|  t|}t j}||_|| | dS )a  Collective allreduce a list of tensors across the group.

    Args:
        tensor_list (List[tensor]): list of tensors to be allreduced,
            each on a GPU.
        group_name: the collective group name to perform allreduce.

    Returns:
        None
    &Multigpu calls requires NCCL and Cupy.N)r   cupy_availabler#   _check_tensor_list_inputrf   rg   rh   ri   )rm   r,   rk   r-   rl   r   r   r   allreduce_multigpu9  s   rq   c                 C   s   t | }|  dS )zBarrier all processes in the collective group.

    Args:
        group_name: the name of the group to barrier.

    Returns:
        None
    N)rf   barrierrb   r   r   r   rr   O  s   	rr   dst_rankc                 C   sF   t |  t|}t|| t }||_||_d|_|| g| dS )a:  Reduce the tensor across the group to the destination rank.

    Args:
        tensor: the tensor to be reduced on this process.
        dst_rank: the rank of the destination process.
        group_name: the collective group name to perform reduce.
        op: The reduce operation.

    Returns:
        None
    r   N)	re   rf   _check_rank_validr   ReduceOptionsrh   	root_rankroot_tensorreduce)rj   rs   r,   rk   r-   rl   r   r   r   rx   \  s   
rx   
dst_tensorc                 C   sb   t  stdt|  t|}t|| tt| | t  }||_	||_
||_|| | dS )a  Reduce the tensor across the group to the destination rank
    and destination tensor.

    Args:
        tensor_list: the list of tensors to be reduced on this process;
            each tensor located on a GPU.
        dst_rank: the rank of the destination process.
        dst_tensor: the index of GPU at the destination.
        group_name: the collective group name to perform reduce.
        op: The reduce operation.

    Returns:
        None
    rn   N)r   ro   r#   rp   rf   rt   _check_root_tensor_validrR   ru   rh   rv   rw   rx   )rm   rs   ry   r,   rk   r-   rl   r   r   r   reduce_multigpuv  s   
r{   src_rankc                 C   s@   t |  t|}t|| t }||_d|_|| g| dS )a(  Broadcast the tensor from a source process to all others.

    Args:
        tensor: the tensor to be broadcasted (src) or received (destination).
        src_rank: the rank of the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    r   N)re   rf   rt   r   BroadcastOptionsrv   rw   	broadcastrj   r|   r,   r-   rl   r   r   r   r~     s   
r~   
src_tensorc                 C   s\   t  stdt|  t|}t|| tt| | t  }||_	||_
|| | dS )ag  Broadcast the tensor from a source GPU to all other GPUs.

    Args:
        tensor_list: the tensors to broadcast (src) or receive (dst).
        src_rank: the rank of the source process.
        src_tensor: the index of the source GPU on the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    rn   N)r   ro   r#   rp   rf   rt   rz   rR   r}   rv   rw   r~   )rm   r|   r   r,   r-   rl   r   r   r   broadcast_multigpu  s   
r   c                 C   sL   t | t|  t|}t| |jkrtdt }|| g|g| dS )a   Allgather tensors from each process of the group into a list.

    Args:
        tensor_list: the results, stored as a list of tensors.
        tensor: the tensor (to be gathered) in the current process
        group_name: the name of the collective group.

    Returns:
        None
    zPThe length of the tensor list operands to allgather must be equal to world_size.N)	re   rp   rf   rR   r*   r#   r   AllGatherOptions	allgather)rm   rj   r,   r-   rl   r   r   r   r     s   r   output_tensor_listsinput_tensor_listc                 C   sB   t  stdt|  t| t|}t  }|| || dS )a  Allgather tensors from each gpus of the group into lists.

    Args:
        output_tensor_lists (List[List[tensor]]): gathered results, with shape
            must be num_gpus * world_size * shape(tensor).
        input_tensor_list: (List[tensor]): a list of tensors, with shape
            num_gpus * shape(tensor).
        group_name: the name of the collective group.

    Returns:
        None
    rn   N)r   ro   r#   _check_tensor_lists_inputrp   rf   r   r   )r   r   r,   r-   rl   r   r   r   allgather_multigpu  s   r   c                 C   sR   t |  t| t|}t||jkrtdt }||_|	| g|g| dS )a  Reducescatter a list of tensors across the group.

    Reduce the list of the tensors across each process in the group, then
    scatter the reduced list of tensors -- one tensor for each process.

    Args:
        tensor: the resulted tensor on this process.
        tensor_list: The list of tensors to be reduced and scattered.
        group_name: the name of the collective group.
        op: The reduce operation.

    Returns:
        None
    zXThe length of the tensor list operands to reducescatter must not be equal to world_size.N)
re   rp   rf   rR   r*   r#   r   ReduceScatterOptionsrh   reducescatter)rj   rm   r,   rk   r-   rl   r   r   r   r     s   r   c                 C   sH   t  stdt| t|  t|}t  }||_|| || dS )a  Reducescatter a list of tensors across all GPUs.

    Args:
        output_tensor_list: the resulted list of tensors, with
            shape: num_gpus * shape(tensor).
        input_tensor_lists: the original tensors, with shape:
            num_gpus * world_size * shape(tensor).
        group_name: the name of the collective group.
        op: The reduce operation.

    Returns:
        None.
    rn   N)	r   ro   r#   r   rp   rf   r   rh   r   )output_tensor_listinput_tensor_listsr,   rk   r-   rl   r   r   r   reducescatter_multigpu  s   r   c                 C   R   t |  t|}t|| ||jkrtd|t }||_|	| g| dS )zSend a tensor to a remote process synchronously.

    Args:
        tensor: the tensor to send.
        dst_rank: the rank of the destination process.
        group_name: the name of the collective group.

    Returns:
        None
    "The destination rank '{}' is self.N)
re   rf   rt   r+   r#   r&   r   SendOptionsrs   send)rj   rs   r,   r-   rl   r   r   r   r   8     

r   dst_gpu_index
n_elementsc                 C      t  stdt|  t|}t|| ||jkr!td||dk r,td|t  }||_	||_
||_|| g| dS )a  Send a tensor to a remote GPU synchronously.

    The function assumes each process owns >1 GPUs, and the sender
    process and receiver process has equal number of GPUs.

    Args:
        tensor: the tensor to send, located on a GPU.
        dst_rank: the rank of the destination process.
        dst_gpu_index: the destination gpu index.
        group_name: the name of the collective group.
        n_elements: if specified, send the next n elements
            from the starting address of tensor.

    Returns:
        None
    z!send_multigpu call requires NCCL.GThe dst_rank '{}' is self. Considering doing GPU to GPU memcpy instead?r   z The n_elements '{}' should >= 0.N)r   ro   r#   re   rf   rt   r+   r&   r   rs   r   r   r   )rj   rs   r   r,   r   r-   rl   r   r   r   send_multigpuM  s"   

r   c                 C   r   )zReceive a tensor from a remote process synchronously.

    Args:
        tensor: the received tensor.
        src_rank: the rank of the source process.
        group_name: the name of the collective group.

    Returns:
        None
    r   N)
re   rf   rt   r+   r#   r&   r   RecvOptionsr|   recvr   r   r   r   r   w  r   r   src_gpu_indexc                 C   r   )a  Receive a tensor from a remote GPU synchronously.

    The function asssume each process owns >1 GPUs, and the sender
    process and receiver process has equal nubmer of GPUs.

    Args:
        tensor: The received tensor, located on a GPU.
        src_rank: The rank of the source process.
        src_gpu_index: The index of the source GPU on the src process.
        group_name: The name of the collective group.

    Returns:
        None
    z!recv_multigpu call requires NCCL.r   r   z#The n_elements '{}' should be >= 0.N)r   ro   r#   re   rf   rt   r+   r&   r   r|   r   r   r   )rj   r|   r   r,   r   r-   rl   r   r   r   recv_multigpu  s"   

r   gpu_idc                 C   s,   t  stdddl}|j|   dS )zSynchronize the current process to a give device.

    Args:
        gpu_id: the GPU device id to synchronize.

    Returns:
        None
    z(synchronize call requires CUDA and NCCL.r   N)r   ro   r#   cupycudaDevicesynchronize)r   cpr   r   r   r     s   	r   c              
   C   s  t   t| sz3d|  }tj|d}t|j \}}}}}tjjj	}|j
 }	|||	 }
t|||
| | W nI ty } z=dtjv rqtjd | krqttjd }ttjd }tjd }tdd}t|||| | ntd	| |W Y d
}~nd
}~ww t| }|S )zCheck if the group is initialized and return the group handle.

    Args:
        group_name: the name of the collective group.

    Returns:
        The collective group handle.
    r2   )r7   collective_group_namecollective_rankcollective_world_sizecollective_backendcollective_gloo_timeoutrB   z<The collective group '{}' is not initialized in the process.N)rD   r@   r   r4   rZ   get_inforY   _privateworkerglobal_workercore_workerget_actor_idindexr>   r.   r6   osenvironintgetenvr#   r&   r1   )r,   r7   mgridsr*   r+   r)   r    r   id_rK   excr-   r   r   r   rf     sF   	






rf   c                 C   sV   t | tjrdS t rt | tjjrdS t r"t | tjjr"dS t	d
t| )z-Check if the tensor is with a supported type.Nz[Unrecognized tensor type '{}'. Supported types are: np.ndarray, torch.Tensor, cupy.ndarray.)
isinstancenpndarrayr   ro   r   torch_availablethTensorr#   r&   type)rj   r   r   r   re     s   
re   r)   c                 C   s^   | t jjkrt stddS | t jjkrt stddS | t jjkr+t s-tddS dS )z'Check whether the backend is available.zGLOO is not available.zNCCL is not available.z#torch.distributed is not available.N)	r   r!   r$   r   r#   r'   r   r(   r   )r)   r   r   r   rE     s   rE   c                  C   s"   t jjj} | jt jkrdS td)z1Check if currently it is inside a Ray actor/task.NzBThe collective APIs shall be only used inside a Ray actor or task.)r   r   r   r   modeWORKER_MODEr#   )r   r   r   r   rD     s   
rD   c                 C   s6   |dk rt d||| jkrt d|| jdS )z'Check the rank: 0 <= rank < world_size.r   zrank '{}' is negative.z+rank '{}' must be less than world size '{}'N)r6   r&   r*   )r-   r+   r   r   r   rt     s   
rt   c                 C   s>   t | tstdt| | std| D ]}t| qdS )z7Check if the input is a list of supported tensor types.z.The input must be a list of tensors. Got '{}'.zGot an empty list of tensors.N)r   listr#   r&   r   re   )rm   tr   r   r   rp   '  s   


rp   c                 C   sD   t | tstdt| | std|  | D ]}t| qdS )z@Check if the input is a list of lists of supported tensor types.z7The input must be a list of lists of tensors. Got '{}'.zDid not receive tensors. Got: N)r   r   r#   r&   r   rp   )tensor_listsr   r   r   r   r   4  s   


r   c                 C   s2   |dk rt d||| krt d|| dS )z9Check the root_tensor device is 0 <= root_tensor < lengthr   zroot_tensor '{}' is negative.z9root_tensor '{}' is greater than the number of GPUs: '{}'N)r6   r&   )lengthrw   r   r   r   rz   A  s   rz   )rA   )r   rA   )r   r   rA   )rA   r   )Cr=   loggingr   typingr   numpyr   r   ray.util.collectiver   	getLoggerr:   r
   :ray.util.collective.collective_group.nccl_collective_groupr   r   r	   ImportError:ray.util.collective.collective_group.gloo_collective_groupr   r   @ray.util.collective.collective_group.torch_gloo_collective_groupr   r   r   r   r   objectr   r>   r@   r!   r'   r   rI   rF   r.   r9   rc   rd   ReduceOpSUMri   r   rq   rr   rx   r{   r~   r   r   r   r   r   r   r   r   r   r   rf   re   rE   rD   rt   rp   r   rz   r   r   r   r   <module>   s,   
R
)
B



$


!

*
(1
