o
    }oi}                  !   @   s  d dl Z d dlZd dlZd dlZd dlmZ d dlmZ d dlm	Z	 d dl
mZmZmZmZmZmZmZmZ d dlZd dlZd dlmZmZ d dlmZ d dlmZ d d	lmZmZ d d
l m!Z! d dl"m#Z# ertd dlmZ d dl$m%Z% de&defddZ'													d6deddd e(d!e)d"e)d#e(d$ee& d%e&d&e(d'e&d(e(d)ed* d+e&d,e&d-e&dd.f d/d0Z*G d1d. d.e!Z+G d2d3 d3eZ,G d4d5 d5e,Z-dS )7    N)	lru_cache)Path)sample)TYPE_CHECKINGAnyDictListLiteralMappingOptionalUnion)DatasetDictload_dataset)TokenizerSpec)CustomRetrievalDataModule)_get_samples_mapping_JSONLMemMapDataset)Dataset)NEMO_DATASETS_CACHE)PackedSequenceSpecsnamereturnc                 C   s   t t|  }|jddd |S )zHRetrieve the root path for the dataset. Create the folder if not exists.T)parentsexist_ok)r   r   mkdir)r   output r   Z/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/llm/gpt/data/reranker.pyget_dataset_root&   s   r      FT  right   train   firstquerypos_docneg_docpath	tokenizerr   
seq_lengthadd_bosadd_eosseedindex_mapping_dirtruncation_methodmemmap_workers	data_typenum_hard_negativesnegative_sample_strategyrandomr%   question_keypos_keyneg_keyReRankerDatasetc                 K   s2   t dt| |||||||||	|
||||d|S )z.Create ReRankerDataset for reranking training.)	file_pathr*   max_seq_lengthr,   r-   r1   r.   r/   r0   r2   r3   r4   r7   r8   r9   Nr   )r:   str)r)   r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r7   r8   r9   kwargsr   r   r   create_reranker_dataset.   s&   r?   c                '   @   s   e Zd ZdZ														
				d7dededededededededededee dedee	eef  dedede
d d ed!ed"ef&d#d$Zd%d& Zd'd( Zd)d* Zd+d, Zd-d. Zd/d0 Zd1d2 Ze d3d4 Zd5d6 ZdS )8r:   a}  A dataset class for training reranking models that handles
    query-document pairs with positive and negative examples.

    This dataset processes JSONL files containing query-document triplets
    (query, positive document, negative documents) and prepares them for reranking model training.
    It supports various tokenization options, sequence length constraints,
    and negative sampling strategies.

    The dataset expects each example to contain:
    - A query/question
    - One or more positive documents (relevant to the query)
    - Multiple negative documents (irrelevant to the query)

    During processing, it:
    1. Formats each query-document pair with appropriate separators
    2. Applies tokenization with optional BOS/EOS tokens
    3. Handles sequence length constraints through truncation
    4. Samples negative examples based on the specified strategy
    5. Prepares attention masks and position IDs for model input

    The collate function combines multiple examples into batches, handling padding and attention masks
    appropriately for the reranking task.

    Args:
        file_path (str): Path to a JSONL dataset with (query,pos_doc,neg_doc) triplets.
        tokenizer (TokenizerSpec): Tokenizer for processing text.
        max_seq_length (int, optional): Maximum sequence length for each example. Defaults to 1024.
        min_seq_length (int, optional): Minimum sequence length for each example. Defaults to 1.
        add_bos (bool, optional): Whether to add beginning of sequence token. Defaults to True.
        add_eos (bool, optional): Whether to add end of sequence token. Defaults to True.
        max_num_samples (int, optional): Maximum number of samples to load. Defaults to None.
        seed (int, optional): Random seed for data shuffling. Defaults to 1234.
        index_mapping_dir (str, optional): Directory to save index mapping. Defaults to None.
        virtual_tokens (int, optional): Number of virtual tokens to add. Defaults to 0.
        memmap_workers (Optional[int], optional): Number of workers for memmap loading. Defaults to None.
        truncation_method (str, optional): Truncation method ('left' or 'right'). Defaults to 'right'.
        special_tokens (Optional[Mapping[str, str]], optional): Special tokens for formatting. Defaults to None.
        data_type (str, optional): Type of data ('train', 'query', or 'doc'). Defaults to 'train'.
        num_hard_negatives (int, optional): Number of negative examples to use. Defaults to 4.
        negative_sample_strategy (Literal["random", "first"], optional): Strategy for sampling negatives.
        Defaults to 'first'.
        question_key (str, optional): Key for question in input data. Defaults to 'question'.
        pos_key (str, optional): Key for positive document in input data. Defaults to 'pos_doc'.
        neg_key (str, optional): Key for negative documents in input data. Defaults to 'neg_doc'.
          TNr    r   r!   r#   r$   r%   questionr'   r(   r;   r*   r<   min_seq_lengthr,   r-   max_num_samplesr.   r/   virtual_tokensr1   r0   special_tokensr2   r3   r4   r5   r7   r8   r9   c                 C   s  || _ || _|| _|| _|| _|| _|| _|| _|	| _|
