o
    ci%                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZmZ d dlmZ d dlmZ d d	lmZ d d
lmZ e \ZZZe eZeG dd deZG dd dejZdS )    N)_Timer)LearnerThread)MinibatchBuffer)SampleBatch)OldAPIStackoverride)deprecation_warning)try_import_tf)LearnerInfoBuilder)RolloutWorkerc                       sn   e Zd ZdZ										dded	ed
ededededededef fddZee	dddZ
  ZS )MultiGPULearnerThreada   Learner that can use multiple GPUs and parallel loading.

    This class is used for async sampling algorithms.

    Example workflow: 2 GPUs and 3 multi-GPU tower stacks.
    -> On each GPU, there are 3 slots for batches, indexed 0, 1, and 2.

    Workers collect data from env and push it into inqueue:
    Workers -> (data) -> self.inqueue

    We also have two queues, indicating, which stacks are loaded and which
    are not.
    - idle_tower_stacks = [0, 1, 2]  <- all 3 stacks are free at first.
    - ready_tower_stacks = []  <- None of the 3 stacks is loaded with data.

    `ready_tower_stacks` is managed by `ready_tower_stacks_buffer` for
    possible minibatch-SGD iterations per loaded batch (this avoids a reload
    from CPU to GPU for each SGD iter).

    n _MultiGPULoaderThreads: self.inqueue -get()->
    policy.load_batch_into_buffer() -> ready_stacks = [0 ...]

    This thread: self.ready_tower_stacks_buffer -get()->
    policy.learn_on_loaded_batch() -> if SGD-iters done,
    put stack index back in idle_tower_stacks queue.
       N     ,  Flocal_workernum_gpustrain_batch_sizenum_multi_gpu_tower_stacksnum_sgd_iterlearner_queue_sizelearner_queue_timeoutnum_data_load_threads
_fake_gpusc                    s  |rt ddd t j|d|||d d| _|| _| jj| _tt| j	 j
| _
td| j
 | jt| j
 dks>J | jt| j
ksJJ dtt|| _t | _t | _| jD ]}| j| q^t|	D ]}t| |dkd	| _| j  qkt| j|||| _dS )
a  Initializes a MultiGPULearnerThread instance.

        Args:
            local_worker: Local RolloutWorker holding
                policies this thread will call `load_batch_into_buffer` and
                `learn_on_loaded_batch` on.
            num_gpus: Number of GPUs to use for data-parallel SGD.
            train_batch_size: Size of batches (minibatches if
                `num_sgd_iter` > 1) to learn on.
            num_multi_gpu_tower_stacks: Number of buffers to parallelly
                load data into on one device. Each buffer is of size of
                `train_batch_size` and hence increases GPU memory usage
                accordingly.
            num_sgd_iter: Number of passes to learn on per train batch
                (minibatch if `num_sgd_iter` > 1).
            learner_queue_size: Max size of queue of inbound
                train batches to this thread.
            num_data_load_threads: Number of threads to use to load
                data into GPU memory in parallel.
        z+MultiGPULearnerThread.minibatch_buffer_sizeT)olderrorr   )r   minibatch_buffer_sizer   r   r   Nz MultiGPULearnerThread devices {}zbatch too small)share_stats)r   super__init__minibatch_bufferr   r   
policy_mapnextitervaluesdevicesloggerinfoformatlenlistrangetower_stack_indicesqueueQueueidle_tower_stacksready_tower_stacksput_MultiGPULoaderThreadloader_threadstartr   ready_tower_stacks_buffer)selfr   r   lrr   r   r   r   r   r   r   r   idxi	__class__ `/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/execution/multi_gpu_learner_thread.pyr   1   sB   %




zMultiGPULearnerThread.__init__returnc                 C   s.  | j  s	td| j | j \}}W d    n1 sw   Y  d}| jJ tt| j	d}| j
 D ]/}| jjd urE| j|sEq6| j
| }|jd|d}|j||d | j| |||7 }q6| | _W d    n1 suw   Y  |r| j| | j||| jf | j| j  d S )Nz`The `_MultiGPULoaderThread` has died! Will therefore also terminate the `MultiGPULearnerThread`.r   )num_devices)offsetbuffer_index)	policy_id)r3   is_aliveRuntimeErrorload_wait_timerr5   get
grad_timerr
   r)   r%   r!   keysr   is_policy_to_trainlearn_on_loaded_batchadd_learn_on_batch_resultspolicy_ids_updatedappend"get_num_samples_loaded_into_bufferfinalizelearner_infor/   r1   outqueuer   pushinqueueqsize)r6   
buffer_idxreleasedrN   learner_info_builderpidpolicydefault_policy_resultsr<   r<   r=   step   sJ   


zMultiGPULearnerThread.step)
r   Nr   r   r   r   r   r   FNr>   N)__name__
__module____qualname____doc__r   intboolr   r   r   r[   __classcell__r<   r<   r:   r=   r      sB    	
[r   c                   @   s2   e Zd ZdedefddZdddZdd	d
ZdS )r2   multi_gpu_learner_threadr   c                 C   sD   t j|  || _d| _|r|j| _|j| _d S t | _t | _d S )NT)	threadingThreadr   rd   daemonqueue_timer
load_timerr   )r6   rd   r   r<   r<   r=   r      s   z_MultiGPULoaderThread.__init__r>   Nc                 C   s   	 |    q)N)_step)r6   r<   r<   r=   run   s   z_MultiGPULoaderThread.runc                 C   s   | j }|j}| j |j }W d    n1 sw   Y  |j }| j= | D ]0}|jj	d ur;|j	||s;q+|| }t
|trL|j||d q+||jv r[|j|j| |d q+W d    n1 sfw   Y  |j| d S )N)batchrA   )rd   r!   rh   rS   rF   r/   ri   rH   r   rI   
isinstancer   load_batch_into_bufferpolicy_batchesr0   r1   )r6   sr!   rl   rU   rX   rY   r<   r<   r=   rj      s6   


z_MultiGPULoaderThread._stepr\   )r]   r^   r_   r   rb   r   rk   rj   r<   r<   r<   r=   r2      s    

r2   )loggingr-   re   ray.util.timerr   "ray.rllib.execution.learner_threadr   $ray.rllib.execution.minibatch_bufferr   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr   r   ray.rllib.utils.deprecationr   ray.rllib.utils.frameworkr	   $ray.rllib.utils.metrics.learner_infor
   #ray.rllib.evaluation.rollout_workerr   tf1tftfv	getLoggerr]   r&   r   rf   r2   r<   r<   r<   r=   <module>   s$    
 /