o
    biP                     @   sZ  d dl Z d dlmZmZmZmZmZmZ d dlZ	d dl
Zd dlZd dlZd dlmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlmZ eddZd	eej fd
dZ			d6dejdeeee  eee   f  deeej!eej! f  de"d	eej#eej# f f
ddZ$			d7de	j%deej! deee df  de"d	ej#f
ddZ&			d7dee	j%ee e	j%f f deeej!ee ej!f f  deee df  de"d	eej#ee ej#f f f
ddZ'	d8deej(j)ef deej(j) d	ej(j)fd d!Z*d"d# Z+d$ee ef d%e d	ee ef fd&d'Z,			d7dee ee	j% f deeej!ee ej!f f  deee df  de"d	ee eej# f f
d(d)Z-			d9d*ej.deeej!ee ej!f f  d+e"de"d	ee eej# f f
d,d-Z/e0 defd.eej# deee df  d/e"d	ej#fd0d1Z1d*ed	e fd2d3Z2e0 defd*edeee df  d/e"d	efd4d5Z3dS ):    N)AnyDictListOptionalUnionSequence)#get_torch_device_manager_by_context)%_unwrap_ndarray_object_type_if_needed)TensorBatchTypeTensorBatchReturnType
_is_tensor_is_tensor_sequence_is_nested_tensor_sequence_is_tensor_mapping_is_tensor_sequence_mapping)env_bool,RAY_AIR_DEFAULT_TENSOR_NON_BLOCKING_TRANSFERTreturnc                   C   s
   t   S )a  Gets the correct torch device list configured for this process.

    Returns a list of torch accelerator (GPU, HPU, NPU...) devices allocated for
    the current worker.
    If no accelerators are assigned, then it returns a list with a single CPU device.
    )r   get_devices r   r   Q/home/ubuntu/.local/lib/python3.10/site-packages/ray/air/_internal/torch_utils.pyr      s   
r   
data_batchcolumnscolumn_dtypes	unsqueezec                    s   |o
t |d ttf}|s|rt |tjstdt| d|r#|ng }fdd fdd|rPt|ttfvrD|gt| }fdd	t||D S ||d
S )a.  Converts a Pandas dataframe to a torch Tensor or list of torch Tensors.

    The format of the return type will match the format of ``columns``. If a
    list of columns is provided, the return type will be a single tensor. If
    ``columns`` is a list of lists, then the return type will be a list of
    tensors.

    Args:
        data_batch: The pandas dataframe to convert to a
            torch tensor.
        columns:
            The names of the columns in the dataframe to include in the
            torch tensor. If this arg is a List[List[str]], then the return
            type will be a List of tensors. This is useful for multi-input
            models. If None, then use all columns in the ``data_batch``.
        column_dtype: The
            torch dtype to use for the tensor. If set to None,
            then automatically infer the dtype.
        unsqueeze: If set to True, the tensors
            will be unsqueezed (reshaped to (N, 1)) before being concatenated into
            the final tensor. Otherwise, they will be left as is, that is
            (N, ). Defaults to True.

    Returns:
        Either a torch tensor of size (N, len(columns)) where N is the
        number of rows in the ``data_batch`` Dataframe, or a list of
        tensors, where the size of item i is (N, len(columns[i])).

    r   z^If `columns` is a list of strings, `column_dtypes` must be None or a single `torch.dtype`.Got z	 instead.c                    sr   t | tjjjr|  } | jjtj	u r2 fdd| D }zt
|W S  ty1   t
| Y S w t
j|  dS )zcThis recursive function allows to convert pyarrow List dtypes
        to multi-dimensional tensors.c                    s   g | ]}| qS r   r   ).0x)dtype	tensorizer   r   
<listcomp>d   s    zEconvert_pandas_to_torch_tensor.<locals>.tensorize.<locals>.<listcomp>r   )
isinstancepdapi
extensionsExtensionArrayto_numpyr   typenpobject_torchstackRuntimeErrornested_tensor	as_tensor)valsr   tensors)r   r    r   r   W   s   z1convert_pandas_to_torch_tensor.<locals>.tensorizec           	         s   g }| r	 |  }n }|j D ]4}|| j}z||d}W n ty5 } ztd| d| d|d }~ww r=|d}|| qt|dkrRtj|dd}|S |d }|S )Nr    zFailed to convert column z to a Torch Tensor of dtype z2. See above exception chain for the exact failure.   )dimr   )	r   values	Exception
ValueErrorr   appendlenr*   cat)	r   r   feature_tensorsbatchcolcol_valstefeature_tensor)r   r   r   r   r   get_tensor_for_columnso   s4   



