o
    $im                  	   @   s  d Z ddlZddlZddlZddlZddlZddlmZmZ ddl	Z
ddlZddlm  mZ ddlmZ ddlmZmZ ddlmZ eeZzddlmZ d	Zd
aW n eya   d
Zd	aY nw z
ddlm Z  d	Z!W n eyw   d
Z!Y nw dd Z"dd Z#dd Z$dee%e&f fddZ'G dd de(Z)e) a*e+ a,dd Z-ej.j/ddfde&de&de%de&fdd Z0ej.j/ddfde&d!ee& de%de&fd"d#Z1dfde%ddfd$d%Z2dfde%de&fd&d'Z3dfde%de&fd(d)Z4dej5j6fde%fd*d+Z7dej5j6fd,e8de%fd-d.Z9dfde%fd/d0Z:ddej5j6fd1e&de%fd2d3Z;dddej5j6fd,e8d1e&d4e&de%fd5d6Z<dgd7e&de%fd8d9Z=	dhd7e&d:e&de%fd;d<Z>dfd,e8de%fd=d>Z?	dfd?e8d@e8de%fdAdBZ@dej5j6fd,e8de%fdCdDZAdej5j6fde%fdEdFZBdfd1e&de%fdGdHZC		did1e&dIe&de%dJe&fdKdLZDdfd7e&de%fdMdNZE		did7e&dOe&de%dJe&fdPdQZFdRe&fdSdTZGdfde%fdUdVZHdWdX ZIdYej.fdZd[ZJd\d] ZKde&fd^d_ZLd`da ZMdbdc ZNddde ZOdS )jz5APIs exposed under the namespace ray.util.collective.    N)ListTuple   )types)find_free_portis_ipv6)get_master_address_metadata_key)	NCCLGroupTF)TorchGLOOGroupc                   C   s   t  rtrtd datS )NzqNCCL seems unavailable. Please install Cupy following the guide at: https://docs.cupy.dev/en/stable/install.html.F)rayget_gpu_ids_LOG_NCCL_WARNINGloggerwarning_NCCL_AVAILABLE r   r   [/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/util/collective/collective.pynccl_available*   s   r   c                   C      t S N_TORCH_DISTRIBUTED_AVAILABLEr   r   r   r   gloo_available6   s   r   c                   C   r   r   r   r   r   r   r   torch_distributed_available<   s   r   returnc                  C   s*   t j } tt| rtjntj}| |fS )z4Returns the IP address and a free port on this node.)r   utilget_node_ip_addressr   r   socketAF_INET6AF_INET)addrportr   r   r   get_address_and_port@   s   
r"   c                   @   s8   e Zd ZdZdd Zdd Zdd Zdd	 Zd
d ZdS )GroupManagera  Use this class to manage the collective groups we created so far.

    Each process will have an instance of `GroupManager`. Each process
    could belong to multiple collective groups. The membership information
    and other metadata are stored in the global `_group_mgr` object.
    c                 C   s
   i | _ d S r   _name_group_map)selfr   r   r   __init__O      
zGroupManager.__init__c                 C   s  t |}|t jjkr_t|}|dkr$t \}}t|| d|  n+t |r-|d nd }		 t|}
|
dur;nt |	krIt	d| dt
d	 q1td
| t||||}n |t jjkrxt| td| t|||}ntd| || j|< | j| S )zThe entry to create new collective groups in the manager.

        Put the registration and the group information into the manager
        metadata as well.
        r   :g     @@g      >@TNz:Timed out waiting for GLOO rendezvous metadata for group 'z'.g?z.Creating torch.distributed GLOO group: '{}'...zCreating NCCL group: '{}'...zUnexpected backend: )r   BackendGLOO_get_master_addr_keyr"   _internal_kv_internal_kv_puttime_internal_kv_getTimeoutErrorsleepr   debugformatr
   NCCL_check_backend_availabilityr	   RuntimeErrorr%   )r&   backend
world_sizerank
group_namegloo_timeoutmetadata_keyr    r!   
deadline_smetagr   r   r   create_collective_groupR   s<   







