o
    `۷ik$                     @   s   d Z ddlZddlZddlmZ ddlmZmZ ddlZ	ddl
mZ ddlmZ ddlmZ ddlmZmZ dd	lmZmZ d
ZdZG dd dZedefddZdS )z
[1] IMPACT: Importance Weighted Asynchronous Architectures with Clipped Target Networks.
Luo et al. 2020
https://arxiv.org/pdf/1912.00167
    N)deque)AnyOptional)ModelCatalog)ModelV2)OldAPIStack))DEFAULT_HISTOGRAM_BOUNDARIES_SHORT_EVENTSTimerAndPrometheusLogger)Counter	Histogramfunctarget_funcc                	   @   s   e Zd ZdZdedefddZ	dded	ed
ee	 defddZ
dedefddZdd	ed
ee	 defddZdefddZedefddZdefddZdefddZdd ZdefddZdS )CircularBufferat  A circular batch-wise buffer with Queue-like interface.

    The buffer holds at most N batches, which are sampled at random (uniformly).
    If full and a new batch is added, the oldest batch is discarded. Each batch
    can be sampled at most K times (after which it is also discarded).

    This version implements Queue-like put/get methods with blocking support.
    num_batchesiterations_per_batchc                 C   s   || _ || _| j | j | _d| _tdd t| jD | jd| _t | _| j| _	t
 | _t
d| _tj | _d| _d| _d| _tddtdd| _| jd	| jji td
ddd| _| jd	| jji tddtdd| _| jd	| jji dS )a  
        Args:
            num_batches: N from the paper (queue buffer size).
            iterations_per_batch: K ("replay coefficient") from the paper. Defines
                how often a single batch can sampled before being discarded. If a
                new batch is added when the buffer is full, the oldest batch is
                discarded entirely (regardless of how often it has been sampled).
        r   c                 S   s   g | ]}d qS N ).0_r   r   U/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/rllib/algorithms/appo/utils.py
<listcomp>3   s    z+CircularBuffer.__init__.<locals>.<listcomp>)maxlen$rllib_utils_circular_buffer_put_timez"Time spent in CircularBuffer.put())rllib)namedescription
boundariestag_keysr   2rllib_utils_circular_buffer_put_ts_dropped_counterz8Total number of env steps dropped by the CircularBuffer.)r   r   r   $rllib_utils_circular_buffer_get_timez"Time spent in CircularBuffer.get()N)r   r   _NxK
_num_addedr   range_bufferset_indices_offset	threadingLock_lock	Semaphore_items_availablenprandomdefault_rng_rng_total_puts_total_gets_total_droppedr   r   !_metrics_circular_buffer_put_timeset_default_tags	__class____name__r
   '_metrics_circular_buffer_put_ts_dropped!_metrics_circular_buffer_get_time)selfr   r   r   r   r   __init__$   sN   	 



zCircularBuffer.__init__TNitemblocktimeoutreturnc              	   C   s"  t | j | jf |  jd7  _| jd }t| jD ]%}| j| | j	| j
 | j| j
| j  |  j
d7  _
| j  q|  jd7  _d}|durht|trY|d  n| }|dkrh| jj|d W d   n1 srw   Y  W d   |S W d   |S 1 sw   Y  |S )a  Add a new batch to the buffer.

        The batch is added K times (iterations_per_batch) to allow for K samples.
        If full, the oldest batch entries are dropped.

        Args:
            item: The batch to add
            block: Not used (always non-blocking for puts)
            timeout: Not used

        Returns:
            Number of dropped entries (0 or iterations_per_batch)
           r   N)value)r	   r3   r)   r0   r#   r"   r   appendr%   addr&   discardr    r+   releaser!   
isinstancetuple	env_stepsr7   inc)r9   r;   r<   r=   dropped_entryr   
dropped_tsr   r   r   put`   s<   

 
  zCircularBuffer.putc                 C   s   | j |ddS )z$Equivalent to self.put(block=False).Fr<   )rK   )r9   r;   r   r   r   
put_nowait   s   zCircularBuffer.put_nowaitc              	   C   s   t | jk t| dkrtd t| dks| j> | jt| j	}|| j
 | j }| j| }|dusFJ ||| j
| j	dd | jD fd| j|< | j	| W d   n1 s[w   Y  W d   |S W d   |S 1 ssw   Y  |S )a  Sample a random batch from the buffer.

        The sampled entry is removed and won't be sampled again.
        Blocks if the buffer is empty (when block=True).

        Args:
            block: If True, block until an item is available
            timeout: Maximum time to wait (only used when block=True)

        Returns:
            A randomly sampled batch

        Raises:
            TimeoutError: If timeout expires while blocking
            IndexError: If buffer is empty and block=False
        r   g-C6?Nc                 S   s   g | ]}|d u qS r   r   )r   br   r   r   r      s    z&CircularBuffer.get.<locals>.<listcomp>)r	   r8   lentimesleepr)   r/   choicelistr%   r&   r    r#   rC   )r9   r<   r=   idxactual_buffer_idxbatchr   r   r   get   s2   





zCircularBuffer.getc                 C   s   | j ddS )z$Equivalent to self.get(block=False).FrL   )rW   r9   r   r   r   
get_nowait   s   zCircularBuffer.get_nowaitc                 C   s6   | j  | j| jkW  d   S 1 sw   Y  dS )zIWhether the buffer has been filled once with at least `self.num_batches`.N)r)   r!   r   rX   r   r   r   filled   s   
$zCircularBuffer.filledc                 C   s4   | j  t| jW  d   S 1 sw   Y  dS )zIReturns the number of actually valid (non-expired) batches in the buffer.N)r)   rO   r%   rX   r   r   r   qsize   s   $zCircularBuffer.qsizec                 C   s   |   S r   )r[   rX   r   r   r   __len__   s   zCircularBuffer.__len__c                 C   s   dS )zNo-op for Queue compatibility.Nr   rX   r   r   r   	task_done   s   zCircularBuffer.task_donec              
   C   sZ   | j   t| j| j| j| j| j| j| j| j	| jkdW  d   S 1 s&w   Y  dS )z%Get buffer statistics for monitoring.)sizecapacityr   r   
total_puts
total_getstotal_droppedrZ   N)
r)   rO   r%   r    r   r   r0   r1   r2   r!   rX   r   r   r   	get_stats   s   
$zCircularBuffer.get_stats)TN)r6   
__module____qualname____doc__intr:   r   boolr   floatrK   rM   rW   rY   propertyrZ   r[   r\   r]   dictrc   r   r   r   r   r      s,    	=
2(r   r>   c                 C   s~   t | j| jd \}}t j| j| j|| jd t| jd| _| j	 | _
t j| j| j|| jd t| jd| _| j	 | _| jS )zBuilds model and target model for APPO.

    Returns:
        ModelV2: The Model for the Policy to use.
            Note: The target model will not be returned, just assigned to
            `policy.target_model`.
    model)r   	framework)r   get_action_distaction_spaceconfigget_model_v2observation_spacePOLICY_SCOPErm   rl   	variablesmodel_variablesTARGET_POLICY_SCOPEtarget_modeltarget_model_variables)policyr   	logit_dimr   r   r   make_appo_models   s,   
r{   )rf   r'   rP   collectionsr   typingr   r   numpyr,   ray.rllib.models.catalogr   ray.rllib.models.modelv2r   ray.rllib.utils.annotationsr   #ray.rllib.utils.metrics.ray_metricsr   r	   ray.util.metricsr
   r   rs   rv   r   r{   r   r   r   r   <module>   s"     J