| _	|| _
| j jr)| j jn| j j| _|| _|dks=|dks=J d|dksI|dksIJ d|du rWdd	d
ddd| _n|| _|| _|| _|| _|| _|| _|drtd|  t|d}t|}W d   n1 sw   Y  t|ddd}|D ]}|t|d  qW d   n1 sw   Y  |dd}t|gdd|	|d| _d| _ | !  t"d| j d| j d| j d| j d| j d| j d| j d| j d dS )a~  
        file_path: Path to a JSONL dataset with (query,pos_doc,neg_doc) triplets in jsonl format.
        tokenizer: Tokenizer for the dataset. Instance of a class that inherits TokenizerSpec.
        max_seq_length (int): maximum sequence length for each dataset examples.
            Examples will either be truncated to fit this length or dropped if they cannot be truncated.
        min_seq_length (int): min length of each data example in the dataset.
            Data examples will be dropped if they do not meet the min length requirements.
        add_bos (bool): Whether to add a beginning of sentence token to each data example
        add_eos (bool): Whether to add an end of sentence token to each data example
        seed: Random seed for data shuffling.
        max_num_samples: Maximum number of samples to load.
            This can be > dataset length if you want to oversample data. If None, all samples will be loaded.
        index_mapping_dir: Directory to save the index mapping to.
            If None, will write to the same folder as the dataset.
        truncation_method: Truncation from which position. Options: ['left', 'right']
        special_tokens: special tokens for the chat prompts, a dictionary of {token_type: token}.
            Default: {
                        'system_turn_start': '<extra_id_0>',
                        'turn_start': '<extra_id_1>',
                        'label_start': '<extra_id_2>',
                        'end_of_turn': '
',
                        'end_of_name": '
'
                    }
        negative_sample_strategy: Strategy for negative samples. Options: ['random', 'first']
        leftr!   z2truncation_method must be either "left" or "right"r6   r%   z;negative_sample_strategy must be either "random" or "first"Nz<extra_id_0>z<extra_id_1>z<extra_id_2>
)system_turn_start
turn_startlabel_startend_of_turnend_of_namez.jsonz$Converting JSON file to JSONL file: r.jsonlwr   )dataset_pathsr*   header_linesr/   workersz#Creating ReRankerDataset with seed=z
,
add_bos=z
, add_eos=z,
max_seq_length=z, min_seq_length=z,
pad_token_id=z, negative_sample_strategy=z,
num_hard_negatives=.)#r*   r;   r<   rC   r,   r-   rD   r.   r/   rE   r0   pad_ideos_idpad_token_idr4   rF   r2   r3   r7   r8   r9   endswithloggingwarningopenjsonloadreplacewritedumpsr   indexed_datasetsamples_mapping_build_samples_mappingwarn)selfr;   r*   r<   rC   r,   r-   rD   r.   r/   rE   r1   r0   rF   r2   r3   r4   r7   r8   r9   fdataitemr   r   r   __init__   s   /


zReRankerDataset.__init__c                 C   sR   | j d ur$t| j| jd | j | jd d| j| jdd d| jd
| _d S d | _d S )Nr"   r   /F)
ra   data_prefix
num_epochsrD   r<   short_seq_probr.   r   binary_headr/   )	rD   r   ra   r;   r<   r.   splitr/   rb   re   r   r   r   rc      s   

