o
    }oiQ                     @   sJ  d dl Z d dlZd dlZd dlmZ d dlmZmZmZm	Z	m
Z
mZmZ d dlmZ d dlmZ d dlmZmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z" e"d\Z#Z$ervd dlm%Z% d dl&m'Z' dd Z(dd Z)dd Z*G dd dej+eZ,de,de-de-dee-e.f dee-e.f f
ddZ/dS )    N)Path)TYPE_CHECKINGAnyDictListOptionalTypeUnion)rank_zero_info)EVAL_DATALOADERSTRAIN_DATALOADERS)
GPTDataset)MegatronDataset)data)WrappedDataLoader)IOMixin)MegatronDataSampler)safe_importtransformer_engineGPTDatasetConfig)TokenizerSpecc                 C   s0   | du rdS zt |  W dS  ty   Y dS w )z#Returns True if string is a number.NFT)float
ValueError)s r   ^/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/llm/gpt/data/pre_training.pyis_number_tryexcept)   s   r   c                 C   sL   | ddd }t |dkrdS ttt|}t|r"t|s"J d|d S )z(
    Check if the paths are zipped.
    N   r   FzGot malformatted zipped list)lenlistmapr   anyall)pathsevenis_numr   r   r   is_zipped_list4   s   r'   c                 C   sT  | du rt dt| tst| tr(t| r| ddd } | D ]}t| qdS t| tr:|  D ]}t| q1dS t| tsHt| t	sHt dt	| }d}|
 ret|tjsctdt| ddS | r|t|tjsztdt| ddS |D ])}t	t|| }| stdt| d	t|tjstdt| dq~dS )
z;
    Validate the accessibility of the dataset assets.
    NzExpected path to have a value.   r   z+Expected path to be of string or Path type.)z.binz.idxz	Expected z to be readable.z
 to exist.)r   
isinstancetupler    r'   $validate_dataset_asset_accessibilitydictvaluesstrr   is_dirosaccessR_OKPermissionErrorexistsFileNotFoundError)r$   ppathsufficessuffix	file_pathr   r   r   r+   B   s@   


r+   c                -       s  e Zd ZdZdddddddddddddd	dd
dddefdeeB eeef B de	de
d de	de	de
ee	  de	dedededededede	dede
e de	de
e	 de
e	 de
e	 d ee d!df, fd"d#Zd$e	d%e	d&ee	ef d'ee	ef fd(d)Zd@d+ed!dfd,d-Zd!efd.d/Zd!efd0d1Zd!efd2d3Zd!efd4d5ZedAd7d8Zd!eeef fd9d:Zd;eeef d!dfd<d=Zd>d? Z  Z S )BPreTrainingDataModulea}  PyTorch Lightning-compatible data module for pre-training
       GPT-style models.
    Args:
        paths (Path | List | Dict[str, List]): Paths of the data distributions. Can be either a
            single path, a list of paths, or a dictionary. If a single path or a list of paths,
            the given paths will be used to generate the train, validation and test datasets. If
            providing a list of paths, the format can be either (1) a list of paths, e.g.
                ["path/to/dataset_1_prefix", "path/to/dataset_2_prefix"],
            or (2) a flattened, zipped list of weights and paths, e.g.
                ["30", "path/to/dataset_1_prefix", "70", "path/to/dataset_2_prefix"]
            If a dictionary is provided, it is expected to have the following form:
                {
                    'train': <TRAIN PATHS>,
                    'validation': <VALID PATHS>,
                    'test': <TEST PATHS>
                }
            where each value is either a path or a list of paths as described above.
            In this case, each split will be generated using the given paths.
            Note that if limit_val_batches <= 1, we generate the entire validaton dataset, so
            weights should not be provided for the validation split.
        seq_length (int): Sequence length.
        tokenizer (Optional["TokenizerSpec"]): An instance of a TokenizerSpec object.
        micro_batch_size (int): Batch size per GPU.
        global_batch_size (int): Global batch size.
        rampup_batch_size (Optional[List[int]]): Rampup batch size, should be in format of
            [start_global_batch_size, batch_size_increment, ramup_samples].
        num_workers (int): See ``torch.utils.data.DataLoader`` documentation.
        pin_memory (bool): See ``torch.utils.data.DataLoader`` documentation.
        persistent_workers (bool): See ``torch.utils.data.DataLoader`` documentation.
        reset_position_ids (bool): Option to reset the position IDs in the dataset at an interval.
            Not supported with fused and flash attention.
        create_attention_mask (bool): Option to enable the attention masks generation.
            Not supported with fused and flash attention.
        reset_attention_mask (bool): Option to reset the attention mask from the dataset.
            Not supported with fused and flash attention.
        eod_mask_loss (int): Option to enable the EOD mask loss.
        seed (int): Seed for generating the GPT dataset.
        split (str): A string of 3 comma-separated integers denoting how much of the distribution
            to allocate to train, validation, and test sets, respectively. Unused if ``paths`` is a dict.
        index_mapping_dir (Optional[str]): Path to a directory to write index mapping files.
        num_dataset_builder_threads (int): The number of threads to use for dataset building.
        num_train_samples (Optional[int]): The number of samples to use for training, defaults to total
            train steps times global batch size.
        num_val_samples (Optional[int]): The number of samples to use for validation, defaults to total
            validation steps times global batch size.
        num_test_samples (Optional[int]): The number of samples to use for testing, defaults to total
            test steps times global batch size.
        dataset_cls (Optional[Type[MegatronDataset]]): The dataset class to use for the data module.
    i   N      TFi  z	900,50,50r(   r$   
seq_length	tokenizerr   micro_batch_sizeglobal_batch_sizerampup_batch_sizenum_workers
pin_memorypersistent_workersreset_position_idscreate_attention_maskreset_attention_maskeod_mask_lossseedsplitindex_mapping_dirnum_dataset_builder_threadsnum_train_samplesnum_val_samplesnum_test_samplesdataset_clsreturnc                    sp  t    t|tttfs|g}ddlm} || _t	| i }t|trE|d ur1t
d|d ||d ||d ||d g|d< n||\}}t|d	krSd }||g|d
< ||d< || _|| _|| _|| _|| _|| _|| _|	| _|
| _|p|t | _|| _|| _|| _|| _|| _|| _d| _|| _|| _ || _!ddl"m#} |p|dd| _t$| j| j| j|d| _%d S )Nr   )get_blend_from_listzsplit=zP will be ignored since datasets are being created from 3 separate distributions.train
validationtestblend_per_splitr(   blendrK   )get_nmt_tokenizermegatronGPT2BPETokenizer)seq_lenr@   rA   rB   )&super__init__r)   r    r*   r,   megatron.core.datasets.utilsrS   rQ   r+   warningswarnr   build_kwargsr>   r@   rA   r?   rC   rD   rE   rF   HAVE_TErG   rH   rI   rJ   rK   rL   rM   init_global_steprN   rO   rP   3nemo.collections.nlp.modules.common.tokenizer_utilsrY   r   data_sampler)selfr$   r>   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   rK   rL   rM   rN   rO   rP   rQ   rS   rb   weightsrY   	__class__r   r   r^      sb   






