o
    `۷i21                     @   s   d dl Z d dlZd dlZd dlmZ d dlmZmZmZ d dl	Z
d dlZd dlZd dlmZ d dlmZ d dlmZ d dlmZmZ d dlmZmZ d d	lmZmZ d d
lmZ er`d dlm Z  e !e"Z#eddG dd dZ$dS )    N)Path)TYPE_CHECKINGAnyDict)COMPONENT_RL_MODULE)INPUT_ENV_SPACES)OfflinePreLearner)MultiAgentBatchSampleBatch)
force_listunflatten_dict)OverrideToImplementCustomLogic5OverrideToImplementCustomLogic_CallToSuperRecommended)	PublicAPI)AlgorithmConfigalpha)	stabilityc                   @   s`   e Zd ZedddZe			dded	ed
edee	e
f fddZedd Zedd ZdS )OfflineDataconfigr   c                 C   s  || _ | j j| _t|jtr| j jnt|j| _| j j| _| j j	| _
| j j| _|j| _|j| _d| _| j j| _| j j| _d | _| jdkrSdd l}|jdi | j| _n@| jdkrdtjjdi | j| _n/| jdkrxdd l}|jdi | j| _nt| jtjjr| j| _n| jd urtd| j d| jr| j
d| ji t  }t!t"j#| j| jfi | j
| _#| jr| j#$ | _#t  }t%&d	| j d
|| dd d | _'| j(| j j)B | _)| j*| j j+B | _+d| _,| j j-pt.| _-d | _/d | _0d | _1d S )NFgcsr   s3absz"Unknown `config.input_filesystem` z! Filesystems can be None for local, any instance of `pyarrow.fs.FileSystem`, 'gcs' for GCS, 's3' for S3, or 'abs' for adlfs.AzureBlobFileSystem.
filesystemzTime to load offline data from z: z.2fzs. )2r   is_multi_agent
isinstanceinput_listr   pathinput_read_methoddata_read_methodinput_read_method_kwargsdata_read_method_kwargsinput_read_batch_sizedata_read_batch_sizematerialize_datamaterialize_mapped_datadata_is_mappedinput_filesystemr   input_filesystem_kwargsfilesystem_kwargsfilesystem_objectgcsfsGCSFileSystempyarrowfsS3FileSystemadlfsAzureBlobFileSystem
FileSystem
ValueErrorupdatetimeperf_countergetattrraydatamaterializeloggerdebugbatch_iteratorsdefault_map_batches_kwargsmap_batches_kwargsdefault_iter_batches_kwargsiter_batches_kwargsreturned_streaming_splitprelearner_classr   locality_hintslearner_handlesmodule_spec)selfr   r,   r1   
start_time	stop_timer   r   T/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/rllib/offline/offline_data.py__init__   sp   













zOfflineData.__init__F   Nnum_samplesreturn_iterator
num_shardsmodule_statec                 C   sz  | j sO|s$|dkrt| jd jjtdt }n| jd jtdt }| j| jt	 | j
|d}| jj| jf|| jp;|d| j| _d| _ | jrO| j | _| jr[|rt| jtjr|dkrk| jj|d| jd| _n+|rt| j | _n"dtttjf d	tfd
d}| jjd||d| j| _t | j| _|rt!| jS zt"| jW S  t#y   t$%d d | _| j&|||d Y S w )NrM   r   )	component)r   spacesrG   rQ   )fn_constructor_kwargs
batch_sizeT)nequalrE   _batchreturnc                 S   s4   t | } tdd |  D tdd |  D dS )Nc                 S   s   i | ]	\}}|t |qS r   )r
   ).0	module_idmodule_datar   r   rK   
<dictcomp>   s    z;OfflineData.sample.<locals>._collate_fn.<locals>.<dictcomp>c                 s   s$    | ]}t tt| V  qd S )N)lennextitervalues)rZ   r\   r   r   rK   	<genexpr>   s
    
z:OfflineData.sample.<locals>._collate_fn.<locals>.<genexpr>)	env_steps)r   r	   itemssumra   )rX   r   r   rK   _collate_fn   s   z'OfflineData.sample.<locals>._collate_fn)rU   rf   z*Batch iterator exhausted. Reinitiating ...)rN   rO   rP   r   )'r'   r9   getrF   	get_stateremoter   r   rS   r   rG   r:   map_batchesrD   r$   r@   r&   r;   r>   r   typesGeneratorTypestreaming_splitrE   iteratorr   strnpndarrayr	   iter_batchesrB   r`   r   r_   StopIterationr<   r=   sample)rH   rN   rO   rP   rQ   rT   rf   r   r   rK   rt   x   s   





zOfflineData.samplec                 C   s   t d| jjddS )N   T)concurrencyzero_copy_batch)maxr   num_learnersrH   r   r   rK   r?     s   z&OfflineData.default_map_batches_kwargsc                 C   s   ddiS )Nprefetch_batchesru   r   rz   r   r   rK   rA     s   z'OfflineData.default_iter_batches_kwargs)r   r   )FrM   N)__name__
__module____qualname__r   rL   r   intboolr   ro   r   rt   propertyr?   rA   r   r   r   rK   r      s*    Y
 
r   )%loggingr6   rk   pathlibr   typingr   r   r   numpyrp   
pyarrow.fsr.   r9   ray.rllib.corer   ray.rllib.envr   $ray.rllib.offline.offline_prelearnerr   ray.rllib.policy.sample_batchr	   r
   ray.rllib.utilsr   r   ray.rllib.utils.annotationsr   r   ray.util.annotationsr   %ray.rllib.algorithms.algorithm_configr   	getLoggerr|   r<   r   r   r   r   rK   <module>   s(    
