o
    }oiN                     @   s>  d dl Z d dlZd dlZd dlmZ d dlmZmZmZ d dl	Z	d dl
mZ d dlmZmZ 	d*dd	d
edefddZ	d+dedededeee  ddf
ddZ		 					 	d,dedededeee  deded d
ededed dededefd d!ZG d"d# d#eZG d$d% d%ZG d&d' d'eZG d(d) d)eZdS )-    N)chain)ListLiteralOptional)_IndexBatchSamplerWrapper)
DataLoaderDatasetTFdatasetr   	drop_lastreturnc                 K   s&   t | fd| ji|}||_||_|S )N
collate_fn)r   r   
_drop_last!_pad_samples_to_global_batch_size)r	   r
    pad_samples_to_global_batch_sizekwargsoutput r   G/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/data.pycreate_dataloader   s   r   global_rankmicro_batch_sizeglobal_batch_sizerampup_batch_sizec              	   C   s  ddl m} ddlm} zddlm}m}m}m}	m	}
 d}W n( t
tfyF   td ddlm} ddlm}m}m}	 dd	lm}
 d
}Y nw | }tj|d dkrY|j}n| }|rddlm} |du rt|
||||j|d
d dS t||r| |ksJ | |ksJ |	 |||j  ksJ dS tdddlm} |du r|
||||j|d
d dS t||r| |ksJ | |ksJ |	 |||j  ksJ dS td)aD  
    Initializes the data for distributed training by setting up the microbatch calculator
    based on the provided global rank and data configuration.

    This function checks if the microbatch calculator has already been initialized. If it has,
    the function validates that the current configuration matches the initialized settings. If the
    calculator has not been initialized, it sets up a new one with the provided configuration.

    Args:
        global_rank (int): The global rank of the current process.
        config (DataConfig): The data configuration object containing settings for global batch size,
            micro batch size, data parallel size, and optional ramp-up batch size.

    Raises
    ------
        Exception: If the microbatch calculator has already been initialized with different settings.

    r   ).NEMO_MEGATRON_MODEL_PARALLEL_APPSTATE_OVERRIDE)AppState)!ConstantNumMicroBatchesCalculatorget_current_global_batch_sizeget_micro_batch_sizeget_num_microbatches init_num_microbatches_calculatorTzCMegatron num_microbatches_calculator not found, using Apex version.)ConstantNumMicroBatches)r   r   r   )setup_microbatch_calculatorFfalsetrue)#_GLOBAL_NUM_MICROBATCHES_CALCULATORN)rankr   r   data_parallel_sizer   decrease_batch_size_if_neededz*Microbatch calculator already initialized.)nemo.lightning._strategy_libr   
nemo.utilsr   )megatron.core.num_microbatches_calculatorr   r   r   r   r   ImportErrorModuleNotFoundErrorloggingwarningapex.transformer.microbatchesr    (apex.transformer.pipeline_parallel.utilsr!   osenvirongetlowerr   r$   r&   
isinstance	Exception)r   r   r   r   r   r   r   r   r   r   r   MCORE_MB_CALCULATOR	app_stateinit_global_rankr$   r   r   r   r!   &   s^   


	

	r!   singletrain   
