o
    bi	+                     @   s2  d dl Z d dlZd dlmZmZmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZmZ d dlmZ d dlmZmZ d dl m!Z!m"Z" d dl#m$Z$ d dl%m&Z& ertd dl'Z'e (e)Z*G dd ded Z+e&G dd de+Z,e&G dd de+Z-dS )    N)TYPE_CHECKINGAnyDictIterableOptional)urlparse)%add_creatable_buckets_param_if_s3_uri)DelegatingBlockBuilder)TaskContext)WRITE_UUID_KWARG_NAME)SaveMode)RetryingPyFileSystem_is_local_schemecall_with_retry)BlockBlockAccessor)DataContext)DatasinkWriteResult)FilenameProvider_DefaultFilenameProvider)_resolve_paths_and_filesystem)DeveloperAPIc                   @   s   e Zd Zddddddejddeded dedeeee	f  d	ee
 d
ee dee defddZdeddfddZd$ddZdefddZdee deddfddZdededefddZded fd d!Zedefd"d#ZdS )%_FileDatasinkNT)
filesystemtry_create_diropen_stream_argsfilename_providerdataset_uuidfile_formatmodepathr   zpyarrow.fs.FileSystemr   r   r   r   r   r    c          
      C   s   |du ri }|du rt ||d}t | _|| _t||\}	| _tj| j| jj	d| _t
|	dks7J t
|	|	d | _|| _|| _|| _|| _|| _|| _d| _dS )a
  Initialize this datasink.

        Args:
            path: The folder to write files to.
            filesystem: The filesystem to write files to. If not provided, the
                filesystem is inferred from the path.
            try_create_dir: Whether to create the directory to write files to.
            open_stream_args: Arguments to pass to ``filesystem.open_output_stream``.
            filename_provider: A :class:`ray.data.datasource.FilenameProvider` that
                generates filenames for each row or block.
            dataset_uuid: The UUID of the dataset being written. If specified, it's
                included in the filename.
            file_format: The file extension. If specified, files are written with this
                extension.
        N)r   r   )retryable_errors   r   F)r   r   get_current_data_contextunresolved_pathr   r   r   wrapretried_io_errorslenr!   r   r   r   r   r   r    has_created_dir)
selfr!   r   r   r   r   r   r   r    paths r-   U/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/datasource/file_datasink.py__init__!   s*   



z_FileDatasink.__init__returnpyarrow.NativeFilec                 C   s   | j j|fi | jS N)r   open_output_streamr   )r+   r!   r-   r-   r.   r3   U   s   z _FileDatasink.open_output_streamc                 C   s   ddl m} | j| jj|ju}|rQ| jtj	kr"t
d| j d| jtjkr7td| j d| j  d S | jtjkrQtd| j d| j  | j| j | | j| _d S )Nr   FileTypezPath zO already exists. If this is unexpected, use mode='ignore' to ignore those filesz
[SaveMode=z] Skipping z] Replacing contents )
pyarrow.fsr5   r   get_file_infor!   typeNotFoundr    r   ERROR
ValueErrorIGNOREloggerwarning	OVERWRITEdelete_dir_contents_create_dirr*   )r+   r5   
dir_existsr-   r-   r.   on_write_startX   s   z_FileDatasink.on_write_startc                 C   sj   ddl m} t|}|jdk}|o| jj }| jr3|s3| j|j	|j
u r3t|}| jj|dd dS dS )zsCreate a directory to write files to.

        If ``try_create_dir`` is ``False``, this method is a no-op.
        r   r4   s3T)	recursiveF)r6   r5   r   schemer%   s3_try_create_dirr   r   r7   r8   r9   r   
create_dir)r+   destr5   
parsed_uri	is_s3_uriskip_create_dir_for_s3tmpr-   r-   r.   rA   k   s   

