o
    ci(>                     @   s   d dl Z d dlmZmZmZ d dlmZ d dlmZm	Z	 d dlm
Z
 d dlmZ d dlmZ eG dd	 d	ZeG d
d deZG dd deZeG dd dZeG dd dZeG dd dZeG dd dZdS )    N)CallableListOptional)DataIterator)MultiAgentBatchconcat_samples)SampleBatch)DeveloperAPI)EpisodeTypec                   @   s<   e Zd ZdZdddddededed	ed
eddfddZdS )MiniBatchIteratorBasez+The base class for all minibatch iterators.   Tr   
num_epochsshuffle_batch_per_epochnum_total_minibatchesbatchr   r   minibatch_sizer   returnNc                C   s   dS )a  Initializes a MiniBatchIteratorBase instance.

        Args:
            batch: The input multi-agent batch.
            num_epochs: The number of complete passes over the entire train batch. Each
                pass might be further split into n minibatches (if `minibatch_size`
                provided). The train batch is generated from the given `episodes`
                through the Learner connector pipeline.
            minibatch_size: The size of minibatches to use to further split the train
                batch into per epoch. The train batch is generated from the given
                `episodes` through the Learner connector pipeline.
            num_total_minibatches: The total number of minibatches to loop through
                (over all `num_epochs` epochs). It's only required to set this to != 0
                in multi-agent + multi-GPU situations, in which the MultiAgentEpisodes
                themselves are roughly sharded equally, however, they might contain
                SingleAgentEpisodes with very lopsided length distributions. Thus,
                without this fixed, pre-computed value, one Learner might go through a
                different number of minibatche passes than others causing a deadlock.
        N )selfr   r   r   r   r   r   r   S/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/utils/minibatch_utils.py__init__   s   zMiniBatchIteratorBase.__init__)__name__
__module____qualname____doc__r   intboolr   r   r   r   r   r      s$    r   c                       sL   e Zd ZdZdddddededed	ed
eddf fddZdd Z  Z	S )MiniBatchCyclicIteratoraw  This implements a simple multi-agent minibatch iterator.

    This iterator will split the input multi-agent batch into minibatches where the
    size of batch for each module_id (aka policy_id) is equal to minibatch_size. If the
    input batch is smaller than minibatch_size, then the iterator will cycle through
    the batch until it has covered `num_epochs` epochs.
    r   Tr   r   r   r   r   r   r   r   Nc                   sh   t  j||||d || _|| _|| _|| _dd |j D | _dd |j D | _	d| _
|| _dS )z/Initializes a MiniBatchCyclicIterator instance.)r   r   r   c                 S      i | ]}|d qS r   r   .0midr   r   r   
<dictcomp>O       z4MiniBatchCyclicIterator.__init__.<locals>.<dictcomp>c                 S   r   r    r   r!   r   r   r   r$   Q   r%   r   N)superr   _batch_minibatch_size_num_epochs_shuffle_batch_per_epochpolicy_batcheskeys_start_num_covered_epochs_minibatch_count_num_total_minibatches)r   r   r   r   r   r   	__class__r   r   r   8   s   

z MiniBatchCyclicIterator.__init__c                 c   s   | j dkrt| j | jk s| j dkr| j| j k ri }| jj D ]\}}t	|dkr5t
d| d| j| }| j}g }|jr`|tjd usNJ ddd }t||| jt	|  }ndd }|| ||kr||d  }|| ||}	|	dksJ d||	8 }d}| j|  d	7  < | jr|  || ||ksl|| }
|
|kr||||
  t|||< |
| j|< q#t|t	| j}|V  |  jd	7  _| j dkrt| j | jk s| j dkr| j| j k sd S d S d S d S )
Nr   zThe batch for module_id zt is empty! This will create an infinite loop because we need to cover the same number of samples for each module_id.z}MiniBatchCyclicIterator requires SampleBatch.SEQ_LENSto be present in the batch for slicing a batch in the batch dimension B.c                 S   s   t | tj S N)lenr   SEQ_LENSbr   r   r   get_len   s   z1MiniBatchCyclicIterator.__iter__.<locals>.get_lenc                 S   s   t | S r3   r4   r6   r   r   r   r8      s   zLength of a sample must be > 0!r   )r0   minr.   valuesr)   r/   r'   r+   itemsr4   
ValueErrorr-   r(   _slice_seq_lens_in_Bgetr   r5   r   appendr*   shuffler   r   )r   	minibatch	module_idmodule_batchsn_stepssamples_to_concatr8   sample
len_sampleer   r   r   __iter__V   sf   






z MiniBatchCyclicIterator.__iter__)
r   r   r   r   r   r   r   r   rK   __classcell__r   r   r1   r   r   .   s&    r   c                       s*   e Zd Zdef fddZdd Z  ZS )MiniBatchDummyIteratorr   c                    s   t  j|fi | || _d S r3   )r&   r   r'   )r   r   kwargsr1   r   r   r      s   
zMiniBatchDummyIterator.__init__c                 c   s    | j V  d S r3   )r'   )r   r   r   r   rK      s   zMiniBatchDummyIterator.__iter__)r   r   r   r   r   rK   rL   r   r   r1   r   rM      s    rM   c                   @   s<   e Zd Zdededededee f
ddZdefd	d
Z	dS )MiniBatchRayDataIteratoriterator
collate_fnfinalize_fnr   	num_itersc                K   s^   || _ || _|| _dd | D | _| j jd|| j| jd| j| _t| j| _|| _	d S )Nc                 S   s   i | ]\}}|d kr||qS )return_stater   )r"   kvr   r   r   r$      s    z5MiniBatchRayDataIterator.__init__.<locals>.<dictcomp>)
batch_size_collate_fn_finalize_fnr   )
	_iteratorrX   rY   r<   _kwargs_iter_batches_batched_iterableiter_epoch_iterator
_num_iters)r   rP   rQ   rR   r   rS   rN   r   r   r   r      s   
z!MiniBatchRayDataIterator.__init__r   c                 c   sz    d}| j d u s|| j k r;| jD ]}|d7 }|V  | j r#|| j kr# nqt| j| _| j s/d S | j d u s|| j k sd S d S Nr   r   )r`   r_   r^   r]   )r   	iterationr   r   r   r   rK      s   
z!MiniBatchRayDataIterator.__iter__N)
r   r   r   r   r   r   r   r   r   rK   r   r   r   r   rO      s    
rO   c                   @   s*   e Zd ZdZdedefddZdd ZdS )	ShardBatchIteratorzIterator for sharding batch into num_shards batches.

    Args:
        batch: The input multi-agent batch.
        num_shards: The number of shards to split the batch into.

    Yields:
        A MultiAgentBatch of size len(batch) / num_shards.
    r   
