o
    $iP?                     @   s   d dl Z d dlmZmZ d dlmZ d dlmZmZm	Z	 d dl
mZ d dlmZ d dlmZmZ eG dd	 d	ZeG d
d deZG dd deZeG dd dZeG dd dZeG dd dZeG dd dZdS )    N)ListOptional)DataIterator)MultiAgentBatchSampleBatchconcat_samples)unflatten_dict)DeveloperAPI)
DeviceTypeEpisodeTypec                   @   s<   e Zd ZdZdddddededed	ed
eddfddZdS )MiniBatchIteratorBasez+The base class for all minibatch iterators.   Tr   
num_epochsshuffle_batch_per_epochnum_total_minibatchesbatchr   r   minibatch_sizer   returnNc                C   s   dS )a  Initializes a MiniBatchIteratorBase instance.

        Args:
            batch: The input multi-agent batch.
            num_epochs: The number of complete passes over the entire train batch. Each
                pass might be further split into n minibatches (if `minibatch_size`
                provided). The train batch is generated from the given `episodes`
                through the Learner connector pipeline.
            minibatch_size: The size of minibatches to use to further split the train
                batch into per epoch. The train batch is generated from the given
                `episodes` through the Learner connector pipeline.
            num_total_minibatches: The total number of minibatches to loop through
                (over all `num_epochs` epochs). It's only required to set this to != 0
                in multi-agent + multi-GPU situations, in which the MultiAgentEpisodes
                themselves are roughly sharded equally, however, they might contain
                SingleAgentEpisodes with very lopsided length distributions. Thus,
                without this fixed, pre-computed value, one Learner might go through a
                different number of minibatche passes than others causing a deadlock.
        N )selfr   r   r   r   r   r   r   \/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/rllib/utils/minibatch_utils.py__init__   s   zMiniBatchIteratorBase.__init__)__name__
__module____qualname____doc__r   intboolr   r   r   r   r   r      s$    r   c                       sL   e Zd ZdZdddddededed	ed
eddf fddZdd Z  Z	S )MiniBatchCyclicIteratoraw  This implements a simple multi-agent minibatch iterator.

    This iterator will split the input multi-agent batch into minibatches where the
    size of batch for each module_id (aka policy_id) is equal to minibatch_size. If the
    input batch is smaller than minibatch_size, then the iterator will cycle through
    the batch until it has covered `num_epochs` epochs.
    r   Tr   r   r   r   r   r   r   r   Nc                   sh   t  j||||d || _|| _|| _|| _dd |j D | _dd |j D | _	d| _
|| _dS )z/Initializes a MiniBatchCyclicIterator instance.)r   r   r   c                 S      i | ]}|d qS r   r   .0midr   r   r   
<dictcomp>O       z4MiniBatchCyclicIterator.__init__.<locals>.<dictcomp>c                 S   r    r!   r   r"   r   r   r   r%   Q   r&   r   N)superr   _batch_minibatch_size_num_epochs_shuffle_batch_per_epochpolicy_batcheskeys_start_num_covered_epochs_minibatch_count_num_total_minibatches)r   r   r   r   r   r   	__class__r   r   r   8   s   

