o
    bi                     @   sd   d dl Zd dlmZ d dlmZ d dlmZ d dlm	Z	 d dlm
Z
 d dlmZ G dd	 d	eZdS )
    N)tree)Layer)SeedGenerator)backend_utils)	jax_utils)trackingc                       sD   e Zd ZdZ fddZ fddZejdddZd	d
 Z	  Z
S )	DataLayeraj
  Layer designed for safe use in `tf.data` or `grain` pipeline.

    This layer overrides the `__call__` method to ensure that the correct
    backend is used and that computation is performed on the CPU.

    The `call()` method in subclasses should use `self.backend` ops. If
    randomness is needed, define both `seed` and `generator` in `__init__` and
    retrieve the running seed using `self._get_seed_generator()`. If the layer
    has weights in `__init__` or `build()`, use `convert_weight()` to ensure
    they are in the correct backend.

    **Note:** This layer and its subclasses only support a single input tensor.

    Examples:

    **Custom `DataLayer` subclass:**

    ```python
    from keras.src.layers.preprocessing.data_layer import DataLayer
    from keras.src.random import SeedGenerator


    class BiasedRandomRGBToHSVLayer(DataLayer):
        def __init__(self, seed=None, **kwargs):
            super().__init__(**kwargs)
            self.probability_bias = ops.convert_to_tensor(0.01)
            self.seed = seed
            self.generator = SeedGenerator(seed)

        def call(self, inputs):
            images_shape = self.backend.shape(inputs)
            batch_size = 1 if len(images_shape) == 3 else images_shape[0]
            seed = self._get_seed_generator(self.backend._backend)

            probability = self.backend.random.uniform(
                shape=(batch_size,),
                minval=0.0,
                maxval=1.0,
                seed=seed,
            )
            probability = self.backend.numpy.add(
                probability, self.convert_weight(self.probability_bias)
            )
            hsv_images = self.backend.image.rgb_to_hsv(inputs)
            return self.backend.numpy.where(
                probability[:, None, None, None] > 0.5,
                hsv_images,
                inputs,
            )

        def compute_output_shape(self, input_shape):
            return input_shape
    ```

    **Using as a regular Keras layer:**

    ```python
    import numpy as np

    x = np.random.uniform(size=(1, 16, 16, 3)).astype("float32")
    print(BiasedRandomRGBToHSVLayer()(x).shape)  # (1, 16, 16, 3)
    ```

    **Using in a `tf.data` pipeline:**

    ```python
    import tensorflow as tf

    tf_ds = tf.data.Dataset.from_tensors(x)
    tf_ds = tf_ds.map(BiasedRandomRGBToHSVLayer())
    print([x.shape for x in tf_ds])  # [(1, 16, 16, 3)]
    ```

    **Using in a `grain` pipeline:**

    ```python
    import grain

    grain_ds = grain.MapDataset.source([x])
    grain_ds = grain_ds.map(BiasedRandomRGBToHSVLayer())
    print([x.shape for x in grain_ds])  # [(1, 16, 16, 3)]
    c                    s&   t  jdi | t | _d| _d S )NT )super__init__r   DynamicBackendbackend!_allow_non_tensor_positional_args)selfkwargs	__class__r	   ]/home/ubuntu/.local/lib/python3.10/site-packages/keras/src/layers/preprocessing/data_layer.pyr   ^   s   

zDataLayer.__init__c                    s  t |d }t|tjsSt rSt|sS j	
d t  fdd|}d} jr0d _d}zt j|fi |}W  j	  |rFd _|S  j	  |rRd _w t|tjst rtjj	d t j|fi |W  d    S 1 syw   Y  d S t j|fi |S )Nr   
tensorflowc                    s    j j|  jdS )N)dtype)r   convert_to_tensorcompute_dtype)xr   r	   r   <lambda>m   s    z$DataLayer.__call__.<locals>.<lambda>FTcpu)r   flatten
isinstancekerasKerasTensorr   in_tf_graphr   is_in_jax_tracing_scoper   set_backendmap_structure_convert_input_argsr
   __call__resetin_grain_data_pipelinesrcdevice_scope)r   inputsr   sample_inputswitch_convert_input_argsoutputsr   r   r   r%   c   sB   




$zDataLayer.__call__Nc                 C   sz   t | dr
t | dstd|d u s|tj kr| jS t | ds$i | _|| jv r.| j| S t| j| jd}|| j|< |S )Nseed	generatorzpThe `seed` and `generator` variable must be set in the `__init__` method before calling `_get_seed_generator()`._backend_generators)r   )hasattr
ValueErrorr   r   r/   r0   r   r.   )r   r   seed_generatorr	   r	   r   _get_seed_generator   s   



zDataLayer._get_seed_generatorc                 C   s.   | j jtj   kr|S tj|}| j |S )z9Convert the weight if it is from the a different backend.)r   namer   opsconvert_to_numpyr   )r   weightr	   r	   r   convert_weight   s   zDataLayer.convert_weight)N)__name__
__module____qualname____doc__r   r%   r    no_automatic_dependency_trackingr4   r9   __classcell__r	   r	   r   r   r   
   s    S%r   )keras.src.backendr   	keras.srcr   keras.src.layers.layerr   keras.src.random.seed_generatorr   keras.src.utilsr   r   r   r   r	   r	   r	   r   <module>   s    