import math
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import lightning.pytorch as pl
from torch.utils.data import DataLoader

from nemo.collections.common.tokenizers import AutoTokenizer
from nemo.lightning.data import WrappedDataLoader
from nemo.lightning.pytorch.plugins import MegatronDataSampler
from nemo.utils import logging

if TYPE_CHECKING:
    from nemo.collections.common.tokenizers import TokenizerSpec


class FineTuningDataModule(pl.LightningDataModule):
    """Base class for fine-tuning a Bert model.

    This class provides a foundation for building custom data modules for fine-tuning Nemo NLP models. It inherits from
    `pl.LightningDataModule` from the PyTorch Lightning library and handles data loading, preprocessing, and batch
    creation for training, validation, and testing.

    Args:
        dataset_root (Union[str, Path]): The root directory containing the training, validation, and test data.
        seq_length (int, optional): The maximum sequence length for the input and output text. Defaults to 2048.
        tokenizer (Optional[TokenizerSpec], optional): The tokenizer to use for preprocessing the text.
            If not provided, a Megatron GPT2 BPE tokenizer will be used.
        micro_batch_size (int, optional): The micro batch size for training. Defaults to 4.
        global_batch_size (int, optional): The global batch size for training. Defaults to 8.
        rampup_batch_size (Optional[List[int]], optional): A list of batch sizes for ramping up during training.
            Defaults to None.
        seed (int, optional): The random seed for data shuffling. Defaults to 1234.
        memmap_workers (int, optional): The number of worker processes for loading data using TextMemMapDataset.
            Defaults to 1.
        num_workers (int, optional): The number of worker processes for data loading. Defaults to 8.
        pin_memory (bool, optional): Whether to pin memory during data loading for faster GPU training.
            Defaults to True.
        persistent_workers (bool, optional): Whether to keep data loading workers persistent across epochs.
            Defaults to False.
        dataset_kwargs (Optional[Dict[str, Any]], optional): Keyword arguments to pass into the GPTSFTDataset class.
    """

    def __init__(
        self,
        dataset_root: Union[str, Path],
        seq_length: int = 2048,
        tokenizer: Optional["TokenizerSpec"] = None,
        micro_batch_size: int = 4,
        global_batch_size: int = 8,
        rampup_batch_size: Optional[List[int]] = None,
        seed: int = 1234,
        memmap_workers: int = 1,
        num_workers: int = 8,
        pin_memory: bool = True,
        persistent_workers: bool = False,
        dataset_kwargs: Optional[Dict[str, Any]] = None,
    ):
        super().__init__()
        self.dataset_root = Path(dataset_root)
        self.seq_length = seq_length
        self.tokenizer = tokenizer
        self.micro_batch_size = micro_batch_size
        self.global_batch_size = global_batch_size
        self.rampup_batch_size = rampup_batch_size
        self.seed = seed
        self.memmap_workers = memmap_workers
        self.num_workers = num_workers
        self.pin_memory = pin_memory
        self.persistent_workers = persistent_workers
        self.data_sampler = None
        self.max_train_samples = None
        self.dataset_kwargs = dataset_kwargs or {}

    def setup(self, stage: str):
        """Called by pytorch lightning in datamodule setup"""
        # Megatron-aware sampler that maps the global batch onto micro batches.
        self.data_sampler = MegatronDataSampler(
            seq_len=self.seq_length,
            micro_batch_size=self.micro_batch_size,
            global_batch_size=self.global_batch_size,
            rampup_batch_size=self.rampup_batch_size,
            dataloader_type="batch",
        )

        # Upper bound on training samples, with a small (0.5%) safety margin
        # over the samples consumed by trainer.max_steps.
        self.max_train_samples = int(math.ceil(self.trainer.max_steps * self.global_batch_size * 1.005))

    def state_dict(self) -> Dict[str, Any]:
        """Called when saving a checkpoint, implement to generate and save datamodule state.

        Returns:
            A dictionary containing datamodule state.

        """
        consumed_samples = self.data_sampler.compute_consumed_samples(
            self.trainer.global_step - self.data_sampler.init_global_step
        )
        return {'consumed_samples': consumed_samples}

    def load_state_dict(self, state_dict: Dict[str, Any]) -> None:
        """Called when loading a checkpoint, implement to reload datamodule state given datamodule state.

        Args:
            state_dict: the datamodule state returned by ``state_dict``.

        """
        try:
            from megatron.core.num_microbatches_calculator import update_num_microbatches
        except (ImportError, ModuleNotFoundError):
            logging.warning("Megatron num_microbatches_calculator not found, using Apex version.")
            from apex.transformer.pipeline_parallel.utils import update_num_microbatches

        consumed_samples = state_dict['consumed_samples']
        self.data_sampler.init_consumed_samples = consumed_samples
        self.data_sampler.prev_consumed_samples = consumed_samples

        update_num_microbatches(
            consumed_samples=consumed_samples,
            consistency_check=False,
        )
        self.data_sampler.if_first_step = 1

    def train_dataloader(self) -> DataLoader:
        """Dataloader for the training split."""
        return self._create_dataloader(
            self._create_dataset(
                self.train_path,
                max_num_samples=self.max_train_samples,
                **self.dataset_kwargs,
            ),
            mode="train",
        )

    def val_dataloader(self) -> DataLoader:
        """Dataloader for the validation split."""
        return self._create_dataloader(
            self._create_dataset(
                self.validation_path,
                max_num_samples=self.max_train_samples,
                **self.dataset_kwargs,
            ),
            mode="validation",
        )

    def test_dataloader(self) -> DataLoader:
        """Dataloader for the test split."""
        return self._create_dataloader(
            self._create_dataset(
                self.test_path,
                max_num_samples=self.max_train_samples,
                **self.dataset_kwargs,
            ),
            mode="test",
        )

    @lru_cache
    def _create_dataset(self, path, **kwargs):
        # Imported lazily to avoid a circular import at module load time.
        from nemo.collections.llm.bert.data.core import create_sft_dataset

        return create_sft_dataset(
            path,
            tokenizer=self.tokenizer,
            seq_length=self.seq_length,
            memmap_workers=self.memmap_workers,
            seed=self.seed,
            **kwargs,
        )

    def _create_dataloader(self, dataset, mode, **kwargs) -> WrappedDataLoader:
        return WrappedDataLoader(
            mode=mode,
            dataset=dataset,
            num_workers=self.num_workers,
            pin_memory=self.pin_memory,
            persistent_workers=self.persistent_workers,
            collate_fn=dataset.collate_fn,
            **kwargs,
        )

    @property
    def train_path(self) -> Path:
        """Path to training dataset file"""
        return self.dataset_root / "training.jsonl"

    @property
    def validation_path(self) -> Path:
        """Path to validation dataset file"""
        return self.dataset_root / "validation.jsonl"

    @property
    def test_path(self) -> Path:
        """Path to test dataset file"""
        return self.dataset_root / "test.jsonl"

    def _extract_tokenizer_model_name(self) -> str:
        """Automatically get the model name from model path."""
        if isinstance(self.tokenizer, AutoTokenizer):
            name = self.tokenizer.tokenizer.name_or_path
            if name.endswith("context/nemo_tokenizer"):
                # NEMO_HOME/hf_org/hf_model/context/nemo_tokenizer => hf_org--hf_model
                tokenizer_model_name = '--'.join(name.split('/')[-4:-2])
            elif name.endswith("nemo_tokenizer"):
                # NEMO_HOME/hf_org/hf_model/nemo_tokenizer => hf_org--hf_model
                tokenizer_model_name = '--'.join(name.split('/')[-3:-1])
            else:
                # hf_org/hf_model => hf_org--hf_model
                tokenizer_model_name = name.replace('/', '--')
        else:
            tokenizer_model_name = f"unknown_tokenizer_{hash(self.tokenizer)}"
        return tokenizer_model_name
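

# Illustrative usage (not part of the NeMo source): a minimal sketch of how this
# datamodule might be constructed for a fine-tuning run. The dataset directory
# below is a hypothetical path; the class expects training.jsonl,
# validation.jsonl, and test.jsonl inside `dataset_root`, and the dataloaders are
# only materialized once a Lightning trainer drives `setup()` and `fit()`.
if __name__ == "__main__":
    datamodule = FineTuningDataModule(
        dataset_root="/data/my_sft_dataset",  # hypothetical dataset directory
        seq_length=512,
        micro_batch_size=4,
        global_batch_size=32,
    )
    # Split files are resolved relative to dataset_root:
    print(datamodule.train_path)       # /data/my_sft_dataset/training.jsonl
    print(datamodule.validation_path)  # /data/my_sft_dataset/validation.jsonl
    # A trainer call such as `trainer.fit(model, datamodule=datamodule)` would
    # then invoke setup(), building the MegatronDataSampler and computing
    # max_train_samples before the train dataloader is created.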