o
    wi^R                     @   s   d Z ddlZddlZddlmZ ddlmZmZ ddlZddl	m
Z
 ddlmZ ddgZG d	d
 d
ZG dd deZG dd deZG dd deZG dd dZG dd deZeG dd deZdS )zDataloaders.    N)chain)OptionalTuple)logging)experimentalMegatronPretrainingBatchSampler%MegatronPretrainingRandomBatchSamplerc                   @   sn   e Zd ZdZ				ddedededed	ed
edee dee dee ddfddZdd Z	e
jdd ZdS )BaseMegatronSampler TNFtotal_samplesconsumed_samplesmicro_batch_sizedata_parallel_rankdata_parallel_size	drop_lastglobal_batch_sizerampup_batch_size pad_samples_to_global_batch_sizereturnc
           
      C   s   |dkrt d||dkrt d| |dkr!t d| ||kr-t d|||d urK|d u rK|||  dkrKt d| d| d| d	|	rU|d u rUt d
|| _|| _|| _|| _|| _| j| | _|| _|| _	|	| _
td| d|  d S )Nr   no sample to consume: {}2micro_batch_size size must be greater than 0, but /data parallel size must be greater than 0, but Adata_parallel_rank should be smaller than data size, but {} >= {}`global_batch_size` ()) is not divisible by `micro_batch_size () x data_parallel_size ()`zi`pad_samples_to_global_batch_size` can be `True` only when `global_batch_size` is set to an integer valuez=Instantiating MegatronPretrainingSampler with total_samples: z and consumed_samples: )RuntimeErrorformatr   r   r   r   r   $micro_batch_times_data_parallel_sizer   r   r   r   info)
selfr   r   r   r   r   r   r   r   r    r"   g/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/nemo/collections/common/data/data_samplers.py__init__$   sN   zBaseMegatronSampler.__init__c                 C   s^   | j | j }| jd ur&| jr|| j }n
|| j d | j }|| j| j  S |d | j d S )N   )r   r   r   r   r   )r!   num_available_samplesnum_global_batchesr"   r"   r#   __len__Z   s   
zBaseMegatronSampler.__len__c                 C      d S Nr"   r!   r"   r"   r#   __iter__g      zBaseMegatronSampler.__iter__)TNNF)__name__
__module____qualname____doc__intboolr   listr$   r(   abcabstractmethodr,   r"   r"   r"   r#   r	   !   s<    		

6r	   c                   @   s(   e Zd ZdZdd Zdd Zdd ZdS )	MegatronPretrainingSamplerr
   c                 C      | j | j }|| j }||fS r
   )r   r   r!   	start_idxend_idxr"   r"   r#   get_start_end_idxn      
z,MegatronPretrainingSampler.get_start_end_idxc                 C   s   t d| d dS )r
   r%   )ranger!   pad_samples_numr"   r"   r#   _get_padding_indicest   s   z/MegatronPretrainingSampler._get_padding_indicesc                 c   s    g }t | j| j}| js"| jr"t| | j }| |}t||}|D ]}|	| t|| j
krA|  \}}||| V  g }q$t|dkra| jsc| jrRJ d|  \}}||| V  d S d S d S )Nr   zDwith pad_samples_to_global_batch_size all batches should be complete)r@   r   r   r   r   lenr   rC   r   appendr   r=   )r!   batchindicesrB   pad_indicesidxr;   r<   r"   r"   r#   r,   x   s,   


z#MegatronPretrainingSampler.__iter__N)r.   r/   r0   r1   r=   rC   r,   r"   r"   r"   r#   r7   k   s
    r7   c                   @   s   e Zd ZdZdd ZdS )MegatronCorePretrainingSamplerr
   c                 C   s
   dg| S )r
   Nr"   rA   r"   r"   r#   rC      s   
z3MegatronCorePretrainingSampler._get_padding_indicesN)r.   r/   r0   r1   rC   r"   r"   r"   r#   rJ      s    rJ   c                       sl   e Zd ZdZ				ddededed	ed
ededee dee deddf fddZdd Zdd Z	  Z
S ) MegatronPretrainingRandomSamplerr
   TNFr   r   r   r   r   r   r   r   r   seedr   c
           
   
      sV   t  j||||||||d |rJ d|s| jdkrtd| j| j | _|	| _d S )Nr   r   r   r   r   r   r   r   zB`MegatronPretrainingRandomSampler` does not support sample paddingr%   a  `MegatronPretrainingRandomSampler` does not support drop_last=False when                   micro_batch_size * data_parallel_size > 1. Please reduce your MBS and data parallelism to 1                   if you want to use drop_last=False, or switch to drop_last=True to avoid this error)superr$   r   r   r   last_batch_sizerL   )