z&ReRankerDataset._build_samples_mappingc                 C   s,   | j d u r
t| jS | jd usJ t| jS N)rD   lenra   rb   rq   r   r   r   __len__  s   


zReRankerDataset.__len__c              
   C   s   t |tjr
| }| jd ur*|t| jk sJ | j| \}}}t |tjr*| }|d ur8|t| jk s7J nd}|dk rGt| | }d}nd}z| j| }|rUd|d< W n typ } zt	
d| d| j  |d }~ww | |S )Nrk   r   TF__AUTOGENERATED__zError while loading example z from dataset )
isinstancenpint64rh   rb   rs   uint32ra   	ExceptionrY   errorr;   _process_example)re   idx_auto_gen_idxexampleer   r   r   __getitem__  s2   


zReRankerDataset.__getitem__c                    s  dd |  D }|j |j }|j }t|tr!|d n|}t|jks.J djdkr;t	|jd}njdkrH|dj }nt
d	j t|jks[J d
dd  j |} fdd|D }jrjjgj | }fdd|D }jrjjg| }fdd|D }|djd  }fdd|D }jr|jjg }fdd|D }|||d}|S )z
        Create an example by concatenating text and answer.
        Truncation is carried out when needed, but it is performed only on the prompt side.
        BOS, EOS, and SEP, are added if specified.
        c                 S   s   i | ]\}}||qS r   r   ).0kvr   r   r   
<dictcomp>/  s    z4ReRankerDataset._process_example.<locals>.<dictcomp>r   z$Error: not enough negative documentsr6   )r   r%   Nz"Invalid negative sample strategy: z3Error in sampling required number of hard negativesc                 S   s   d|  d| S )Nz	question:z 
 
 passage:r   )qpr   r   r   format_textA  s   z5ReRankerDataset._process_example.<locals>.format_textc                    s   g | ]}j  |qS r   )r*   text_to_ids)r   exr   rB   re   r   r   
<listcomp>E      z4ReRankerDataset._process_example.<locals>.<listcomp>c                    s    g | ]} j jg j | qS r   )r*   rV   rE   r   nrq   r   r   r   K  s     c                    s   g | ]	} j jg| qS r   )r*   bos_idr   rq   r   r   r   O      rA   c                    s   g | ]}|d  j d  qS NrA   )r<   r   rq   r   r   r   S  r   c                    s   g | ]	}| j jg qS r   )r*   rV   r   rq   r   r   r   W  r   )positive	negativesmetadata)itemsr7   r8   r9   rv   listrs   r3   r4   r   
ValueErrorr*   r   rE   rV   r,   r   r<   r-   )re   r   r   r'   r(   r   r   processed_exampler   r   r   r|   )  s>   




