o
    `۷i#                  	   @   s  d dl Z d dlmZ d dlmZmZmZmZmZm	Z	m
Z
mZmZmZ d dlZd dlmZ d dlmZ erNd dlZd dlZd dlZd dlmZ d dlmZmZ edd	d
Zeed ed f Zedeee eedf e	edf e	eef f Zdede fddZ!dede fddZ"dede fddZ#dede fddZ$dede fddZ%edede fddZ&eded eedf f Z'eG dd dee Z(eG dd de(d  Z)eG d!d" d"e(eeej*f  Z+eG d#d$ d$e(d% Z,eG d&d' d'e)Z-dS )(    N)ThreadPoolExecutor)
TYPE_CHECKINGAnyDictGenericListMappingOptionalTupleTypeVarUnion)env_integer)DeveloperAPI)	DataBatch)CollatedDataTorchDeviceTypeDataBatchTyper   )boundtorch.Tensor)r   ..batchreturnc                 C   s   ddl }t| |jS )z*Check if a batch is a single torch.Tensor.r   N)torch
isinstanceTensor)r   r    r   I/home/ubuntu/vllm_env/lib/python3.10/site-packages/ray/data/collate_fn.py
_is_tensor3   s   r   c                 C       t | ttfotdd | D S )aD  Check if a batch is a sequence of torch.Tensors.

    >>> import torch
    >>> _is_tensor_sequence(torch.ones(1))
    False
    >>> _is_tensor_sequence([torch.ones(1), torch.ones(1)])
    True
    >>> _is_tensor_sequence((torch.ones(1), torch.ones(1)))
    True
    >>> _is_tensor_sequence([torch.ones(1), 1])
    False
    c                 s       | ]}t |V  qd S Nr   .0tr   r   r   	<genexpr>G       z&_is_tensor_sequence.<locals>.<genexpr>r   listtupleallr   r   r   r   _is_tensor_sequence:   s    r+   c                 C   r   )a=  Check if a batch is a sequence of sequences of torch.Tensors.

    Stops at one level of nesting.

    >>> import torch
    >>> _is_nested_tensor_sequence([torch.ones(1), torch.ones(1)])
    False
    >>> _is_nested_tensor_sequence(
    ...    ([torch.ones(1), torch.ones(1)], [torch.ones(1)])
    ... )
    True
    c                 s   r   r   r+   r!   r   r   r   r$   W       
z-_is_nested_tensor_sequence.<locals>.<genexpr>r&   r*   r   r   r   _is_nested_tensor_sequenceJ   s   r.   c                 C       t | totdd |  D S )a   Check if a batch is a mapping of keys to torch.Tensors.

    >>> import torch
    >>> _is_tensor_mapping({"a": torch.ones(1), "b": torch.ones(1)})
    True
    >>> _is_tensor_mapping({"a": torch.ones(1), "b": [torch.ones(1), torch.ones(1)]})
    False
    c                 s   r   r   r    r"   vr   r   r   r$   e   r%   z%_is_tensor_mapping.<locals>.<genexpr>r   r   r)   valuesr*   r   r   r   _is_tensor_mapping\   s    	r4   c                 C   r/   )aE  Check if a batch is a mapping of keys to sequences of torch.Tensors.

    >>> import torch
    >>> _is_tensor_sequence_mapping({"a": torch.ones(1), "b": torch.ones(1)})
    False
    >>> _is_tensor_sequence_mapping(
    ...    {"a": (torch.ones(1), torch.ones(1)), "b": [torch.ones(1), torch.ones(1)]}
    ... )
    True
    c                 s   r   r   r,   r0   r   r   r   r$   s   r-   z._is_tensor_sequence_mapping.<locals>.<genexpr>r2   r*   r   r   r   _is_tensor_sequence_mappingh   s   r5   c                 C   s(   t | pt| pt| pt| pt| S )a"  Check if a batch matches any of the TensorBatchType variants.

    This function checks if the input batch is one of the following types:
    1. A single torch.Tensor
    2. A sequence of torch.Tensors
    3. A sequence of sequences of torch.Tensors
    4. A mapping (e.g., dict) of keys to torch.Tensors
    5. A mapping (e.g., dict) of keys to sequences of torch.Tensors

    Args:
        batch: The input batch to check. Can be any type.

    Returns:
        bool: True if the batch matches any TensorBatchType variant, False otherwise.
    )r   r+   r.   r4   r5   r*   r   r   r   is_tensor_batch_typex   s   r6   c                   @   s(   e Zd ZdZejdeddfddZdS )	CollateFnzAbstract interface for collate_fn for `iter_torch_batches`. See doc-string of
    `collate_fn` in `iter_torch_batches` API for more details.
    r   r   r   c                 C      dS )zConvert a batch of data to collated format.

        Args:
            batch: The input batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   selfr   r   r   r   __call__   s   
zCollateFn.__call__N)__name__
__module____qualname____doc__abcabstractmethodr   r;   r   r   r   r   r7      s    r7   c                   @      e Zd ZdZd	ddZdS )
ArrowBatchCollateFna  Collate function that takes pyarrow.Table as the input batch type.
    Arrow tables with chunked arrays can be efficiently transferred to GPUs without
    combining the chunks with the `arrow_batch_to_tensors` utility function.
    See `DefaultCollateFn` for example.
    r   pyarrow.Tabler   r   c                 C   r8   )zConvert a batch of pyarrow.Table to collated format.

        Args:
            batch: The input pyarrow.Table batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   r9   r   r   r   r;         	zArrowBatchCollateFn.__call__N)r   rD   r   r   r<   r=   r>   r?   r;   r   r   r   r   rC      s    rC   rD   c                   @   s,   e Zd ZdZdeeejf ddfddZdS )NumpyBatchCollateFnzQCollate function that takes a dictionary of numpy arrays as the input batch type.r   r   r   c                 C   r8   )zConvert a batch of numpy arrays to collated format.

        Args:
            batch: The input dictionary of numpy arrays batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   r9   r   r   r   r;      rE   zNumpyBatchCollateFn.__call__N)	r<   r=   r>   r?   r   strnpndarrayr;   r   r   r   r   rG      s     rG   c                   @   rB   )
PandasBatchCollateFnzGCollate function that takes a pandas.DataFrame as the input batch type.r   pandas.DataFramer   r   c                 C   r8   )zConvert a batch of pandas.DataFrame to collated format.

        Args:
            batch: The input pandas.DataFrame batch to collate.

        Returns:
            The collated data in the format expected by the model.
        Nr   r9   r   r   r   r;      rE   zPandasBatchCollateFn.__call__N)r   rL   r   r   rF   r   r   r   r   rK      s    rK   rL   c                	       s   e Zd ZdZeddZdddefdeedee	df f  ded	 d
e
def fddZdd Zdddeee	df ee	ed f f fddZ  ZS )DefaultCollateFnzIDefault collate function for converting Arrow batches to PyTorch tensors.2RAY_DATA_DEFAULT_COLLATE_FN_THREADPOOL_MAX_WORKERS   NFdtypesztorch.dtypedevicer   
pin_memorynum_workersc                    sP   ddl }t   || _t|ttfr||| _n|| _|| _|| _	d| _
dS )aH  Initialize the collate function.

        Args:
            dtypes: The torch dtype(s) for the created tensor(s); if None, the dtype
                will be inferred from the tensor data.
            device: The device on which the tensor should be placed. Can be a string
                (e.g. "cpu", "cuda:0") or a torch.device object.
            pin_memory: Whether to pin the memory of the created tensors.
            num_workers: Number of worker threads for parallel tensor conversion.
                Defaults to `RAY_DATA_DEFAULT_COLLATE_FN_THREADPOOL_MAX_WORKERS`.
        r   N)r   super__init__rP   r   rH   intrQ   rR   rS   _threadpool)r:   rP   rQ   rR   rS   r   	__class__r   r   rU      s   

zDefaultCollateFn.__init__c                 C   s"   t | ddr| jjdd dS dS )z#Clean up threadpool on destruction.rW   NF)wait)getattrrW   shutdown)r:   r   r   r   __del__  s   zDefaultCollateFn.__del__r   rD   r   r   c                 C   s\   ddl m} | jdkr| jdu rt| jd| _| jduo!| jjdk}||| j|| j| jdS )zConvert an Arrow batch to PyTorch tensors.

        Args:
            batch: PyArrow Table to convert

        Returns:
            Dictionary mapping column names to lists of tensors
        r   )arrow_batch_to_tensorsN)max_workerscpu)rP   combine_chunksrR   
threadpool)	ray.data.util.torch_utilsr^   rS   rW   r   rQ   typerP   rR   )r:   r   r^   ra   r   r   r   r;     s   zDefaultCollateFn.__call__)r<   r=   r>   r?   r   _DEFAULT_NUM_WORKERSr	   r   r   rH   boolrV   rU   r]   r   r;   __classcell__r   r   rX   r   rM      s2    rM   ).r@   concurrent.futuresr   typingr   r   r   r   r   r   r	   r
   r   r   numpyrI   ray._private.ray_constantsr   ray.util.annotationsr   pandaspyarrowr   ray.data.blockr   ray.data.datasetr   r   r   TensorSequenceTyperH   TensorBatchTyperf   r   r+   r.   r4   r5   r6   TensorBatchReturnTyper7   rC   rJ   rG   rK   rM   r   r   r   r   <module>   sd    0



 