o
    bi                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZm	Z	 d dl
Zd dlmZ d dlmZ d dlmZ d dlmZ er<d dlZe eZg dZG d	d
 d
eZG dd deZdejfddZdS )    N)BytesIO)TYPE_CHECKINGAnyDictListOptionalUnion)pyarrow_table_from_pydict)PandasBlockAccessor)DataContext)FileBasedDatasource)
jsonjsonlzjson.gzzjsonl.gzzjson.brzjsonl.brzjson.zstz	jsonl.zstzjson.lz4z	jsonl.lz4c                       sn   e Zd ZdZdddeeee f deeee	f  f fddZ
dd
dZdddZdddefddZ  ZS )ArrowJSONDatasourcez>JSON datasource, for reading and writing JSON and JSONL files.N)arrow_json_argspathsr   c                   sL   ddl m} t j|fi | |d u ri }|d|jdd| _|| _d S )Nr   )r   read_optionsF)use_threads)pyarrowr   super__init__popReadOptionsr   r   )selfr   r   file_based_datasource_kwargsr   	__class__ a/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/_internal/datasource/json_datasource.pyr   &   s   
zArrowJSONDatasource.__init__bufferpyarrow.lib.Bufferc              
   c   s    ddl }ddlm} | jj}t j}	 z|jt	|fd| ji| j
V  || j_W dS  |jyu } z<dt|v ri| jj|k r\td| jj d| jjd  d	 | j jd9  _n|| d
| jj d|W Y d}~nd}~ww q)zRead with PyArrow JSON reader, trying to auto-increase the
        read block size in the case of the read object
        straddling block boundaries.r   NTr   z0straddling object straddles two block boundariesz+JSONDatasource read failed with block_size=z. Retrying with block_size=   .z! - Auto-increasing block size to a+   bytes failed. Please try manually increasing the block size through the `read_options` parameter to a larger size. For example: `read_json(..., read_options=pyarrow.json.ReadOptions(block_size=10 << 25))`More information on this issue can be found here: https://github.com/apache/arrow/issues/25674)r   pyarrow.jsonr   r   
block_sizer   get_currenttarget_max_block_size	read_jsonr   r   ArrowInvalidstrloggerdebug)r   r   papajsoninit_block_sizemax_block_sizeer   r   r   _read_with_pyarrow_read_json9   sJ   

z0ArrowJSONDatasource._read_with_pyarrow_read_jsonc              
   c   s    ddl }ddl}|jdkrdS |t|}z
|j|V  W dS  tyd } z7dt|v s5J t|ddl	m
} |t}|D ]}| D ]\}	}
||	 |
 qGqAt|V  W Y d}~dS d}~ww )z{Fallback method to read JSON files with Python's native json.load(),
        in case the default pyarrow json reader fails.r   Nzno attribute 'from_pylist')defaultdict)r   r   sizeloadr   Tablefrom_pylistAttributeErrorr)   collectionsr2   listitemsappendr	   )r   r   r   r,   parsed_jsonr0   r2   dctrowkvr   r   r   _read_with_python_jsont   s&   
z*ArrowJSONDatasource._read_with_python_jsonfpyarrow.NativeFilepathc              
   c   sr    dd l }| }z| |E d H  W d S  |jy8 } ztd|  | |E d H  W Y d }~d S d }~ww )Nr   zyError reading with pyarrow.json.read_json(). Falling back to native json.load(), which may be slower. PyArrow error was:
)r   read_bufferr1   r(   r*   warningrA   )r   rB   rD   r,   r   r0   r   r   r   _read_stream   s   z ArrowJSONDatasource._read_stream)r   r    )__name__
__module____qualname____doc__r   r)   r   r   r   r   r   r1   rA   rG   __classcell__r   r   r   r   r   #   s    

;r   c                       sl   e Zd Zdeeee f def fddZdddefdd	Zddd
efddZ	ddded
dfddZ
  ZS )PandasJSONDatasourcer   target_output_size_bytesc                    s   t  j|fi | || _d S N)r   r   _target_output_size_bytes)r   r   rN   r   r   r   r   r      s   
zPandasJSONDatasource.__init__rB   rC   rD   c                 c   sV    |  |}tj||dd}|D ]}t|V  qW d    d S 1 s$w   Y  d S )NT	chunksizelines)_estimate_chunksizepdr'   _cast_range_index_to_string)r   rB   rD   rR   readerdfr   r   r   rG      s   
"z!PandasJSONDatasource._read_streamreturnc                 C   s   |  dks
J dtj|ddd}tt|}W d    n1 s#w   Y  t|}| dkr5dS | |  }t	t
| j| d}|d |S )Nr   z%File pointer must be at the beginning   TrQ   )tellrU   r'   rV   nextr
   	for_blocknum_rows
size_bytesmaxroundrP   seek)r   rB   rW   rX   block_accessorbytes_per_rowrR   r   r   r   rT      s   

z(PandasJSONDatasource._estimate_chunksize
filesystemzpyarrow.fs.FileSystemc                 K   s&   |j |fi |}| sJ d|S )NzFile must be seekable)open_input_fileseekable)r   re   rD   	open_argsfiler   r   r   _open_input_source   s   z'PandasJSONDatasource._open_input_source)rH   rI   rJ   r   r)   r   intr   rG   rT   rj   rL   r   r   r   r   rM      s    
rM   rX   c                 C   s    t | jtjr| jt| _| S rO   )
isinstancecolumnsrU   
RangeIndexastyper)   )rX   r   r   r   rV      s   rV   )loggingior   typingr   r   r   r   r   r   pandasrU   $ray.air.util.tensor_extensions.arrowr	   ray.data._internal.pandas_blockr
   ray.data.contextr   )ray.data.datasource.file_based_datasourcer   r   	getLoggerrH   r*   JSON_FILE_EXTENSIONSr   rM   	DataFramerV   r   r   r   r   <module>   s     
~/