o
    }oin                     @   s(  d dl Z d dlZd dlmZ d dlmZ d dlmZ d dl	Z
d dlZd dlmZ d dlmZmZmZ d dlmZ d dlmZ d dlmZ dd	 Zd
d Zdd Zdd Zdd Zd ddZG dd dejZ G dd de Z!G dd de Z"G dd de Z#G dd dejZ$G dd dej%j&jZ'dS )!    N)partial)Dict)DatasetDatasetDictload_dataset)
DataLoader)HFDatasetPackedSequenceHelper)loggingc                 C   s0   d| v r|  dd } d| v r|  dd } | S )zremoves split from name

    Args:
        name (str): partition name (e.g. "train[:100]")

    Returns:
        str: return partition name without any selector (e.g. "train").
    [r   +)split)name r   \/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/llm/gpt/data/hf_dataset.pyclean_split   s
   	r   c                 C   s   g d}dd |D }i }|  D ]\}}||v sJ |D ]}|||< qq|D ]}	|	||	< q't| trNt|tsAJ dt|t|}|| }| ||< nt| try|  }
t	
d|
 |   D ]\}}|| }|| du ssJ |||< qcnnt|trt	
d| t| tsJ ttt|D ]\}}|| }|| du sJ | | ||< qn;t|trt	
d t| trJ |}d	|v rtd
d|v r|dd }|| }|| du sJ | ||< ntdt|t| ksJ | ttdd | }|dksJ d||S )a.  
    Given a dataset (e.g. from datasets.load_dataset or datasets.Dataset.from_dict) it
    returns a dictionary containing the corresponding dataset splits.

    For example:

    $ ds = load_dataset("dataset-id")
    $ ans = make_dataset_splits(ds)

    # `ds` contains the following
    $ print(ds)
    > DatasetDict({
    >    train: Dataset({
    >        features: ['id', 'title', 'context', 'question', 'answers'],
    >        num_rows: 87599
    >    })
    >    validation: Dataset({
    >        features: ['id', 'title', 'context', 'question', 'answers'],
    >        num_rows: 10570
    >    })
    > })

    # In this case the value of `ans` (returned value) will be:
    $ print(ans)
    > {
    >    "train": Dataset .. (with 87599 rows),
    >    "val": Dataset .. (with 10570 rows),
    > }
    traintestvalc                 S   s   i | ]}|d qS Nr   ).0_splitr   r   r   
<dictcomp>N   s    z'make_dataset_splits.<locals>.<dictcomp>z)Expected split to be a string, but got {}z'HF dataset has the following splits: {}Nz%Loaded HF dataset will use {} splits.z%Loaded HF dataset has a single split.r   z!Split concatenation not supportedr
   r   z-Expected split name to be None, str or a listc                 S   s   | d uS r   r   xr   r   r   <lambda>z       z%make_dataset_splits.<locals>.<lambda>z7Expected at least one split to have been initialized {})items
isinstancer   strformattyper   r   keysr	   infolist	enumeratemap
ValueErrorr   setsumvalues)datasetr   split_aliasesvalid_split_namesdataset_splitsalias_to_split
split_name_split_aliasesaliasr   dataset_split_namesalias_split_nameinum_init_splitsr   r   r   make_dataset_splits/   s^   









 r7   c                   C   s   t  pttjdddkS )z-returns whether it runs on a dist-environment
WORLD_SIZE0   )distis_initializedintosenvirongetr   r   r   r   !has_dist_env_init_or_rank_env_var   s   rA   c                 C   s   | j dkr
| dS | S )a  Ensures that the input tensor has at least two dimensions by adding an extra batch dimension if necessary.

    Parameters
    ----------
    tensor : torch.Tensor
        The input tensor to be batchified.

    Returns
    -------
    torch.Tensor
        The tensor with an extra dimension added if it was originally 1-dimensional.
        Otherwise, the tensor is returned as-is.
    r:   r   )ndim
