o
    wi                     @   s  d dl Z d dlZd dlZd dlZd dlmZmZmZmZm	Z	m
Z
 er#	 d dlZd dlmZ d dlZd dlm  mZ d dlmZmZ d dlmZmZ d dlmZ d dlmZmZ d dlm Z m!Z! d dl"m#Z# d d	l$m%Z% d d
l&m'Z( d dl)m*Z*m+Z+m,Z,m-Z-m.Z. d dl/m0Z0 d%ddZ1d&ddZ2d'ddZ3de4de4de4fddZ5dd Z6dd Z7G dd  d eZ8G d!d" d"e8Z9G d#d$ d$ej:Z;dS )(    N)TYPE_CHECKINGAnyDictListOptionalSequence)EVAL_DATALOADERSTRAIN_DATALOADERS)fetch_imagefetch_video)data)
DataLoaderDataset)CLIPImageProcessorQwen2VLImageProcessor)get_ltor_masks_and_position_ids)Qwen2VLDataConfig)conv_templates)IGNORE_INDEXIMAGE_TOKEN_INDEXSPECIAL_TOKEN_MAPVIDEO_TOKEN_INDEXVISION_END_TOKEN_INDEX)MegatronDataSamplerqwen2-vlc           
         s   t  ts	J d|d ur |d dd}|d }ni }d }|d ur d |dd}|d }|dkrt |ttfrB j| gt| }	n-t|drYt|t|krY fdd	|D }	ntd
t|drdt|n| dt| dtj	|	|d j
|d jd}	|d|	i ni }d }||||dS )Nz+processor needs to be Qwen2VLImageProcessorpt)imagesvideosreturn_tensorsimage_grid_thwvideo_grid_thw	qwen25-vl__len__c                    s   g | ]} j | qS  )temporal_patch_size).0tmp	processorr#   h/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/nemo/collections/vlm/qwen2vl/data/preloaded.py
<listcomp>A       z"process_vision.<locals>.<listcomp>zThe length of fps (z1) must be equal to the length of video_grid_thw (z#) or fps should be a single number.pixel_values_videosdtypedevicesecond_per_grid_ts)image_inputsr   video_inputsr    )
isinstancer   intfloatr$   lenhasattr
ValueErrortorchtensorr.   r/   update)
r(   r   r   fpsmodel_versionr1   r   videos_inputsr    r0   r#   r'   r)   process_vision0   sB   
r?      c                 C   s  |d dur|d j dd|d  }nd}|d dur(|d j dd|d  }nd}|du r0t}dd	 |D }d
ddd | D  d }t|| }	g }
d}d}|	D ]R}||v rp|dkrp|
|| g||   |d7 }qU||v r|dkr|
|| g||   |d7 }qU||v r|
||  qUt|dkr||dd}|
|j	 qU|d du s|t|d ksJ d|dt|d |d du s|t|d ksJ d|dt|d |
S )a  
    Tokenizes a given prompt with special handling for multiple special tokens.

    This function splits the prompt at special tokens, tokenizes each chunk separately,
    and then reassembles the chunks with the corresponding special token inserted in place of the placeholders.

    Parameters:
    prompt (str): The input prompt containing text and special token placeholders.
    tokenizer: The tokenizer object used to tokenize the prompt chunks.
    special_token_map (list, optional): A list containing tuples of special token strings
                                        and their corresponding token indices. Defaults to SPECIAL_TOKEN_MAP.

    Returns:
    torch.Tensor: A tensor of token IDs representing the tokenized prompt with special tokens.
    r   N   dimr@   r   r    c                 S   s   i | ]\}}||qS r#   r#   r%   tokenindexr#   r#   r)   
<dictcomp>w   r+   z*tokenize_special_token.<locals>.<dictcomp>(|c                 s   s    | ]}t |V  qd S N)reescape)r%   rE   r#   r#   r)   	<genexpr>z   s    z)tokenize_special_token.<locals>.<genexpr>)z<|image_pad|>z<|video_pad|>Fadd_special_tokenszimage_index=z* != len(vision_tensors['image_grid_thw'])=zvideo_index=z* != len(vision_tensors['video_grid_thw'])=)
prodr   joinkeysrK   splitextendappendr6   	input_ids)prompt	tokenizervision_tensorsmerge_lengthspecial_token_mapimage_token_lengthvideo_token_lengthspecial_token_dictregex_patternchunkstokenized_chunksimage_indexvideo_indexchunktokenized_chunkr#   r#   r)   tokenize_special_tokenY   sL    

