o
    }oi@                     @   s   d dl Z d dlZd dlmZ d dlmZmZmZmZm	Z	 d dl
mZ d dlmZmZ d dlmZ d dlmZ d dlmZ d dlmZ erPd d	lmZ d d
lmZ G dd dejeZdS )    N)Path)TYPE_CHECKINGAnyDictListOptional)EVAL_DATALOADERSTRAIN_DATALOADERS)data)WrappedDataLoader)IOMixin)MegatronDataSamplerT5MaskedWordPieceDatasetConfig)TokenizerSpecc                +       sX  e Zd ZdZ												
								d>deeB eeef B dedede	d dedede	ee  dede
de
dededede
de
de
de
d ed!ed"e	e d#df* fd$d%Zd?d'ed#dfd(d)Zd#efd*d+Zd#efd,d-Zd#efd.d/Zd#efd0d1Zed@d3d4Zd#eeef fd5d6Zd7eeef d#dfd8d9Zd:d; Zd<d= Z  ZS )APreTrainingDataModulea=
  PyTorch Lightning-compatible data module for pre-training
       T5-style models.
    Args:
        paths (Path | List | Dict[str, List]): Paths of the data distributions. Can be either a
            single path, a list of paths, or a dictionary. If a single path or a list of paths,
            the given paths will be used to generate the train, validation and test datasets. If
            providing a list of paths, the format can be either (1) a list of paths, e.g.
                ["path/to/dataset_1_prefix", "path/to/dataset_2_prefix"],
            or (2) a flattened, zipped list of weights and paths, e.g.
                ["30", "path/to/dataset_1_prefix", "70", "path/to/dataset_2_prefix"]
            If a dictionary is provided, it is expected to have the following form:
                {
                    'train': <TRAIN PATHS>,
                    'validation': <VALID PATHS>,
                    'test': <TEST PATHS>
                }
            where each value is either a path or a list of paths as described above.
            In this case, each split will be generated using the given paths.
            Note that if limit_val_batches <= 1, we generate the entire validaton dataset, so
            weights should not be provided for the validation split.
        seq_length (int): Sequence length.
        seq_length_dec (int): Sequence length of decoder.
        tokenizer (Optional["TokenizerSpec"]): An instance of a TokenizerSpec object.
        micro_batch_size (int): Batch size per GPU.
        global_batch_size (int): Global batch size.
        rampup_batch_size (Optional[List[int]]): Rampup batch size, should be in format of
            [start_global_batch_size, batch_size_increment, ramup_samples].
        num_workers (int): See ``torch.utils.data.DataLoader`` documentation.
        pin_memory (bool): See ``torch.utils.data.DataLoader`` documentation.
        persistent_workers (bool): See ``torch.utils.data.DataLoader`` documentation.
        masking_probability (float):
        short_sequence_probability (float):
        masking_max_ngram (int):
        masking_do_full_word (bool):
        masking_do_permutation (bool):
        masking_use_longer_ngrams (bool):
        masking_use_geometric_distribution (bool):
        seed (int): Seed for generating the T5 dataset.
        split (str): A string of 3 comma-separated integers denoting how much of the distribution
            to allocate to train, validation, and test sets, respectively. Unused if ``paths`` is a dict.
        index_mapping_dir (Optional[str]): Path to a directory to write index mapping files.
          N@      TF333333?皙?
     
999982,9,9paths
seq_lengthseq_length_dec	tokenizerr   micro_batch_sizeglobal_batch_sizerampup_batch_sizenum_workers
pin_memorypersistent_workersmasking_probabilityshort_sequence_probabilitymasking_max_ngrammasking_do_full_wordmasking_do_permutationmasking_use_longer_ngrams"masking_use_geometric_distributionseedsplitindex_mapping_dirreturnc                    s~  t    t|tttfs|g}ddlm} i }t|tr>|d ur*t	d|d ||d ||d ||d g|d< n||\}}t
|d	krLd }||g|d
< ||d< || _|| _|| _|| _|| _|| _|| _|	| _|
| _|| _|| _|| _|| _|| _|| _|| _|| _|| _|| _d| _|d u rddlm } i }dd t!dD |d< |dd|d}|| _t"| j|||d| _#d S )Nr   )get_blend_from_listzsplit=zP will be ignored since datasets are being created from 3 separate distributions.train
validationtestblend_per_split   blendr-   )get_nmt_tokenizerc                 S   s   g | ]}d | dqS )z
<extra_id_> ).0ir9   r9   ]/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/llm/t5/data/pre_training.py
<listcomp>   s    z2PreTrainingDataModule.__init__.<locals>.<listcomp>d   additional_special_tokensmegatronBertWordPieceCase)special_tokens)seq_lenr   r    r!   )$super__init__
isinstancelisttupledictmegatron.core.datasets.utilsr0   warningswarnlenbuild_kwargsr   r   r   r    r   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   init_global_step3nemo.collections.nlp.modules.common.tokenizer_utilsr7   ranger   data_sampler)selfr   r   r   r   r   r    r!   r"   r#   r$   r%   r&   r'   r(   r)   r*   r+   r,   r-   r.   r0   rN   weightsr7   rB   	__class__r9   r<   rE   N   sn   






