o
    }oi;                     @   s   d dl mZ d dlmZmZmZmZ d dlZd dl	m
Z d dlZd dlmZmZ d dlmZ d dlmZmZmZ d dlmZ d dlmZ d d	lmZmZmZ d d
lm Z  d dl!m"Z" G dd dej#eZ$dS )    deepcopy)AnyDictLiteralOptionalN)EVAL_DATALOADERSTRAIN_DATALOADERS)parallel_state)WorkerConfigget_savable_loaderget_train_dataset)Self)TaskEncoder)IOMixinserializationtrack_io)MegatronDataSampler)loggingc                       s  e Zd ZdZ											d)deded	ed
ededededB dedededB dee dee dee ddf fddZ	de
je fddZd*ded fddZdefddZdefd d!Zd+d"d#Zdeeef fd$d%Zd&eeef ddfd'd(Z  ZS ),EnergonDataModulea7  
    A PyTorch Lightning DataModule for handling Energon datasets.

    It provides a seamless interface to load training and validation data, saving, and sampling strategies.
    The module integrates with the Megatron-Energon framework for efficient data handling
    in large-scale distributed training.
          NTd   pathtrain_encoder
seq_lengthmicro_batch_sizeglobal_batch_sizenum_workersnum_val_workers
pin_memoryshuffle_buffer_sizemax_samples_per_sequencedecoder_seq_lengthpacking_buffer_sizevalidation_encoderreturnc                    s   t    || _|| _|| _|| _|| _|| _|| _|| _|| _|	| _	|
| _
|| _|r-|n|| _d| _t| j| j| j| jd| _dd | j_d| _d| _|| _|pS| j| _|| _| jj| _dS )a`  
        Initialize the EnergonModule.

        Parameters:
            path (str): Path to the dataset (must be in webdataset format, and prepared using energon prepare).
            train_encoder (TaskEncoder): Encoder for training data.
            seq_length (int, optional): The maximum sequence length for tokenized text. Defaults to 2048.
            micro_batch_size (int, optional): The batch size for training and validation. Defaults to 1.
            global_batch_size (int, optional): The global batch size across all processes. Defaults to 1.
            num_workers (int, optional): Number of workers for data loading. Defaults to 1.
            num_val_workers (int, optional): Number of workers for validation data loading. Defaults to num_workers.
            pin_memory (bool, optional): Whether to pin memory in the DataLoader. Defaults to True.
            shuffle_buffer_size (int, optional): Size of the shuffle buffer. Defaults to 100.
            max_samples_per_sequence (int, optional): Maximum number of samples per sequence to load from memory.
                Defaults to None (loads the whole tar file at once).
            decoder_seq_length (int, optional): The max seq length for the decoder. Used in encoder-decoder models.
            packing_buffer_size (int, optional): Size of the packing buffer for batched samples. Defaults to None.
            validation_encoder (TaskEncoder, optional): Encoder for validation data.
                Defaults to None and will be the same as train_encoder.
            **kwargs: Additional keyword arguments passed to get_train_dataset() of Energon.
        r   )seq_lendecoder_seq_lenr   r   c                 S   s   | S )N )
dataloaderr)   r)   Y/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/vlm/data/data_module.py<lambda>f   s    z,EnergonDataModule.__init__.<locals>.<lambda>N)super__init__r   r   r#   r   r   r   r    r!   r"   r   r%   init_global_stepr   data_samplertransform_dataloadertrain_dataloader_objectval_dataloader_objectr$   r   kwargs	tokenizer)selfr   r   r   r   r   r   r   r    r!   r"   r#   r$   r%   r4   	__class__r)   r+   r.   )   s8   
'zEnergonDataModule.__init__c                 K   sT   dd |  D }| D ]}tt|stt| qtjt| fi |}|S )Nc                 S   s"   i | ]\}}|d vr|t |qS ))r%   r   r   ).0kvr)   r)   r+   
<dictcomp>s   s   " z-EnergonDataModule.io_init.<locals>.<dictcomp>)itemsvaluesr   find_node_traversertyper   fdlConfig)r6   r4   
cfg_kwargsvalcfgr)   r)   r+   io_initq   s   zEnergonDataModule.io_initrD   split)trainrD   c              
   C   sT   |dvrt d|dkr| jn| j}t| jf| j||| j|| j| jd| j	}|S )a  
        Provide the dataset for training or validation.

        This method retrieves the dataset for the specified split (either 'train' or 'val') and configures
        it according to the worker configuration.

        Parameters:
        worker_config: Configuration for the data loader workers.
        split (Literal['train', 'val'], optional): The data split to retrieve ('train' or 'val'). Defaults to 'val'.

        Returns:
        Dataset: The dataset configured for the specified split.
        >   rD   rH   z=Invalid value for split. Allowed values are 'train' or 'val'.rD   )
batch_sizetask_encoderworker_configr$   
split_partr!   r"   )

ValueErrorr%   r   r   r   r   r$   r!   r"   r4   )r6   rK   rG   rJ   _datasetr)   r)   r+   datasets_provider{   s"   	z#EnergonDataModule.datasets_providerc              	   C   s   | j r| j j| _| j| j_td| j  | jr| jS t s0td| j	  t
| j	}n&t }t }t }td| d| d| d t
||| j	|ddd	}| j|d
d}t||d}|| _| jS )a  
        Initialize and return the training DataLoader.

        This method initializes the DataLoader for the training dataset. It uses the global step
        from the trainer to configure the data sampler and ensures that the parallel state is initialized
        correctly for distributed training.

        Returns:
        TRAIN_DATALOADERS: The DataLoader for the training dataset.
        z5Ttrain dataloader initializing with init_global_step zZData loader parallel state is not initialized,using default worker config with no_workers z( Train dataloader initializing withrank  world_size  data_parallel_group z ****** Nr   rank
world_sizer   data_parallel_groupworker_debug_pathworker_log_levelrH   rG   rK   )trainerglobal_stepr/   r0   r   infor2   r
   is_initializedr   r   default_worker_configget_data_parallel_rankget_data_parallel_world_sizeget_data_parallel_grouprO   r   )r6   rK   rS   rT   rU   train_datasetenergon_dataloaderr)   r)   r+   train_dataloader   sJ   

z"EnergonDataModule.train_dataloaderc                 C   s   | j r| j S t std| j  t| j}n%t	 }t
 }t }td| d| d|  t||| j|ddd}| j|dd	}t||d
}|| _ | j S )an  
        Initialize and return the validation DataLoader.

        This method initializes the DataLoader for the validation dataset. It ensures that the parallel state
        is initialized correctly for distributed training and returns a configured DataLoader object.

        Returns:
        EVAL_DATALOADERS: The DataLoader for the validation dataset.
        zjData loader val data loader parallel state is not initialized,using default worker config with no_workers zrank rP   rQ   Nr   rR   rD   rX   rY   )r3   r
   r]   r   r\   r   r   r^   r   r_   r`   ra   rO   r   )r6   rK   rS   rT   rU   val_datasetenergon_loaderr)   r)   r+   val_dataloader   s2   