zPreTrainingDataModule.__init__trainer_max_stepstrainer_val_check_intervaltrainer_limit_val_batchestrainer_limit_test_batchesc                 C   s`  ddl m} |}|dksJ d| dt|| jj }| jdur9| j|ks.J d| d| j}t|| jj }|| d | }t|| jj }	|}
t|
| jj }| jdurh| j|	kseJ d	|	 d| j}	| jdur}| j|kszJ d
| d| j}|dkr|dkrt|t	rd| j
vsJ dd}	||	|g}|| j|dd | jd \| _| _| _dS )z%
        Build the datasets.
        r   )BlendedMegatronDatasetBuilderz
max_steps z should be greater than 0Nz3num_train_samples must be greater than or equal to .r(   z%num_val_samples must be greater than z&num_test_samples must be greater than g        g      ?rX   a  When using a single data distribution, limit_val_batches <= 1.0 is not supported. If you'd like to run with a fractional value of limit_val_batches, please pass in separate datasets for the train, validation, and test datasets by providing a dictionary of paths, e.g.: 
    paths={ 
         'train': [PATHS FOR TRAIN], 
         'validation': [PATHS FOR VALIDATION], 
         'test' :[PATHS FOR TEST],  
    }c                   S   s   dS )NTr   r   r   r   r   <lambda>.  s    z-PreTrainingDataModule.build.<locals>.<lambda>)is_built_on_rankconfig)7megatron.core.datasets.blended_megatron_dataset_builderro   intrf   rA   rN   rO   rP   r)   r   rb   rQ   gpt_dataset_configbuild	_train_ds_validation_ds_test_ds)rg   rk   rl   rm   rn   ro   train_itersrN   
eval_itersrO   
test_itersrP   train_valid_test_num_samplesr   r   r   rw      sN   






zPreTrainingDataModule.build stagec                 C   sB   t | dr
| jdusJ d| j| jj| jj| jj| jjd dS )z(
        Setup the data module.
        trainerNz?Setup should be completed when trainer and config are attached.rk   rl   rm   rn   )hasattrr   rw   	max_stepsval_check_intervallimit_val_batcheslimit_test_batches)rg   r   r   r   r   setup2  s   
zPreTrainingDataModule.setupc                 C      | j | jddS )z+
        Get the train dataloader.
        rT   mode)_create_dataloaderrx   rg   r   r   r   train_dataloaderR     z&PreTrainingDataModule.train_dataloaderc                 C   r   )z0
        Get the validation dataloader.
        rU   r   )r   ry   r   r   r   r   val_dataloaderX  r   z$PreTrainingDataModule.val_dataloaderc                 C   r   )z*
        Get the test dataloader.
        rV   r   )r   rz   r   r   r   r   test_dataloader^  r   z%PreTrainingDataModule.test_dataloaderc                 K   sF   | j j| _| j| j_td||| j| j| jt|dt	j
jd|}|S )N
collate_fn)r   datasetrC   rD   rE   r   r   )r   global_steprd   rf   r   rC   rD   rE   getattrr   
dataloaderdefault_collate)rg   r   r   kwargsr   r   r   r   r   d  s   

	z(PreTrainingDataModule._create_dataloaderr   c                 C   sB   ddl m} |d| j| j| j| j| j| j| j| j	| j
d	| jS )z4
        Get the GPT dataset configuration.
        r   r   )	random_seedsequence_lengthr?   path_to_cacherF   rG   rH   rI   rM   Nr   )"megatron.core.datasets.gpt_datasetr   rJ   r>   r?   rL   rF   rG   rH   rI   rM   rb   )rg   r   r   r   r   rv   r  s   
z(PreTrainingDataModule.gpt_dataset_configc                 C   s   | j | jj| j }d|iS )zCalled when saving a checkpoint, implement to generate and save datamodule state.

        Returns:
            A dictionary containing datamodule state.

        consumed_samples)rf   compute_consumed_samplesr   r   rd   )rg   r   r   r   r   
state_dict  s   z PreTrainingDataModule.state_dictr   c              	   C   sv   zddl m} W n ttfy   td ddlm} Y nw |d }|| j_|| j_	||dd d| j_
td d	S )
zCalled when loading a checkpoint, implement to reload datamodule state given datamodule stat

        Args:
            state_dict: the datamodule state returned by ``state_dict``.

        r   )update_num_microbatchesCMegatron num_microbatches_calculator not found, using Apex version.r   F)r   consistency_checkr(   z*** Loaded DataModule state dict successfully. IGNORE PTL's warning below about the dataloader not being resumable. This is warning is expected because we are handling dataloader resumption manually in NeMo. ***N))megatron.core.num_microbatches_calculatorr   ImportErrorModuleNotFoundErrorloggingwarning(apex.transformer.pipeline_parallel.utilsrf   init_consumed_samplesprev_consumed_samplesif_first_stepr
   )rg   r   r   r   r   r   r   load_state_dict  s"   
z%PreTrainingDataModule.load_state_dictc              	   C   s   ddl m} || jj| j| j_|| jj| j| j_zddlm} W n t	t
fy8   td ddlm} Y nw | j j| 9  _dS )zx
        Reconfigure trainer.limit_train_batches and trainer.limit_val_batches in terms of num of microbatches.
        r   )_reconfigure_limit_batches)get_num_microbatchesr   N)#nemo.collections.llm.gpt.data.utilsr   r   limit_train_batchesrx   r   ry   r   r   r   r   r   r   r   num_sanity_val_steps)rg   r   r   r   r   r   reconfigure_limit_batches  s   

z/PreTrainingDataModule.reconfigure_limit_batches)r   )rR   r   )!__name__
__module____qualname____doc__r   r   r   r   r.   ru   r   boolr   r   r^   r	   r   rw   r   r   r   r   r   r   r   r   propertyrv   r   r   r   r   __classcell__r   r   ri   r   r;   k   s    5
	
S


A 
r;   
datamodulerk   rl   rm   rn   c                 C   s\   ddl m} | rJ d|jddd ddlm} |d|   | j||||d dS )	a`  
    Builds the index mapping cache for nemo.collections.llm.gpt.data.PreTrainingDataModule.

    Args:
        datamodule (PreTrainingDataModule): The pre-training data module to build.
        trainer_max_steps (int): The max_steps set in your trainer.
        trainer_val_check_interval (int): The interval at which to perform validation in your trainer.
        trainer_limit_val_batches (Union[int, float]): The number of validation batches to use in your trainer.
        trainer_limit_test_batches (Union[int, float]): The number of test batches to use in your trainer.

    Returns:
        None
    r   NzHThis function cannot be called inside an existing torch.distributed job.r(   )
world_sizerank)r   z	Building r   )torch.distributeddistributedis_initializedinit_process_group
nemo.utilsr   inforw   )r   rk   rl   rm   rn   distr   r   r   r   build_pretraining_datamodule  s   
r   )0r   r0   r`   pathlibr   typingr   r   r   r   r   r   r	   lightning.pytorchpytorchpl%lightning.pytorch.utilities.rank_zeror
   !lightning.pytorch.utilities.typesr   r   r   r   'megatron.core.datasets.megatron_datasetr   torch.utilsr   nemo.lightning.datar   nemo.lightning.io.mixinr   nemo.lightning.pytorch.pluginsr   nemo.utils.import_utilsr   _rc   r   1nemo.collections.common.tokenizers.tokenizer_specr   r   r'   r+   LightningDataModuler;   ru   r   r   r   r   r   r   <module>   sH   $)  ^

