import math
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import lightning.pytorch as pl
from lightning.pytorch.utilities.rank_zero import rank_zero_info
from torch.utils.data import DataLoader

from nemo.collections.common.tokenizers import AutoTokenizer
from nemo.collections.llm.gpt.data.core import create_sft_dataset
from nemo.lightning.data import WrappedDataLoader
from nemo.lightning.pytorch.plugins import MegatronDataSampler
from nemo.utils import logging

if TYPE_CHECKING:
    from nemo.collections.common.tokenizers import TokenizerSpec
    from nemo.collections.llm.gpt.data.packed_sequence import PackedSequenceSpecs


class FineTuningDataModule(pl.LightningDataModule):
    """Base class for fine-tuning an LLM.

    This class provides a foundation for building custom data modules for fine-tuning NeMo NLP models. It inherits from
    `pl.LightningDataModule` from the PyTorch Lightning library and handles data loading, preprocessing, and batch
    creation for training, validation, and testing.

    Args:
        dataset_root (Union[str, Path]): The root directory containing the training, validation, and test data.
        seq_length (int, optional): The maximum sequence length for the input and output text. Defaults to 2048.
        tokenizer (Optional[TokenizerSpec], optional): The tokenizer to use for preprocessing the text.
            If not provided, a Megatron GPT2 BPE tokenizer will be used.
        micro_batch_size (int, optional): The micro batch size for training. Defaults to 4.
        global_batch_size (int, optional): The global batch size for training. Defaults to 8.
        rampup_batch_size (Optional[List[int]], optional): A list of batch sizes for ramping up during training.
            Defaults to None.
        seed (int, optional): The random seed for data shuffling. Defaults to 1234.
        memmap_workers (int, optional): The number of worker processes for loading data using TextMemMapDataset.
            Defaults to 1.
        num_workers (int, optional): The number of worker processes for data loading. Defaults to 8.
        pin_memory (bool, optional): Whether to pin memory during data loading for faster GPU training.
            Defaults to True.
        persistent_workers (bool, optional): Whether to keep data loading workers persistent across epochs.
            Defaults to False.
        packed_sequence_specs (PackedSequenceSpecs, optional): See PackedSequenceSpecs for details
        dataset_kwargs (Optional[Dict[str, Any]], optional): Keyword arguments to pass into the GPTSFTDataset class
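
    Example:
        A minimal construction sketch with illustrative values, assuming
        ``dataset_root`` already contains ``training.jsonl``, ``validation.jsonl``
        and ``test.jsonl``::

            data = FineTuningDataModule(
                dataset_root="/data/my_sft_dataset",
                seq_length=2048,
                micro_batch_size=1,
                global_batch_size=8,
            )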
    """

    def __init__(
        self,
        dataset_root: Union[str, Path],
        seq_length: int = 2048,
        tokenizer: Optional["TokenizerSpec"] = None,
        micro_batch_size: int = 4,
        global_batch_size: int = 8,
        rampup_batch_size: Optional[List[int]] = None,
        seed: int = 1234,
        memmap_workers: int = 1,
        num_workers: int = 8,
        pin_memory: bool = True,
        persistent_workers: bool = False,
        packed_sequence_specs: Optional["PackedSequenceSpecs"] = None,
        dataset_kwargs: Optional[Dict[str, Any]] = None,
    ):
        super().__init__()
        self.seq_length = seq_length
        self.seed = seed
        self.dataset_root = Path(dataset_root)
        self.tokenizer = tokenizer
        self.memmap_workers = memmap_workers
        self.num_workers = num_workers
        self.pin_memory = pin_memory
        self.persistent_workers = persistent_workers
        self.micro_batch_size = micro_batch_size
        self.global_batch_size = global_batch_size
        self.rampup_batch_size = rampup_batch_size
        self.data_sampler = None
        self.max_train_samples = None
        self.packed_sequence_specs = packed_sequence_specs
        self.packed_sequence_size = -1 if not packed_sequence_specs else packed_sequence_specs.packed_sequence_size
        self.validate_batch_size_for_packed_sequence()
        self.dataset_kwargs = dataset_kwargs or {}
        self._pad_cu_seqlens = False if not packed_sequence_specs else packed_sequence_specs.pad_cu_seqlens
        self.init_global_step = 0
zFineTuningDataModule.__init__c                 C   sb   | j dkr-| jdkr/td| j d| j d| j| j  d| j d| j | j  d| j  dd	S d	S )
zV
        Validate that micro batch size must be 1 when using packed sequence.
        r   r   z^Micro batch size should be 1 when training with packed sequence, but your micro batch size is z. 
The following config is equivalent to your current setting for a packed dataset. Please update your config to the following: 
Set micro batch size to 1 (currently z)
Set global batch size to z (currently z!) 
Set packed sequence length to zr) 
For details please visit https://docs.nvidia.com/nemo-framework/user-guide/latest/sft_peft/packed_sequence.htmlN)r+   r   
ValueErrorr   r0   r3   r3   r4   r,   d   s"   

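
    # Worked example of the equivalent-config remapping suggested by the error
    # above (assumed numbers, for illustration only): micro_batch_size=4,
    # global_batch_size=16, packed_sequence_size=2048 maps to micro_batch_size=1,
    # global_batch_size=16 // 4 == 4, packed_sequence_size=2048 * 4 == 8192,
    # since each packed sample now concatenates several original samples.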

    def prepare_data(self) -> None:
        """
        Prepare packed sequence data
        """
        if self.packed_sequence_size > 0:
            from nemo.collections.llm.gpt.data.packed_sequence import prepare_packed_sequence_data

            if not self.train_path_packed.is_file():
                prepare_packed_sequence_data(
                    input_path=self.train_path,
                    output_path=self.train_path_packed,
                    packed_sequence_size=self.packed_sequence_size,
                    tokenizer=self.tokenizer,
                    max_seq_length=self.seq_length,
                    seed=self.seed,
                    output_metadata_path=self.pack_metadata,
                )
            if not self.validation_path_packed.is_file():
                prepare_packed_sequence_data(
                    input_path=self.validation_path,
                    output_path=self.validation_path_packed,
                    packed_sequence_size=self.packed_sequence_size,
                    tokenizer=self.tokenizer,
                    max_seq_length=self.seq_length,
                    seed=self.seed,
                    output_metadata_path=self.pack_metadata,
                )
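
    # Note: the `is_file()` guards make the potentially expensive offline packing
    # pass effectively idempotent: it runs once per tokenizer and
    # packed_sequence_size, and later runs reuse the cached `.npy` files and
    # metadata resolved by the path properties below.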

    def setup(self, stage: str):
        """Called by pytorch lightning in datamodule setup"""

        # The data sampler is picked up by the Megatron strategy during trainer setup.
        self.data_sampler = MegatronDataSampler(
            seq_len=self.seq_length,
            micro_batch_size=self.micro_batch_size,
            global_batch_size=self.global_batch_size,
            rampup_batch_size=self.rampup_batch_size,
            dataloader_type="batch",
        )

        # Budget slightly more samples (0.5% extra) than global_batch_size * max_steps
        # so the training dataset does not run out before the trainer reaches max_steps.
        self.max_train_samples = int(math.ceil(self.global_batch_size * self.trainer.max_steps * 1.005))
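
    # Worked example of the sample budget above (assumed numbers, for
    # illustration only): global_batch_size=8 and trainer.max_steps=100 give
    # int(math.ceil(8 * 100 * 1.005)) == 804 training samples at most.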
zFineTuningDataModule.setupc                 C   s   | j | jj| j }d|iS )zCalled when saving a checkpoint, implement to generate and save datamodule state.

        Returns:
            A dictionary containing datamodule state.

        consumed_samples)r)   compute_consumed_samplesrL   global_stepr/   )r0   rO   r3   r3   r4   
state_dict   s   zFineTuningDataModule.state_dictrR   c              	   C   sv   zddl m} W n ttfy   td ddlm} Y nw |d }|| j_|| j_	||dd d| j_
td d	S )
zCalled when loading a checkpoint, implement to reload datamodule state given datamodule stat

        Args:
            state_dict: the datamodule state returned by ``state_dict``.

        r   )update_num_microbatcheszCMegatron num_microbatches_calculator not found, using Apex version.rO   F)rO   consistency_checkr   z*** Loaded DataModule state dict successfully. IGNORE PTL's warning below about the dataloader not being resumable. This is warning is expected because we are handling dataloader resumption manually in NeMo. ***N))megatron.core.num_microbatches_calculatorrS   ImportErrorModuleNotFoundErrorr   warning(apex.transformer.pipeline_parallel.utilsr)   init_consumed_samplesprev_consumed_samplesif_first_stepr
   )r0   rR   rS   rO   r3   r3   r4   load_state_dict   s"   
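
    # Resume flow in brief: `state_dict()` persists only `consumed_samples`;
    # `load_state_dict()` feeds that count back into the sampler and Megatron's
    # microbatch calculator, so a resumed run continues from the same position
    # in the dataset instead of restarting the epoch.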

    def train_dataloader(self) -> DataLoader:
        """Build the training dataloader (packed or unpacked, depending on config)."""
        return self._create_dataloader(
            self._create_dataset(
                self.train_path if self.packed_sequence_size <= 0 else self.train_path_packed,
                pack_metadata_path=None if self.packed_sequence_size <= 0 else self.pack_metadata,
                max_num_samples=self.max_train_samples,
                **self.dataset_kwargs,
            ),
            mode="train",
        )

    def val_dataloader(self) -> DataLoader:
        """Build the validation dataloader."""
        return self._create_dataloader(
            self._create_dataset(
                self.validation_path if self.packed_sequence_size <= 0 else self.validation_path_packed,
                pack_metadata_path=None if self.packed_sequence_size <= 0 else self.pack_metadata,
                is_test=True,
                **self.dataset_kwargs,
            ),
            mode="validation",
        )

    def test_dataloader(self) -> DataLoader:
        """Build the test dataloader."""
        return self._create_dataloader(
            self._create_dataset(
                self.test_path,
                tokens_to_generate=32,
                is_test=True,
                **self.dataset_kwargs,
            ),
            mode="test",
        )

    @lru_cache
    def _create_dataset(self, path, pack_metadata_path=None, is_test=False, **kwargs):
        """Create an SFT dataset for the given split, switching to packed settings when enabled."""
        is_not_packing = self.packed_sequence_size <= 0
        return create_sft_dataset(
            path,
            tokenizer=self.tokenizer,
            seq_length=(self.seq_length if is_not_packing else self.packed_sequence_size),
            memmap_workers=self.memmap_workers,
            seed=self.seed,
            is_test=is_test,
            pack_metadata_file_path=None if is_not_packing else pack_metadata_path,
            pad_cu_seqlens=False if is_not_packing else self.pad_cu_seqlens,
            **kwargs,
        )
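
    # The three dataloaders differ only in what they request from
    # `_create_dataset`: train caps samples at `max_train_samples`, validation
    # and test set `is_test=True`, and test passes `tokens_to_generate=32`,
    # presumably to leave headroom for generation-style evaluation.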

    def _create_dataloader(self, dataset, mode, **kwargs) -> WrappedDataLoader:
        """Wrap a dataset in a WrappedDataLoader tagged with its mode."""
        self.init_global_step = self.trainer.global_step
        self.data_sampler.init_global_step = self.init_global_step
        return WrappedDataLoader(
            mode=mode,
            dataset=dataset,
            num_workers=self.num_workers,
            pin_memory=self.pin_memory,
            persistent_workers=self.persistent_workers,
            collate_fn=dataset.collate_fn,
            **kwargs,
        )

    @property
    def train_path(self) -> Path:
        """Path to training dataset file"""
        return self.dataset_root / "training.jsonl"

    @property
    def default_pack_path(self) -> Path:
        """The default directory to write packing files."""
        tokenizer_model_name = self._extract_tokenizer_model_name()
        default_pack_path = self.dataset_root / "packed" / tokenizer_model_name
        if not default_pack_path.exists():
            default_pack_path.mkdir(parents=True, exist_ok=True)
            logging.info(f"Using default path for packing files: {str(default_pack_path)}")
        return default_pack_path

    @property
    def pack_metadata(self) -> Path:
        """Path to metadata dataset file for packed sequence."""
        if self.packed_sequence_size > 0:
            if self.packed_sequence_specs.packed_metadata_path is not None:
                return self.packed_sequence_specs.packed_metadata_path
            return self.default_pack_path / f"{self.packed_sequence_size}_metadata.jsonl"
        else:
            raise ValueError("pack_metadata invalid since packed sequence size is not specified.")
z"FineTuningDataModule.pack_metadatac                 C   :   | j dkr| jjdur| jjS | jd| j  d S td)zPath to training dataset file for packed sequence. The file path contains a reference to the
        tokenizer/model name since packed sequence dataset consists of tokenized indices.r   N	training_.npyzH`train_path_packed` invalid since packed sequence size is not specified.)r+   r$   packed_train_data_pathr   r5   r6   r3   r3   r4   r>   &  
   
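
    # Illustrative layout, assuming dataset_root=/data/sft, an HF tokenizer named
    # "org/model", and packed_sequence_size=4096; the packed artifacts resolve to:
    #   /data/sft/packed/org--model/training_4096.npy
    #   /data/sft/packed/org--model/validation_4096.npy
    #   /data/sft/packed/org--model/4096_metadata.jsonl
    # (see `default_pack_path` above and `_extract_tokenizer_model_name` below).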
z&FineTuningDataModule.train_path_packedc                 C   r   )zPath to validation dataset file for packed sequence. The file path contains a reference to the
        tokenizer/model name since packed sequence dataset consists of tokenized indices.r   Nvalidation_r   zM`validation_path_packed` invalid since packed sequence size is not specified.)r+   r$   packed_val_data_pathr   r5   r6   r3   r3   r4   rB   1  r   z+FineTuningDataModule.validation_path_packedc                 C   rt   )zPath to validation dataset filezvalidation.jsonlru   r6   r3   r3   r4   rC   <  rv   z$FineTuningDataModule.validation_pathc                 C   rt   )zPath to test dataset filez
test.jsonlru   r6   r3   r3   r4   rl   A  rv   zFineTuningDataModule.test_pathc                 C   s(   | j dkr| jjdur| jjS | jS dS )z-Whether to pad cu_seqlens to a constant shaper   NF)r+   r$   r-   r.   r6   r3   r3   r4   r-   F  s
   

    @property
    def pad_cu_seqlens(self) -> bool:
        """Whether to pad cu_seqlens to a constant shape"""
        if self.packed_sequence_size > 0:
            if self.packed_sequence_specs.pad_cu_seqlens is not None:
                return self.packed_sequence_specs.pad_cu_seqlens
            return self._pad_cu_seqlens
        return False

    def _extract_tokenizer_model_name(self) -> str:
        """Automatically get the model name from model path."""
        if self.packed_sequence_specs.tokenizer_model_name is not None:
            tokenizer_model_name = self.packed_sequence_specs.tokenizer_model_name
        elif isinstance(self.tokenizer, AutoTokenizer):
            name = self.tokenizer.tokenizer.name_or_path
            if name.endswith("context/nemo_tokenizer"):
                # NEMO_HOME/hf_org/hf_model/context/nemo_tokenizer => hf_org--hf_model
                tokenizer_model_name = '--'.join(name.split("/")[-4:-2])
            elif name.endswith("nemo_tokenizer"):
                # NEMO_HOME/hf_org/hf_model/nemo_tokenizer => hf_org--hf_model
                tokenizer_model_name = '--'.join(name.split("/")[-3:-1])
            else:
                # hf_org/hf_model => hf_org--hf_model
                tokenizer_model_name = name.replace("/", "--")
        else:
            tokenizer_model_name = f"unknown_tokenizer_{hash(self.tokenizer)}"
        return tokenizer_model_name