unsqueeze_)tensorr   r   r   batchify   s   

rE   c                    s   t t fdd| S )a  Extracts the value of the given key from each dictionary in a list of dictionaries.

    Parameters
    ----------
    batch : List[dict]
        A list of dictionaries.
    key : str
        The key whose values are to be extracted from each dictionary.

    Returns
    -------
    List
        A list of values associated with the specified key, in the same order as
        the dictionaries in the input batch.
    c                    s   |   S r   r   r   keyr   r   r      r   z(extract_key_from_dicts.<locals>.<lambda>)r$   r&   )batchrG   r   rF   r   extract_key_from_dicts   s   rI   c                    s6   t tt|  |r| |      fdd| D S )a  Pads each list in a batch of lists to the same length with a specified token.

    Parameters
    ----------
    batch : List[List[int]]
        A batch of sequences (e.g., token IDs), where each sequence is a list of integers.
    pad_token_id : int
        The token ID to use for padding shorter sequences.
    pad_seq_len_divisible : int
        The value to use for padding sequence length so that it is divisible by pad_seq_len_divisible.
    Returns
    -------
    List[List[int]]
        A batch of sequences where each inner list has been padded with the pad token
        to match the length of the longest sequence in the batch.
    c                    s"   g | ]}|g t |   qS r   )len)r   itemmax_lenpad_token_idr   r   
<listcomp>   s   " z$pad_within_micro.<locals>.<listcomp>)maxr&   rJ   rH   rN   pad_seq_len_divisibler   rL   r   pad_within_micro   s   rS   c                       s   e Zd ZdZddddddddddd	gd
dgg ddf	d$ fddZedd Zd%ddZd&ddZe	dd Z
e	dd Ze	dd Zdd Zdd Zd d! Zd'd"d#Z  ZS )(HFDatasetDataModulea$
  A PyTorch Lightning DataModule for loading and managing datasets from the `datasets` library.

    Args:
        path_or_dataset (str | Dataset | DatasetDict): The dataset name from HF or a preloaded dataset.
        split (str | list, optional): The dataset split(s) to load (e.g., "train" or ["train", "validation"]).
            Defaults to None.
        collate_fn (callable, optional): Custom function for batching data; defaults to a padding-based collation.
            Defaults to None.
        num_workers (int, optional): Number of workers for data loading. Defaults to 2.
        pin_memory (bool, optional): Whether to use pinned memory for faster GPU transfers. Defaults to True.
        persistent_workers (bool, optional): Whether to keep worker threads alive between epochs. Defaults to True.
        seq_length (int, optional): Maximum sequence length for tokenized inputs. Defaults to 1024.
        micro_batch_size (int, optional): Batch size per device. Defaults to 2.
        pad_token_id (int, optional): Token ID used for padding sequences. Defaults to 0.
        use_dist_sampler (bool, optional): Whether to enable distributed sampling. Defaults to False.
        train_aliases (list, optional): Alternative names for the training split. Defaults to ["train", "training"].
        test_aliases (list, optional): Alternative names for the test split. Defaults to ["test", "testing"].
        val_aliases (list, optional): Alternative names for the validation split.
            Defaults to ["val", "validation", "valid", "eval"].
        **kwargs: Additional arguments passed to `datasets.load_dataset`.

    Raises:
        ValueError: If `path_or_dataset` is not a valid dataset type (str, Dataset, or DatasetDict).

    Examples:
        Load a single split (train) from a dataset:
        ```python
        data_module = HFDatasetDataModule("rajpurkar/squad", split="train")
        ```

        Load multiple splits (train and validation):
        ```python
        data_module = HFDatasetDataModule("rajpurkar/squad", split=["train", "validation"])
        ```

        Use a preloaded dataset:
        ```python
        from datasets import load_dataset
        dataset = load_dataset("imdb")
        data_module = HFDatasetDataModule(dataset, split="train")
        ```

    Notes:
        - If `use_dist_sampler` is not enabled, but a distributed environment is detected,
        HFDatasetDataModule will use a distributed-sampler automatically.
        - If no collation function is provided, a default function with padding using `pad_token_id` is applied.
    N   T   r   Fr   trainingr   testing)r   
