o
    ciq:                     @   sV  d dl mZ d dlZd dlZd dlZd dlmZmZmZ d dl	Z	d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZ d dlmZ d dlmZmZ d dl m!Z! d dl"m#Z# d dl$m%Z%m&Z& e \Z'Z(dZ)dZ*dZ+dZ,dZ-dZ.dZ/dZ0dZ1da2G dd deZ3e3Z4G dd dej5Z6G dd dej5Z7dS )     )dequeN)AnyDictUnion)CircularBuffer)&LEARNER_RESULTS_CURR_ENTROPY_COEFF_KEY)COMPONENT_RL_MODULE)Learner)TrainingDataValueFunctionAPI)override5OverrideToImplementCustomLogic_CallToSuperRecommended)try_import_torch)LambdaDefaultDict)ALL_MODULESNUM_ENV_STEPS_SAMPLED_LIFETIME)MetricsLogger)	Scheduler)ModuleID
ResultDictgpu_loader_queue_wait_timergpu_loader_load_to_gpu_timer"learner_thread_in_queue_wait_timer learner_thread_env_steps_droppedlearner_thread_update_timerray_get_episodes_timerqueue_size_gpu_loader_queuequeue_size_learner_thread_queuequeue_size_results_queuec                
       s   e Zd Zeed fddZeedddedeee	f d	e
defd
dZedeee	f ddf fddZeedef fddZeeedee fddZ  ZS )IMPALALearnerreturnNc                    s   t    t  j_d _t  _t	 fdd _
t  _t ds.t jjd _ jjdkrL fddt jjD  _ jD ]}|  qEttj j d _ j  d S )	Nr   c                    s   t  j| j j jdS )N)fixed_value_or_schedule	frameworkdevice)r   configget_config_for_moduleentropy_coeffr#   _device)	module_idself ^/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/algorithms/impala/impala_learner.py<lambda>?   s
    z%IMPALALearner.build.<locals>.<lambda>_learner_thread_in_queue)maxlenc                    s$   g | ]}t  j j j jd qS ))in_queue	out_queuer$   metrics_logger)_GPULoaderThread_gpu_loader_in_queuer/   r(   metrics).0_r*   r,   r-   
<listcomp>Y   s    z'IMPALALearner.build.<locals>.<listcomp>)update_methodr1   learner)superbuild	threadingRLockr6   _threading_lock_num_updatesLock_num_updates_lockr   #entropy_coeff_schedulers_per_modulequeueQueuer5   hasattrr   r%   learner_queue_sizer/   num_gpus_per_learnerrangenum_gpu_loader_threads_gpu_loader_threadsstart_LearnerThreadr	   update_learner_thread)r+   t	__class__r*   r-   r=   -   s.   







	
zIMPALALearner.buildF)return_statetraining_data	timestepsrT   c          
         sN  |pi a |   j|d}|dusJ  jjdkr- j|  jt	t
f j  n!t jtrE j|} jjt	tf|dd n	t j| j  j  j}W d   n1 s_w   Y  i }|dkr j d _W d   n1 s{w   Y   j }|r j fdd j D d	d
}	t|	t |	t< |	|d< |S )aF  

        Args:
            batch:
            timesteps:
            return_state: Whether to include one of the Learner worker's state from
                after the update step in the returned results dict (under the
                `_rl_module_state_after_update` key). Note that after an update, all
                Learner workers' states should be identical, so we use the first
                Learner's state here. Useful for avoiding an extra `get_weights()` call,
                e.g. for synchronizing EnvRunner weights.
            **kwargs:

        Returns:

        )rU   Nr   sumreduce   c                    s"   g | ]}  |rtd  | qS )/)should_module_be_updatedr   )r7   midr*   r,   r-   r9      s    
z(IMPALALearner.update.<locals>.<listcomp>T)
componentsinference_only_rl_module_state_after_update)_CURRENT_GLOBAL_TIMESTEPS
solve_refs_make_batch_if_necessaryr%   rI   r5   putr6   	log_valuer   QUEUE_SIZE_GPU_LOADER_QUEUEqsize
isinstancer/   r   add LEARNER_THREAD_ENV_STEPS_DROPPEDrN   enqueuerC   rA   rY   	get_statemodulekeysrayr   )
r+   rU   rV   rT   kwargsbatch
ts_droppedcountresultlearner_stater,   r*   r-   rO   p   sR   


	zIMPALALearner.updatec                   sR   t  j|d | j D ]}| j| j|tdd}| jj	|t