rg   Fc                    s|   t }t }t||| d D ]* t fddt|D }t|s3|r;t|dd  r;  | f  S qdS )NrA   c                    s    g | ]} |  | kqS r#   r#   )r%   jipatterntemplater#   r)   r*      s     z(find_pattern_indices.<locals>.<listcomp>)rm   )r6   ranger9   r:   all)rl   rk   search_start_indexallow_first_token_mismatchtemplate_lenpattern_lenmatchr#   ri   r)   find_pattern_indices   s     ru   
source_len
target_len
cutoff_lenc                 C   s   |d |k r	|}n| d |k r||  }n
t ||| |   }t||}t|| d}t|| }|dkr?|dkr?|d7 }|d8 }||fS )zO
    Computes the real sequence length after truncation by the cutoff_len.
    r@   r   rA   )r4   minmax)rv   rw   rx   max_target_lennew_target_lenmax_source_lennew_source_lenr#   r#   r)   infer_seqlen   s   


r   c                 C   s  g }d}d}d}d}d}d}d}	d}
|
t |k r|
d t |k rO||
 dkrO||
d  dkrO||
d  dkrO|	dkrF|
}	|
dkrCd}n|
}n|
}d	}|
d
7 }
q|r|
d t |k r||
 dkr||
d  dkr||
d  dkr||
d
  dkr||
d  dkr|
d }|
d }d}d	}|
d7 }
q|r||
 dkr|
d t |k r||
d  dkr|
d }d}|dkr|dkr|dkr|dkr| ||d  }| ||d  }|||g d}d}d}d}|
d7 }
q|
d7 }
|
t |k st | tdd |D ksJ dt |  dtdd |D  |S )a  
    Extract user-assistant dialogue pairs from a sequence of tokens.

    Args:
        tokens: List of token ids
        decoded_tokens: List of decoded tokens (strings)

    Returns:
        List of dialogue pairs, where each pair is [user_tokens, assistant_tokens]
    Frm   r   r@   z<|im_start|>rA   user
T      z
<|im_end|>	assistant   c                 s   (    | ]}t |d  t |d  V  qdS r   rA   Nr6   r%   pairr#   r#   r)   rM     s    
z)extract_dialogue_pairs.<locals>.<genexpr>zTokens length mismatch:  != c                 s   r   r   r   r   r#   r#   r)   rM     s   & )r6   rV   sum)tokensdecoded_tokensdialogue_pairsin_userin_assistantuser_start_idxuser_end_idxassistant_start_idxassistant_end_idxfirst_user_startrj   user_tokensassistant_tokensr#   r#   r)   extract_dialogue_pairs   s|    B r   c                 C   s  t dd | D }dd tD }g }| D ]}|tks|tkr(||t|  q|||g qt |t | ksHJ dt | dt |  g }g }	|dd }
t| |}d}t|D ]l\}\}}||kri na|
dt |t |  }|
t |t | d }
|dt | }|t |d }t	t |t ||| \}}|d| }|d| }||| 7 }|d| }|d| }|	|| 7 }	||| 7 }q]t d	d |D |krt
d
|dtj|tjdtj|	tjdfS )ztruncate tokensc                 S      g | ]}|t kr|qS r#   r   r%   rj   r#   r#   r)   r*         z#truncate_tokens.<locals>.<listcomp>c                 S   s   i | ]\}}||qS r#   r#   rD   r#   r#   r)   rG     r+   z#truncate_tokens.<locals>.<dictcomp>z Decoded tokens length mismatch: r   Nr   c                 S   r   r#   r   r   r#   r#   r)   r*   B  r   z|Image/video tokens was truncated. This will cause training to fail. Please increase max_sequence_length max_sequence_length=z4 to accommodate the full image/video token sequence.r.   )r6   r   r   r   rV   r4   decoder   	enumerater   r8   r9   r:   long)r   labelsmax_sequence_lengthrY   vision_token_numspecial_index_mapr   _idtruncated_tokenstruncated_labelsremain_labelsr   total_lengthn
source_ids
target_idscurrent_labelssource_labelstarget_labelsrv   rw   r#   r#   r)   truncate_tokens  sN   
 r   c                       sv   e Zd ZdZ	d fdd	Zdd Zdeeej	f fdd	Z