validationvalidevalreturnc                    s   t    |	d usJ |||d}t|tr)td| t|fd|i|}nt|ts3t|t	r>td| |}n	t
dt|t|||_|d u r[ fdd_n|_|_|_|_|_|_|	_|
_ _d S )Nr   z3Loading HF dataset from {}, this may take a moment.r   zUsing passed HF dataset {}zFExpected `path_or_dataset` to be str, Dataset, DatasetDict, but got {}c                    s   j | j dS N)rN   rR   )
collate_fnrN   r   rR   selfr   r   r     s    z.HFDatasetDataModule.__init__.<locals>.<lambda>)super__init__r   r   r	   r#   r    r   r   r   r'   r!   r7   r.   _collate_fnnum_workers
pin_memorypersistent_workers
seq_lengthmicro_batch_sizerN   use_dist_samplerrR   )r`   path_or_datasetr   r^   rd   re   rf   rg   rh   rN   ri   train_aliasestest_aliasesval_aliasesrR   kwargsr,   r+   	__class__r_   r   rb      s6   


zHFDatasetDataModule.__init__c                 K   s   t | }td||d|S )z wraps Dataset's from_dict method)rj   r   Nr   )r   	from_dictrT   )dataset_dictr   rn   r+   r   r   r   rq   -  s   
zHFDatasetDataModule.from_dictc                        fdd d   D S )Default batch collatorc              
      6   i | ]}|t ttt ||d krndqS 	loss_maskr   rE   torch
LongTensorrS   rI   r   rG   rH   rR   rN   r   r   r   5      
z2HFDatasetDataModule.collate_fn.<locals>.<dictcomp>r   r"   r`   rH   rN   rR   r   r|   r   r^   3  s   

zHFDatasetDataModule.collate_fnc                    s<   |dusJ |du r fdd}t | j j j| jdS )zDataloader creatorNc                    s    j |  j jdS r]   )r^   rN   rR   r   r`   r   r   r   F  s    
z6HFDatasetDataModule._make_dataloader.<locals>.<lambda>)rd   re   rf   r^   
batch_size)r   rd   re   rf   rh   )r`   r+   r^   r   r   r   _make_dataloaderB  s   z$HFDatasetDataModule._make_dataloaderc                 C   
   | j d S )zReturns the training partitionr   r.   r   r   r   r   r   S     
zHFDatasetDataModule.trainc                 C   r   )z Returns the validation partitionr   r   r   r   r   r   r   X  r   zHFDatasetDataModule.valc                 C   r   )zReturns the test partitionr   r   r   r   r   r   r   ]  r   zHFDatasetDataModule.testc                 C      |  | j| jS )Returns the train dataloaderr   r   rc   r   r   r   r   train_dataloaderb     z$HFDatasetDataModule.train_dataloaderc                 C   r   )!Returns the validation dataloaderr   r   rc   r   r   r   r   val_dataloaderf  r   z"HFDatasetDataModule.val_dataloaderc                 C   r   )Returns the test dataloaderr   r   rc   r   r   r   r   test_dataloaderj  r   z#HFDatasetDataModule.test_dataloaderc                 K   st   t |tr	|g}nt |trn|du r| j }ntd|D ]}| j| dur7| j| j|fi || j|< qdS )zrMaps a function to all/selected splits
        Additional arguments can be passed down to dataset's map via kwargsNzsplit_names must None/str/list)r   r   r$   r.   r"   r'   r&   )r`   functionsplit_namesrn   r0   r   r   r   r&   n  s   

zHFDatasetDataModule.map)r\   Nr   Nr   )NN)__name__
__module____qualname____doc__rb   staticmethodrq   r^   r   propertyr   r   r   r   r   r   r&   __classcell__r   r   ro   r   rT      s@    3;





rT   c                       sb   e Zd ZdZ	ddedef fddZd fd	d
	Zd fdd	Zdd Z	dd Z
dd Z  ZS )HFDatasetDataModulePackeda  
    Inherits HFDatasetDataModule class and overrides methods for adding packing functionality.
    Args:
        path_or_dataset (str | Dataset | DatasetDict): The dataset name from HF or a preloaded dataset.
        packed_sequence_size (int): Specifies the number of tokens to pack.
        split_across_pack [Optional(bool)]: If the last sample in a pack does not fit in ``packed_sequence_size``,
        split the sample into the next pack, or move it entirely to the beginning of the next pack.
        For pre-training, typically this is set to True for general text completion. For fine-tuning, typically this
        is set to False to avoid truncating sentences in instruct tuning. Default is False.
        max_packs (int): Maximum number of packs.
    FNsplit_across_pack	max_packsc                    s*   t  j|fi | || _|| _|| _d S r   )ra   rb   packed_sequence_sizer   r   )r`   rj   r   r   r   rn   ro   r   r   rb     s   