z ReRankerDataset._process_examplec                 C   s   t |tjrdd |D S |S )Nc                 S   s   g | ]}|  qS r   )tolist)r   rh   r   r   r   r   b  s    z7ReRankerDataset._maybe_cast_to_list.<locals>.<listcomp>)rv   rw   ndarray)re   xr   r   r   _maybe_cast_to_list`  s   z#ReRankerDataset._maybe_cast_to_listc                 C   s   || d | | S r   r   )re   r   mr   r   r   _ceil_to_neareste  s   z ReRankerDataset._ceil_to_nearestc                    sJ   |  |}| j| jdkr fdd|D }|S  fdd|D }|S )NrG   c                    s"   g | ]}g t |  | qS r   rs   r   r   
max_lengthrU   r   r   r   l     " z1ReRankerDataset._collate_item.<locals>.<listcomp>c                    s"   g | ]}|g t |   qS r   r   r   r   r   r   r   n  r   )r   rW   r0   )re   rh   r   r   r   r   _collate_itemh  s   

zReRankerDataset._collate_itemc                 C   s8   t |}| jdkrd||| d< |S d|d|< |S )ztCreate `attention_mask`.
        Args:
            input_ids: A 1D tensor that holds the indices of tokens.
        rG   rA   N)torchzerosr0   )re   r   item_lengthattention_maskr   r   r   _create_attention_mask2q  s   

z'ReRankerDataset._create_attention_mask2c           
         s   g }g }g }d |D ]@}| |d  | |d  | t|d  |d D ]}| | | t| q't t|d gdd |d D R   q
tj d  jks\J  fdd	|D }t|} fd
d	|D }t|}tj	| d}|t
||||d}	|	S )z0
        Collate query passage together
        rk   r   r   r   c                 s   s    | ]}t |V  qd S rr   r   )r   ndr   r   r   	<genexpr>  s    z-ReRankerDataset.collate_fn.<locals>.<genexpr>   c                    s   g | ]}  |qS r   )r   )r   rs   r   re   r   r   r         z.ReRankerDataset.collate_fn.<locals>.<listcomp>c                    s   g | ]}t t qS r   )r   range)r   r~   r   r   r   r     r   r   )	input_idstoken_type_idsr   r   position_ids)appendrs   maxminr<   r   r   stack
LongTensorr   
zeros_like)
re   batchr   r   lengthsrh   r   r   r   processed_batchr   r   r   
collate_fn  s4   
*

zReRankerDataset.collate_fn)r@   rA   TTNr    Nr   Nr!   Nr#   r$   r%   rB   r'   r(   )__name__
__module____qualname____doc__r=   r   intboolr   r
   r	   ri   rc   rt   r   r|   r   r   r   r   no_gradr   r   r   r   r   r   r:   V   s    2	

o7	
c                2       s   e Zd ZdZ														
										d,deeee f dee dee dee dee dee de	ded de	de	deee	  de
de
de	de	de	d e
d!e
d"ed# d$ed%ed&ed'eeeef  f. fd(d)Zed*d+ Z  ZS )-CustomReRankerDataModulea7  A data module for managing reranking datasets that handles data loading, preprocessing, and batching.

    This module extends CustomRetrievalDataModule to provide specialized functionality for reranking tasks.
    It manages the creation and organization of training, validation, and test datasets for reranking models,
    with support for automatic dataset splitting and various data loading configurations.

    The module can work with either:
    1. A single data file that will be automatically split into train/val/test sets
    2. Separate files for training, validation, and testing

    Key features:
    - Automatic dataset splitting with configurable ratios
    - Support for both JSON and JSONL file formats
    - Configurable batch sizes and data loading parameters
    - Efficient data loading with memory mapping
    - Support for packed sequence specifications
    - Customizable data keys for query and document fields

    Args:
        data_root (Union[str, List[str]]): Path(s) to the training data file(s) in JSON/JSONL format.
        val_root (Optional[str]): Path to validation data file. If None, will split from data_root.
        test_root (Optional[str]): Path to test data file. If None, will split from data_root.
        val_ratio (Optional[float]): Ratio of data to use for validation when splitting. Defaults to 0.04.
        test_ratio (Optional[float]): Ratio of data to use for testing when splitting. Defaults to 0.01.
        dataset_identifier (Optional[str]): Unique identifier for the dataset. If None, generated from data_root.
        seq_length (int): Maximum sequence length for model input. Defaults to 2048.
        tokenizer (Optional[TokenizerSpec]): Tokenizer for text processing. Defaults to None.
        micro_batch_size (int): Batch size for each training step. Defaults to 4.
        global_batch_size (int): Total batch size across all GPUs. Defaults to 8.
        rampup_batch_size (Optional[List[int]]): Batch sizes for training rampup. Defaults to None.
        force_redownload (bool): Whether to force redownload of dataset. Defaults to False.
        delete_raw (bool): Whether to delete raw data after processing. Defaults to True.
        seed (int): Random seed for reproducibility. Defaults to 1234.
        memmap_workers (int): Number of workers for memory-mapped file loading. Defaults to 1.
        num_workers (int): Number of workers for data loading. Defaults to 8.
        pin_memory (bool): Whether to pin memory for faster GPU transfer. Defaults to True.
        persistent_workers (bool): Whether to keep workers alive between epochs. Defaults to False.
        packed_sequence_specs (Optional[PackedSequenceSpecs]): Specifications for packed sequences. Defaults to None.
        query_key (str): Key for query field in data. Defaults to "question".
        pos_doc_key (str): Key for positive document field in data. Defaults to "pos_doc".
        neg_doc_key (str): Key for negative document field in data. Defaults to "neg_doc".
        dataset_kwargs (Optional[Dict[str, Any]]): Additional arguments for dataset creation. Defaults to None.
    N{Gz?{Gz?r   r$      FTr    rA   rB   r'   r(   	data_rootval_root	test_root	val_ratio
test_ratiodataset_identifierr+   r*   r   micro_batch_sizeglobal_batch_sizerampup_batch_sizeforce_redownload
delete_rawr.   r1   num_workers
pin_memorypersistent_workerspacked_sequence_specsr   	query_keypos_doc_keyneg_doc_keydataset_kwargsc                    s   |du rt t|  }|| _t jdi d|d|d|d|d|d|d|d	|d
|	d|
d|d|d|d|d|d|d|d|d|d|d|d|d| dS )a	  Custom DataModule for Finetuning reranking Dataset.

        Args:
            data_root (Union[str, List[str]]): The JSON/JSONL data file(s) used for training/validation/test.
                if val_root/test_root is not present, data_root will be split to training and val/test based on
                val_ratio/test_ratio.
            val_root (Optional[str]): The JSON/JSONL data file used for validation. If not provided, validation set
                will be split from data_root.
            test_root (Optional[str]): The JSON/JSONL data file used for test. If not provided, test set
                will be split from data_root.
            val_ratio (Optional[float]): The ratio of validation set when splitting from data_root.
            test_ratio (Optional[float]): The ratio of test set when splitting from data_root.
            dataset_identifier (str): Dataset identifier when saving the dataset to NEMO_HOME.
            seq_length (int, optional): The maximum sequence length for the input and output text. Defaults to 2048.
            tokenizer (Optional[TokenizerSpec], optional): The tokenizer to use for preprocessing the text.
                If not provided, a Megatron GPT2 BPE tokenizer will be used.
            micro_batch_size (int, optional): The micro batch size for training. Defaults to 4.
            global_batch_size (int, optional): The global batch size for training. Defaults to 8.
            rampup_batch_size (Optional[List[int]], optional): A list of batch sizes for ramping up during training.
                Defaults to None.
            seed (int, optional): The random seed for data shuffling. Defaults to 1234.
            memmap_workers (int, optional): The number of worker processes for loading data using TextMemMapDataset.
                Defaults to 1.
            num_workers (int, optional): The number of worker processes for data loading. Defaults to 8.
            pin_memory (bool, optional): Whether to pin memory during data loading for faster GPU training.
                Defaults to True.
            persistent_workers (bool, optional): Whether to keep data loading workers persistent across epochs.
                Defaults to False.
            dataset_kwargs (Optional[Dict[str, Any]], optional): Keyword arguments to pass into the GPTSFTDataset class
        Nr   r   r   r   r   r   r+   r*   r   r   r   r   r   r.   r1   r   r   r   r   r   r   r   r   r   )hashlibmd5r=   encode	hexdigestr   superri   )re   r   r   r   r   r   r   r+   r*   r   r   r   r   r   r.   r1   r   r   r   r   r   r   r   r   	__class__r   r   ri     sd   8	
z!CustomReRankerDataModule.__init__c                 K   s"   t |f| j| j| j| jd|S )N)r*   r+   r1   r.   r?   r*   r+   r1   r.   re   r)   r>   r   r   r   _create_dataset2  s   z(CustomReRankerDataModule._create_dataset)NNr   r   Nr   Nr$   r   NFTr    rA   r   TFNrB   r'   r(   N)r   r   r   r   r   r=   r   r   floatr   r   r   r   ri   r   r   __classcell__r   r   r   r   r     s    /	

Vr   c                        s   e Zd ZdZdddddddddd	d
ddi fdededed dededeee  dedededededededee	ee
f  f fddZd,ddZdd  Zd-d#ed$ed%efd&d'Zd(d) Zed*d+ Z  ZS ).SpecterReRankerDataModulea  A data module for fine-tuning on the Specter dataset.

    This class inherits from the `CustomReRankerDataModule` class and is specifically designed for fine-tuning models
    on the SPECTER Datasets. It handles data download, preprocessing, splitting, and preparing the data
    in a format suitable for training, validation, and testing.

    Args:
        force_redownload (bool, optional): Whether to force re-download the dataset even if it exists locally.
                                           Defaults to False.
        delete_raw (bool, optional): Whether to delete the raw downloaded dataset after preprocessing.
                                     Defaults to True.
        See FineTuningDataModule for the other args
    Ni   r$   r   FTr    rA   r   dataset_rootr+   r*   r   r   r   r   r   r   r.   r1   r   r   r   r   c                    sp   || _ || _td| _|	| _|   t jtdd tdd tdd ||||||	|
|||ddi|d d S )Nspecterztraining.jsonlzvalidation.jsonlz
test.jsonlr3   rA   )r   r   r   r+   r*   r   r   r   r.   r1   r   r   r   r   )r   r   r   r   r.   prepare_datar   ri   )re   r   r+   r*   r   r   r   r   r   r.   r1   r   r   r   r   r   r   r   ri   M  s0   




z"SpecterReRankerDataModule.__init__r   c                 C   s*   | j  r| jr|  }| | dS dS )z Prepare dataset for fine-tuning.N)
train_pathexistsr   _download_data_preprocess_and_split_data)re   dsetr   r   r   r   x  s   z&SpecterReRankerDataModule.prepare_datac                 C   s:   t d| jj d tddt| j| jrddS d dS )NzDownloading z...zsentence-transformers/spectertripletr   )	cache_dirdownload_mode)rY   infor   r   r   r=   r   r   rq   r   r   r   r     s   z(SpecterReRankerDataModule._download_data皙?333333?r   train_ratior   c                 C   sd  t d| jj d d| | }i }|d}|j|| | jd}|d j|||  | jd}|d |d< |d |d< |d |d< | D ]G\}	}| j|	 d	 }
|
j	d
dd"}|D ]}|
t|d |d |d gdd  q\W d   n1 s~w   Y  t |	 d|
  qF| jr| j D ]}| rt| qd	t|jvr|  qdS dS )a  Preprocesses and splits the downloaded dataset into training, validation, and test sets.

        Args:
            dset (DatasetDict): The downloaded dataset object.
            split_val_from_train (bool, optional): Whether to split the validation set from the training set.
                If False, the validation set is split from the test set. Defaults to True.
            val_proportion (float, optional): The proportion of the training or test set to be used
                for the validation split. Defaults to 0.05.
        zPreprocessing z! to jsonl format and splitting...rA   r#   )	test_sizer.   testtraining
validationrO   rP   zutf-8)encodinganchorr   negative)rB   r'   r(   rH   Nz split saved to )rY   r   r   r   gettrain_test_splitr.   r   r   r[   r_   r\   r`   r   iterdiris_dirshutilrmtreer=   r   unlink)re   r   r   r   r   save_splitsdatasetsplit_datasetsplit_dataset2
split_nameoutput_filerf   or   r   r   r   r     s@   