d
d Zdd Zdd ZdddZdd Zdd Z  ZS )PreloadedSupervisedDataset#Dataset for supervised fine-tuning.Nc           
         s   t    |d ur$t|d}t|}W d    n1 sw   Y  ng }td || _|| _ddl	m
}	 t| j|	rB| jj| _|| _|| _|j| _|j| _|| _t|dd | _t|dd pb| j| _|| _d S )Nrz*Formatting inputs...Skip in preloaded moder   )AutoTokenizerimage_foldervideo_folder)super__init__openjsonloadloggingwarningdata_configrY   =nemo.collections.common.tokenizers.huggingface.auto_tokenizerr   r3   image_processorsequence_lengthconv_templateimage_process_modelist_data_dictgetattrr   r   r=   )
self	data_pathr   rY   r   r=   r   filer   r   	__class__r#   r)   r   O  s*   
	


z#PreloadedSupervisedDataset.__init__c                 C   s
   t | jS rJ   )r6   r   r   r#   r#   r)   r"   r  s   
z"PreloadedSupervisedDataset.__len__returnc           	      C   s|   | j | }tt| j }| j||| jdkd}| || j| j| j	}| 
|||\}}td||d|d |d }|S )Nplain)	use_plain)r   r   r1   r2   r#   )r   copydeepcopysupported_conv_templatesr   _apply_prompt_templates_process_visionr   r   r=   _tokenize_and_labeldict)	r   rj   sourceconvchatmlrZ   r   r   	data_dictr#   r#   r)   __getitem__u  s   
z&PreloadedSupervisedDataset.__getitem__c                 C   s<   dd }| dg }| dg }|||}|||}||fS )av  
        Normalize image and video paths, converting relative paths to absolute paths.

        Args:
            source: Dictionary containing image and video paths
            image_folder: Base directory for image files
            video_folder: Base directory for video files

        Returns:
            Source dictionary with normalized image and video paths
        c                    sp   |du s| s| S t | D ])\} t tsqt fdddD s'tj r(qtjtj| | |< q| S )z(Convert relative paths to absolute pathsNc                 3   s    | ]}| v V  qd S rJ   r#   )r%   prefixpathr#   r)   rM     s    z^PreloadedSupervisedDataset._normalize_vision_paths.<locals>.normalize_paths.<locals>.<genexpr>)zhttp:zhttps:zfile:)	r   r3   stranyosr   isabsnormpathrR   )pathsbase_folderrj   r#   r   r)   normalize_paths  s   
"zKPreloadedSupervisedDataset._normalize_vision_paths.<locals>.normalize_pathsr   r   )get)r   r   r   r   r   r   r   r#   r#   r)   _normalize_vision_paths  s   

z2PreloadedSupervisedDataset._normalize_vision_pathsc           
      C   s   g }|D ]}| td|i qg }g }|D ]}td|idd\}}	| |	 | | qt|dkr5d }t|dkr=d }|||fS )NimagevideoT)return_video_sample_fpsr   )rV   r
   r   r6   )
r   r   r   r1   r   r2   video_sample_fps_listr   video_inputvideo_sample_fpsr#   r#   r)   _fetch_vision_content  s   

z0PreloadedSupervisedDataset._fetch_vision_contentc                 C   s:   |  |||\}}| ||\}}}	t| j|||	|}
|
S rJ   )r   r   r?   r   )r   r   r   r   r=   r   r   r1   r2   r   rZ   r#   r#   r)   r     s   z*PreloadedSupervisedDataset._process_visionFc                    s>   j d  j d d}|d |dddur|d  _ fdd}||}g  _tD ] \}}||d	  }| j |d
  ksFJ |  ||d  q.  }	|dg }