z$GroupManager.create_collective_groupc                 C   s
   || j v S r   r$   r&   r;   r   r   r   is_group_exist~   r(   zGroupManager.is_group_existc                 C   s(   |  |std| dS | j| S )z,Get the collective group handle by its name.z"The group '{}' is not initialized.N)rC   r   r   r4   r%   rB   r   r   r   get_group_by_name   s   

zGroupManager.get_group_by_namec                 C   sp   |  |std| dS | j| }| j|= |  d| }zt|}t| W dS  t	y7   Y dS w )zGroup destructor.zThe group '{}' does not exist.Ninfo_)
rC   r   r   r4   r%   destroy_groupr   	get_actorkill
ValueError)r&   r;   r@   namestorer   r   r   destroy_collective_group   s   


z%GroupManager.destroy_collective_groupN)	__name__
__module____qualname____doc__r'   rA   rC   rD   rL   r   r   r   r   r#   G   s    ,r#   c                 C   s2   t  t| W  d   S 1 sw   Y  dS )zDCheck if the group is initialized in this process by the group name.N)_group_mgr_lock
_group_mgrrC   r;   r   r   r   is_group_initialized   s   $rT   default0u  r9   r:   r;   r<   c                 C   s   t   t|}t| |std|t- t|r!t	d| dks'J |dks-J || k s3J t
|| ||| W d   dS 1 sGw   Y  dS )a=  Initialize a collective group inside an actor process.

    Args:
        world_size: the total number of processes in the group.
        rank: the rank of the current process.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.

    Returns:
        None
    z%group_name '{}' needs to be a string.#Trying to initialize a group twice.r   N)_check_inside_actorr   r*   r6   rI   r4   rQ   rR   rC   r7   rA   )r9   r:   r8   r;   r<   r   r   r   init_collective_group   s   


"rY   ranksc           
   	   C   s6  t |}t| d| }z	t| td ty   Y nw t|t| kr4tdt|t| t	|t	t
t|krRtdt|ddd |D |dkr]td	|t|dksgtd
t||k sqtdddlm} d| }dd | D }|j|dd }	t|	j|||||g dS )a  Declare a list of actors as a collective group.

    Note: This function should be called in a driver process.

    Args:
        actors: a list of actors to be set in a collective group.
        world_size: the total number of processes in the group.
        ranks (List[int]): the rank of each actor.
        backend: the CCL backend to use, NCCL or GLOO.
        group_name: the name of the collective group.

    Returns:
        None
    rE   rW   zHEach actor should correspond to one rank. Got '{}' ranks but '{}' actorsz5Ranks must be a permutation from 0 to '{}'. Got '{}'. c                 S   s   g | ]}t |qS r   )str).0rr   r   r   
<listcomp>   s    z+create_collective_group.<locals>.<listcomp>r   z/World size must be greater than zero. Got '{}'.zRanks must be non-negative.z(Ranks cannot be greater than world_size.)Infoc                 S   s   g | ]}|j qS r   )_ray_actor_id)r]   ar   r   r   r_     s    detached)rJ   lifetimeN)r   r*   r6   r   rG   r7   rI   lenr4   setrangejoinallray.util.collective.utilr`   optionsremotegetset_info)
actorsr9   rZ   r8   r;   r<   rJ   r`   	actors_idinfor   r   r   rA      sB   

 rA   c                 C   s:   t   t t|  W d   dS 1 sw   Y  dS )z0Destroy a collective group given its group name.N)rX   rQ   rR   rL   rS   r   r   r   rL     s   "rL   c                 C   Z   t   t t| s	 W d   dS t| }|jW  d   S 1 s&w   Y  dS )a  Return the rank of this process in the given group.

    Args:
        group_name: the name of the group to query

    Returns:
        the rank of this process in the named group,
        -1 if the group does not exist or the process does
        not belong to the group.
    N)rX   rQ   rR   rC   rD   r:   r;   r@   r   r   r   get_rank  s   

$ru   c                 C   rr   )a  Return the size of the collective group with the given name.

    Args:
        group_name: the name of the group to query

    Returns:
        The world size of the collective group, -1 if the group does
            not exist or the process does not belong to the group.
    Nrs   )rX   rQ   rR   rC   rD   r9   rt   r   r   r   get_collective_group_size4  s   