zPreTrainingDataModule.__init__ stagec                 C   s   ddl m} ddlm} t| dr| jd usJ d| jj}|dks&J d|| jj d | jj }| jj	}t
|| jj }t
|| jj }t
|| jj }	| jjdkrft| jjtrfd	| jvsdJ d
d }|||	g}
|||
dd | jd \| _| _| _d S )Nr   )BlendedMegatronDatasetBuilder)T5MaskedWordPieceDatasettrainerz?Setup should be completed when trainer and config are attached.z Please specify trainer.max_stepsr5         ?r6   a  When using a single data distribution, limit_val_batches <= 1.0 is not supported. If you'd like to run with a fractional value of limit_val_batches, please pass in separate datasets for the train, validation, and test datasets by providing a dictionary of paths, e.g.: 
    paths={ 
         'train': [PATHS FOR TRAIN], 
         'validation': [PATHS FOR VALIDATION], 
         'test' :[PATHS FOR TEST],  
    }c                   S   s   dS )NTr9   r9   r9   r9   r<   <lambda>   s    z-PreTrainingDataModule.setup.<locals>.<lambda>)is_built_on_rankconfig)7megatron.core.datasets.blended_megatron_dataset_builderrY   !megatron.core.datasets.t5_datasetrZ   hasattrr[   	max_stepsval_check_intervallimit_val_batcheslimit_test_batchesintrR   r    rF   floatrN   t5_dataset_configbuild	_train_ds_validation_ds_test_ds)rS   rX   rY   rZ   max_train_steps
eval_iters
test_itersnum_train_samplesnum_val_samplesnum_test_samplestrain_valid_test_num_samplesr9   r9   r<   setup   s8   
zPreTrainingDataModule.setupc                 C      | j | jddS )Nr1   mode)_create_dataloaderrk   rS   r9   r9   r<   train_dataloader      z&PreTrainingDataModule.train_dataloaderc                 C   rv   )Nr2   rw   )ry   rl   rz   r9   r9   r<   val_dataloader   r|   z$PreTrainingDataModule.val_dataloaderc                 C   rv   )Nr3   rw   )ry   rm   rz   r9   r9   r<   test_dataloader   r|   z%PreTrainingDataModule.test_dataloaderc                 K   s<   | j j| _td||| j| j| jt|dtj	j
d|}|S )N
collate_fn)rx   datasetr"   r#   r$   r   r9   )r[   global_steprO   r   r"   r#   r$   getattrr
   
dataloaderdefault_collate)rS   r   rx   kwargsr   r9   r9   r<   ry      s   
	z(PreTrainingDataModule._create_dataloaderr   c                 C   sN   ddl m} |d| j| j| j| j| j| j| j| j	| j
| j| j| jd| jS )Nr   r   )random_seedsequence_lengthsequence_length_decoderr   path_to_cacher%   r&   r'   r(   r)   r*   r+   r9   )ra   r   r,   r   r   r   r.   r%   r&   r'   r(   r)   r*   r+   rN   )rS   r   r9   r9   r<   ri      s"   z'PreTrainingDataModule.t5_dataset_configc                 C   s   | j | jj| j }d|iS )zCalled when saving a checkpoint, implement to generate and save datamodule state.

        Returns:
            A dictionary containing datamodule state.

        consumed_samples)rR   compute_consumed_samplesr[   r   rO   )rS   r   r9   r9   r<   
state_dict	  s   z PreTrainingDataModule.state_dictr   c              	   C   sn   zddl m} W n ttfy   td ddlm} Y nw |d }|| j_|| j_	||dd d| j_
dS )	zCalled when loading a checkpoint, implement to reload datamodule state given datamodule stat

        Args:
            state_dict: the datamodule state returned by ``state_dict``.

        r   )update_num_microbatchesCMegatron num_microbatches_calculator not found, using Apex version.r   F)r   consistency_checkr5   N))megatron.core.num_microbatches_calculatorr   ImportErrorModuleNotFoundErrorloggingwarning(apex.transformer.pipeline_parallel.utilsrR   init_consumed_samplesprev_consumed_samplesif_first_step)rS   r   r   r   r9   r9   r<   load_state_dict  s   
z%PreTrainingDataModule.load_state_dictc                 C   s,   |  | jj| jd |  | jj| jd d S )Nr1   val)_reconfigure_limit_batchesr[   limit_train_batchesrk   re   rl   rz   r9   r9   r<   reconfigure_limit_batches+  s   z/PreTrainingDataModule.reconfigure_limit_batchesc              
   C   s:  zddl m} W n ttfy   td ddlm} Y nw t|tr*|| 9 }n[t|t	s1J |dks9|du r;dS t
|}t
|t	dkr|dkrN|}n7t|| }|dkru|dkrudt
| }td| d	| d
t
| d| d	|| k r~| }n|||   }|dkr|| j_n|| j_| j j| 9  _dS )zG
        Reconfigure trainer.limit_val_batches for pretraining
        r   )get_num_microbatchesr   g        Ninfr\   zYou requested to check z of the val_dataloader but z * zX < 1. Please increase the `limit_val_batches` argument. Try at least `limit_val_batches=`r1   )r   r   r   r   r   r   r   rF   rg   rh   rM   MisconfigurationExceptionr[   r   re   num_sanity_val_steps)rS   limit_batchesr   rx   r   dl_len_in_micro_batcheslimit_micro_batchesmin_percentager9   r9   r<   r   1  sD   



z0PreTrainingDataModule._reconfigure_limit_batches)r   r   Nr   r   Nr   TFr   r   r   TFFTr   r   N)rW   )r/   r   )__name__
__module____qualname____doc__r   r   r   strrg   r   boolrh   rE   ru   r	   r{   r   r}   r~   r   ry   propertyri   r   r   r   r   r   __classcell__r9   r9   rU   r<   r   "   s    .
	
X9
r   )r   rK   pathlibr   typingr   r   r   r   r   lightning.pytorchpytorchpl!lightning.pytorch.utilities.typesr   r	   torch.utilsr
   nemo.lightning.datar   nemo.lightning.io.mixinr   nemo.lightning.pytorch.pluginsr   ra   r   1nemo.collections.common.tokenizers.tokenizer_specr   LightningDataModuler   r9   r9   r9   r<   <module>   s   