import asyncio
from collections import defaultdict
from typing import Optional, Tuple
from unittest import mock

import torch

import ray
import ray.dag
import ray.experimental.channel as ray_channel
from ray.experimental.channel import nccl_group
from ray.experimental.channel.communicator import TorchTensorAllocator
from ray.experimental.util.types import Device


@ray.remote(num_cpus=0)
class Barrier:
    """
    Barrier that blocks the given number of actors until all actors have
    reached the barrier. This is used to mock out blocking NCCL ops.
    """

    def __init__(self, num_actors=2):
        self.num_actors = num_actors
        self.condition = asyncio.Condition()
        # Buffer for the data that is "sent" between the actors; each entry
        # is one p2p op.
        self.data = {}
        # Number of actors seen so far for each p2p op index.
        self.num_actors_seen = defaultdict(int)
        # Report a CPU device for mocked transfers instead of a GPU.
        device_property_patcher = mock.patch(
            "ray.experimental.channel.torch_tensor_type.TorchTensorType.device",
            new_callable=mock.PropertyMock,
            return_value=Device.CPU,
        )
        device_property_patcher.start()

    async def wait(self, idx: int, data=None):
        """
        Wait at barrier until all actors have sent `idx`. One actor should
        provide `data`, and this value will be returned by this method for all
        other actors.
        """
        async with self.condition:
            if data is not None:
                assert idx not in self.data, (self.data, self.num_actors_seen)
                self.data[idx] = data
            self.num_actors_seen[idx] += 1

            if self.num_actors_seen[idx] == self.num_actors:
                # All actors have arrived; wake up everyone waiting.
                self.condition.notify_all()
            else:
                await self.condition.wait_for(
                    lambda: self.num_actors_seen[idx] == self.num_actors
                )

            if data is None:
                data = self.data[idx]
        return data
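

# Hypothetical usage sketch (illustration only, not part of the original
# module): two mocked "NCCL peers" synchronize through a named Barrier actor.
# The actor name and tensor value below are made up. Both waits must be in
# flight before blocking on either result, since wait() only returns once all
# actors have arrived for the same op index.
#
#     barrier = Barrier.options(name="barrier-0-1").remote()
#     send_ref = barrier.wait.remote(0, torch.zeros(8))  # "rank 0" provides data
#     recv_ref = barrier.wait.remote(0)                  # "rank 1" asks for it
#     assert torch.equal(ray.get(recv_ref), torch.zeros(8))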


class MockCudaStream:
    def __init__(self):
        # Mimic the `cuda_stream` attribute of a real torch.cuda.Stream.
        self.cuda_stream = 0

    def synchronize(self):
        # No-op: there is no real device work to wait for.
        pass


class MockNcclGroup(nccl_group._NcclGroup):
    """
    Mock the internal _NcclGroup to use a barrier actor instead of a NCCL group
    for communication.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Number of p2p ops seen so far per barrier, used as the op index.
        self.num_ops = defaultdict(int)
        self.barriers = set()

    def send(self, tensor: torch.Tensor, peer_rank: int):
        # "Send" the tensor by storing it in the barrier actor shared with
        # the peer rank.
        barrier_key = sorted([self.get_self_rank(), peer_rank])
        barrier_key = f"barrier-{barrier_key[0]}-{barrier_key[1]}"
        barrier = ray.get_actor(name=barrier_key)
        self.barriers.add(barrier)
        ray.get(barrier.wait.remote(self.num_ops[barrier_key], tensor))
        self.num_ops[barrier_key] += 1

    def recv(
        self,
        shape: Tuple[int],
        dtype: torch.dtype,
        peer_rank: int,
        allocator: Optional[TorchTensorAllocator] = None,
    ):
        # "Receive" the tensor stored in the barrier actor by the peer rank.
        barrier_key = sorted([self.get_self_rank(), peer_rank])
        barrier_key = f"barrier-{barrier_key[0]}-{barrier_key[1]}"
        barrier = ray.get_actor(name=barrier_key)
        self.barriers.add(barrier)
        received_tensor = ray.get(barrier.wait.remote(self.num_ops[barrier_key]))
        assert (
            allocator is not None
        ), "torch tensor allocator is required for MockNcclGroup"
        buf = allocator(shape, dtype)
        buf[:] = received_tensor[:]
        self.num_ops[barrier_key] += 1
        return buf

    def destroy(self) -> None:
        for barrier in self.barriers:
            ray.kill(barrier)


def start_nccl_mock():
    """
    Patch methods that require CUDA.
    """
    # Mock cupy dependencies.
    nccl_mock = mock.MagicMock()
    nccl_mock.nccl.get_unique_id.return_value = 0
    cp_patcher = mock.patch.dict(
        "sys.modules",
        {
            "cupy.cuda": nccl_mock,
            "cupy": mock.MagicMock(),
            "ray.util.collective.collective_group": mock.MagicMock(),
        },
    )
    cp_patcher.start()

    # Mock send/recv ops to use a barrier actor instead of NCCL.
    ray.experimental.channel.nccl_group._NcclGroup = MockNcclGroup

    # Mock CUDA streams and tensor device attributes.
    stream_patcher = mock.patch(
        "torch.cuda.current_stream", new_callable=lambda: MockCudaStream
    )
    stream_patcher.start()
    new_stream_patcher = mock.patch(
        "torch.cuda.Stream", new_callable=lambda: MockCudaStream
    )
    new_stream_patcher.start()
    tensor_patcher = mock.patch("torch.Tensor.device", "cuda")
    tensor_patcher.start()
    tensor_patcher = mock.patch("torch.Tensor.is_cuda", True)
    tensor_patcher.start()
    tensor_allocator_patcher = mock.patch(
        "ray.experimental.channel.torch_tensor_accelerator_channel._torch_tensor_allocator",
        lambda shape, dtype: torch.empty(shape, dtype=dtype),
    )
    tensor_allocator_patcher.start()
    device_property_patcher = mock.patch(
        "ray.experimental.channel.torch_tensor_type.TorchTensorType.device",
        new_callable=mock.PropertyMock,
        return_value=Device.CPU,
    )
    device_property_patcher.start()

    ctx = ray_channel.ChannelContext.get_current()
    ctx.set_torch_device(torch.device("cuda"))
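

# Hypothetical test setup (illustration only): call start_nccl_mock() before
# building a DAG that transfers torch tensors, so the NCCL code paths resolve
# to the mocks above. The fixture name and `ray_start_regular` dependency are
# assumptions for the sketch.
#
#     import pytest
#
#     @pytest.fixture
#     def nccl_mocked(ray_start_regular):
#         start_nccl_mock()
#         yield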


class TracedChannel(ray_channel.shared_memory_channel.Channel):
    """
    Patched Channel that records all write ops for testing.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Record of (args, kwargs) for every write() call.
        self.ops = []

    def write(self, *args, **kwargs):
        self.ops.append((args, kwargs))
        return super().write(*args, **kwargs)