o
    }oi$1                     @   s   d dl Z d dlmZ d dlZd dlmZ d dlm	Z	m
Z
mZ d dlmZ d dlmZ d dlmZ dd Zdd	 Zd
d Zdd Zdd Zdd ZG dd dejZdS )    N)DatasetDatasetDictload_dataset)
DataLoader)DistributedSampler)loggingc                 C   s0   d| v r|  dd } d| v r|  dd } | S )zremoves split from name

    Args:
        name (str): partition name (e.g. "train[:100]")

    Returns:
        str: return partition name without any selector (e.g. "train").
    [r   +)split)name r   [/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/vlm/hf/data/hf_dataset.pyclean_split   s
   	r   c                 C   s   g d}dd |D }i }|  D ]\}}||v sJ |D ]}|||< qq|D ]}	|	||	< q't| trNt|tsAJ dt|t|}|| }| ||< nt| try|  }
t	
d|
 |   D ]\}}|| }|| du ssJ |||< qcnnt|trt	
d| t| tsJ ttt|D ]\}}|| }|| du sJ | | ||< qn;t|trt	
d t| trJ |}d	|v rtd
d|v r|dd }|| }|| du sJ | ||< ntdt|t| ksJ | ttdd | }|dksJ d||S )a.  
    Given a dataset (e.g. from datasets.load_dataset or datasets.Dataset.from_dict) it
    returns a dictionary containing the corresponding dataset splits.

    For example:

    $ ds = load_dataset("dataset-id")
    $ ans = make_dataset_splits(ds)

    # `ds` contains the following
    $ print(ds)
    > DatasetDict({
    >    train: Dataset({
    >        features: ['id', 'title', 'context', 'question', 'answers'],
    >        num_rows: 87599
    >    })
    >    validation: Dataset({
    >        features: ['id', 'title', 'context', 'question', 'answers'],
    >        num_rows: 10570
    >    })
    > })

    # In this case the value of `ans` (returned value) will be:
    $ print(ans)
    > {
    >    "train": Dataset .. (with 87599 rows),
    >    "val": Dataset .. (with 10570 rows),
    > }
    traintestvalc                 S   s   i | ]}|d qS Nr   ).0_splitr   r   r   
<dictcomp>J   s    z'make_dataset_splits.<locals>.<dictcomp>z)Expected split to be a string, but got {}z'HF dataset has the following splits: {}Nz%Loaded HF dataset will use {} splits.z%Loaded HF dataset has a single split.r	   z!Split concatenation not supportedr   r   z-Expected split name to be None, str or a listc                 S   s   | d uS r   r   xr   r   r   <lambda>v       z%make_dataset_splits.<locals>.<lambda>z7Expected at least one split to have been initialized {})items
isinstancer   strformattyper   r   keysr   infolist	enumeratemap
ValueErrorr
   setsumvalues)datasetr
   split_aliasesvalid_split_namesdataset_splitsalias_to_split
split_name_split_aliasesaliasr   dataset_split_namesalias_split_nameinum_init_splitsr   r   r   make_dataset_splits+   s^   









 r5   c                   C   s   t  pttjdddkS )z-returns whether it runs on a dist-environment
WORLD_SIZE0   )distis_initializedintosenvirongetr   r   r   r   !has_dist_env_init_or_rank_env_var{   s   r?   c                 C   s   | j dkr
| dS | S )a  Ensures that the input tensor has at least two dimensions by adding an extra batch dimension if necessary.

    Parameters
    ----------
    tensor : torch.Tensor
        The input tensor to be batchified.

    Returns
    -------
    torch.Tensor
        The tensor with an extra dimension added if it was originally 1-dimensional.
        Otherwise, the tensor is returned as-is.
    r8   r   )ndim
unsqueeze_)tensorr   r   r   batchify   s   

rC   c                    s   t t fdd| S )a  Extracts the value of the given key from each dictionary in a list of dictionaries.

    Parameters
    ----------
    batch : List[dict]
        A list of dictionaries.
    key : str
        The key whose values are to be extracted from each dictionary.

    Returns
    -------
    List
        A list of values associated with the specified key, in the same order as
        the dictionaries in the input batch.
    c                    s   |   S r   r   r   keyr   r   r      r   z(extract_key_from_dicts.<locals>.<lambda>)r"   r$   )batchrE   r   rD   r   extract_key_from_dicts   s   rG   c                    s"   t tt|   fdd| D S )a  Pads each list in a batch of lists to the same length with a specified token.

    Parameters
    ----------
    batch : List[List[int]]
        A batch of sequences (e.g., token IDs), where each sequence is a list of integers.
    pad_token_id : int
        The token ID to use for padding shorter sequences.

    Returns
    -------
    List[List[int]]
        A batch of sequences where each inner list has been padded with the pad token
        to match the length of the longest sequence in the batch.
    c                    s"   g | ]}|g t |   qS r   )len)r   itemmax_lenpad_token_idr   r   
