o
    $ie,                     @   s:  d dl Z d dlZd dlmZmZmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZmZ d dlmZmZ d dlmZ d dlmZm Z  d dl!m"Z"m#Z# d dl$m%Z% d dl&m'Z' erxd dl(Z(e )e*Z+G dd ded Z,e'G dd de,Z-e'G dd de,Z.dS )    N)TYPE_CHECKINGAnyDictIterableOptional)urlparse)call_with_retry)%add_creatable_buckets_param_if_s3_uri)DelegatingBlockBuilder)TaskContext)WRITE_UUID_KWARG_NAME)SaveMode)RetryingPyFileSystem_is_local_scheme)BlockBlockAccessor)DataContext)DatasinkWriteResult)FilenameProvider_DefaultFilenameProvider)_resolve_paths_and_filesystem)DeveloperAPIc                   @   s   e Zd Zddddddejddeded dedeeee	f  d	ee
 d
ee dee defddZdeddfddZd&ded ddfddZdefddZdee deddfddZdededefdd Zd!ed fd"d#Zedefd$d%ZdS )'_FileDatasinkNT)
filesystemtry_create_diropen_stream_argsfilename_providerdataset_uuidfile_formatmodepathr   zpyarrow.fs.FileSystemr   r   r   r   r   r    c          
      C   s   |du ri }|du rt ||d}t | _|| _t||\}	| _tj| j| jj	d| _t
|	dks7J t
|	|	d | _|| _|| _|| _|| _|| _|| _d| _d| _d| _dS )a
  Initialize this datasink.

        Args:
            path: The folder to write files to.
            filesystem: The filesystem to write files to. If not provided, the
                filesystem is inferred from the path.
            try_create_dir: Whether to create the directory to write files to.
            open_stream_args: Arguments to pass to ``filesystem.open_output_stream``.
            filename_provider: A :class:`ray.data.datasource.FilenameProvider` that
                generates filenames for each row or block.
            dataset_uuid: The UUID of the dataset being written. If specified, it's
                included in the filename.
            file_format: The file extension. If specified, files are written with this
                extension.
        N)r   r   )retryable_errors   r   F)r   r   get_current_data_contextunresolved_pathr   r   r   wrapretried_io_errorslenr!   r   r   r   r   r   r    has_created_dir_skip_write_write_started)
selfr!   r   r   r   r   r   r   r    paths r/   ^/home/ubuntu/veenaModal/venv/lib/python3.10/site-packages/ray/data/datasource/file_datasink.py__init__!   s.   



z_FileDatasink.__init__returnpyarrow.NativeFilec                 C   s   | j j|fi | jS N)r   open_output_streamr   )r-   r!   r/   r/   r0   r5   W   s   z _FileDatasink.open_output_streamschemazpyarrow.Schemac                 C   s   | j rd S d| _ ddlm} | j| jj|ju}|r\| jt	j
kr*td| j d| jt	jkrBtd| j d| j  d| _d S | jt	jkr\td| j d| j  | j| j | | j| _d S )	NTr   FileTypezPath zO already exists. If this is unexpected, use mode='ignore' to ignore those filesz
[SaveMode=z] Skipping z] Replacing contents )r,   
pyarrow.fsr8   r   get_file_infor!   typeNotFoundr    r   ERROR
ValueErrorIGNOREloggerwarningr+   	OVERWRITEdelete_dir_contents_create_dirr*   )r-   r6   r8   
dir_existsr/   r/   r0   on_write_startZ   s&   z_FileDatasink.on_write_startc                 C   sj   ddl m} t|}|jdk}|o| jj }| jr3|s3| j|j	|j
u r3t|}| jj|dd dS dS )zsCreate a directory to write files to.

        If ``try_create_dir`` is ``False``, this method is a no-op.
        r   r7   s3T)	recursiveF)r9   r8   r   schemer%   s3_try_create_dirr   r   r:   r;   r<   r	   
create_dir)r-   destr8   
parsed_uri	is_s3_uriskip_create_dir_for_s3tmpr/   r/   r0   rD   t   s   

