o
    ci-                     @   s<  d dl Z d dlZd dlmZ d dlZd dlZd dlmZm	Z	m
Z
mZ d dlZd dlZd dlmZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZmZ d d	lmZ e
r^d d
lm Z  dZ!e "e#Z$dede%fddZ&dee% de%fddZ'e	 dddde(de	ej)j*eej)j* f fddZ+eG dd deZ,dS )    N)Path)ListTupleTYPE_CHECKINGOptional)InputReader)	IOContext)from_json_datapostprocess_actions)concat_samplesSampleBatchDEFAULT_POLICY_ID)override	PublicAPI)SampleBatchType)AlgorithmConfigg      ?fpathextract_pathc                 C   s@   t t| d}|| W d    d S 1 sw   Y  d S )Nr)zipfileZipFilestr
extractall)r   r   zip_ref r   T/home/ubuntu/.local/lib/python3.10/site-packages/ray/rllib/offline/dataset_reader.py_unzip_this_path   s   "r   pathsformatc                 C   s,  g }| D ]}t dt|rbt|drtdd}z	tt|| W n% tyI   ztttj	j	| | W n tyF   td| w Y nw tt|
 t|j d|  }|| qt|dro|| qt| stttj	j	| }t| std| |}|| q|S )zLIf a path in paths is a zip file, unzip it and use path of the unzipped filez\.zip$zs3://z?unzip_if_needed currently does not support remote paths from s3z./zFile not found: .)researchr   
startswith
ValueErrorr   FileNotFoundErrorr   __file__parentabsolutestemappendexists)r   r   	ret_pathspathr   unzipped_pathrelative_pathr   r   r   _unzip_if_needed   s>   r/   configr   num_workersreturnc                 C   s  | j dksJ d| j  | j}|d}ddg}|dur+||vr+td| d| |d	}|d
}|r?|s;|r?td|rC|sI|sItd|durpt|trV|g}nt|trgt|d tsfJ dntdt||}|d|pvd}|dt}|r| }	n%|dkrt	j
j||d|id}	n|dkrt	j
j||d|id}	ntd||dkr|	|	gfS |	j|dd|}
|	dg|
 fS )a  Returns a dataset and a list of shards.

    This function uses algorithm configs to create a dataset and a list of shards.
    The following config keys are used to create the dataset:
        input: The input type should be "dataset".
        input_config: A dict containing the following key and values:
            `format`: str, speciifies the format of the input data. This will be the
            format that ray dataset supports. See ray.data.Dataset for
            supported formats. Only "parquet" or "json" are supported for now.
            `paths`: str, a single string or a list of strings. Each string is a path
            to a file or a directory holding the dataset. It can be either a local path
            or a remote path (e.g. to an s3 bucket).
            `loader_fn`: Callable[None, ray.data.Dataset], Instead of
            specifying paths and format, you can specify a function to load the dataset.
            `parallelism`: int, The number of tasks to use for loading the dataset.
            If not specified, it will be set to the number of workers.
            `num_cpus_per_read_task`: float, The number of CPUs to use for each read
            task. If not specified, it will be set to 0.5.

    Args:
        config: The config dict for the algorithm.
        num_workers: The number of shards to create for remote workers.

    Returns:
        dataset: The dataset object.
        shards: A list of dataset shards. For num_workers > 0 the first returned
        shared would be a dummy None shard for local_worker.
    datasetzQMust specify config.input_ as 'dataset' if calling `get_dataset_and_shards`. Got r   jsonparquetNzUnsupported format z. Supported formats are r   	loader_fnzBWhen using a `loader_fn`, you cannot specify a `format` or `path`.zMMust specify either a `loader_fn` or a `format` and `path` in `input_config`.r   z%Paths must be a list of path strings.z6Paths must be a path string or a list of path strings.parallelism   num_cpus_per_read_tasknum_cpus)r7   ray_remote_argsz!Un-supported Ray dataset format: F)