dataloaderconsumed_samplesdataloader_type)r:   cyclicbatchr   dataloader_mode)r;   
validationtestpredictr%   
world_sizec                 C   s   |dkrt t| j|||||	|
||d	}n4|dkr'tt| j|||	|
|d}n"|dkrBddlm} |t| j||||	|
|| d}nt| d	|d
v rQt|}t| j|| j	| j
| j| jdS )a  
    This function takes an existing PyTorch `DataLoader` and configures it to use a Megatron sampler.
    The Megatron sampler is responsible for splitting the data into batches
    during training with Megatron.

    Args:
        dataloader (DataLoader): The original PyTorch DataLoader to wrap.
        micro_batch_size (int): The size of each micro-batch.
        global_batch_size (int): The effective size of the training batch across all data parallel devices.
        rampup_batch_size (Optional[List[int]]): A list of target batch sizes for a gradual
            rampup schedule during training (optional).
        consumed_samples (int, optional): The number of samples consumed before
            starting this iteration (defaults to 0).
        dataloader_type (Literal["single", "cyclic", "batch"], optional): The type of
            Megatron sampler to use. Valid options are:
                - "single": Uses `MegatronPretrainingSampler` for single pass data sampling.
                - "cyclic": Uses `MegatronPretrainingRandomSampler` for cyclic data sampling.
                - "batch": Uses `MegatronPretrainingBatchSampler` for batch sampling. This is the option to
                  use for fine-tuning workloads, where sequence lengths are variable between samples.
                  Sampling the entire global batch together ensures that sequences in a global batch are
                  padded to the same lengths.
            Defaults to "single".
        drop_last (bool, optional): Whether to drop the last incomplete batch
            (defaults to True).
        pad_samples_to_global_batch_size (bool, optional): Whether to pad the last incomplete
            batch to the `global_batch_size`  (defaults to False, only applies when
            `drop_last` is False).
        dataloader_mode (Literal["train", "validation", "test", "predict"]): The mode of dataloader.

    Returns:
        DataLoader: A new DataLoader instance with the configured Megatron sampler.
    r:   )	total_samplesr>   r   r   r   data_parallel_rankr&   r
   r   r@   )rG   r>   r   rH   r&   r
   rA   r   )MegatronPretrainingBatchSampler)rG   r>   r   r   rH   r&   r
   r   z" dataloader type is not supported.)rD   rE   )batch_samplernum_workers
pin_memorypersistent_workersr   )MegatronPretrainingSamplerlenr	    MegatronPretrainingRandomSamplerLnemo.collections.nlp.data.language_modeling.megatron.megatron_batch_samplersrI   r6   r   r   rK   rL   rM   r   )r=   r   r   r   r>   r?   r
   r   rB   r%   rF   rJ   rI   r   r   r   add_megatron_sampler   sX   .	rR   c                       s"   e Zd ZdZd fdd	Z  ZS )WrappedDataLoaderz@Wrapper around torch DataLoader which stores the dataloader moder;   c                    s   t  jdi | || _d S )Nr   )super__init__mode)selfrV   dataloader_kwargs	__class__r   r   rU      s   
zWrappedDataLoader.__init__)r;   )__name__
__module____qualname____doc__rU   __classcell__r   r   rY   r   rS      s    rS   c                   @   sj   e Zd Z				ddededededed	ed
ee dee dee ddfddZdd Ze	j
dd ZdS )BaseMegatronSamplerTNFrG   r>   r   rH   r&   r
   r   r   r   r   c
           
      C   s   |dkrt d| |dkrt d| |dkr!t d| ||kr/t d| d| |d urM|d u rM|||  dkrMt d| d| d	| d
|	rW|d u rWt d|| _|| _|| _|| _| j| | _|| _|| _|	| _t	
d| d|  d S )Nr   zno sample to consume: z2micro_batch_size size must be greater than 0, but z/data parallel size must be greater than 0, but z9data_parallel_rank should be smaller than data size, but z >= z`global_batch_size` (z)) is not divisible by `micro_batch_size (z) x data_parallel_size (z)`zi`pad_samples_to_global_batch_size` can be `True` only when `global_batch_size` is set to an integer valuez=Instantiating MegatronPretrainingSampler with total_samples: z and consumed_samples: )RuntimeErrorrG   r>   r   rH   $micro_batch_times_data_parallel_sizer
   r   r   r-   info