r!   r   r   r   r   r   r   r   r   rL   	__class__r"   r#   r$      (   
z)MegatronPretrainingRandomSampler.__init__c                 C   s   | j | jr| jnd }|| j|  }| jd ur2| jr || j }n
|| j d | j }|| j| j  S | jr:|| j S |d | j S )Nr   r%   )r   r   rO   r   r   r   )r!   active_total_samplesr&   r'   r"   r"   r#   r(      s   

z(MegatronPretrainingRandomSampler.__len__c           
      #   s   | j | j }| j| | _| j| }|| j dksJ | j | j | j }|| j }| j|  t	 }|
| j| j  tj||d } fdd||d  D }g }|D ]}	||	 t|| jkrp|  j| j7  _|V  g }qUt|dkr| js|V  d S d S d S )Nr   	generatorc                       g | ]} | qS r"   r"   .0xr;   r"   r#   
<listcomp>       z=MegatronPretrainingRandomSampler.__iter__.<locals>.<listcomp>)r   rO   r   epochr   r   r   r   torch	Generatormanual_seedrL   randpermtolistrE   rD   r   
r!   rS   current_epoch_samplesbucket_sizebucket_offsetg
random_idx	idx_rangerF   rI   r"   rZ   r#   r,      s.   




z)MegatronPretrainingRandomSampler.__iter__)TNFr   )r.   r/   r0   r1   r2   r3   r   r$   r(   r,   __classcell__r"   r"   rP   r#   rK      s:    		
"rK   c                   @   s   e Zd ZU dZeed< eed< eed< 	ddededed	ed
edededdfddZdeddfddZe	defddZ
e
jdeddfddZ
defddZejdd ZdS )BaseMegatronBatchSampleraH  Megatron style BatchSampler.

    Let mbs, gbs, tp, pp, and dp stand for "micro batch size", "global batch size",
    "tensor model parallel world size", "pipeline model parallel world size", and
    "data parallel world size", the number of micro batches (hereafter, nmb) is defined as
    :math:`nmb = gbs \div (mbs \times dp)`.

    See `apex/transformer/microbatches.py#L91-L98 <https://github.com/NVIDIA/apex/blob/
    44c3043685b6115e7b81b3458a6c76601b1e55b4/apex/transformer/microbatches.py#L91-L98>`_
    for the initial settings of the number of micro batches and
    `apex/transformer/microbatches.py#L160-L177 <https://github.com/NVIDIA/apex/blob/
    44c3043685b6115e7b81b3458a6c76601b1e55b4/apex/transformer/microbatches.py#L160-L177>_`.
    for warming up of global batch size.

    e.g.) `(mbs, gbs, tp, pp, dp) = (1, 16, 1, 1, 2)`, then the number of micro batches is
    :math:`gbs \div (mbs \times dp) = 16 \div (1 \times 2) = 8`.
    In this case, an instance of Megatron Batch Sampler on each data parallel rank is expected
    returns :math:`nmb \times mbs = 8` indices.
    _global_batch_size_num_micro_batches-_global_batch_size_on_this_data_parallel_rankFr   r   r   r   r   r   r   r   Nc	           	      C   s   |dkrt d||dkrt d| |dkr!t d| ||kr-t d|||| _|| _|| _|| _|| _|| _|| _| j| j | _	| 
| dS )aM  Constructor of Megatron-LM style Batch Sampler.

        Args:
            total_samples: The size of dataset.
            consumed_samples: The number of samples that have been used.
            micro_batch_size: The size of each micro batch.
            global_batch_size: The size of global batch.
            data_parallel_rank: The value you can obtain via
                `parallel_state.get_data_parallel_rank()` of megatron.core.
            data_parallel_size: The value you can obtain via
                `parallel_state.get_data_parallel_world_size()` of megatron.core.
        r   r   r   r   r   N)r   r   r   r   r   r   r   r   r   r   update_global_batch_size)	r!   r   r   r   r   r   r   r   r   r"   r"   r#   r$     s*   z!BaseMegatronBatchSampler.__init__new_global_batch_sizec                 C   sX   || _ | j | j dkrtd| j  d| j d| j d| j | j | _| j| j | _dS )zUpdate the global batch size.r   r   r   r   r   N)rl   r   r   r   r   rm   rn   r!   rp   r"   r"   r#   ro   6  s   