num_shardsc                 C      || _ || _d S r3   )r'   _num_shards)r   r   rd   r   r   r   r         
zShardBatchIterator.__init__c           	      c   s    t | jD ]<}i }| jj D ]'\}}tt|| j }|| }t|| t|}|t	|t	| ||< qt
|t	|}|V  qd S r3   )rangerf   r'   r+   r<   mathceilr4   r:   r   r   )	r   ibatch_to_sendpid	sub_batchrW   startend	new_batchr   r   r   rK     s   zShardBatchIterator.__iter__N)r   r   r   r   r   r   r   rK   r   r   r   r   rc      s    
rc   c                   @   sD   e Zd ZdZ	ddee dedee fddZdee fd	d
Z	dS )ShardEpisodesIteratorzMIterator for sharding a list of Episodes into `num_shards` lists of Episodes.Nepisodesrd   len_lookback_bufferc                 C   s~   t |tdd| _|| _|| _tdd |D | _dd t| jD | _| j}t| jD ]}|||  }|| j|< ||8 }q+dS )a  Initializes a ShardEpisodesIterator instance.

        Args:
            episodes: The input list of Episodes.
            num_shards: The number of shards to split the episodes into.
            len_lookback_buffer: An optional length of a lookback buffer to enforce
                on the returned shards. When spitting an episode, the second piece
                might need a lookback buffer (into the first piece) depending on the
                user's settings.
        T)keyreversec                 s   s    | ]}t |V  qd S r3   r9   )r"   rJ   r   r   r   	<genexpr>*  s    z1ShardEpisodesIterator.__init__.<locals>.<genexpr>c                 S      g | ]}d qS r    r   r"   _r   r   r   
<listcomp>+      z2ShardEpisodesIterator.__init__.<locals>.<listcomp>N)	sortedr4   	_episodesrf   _len_lookback_buffersum_total_lengthrh   _target_lengths)r   rs   rd   rt   remaining_lengthrE   len_r   r   r   r     s   

zShardEpisodesIterator.__init__r   c           
      c   sJ   dd t | jD }dd t | jD }d}|t| jk r| j| }|t|}|| t| | j| krM|| | ||  t|7  < |d7 }nG| j| ||  }|dkr|d| |jt|d| j	d}}|| | ||  t|7  < || j|< n|dksJ || | |d7 }|t| jk s|D ]}	|	V  qdS )a2  Runs one iteration through this sharder.

        Yields:
            A sub-list of Episodes of size roughly `len(episodes) / num_shards`. The
            yielded sublists might have slightly different total sums of episode
            lengths, in order to not have to drop even a single timestep.
        c                 S   s   g | ]}g qS r   r   ry   r   r   r   r{   :  r|   z2ShardEpisodesIterator.__iter__.<locals>.<listcomp>c                 S   rx   r    r   ry   r   r   r   r{   ;  r|   r   r   N)rt   )
rh   rf   r4   r~   indexr:   r   r@   slicer   )
r   sublistslengthsepisode_indexepisode	min_indexr   
slice_partremaining_partsublistr   r   r   rK   2  s:   


 zShardEpisodesIterator.__iter__r3   )
r   r   r   r   r   r
   r   r   r   rK   r   r   r   r   rr     s    
rr   c                   @   s&   e Zd ZdZdefddZdd ZdS )ShardObjectRefIteratora,  Iterator for sharding a list of ray ObjectRefs into num_shards sub-lists.

    Args:
        object_refs: The input list of ray ObjectRefs.
        num_shards: The number of shards to split the references into.

    Yields:
        A sub-list of ray ObjectRefs with lengths as equal as possible.
    rd   c                 C   re   r3   )_object_refsrf   )r   object_refsrd   r   r   r   r   n  rg   zShardObjectRefIterator.__init__c                 c   sd    t | j}|| j }|| j }d}t| jD ]}|| ||k r"dnd }| j|| V  |}qd S ra   )r4   r   rf   rh   )r   nsublist_sizeremaining_elementsro   rk   rp   r   r   r   rK   r  s   


zShardObjectRefIterator.__iter__N)r   r   r   r   r   r   rK   r   r   r   r   r   b  s    
r   )ri   typingr   r   r   ray.datar   ray.rllib.policy.sample_batchr   r   r   ray.rllib.utils.annotationsr	   ray.rllib.utils.typingr
   r   r   rM   rO   rc   rr   r   r   r   r   r   <module>   s*    " 	2 N