o
    ci2                     @   s   d dl Z d dlmZ d dlZd dlZd dlZd dlZd dl	Z	d dl
mZmZmZ d dlmZ d dlmZ d dlmZ d dlmZ d dlmZmZ d d	lmZ d d
lmZmZ d dlmZ erdd dlm Z  e !e"Z#eddG dd dZ$dS )    N)Path)AnyDictTYPE_CHECKING)COMPONENT_RL_MODULE)INPUT_ENV_SPACES)OfflinePreLearner)unflatten_dict)MultiAgentBatchSampleBatch)
force_list)OverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommended)	PublicAPI)AlgorithmConfigalpha)	stabilityc                   @   s`   e Zd ZedddZe			dded	ed
edee	e
f fddZedd Zedd ZdS )OfflineDataconfigr   c              
   C   s.  || _ | j j| _t|jtr| j jnt|j| _| j j| _| j j	| _
| j j| _|j| _|j| _d| _| j j| _| j j| _d | _| jdkrSdd l}|jdi | j| _n@| jdkrdtjjdi | j| _n/| jdkrxdd l}|jdi | j| _nt| jtjjr| j| _n| jd urtd| j d| jr| j
d| ji z7t  }t!t"j#| j| jfi | j
| _#| jr| j#$ | _#t  }t%&d	||  d
 t%'d(| j W n t)y } zt%*| W Y d }~nd }~ww d | _+| j,| j j-B | _-| j.| j j/B | _/d| _0| j j1p
t2| _1d | _3d | _4d | _5d S )NFgcsr   s3absz"Unknown `config.input_filesystem` z! Filesystems can be None for local, any instance of `pyarrow.fs.FileSystem`, 'gcs' for GCS, 's3' for S3, or 'abs' for adlfs.AzureBlobFileSystem.
filesystemz/===> [OfflineData] - Time for loading dataset: zs.zReading data from {} )6r   is_multi_agent
isinstanceinput_listr   pathinput_read_methoddata_read_methodinput_read_method_kwargsdata_read_method_kwargsinput_read_batch_sizedata_read_batch_sizematerialize_datamaterialize_mapped_datadata_is_mappedinput_filesystemr   input_filesystem_kwargsfilesystem_kwargsfilesystem_objectgcsfsGCSFileSystempyarrowfsS3FileSystemadlfsAzureBlobFileSystem
FileSystem
ValueErrorupdatetimeperf_countergetattrraydatamaterializeloggerdebuginfoformat	Exceptionerrorbatch_iteratorsdefault_map_batches_kwargsmap_batches_kwargsdefault_iter_batches_kwargsiter_batches_kwargsreturned_streaming_splitprelearner_classr   locality_hintslearner_handlesmodule_spec)selfr   r,   r1   
start_time	stop_timeer   r   R/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/offline/offline_data.py__init__   s   













zOfflineData.__init__F   Nnum_samplesreturn_iterator
num_shardsmodule_statec                 C   s  | j sO|s$|dkrt| jd jjtdt }n| jd jtdt }| j| jt	 | j
|d}| jj| jf|| jp;|d| j| _d| _ | jrO| j | _| jr[|rt| jtjr|dkrptd | jj|d| jd| _n+|ry| j | _n"d	tttjf d
tfdd}| jj d||d| j!| _t"| j| _|rt#| jS zt$| jW S  t%y   td d | _| j&|||d Y S w )NrR   r   )	component)r   spacesrK   rV   )fn_constructor_kwargs
batch_sizeTz/===> [OfflineData]: Return streaming_split ... )nequalrI   _batchreturnc                 S   s4   t | } tdd |  D tdd |  D dS )Nc                 S   s   i | ]	\}}|t |qS r   )r   ).0	module_idmodule_datar   r   rP   
<dictcomp>   s    z;OfflineData.sample.<locals>._collate_fn.<locals>.<dictcomp>c                 s   s$    | ]}t tt| V  qd S )N)lennextitervalues)r_   ra   r   r   rP   	<genexpr>   s
    
z:OfflineData.sample.<locals>._collate_fn.<locals>.<genexpr>)	env_steps)r	   r
   itemssumrf   )r]   r   r   rP   _collate_fn   s   z'OfflineData.sample.<locals>._collate_fn)rZ   rk   z>===> [OfflineData]: Batch iterator exhausted. Reinitiating ...)rS   rT   rU   r   )'r'   r9   getrJ   	get_stateremoter   r   rX   r   rK   r:   map_batchesrH   r$   rD   r&   r;   rB   r   typesGeneratorTyper<   r=   streaming_splitrI   iteratorr   strnpndarrayr
   iter_batchesrF   re   r   rd   StopIterationsample)rL   rS   rT   rU   rV   rY   rk   r   r   rP   ry   }   s   





zOfflineData.samplec                 C   s   t d| jjddS )N   T)concurrencyzero_copy_batch)maxr   num_learnersrL   r   r   rP   rC     s   z&OfflineData.default_map_batches_kwargsc                 C   s   ddiS )Nprefetch_batchesrz   r   r   r   r   rP   rE     s   z'OfflineData.default_iter_batches_kwargs)r   r   )FrR   N)__name__
__module____qualname__r   rQ   r   intboolr   rt   r   ry   propertyrC   rE   r   r   r   rP   r      s*    ]
 
r   )%loggingpathlibr   
pyarrow.fsr.   numpyru   r9   r6   rp   typingr   r   r   ray.rllib.corer   ray.rllib.envr   $ray.rllib.offline.offline_prelearnerr   ray.rllib.utilsr	   ray.rllib.policy.sample_batchr
   r   r   ray.rllib.utils.annotationsr   r   ray.util.annotationsr   %ray.rllib.algorithms.algorithm_configr   	getLoggerr   r<   r   r   r   r   rP   <module>   s*    