|dg }|	dt|
kswJ d|	ddt|
|	dt|ksJ d|	ddt|d}d}|		d|	d|}	|	S )a  
        According to https://github.com/QwenLM/Qwen2-VL#data-preparation
        [
          {
            "messages": [
              {
                "content": "<image>Who are they?",
                "role": "user"
              },
              {
                "content": "They're Kane and Gretzka from Bayern Munich.",
                "role": "assistant"
              },
              {
                "content": "What are they doing?<image>",
                "role": "user"
              },
              {
                "content": "They are celebrating on the soccer field.",
                "role": "assistant"
              }
            ],
            "images": [
              "demo_data1/1.jpg",
              "demo_data2/1.jpg"
            ]
          },
        ]
        r   rA   )r   r   messagessystemNc                    s8   t dk r| S d d  jd d d  jd iS )Nr@   r   rolerA   )r6   roles)r   r   r   r#   r)   
_fix_roles  s   (zFPreloadedSupervisedDataset._apply_prompt_templates.<locals>._fix_rolesr   r@   contentr   r   <image>zprompt.count('<image>')=z != len(images)=z<video>zprompt.count('<video>')=z != len(videos)=z+<|vision_start|><|image_pad|><|vision_end|>z+<|vision_start|><|video_pad|><|vision_end|>)
r   r   r   r   r   append_message
get_promptcountr6   replace)r   r   r   r   r   r   rh   sentencer   rX   r   r   image_blockvideo_blockr#   r   r)   r     s(   
00z2PreloadedSupervisedDataset._apply_prompt_templatesc                 C   s^  t || j|| jjd}dd tt|D }d}|j}tt|D ]F}|| d }	|	dkrgt|dd }
|| d }| jj||
d u rCdn|
 d	 d
d}t	|||\}}|dks[J d||| |||< |}q!t|d | j
krtdt|d  d| j
 d t||| j
| j\}}ntj|tjdtj|tjd}}|d d }|dd  }||fS )N)r[   c                 S   s   g | ]}t qS r#   )r   )r%   _r#   r#   r)   r*     s    zBPreloadedSupervisedDataset._tokenize_and_label.<locals>.<listcomp>r   r   stop_strrA    r   FrO   z'Not found valid answer in conversation.zcToken indices sequence length is longer than the specified maximum sequence length for this model (z > zJ). Running this sequence through the model will result in indexing errors.r   rm   )rg   rY   r   
merge_sizern   r6   r   r   encoderu   r   r   r   r   r9   r:   r   )r   r   r   rZ   r   r   rp   r   rj   r   r   answeranswer_tokensanswer_start
answer_endr#   r#   r)   r     sB   
"z.PreloadedSupervisedDataset._tokenize_and_labelc                 C   s(   t | jtr| jjd | jjd gS t)Nheightwidth)r3   r   r   	crop_sizeNotImplementedErrorr   r#   r#   r)   _get_crop_size1  s   z)PreloadedSupervisedDataset._get_crop_sizerJ   )F)__name__
__module____qualname____doc__r   r"   r   r   r9   Tensorr   r   r   r   r   r   r  __classcell__r#   r#   r   r)   r   L  s    	#*
A%r   c                       sD   e Zd ZdZ	d	 fdd	Zdee deeej	f fddZ
  ZS )
Qwen2VLDatasetr   Nc                    s0  | drt |||||| d S | drt d ||||| td |jd ur|j}t|dD ]Y}t|}	g |	d< |	d D ]A}
t	
d|
d }|D ])}|d	d
d }tj||}tj|sqtd|  qO|	d | qOt	dd|
d |
d< qC| j|	 q4d S d S td| d)Nz.jsonz.jsonlz$Loading image inputs from Dataset...r   r   conversationsz<img src="([^"]+)"valuerA   /rm   zImage not found: z<img src="([^"]+)">r   zFormatting of z is not supported in Qwen2VL.)endswithr   r   r   r   r   r   r   loadsrK   finditergrouprT   r   r   rR   isfilerV   subr   r8   )r   r   r   rY   r   r=   r   r   linerecordturnmatchesrt   
image_name
image_pathr   r#   r)   r   ;  s0   