z_FileDatasink._create_dirblocksctxc                 C   s`   t  }|D ]}|| q| }t|}| dkr'td| j  d S | 	|d| d S )Nr   zSkipped writing empty block to )
r
   	add_blockbuildr   	for_blocknum_rowsr@   rA   r!   write_block)r-   rQ   rR   builderblockblock_accessorr/   r/   r0   write   s   
z_FileDatasink.writerY   block_indexc                 C      t r4   NotImplementedError)r-   rY   r\   rR   r/   r/   r0   rW      s   z_FileDatasink.write_blockwrite_resultc                 C   s*   | j r|jdkr| j| j d S d S d S )Nr   )r*   rV   r   
delete_dirr!   )r-   r`   r/   r/   r0   on_write_complete   s   z_FileDatasink.on_write_completec                 C   s   t | j S r4   )r   r&   r-   r/   r/   r0   supports_distributed_writes   s   z)_FileDatasink.supports_distributed_writesr4   )__name__
__module____qualname__r   APPENDstrr   boolr   r   r   r1   r5   rF   rD   r   r   r   r[   r   intrW   r   rb   propertyrd   r/   r/   r/   r0   r       sN    	

6 
r   c                   @   s@   e Zd ZdZdeeef ddfddZdede	d	e
fd
dZdS )RowBasedFileDatasinka  A datasink that writes one row to each file.

    Subclasses must implement ``write_row_to_file`` and call the superclass constructor.

    Examples:
        .. testcode::

            import io
            from typing import Any, Dict

            import pyarrow
            from PIL import Image

            from ray.data.datasource import RowBasedFileDatasink

            class ImageDatasink(RowBasedFileDatasink):
                def __init__(self, path: str, *, column: str, file_format: str = "png"):
                    super().__init__(path, file_format=file_format)
                    self._file_format = file_format
                    self._column = column

                def write_row_to_file(self, row: Dict[str, Any], file: "pyarrow.NativeFile"):
                    image = Image.fromarray(row[self._column])
                    buffer = io.BytesIO()
                    image.save(buffer, format=self._file_format)
                    file.write(buffer.getvalue())
    rowfiler3   c                 C   r]   )zWrite a row to a file.

        Args:
            row: The row to write.
            file: The file to write the row to.
        r^   )r-   rn   ro   r/   r/   r0   write_row_to_file      z&RowBasedFileDatasink.write_row_to_filerY   r\   rR   c                    s   t |jddD ]7\} j |jt |j||}tj	|t
d d  fdd}t|d djjd	 qd S )
NF)public_row_formatWriting  file.c                     <    }  |  W d    d S 1 sw   Y  d S r4   )r5   rp   ro   rn   r-   
write_pathr/   r0   write_row_to_path      "z;RowBasedFileDatasink.write_block.<locals>.write_row_to_pathwrite ''descriptionmatch)	enumerate	iter_rowsr   get_filename_for_rowkwargsr   task_idx	posixpathjoinr!   r@   debugr   r%   r(   )r-   rY   r\   rR   	row_indexfilenamery   r/   rw   r0   rW      s"   
z RowBasedFileDatasink.write_blockN)re   rf   rg   __doc__r   ri   r   rp   r   rk   r   rW   r/   r/   r/   r0   rm      s    	rm   c                       sn   e Zd ZdZdddee f fddZdedd	fd
dZdedede	fddZ
edee fddZ  ZS )BlockBasedFileDatasinka)  A datasink that writes multiple rows to each file.

    Subclasses must implement ``write_block_to_file`` and call the superclass
    constructor.

    Examples:
        .. testcode::

            class CSVDatasink(BlockBasedFileDatasink):
                def __init__(self, path: str):
                    super().__init__(path, file_format="csv")

                def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
                    from pyarrow import csv
                    csv.write_csv(block.to_arrow(), file)
    N)min_rows_per_filer   c                   s   t  j|fi | || _d S r4   )superr1   _min_rows_per_file)r-   r!   r   file_datasink_kwargs	__class__r/   r0   r1     s   
zBlockBasedFileDatasink.__init__rY   ro   r3   c                 C   r]   )zWrite a block of data to a file.

        Args:
            block: The block to write.
            file: The file to write the block to.
        r^   )r-   rY   ro   r/   r/   r0   write_block_to_file
  rq   z*BlockBasedFileDatasink.write_block_to_filer\   rR   c                    sh   j  |jt |j|}tj| fdd}t	d d t
|d djjd d S )Nc                     ru   r4   )r5   r   rv   rY   r-   rx   r/   r0   write_block_to_path  rz   z?BlockBasedFileDatasink.write_block.<locals>.write_block_to_pathrs   rt   r{   r|   r}   )r   get_filename_for_blockr   r   r   r   r   r!   r@   r   r   r%   r(   )r-   rY   r\   rR   r   r   r/   r   r0   rW     s   

z"BlockBasedFileDatasink.write_blockr2   c                 C   s   | j S r4   )r   rc   r/   r/   r0   min_rows_per_write$  s   z)BlockBasedFileDatasink.min_rows_per_write)re   rf   rg   r   r   rk   r1   r   r   r   rW   rl   r   __classcell__r/   r/   r   r0   r      s    	r   )/loggingr   typingr   r   r   r   r   urllib.parser   ray._common.retryr   ray._private.arrow_utilsr	   +ray.data._internal.delegating_block_builderr
   'ray.data._internal.execution.interfacesr   (ray.data._internal.planner.plan_write_opr   ray.data._internal.savemoder   ray.data._internal.utilr   r   ray.data.blockr   r   ray.data.contextr   ray.data.datasource.datasinkr   r   %ray.data.datasource.filename_providerr   r   ray.data.datasource.path_utilr   ray.util.annotationsr   pyarrow	getLoggerre   r@   r   rm   r   r/   r/   r/   r0   <module>   s4    
 =