z>convert_pandas_to_torch_tensor.<locals>.get_tensor_for_columnsc                    s   g | ]
\}} ||d qS )r   r   r   )r   
subcolumnsr   )r@   r   r   r      s    
z2convert_pandas_to_torch_tensor.<locals>.<listcomp>rA   )	r!   listtupler*   r   	TypeErrorr'   r7   zip)r   r   r   r   multi_inputr   )r   r@   r   r   r   convert_pandas_to_torch_tensor(   s"   $
rH   Fndarrayr   deviceztorch.device
pin_memoryc                 C   s   t | } | jjtju rtdt  td t	j
| ||d}W d   n1 s+w   Y  |rH|jjdksDJ d|j d| d| }|S )	a  Convert a NumPy ndarray to a Torch Tensor.

    Args:
        ndarray: A NumPy ndarray that we wish to convert to a Torch Tensor.
        dtype: A Torch dtype for the created tensor; if None, the dtype will be
            inferred from the NumPy ndarray data.
        device: The device on which the tensor(s) should be placed; if None, the Torch
            tensor(s) will be constructed on the CPU.
        pin_memory: Whether to pin the memory of the created tensors.

    Returns: A Torch Tensor.
    a<  Numpy array of object dtype cannot be converted to a Torch Tensor. This may because the numpy array is a ragged tensor--it contains items of different sizes. If using `iter_torch_batches()` API, you can pass in a `collate_fn` argument to specify custom logic to convert the Numpy array batch to a Torch tensor batch.ignorer   rJ   Ncpuz:Pin memory is only supported for CPU tensors. Got device: z and pin_memory: .)r	   r   r'   r(   r)   r,   warningscatch_warningssimplefilterr*   r.   rJ   rK   )rI   r   rJ   rK   resultr   r   r   convert_ndarray_to_torch_tensor   s&   

rT   ndarraysdtypesc                    sr   t | tjr*t tr tdkrtd tt t	|  d}|S  fdd| 
 D }|S )a,  Convert a NumPy ndarray batch to a Torch Tensor batch.

    Args:
        ndarray: A (dict of) NumPy ndarray(s) that we wish to convert to a Torch Tensor.
        dtype: A (dict of) Torch dtype(s) for the created tensor; if None, the dtype
            will be inferred from the NumPy ndarray data.
        device: The device on which the tensor(s) should be placed; if None, the Torch
            tensor(s) will be constructed on the CPU.
        pin_memory: Whether to pin the memory of the created tensors.

    Returns: A (dict of) Torch Tensor(s).
    r1   z[When constructing a single-tensor batch, only a single dtype should be given, instead got: r   rJ   rK   c              	      s4   i | ]\}}|t |ttr| n d qS )rW   )rT   r!   dict)r   col_namecol_ndarrayrJ   rV   rK   r   r   