zQwen2VLDataset.__init__	instancesr   c                 C   s  | j }d|d v }tdd |D }|d d d d }|D ]A}||d jd  }t|d d|fdd|d< t|d	 d|fdt|d	< |r_|d d
 |kr_t|d t|gfd|d< q|rtdd |D }tdd |D }|D ]C}||d jd  }	t|d d|	fd||d< |d }
||
jd  }tj	|g|
jdd R |
j
|
jd}tj|
|fdd|d< qvtdd |D tdd |D d}d|d v rtjdd |D dd|d< nd|d< d|d v rtjdd |D dd|d< nd|d< d|d v rtjdd |D dd|d< nd|d< d|d v rJtjdd |D dd|d< | jdkrEtjdd |D dd|d< nd|d< nd|d< d|d< | j}|d }|d	 }|r|d }g }|D ]*}|g  tdt|d D ]}||d  ||  }|d
 tt| qxqht|}tj| tj|jd}tj| tj|jd}nt||j|j|j|jd \}}}d!||dk < ||d"< |r||d< |S )#zR
        Collate function to bundle multiple samples into a single batch.
        
cu_seqlensr   c                 s       | ]
}|d  j d V  qdS )r   r   Nshaper%   instancer#   r#   r)   rM   l      z,Qwen2VLDataset.collate_fn.<locals>.<genexpr>rA   @   r   constantr   rm   c                 s   r%  )r$  r   Nr&  r(  r#   r#   r)   rM   x  r*  c                 s   r%  )r   r   Nr&  r(  r#   r#   r)   rM   y  r*  pixel_valuesNr-   rB   c                 S      g | ]}|d  qS )r   r#   r(  r#   r#   r)   r*         z-Qwen2VLDataset.collate_fn.<locals>.<listcomp>c                 S   r.  )r   r#   r(  r#   r#   r)   r*     r/  )rW   r   c                 S   r.  )r-  r#   r(  r#   r#   r)   r*     r/  r   c                 S   r.  )r   r#   r(  r#   r#   r)   r*     r/  r,   c                 S   r.  )r,   r#   r(  r#   r#   r)   r*     r/  r    c                 S   r.  )r    r#   r(  r#   r#   r)   r*     r/  r!   c                 S   r.  )r0   r#   r(  r#   r#   r)   r*     r/  r0   rW   )r   	eod_tokeneod_mask_lossreset_attention_maskreset_position_idsg        	loss_mask)r   rz   r'  Fpadr   r9   cat	IntTensorzerosr.   r/   stackr=   rY   rV   rn   r6   rU   list
LongTensoronessizer5   r   r   eos_token_idr1  r2  r3  )r   r#  r   packed_sequencemax_lenr)  pad_len
max_len_cumax_len_image
pad_len_cuxnum_pad
pad_tensorbatchrY   r   r   r$  position_ids	cu_seqlenindseqlenr4  attention_maskr#   r#   r)   
collate_fne  s    (


zQwen2VLDataset.collate_fnrJ   )r  r  r  r  r   r   r   r   r9   r  rO  r  r#   r#   r   r)   r  8  s
    	(*r  c                %       s   e Zd ZdZdedddddddddddddd	fd
eee B deee  dee de	dee	 dedede	de	de	de	de	de	de
de
de
de	ddf$ fddZd/deddfd d!Zdefd"d#Zdefd$d%Zdefd&d'Zdefd(d)Zdeeef fd*d+Zd,eeef ddfd-d.Z  ZS )0Qwen2VLPreloadedDataModulez!Preloaded DataModule for Qwen2VL.Ni   r      i'  TFi  r   weightsr   
seq_lengthdecoder_seq_lengthrY   r   micro_batch_sizeglobal_batch_sizenum_train_samplesnum_val_samplesnum_test_samplesnum_workers
pin_memorypersistent_workersuse_packed_sequenceseedr   c                    s   t    t|ttfs|g}|d ur%t|t|ksJ t|dkr%d }|| _|| _|| _|| _	|| _
|| _|	| _|
| _|| _|| _|| _|| _|| _|| _|| _|| _|| _|| _d| _t| j
| j|	|
dd| _d S )NrA   r   cyclic)seq_lendecoder_seq_lenrU  rV  dataloader_type)r   r   r3   r;  tupler6   r=   r   rR  r   rS  rT  rU  rV  rY   r   rW  rX  rY  rZ  r[  r\  r^  r]  init_global_stepr   data_sampler)r   r=   r   rR  r   rS  rT  rY   r   rU  rV  rW  rX  rY  rZ  r[  r\  r]  r^  r   r#   r)   r     sB   
z#Qwen2VLPreloadedDataModule.__init__r  stagec                 C   sp   t | jdksJ d| jrd S t| jd | j| j| j| j| jd| _	t| jd | j| j| j| j| jd| _
d S )NrA   z*not yet support blend dataset in Qwen 2.0!r   )r   )r6   r   r]  r  r   rY   r   r=   rS  	_train_ds_validation_ds)r   rf  r#   r#   r)   setup  s&   z Qwen2VLPreloadedDataModule.setupc                 C      |  | jS rJ   )_create_dataloaderrg  r   r#   r#   r)   train_dataloader     z+Qwen2VLPreloadedDataModule.train_dataloaderc                 C   rj  rJ   )rk  rh  r   r#   r#   r)   val_dataloader   rm  z)Qwen2VLPreloadedDataModule.val_dataloaderc                 C   rj  rJ   )rk  _test_dsr   r#   r#   r)   test_dataloader$  rm  z*Qwen2VLPreloadedDataModule.test_dataloaderc              	   K   s@   | j j| _| j| j_t|f| j| j| jt|dt	j
jd|S )NrO  )rZ  r[  r\  rO  )trainerglobal_steprd  re  r   rZ  r[  r\  r   r   
dataloaderdefault_collate)r   datasetkwargsr#   r#   r)   rk  (  s   