z_FileDatasink._create_dirblocksctxc                 C   s`   t  }|D ]}|| q| }t|}| dkr'td| j  d S | 	|d| d S )Nr   zSkipped writing empty block to )
r	   	add_blockbuildr   	for_blocknum_rowsr=   r>   r!   write_block)r+   rN   rO   builderblockblock_accessorr-   r-   r.   write   s   
z_FileDatasink.writerV   block_indexc                 C      t r2   NotImplementedError)r+   rV   rY   rO   r-   r-   r.   rT      s   z_FileDatasink.write_blockwrite_resultc                 C   s*   | j r|jdkr| j| j d S d S d S )Nr   )r*   rS   r   
delete_dirr!   )r+   r]   r-   r-   r.   on_write_complete   s   z_FileDatasink.on_write_completec                 C   s   t | j S r2   )r   r&   r+   r-   r-   r.   supports_distributed_writes   s   z)_FileDatasink.supports_distributed_writes)r0   N)__name__
__module____qualname__r   APPENDstrr   boolr   r   r   r/   r3   rC   rA   r   r   r
   rX   r   intrT   r   r_   propertyra   r-   r-   r-   r.   r       sN    	

4
 
r   c                   @   s@   e Zd ZdZdeeef ddfddZdede	d	e
fd
dZdS )RowBasedFileDatasinka  A datasink that writes one row to each file.

    Subclasses must implement ``write_row_to_file`` and call the superclass constructor.

    Examples:
        .. testcode::

            import io
            from typing import Any, Dict

            import pyarrow
            from PIL import Image

            from ray.data.datasource import RowBasedFileDatasink

            class ImageDatasink(RowBasedFileDatasink):
                def __init__(self, path: str, *, column: str, file_format: str = "png"):
                    super().__init__(path, file_format=file_format)
                    self._file_format = file_format
                    self._column = column

                def write_row_to_file(self, row: Dict[str, Any], file: "pyarrow.NativeFile"):
                    image = Image.fromarray(row[self._column])
                    buffer = io.BytesIO()
                    image.save(buffer, format=self._file_format)
                    file.write(buffer.getvalue())
    rowfiler1   c                 C   rZ   )zWrite a row to a file.

        Args:
            row: The row to write.
            file: The file to write the row to.
        r[   )r+   rk   rl   r-   r-   r.   write_row_to_file      z&RowBasedFileDatasink.write_row_to_filerV   rY   rO   c                    s   t |jddD ]7\} j |jt |j||}tj	|t
d d  fdd}t|d djjd	 qd S )
NF)public_row_formatWriting  file.c                     <    }  |  W d    d S 1 sw   Y  d S r2   )r3   rm   rl   rk   r+   
write_pathr-   r.   write_row_to_path      "z;RowBasedFileDatasink.write_block.<locals>.write_row_to_pathwrite ''descriptionmatch)	enumerate	iter_rowsr   get_filename_for_rowkwargsr   task_idx	posixpathjoinr!   r=   debugr   r%   r(   )r+   rV   rY   rO   	row_indexfilenamerv   r-   rt   r.   rT      s"   
z RowBasedFileDatasink.write_blockN)rb   rc   rd   __doc__r   rf   r   rm   r   rh   r
   rT   r-   r-   r-   r.   rj      s    	rj   c                       sn   e Zd ZdZdddee f fddZdedd	fd
dZdedede	fddZ
edee fddZ  ZS )BlockBasedFileDatasinka)  A datasink that writes multiple rows to each file.

    Subclasses must implement ``write_block_to_file`` and call the superclass
    constructor.

    Examples:
        .. testcode::

            class CSVDatasink(BlockBasedFileDatasink):
                def __init__(self, path: str):
                    super().__init__(path, file_format="csv")

                def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
                    from pyarrow import csv
                    csv.write_csv(block.to_arrow(), file)
    N)min_rows_per_filer   c                   s   t  j|fi | || _d S r2   )superr/   _min_rows_per_file)r+   r!   r   file_datasink_kwargs	__class__r-   r.   r/      s   
zBlockBasedFileDatasink.__init__rV   rl   r1   c                 C   rZ   )zWrite a block of data to a file.

        Args:
            block: The block to write.
            file: The file to write the block to.
        r[   )r+   rV   rl   r-   r-   r.   write_block_to_file  rn   z*BlockBasedFileDatasink.write_block_to_filerY   rO   c                    sh   j  |jt |j|}tj| fdd}t	d d t
|d djjd d S )Nc                     rr   r2   )r3   r   rs   rV   r+   ru   r-   r.   write_block_to_path  rw   z?BlockBasedFileDatasink.write_block.<locals>.write_block_to_pathrp   rq   rx   ry   rz   )r   get_filename_for_blockr   r   r   r   r   r!   r=   r   r   r%   r(   )r+   rV   rY   rO   r   r   r-   r   r.   rT   
  s   

z"BlockBasedFileDatasink.write_blockr0   c                 C   s   | j S r2   )r   r`   r-   r-   r.   min_rows_per_write  s   z)BlockBasedFileDatasink.min_rows_per_write)rb   rc   rd   r   r   rh   r/   r   r   r
   rT   ri   r   __classcell__r-   r-   r   r.   r      s    	r   ).loggingr   typingr   r   r   r   r   urllib.parser   ray._private.arrow_utilsr   +ray.data._internal.delegating_block_builderr	   'ray.data._internal.execution.interfacesr
   (ray.data._internal.planner.plan_write_opr   ray.data._internal.savemoder   ray.data._internal.utilr   r   r   ray.data.blockr   r   ray.data.contextr   ray.data.datasource.datasinkr   r   %ray.data.datasource.filename_providerr   r   ray.data.datasource.path_utilr   ray.util.annotationsr   pyarrow	getLoggerrb   r=   r   rj   r   r-   r-   r-   r.   <module>   s2    
 
=