o
    ci                     @   sj   d dl Z d dlZd dlZd dlmZ d dlmZ dZdZe 	e
ZG dd dZe add	 Zd
d ZdS )    N)	nccl_util)ENV       c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	
StreamPoola  The class that represents a stream pool associated with a GPU.

    When multistream is enabled, we will allocate a pool of streams for each
    GPU, and get available stream from this pool when a collective kernel is
    initialized. This enables overlapping computation/communication kernels
    using multiple CUDA streams, given that the streams a appropriately
    synchronized. The class is thread-safe.


    Args:
        device_idx: the absolute index of the device for this pool.
    c                 C   s6   || _ d| _t | _d gt | _d| _t | _d S )NFr   )	
device_idx_initialized	threadingLock_initialized_lockNCCL_STREAM_POOL_SIZE_pool_counter
_pool_lock)selfr    r   d/home/ubuntu/.local/lib/python3.10/site-packages/ray/util/collective/collective_group/cuda_stream.py__init__   s   
zStreamPool.__init__c                 C   sV   | j   | js|   | j   | j  | j| j }| jd t | _| j  |S )zGet an available stream from the pool.

        The function locks the stream pool and releases the lock before
        returning.

        Returns:
            stream (cupy.cuda.Stream): the returned stream from pool.
           )	r   acquirer   
_init_oncereleaser   r   r   r   )r   streamr   r   r   
get_stream&   s   



zStreamPool.get_streamc                 C   s   t | j1 ttD ]$}tjjr"t	d t
jjddd| j|< qt	d t
jjj| j|< qW d   n1 s:w   Y  d| _dS )z)Initialize the stream pool only for once.zNCCL multistream enabled.F)nullnon_blockingzNCCL multistream disabled.NT)r   Devicer   ranger   r   NCCL_USE_MULTISTREAMvalloggerdebugcupycudaStreamr   r   
_init_flag)r   ir   r   r   r   =   s   


	zStreamPool._init_onceN)__name__
__module____qualname____doc__r   r   r   r   r   r   r   r      s
    
r   c                  C   s   t tD ]} t| t| < qd S )N)r   MAX_GPU_PER_ACTORr   _device_stream_pool_map)r&   r   r   r   _init_stream_poolP   s   r-   c                 C   s*   t  }|  tst  |  t|  S )z)Get the CUDA stream pool of a GPU device.)r	   r
   r   r,   r-   r   )r   lockr   r   r   get_stream_poolV   s   r/   )loggingr	   r"   $ray.util.collective.collective_groupr   ray.util.collective.constr   r   r+   	getLoggerr'   r    r   dictr,   r-   r/   r   r   r   r   <module>   s    
?