num_blocksshuffle)input_input_configgetr#   
isinstancer   listr/   DEFAULT_NUM_CPUS_PER_TASKraydata	read_jsonread_parquetrepartitionsplit)r0   r1   r?   r   supported_fmtsr   r6   r7   cpus_per_taskr3   remote_shardsr   r   r   get_dataset_and_shardsE   sh   !









rM   c                   @   sj   e Zd ZdZeddejjdee	 fddZ
eedefdd	Zd
edefddZd
edefddZdS )DatasetReadera  Reader object that loads data from Ray Dataset.

    Examples:
        config = {
            "input": "dataset",
            "input_config": {
                "format": "json",
                # A single data file, a directory, or anything
                # that ray.data.dataset recognizes.
                "paths": "/tmp/sample_batches/",
                # By default, parallelism=num_workers.
                "parallelism": 3,
                # Dataset allocates 0.5 CPU for each reader by default.
                # Adjust this value based on the size of your offline dataset.
                "num_cpus_per_read_task": 0.5,
            }
        }
    Ndsioctxc                    s(  |pt  _d __d_|_jsdnj _dtjj	
 _jjdd_jjdd}jjdd |rKttj| d_|rjjdursjjj_jt_jjddspjjjtnd_td	jj d
|  d  fdd}| _dS d_dS )ziInitializes a DatasetReader instance.

        Args:
            ds: Ray dataset to sample from.
        NFtrain_batch_sizer8   num_env_runnersr   seed_disable_preprocessorszDatasetReader z has z
, samples.c                  3   s"    	 j j d} |  E d H  q)NT)rS   )_datasetrandom_shuffle	iter_rows)rO   rS   selfr   r   iterator   s
   z(DatasetReader.__init__.<locals>.iterator)r   _ioctx_default_policy
policy_mappreprocessorrU   countrD   rE   DataContextget_currentenable_progress_barsr0   r@   
batch_sizemaxmathceilworker_policy_mapr   preprocessorsprintworker_index_iter)rY   rO   rP   r1   rZ   r   rX   r   __init__   s2   
zDatasetReader.__init__r2   c                 C   s   | j d usJ g }d}|| jk r;t| j }t|| jj}||j7 }| |}t|| j}| 	|}|
| || jk st|}|S )Nr   )rl   rc   nextr	   r[   rg   r_   _preprocess_if_neededr
   _postprocess_if_neededr)   r   )rY   retr_   dr   r   r   rn      s   






	zDatasetReader.nextbatchc                    sD    j r tjtjfD ]}||v rt fdd|| D ||< q	|S )Nc                    s   g | ]} j |qS r   )r^   	transform).0srY   r   r   
<listcomp>  s    z7DatasetReader._preprocess_if_needed.<locals>.<listcomp>)r^   r   CUR_OBSNEXT_OBSnpstack)rY   rs   keyr   rw   r   ro     s   z#DatasetReader._preprocess_if_neededc                 C   sf   | j jds	|S t|tr/g }| D ]}| jd ur%|| j| q|| qt	|S t
d)Npostprocess_inputsz7Postprocessing of multi-agent data not implemented yet.)r[   r0   r@   rA   r   split_by_episoder\   r)   postprocess_trajectoryr   NotImplementedError)rY   rs   out	sub_batchr   r   r   rp     s   

z$DatasetReader._postprocess_if_needed)N)__name__
__module____qualname____doc__r   rD   rE   Datasetr   r   rm   r   r   r   rn   ro   rp   r   r   r   r   rN      s    ,
rN   )r   )-loggingre   pathlibr   r    numpyr{   typingr   r   r   r   r   ray.datarD   ray.rllib.offline.input_readerr   ray.rllib.offline.io_contextr   ray.rllib.offline.json_readerr	   r
   ray.rllib.policy.sample_batchr   r   r   ray.rllib.utils.annotationsr   r   ray.rllib.utils.typingr   %ray.rllib.algorithms.algorithm_configr   rC   	getLoggerr   loggerr   r   r/   intrE   r   rM   rN   r   r   r   r   <module>   s>    
'l