rW   rG   r>   r   rH   r&   r
   r   r   r   r   r   r   rU      sH   zBaseMegatronSampler.__init__c                 C   sX   | j d ur"| jr| j| j  }n| j| j  d | j  }|| j | j  S | jd | j d S )Nr<   )r   r
   rG   rb   )rW   num_global_batchesr   r   r   __len__(  s   
zBaseMegatronSampler.__len__c                 C   s   d S Nr   )rW   r   r   r   __iter__4  s   zBaseMegatronSampler.__iter__TNNF)r[   r\   r]   intboolr   listrU   rf   abcabstractmethodrh   r   r   r   r   r`      s:    	

3r`   c                       sh   e Zd Z				ddededededed	ed
ee dee dee f fddZdd Zdd Z	  Z
S )rN   TNFrG   r>   r   rH   r&   r
   r   r   r   c
           
         s>   t  j|||||||||	d	 ||krtd| d| d S )N)	rG   r>   r   rH   r&   r
   r   r   r   zno samples left to consume: z, )rT   rU   ra   rd   rY   r   r   rU   9  s   z#MegatronPretrainingSampler.__init__c                 C   s   | j | j }|| j }||fS rg   )rH   r   )rW   	start_idxend_idxr   r   r   get_start_end_idxS  s   
z,MegatronPretrainingSampler.get_start_end_idxc                 c   s    g }t | j| j}| js&| jr&t| | j }t d| d d}t||}|D ]}|| t|| j	krE| 
 \}}||| V  g }q(t|dkre| jsg| jrVJ d| 
 \}}||| V  d S d S d S )Nr<   r   zDwith pad_samples_to_global_batch_size all batches should be complete)ranger>   rG   r
   r   rO   r   r   appendrb   rq   )rW   rA   indicespad_samples_numpad_indicesidxro   rp   r   r   r   rh   X  s,   

z#MegatronPretrainingSampler.__iter__ri   )r[   r\   r]   rj   rk   r   rl   rU   rq   rh   r_   r   r   rY   r   rN   8  s4    	
rN   c                       sh   e Zd Z				ddedededed	ed
edee dee deddf fddZdd Zdd Z  Z	S )rP   TNFr   rG   r>   r   rH   r&   r
   r   r   seedr   c
           
   
      sV   t  j||||||||d |rJ d|s| jdkrtd| j| j | _|	| _d S )N)rG   r>   r   rH   r&   r
   r   r   zB`MegatronPretrainingRandomSampler` does not support sample paddingr<   a
  `MegatronPretrainingRandomSampler` does not support drop_last=False when micro_batch_size * data_parallel_size > 1.                   please reduce your MBS and data parallelism to 1 if you want to use drop_last=False, or switch to drop_last=True to avoid this error)rT   rU   rb   ra   rG   last_batch_sizery   )
rW   rG   r>   r   rH   r&   r
   r   r   ry   rY   r   r   rU   r  s(   
z)MegatronPretrainingRandomSampler.__init__c                 C   s   | j | jr| jnd }|| j|  }| jd ur2| jr || j }n
|| j d | j }|| j| j  S | jr:|| j S |d | j S )Nr   r<   )rG   r
   rz   r>   r   rb   )rW   active_total_samplesnum_available_samplesre   r   r   r   rf     s   

z(MegatronPretrainingRandomSampler.__len__c                 #   s   | j | j }| j| | _| j| }|| j dksJ | j| j }| j | j | j }|| }| j|  t }|	| j
| j  tj||d } fdd||d  D }g }	|D ]}
|	|
 t|	| jkru|  j| j7  _|	V  g }	qZt|	dkr| js|	V  d S d S d S )Nr   )	generatorc                    s   g | ]} | qS r   r   ).0xro   r   r   
<listcomp>  s    z=MegatronPretrainingRandomSampler.__iter__.<locals>.<listcomp>)rG   rz   r>   epochrb   r   rH   torch	Generatormanual_seedry   randpermtolistrt   rO   r
   )rW   r{   current_epoch_samplesr&   bucket_sizebucket_offsetg
random_idx	idx_rangerA   rx   r   r   r   rh     s0   



z)MegatronPretrainingRandomSampler.__iter__)TNFr   )
r[   r\   r]   rj   rk   r   rU   rf   rh   r_   r   r   rY   r   rP   q  s8    	
!rP   )TFrg   )Nr   r:   TFr;   r   r<   )rm   r-   r1   	itertoolsr   typingr   r   r   r   'lightning.pytorch.overrides.distributedr   torch.utils.datar   r   rk   r   rj   r!   rR   rS   r`   rN   rP   r   r   r   r   <module>   s   


f
	

c	D9