<dictcomp>   s    z?convert_ndarray_batch_to_torch_tensor_batch.<locals>.<dictcomp>)r!   r(   rI   rX   r7   r5   nextiterr3   rT   items)rU   rV   rJ   rK   r:   r   r[   r   +convert_ndarray_batch_to_torch_tensor_batch   s(   

r`   saved_modelmodel_definitionc                 C   sJ   t | tjjr	| S t | tr|std||  |S tdt|  d)zLoads a PyTorch model from the provided ``saved_model``.

    ``model_definition`` is only used when ``saved_model`` is
    a torch state dict, which will be loaded into ``model_definition``.
    Otherwise, ``model_definition`` is discarded.
    zYAttempting to load torch model from a state_dict, but no `model_definition` was provided.zSaved model is of type zt. The model saved in the checkpoint is expected to be of type `torch.nn.Module`, or a model state dict of type dict.)r!   r*   nnModulerX   r5   load_state_dictr'   )ra   rb   r   r   r   load_torch_model   s   


rf   c                 C   sv   t | tjrdS t | tr&|  D ]\}}t|r dS t|r# dS qdS t | ttfr9| D ]	}t|r8 dS q/dS )NTF)r!   r*   TensorrX   r_   contains_tensorrC   rD   )objkvr   r   r   rh     s    
rh   
state_dictprefixc                 C   s   d}| D ]}| |r"|t|d }|s|  } d}| || |< qd| v rN| d  | d< | d }|D ]}t|dkr>q5|t|d }||||< q5| S )aS  Strip the prefix in state_dict, if any and return a new dict.

    Adapted from https://github.com/pytorch/pytorch/blob/c18da597e0bb1c1aecc97c77a73fed1849057fa4/torch/nn/modules/utils.py
    The original method modified the dict in-place.

    Args:
        state_dict: a state-dict to be loaded to the model.
        prefix: prefix.

    FNT	_metadatar   )
startswithr7   copypop)rl   rm   copiedkeynewkeymetadatar   r   r   4consume_prefix_in_state_dict_if_present_not_in_place'  s$   
rv   c                    s    fdd|   D S )a~  Convert a dict mapping column names to lists of ndarrays to Torch Tensors.

    Args:
        ndarrays: A dict mapping column names to lists of ndarrays that we wish to convert
            to Torch Tensors.
        dtypes: A (dict of) Torch dtype(s) for the created tensors; if None, the dtype
            will be inferred from the NumPy ndarray data.
        device: The device on which the tensor(s) should be placed; if None, the Torch
            tensor(s) will be constructed on the CPU.
        pin_memory: Whether to pin the memory of the created tensors.

    Returns:
        A dict mapping column names to lists of Tensors.
    c                    s*   i | ]\ }  fd d|D qS )c                    s.   g | ]}t |ttr  nd qS ))rV   rJ   rK   r`   r!   rX   )r   rI   )rY   rJ   rV   rK   r   r   r   c  s    zHconvert_ndarray_list_to_torch_tensor_list.<locals>.<dictcomp>.<listcomp>r   )r   col_ndarraysr[   )rY   r   r\   b  s    
z=convert_ndarray_list_to_torch_tensor_list.<locals>.<dictcomp>)r_   )rU   rV   rJ   rK   r   r[   r   )convert_ndarray_list_to_torch_tensor_listN  s   
ry   r:   combine_chunksc                    sZ   ddl m} ddlm} |r!|| d} fdd| D S || }t| dS )a/  Convert PyArrow batch to PyTorch tensors.

    Args:
        batch: PyArrow batch to convert
        dtypes: A (dict of) Torch dtype(s) for the created tensors; if None, the dtype
            will be inferred from the NumPy ndarray data.
        combine_chunks: If True, combine chunks in Arrow batch before converting to
            tensors.
        pin_memory: Whether to pin the memory of the created tensors.

    Returns:
        A dictionary of column name to list of tensors. For non-chunked columns,
        the list will contain a single tensor.
    r   )transform_pyarrow)ArrowBlockAccessornumpyc                    s2   i | ]\}}|t |t tr | n d qS )rV   rK   rw   )r   rY   	col_arrayr~   r   r   r\     s    z*arrow_batch_to_tensors.<locals>.<dictcomp>r~   )ray.data._internal.arrow_opsr{   ray.data._internal.arrow_blockr|   to_batch_formatr_   table_to_numpy_dict_chunkedry   )r:   rV   rz   rK   r{   r|   numpy_batch
numpy_listr   r~   r   arrow_batch_to_tensorsp  s   	r   tensor_sequencenon_blockingc                    sB  | s	J d|  t dd | D sJ dtdd | D  | d j t  fdd| D s>J d	  d
dd | D  | d jdd t fdd| D saJ d d
dd | D  | d }|j}|jdd }tdd | D }tj|g|R ||d}d}| D ]}	||	jd  }
|||
 j|	|d |
}q|S )aP  Stack sequence of tensors into a contiguous GPU tensor.

    Args:
        tensor_sequence: Sequence of tensors to stack
        device: The device to move tensors to
        non_blocking: If True, perform device transfer without forcing a
            synchronization.

    Returns:
        A contiguous tensor on the target device
    z2Cannot stack empty sequence of tensors. Received: c                 s   s    | ]	}t |tjV  qd S N)r!   r*   rg   r   r=   r   r   r   	<genexpr>  s    