$rv   c                 C   s.   t |  t|}tj}||_|| g| dS )a   Collective allreduce the tensor across the group.

    Args:
        tensor: the tensor to be all-reduced on this process.
        group_name: the collective group name to perform allreduce.
        op: The reduce operation.

    Returns:
        None
    N)_check_single_tensor_inputget_group_handler   AllReduceOptionsreduceOp	allreduce)tensorr;   opr@   optsr   r   r   r{   H  s
   r{   tensor_listc                 C   s<   t  stdt|  t|}t j}||_|| | dS )a  Collective allreduce a list of tensors across the group.

    Args:
        tensor_list (List[tensor]): list of tensors to be allreduced,
            each on a GPU.
        group_name: the collective group name to perform allreduce.

    Returns:
        None
    &Multigpu calls requires NCCL and Cupy.N)r   cupy_availabler7   _check_tensor_list_inputrx   ry   rz   r{   )r   r;   r}   r@   r~   r   r   r   allreduce_multigpuZ  s   r   c                 C   s   t | }|  dS )zBarrier all processes in the collective group.

    Args:
        group_name: the name of the group to barrier.

    Returns:
        None
    N)rx   barrierrt   r   r   r   r   p  s   	r   dst_rankc                 C   sF   t |  t|}t|| t }||_||_d|_|| g| dS )a:  Reduce the tensor across the group to the destination rank.

    Args:
        tensor: the tensor to be reduced on this process.
        dst_rank: the rank of the destination process.
        group_name: the collective group name to perform reduce.
        op: The reduce operation.

    Returns:
        None
    r   N)	rw   rx   _check_rank_validr   ReduceOptionsrz   	root_rankroot_tensorreduce)r|   r   r;   r}   r@   r~   r   r   r   r   }  s   
r   
dst_tensorc                 C   sb   t  stdt|  t|}t|| tt| | t  }||_	||_
||_|| | dS )a  Reduce the tensor across the group to the destination rank
    and destination tensor.

    Args:
        tensor_list: the list of tensors to be reduced on this process;
            each tensor located on a GPU.
        dst_rank: the rank of the destination process.
        dst_tensor: the index of GPU at the destination.
        group_name: the collective group name to perform reduce.
        op: The reduce operation.

    Returns:
        None
    r   N)r   r   r7   r   rx   r   _check_root_tensor_validre   r   rz   r   r   r   )r   r   r   r;   r}   r@   r~   r   r   r   reduce_multigpu  s   
r   src_rankc                 C   s@   t |  t|}t|| t }||_d|_|| g| dS )a(  Broadcast the tensor from a source process to all others.

    Args:
        tensor: the tensor to be broadcasted (src) or received (destination).
        src_rank: the rank of the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    r   N)rw   rx   r   r   BroadcastOptionsr   r   	broadcastr|   r   r;   r@   r~   r   r   r   r     s   
r   
src_tensorc                 C   s\   t  stdt|  t|}t|| tt| | t  }||_	||_
|| | dS )ag  Broadcast the tensor from a source GPU to all other GPUs.

    Args:
        tensor_list: the tensors to broadcast (src) or receive (dst).
        src_rank: the rank of the source process.
        src_tensor: the index of the source GPU on the source process.
        group_name: the collective group name to perform broadcast.

    Returns:
        None
    r   N)r   r   r7   r   rx   r   r   re   r   r   r   r   )r   r   r   r;   r@   r~   r   r   r   broadcast_multigpu  s   