z"HFDatasetDataModulePacked.__init__r   c                    s   	 t  |||S )z
        Creates the attn_mask and append it to the batch as its required in case of packed sequences. Then calls
        HFDatasetDataModule's collate_fn.
        )ra   r^   r   ro   r   r   r^     s   
z$HFDatasetDataModulePacked.collate_fnc                    s8   |dusJ t ||}|| j| j| j}t ||S )zj
        Pack the sequences in the dataset and then call HFDatasetDataModule's _make_dataloader()
        N)r   packr   r   r   ra   r   )r`   r+   r   r^   packed_seq_helper_classro   r   r   r     s   
z*HFDatasetDataModulePacked._make_dataloaderc                 C      |  | jd| jS )r   r   r   r   r   r   r   r        z*HFDatasetDataModulePacked.train_dataloaderc                 C   r   )r   r   r   r   r   r   r   r     r   z(HFDatasetDataModulePacked.val_dataloaderc                 C   r   )r   r   r   r   r   r   r   r     r   z)HFDatasetDataModulePacked.test_dataloader)FNr   r   )r   r   r   r   boolr=   rb   r^   r   r   r   r   r   r   r   ro   r   r     s    	r   c                       sH   e Zd ZdZd fdd	Zedd Zedd Zedd
dZ  Z	S )HellaSwagHFDataModulezKA data module for handling the HellaSwag dataset using HFDatasetDataModule.Rowan/hellaswagc                    sD   |j |_|j| _t|}t jt|d|d g|R i | d S )NiL  r   )		eos_token	pad_tokeneos_idrN   r   ra   rb   r   preprocess_dataset)r`   	tokenizerdataset_nameargsrn   r+   ro   r   r   rb     s   ,zHellaSwagHFDataModule.__init__c                 C   s2   |   } | dd} tdd| } | dd} | S )zEPreprocesses text data by removing unwanted characters and artifacts.z [title]z. z\[.*?\] z   )stripreplaceresub)textr   r   r   
preprocess  s
   z HellaSwagHFDataModule.preprocessc                 C   sj   | d d | d    }t| d d | }dd | d D }t| d	 }||||d ||  d
}|S )z_Processes a document from the HellaSwag dataset into a structured format suitable for training.ctx_ar   ctx_bactivity_labelz: c                 S   s   g | ]}t |qS r   )r   r   )r   endingr   r   r   rO     s    z5HellaSwagHFDataModule.process_doc.<locals>.<listcomp>endingslabel)querychoicesgoldr   )
capitalizer   r   r=   )docctxr   r   r   out_docr   r   r   process_doc  s   z!HellaSwagHFDataModule.process_doc*   c                 C   sR   t d |tj}dd }t||| d}|j|ddg d}|j|d}|S )	z5Preprocesses a dataset for training a language model.zPreprocessing dataset...c                 S   s,   || d |dd}dd |d D |d< |S )Nr   T)
max_length
truncationc                 S   s   g | ]}|d d dg qS )r:   Nir   )r   r   r   r   r   rO     s    zVHellaSwagHFDataModule.preprocess_dataset.<locals>.preprocess_batch.<locals>.<listcomp>	input_idslabelsr   )rH   r   r   ansr   r   r   preprocess_batch  s   zBHellaSwagHFDataModule.preprocess_dataset.<locals>.preprocess_batch)r   r   T)batched)r   attention_maskr   seed)printr&   r   r   r   select_columnsshuffle)r   r   r+   r   r   _preprocessing_functionr   r   r   r     s   