z-Qwen2VLPreloadedDataModule._create_dataloaderc                 C   s   | j | jj| j }d|iS )zCalled when saving a checkpoint, implement to generate and save datamodule state.

        Returns:
            A dictionary containing datamodule state.

        consumed_samples)re  compute_consumed_samplesrq  rr  rd  )r   rw  r#   r#   r)   
state_dict5  s   z%Qwen2VLPreloadedDataModule.state_dictry  c                 C   sp   zddl m} W n ty   ddlm} Y nw |d }|| j_|| j_d| _|dur6|}|j|dd dS dS )zCalled when loading a checkpoint, implement to reload datamodule state given datamodule stat

        Args:
            state_dict: the datamodule state returned by ``state_dict``.

        r   )#_GLOBAL_NUM_MICROBATCHES_CALCULATORrw  rA   NF)rw  consistency_check)	(apex.transformer.pipeline_parallel.utilsrz  ModuleNotFoundErrornemo.lightning.apex_utilsre  init_consumed_samplesprev_consumed_samplesif_first_stepr;   )r   ry  rz  rw  num_microbatch_calculatorr#   r#   r)   load_state_dict?  s    
z*Qwen2VLPreloadedDataModule.load_state_dict)r  )r  r  r  r  r   r   r   r   r5   r4   boolr   ri  r	   rl  r   rn  rp  r   rk  r   r   ry  r  r  r#   r#   r   r)   rP    s|    

	
:"
rP  )Nr   )r@   N)r   F)<r   r   r   rK   typingr   r   r   r   r   r   r   lightning.pytorchpytorchplr9   torch.nn.functionalnn
functionalr5  !lightning.pytorch.utilities.typesr   r	   qwen_vl_utilsr
   r   torch.utilsr   torch.utils.datar   r   transformersr   r   2nemo.collections.nlp.modules.common.megatron.utilsr   (nemo.collections.vlm.qwen2vl.data.configr   .nemo.collections.vlm.qwen2vl.data.conversationr   r   3nemo.collections.vlm.qwen2vl.data.multimodal_tokensr   r   r   r   r   nemo.lightning.pytorch.pluginsr   r?   rg   ru   r4   r   r   r   r   r  LightningDataModulerP  r#   r#   r#   r)   <module>   s@    

)
>c3 m 