o
    bi#                   	   @   s  d dl Z d dlmZmZmZmZmZmZmZm	Z	m
Z
mZ d dlZd dlmZ d dlmZ er@d dlZd dlZd dlZd dlmZ e
dedZeed e	d	 f Zedeee e	ed
f eedf eeef f ZdedefddZdedefddZdedefddZdedefddZdedefddZ ededefddZ!ede	d	 eedf f Z"eG dd dee Z#eG dd de#d Z$eG dd de#eeej%f  Z&eG d d! d!e#d" Z'eG d#d$ d$e$Z(dS )%    N)
TYPE_CHECKINGAnyDictGenericListMappingOptionalTupleTypeVarUnion)	DataBatch)DeveloperAPI)CollatedDataDataBatchType)boundtorch.Tensor)r   ..batchreturnc                 C   s   ddl }t| |jS )z*Check if a batch is a single torch.Tensor.r   N)torch
isinstanceTensor)r   r    r   G/home/ubuntu/.local/lib/python3.10/site-packages/ray/data/collate_fn.py
_is_tensor1   s   r   c                 C       t | ttfotdd | D S )aD  Check if a batch is a sequence of torch.Tensors.

    >>> import torch
    >>> _is_tensor_sequence(torch.ones(1))
    False
    >>> _is_tensor_sequence([torch.ones(1), torch.ones(1)])
    True
    >>> _is_tensor_sequence((torch.ones(1), torch.ones(1)))
    True
    >>> _is_tensor_sequence([torch.ones(1), 1])
    False
    c                 s       | ]}t |V  qd S Nr   .0tr   r   r   	<genexpr>E       z&_is_tensor_sequence.<locals>.<genexpr>r   listtupleallr   r   r   r   _is_tensor_sequence8   s    r(   c                 C   r   )a=  Check if a batch is a sequence of sequences of torch.Tensors.

    Stops at one level of nesting.

    >>> import torch
    >>> _is_nested_tensor_sequence([torch.ones(1), torch.ones(1)])
    False
    >>> _is_nested_tensor_sequence(
    ...    ([torch.ones(1), torch.ones(1)], [torch.ones(1)])
    ... )
    True
    c                 s   r   r   r(   r   r   r   r   r!   U       
z-_is_nested_tensor_sequence.<locals>.<genexpr>r#   r'   r   r   r   _is_nested_tensor_sequenceH   s   r+   c                 C       t | totdd |  D S )a   Check if a batch is a mapping of keys to torch.Tensors.

    >>> import torch
    >>> _is_tensor_mapping({"a": torch.ones(1), "b": torch.ones(1)})
    True
    >>> _is_tensor_mapping({"a": torch.ones(1), "b": [torch.ones(1), torch.ones(1)]})
    False
    c                 s   r   r   r   r   vr   r   r   r!   c   r"   z%_is_tensor_mapping.<locals>.<genexpr>r   r   r&   valuesr'   r   r   r   _is_tensor_mappingZ   s    	r1   c                 C   r,   )aE  Check if a batch is a mapping of keys to sequences of torch.Tensors.

    >>> import torch
    >>> _is_tensor_sequence_mapping({"a": torch.ones(1), "b": torch.ones(1)})
    False
    >>> _is_tensor_sequence_mapping(
    ...    {"a": (torch.ones(1), torch.ones(1)), "b": [torch.ones(1), torch.ones(1)]}
    ... )
    True
    c                 s   r   r   r)   r-   r   r   r   r!   q   r*   z._is_tensor_sequence_mapping.<locals>.<genexpr>r/   r'   r   r   r   _is_tensor_sequence_mappingf   s   r2   c                 C   s(   t | pt| pt| pt| pt| S )a"  Check if a batch matches any of the TensorBatchType variants.

    This function checks if the input batch is one of the following types:
    1. A single torch.Tensor
    2. A sequence of torch.Tensors
    3. A sequence of sequences of torch.Tensors
    4. A mapping (e.g., dict) of keys to torch.Tensors
    5. A mapping (e.g., dict) of keys to sequences of torch.Tensors

    Args:
        batch: The input batch to check. Can be any type.

    Returns:
        bool: True if the batch matches any TensorBatchType variant, False otherwise.
    )r   r(   r+   r1   r2   r'   r   r   r   is_tensor_batch_typev   s   r3   c                   @   s(   e Zd ZdZejdeddfddZdS )	CollateFnzAbstract interface for collate_fn for `iter_torch_batches`. See doc-string of
    `collate_fn` in `iter_torch_batches` API for more details.
    r   r   r   c                 C      dS )zConvert a batch of data to collated format.

        Args:
            batch: The input batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   selfr   r   r   r   __call__   s   
