o
    }oi                     @   sl   d dl Z d dlmZmZmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZ d dlmZ G dd deZdS )	    N)AnyDictLiteral)EVAL_DATALOADERS)parallel_state)DefaultTaskEncoderWorkerConfigget_savable_loaderget_train_dataset)EnergonMultiModalDataModulec                       s   e Zd ZdZ										d!d	ed
edededededededededB dedB ddf fddZd"de	d fddZ
defddZdeeef ddf fdd Z  ZS )#DiffusionDataModulea  
    A PyTorch Lightning DataModule for handling multimodal datasets with images and text.

    This data module is designed to work with multimodal datasets that involve both images and text.
    It provides a seamless interface to load training and validation data, manage batching, and handle
    the state of the data pipeline across training epochs. The module integrates with the Megatron-Energon
    framework for efficient data handling in large-scale distributed training.

    Attributes:
    path (str): Path to the energon dataset.
    tokenizer (Tokenizer): The tokenizer used for processing text.
    image_processor (ImageProcessor): The image processor used for preprocessing images.
    seq_length (int): The maximum sequence length for tokenized text.
    micro_batch_size (int): The batch size for training and validation.
    num_workers (int): Number of workers for data loading.
    pin_memory (bool): Whether to pin memory in the DataLoader.
    multimodal_sample_config (MultiModalSampleConfig): Configuration object for multimodal samples.
    task_encoder (MultiModalTaskEncoder): Encoder responsible for encoding and batching samples.
    init_global_step (int): The initial global step for the trainer, used for resuming training.
    data_sampler (SequentialMegatronSampler): Sampler responsible for generating sequential samples.
    train_dataloader_object (Optional): The DataLoader object for training data.
    val_dataloader_object (Optional): The DataLoader object for validation data.
             TNF ʚ;path
seq_lengthmicro_batch_sizeglobal_batch_sizenum_workers
pin_memorytask_encoderuse_train_split_for_valvirtual_epoch_lengthpacking_buffer_sizemax_samples_per_sequencereturnc                    s@   t  j|dd||||||d	 || _|	| _d| _|
| _|| _dS )a  
        Initialize the EnergonMultiModalDataModule.

        Parameters:
        path (str): Path to the dataset.
        tokenizer (Tokenizer): The tokenizer used for processing text.
        image_processor (ImageProcessor): The image processor used for preprocessing images.
        seq_length (int, optional): The maximum sequence length for tokenized text. Defaults to 2048.
        micro_batch_size (int, optional): The batch size for training and validation. Defaults to 1.
        num_workers (int, optional): Number of workers for data loading. Defaults to 1.
        pin_memory (bool, optional): Whether to pin memory in the DataLoader. Defaults to True.
        N)	r   	tokenizerimage_processorr   r   r   r   r   r   r   )super__init__r   r   num_workers_valr   r   )selfr   r   r   r   r   r   r   r   r   r   r   	__class__ p/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/diffusion/data/diffusion_energon_datamodule.pyr    2   s    
zDiffusionDataModule.__init__valsplit)trainr'   c                 C   sD   |dvrt d| jrd}t| j| j| j|| jd|| j| jd	}|S )a  
        Provide the dataset for training or validation.

        This method retrieves the dataset for the specified split (either 'train' or 'val') and configures
        it according to the worker configuration.

        Parameters:
        worker_config: Configuration for the data loader workers.
        split (Literal['train', 'val'], optional): The data split to retrieve ('train' or 'val'). Defaults to 'val'.

        Returns:
        Dataset: The dataset configured for the specified split.
        >   r'   r)   z=Invalid value for split. Allowed values are 'train' or 'val'.r)   N)
batch_sizer   worker_configr   shuffle_buffer_size
split_partr   r   )	
ValueErrorr   r
   r   r   r   r   r   r   )r"   r+   r(   _datasetr%   r%   r&   datasets_provider^   s    z%DiffusionDataModule.datasets_providerc                 C   s   | j r|  S | jr| jS t s#d| j }t| t	| j
}n%t }t }t }td| d| d|  t||| j
|ddd}| j|dd	}t||d
}|| _| jS )an  
        Initialize and return the validation DataLoader.

        This method initializes the DataLoader for the validation dataset. It ensures that the parallel state
        is initialized correctly for distributed training and returns a configured DataLoader object.

        Returns:
        EVAL_DATALOADERS: The DataLoader for the validation dataset.
        zjMuiltimodal val data loader parallel state is not initialized using default worker config with no_workers zrank z world_size z data_parallel_group Nr   )rank
world_sizer   data_parallel_groupworker_debug_pathworker_log_levelr'   )r(   )r+   )r   train_dataloaderval_dataloader_objectr   is_initializedr   logginginfor   default_worker_configr!   get_data_parallel_rankget_data_parallel_world_sizeget_data_parallel_groupr0   r	   )r"   messager+   r1   r2   r3   val_datasetenergon_loaderr%   r%   r&   val_dataloader}   s6   

z"DiffusionDataModule.val_dataloader
state_dictc              
      sJ   z	t  | W dS  ty$ } ztd|  W Y d}~dS d}~ww )az  
        Load the state of the data module from a checkpoint.

        This method is called when loading a checkpoint. It restores the state of the data module,
        including the state of the dataloader and the number of consumed samples.

        Parameters:
        state_dict (Dict[str, Any]): The state dictionary containing the saved state of the data module.
        z#datamodule.load_state_dict failed  N)r   load_state_dict	Exceptionr9   warning)r"   rC   er#   r%   r&   rD      s   
z#DiffusionDataModule.load_state_dict)
r   r   r   r   TNFr   NN)r'   )__name__
__module____qualname____doc__strintboolr   r    r   r0   r   rB   r   r   rD   __classcell__r%   r%   r#   r&   r      sP    	
,&*r   )r9   typingr   r   r   !lightning.pytorch.utilities.typesr   megatron.corer   megatron.energonr   r   r	   r
   -nemo.collections.multimodal.data.energon.baser   r   r%   r%   r%   r&   <module>   s   