<listcomp>   s   " z$pad_within_micro.<locals>.<listcomp>)maxr$   rH   rF   rL   r   rJ   r   pad_within_micro   s   rP   c                       s   e Zd ZdZdddddddddddd	gd
dgg df	d) fddZedd Zed*ddZdefddZ	dd Z
d+ddZedd Zedd Zedd  Zd!d" Zd#d$ Zd%d& Zd,d'd(Z  ZS )-HFDatasetDataModulea  HFDatasetDataModule wraps HF's load_dataset (datasets library)
    so that it can be used within NeMo.
    Users can select whether to use an mcore-sampler via use_mcore_sampler arg.

    Usage examples:

    - loading a single split (train) from a dataset
    llm.HFDatasetDataModule("rajpurkar/squad", split="train")

    - loading multiple splits (train, validation) from a dataset
    llm.HFDatasetDataModule("rajpurkar/squad", split=["train", "validation"])
    N   Ti   r   Fr   trainingr   testing)r   
validationvalidevalreturnc                    s  t    d|v r|d d|v r|d |
d usJ |||d}t|tr;td|  t|fd|i|}n!t|tsEt|t	rRtdt|  |}n
t
dtt| t||| _|d u ro fdd	 _n| _| _| _| _| _| _|	 _|
 _| _d S )
Nuse_mcore_samplermcore_dataloader_typer   zLoading HF dataset from r
   zUsing passed HF dataset zDExpected `path_or_dataset` to be str, Dataset, DatasetDict, but got c                       t j|  jdS N)rL   rQ   
collate_fnrL   r   selfr   r   r      s    z.HFDatasetDataModule.__init__.<locals>.<lambda>)super__init__popr   r   r   r!   r   r   r   r%   r   r5   r,   _collate_fnnum_workers
pin_memorypersistent_workers
seq_lengthmicro_batch_sizeglobal_batch_sizerL   use_dist_sampler)r`   path_or_datasetr
   r^   re   rf   rg   rh   ri   rj   rL   rk   train_aliasestest_aliasesval_aliaseskwargsr*   r)   	__class__r_   r   rb      s8   




zHFDatasetDataModule.__init__c                 K   s   t | }td||d|S )z#Creates a Dataset from a dictionary)rl   r
   Nr   )r   	from_dictrQ   )dataset_dictr
   rp   r)   r   r   r   rs     s   
zHFDatasetDataModule.from_dictc                    s    fdd d   D S )zCollate for VLM datac              
      s(   i | ]}|t ttt |qS r   )rC   torch
LongTensorrP   rG   )r   rE   rO   r   r   r     s    	z2HFDatasetDataModule.collate_fn.<locals>.<dictcomp>r   )r    rO   r   rO   r   r^     s   
	zHFDatasetDataModule.collate_fnstagec                 C   s(   | j st rd| _ td dS dS dS )zsetups samplerTz#Turning on distributed data samplerN)rk   r?   r   r!   )r`   rw   r   r   r   setup  s   zHFDatasetDataModule.setupc                 C   s   | j rt|S dS )zreturns the data samplerN)rk   r   )r`   r)   r   r   r   get_data_sampler  s   z$HFDatasetDataModule.get_data_samplerc              
      sD   |dusJ |du r fdd}t | j j j| j |dS )zCreates a dataloaderNc                    r[   r\   r]   r   r_   r   r   r^   +  s   z8HFDatasetDataModule._make_dataloader.<locals>.collate_fn)re   rf   rg   r^   
batch_sizesampler)r   re   rf   rg   ri   ry   )r`   r)   r^   r   r_   r   _make_dataloader%  s   z$HFDatasetDataModule._make_dataloaderc                 C   
   | j d S )zTrain data splitr   r,   r_   r   r   r   r   8     
zHFDatasetDataModule.trainc                 C   r}   )zValidation data splitr   r~   r_   r   r   r   r   =  r   zHFDatasetDataModule.valc                 C   r}   )zTesting data splitr   r~   r_   r   r   r   r   B  r   zHFDatasetDataModule.testc                 C      |  | j| jS )z(Creates a dataloader for the train split)r|   r   rd   r_   r   r   r   train_dataloaderG     z$HFDatasetDataModule.train_dataloaderc                 C   r   )z-Creates a dataloader for the validation split)r|   r   rd   r_   r   r   r   val_dataloaderK  r   z"HFDatasetDataModule.val_dataloaderc                 C   r   )z'Creates a dataloader for the test split)r|   r   rd   r_   r   r   r   test_dataloaderO  r   z#HFDatasetDataModule.test_dataloaderc                 K   st   t |tr	|g}nt |trn|du r| j }ntd|D ]}| j| dur7| j| j|fi || j|< qdS )zrMaps a function to all/selected splits
        Additional arguments can be passed down to dataset's map via kwargsNzsplit_names must None/str/list)r   r   r"   r,   r    r%   r$   )r`   functionsplit_namesrp   r.   r   r   r   r$   S  s   

zHFDatasetDataModule.map)rX   N)r   r   )NN)__name__
__module____qualname____doc__rb   staticmethodrs   r^   r   rx   ry   r|   propertyr   r   r   r   r   r   r$   __classcell__r   r   rq   r   rQ      sF    :




rQ   )r<   lightning.pytorchpytorchplru   torch.distributeddistributedr9   datasetsr   r   r   torch.utils.datar   torch.utils.data.distributedr   
nemo.utilsr   r   r5   r?   rC   rG   rP   LightningDataModulerQ   r   r   r   r   <module>   s   P