o
    `۷i                  
   @   s
  d dl Z d dlmZmZmZmZmZmZmZ d dl	Z	d dl
Z
d dlmZ d dlmZmZmZ G dd deZG dd dZe
jG d	d
 d
Zdededee
jj dee ddf
ddZdeddfddZdddeeed ee f  ddfddZdddefddZdS )    N)Dict	FrozenSetListOptionalSetTupleType)ChannelContext)CommunicatorReduceOpTorchTensorAllocatorc                   @   sX  e Zd ZdZdeejj fddZde	ddfdd	Z
d
ejjde	fddZde	fddZdee	 fddZded fddZddde	ddfddZ	d2dee	 ddde	dee ddf
ddZ			 			d3d!d"Zejfddd dd#eddfd$d%Zejfddd dd#eddfd&d'Zed(d) Zed*d+ Zd4d,d-Zdefd.d/Zedefd0d1Z dS )5AbstractNcclGroupz)
    A dummy NCCL group for testing.
    actor_handlesc                 C   s   || _ d | _d S N)_actor_handles_rank)selfr    r   Z/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/experimental/collective/conftest.py__init__   s   
zAbstractNcclGroup.__init__rankreturnNc                 C   s
   || _ d S r   r   )r   r   r   r   r   
initialize      
zAbstractNcclGroup.initializeactorc                 C   s   | j |S r   )r   index)r   r   r   r   r   get_rank   s   zAbstractNcclGroup.get_rankc                 C   s
   t | jS r   )lenr   r   r   r   r   get_world_size   r   z AbstractNcclGroup.get_world_sizec                 C      | j S r   r   r   r   r   r   get_self_rank!      zAbstractNcclGroup.get_self_rankray.actor.ActorHandlec                 C   r!   r   )r   r   r   r   r   get_actor_handles$   r#   z#AbstractNcclGroup.get_actor_handlesvaluetorch.Tensor	peer_rankc                 C      t r   NotImplementedError)r   r&   r(   r   r   r   send'      zAbstractNcclGroup.sendshapedtypeztorch.dtype	allocatorc                 C   r)   r   r*   )r   r.   r/   r(   r0   r   r   r   recv*   s   zAbstractNcclGroup.recvsend_bufrecv_bufc                 C   r)   r   r*   )r   r2   r3   r   r   r   	allgather3   s   zAbstractNcclGroup.allgatheropc                 C   r)   r   r*   r   r2   r3   r5   r   r   r   	allreduce:      zAbstractNcclGroup.allreducec                 C   r)   r   r*   r6   r   r   r   reducescatterB   r8   zAbstractNcclGroup.reducescatterc                 C      d S r   r   r   r   r   r   recv_streamJ      zAbstractNcclGroup.recv_streamc                 C   r:   r   r   r   r   r   r   send_streamN   r<   zAbstractNcclGroup.send_streamc                 C   r:   r   r   r   r   r   r   destroyR   r-   zAbstractNcclGroup.destroyc                 C   s   dS )Nacceleratorr   r   r   r   r   get_transport_nameU   r-   z$AbstractNcclGroup.get_transport_namec                 C   r:   r   r   )clsr   r   r   generate_communicator_idX   r<   z*AbstractNcclGroup.generate_communicator_idr   )r2   r'   r3   r'   r   N)r   N)!__name__
__module____qualname____doc__r   rayr   ActorHandler   intr   r   r    r   r"   r%   r,   r   r   r1   r4   r   SUMr7   r9   propertyr;   r=   r>   strr@   classmethodrB   r   r   r   r   r      sp    
	





r   c                   @   s|   e Zd Zdd Z				dded dee ded	ee d
ee	e  defddZ
deddfddZdee ddfddZdS )MockNcclGroupSetc                 C   s
   i | _ d S r   )ids_to_actors_and_custom_commsr   r   r   r   r   ^   s   zMockNcclGroupSet.__init__NFactorsr$   custom_nccl_groupuse_communication_streamsaccelerator_module_nameaccelerator_communicator_clsr   c           	         s   t t t f| j< d u rttt }n	fdd D } fddt| D }t	j
|dd t }d urI|j< S t |j< S )Nc                    s   g | ]}  |qS r   )r   .0r   )rQ   r   r   
<listcomp>v   s    z-MockNcclGroupSet.__call__.<locals>.<listcomp>c              	      s$   g | ]\}}|j t| qS r   )__ray_call__remotemock_do_init_nccl_group)rV   r   r   rP   rQ   group_idr   r   rW   w   s       timeout)rL   uuiduuid4	frozensetrO   listranger   ziprG   getr	   get_currentcommunicatorsr   )	r   rP   rQ   rR   rS   rT   ranks
init_tasksctxr   r[   r   __call__e   s"   


zMockNcclGroupSet.__call__r\   c                    sp   t  } |jvrd S | j  \}} fdd|D }tj|dd  | jv r+| j = |j    |j = d S )Nc                    s   g | ]	}|j t qS r   )rX   rY   mock_do_destroy_nccl_grouprU   r\   r   r   rW      s    z<MockNcclGroupSet.mock_destroy_nccl_group.<locals>.<listcomp>r]   r^   )r	   rg   rh   rO   rG   waitr>   )r   r\   rk   rP   _destroy_tasksr   rn   r   mock_destroy_nccl_group   s   


z(MockNcclGroupSet.mock_destroy_nccl_groupnccl_group_idsc                 C   s2   t  }|D ]}|| jvsJ ||jvsJ qd S r   )r	   rg   rO   rh   )r   rs   rk   nccl_group_idr   r   r   check_teardown   s
   zMockNcclGroupSet.check_teardown)NFNN)rC   rD   rE   r   r   r   r
   boolrL   r   rl   rr   ru   r   r   r   r   rN   ]   s*    


&rN   c                   @   sj   e Zd Zdd Z	ddedeej dejfddZ	d	ejde
eef fd
dZde
ejdf fddZdS )CPUTorchTensorWorkerc                 C   s
   d| _ d S )Ncpu)devicer   r   r   r   r      r   zCPUTorchTensorWorker.__init__Nsizer/   r   c                 C   s   t j||| jdS )N)r/   ry   )torchonesry   )r   rz   r/   r   r   r   return_tensor   s   z"CPUTorchTensorWorker.return_tensortensorc                 C   s   |j | j ksJ |j|d fS )Nr   )ry   r.   )r   r~   r   r   r   r1      s   zCPUTorchTensorWorker.recv.c                 G   s   t |S r   )tuple)r   tensorsr   r   r   recv_tensors   s   z!CPUTorchTensorWorker.recv_tensorsr   )rC   rD   rE   r   rI   r   r{   r/   Tensorr}   r   r1   r   r   r   r   r   rw      s    
rw   r\   r   rP   rQ   r   c                 C   sH   t  }|d u rt|}|| ||j|< d S || ||j|< d S r   )r	   rg   r   r   rh   )r   r\   r   rP   rQ   rk   
nccl_groupr   r   r   rZ      s   

rZ   c                 C   s0   t  }||jvrd S |j|   |j|= d S r   )r	   rg   rh   r>   )r   r\   rk   r   r   r   rm      s
   
rm   dagzray.dag.DAGNodeactors_and_custom_commsr$   zray.dag.CompiledDAGc                 C   s8   t  }| d| | }t|j |ksJ ||fS )Nz,ray.dag.compiled_dag_node._init_communicator)rN   setattrexperimental_compilesetrO   values)monkeypatchr   r   mock_nccl_group_setcompiled_dagr   r   r   check_nccl_group_init   s   r   r   r   c                 C   s.   |  d|j |j }|  || d S )Nz/ray.dag.compiled_dag_node._destroy_communicator)r   rr   "_actors_to_created_communicator_idr   teardownru   )r   r   r   created_communicator_idsr   r   r   check_nccl_group_teardown   s   
r   )r`   typingr   r   r   r   r   r   r   r{   rG   ray.experimental.channel.commonr	   %ray.experimental.channel.communicatorr
   r   r   r   rN   rY   rw   rL   rI   r   rH   rZ   rm   r   r   r   r   r   r   <module>   sH    $NI


