o
    `۷i                     @   s   d dl Z d dlZd dlmZmZ d dlmZmZ d dlZ	d dl
mZ d dlmZ d dlmZ d dlmZmZ e \ZZZe eZeG dd	 d	ed
ZG dd dejZdS )    N)ABCMetaabstractmethod)DictList)MultiAgentBatch)	PublicAPI)try_import_tf)SampleBatchType
TensorTypec                   @   sF   e Zd ZdZeedefddZed
dede	e
ef fddZd	S )InputReaderzFAPI for collecting and returning experiences during policy evaluation.returnc                 C   s   t )zReturns the next batch of read experiences.

        Returns:
            The experience read (SampleBatch or MultiAgentBatch).
        )NotImplementedError)self r   T/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/rllib/offline/input_reader.pynext   s   zInputReader.next   
queue_sizec                    s   t | dr	tdtd |   t trtd fddt 	 D } fdd|D }dd	  fd
d|D D t
j|||d}| }td|  t| |||| _| j  | j  fdd	| D }|S )a`  Returns TensorFlow queue ops for reading inputs from this reader.

        The main use of these ops is for integration into custom model losses.
        For example, you can use tf_input_ops() to read from files of external
        experiences to add an imitation learning loss to your model.

        This method creates a queue runner thread that will call next() on this
        reader repeatedly to feed the TensorFlow queue.

        Args:
            queue_size: Max elements to allow in the TF queue.

        .. testcode::
            :skipif: True

            from ray.rllib.models.modelv2 import ModelV2
            from ray.rllib.offline.json_reader import JsonReader
            imitation_loss = ...
            class MyModel(ModelV2):
                def custom_loss(self, policy_loss, loss_inputs):
                    reader = JsonReader(...)
                    input_ops = reader.tf_input_ops()
                    logits, _ = self._build_layers_v2(
                        {"obs": input_ops["obs"]},
                        self.num_outputs, self.options)
                    il_loss = imitation_loss(logits, input_ops["action"])
                    return policy_loss + il_loss

        You can find a runnable version of this in examples/custom_loss.py.

        Returns:
            Dict of Tensors, one for each column of the read SampleBatch.
        _queue_runnerzfA queue runner already exists for this input reader. You can only call tf_input_ops() once per reader.z0Reading initial batch of data from input reader.z9tf_input_ops() is not implemented for multi agent batchesc                    s*   g | ]}t t  | jt jr|qS r   )np
issubdtypearraydtypenumber.0kbatchr   r   
<listcomp>U   s    z,InputReader.tf_input_ops.<locals>.<listcomp>c                    s   g | ]} | j qS r   )r   r   r   r   r   r   Z       c                 S   s"   i | ]\}}|d |dd  qS ))r   Nr   )r   r   sr   r   r   
<dictcomp>[      " z,InputReader.tf_input_ops.<locals>.<dictcomp>c                    s   g | ]	}| | j fqS r   )shaper   r   r   r   r   [   s    )capacitydtypesnameszCreating TF queue runner for {}c                    s"   i | ]\}}|t | | qS r   )tfreshape)r   r   t)shapesr   r   r#   d   r$   )hasattr
ValueErrorloggerinfor   
isinstancer   r   sortedkeystf1	FIFOQueuedequeueformat_QueueRunnerr   enqueuestartitems)r   r   r3   r'   queuetensorsoutr   )r   r,   r   tf_input_ops    s.   
$




zInputReader.tf_input_opsN)r   )__name__
__module____qualname____doc__r   r   r	   r   intr   strr
   r?   r   r   r   r   r      s    "r   )	metaclassc                   @   sD   e Zd ZdZdedddee ddfdd	Zd
efddZ	dd Z
dS )r8   z0Thread that feeds a TF queue from a InputReader.input_readerr<   ztf1.FIFOQueuer3   r'   ztf.dtypes.DTypec                 C   sZ   t j|  t | _d| _|| _|| _|| _	dd |D | _
|tt|| j
| _d S )NTc                 S   s   g | ]}t |qS r   )r4   placeholder)r   r   r   r   r   r   x   r    z)_QueueRunner.__init__.<locals>.<listcomp>)	threadingThread__init__r4   get_default_sessionsessdaemonrG   r3   r<   placeholdersr9   dictzip
enqueue_op)r   rG   r<   r3   r'   r   r   r   rK   k   s   
z_QueueRunner.__init__r   c                    s0    fddt jD }jjj|d d S )Nc                    s    i | ]\}}j |  | qS r   )rO   )r   ikeyr   r   r   r   r#   |   s     z(_QueueRunner.enqueue.<locals>.<dictcomp>)	feed_dict)	enumerater3   rM   runrR   )r   r   datar   rU   r   r9   {   s   z_QueueRunner.enqueuec                 C   s:   	 z| j  }| | W n ty   td Y nw q)NTzError reading from input)rG   r   r9   	Exceptionr/   	exception)r   r   r   r   r   rX      s   
z_QueueRunner.runN)r@   rA   rB   rC   r   r   rE   rK   r	   r9   rX   r   r   r   r   r8   h   s    
r8   )loggingrI   abcr   r   typingr   r   numpyr   ray.rllib.policy.sample_batchr   ray.rllib.utils.annotationsr   ray.rllib.utils.frameworkr   ray.rllib.utils.typingr	   r
   r4   r)   tfv	getLoggerr@   r/   r   rJ   r8   r   r   r   r   <module>   s    
U