z MiniBatchCyclicIterator.__init__c                 c   s   | j dkrt| j | jk s| j dkr| j| j k ri }| jj D ]\}}t	|dkr5t
d| d| j| }| j}g }|jr`|tjd usNJ ddd }t||| jt	|  }ndd }|| ||kr||d  }|| ||}	|	dksJ d||	8 }d}| j|  d	7  < | jr|  || ||ksl|| }
|
|kr||||
  t|||< |
| j|< q#t|t	| j}|V  |  jd	7  _| j dkrt| j | jk s| j dkr| j| j k sd S d S d S d S )
Nr   zThe batch for module_id zt is empty! This will create an infinite loop because we need to cover the same number of samples for each module_id.z}MiniBatchCyclicIterator requires SampleBatch.SEQ_LENSto be present in the batch for slicing a batch in the batch dimension B.c                 S   s   t | tj S N)lenr   SEQ_LENSbr   r   r   get_len   s   z1MiniBatchCyclicIterator.__iter__.<locals>.get_lenc                 S   s   t | S r4   r5   r7   r   r   r   r9      s   zLength of a sample must be > 0!r   )r1   minr/   valuesr*   r0   r(   r,   itemsr5   
ValueErrorr.   r)   _slice_seq_lens_in_Bgetr   r6   r   appendr+   shuffler   r   )r   	minibatch	module_idmodule_batchsn_stepssamples_to_concatr9   sample
len_sampleer   r   r   __iter__V   sf   






z MiniBatchCyclicIterator.__iter__)
r   r   r   r   r   r   r   r   rL   __classcell__r   r   r2   r   r   .   s&    r   c                       s*   e Zd Zdef fddZdd Z  ZS )MiniBatchDummyIteratorr   c                    s   t  j|fi | || _d S r4   )r'   r   r(   )r   r   kwargsr2   r   r   r      s   
zMiniBatchDummyIterator.__init__c                 c   s    | j V  d S r4   )r(   )r   r   r   r   rL      s   zMiniBatchDummyIterator.__iter__)r   r   r   r   r   rL   rM   r   r   r2   r   rN      s    rN   c                	   @   s8   e Zd Zdedededee fddZdefdd	Z	d
S )MiniBatchRayDataIteratoriteratordevicer   	num_itersc                K   sL   || _ dd | D | _| j jd||d| j| _t| j| _|| _d S )Nc                 S   s   i | ]\}}|d kr||qS )return_stater   )r#   kvr   r   r   r%      s    z5MiniBatchRayDataIterator.__init__.<locals>.<dictcomp>)
batch_sizerR   r   )	_iteratorr=   _kwargsiter_torch_batches_batched_iterableiter_epoch_iterator
_num_iters)r   rQ   rR   r   rS   rO   r   r   r   r      s   

z!MiniBatchRayDataIterator.__init__r   c                 c   s    d}| j d u s|| j k rU| jD ]-}|d7 }t|}tdd | D tdd | D d}|V  | j r=|| j kr= nqt| j| _| j sId S | j d u s|| j k sd S d S )Nr   r   c                 S   s   i | ]	\}}|t |qS r   )r   )r#   rD   module_datar   r   r   r%      s    z5MiniBatchRayDataIterator.__iter__.<locals>.<dictcomp>c                 s   s$    | ]}t tt| V  qd S r4   )r5   nextr\   r<   )r#   r_   r   r   r   	<genexpr>   s
    
z4MiniBatchRayDataIterator.__iter__.<locals>.<genexpr>)	env_steps)	r^   r]   r   r   r=   sumr<   r\   r[   )r   	iterationr   r   r   r   rL      s,   
z!MiniBatchRayDataIterator.__iter__N)
r   r   r   r   r
   r   r   r   r   rL   r   r   r   r   rP      s    
rP   c                   @   s*   e Zd ZdZdedefddZdd ZdS )	ShardBatchIteratorzIterator for sharding batch into num_shards batches.

    Args:
        batch: The input multi-agent batch.
        num_shards: The number of shards to split the batch into.

    Yields:
        A MultiAgentBatch of size len(batch) / num_shards.
    r   
num_shardsc                 C      || _ || _d S r4   )r(   _num_shards)r   r   rf   r   r   r   r        
zShardBatchIterator.__init__c           	      c   s    t | jD ]<}i }| jj D ]'\}}tt|| j }|| }t|| t|}|t	|t	| ||< qt
|t	|}|V  qd S r4   )rangerh   r(   r,   r=   mathceilr5   r;   r   r   )	r   ibatch_to_sendpid	sub_batchrW   startend	new_batchr   r   r   rL   
  s   zShardBatchIterator.__iter__N)r   r   r   r   r   r   r   rL   r   r   r   r   re      s    
re   c                   @   sD   e Zd ZdZ	ddee dedee fddZdee fd	d
Z	dS )ShardEpisodesIteratorzMIterator for sharding a list of Episodes into `num_shards` lists of Episodes.Nepisodesrf   len_lookback_bufferc                 C   s~   t |tdd| _|| _|| _tdd |D | _dd t| jD | _| j}t| jD ]}|||  }|| j|< ||8 }q+dS )a  Initializes a ShardEpisodesIterator instance.

        Args:
            episodes: The input list of Episodes.
            num_shards: The number of shards to split the episodes into.
            len_lookback_buffer: An optional length of a lookback buffer to enforce
                on the returned shards. When spitting an episode, the second piece
                might need a lookback buffer (into the first piece) depending on the
                user's settings.
        T)keyreversec                 s   s    | ]}t |V  qd S r4   r:   )r#   rK   r   r   r   ra   2  s    z1ShardEpisodesIterator.__init__.<locals>.<genexpr>c                 S      g | ]}d qS r!   r   r#   _r   r   r   
<listcomp>3      z2ShardEpisodesIterator.__init__.<locals>.<listcomp>N)	sortedr5   	_episodesrh   _len_lookback_bufferrc   _total_lengthrj   _target_lengths)r   ru   rf   rv   remaining_lengthrF   len_r   r   r   r     s   

zShardEpisodesIterator.__init__r   c           
      c   sJ   dd t | jD }dd t | jD }d}|t| jk r| j| }|t|}|| t| | j| krM|| | ||  t|7  < |d7 }nG| j| ||  }|dkr|d| |jt|d| j	d}}|| | ||  t|7  < || j|< n|dksJ || | |d7 }|t| jk s|D ]}	|	V  qdS )a2  Runs one iteration through this sharder.

        Yields:
            A sub-list of Episodes of size roughly `len(episodes) / num_shards`. The
            yielded sublists might have slightly different total sums of episode
            lengths, in order to not have to drop even a single timestep.
        c                 S   s   g | ]}g qS r   r   rz   r   r   r   r|   B  r}   z2ShardEpisodesIterator.__iter__.<locals>.<listcomp>c                 S   ry   r!   r   rz   r   r   r   r|   C  r}   r   r   N)rv   )
rj   rh   r5   r   indexr;   r   rA   slicer   )
r   sublistslengthsepisode_indexepisode	min_indexr   
slice_partremaining_partsublistr   r   r   rL   :  s:   


 zShardEpisodesIterator.__iter__r4   )
r   r   r   r   r   r   r   r   r   rL   r   r   r   r   rt     s    
rt   c                   @   s&   e Zd ZdZdefddZdd ZdS )ShardObjectRefIteratora,  Iterator for sharding a list of ray ObjectRefs into num_shards sub-lists.

    Args:
        object_refs: The input list of ray ObjectRefs.
        num_shards: The number of shards to split the references into.

    Yields:
        A sub-list of ray ObjectRefs with lengths as equal as possible.
    rf   c                 C   rg   r4   )_object_refsrh   )r   object_refsrf   r   r   r   r   v  ri   zShardObjectRefIterator.__init__c                 c   sd    t | j}|| j }|| j }d}t| jD ]}|| ||k r"dnd }| j|| V  |}qd S )Nr   r   )r5   r   rh   rj   )r   nsublist_sizeremaining_elementsrq   rm   rr   r   r   r   rL   z  s   


zShardObjectRefIterator.__iter__N)r   r   r   r   r   r   rL   r   r   r   r   r   j  s    
r   )rk   typingr   r   ray.datar   ray.rllib.policy.sample_batchr   r   r   ray.rllib.utilsr   ray.rllib.utils.annotationsr	   ray.rllib.utils.typingr
   r   r   r   rN   rP   re   rt   r   r   r   r   r   <module>   s*    " 	: N