f|dd qd S )N)rV   r   )timestep   )window)r<   before_gradient_based_updaterm   rn   rD   rO   getr   r6   re   r   )r+   rV   r)   new_entropy_coeffrR   r,   r-   ry      s   z*IMPALALearner.before_gradient_based_updater)   c                    s   t  | | j| d S N)r<   remove_modulerD   pop)r+   r)   rR   r,   r-   r}      s   zIMPALALearner.remove_modulec                 C   s   t gS r|   r   )clsr,   r,   r-   rl_module_required_apis   s   z%IMPALALearner.rl_module_required_apisr!   N)__name__
__module____qualname__r   r	   r=   r
   r   strr   boolr   rO   r   ry   r}   classmethodlisttyper   __classcell__r,   r,   rR   r-   r    ,   s*    B
h r    c                       sF   e Zd Zdejdedejdef fddZ	dd	d
Z
dddZ  ZS )r4   r1   r2   r$   r3   c                   s6   t  jdd d| _|| _|| _d| _|| _|| _d S )Nr4   nameTr   )r<   __init__daemon	_in_queue
_out_queue_ts_droppedr(   r6   )r+   r1   r2   r$   r3   rR   r,   r-   r      s   
z_GPULoaderThread.__init__r!   Nc                 C   s   	 |    qr|   )_stepr*   r,   r,   r-   run
  s   z_GPULoaderThread.runc                 C   s   | j ttf | j }W d    n1 sw   Y  | j ttf |j| jdd}W d    n1 s8w   Y  t	| j
trV| j
|}| j jttf|dd d S t| j
|| j  d S )NF)
pin_memoryrW   rX   )r6   log_timer   GPU_LOADER_QUEUE_WAIT_TIMERr   rz   GPU_LOADER_LOAD_TO_GPU_TIMER	to_devicer(   rh   r   r   ri   re   rj   rN   rk   )r+   ma_batch_on_cpuma_batch_on_gpurr   r,   r,   r-   r     s   
z_GPULoaderThread._stepr   )r   r   r   rE   rF   r   torchr$   r   r   r   r   r   r,   r,   rR   r-   r4      s    
r4   c                       sN   e Zd Zdeeef f fddZdddZdd	 Ze	d
efddZ
  ZS )rN   r1   c                   s0   t  jdd d| _|| _d| _|| _|| _d S )NrN   r   TF)r<   r   r   r;   stopped_update_methodr   )r+   r:   r1   r;   rR   r,   r-   r   $  s   

z_LearnerThread.__init__r!   Nc                 C   s   | j s|   | j rd S d S r|   )r   stepr*   r,   r,   r-   r   9  s   z_LearnerThread.runc              	   C   s  | j jttf$ t| jtr| j }n| js!t	
d | jr| j }W d    n1 s0w   Y  | jr:d S | j jttf: | j| j t|dtdd | j j | j  jd7  _W d    n1 shw   Y  W d    d S W d    d S 1 sw   Y  d S )Ng-C6?)rq   T)r+   rU   rV   _no_metrics_reducerw   )r;   r6   r   r   "LEARNER_THREAD_IN_QUEUE_WAIT_TIMERrh   r   r   sampletimesleeppopleftr   LEARNER_THREAD_UPDATE_TIMERr   r
   ra   rC   rA   )r+   r   r,   r,   r-   r   =  s4   


	"z_LearnerThread.steplearner_queuec                 C   sL   t | | jkr|jttf|   dd | | |ttft |  d S )NrW   rX   )	lenr0   re   r   rj   r   	env_stepsappendQUEUE_SIZE_LEARNER_THREAD_QUEUE)r   rq   r6   r,   r,   r-   rk   h  s   


z_LearnerThread.enqueuer   )r   r   r   r   r   r   r   r   r   staticmethodrk   r   r,   r,   rR   r-   rN   #  s    

+rN   )8collectionsr   rE   r>   r   typingr   r   r   ro   ray.rllib.algorithms.appo.utilsr   "ray.rllib.algorithms.impala.impalar   ray.rllib.corer   ray.rllib.core.learner.learnerr	   $ray.rllib.core.learner.training_datar
   ray.rllib.core.rl_module.apisr   ray.rllib.utils.annotationsr   r   ray.rllib.utils.frameworkr   "ray.rllib.utils.lambda_defaultdictr   ray.rllib.utils.metricsr   r   &ray.rllib.utils.metrics.metrics_loggerr   #ray.rllib.utils.schedules.schedulerr   ray.rllib.utils.typingr   r   r   r8   r   r   r   rj   r   RAY_GET_EPISODES_TIMERrf   r   QUEUE_SIZE_RESULTS_QUEUEra   r    ImpalaLearnerThreadr4   rN   r,   r,   r,   r-   <module>   sF    
 J+