o
    $i                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZ d dl	Z	d dl
mZmZmZ er,d dlZe	jd dG dd dZG dd	 d	eZdS )
    N)defaultdict)TYPE_CHECKINGDictListOptionalTuple)CommunicatorReduceOpTorchTensorAllocator)num_cpusc                   @   sN   e Zd ZdZdefddZdedddefd	d
Zdeded ddfddZ	dS )CPUCommBarrierz
    Barrier actor that blocks the given number of actors until all actors have
    reached the Barrier.

    p2p operations are not done here (completed via shared memory channel).
    
num_actorsc                 C   s8   || _ t | _tt| _i | _tt| _	tt| _
d S N)r   asyncio	Condition	conditionr   listcollective_datacollective_data_shapeintnum_actors_seennum_actors_read)selfr    r   f/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/experimental/channel/cpu_communicator.py__init__   s   


zCPUCommBarrier.__init__op_iddatatorch.Tensoropc              	      s   j 4 I dH j j  | j   d7  < j  jkr6|j  }|j < j   nj  fddI dH  j  }j   d7  < j  jkrfj = j = j = |W  d  I dH  S 1 I dH sxw   Y  dS )z
        Wait at the communicator until all actors have sent `op_id` and `data`.
        Once data from all actors is received, execute the collective `op`
        on the communicator actor and return the result.
        N   c                      s   j   jkS r   )r   r   r   r   r   r   r   <lambda>8   s    z0CPUCommBarrier.wait_collective.<locals>.<lambda>)	r   r   appendr   r   	_apply_op
notify_allwait_forr   )r   r   r   r   r   r!   r   wait_collective'   s&   


0zCPUCommBarrier.wait_collectivetensorsreturnc                 C   s   |d   }|tjkr|dd D ]}||7 }q|S |tjkr.|dd D ]}||9 }q%|S |tjkrD|dd D ]}t||}q9|S |tjkrZ|dd D ]}t||}qO|S |tj	krit
|t| }|S td| d)zAApply the specified reduction operation across a list of tensors.r   r    Nz
Operation z not supported)cloner	   SUMPRODUCTMAXtorchmaxMINminAVGsumlen
ValueError)r   r   r(   resulttensorr   r   r   r$   E   s*   






zCPUCommBarrier._apply_opN)
__name__
__module____qualname____doc__r   r   r	   r'   r   r$   r   r   r   r   r      s
    r   c                
   @   s<  e Zd ZdZdeded fddZddd	efd
dZ	d3dee ddd	ede	e
 fddZ				d4ddZejfdddddefddZejfdddddefddZd5ddZdeddfd d!Zded fd"d#Zd$ejjdefd%d&Zde	e fd'd(Zdefd)d*Zdefd+d,Zd-d. Zd/d0 Zedefd1d2ZdS )6CPUCommunicatorzX
    Uses a CPU-based communicator actor instead of an accelerator group like NCCL.
    
world_sizeactor_handleszray.actor.ActorHandlec                 C   s(   || _ || _tt| _t | _d| _dS )z]We use the op index to synchronize the sender and receiver at the
        communicator actor.N)_world_size_actor_handlesr   r   num_opssetbarriers_rank)r   r=   r>   r   r   r   r   a   s
   

zCPUCommunicator.__init__r7   r   	peer_rankc                 C      d S r   r   )r   r7   rE   r   r   r   sendm   s   zCPUCommunicator.sendNshapedtypeztorch.dtype	allocatorc                 C   rF   r   r   )r   rH   rI   rE   rJ   r   r   r   recvr   s   zCPUCommunicator.recvsend_bufrecv_bufc                 C      t r   NotImplementedError)r   rL   rM   r   r   r   	allgather|   s   zCPUCommunicator.allgatherr   c                    s    fdd   D }ddttt| }tj|dd j} j	
| t|j j| ||}|d us>J d|d d  |d d <  j|  d7  < d S )	Nc                    s   g | ]}  |qS r   )get_rank).0actor_handler   r   r   
<listcomp>   s    
z-CPUCommunicator.allreduce.<locals>.<listcomp>zbarrier-collective--T)nameget_if_existsz-Receiving buffer required for CPUCommunicatorr    )get_actor_handlesjoinmapstrsortedr   optionsremoter?   rC   addraygetr'   rA   )r   rL   rM   r   	all_ranksbarrier_keybarrierr6   r   rU   r   	allreduce   s   
zCPUCommunicator.allreducec                 C   rN   r   rO   )r   rL   rM   r   r   r   r   reducescatter   s   zCPUCommunicator.reducescatterr)   c                 C   s   | j D ]}t| qd S r   )rC   rb   kill)r   rf   r   r   r   destroy   s   
zCPUCommunicator.destroyrankc                 C   s
   || _ d S r   rD   )r   rk   r   r   r   
initialize   s   
zCPUCommunicator.initializec                 C      | j S r   )r@   rU   r   r   r   rZ         z!CPUCommunicator.get_actor_handlesactorc                 C   s:   dd | j D }z	||j}W |S  ty   tdw )z
        Return the given actor's rank in the CPU communicator.

        Args:
            actor: The actor handle to look up.
        c                 S   s   g | ]}|j qS r   )_ray_actor_id)rS   ar   r   r   rV      s    z,CPUCommunicator.get_rank.<locals>.<listcomp>z*Actor is not in the CPUCommunicator group.)r@   indexrq   r5   )r   rp   	actor_idsrk   r   r   r   rR      s   zCPUCommunicator.get_rankc                 C   rn   r   rl   rU   r   r   r   get_self_rank   ro   zCPUCommunicator.get_self_rankc                 C   rn   )zE
        Return the number of ranks in the CPU communicator.
        )r?   rU   r   r   r   get_world_size   s   zCPUCommunicator.get_world_sizec                 C   s   dS )Ncpur   rU   r   r   r   get_transport_name      z"CPUCommunicator.get_transport_namec                 C   rN   r   rO   rU   r   r   r   recv_stream   ry   zCPUCommunicator.recv_streamc                 C   rN   r   rO   rU   r   r   r   send_stream   ry   zCPUCommunicator.send_streamc                 C   s   dd l }t| S )Nr   )uuidr]   uuid4)clsr|   r   r   r   generate_communicator_id   s   z(CPUCommunicator.generate_communicator_idr   )rL   r   rM   r   )r)   N) r8   r9   r:   r;   r   r   r   rG   r   r   r
   rK   rQ   r	   r+   rg   rh   rj   rm   rZ   rb   rp   ActorHandlerR   ru   rv   r]   rx   rz   r{   classmethodr   r   r   r   r   r<   \   s\    






r<   )r   collectionsr   typingr   r   r   r   r   rb   %ray.experimental.channel.communicatorr   r	   r
   r.   r`   r   r<   r   r   r   r   <module>   s    
K