o
    bi                     @   s   d dl Z d dlmZ d dlmZmZmZmZmZ d dl	Z	d dl
mZ d dlmZmZ d dlmZ e eZedZ	 eeG dd	 d	ee ZeG d
d dee ZeG dd ded ZdS )    N)	dataclass)GenericIterableListOptionalTypeVar)TaskContext)BlockBlockAccessor)DeveloperAPIWriteReturnTypec                   @   s.   e Zd ZU dZeed< eed< ee ed< dS )WriteResultz3Aggregated result of the Datasink write operations.num_rows
size_byteswrite_returnsN)__name__
__module____qualname____doc__int__annotations__r   r    r   r   P/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/datasource/datasink.pyr      s
   
 r   c                   @   s   e Zd ZdZdddZdee dedefdd	Z	d
e
e fddZdeddfddZdefddZedefddZedee fddZdS )DatasinkzInterface for defining write-related logic.

    If you want to write data to something that isn't built-in, subclass this class
    and call :meth:`~ray.data.Dataset.write_datasink`.
    returnNc                 C      dS )zCallback for when a write job starts.

        Use this method to perform setup for write tasks. For example, creating a
        staging bucket in S3.
        Nr   selfr   r   r   on_write_start&      zDatasink.on_write_startblocksctxc                 C   s   t )a  Write blocks. This is used by a single write task.

        Args:
            blocks: Generator of data blocks.
            ctx: ``TaskContext`` for the write task.

        Returns:
            Result of this write task. When the entire write operator finishes,
            All returned values will be passed as `WriteResult.write_returns`
            to `Datasink.on_write_complete`.
        )NotImplementedError)r   r    r!   r   r   r   write.   s   zDatasink.writewrite_resultc                 C   r   )a  Callback for when a write job completes.

        This can be used to `commit` a write output. This method must
        succeed prior to ``write_datasink()`` returning to the user. If this
        method fails, then ``on_write_failed()`` is called.

        Args:
            write_result: Aggregated result of the
               Write operator, containing write results and stats.
        Nr   r   r$   r   r   r   on_write_complete@   s   zDatasink.on_write_completeerrorc                 C   r   )zCallback for when a write job fails.

        This is called on a best-effort basis on write failures.

        Args:
            error: The first error encountered.
        Nr   r   r'   r   r   r   on_write_failedM   s   zDatasink.on_write_failedc                 C   sD   t | j}d}|dr|dd }||r |dt|  }|S )zoReturn a human-readable name for this datasink.

        This is used as the names of the write tasks.
        r   _   N)typer   
startswithendswithlen)r   namedatasink_suffixr   r   r   get_nameW   s   


zDatasink.get_namec                 C   r   )z;If ``False``, only launch write tasks on the driver's node.Tr   r   r   r   r   supports_distributed_writesd   s   z$Datasink.supports_distributed_writesc                 C   r   )zThe target number of rows to pass to each :meth:`~ray.data.Datasink.write` call.

        If ``None``, Ray Data passes a system-chosen number of rows.
        Nr   r   r   r   r   min_rows_per_writei   r   zDatasink.min_rows_per_write)r   N)r   r   r   r   r   r   r	   r   r   r#   r   r&   	Exceptionr)   strr2   propertyboolr3   r   r   r4   r   r   r   r   r      s"    


r   c                   @   sV   e Zd ZdZdd Zdee deddfdd	Zd
e	d fddZ
deddfddZdS )DummyOutputDatasinka0  An example implementation of a writable datasource for testing.
    Examples:
        >>> import ray
        >>> from ray.data.datasource import DummyOutputDatasink
        >>> output = DummyOutputDatasink()
        >>> ray.data.range(10).write_datasink(output)
        >>> assert output.num_ok == 1
    c                 C   sH   t jj }t j|jdG dd d}| | _d| _d| _d| _	d S )N)scheduling_strategyc                   @   s.   e Zd Zdd ZdeddfddZdd	 ZdS )
z.DummyOutputDatasink.__init__.<locals>.DataSinkc                 S   s   d| _ d| _d S )Nr   T)rows_writtenenabledr   r   r   r   __init__   s   
z7DummyOutputDatasink.__init__.<locals>.DataSink.__init__blockr   Nc                 S   s    t |}|  j| 7  _d S N)r
   	for_blockr;   r   )r   r>   r   r   r   r#      s   
z4DummyOutputDatasink.__init__.<locals>.DataSink.writec                 S   s   | j S r?   )r;   r   r   r   r   get_rows_written   s   z?DummyOutputDatasink.__init__.<locals>.DataSink.get_rows_written)r   r   r   r=   r	   r#   rA   r   r   r   r   DataSink   s    rB   r   T)
raydataDataContextget_currentremoter:   	data_sinknum_ok
num_failedr<   )r   r!   rB   r   r   r   r=   }   s   

zDummyOutputDatasink.__init__r    r!   r   Nc                 C   s>   g }| j s	td|D ]}|| jj| qt| d S )Ndisabled)r<   
ValueErrorappendrH   r#   rG   rC   get)r   r    r!   tasksbr   r   r   r#      s   zDummyOutputDatasink.writer$   c                 C      |  j d7  _ d S Nr+   )rI   r%   r   r   r   r&         z%DummyOutputDatasink.on_write_completer'   c                 C   rQ   rR   )rJ   r(   r   r   r   r)      rS   z#DummyOutputDatasink.on_write_failed)r   r   r   r   r=   r   r	   r   r#   r   r&   r5   r)   r   r   r   r   r9   r   s    	
r9   )loggingdataclassesr   typingr   r   r   r   r   rC   'ray.data._internal.execution.interfacesr   ray.data.blockr	   r
   ray.util.annotationsr   	getLoggerr   loggerr   r   r   r9   r   r   r   r   <module>   s"    
S