z(HellaSwagHFDataModule.preprocess_dataset)r   )r   )
r   r   r   r   rb   r   r   r   r   r   r   r   ro   r   r     s    
	
r   c                       s4   e Zd ZdZ fddZdd Z fddZ  ZS )SquadHFDataModulea  
    A data module for handling the SQuAD dataset using HFDatasetDataModule.

    This class is responsible for tokenizing and formatting the SQuAD dataset for training
    language models. It extends `HFDatasetDataModule` and implements a prompt-based
    formatting function suitable for causal language modeling.

    Attributes:
        tokenizer: A tokenizer instance used to convert text into token IDs.
    c                    s2   t  jdi | || _| jj| j_| jj| _dS )z
        Initializes the SquadHFDataModule.

        Args:
            tokenizer: A tokenizer instance for processing text data.
            **kwargs: Additional arguments passed to the parent class (`HFDatasetDataModule`).
        Nr   )ra   rb   r   r   r   r   rN   )r`   r   rn   ro   r   r   rb   	  s   zSquadHFDataModule.__init__c                 C   s   d|d  d|d  dd|d d d	    g}tt| jj|\}}t| jd
d}t| jdd}t|d	krI|durI|d	 |krI|d	| t|d	kr^|dur^|d |kr^|| t	|| dd || dd d	gt|d  dgt|  dS )a  
        Formats a given example into a structured prompt for training.

        This method converts a dataset example (containing context, question, and answer)
        into a structured format, tokenizes it, and prepares input IDs and labels for
        training a language model.

        Args:
            example (dict): A dictionary containing the following keys:
                - 'context': The passage from which the question is derived.
                - 'question': The question about the passage.
                - 'answers': A dictionary with a 'text' key containing the answer(s).

        Returns:
            dict: A dictionary containing:
                - 'input_ids': Tokenized input sequence (excluding the last token).
                - 'labels': Tokenized output sequence (excluding the first token).
                - 'loss_mask': A mask indicating which tokens contribute to the loss.
        z	Context: contextz Question: questionz Answer:r   answersr   r   bos_idNr   r:   )r   r   rw   )
r   r$   r&   r   text_to_idsgetattrrJ   insertappenddict)r`   exampleformatted_textcontext_ids
answer_idsr   r   r   r   r   formatting_prompts_func  s     
z)SquadHFDataModule.formatting_prompts_funcc                    s(   t  | | j| jddg dd dS )z
        Prepares the dataset for training and applies formatting.

        Args:
            stage (str): The stage of training.
        FrU   )idtitler   r   r   )r   r   remove_columnsN)ra   setupr&   r   r`   stagero   r   r   r   <  s   
zSquadHFDataModule.setup)r   r   r   r   rb   r   r   r   r   r   ro   r   r     s
    &r   c                       s   e Zd ZdZ																d%d
edededededededededef fddZd&deddfddZde	fddZ
de	fddZde	fddZde	fd d!Zed'd#d$Z  ZS )(HFMockDataModulezMA PyTorch Lightning DataModule for generating mock data for testing purposes.   rV      N'     TFrg   
vocab_sizerh   num_train_samplesnum_val_samplesnum_test_samplesrd   re   rf   create_attention_maskc                    sv   t    || _|| _|| _|| _|| _|| _|	| _|
| _	|| _
dd | _|| _|d ur9|| d | | | _d S d S )Nc                 S   s   t j| ddS )Nr   )rN   )r   r^   r   r   r   r   r   k  s    z+HFMockDataModule.__init__.<locals>.<lambda>r:   )ra   rb   rg   rh   r   r   r   rd   re   rf   r   r^   r   )r`   rg   r   rh   rampup_batch_sizer   r   r   rd   re   rf   r   
vocab_filemerges_filerR   ro   r   r   rb   P  s   