zCollateFn.__call__N)__name__
__module____qualname____doc__abcabstractmethodr   r8   r   r   r   r   r4      s    r4   c                   @      e Zd ZdZd	ddZdS )
ArrowBatchCollateFna  Collate function that takes pyarrow.Table as the input batch type.
    Arrow tables with chunked arrays can be efficiently transferred to GPUs without
    combining the chunks with the `arrow_batch_to_tensors` utility function.
    See `DefaultCollateFn` for example.
    r   pyarrow.Tabler   r   c                 C   r5   )zConvert a batch of pyarrow.Table to collated format.

        Args:
            batch: The input pyarrow.Table batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   r6   r   r   r   r8         	zArrowBatchCollateFn.__call__N)r   rA   r   r   r9   r:   r;   r<   r8   r   r   r   r   r@      s    r@   rA   c                   @   s,   e Zd ZdZdeeejf ddfddZdS )NumpyBatchCollateFnzQCollate function that takes a dictionary of numpy arrays as the input batch type.r   r   r   c                 C   r5   )zConvert a batch of numpy arrays to collated format.

        Args:
            batch: The input dictionary of numpy arrays batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   r6   r   r   r   r8      rB   zNumpyBatchCollateFn.__call__N)	r9   r:   r;   r<   r   strnpndarrayr8   r   r   r   r   rD      s     rD   c                   @   r?   )
PandasBatchCollateFnzGCollate function that takes a pandas.DataFrame as the input batch type.r   pandas.DataFramer   r   c                 C   r5   )zConvert a batch of pandas.DataFrame to collated format.

        Args:
            batch: The input pandas.DataFrame batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   r6   r   r   r   r8      rB   zPandasBatchCollateFn.__call__N)r   rI   r   r   rC   r   r   r   r   rH      s    rH   rI   c                       st   e Zd ZdZ			ddeedeedf f  deeedf  def fd	d
Z	dddeee
d f fddZ  ZS )DefaultCollateFnzIDefault collate function for converting Arrow batches to PyTorch tensors.NFdtypesztorch.dtypedeviceztorch.device
pin_memoryc                    s@   ddl }t   || _t|tr||| _n|| _|| _dS )a  Initialize the collate function.

        Args:
            dtypes: The torch dtype(s) for the created tensor(s); if None, the dtype
                will be inferred from the tensor data.
            device: The device on which the tensor should be placed. Can be a string
                (e.g. "cpu", "cuda:0") or a torch.device object.
            pin_memory: Whether to pin the memory of the created tensors.
        r   N)r   super__init__rK   r   rE   rL   rM   )r7   rK   rL   rM   r   	__class__r   r   rO      s   


zDefaultCollateFn.__init__r   rA   r   r   c                 C   s,   ddl m} | jjdk}||| j|| jdS )zConvert an Arrow batch to PyTorch tensors.

        Args:
            batch: PyArrow Table to convert

        Returns:
            Dictionary mapping column names to lists of tensors
        r   )arrow_batch_to_tensorscpu)rK   combine_chunksrM   )ray.air._internal.torch_utilsrR   rL   typerK   rM   )r7   r   rR   rT   r   r   r   r8      s   		zDefaultCollateFn.__call__)NNF)r9   r:   r;   r<   r   r   r   rE   boolrO   r   r8   __classcell__r   r   rP   r   rJ      s    &rJ   ))r=   typingr   r   r   r   r   r   r   r	   r
   r   numpyrF   ray.data.blockr   ray.util.annotationsr   pandaspyarrowr   ray.data.datasetr   r   TensorSequenceTyperE   TensorBatchTyperW   r   r(   r+   r1   r2   r3   TensorBatchReturnTyper4   r@   rG   rD   rH   rJ   r   r   r   r   <module>   s`    0



 