r   c                 C   sL   t | t|  t|}t| |jkrtdt }|| g|g| dS )a   Allgather tensors from each process of the group into a list.

    Args:
        tensor_list: the results, stored as a list of tensors.
        tensor: the tensor (to be gathered) in the current process
        group_name: the name of the collective group.

    Returns:
        None
    zPThe length of the tensor list operands to allgather must be equal to world_size.N)	rw   r   rx   re   r9   r7   r   AllGatherOptions	allgather)r   r|   r;   r@   r~   r   r   r   r     s   r   output_tensor_listsinput_tensor_listc                 C   sB   t  stdt|  t| t|}t  }|| || dS )a  Allgather tensors from each gpus of the group into lists.

    Args:
        output_tensor_lists (List[List[tensor]]): gathered results, with shape
            must be num_gpus * world_size * shape(tensor).
        input_tensor_list: (List[tensor]): a list of tensors, with shape
            num_gpus * shape(tensor).
        group_name: the name of the collective group.

    Returns:
        None
    r   N)r   r   r7   _check_tensor_lists_inputr   rx   r   r   )r   r   r;   r@   r~   r   r   r   allgather_multigpu  s   r   c                 C   sR   t |  t| t|}t }||_t||jkrtd|	| g|g| dS )a  Reducescatter a list of tensors across the group.

    Reduce the list of the tensors across each process in the group, then
    scatter the reduced list of tensors -- one tensor for each process.

    Args:
        tensor: the resulted tensor on this process.
        tensor_list: The list of tensors to be reduced and scattered.
        group_name: the name of the collective group.
        op: The reduce operation.

    Returns:
        None
    zXThe length of the tensor list operands to reducescatter must not be equal to world_size.N)
rw   r   rx   r   ReduceScatterOptionsrz   re   r9   r7   reducescatter)r|   r   r;   r}   r@   r~   r   r   r   r     s   r   c                 C   sH   t  stdt| t|  t|}t  }||_|| || dS )a  Reducescatter a list of tensors across all GPUs.

    Args:
        output_tensor_list: the resulted list of tensors, with
            shape: num_gpus * shape(tensor).
        input_tensor_lists: the original tensors, with shape:
            num_gpus * world_size * shape(tensor).
        group_name: the name of the collective group.
        op: The reduce operation.

    Returns:
        None.
    r   N)	r   r   r7   r   r   rx   r   rz   r   )output_tensor_listinput_tensor_listsr;   r}   r@   r~   r   r   r   reducescatter_multigpu<  s   r   c                 C   R   t |  t|}t|| ||jkrtd|t }||_|	| g| dS )zSend a tensor to a remote process synchronously.

    Args:
        tensor: the tensor to send.
        dst_rank: the rank of the destination process.
        group_name: the name of the collective group.

    Returns:
        None
    "The destination rank '{}' is self.N)
rw   rx   r   r:   r7   r4   r   SendOptionsr   send)r|   r   r;   r@   r~   r   r   r   r   Y     

r   dst_gpu_index
n_elementsc                 C      t  stdt|  t|}t|| ||jkr!td||dk r,td|t  }||_	||_
||_|| g| dS )a  Send a tensor to a remote GPU synchronously.

    The function assumes each process owns >1 GPUs, and the sender
    process and receiver process has equal number of GPUs.

    Args:
        tensor: the tensor to send, located on a GPU.
        dst_rank: the rank of the destination process.
        dst_gpu_index: the destination gpu index.
        group_name: the name of the collective group.
        n_elements: if specified, send the next n elements
            from the starting address of tensor.

    Returns:
        None
    z!send_multigpu call requires NCCL.GThe dst_rank '{}' is self. Considering doing GPU to GPU memcpy instead?r   z The n_elements '{}' should >= 0.N)r   r   r7   rw   rx   r   r:   r4   r   r   r   r   r   )r|   r   r   r;   r   r@   r~   r   r   r   send_multigpun  s"   

r   c                 C   r   )zReceive a tensor from a remote process synchronously.

    Args:
        tensor: the received tensor.
        src_rank: the rank of the source process.
        group_name: the name of the collective group.

    Returns:
        None
    r   N)
rw   rx   r   r:   r7   r4   r   RecvOptionsr   recvr   r   r   r   r     r   r   src_gpu_indexc                 C   r   )a  Receive a tensor from a remote GPU synchronously.

    The function asssume each process owns >1 GPUs, and the sender
    process and receiver process has equal nubmer of GPUs.

    Args:
        tensor: The received tensor, located on a GPU.
        src_rank: The rank of the source process.
        src_gpu_index: The index of the source GPU on the src process.
        group_name: The name of the collective group.

    Returns:
        None
    z!recv_multigpu call requires NCCL.r   r   z#The n_elements '{}' should be >= 0.N)r   r   r7   rw   rx   r   r:   r4   r   r   r   r   r   )r|   r   r   r;   r   r@   r~   r   r   r   recv_multigpu  s"   

