import logging
import warnings
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

import lightning.pytorch as pl
from lightning.pytorch.utilities.exceptions import MisconfigurationException
from lightning.pytorch.utilities.types import EVAL_DATALOADERS, TRAIN_DATALOADERS
from torch.utils import data

from nemo.lightning.data import WrappedDataLoader
from nemo.lightning.io.mixin import IOMixin
from nemo.lightning.pytorch.plugins import MegatronDataSampler

if TYPE_CHECKING:
    from megatron.core.datasets.bert_dataset import BERTMaskedWordPieceDatasetConfig

    from nemo.collections.common.tokenizers.tokenizer_spec import TokenizerSpec


class BERTPreTrainingDataModule(pl.LightningDataModule, IOMixin):
    """PyTorch Lightning-compatible data module for pre-training
       BERT-style models.
    Args:
        paths (Path | List | Dict[str, List]): Paths of the data distributions. Can be either a
            single path, a list of paths, or a dictionary. If a single path or a list of paths,
            the given paths will be used to generate the train, validation and test datasets. If
            providing a list of paths, the format can be either (1) a list of paths, e.g.
                ["path/to/dataset_1_prefix", "path/to/dataset_2_prefix"],
            or (2) a flattened, zipped list of weights and paths, e.g.
                ["30", "path/to/dataset_1_prefix", "70", "path/to/dataset_2_prefix"]
            If a dictionary is provided, it is expected to have the following form:
                {
                    'train': <TRAIN PATHS>,
                    'validation': <VALID PATHS>,
                    'test': <TEST PATHS>
                }
            where each value is either a path or a list of paths as described above.
            In this case, each split will be generated using the given paths.
            Note that if limit_val_batches <= 1, we generate the entire validation dataset, so
            weights should not be provided for the validation split.
        seq_length (int): Sequence length.
        tokenizer (Optional["TokenizerSpec"]): An instance of a TokenizerSpec object.
        micro_batch_size (int): Batch size per GPU.
        global_batch_size (int): Global batch size.
        rampup_batch_size (Optional[List[int]]): Rampup batch size, should be in format of
            [start_global_batch_size, batch_size_increment, rampup_samples].
        num_workers (int): See ``torch.utils.data.DataLoader`` documentation.
        pin_memory (bool): See ``torch.utils.data.DataLoader`` documentation.
        persistent_workers (bool): See ``torch.utils.data.DataLoader`` documentation.
        reset_position_ids (bool): Option to reset the position IDs in the dataset at an interval.
        reset_attention_mask (bool): Option to reset the attention mask from the dataset.
        eod_mask_loss (bool): Option to enable the EOD mask loss.
        seed (int): Seed for generating the BERT dataset.
        split (str): A string of 3 comma-separated integers denoting how much of the distribution
            to allocate to train, validation, and test sets, respectively. Unused if ``paths`` is a dict.
        index_mapping_dir (Optional[str]): Path to a directory to write index mapping files.
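
    Example:
        An illustrative construction (the prefixes and batch sizes below are
        placeholders, not defaults of this module)::

            data = BERTPreTrainingDataModule(
                paths={
                    'train': ["path/to/train_prefix"],
                    'validation': ["path/to/valid_prefix"],
                    'test': ["path/to/test_prefix"],
                },
                seq_length=512,
                micro_batch_size=4,
                global_batch_size=32,
            )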
    """

    def __init__(
        self,
        paths: Union[Path, List, Dict[str, List]],
        seq_length: int = 2048,
        tokenizer: Optional["TokenizerSpec"] = None,
        micro_batch_size: int = 4,
        global_batch_size: int = 8,
        rampup_batch_size: Optional[List[int]] = None,
        num_workers: int = 8,
        pin_memory: bool = True,
        persistent_workers: bool = False,
        reset_position_ids: bool = False,
        reset_attention_mask: bool = False,
        eod_mask_loss: bool = False,
        seed: int = 1234,
        split: str = "900,50,50",
        index_mapping_dir: Optional[str] = None,
    ) -> None:
        super().__init__()
        if not isinstance(paths, (list, tuple, dict)):
            paths = [paths]

        from megatron.core.datasets.utils import get_blend_from_list

        build_kwargs = {}
        if isinstance(paths, dict):
            if split is not None:
                warnings.warn(
                    f"{split=} will be ignored since datasets are being created from 3 separate distributions."
                )
            build_kwargs["blend_per_split"] = [
                get_blend_from_list(paths["train"]),
                get_blend_from_list(paths["validation"]),
                get_blend_from_list(paths["test"]),
            ]
        else:
            paths, weights = get_blend_from_list(paths)
            if len(paths) == 1:
                weights = None
            build_kwargs["blend"] = [paths, weights]
            build_kwargs["split"] = split

        self.build_kwargs = build_kwargs
        self.seq_length = seq_length
        self.micro_batch_size = micro_batch_size
        self.global_batch_size = global_batch_size
        self.num_workers = num_workers
        self.pin_memory = pin_memory
        self.persistent_workers = persistent_workers
        self.reset_position_ids = reset_position_ids
        self.reset_attention_mask = reset_attention_mask
        self.eod_mask_loss = eod_mask_loss
        self.seed = seed
        self.split = split
        self.index_mapping_dir = index_mapping_dir
        self.init_global_step = 0

        from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer

        self.tokenizer = tokenizer or get_nmt_tokenizer("megatron", "BertWordPieceLowerCase")

        self.data_sampler = MegatronDataSampler(
            seq_len=self.seq_length,
            micro_batch_size=micro_batch_size,
            global_batch_size=global_batch_size,
            rampup_batch_size=rampup_batch_size,
        )
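
    # Illustrative sketch (comments only) of the two shapes ``build_kwargs``
    # takes in ``__init__`` above; the exact blend tuples are produced by
    # ``get_blend_from_list``, so treat these values as examples:
    #
    #   paths={'train': [...], 'validation': [...], 'test': [...]}
    #     -> {"blend_per_split": [<train blend>, <validation blend>, <test blend>]}
    #
    #   paths=["30", "path/to/ds1_prefix", "70", "path/to/ds2_prefix"]
    #     -> {"blend": [["path/to/ds1_prefix", "path/to/ds2_prefix"], [30.0, 70.0]],
    #         "split": "900,50,50"}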
z"BERTPreTrainingDataModule.__init__ stagec                 C   s   ddl m} ddlm} t| dr| jdusJ d| jj}|dks&J d|| jj d | jj }| jj	}t
|| jj }t
|| jj }t
|| jj }	| jjd	krft| jjtrfd
| jvsdJ dd}|||	g}
|||
dd | jd \| _| _| _dS )zAssign Train/Val/Test datasetr   )BERTMaskedWordPieceDataset)BlendedMegatronDatasetBuildertrainerNz?Setup should be completed when trainer and config are attached.z Please specify trainer.max_stepsr.         ?r/   a  When using a single data distribution, limit_val_batches <= 1.0 is not supported. If you'd like to run with a fractional value of limit_val_batches, please pass in separate datasets for the train, validation, and test datasets by providing a dictionary of paths, e.g.: 
    paths={ 
         'train': [PATHS FOR TRAIN], 
         'validation': [PATHS FOR VALIDATION], 
         'test' :[PATHS FOR TEST],  
    }c                   S   s   dS )NTrF   rF   rF   rF   rG   <lambda>   s    z1BERTPreTrainingDataModule.setup.<locals>.<lambda>)is_built_on_rankconfig)#megatron.core.datasets.bert_datasetrJ   7megatron.core.datasets.blended_megatron_dataset_builderrK   hasattrrL   	max_stepsval_check_intervallimit_val_batcheslimit_test_batchesintrA   r   r6   floatr>   bert_dataset_configbuild	_train_ds_validation_ds_test_ds)rB   rI   rJ   rK   max_train_steps
eval_iters
test_itersnum_train_samplesnum_val_samplesnum_test_samplestrain_valid_test_num_samplesrF   rF   rG   setup   s8   
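
    # Worked example of the sample-count arithmetic in ``setup`` (numbers are
    # illustrative): with trainer.max_steps=1000, global_batch_size=32,
    # val_check_interval=100 and limit_val_batches=50:
    #   num_train_samples = 1000 * 32              = 32000
    #   eval_iters        = (1000 // 100 + 1) * 50 = 550
    #   num_val_samples   = 550 * 32               = 17600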
zBERTPreTrainingDataModule.setupc                 C      | j | jddS )zCreate Train dataloaderr*   mode)_create_dataloaderr\   rB   rF   rF   rG   train_dataloader      z*BERTPreTrainingDataModule.train_dataloaderc                 C   rg   )zCreate Validation dataloaderr+   rh   )rj   r]   rk   rF   rF   rG   val_dataloader   rm   z(BERTPreTrainingDataModule.val_dataloaderc                 C   rg   )zCreate Test dataloaderr,   rh   )rj   r^   rk   rF   rF   rG   test_dataloader   rm   z)BERTPreTrainingDataModule.test_dataloaderc                 K   sF   | j j| _| j| j_td||| j| j| jt|dt	j
jd|}|S )N
collate_fn)ri   datasetr   r    r!   rp   rF   )rL   global_stepr?   rA   r   r   r    r!   getattrr   
dataloaderdefault_collate)rB   rq   ri   kwargsrt   rF   rF   rG   rj      s   


    @property
    def bert_dataset_config(self) -> "BERTMaskedWordPieceDatasetConfig":
        """Create Bert Dataset Config using Mcore's BERTMaskedWordPieceDatasetConfig"""
        from megatron.core.datasets.bert_dataset import BERTMaskedWordPieceDatasetConfig

        return BERTMaskedWordPieceDatasetConfig(
            random_seed=self.seed,
            sequence_length=self.seq_length,
            tokenizer=self.tokenizer,
            path_to_cache=self.index_mapping_dir,
            classification_head=True,
            masking_probability=0.15,
            short_sequence_probability=0.1,
            masking_max_ngram=3,
            masking_do_full_word=True,
            masking_do_permutation=False,
            masking_use_longer_ngrams=False,
            masking_use_geometric_distribution=False,
            **self.build_kwargs,
        )

    def state_dict(self) -> Dict[str, Any]:
        """Called when saving a checkpoint, implement to generate and save datamodule state.

        Returns:
            A dictionary containing datamodule state.

        """
        consumed_samples = self.data_sampler.compute_consumed_samples(self.trainer.global_step - self.init_global_step)
        return {'consumed_samples': consumed_samples}

    def load_state_dict(self, state_dict: Dict[str, Any]) -> None:
        """Called when loading a checkpoint, implement to reload datamodule state given datamodule state.

        Args:
            state_dict: the datamodule state returned by ``state_dict``.

        """
        try:
            from megatron.core.num_microbatches_calculator import update_num_microbatches
        except (ImportError, ModuleNotFoundError):
            logging.warning("Megatron num_microbatches_calculator not found, using Apex version.")
            from apex.transformer.pipeline_parallel.utils import update_num_microbatches

        consumed_samples = state_dict['consumed_samples']
        self.data_sampler.init_consumed_samples = consumed_samples
        self.data_sampler.prev_consumed_samples = consumed_samples

        update_num_microbatches(
            consumed_samples=consumed_samples,
            consistency_check=False,
        )
        self.data_sampler.if_first_step = 1

    def reconfigure_limit_batches(self):
        """Reconfigure trainer.limit_val_batches for pretraining"""
        self._reconfigure_limit_batches(self.trainer.limit_train_batches, self._train_ds, 'train')
        self._reconfigure_limit_batches(self.trainer.limit_val_batches, self._validation_ds, 'val')

    def _reconfigure_limit_batches(self, limit_batches, dataloader, mode):
        """
        Reconfigure trainer.limit_val_batches for pretraining
        """
        try:
            from megatron.core.num_microbatches_calculator import get_num_microbatches
        except (ImportError, ModuleNotFoundError):
            logging.warning("Megatron num_microbatches_calculator not found, using Apex version.")
            from apex.transformer.pipeline_parallel.utils import get_num_microbatches

        if isinstance(limit_batches, int):
            # Interpret an integer limit in terms of microbatches
            limit_batches *= get_num_microbatches()
        else:
            assert isinstance(limit_batches, float)
            # Don't reconfigure if limit_batches is 0.0 or if there's no dataloader
            if limit_batches == 0.0 or dataloader is None:
                return
            # len(dataloader) returns the length as a number of microbatches
            dl_len_in_micro_batches = len(dataloader)
            if len(dataloader) != float("inf"):
                if limit_batches == 1.0:
                    limit_batches = dl_len_in_micro_batches
                else:
                    limit_micro_batches = int(dl_len_in_micro_batches * limit_batches)
                    if limit_micro_batches == 0 and limit_batches > 0.0:
                        min_percentage = 1.0 / len(dataloader)
                        raise MisconfigurationException(
                            f"You requested to check {limit_batches} of the val_dataloader but"
                            f" {limit_batches} * {len(dataloader)} < 1. Please increase the"
                            f" `limit_val_batches` argument. Try at least"
                            f" `limit_val_batches={min_percentage}`"
                        )
                    # Make sure the trainer won't iterate for a partial number of microbatches
                    if limit_micro_batches < get_num_microbatches():
                        limit_batches = get_num_microbatches()
                    else:
                        limit_batches = limit_batches - limit_batches % get_num_microbatches()

        if mode == 'train':
            self.trainer.limit_train_batches = limit_batches
        else:
            self.trainer.limit_val_batches = limit_batches

        # Override the number of sanity steps to be a multiple of the number of microbatches
        self.trainer.num_sanity_val_steps *= get_num_microbatches()