z+concat_tensors_to_device.<locals>.<genexpr>z5All items must be torch.Tensor. Found invalid types: c                 S   s    g | ]}t |tjst|qS r   )r!   r*   rg   r'   r   r   r   r   r          z,concat_tensors_to_device.<locals>.<listcomp>r   c                 3   s    | ]}|j  kV  qd S r   r    r   )first_dtyper   r   r         z0All tensors must have the same dtype. Expected: z, got: c                 S   s   g | ]}|j qS r   r    r   r   r   r   r     s    r1   Nc                 3   s"    | ]}|j d d  kV  qdS r1   Nshaper   )first_shaper   r   r     s     z4All tensors must have the same shape[1:]. Expected: c                 S   s   g | ]	}|j d d qS r   r   r   r   r   r   r         c                 s   s    | ]}|j d  V  qdS )r   Nr   r   r   r   r   r     r   rM   r   )allstrr   r   sumr*   emptycopy_)r   rJ   r   firstr   
shape_tail
total_rowsrS   	row_startr=   row_endr   )r   r   r   concat_tensors_to_device  sL   

r   c                 C   s   t | j}t| ttfr"dtdd | D }| d| d}|S t| tr?dtdd |  D }| d| d}|S |}|S )a  Get a string representation of the possibly nested type of the batch.

    >>> import torch
    >>> _get_type_str([1, 2, "???"])
    'list[int | str]'
    >>> _get_type_str({"a": [1, 2, 3], "b": 4})
    'dict[str, int | list[int]]'
    >>> _get_type_str({"a": torch.tensor(1), "b": [torch.tensor(2)]})
    'dict[str, Tensor | list[Tensor]]'
    >>> _get_type_str({"a": torch.tensor(1), "b": {"c": torch.tensor(2)}})
    'dict[str, Tensor | dict[str, Tensor]]'
    z | c                 S      h | ]}t |qS r   _get_type_strr   rk   r   r   r   	<setcomp>      z _get_type_str.<locals>.<setcomp>[]c                 S   r   r   r   r   r   r   r   r     r   z[str, )	r'   __name__r!   rC   rD   joinsortedrX   r3   )r:   	curr_type	val_typesinvalid_type_strr   r   r   r     s   

r   c                    s    du r| S t | r| j dS t| r#t|  fdd| D S t| r5t|  fdd| D S t| rE fdd|  D S t| rU fdd|  D S td	t	|  d
)a  Move tensors to the specified device.

    Concatenate nested lists/tuples of tensors along the first (batch) dimension.
    For example, for the input
    ((feature_0_chunk_0,), (feature_1_chunk_0, feature_1_chunk_1))
    the output will be (feature_0_chunk_0, feature_1_chunk_0+1)
    where each feature is concatenated along the batch dimension.

    Args:
        batch: A tensor or collection of tensors to move to device. Can be:
            - A single tensor
            - A sequence of tensors
            - A sequence of sequences of tensors. The inner sequence of tensors is
              combined during GPU transfer.
            - A mapping (e.g., dict) of keys to tensors or sequences of tensors. The
              sequence of tensors is combined during GPU transfer.
        device: The device to move tensors to. If None, tensors are not moved.
        non_blocking: If True, perform device transfer without forcing a
            synchronization.

    Returns:
        The input tensors moved to the specified device
    Nr   c                    s   g | ]	}|j  d qS r   tor   rJ   r   r   r   r     r   z*move_tensors_to_device.<locals>.<listcomp>c                    s   g | ]}t | qS r   r   r   r   r   r   r     s    c                    s    i | ]\}}||j  d qS r   r   )r   rj   r=   r   r   r   r\     r   z*move_tensors_to_device.<locals>.<dictcomp>c                    s   i | ]\}}|t | qS r   r   )r   rj   rk   r   r   r   r\     s    zInvalid input type: z.
Expected one of the following: torch.Tensor, List/Tuple[torch.Tensor], Dict[str, torch.Tensor], Mapping[str, List/Tuple[torch.Tensor]])
r   r   r   r'   r   r   r_   r   r5   r   )r:   rJ   r   r   r   r   move_tensors_to_device  s&   r   )NNT)NNFr   )NFF)4rP   typingr   r   r   r   r   r   r}   r(   pandasr"   r*   pyarrow ray.air._internal.device_managerr   "ray.air.util.data_batch_conversionr	   ray.data.collate_fnr
   r   r   r   r   r   r   ray._private.ray_constantsr   $DEFAULT_TENSOR_NON_BLOCKING_TRANSFERrJ   r   	DataFramer   r   boolrg   rH   rI   rT   r`   rc   rd   rf   rh   rv   ry   Tabler   no_gradr   r   r   r   r   r   r   <module>   s     $	
o
3
2




)
$
,8