z4SpecterReRankerDataModule._preprocess_and_split_datac                 C   s   dS )z?No need to reconfigure trainer.limit_val_batches for finetuningNr   rq   r   r   r   reconfigure_limit_batches  s   z3SpecterReRankerDataModule.reconfigure_limit_batchesc                 K   s$   t |f| j| j| j| jdd|S )NrB   )r*   r+   r1   r.   r7   r   r   r   r   r   r     s   z)SpecterReRankerDataModule._create_dataset)r   N)r   r   )r   r   r   r   r=   r   r   r   r   r   r   ri   r   r   r   r   r   r  r   r   r   r   r   r   r   r   >  sf    
	

+	)r   )r   FTr    Nr!   r"   r#   r$   r%   r&   r'   r(   ).r   r\   rY   r  	functoolsr   pathlibr   r6   r   typingr   r   r   r   r	   r
   r   r   numpyrw   r   datasetsr   r   "nemo.collections.common.tokenizersr   'nemo.collections.llm.gpt.data.retrievalr   #nemo.collections.llm.gpt.data.utilsr   r   nemo.core.classesr   nemo.lightning.baser   -nemo.collections.llm.gpt.data.packed_sequencer   r=   r   r   r   r?   r:   r   r   r   r   r   r   <module>   s   (	

(  [ 