z EnergonDataModule.val_dataloaderc                 C   s   t d dS )z
        Return None as test dataset split does not exist.

        This method overrides the test_dataloader method and returns None since the test dataset split
        is not defined or used in this module.

        Returns:
        None
        z-Data loader test dataset split does not existN)r   warning)r6   r)   r)   r+   test_dataloader   s   

z!EnergonDataModule.test_dataloaderc                 C   sz   | j r6| j j}g }tj t kr|jdd}| j	| j j
| j }|du r)g }td|  ||dS td i S )aT  
        Save the state of the data module.

        This method is called when saving a checkpoint. It generates and saves the state of the data module,
        including the state of the dataloader and the number of consumed samples.

        Returns:
        Dict[str, Any]: A dictionary containing the state of the data module.
        r   )global_dst_rankNz:Data loader saving dataloader state dict consumed samples )dataloader_stateconsumed_sampleszHtrainer object not connected to data module object returning empty state)rZ   rd   torchdistributedget_rankr
   get_model_parallel_src_ranksave_state_globalr0   compute_consumed_samplesr[   r/   r   r\   rh   )r6   dataloader_objstaterl   r)   r)   r+   
state_dict   s   

zEnergonDataModule.state_dictru   c              
   C   s  d|vrt d|   dS |d }z | jr'| jj | t d nt d|  t	dW n t
yN } zt d|  W Y d}~nd}~ww zdd	lm} W n ttfym   t d
 dd	lm} Y nw |d }|| j_|| j_t d|  ||dd dS )az  
        Load the state of the data module from a checkpoint.

        This method is called when loading a checkpoint. It restores the state of the data module,
        including the state of the dataloader and the number of consumed samples.

        Parameters:
        state_dict (Dict[str, Any]): The state dictionary containing the saved state of the data module.
        rk   zpData loader state cannot be resumed from state_dict, it does not have the required key dataloader_state. It has NzData loader state restoredz%Cannot restore state from state_dict zhCannot restore state from state_dict: Is the trainer object is initialized and attached to datamodule???zFailed to dataloader restore state due to [Please ensure you are using same version of energon while saving and loading, Continuing without restoring data loader] : r   )update_num_microbatcheszCMegatron num_microbatches_calculator not found, using Apex version.rl   z2Data loader load state dict with consumed_samples F)rl   consistency_check)r   rh   keysrZ   
datamodulerd   restore_state_globalr\   errorrM   	Exception)megatron.core.num_microbatches_calculatorrv   ImportErrorModuleNotFoundError(apex.transformer.pipeline_parallel.utilsr0   init_consumed_samplesprev_consumed_samples)r6   ru   rt   erv   rl   r)   r)   r+   load_state_dict  sN   


z!EnergonDataModule.load_state_dict)r   r   r   r   NTr   NNNN)rD   )r&   N)__name__
__module____qualname____doc__strr   intboolr   r.   rA   rB   r   rF   r   rO   r	   rd   r   rg   ri   r   r   ru   r   __classcell__r)   r)   r7   r+   r       sb    	
H
",
&""r   )%copyr   typingr   r   r   r   fiddlerA   lightning.pytorchpytorchpltorch.distributedrm   !lightning.pytorch.utilities.typesr   r	   megatron.corer
   megatron.energonr   r   r   typing_extensionsr   &nemo.collections.vlm.data.task_encoderr   nemo.lightning.io.mixinr   r   r   nemo.lightning.pytorch.pluginsr   
nemo.utilsr   LightningDataModuler   r)   r)   r)   r+   <module>   s   