z1BaseMegatronBatchSampler.update_global_batch_sizec                 C   s   | j S r9   )rl   r+   r"   r"   r#   r   B  s   z*BaseMegatronBatchSampler.global_batch_sizec                 C   s   t d | j|d dS )r
   zF`self.update_global_batch_size(new_global_batch_size)` is recommended.)rp   N)warningswarnro   rq   r"   r"   r#   r   G  s   
c                 C   s6   | j | j| j   }| jr|| j S || j d | j S )zLength of Batch Sampler.

        ..note::
            When `rampup_batch_size` is enabled, the return value can be not exactly precise.

        r%   )r   r   r   r   )r!   r&   r"   r"   r#   r(   M  s   
z BaseMegatronBatchSampler.__len__c                 C   r)   r*   r"   r+   r"   r"   r#   r,   Z  r-   z!BaseMegatronBatchSampler.__iter__)F)r.   r/   r0   r1   r2   __annotations__r3   r$   ro   propertyr   setterr(   r5   r6   r,   r"   r"   r"   r#   rk      s>   
 

0rk   c                   @   s.   e Zd ZdZdeeef fddZdd ZdS )r   r
   r   c                 C   r8   r9   )r   rn   r:   r"   r"   r#   r=   a  r>   z1MegatronPretrainingBatchSampler.get_start_end_idxc                 #   s    g  t | j| j | jD ],} | t | jkr9 fddt | j| j| jD }t|| jks4J |V  g  qt dkrm| j	so fddt | jt | jD }| j
rh| j| j t| }|dg|  }|V  d S d S d S )Nc                       g | ]} | qS r"   r"   rX   irF   r"   r#   r[   n  s    z<MegatronPretrainingBatchSampler.__iter__.<locals>.<listcomp>r   c                    rw   r"   r"   rx   rz   r"   r#   r[   ~  r\   r?   )r@   r   r   rE   rD   rl   r   r   rn   r   r   )r!   rI   rG   num_padr"   rz   r#   r,   g  s.   

"
z(MegatronPretrainingBatchSampler.__iter__N)r.   r/   r0   r1   r   r2   r=   r,   r"   r"   r"   r#   r   ^  s    c                       sf   e Zd ZdZ		ddededededed	ed
edededdf fddZdefddZdd Z  Z	S )r   r
   Fr   r   r   r   r   r   r   r   r   rL   r   Nc
           
   
      sV   t  j||||||||d |rJ d|s| jdkrtd| j| j | _|	| _d S )NrM   zG`MegatronPretrainingRandomBatchSampler` does not support sample paddingr%   a!  `MegatronPretrainingRandomBatchSampler` does not support drop_last=False                   when micro_batch_size * data_parallel_size > 1. Please reduce your MBS and data parallelism to 1                   if you want to use drop_last=False, or switch to drop_last=True to avoid this error)rN   r$   r   r   r   rl   rO   rL   )
r!   r   r   r   r   r   r   r   r   rL   rP   r"   r#   r$     rR   z.MegatronPretrainingRandomBatchSampler.__init__c                 C   sH   | j | jr| jnd }|| j|  }| jr|| j S || j d | j S )zLength of Random Batch Sampler.

        ..note::
            When `rampup_batch_size` is enabled, the return value can be not exactly precise.

        r   r%   )r   r   rO   r   r   )r!   rS   r&   r"   r"   r#   r(     s
   
z-MegatronPretrainingRandomBatchSampler.__len__c           
      #   s   | j | j }| j| | _| j| }|| j dksJ | j | j | j }|| j }| j|  t	 }|
| j| j  tj||d } fdd||d  D }g }|D ]}	||	 t|| jkrp|  j| j7  _|V  g }qUt|dkr| js|V  d S d S d S )Nr   rT   c                    rV   r"   r"   rW   rZ   r"   r#   r[     r\   zBMegatronPretrainingRandomBatchSampler.__iter__.<locals>.<listcomp>)r   rO   r   r]   r   r   r   r   r^   r_   r`   rL   ra   rb   rE   rD   rn   rl   r   rc   r"   rZ   r#   r,     s.   




z.MegatronPretrainingRandomBatchSampler.__iter__)Fr   )
r.   r/   r0   r1   r2   r3   r$   r(   r,   rj   r"   r"   rP   r#   r     s6    	
")r1   r5   rr   	itertoolsr   typingr   r   r^   
nemo.utilsr   nemo.utils.decoratorsr   __all__r	   r7   rJ   rK   rk   r   r   r"   r"   r"   r#   <module>   s&   J&Tq'