zHFMockDataModule.__init__r   r\   c                 C   sR   t | jd| j| j| j| _t | jd| j| j| j| _t | jd| j| j| j| _	dS )r   r   rZ   r   N)
_MockGPTDatasetr   r   rg   r   	_train_dsr   _val_dsr   _test_dsr   r   r   r   r   p  s*   
zHFMockDataModule.setupc                 C      |  | jS )r   )_create_dataloaderr   r   r   r   r   r        z!HFMockDataModule.train_dataloaderc                 C   r   )r   )r   r   r   r   r   r   r     r   zHFMockDataModule.val_dataloaderc                 C   r   )r   )r   r   r   r   r   r   r     r   z HFMockDataModule.test_dataloaderc                 C   s   t || j| j| j| j| jdS )z(creates the dataloader for given dataset)r   rd   re   rf   r^   )r   rh   rd   re   rf   r^   )r`   r+   r   r   r   r     s   z#HFMockDataModule._create_dataloaderr   c                    rs   )rt   c              
      ru   rv   rx   r{   r|   r   r   r     r}   z/HFMockDataModule.collate_fn.<locals>.<dictcomp>r   r~   rQ   r   r|   r   r^     s   

zHFMockDataModule.collate_fn)r   rV   r   Nr   r   r   r   TFFNNNr   r   )r   r   r   r   r=   r   rb   r   r   r   r   r   r   r   r   r^   r   r   r   ro   r   r   M  sX    	
 r   c                       sh   e Zd ZdZ		ddededededed	ed
df fddZd
efddZd
e	ee
f fddZ  ZS )r   z?A mock dataset for generating random data for testing purposes.Fr   r   r   num_samplesrg   r   r   r\   Nc                    s   t    || _|| _|| _|| _|| _|| _|r2t	tj
| j| jftjdtjd d f  | _tj
| jtjd | _tj| jtjd | _d S )N)dtype)ra   rb   r   rg   r   lengthr   r   nptrilonesfloat32newaxistolistr   rw   arangeint64position_ids)r`   r   r   r   rg   r   r   ro   r   r   rb     s   
	z_MockGPTDataset.__init__c                 C   s   | j S r   )r   r   r   r   r   __len__  s   z_MockGPTDataset.__len__c                 C   sr   t jj| j| d}|j| j| jgt jd }|j| j| jgt jd }||| j	| j
d}| jr7| j|d< |S )Nr   )sizer   )r   r   rw   r  r   )r   randomdefault_rngr   integersr   rg   r  r  rw   r  r   r   )r`   idxnp_genr   r   rH   r   r   r   __getitem__  s   
z_MockGPTDataset.__getitem__)Fr   )r   r   r   r   r=   r   r   rb   r  r   r$   r  r   r   r   ro   r   r     s*    r   r   )(r>   r   	functoolsr   typingr   lightning.pytorchpytorchplnumpyr   ry   torch.distributeddistributedr;   datasetsr   r   r   torch.utils.datar   8nemo.collections.llm.gpt.data.hf_dataset_packed_sequencer   
nemo.utilsr	   r   r7   rA   rE   rI   rS   LightningDataModulerT   r   r   r   r   utilsdatar   r   r   r   r   <module>   s2   P
 ?>@Pc