r   gpu_idc                 C   s,   t  stdddl}|j|   dS )zSynchronize the current process to a give device.

    Args:
        gpu_id: the GPU device id to synchronize.

    Returns:
        None
    z(synchronize call requires CUDA and NCCL.r   N)r   r   r7   cupycudaDevicesynchronize)r   cpr   r   r   r     s   	r   c                 C   s@  t   t t| sz3d|  }tj|d}t|j \}}}}}tj	j
j}|j }	|||	 }
t|||
| | W nI ty } z=dtjv rutjd | kruttjd }ttjd }tjd }tdd}t|||| | ntd	| |W Y d
}~nd
}~ww t| }|W  d
   S 1 sw   Y  d
S )zCheck if the group is initialized and return the group handle.

    Args:
        group_name: the name of the collective group.

    Returns:
        The collective group handle.
    rE   )rJ   collective_group_namecollective_rankcollective_world_sizecollective_backendcollective_gloo_timeoutrV   z<The collective group '{}' is not initialized in the process.N)rX   rQ   rR   rC   r   rG   rm   get_inforl   _privateworkerglobal_workercore_workerget_actor_idindexrA   rI   osenvironintgetenvr7   r4   rD   )r;   rJ   mgridsr9   r:   r8   r<   r   id_r^   excr@   r   r   r   rx     sJ   	







$rx   c                 C   sV   t | tjrdS t rt | tjjrdS t r"t | tjjr"dS t	d
t| )z-Check if the tensor is with a supported type.Nz[Unrecognized tensor type '{}'. Supported types are: np.ndarray, torch.Tensor, cupy.ndarray.)
isinstancenpndarrayr   r   r   torch_availablethTensorr7   r4   type)r|   r   r   r   rw     s   
rw   r8   c                 C   s@   | t jjkrt stddS | t jjkrt stddS dS )z'Check whether the backend is available.z#torch.distributed is not available.zNCCL is not available.N)r   r*   r+   r   r7   r5   r   )r8   r   r   r   r6   (  s   r6   c                  C   s"   t jjj} | jt jkrdS td)z1Check if currently it is inside a Ray actor/task.NzBThe collective APIs shall be only used inside a Ray actor or task.)r   r   r   r   modeWORKER_MODEr7   )r   r   r   r   rX   3  s   
rX   c                 C   s6   |dk rt d||| jkrt d|| jdS )z'Check the rank: 0 <= rank < world_size.r   zrank '{}' is negative.z+rank '{}' must be less than world size '{}'N)rI   r4   r9   )r@   r:   r   r   r   r   >  s   
r   c                 C   s>   t | tstdt| | std| D ]}t| qdS )z7Check if the input is a list of supported tensor types.z.The input must be a list of tensors. Got '{}'.zGot an empty list of tensors.N)r   listr7   r4   r   rw   )r   tr   r   r   r   H  s   


r   c                 C   sD   t | tstdt| | std|  | D ]}t| qdS )z@Check if the input is a list of lists of supported tensor types.z7The input must be a list of lists of tensors. Got '{}'.zDid not receive tensors. Got: N)r   r   r7   r4   r   r   )tensor_listsr   r   r   r   r   U  s   


r   c                 C   s2   |dk rt d||| krt d|| dS )z9Check the root_tensor device is 0 <= root_tensor < lengthr   zroot_tensor '{}' is negative.z9root_tensor '{}' is greater than the number of GPUs: '{}'N)rI   r4   )lengthr   r   r   r   r   b  s   r   )rU   )r   rU   )r   r   rU   )rU   r   )PrP   loggingr   r   	threadingr/   typingr   r   numpyr   r   ray.experimental.internal_kvexperimentalinternal_kvr-   r[   r   ray._common.network_utilsr   r   @ray.util.collective.collective_group.torch_gloo_collective_groupr   r,   	getLoggerrM   r   :ray.util.collective.collective_group.nccl_collective_groupr	   r   r   ImportErrorr
   r   r   r   r   r\   r   r"   objectr#   rR   LockrQ   rT   r*   r5   rY   rA   rL   ru   rv   ReduceOpSUMr{   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   rx   rw   r6   rX   r   r   r   r   r   r   r   r   <module>   s0   
W
,
B	



$


!

*
(3
