o
    }oi\                     @   s  d dl mZmZ d dlmZ d dlmZmZmZm	Z	m
Z
mZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ d dl m!Z! d dl"m#Z#m$Z$ d dl%m&Z& d dl'm(Z( d dl)m*Z* d dl+m,Z, d dl-m.Z. d dl/m0Z0 d dlm1Z1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z> d dl?m@Z@ d dlAmBZB d dlCmDZDmEZE d dlFmGZG erd dlHmIZI ed  ZJG d!d" d"eZKG d#d$ d$e(ZLG d%d& d&eZMe@NeGd'eGd(eKfd)d*ZOdS )+    )	ExitStackcontextmanager)	timedelta)TYPE_CHECKINGAnyCallableContextManagerDict	GeneratorIteratorListLiteralOptionalUnionN)CPUAccelerator)Accelerator)default_pg_timeout)ClusterEnvironment)CheckpointIO)	Precision)DDPStrategy)!_validate_keys_for_strict_loading)_PATH	_Stateful)LightningDataModule)_DataFetcher)_WrappingCheckpointIO)CombinedLoader)DistributedDataParallelConfig)OptimizerConfig)Tensornn)	noop_hook)Module)	Optimizer)
DataLoader)override)_strategy_lib)	to_fabric)MegatronCheckpointIO)CallbackConnectorMegatronParallel)MegatronStrategy)DataSampler)megatronpytorchc                A       s   e Zd ZdZdddddddddddddddddddddeddddddddfd	ed
edee dedee dedededededee dee ded dee	 dee
ej  dee dee dee dee deeef dee dee d ed! d"ed#ed$eej d%ed&ed'ed(ee d)ed*df@ fd+d,Zed[ fd-d.Zd/ed*efd0d1Zed2ed*efd3d4Z			5d\d6e d7e!d8ee" d9ee" d:e#d*e$fd;d<Z%ed=e$d*e$fd>d?Z&ed@e'd*e f fdAdBZ(d]dCee d*e)fdDdEZ*d@e+j'd*dfdFdGZ,e		d^dHe-dIe.eee'e$ef f dJee dKee.ee"eegef f  d*df
dLdMZ/		d_dHe-dIeee'e$e.eee'e$ef f f  dNed*e.eef fdOdPZ0e	d`d@e'dQe.eeee1f f dNed*dfdRdSZ2e3d*e4dT fdUdVZ5e6ed*efdWdXZ7e6dYdZ Z8  Z9S )aFabricMegatronStrategyz'
    Fabric strategy for Megatron.
       NFr   r.   popenTtensor_model_parallel_sizepipeline_model_parallel_size$virtual_pipeline_model_parallel_size$pipeline_model_parallel_comm_backend"microbatch_group_size_per_vp_stagecontext_parallel_sizesequence_parallelexpert_model_parallel_sizemoe_extended_tpexpert_tensor_parallel_size"encoder_tensor_model_parallel_size$encoder_pipeline_model_parallel_sizedata_samplerr-   acceleratorparallel_devicescluster_environmentcheckpoint_io	precisionmegatron_callbacksddpprocess_group_backendtimeoutstart_method)r2   spawnfork
forkserverno_ddp_communication_hookoutput_data_idxpipeline_dtypeinit_model_paralleluse_tp_pp_dp_mapping#num_distributed_optimizer_instancesnccl_communicator_config_pathkwargsreturnc                     s*  t  jd||||||||d| t | _|| _|| _|| _|| _|d ur(|n|| _|| _	|| _
|
| _|	| _|| _|| _|| _|| _|| _|| _|| _|| _|| _|| _t | _|ra| j| || _|| _t  || _|dkrxt | _d S t|tr|| _d S |dkrd | _d| _d S td| )N)r@   rA   rB   rC   rD   rG   rH   rI   r.   r/   FzInvalid DDP type:  ) super__init__r*   rE   r?   r3   r4   r6   r7   r8   r:   r<   r;   r5   r9   r=   r>   rO   _init_model_parallelrQ   rR   rS   rM   addrN   r'   enable_nvidia_optimizations_ddpr   
ddp_config
isinstance
ValueError) selfr3   r4   r5   r6   r7   r8   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   rM   rN   rO   rP   rQ   rR   rS   rT   	__class__rV   T/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/fabric/strategies.pyrX   H   sd   "	


zFabricMegatronStrategy.__init__c                    sd   |    | jd usJ tj| j | j | j | jd t 	  t
j| j  t  d S )N)
world_sizeglobal_rank
local_rankparallel_config)_set_world_ranksrB   r'   init_parallel_ranksrd   re   rf   parallelismrW   _setup_distributedtorchcuda
set_devicerP   r`   ra   rV   rc   rk      s   
z)FabricMegatronStrategy._setup_distributed
datamodulec                 C   s<   |   | jst|dr|j| _| jr| j | j  |S )z)
        Process the datamodule.
        r?   )setupr?   hasattrrB   re   )r`   rp   rV   rV   rc   process_datamodule   s   z)FabricMegatronStrategy.process_datamodule
dataloaderc                 C   s:   | j r	| j |}t| jd}|t|d t| |S )z>
        Process the dataloader. Returns an iterator.
        rN   max_size_cycle)r?   transform_dataloader"_MegatronDataLoaderIterDataFetcherrN   rq   r   iter)r`   rt   outputrV   rV   rc   process_dataloader   s   z)FabricMegatronStrategy.process_dataloader      ?modeloptimizer_configno_weight_decay_condscale_lr_condlr_multc                 C   s>   t | jdr| j|}|jdusJ dtj|||||dS )z/
        Setup the Megatron optimizer.
        convert_configNz-Learning rate must be set in optimizer config)r   r   r   )rr   rD   r   lrr'   setup_megatron_optimizer)r`   r}   r~   r   r   r   rV   rV   rc   r      s   z/FabricMegatronStrategy.setup_megatron_optimizer	optimizerc                 C   s(   t | jdr| j|}| j| |S )zJPass the optimizer to the precision-plugin if needed & add it as callback.setup_optimizer)rr   
_precisionr   rE   rZ   )r`   r   rV   rV   rc   r      s   z&FabricMegatronStrategy.setup_optimizermodulec           	         s   ddl m} t|| j d}t| jdr| jj}t| jdr1| j|| | j	r1| j| j	 t|dr:|
  t|| j| jt| jt| j	|d}| jrR|  | jr\|j| j | j	sddlm} dd	lm} | }|jdurz| | jd
< t |}| jrd|_| dt! |S |S )zL
        Setup the torch module. Returns a MegatronParallel object.
        r   )get_model_configNconvert_moduler   configure_model)precision_pluginvp_sizecpur]   convert_module_fn)mpu)AppStateprocess_groupF)"megatron.core.utilsr   r'   set_model_parallel_attributesrj   rr   rD   r   r   r]   r   r+   r5   r^   r@   r   rY   rP   r?   	callbacksrZ   megatron.corer   
nemo.utilsr   model_parallel_sizeget_data_parallel_group_ddp_kwargsrW   setup_modulerM   require_backward_grad_syncregister_comm_hookr"   )	r`   r   r   r   megatron_parallelr   r   	app_statedist_data_parallelra   rV   rc   r      sF   

	
z#FabricMegatronStrategy.setup_module
empty_initc                 C   sD   | j  }|  }t }|r|td || || |S )zK
        Get the context manager used for initializing the module.
        meta)rD   module_init_contextmegatron_contextr   enter_contextrl   device)r`   r   precision_init_ctxmodule_sharded_ctxstackrV   rV   rc   r   3  s   


z*FabricMegatronStrategy.module_init_contextc                 C   s   dS )z0
        Move the module to the device.
        NrV   )r`   r   rV   rV   rc   module_to_deviceD  s   z'FabricMegatronStrategy.module_to_devicepathstatestorage_optionsfilter_dictc                 C   s(   | j ||pi d}| jj|||d dS )a  Save model, optimizer, and other state as a checkpoint file.

        Args:
            path: A path to where the file(s) should be saved
            state: A dictionary with contents to be saved. If the dict contains modules or optimizers, their
                state-dict will be retrieved and converted automatically.
            storage_options: Additional options for the ``CheckpointIO`` plugin
            filter: An optional dictionary containing filter callables that return a boolean indicating whether the
                given item should be saved (``True``) or filtered out (``False``). Each filter key should match a
                state key, where its filter will be applied to the ``state_dict`` generated.

        )filter)
checkpointr   r   N)"_convert_stateful_objects_in_staterC   save_checkpoint)r`   r   r   r   r   rV   rV   rc   r   J  s   z&FabricMegatronStrategy.save_checkpointstrictc                 C   sh  t |tr	tdtj  i }t |tr| |d< n>|r9|d  |d< d|v r8tj	|d |d dd|d< n!|
 D ]}t |trK| |d< q=t |trYtj	|dd|d< q=| jj||d}t |trq| j|||d i S t| | |d | 
 D ]/\}}||vrqt |trt |tr| j||||d q||| q||||< q|S )	z&
        Load the checkpoint.
        zIOptimizer loading is not supported, pass it as a dict including the model
state_dictr   T)
is_loading)sharded_state_dict)r   r   r   r   )r^   r$   NotImplementedErrorrl   rm   empty_cacher#   r   r'   optimizer_sharded_state_dictitemsrC   load_checkpointload_module_state_dictr   keyscopyr   popload_state_dict)r`   r   r   r   r   objr   namerV   rV   rc   r   a  sB   
	







z&FabricMegatronStrategy.load_checkpointr   c                 C   s   t j|||d dS )z-
        Load the module state dict.
        r   N)r'   load_model_state_dict)r`   r   r   r   rV   rV   rc   r     s   z-FabricMegatronStrategy.load_module_state_dict)NNNc                 #   sf    ddl m} |j  fdd}||_| jj}| jj}d| j_d| j_dV   |_|| j_|| j_dS )z/
        Context manager for Megatron.
        r   )transformer_enginec                    s    | }d|d< |S )zForces device to metar   r   rV   )crT   originalrV   rc   _get_extra_te_kwargs_meta  s   zJFabricMegatronStrategy.megatron_context.<locals>._get_extra_te_kwargs_metaFTN)megatron.core.extensionsr   _get_extra_te_kwargsrj   perform_initializationuse_cpu_initialization)r`   _ter   _orig_perform_initialization_orig_use_cpu_initializationrV   r   rc   r     s   z'FabricMegatronStrategy.megatron_contextc                 C   s4   | j du rt | _ | j S t| j trt | j _| j S )z(
        Get the checkpoint IO.
        N)_checkpoint_ior)   r^   r   rC   ro   rV   rV   rc   rC     s   

z$FabricMegatronStrategy.checkpoint_ioc                 C   s   ddl m} |di d| jd| jd| jd| jd| jd| jd	| jd
| j	d| j
d| jd| jd| jd| jd| jd| jd| jS )z-
        Get the parallelism config.
        r   )ParallelismConfigr3   r4   r6   r5   r7   r8   r9   r:   r<   r;   r=   r>   rO   rQ   rR   rS   NrV   )3nemo.lightning.pytorch.strategies.megatron_strategyr   r3   r4   r6   r5   r7   r8   r9   r:   r<   r;   r=   r>   rO   rQ   rR   rS   )r`   r   rV   rV   rc   rj     sD   	
z"FabricMegatronStrategy.parallelismrU   N)NNr|   N)NN)NT)T):__name__
__module____qualname____doc__r   intr   strboolr   r   rl   r   r   r   r   r*   r   
DDPLiteralr   r   r   dtyper   rX   r&   rk   r   rs   r%   r   r{   r+   r   r   floatr$   r   r   r#   r   r   r   r!   r   r   r	   r   r   r    r   r   r
   r   propertyrC   rj   __classcell__rV   rV   ra   rc   r0   C   sN   	

 !Y
	;"

3r0   c                	       sb   e Zd Zdddedededdf fdd	Zd fd
dZded fddZd fddZ	  Z
S )rx   Fru   argsrN   rT   rU   Nc                   s.   t  j|i | || _d | _d| _d| _d S )Nr   )rW   rX   rN   _batch
_batch_idx_dataloader_idx)r`   rN   r   rT   ra   rV   rc   rX     s
   
z+_MegatronDataLoaderIterDataFetcher.__init__c                    s"   t    tt| | jd| _| S )Nru   )rW   __iter__ry   _DataFetcherWrapperrN   iterator_wrapperro   ra   rV   rc   r     s   
z+_MegatronDataLoaderIterDataFetcher.__iter__r   c                 C   s   | j rt| jS r   )doneStopIterationr   ro   rV   rV   rc   __next__  s   z+_MegatronDataLoaderIterDataFetcher.__next__c                    s    t    d| _d| _d| _dS )z)
        Reset the data fetcher.
        Nr   )rW   resetr   r   r   ro   ra   rV   rc   r     s   

z(_MegatronDataLoaderIterDataFetcher.reset)rU   rx   r   )r   r   r   r   r   rX   r   r   r   r   r   rV   rV   ra   rc   rx     s
    $rx   c                       s|   e Zd Z	ddededdfddZedefdd	Zedefd
dZ	ede
e fddZedd Z fddZ  ZS )r   Fdata_fetcherrN   rU   Nc                 C   s   || _ || _d S r   )r   rN   )r`   r   rN   rV   rV   rc   rX      s   
z_DataFetcherWrapper.__init__c                 C      | j jS )z4
        Check if the data fetcher is done.
        )r   r   ro   rV   rV   rc   r        z_DataFetcherWrapper.donec                 C   r   )z7
        Check if the data fetcher is fetched.
        )r   fetchedro   rV   rV   rc   r     r   z_DataFetcherWrapper.fetchedc                 C   r   )z5
        Get the length of the data fetcher.
        )r   lengthro   rV   rV   rc   r     r   z_DataFetcherWrapper.lengthc                 C   r   )z&
        Get the data config.
        )r   data_configro   rV   rV   rc   r     r   z_DataFetcherWrapper.data_configc                    sJ   | j }|jrttt| \}}}||_||_||_| j	s |S |||fS r   )
r   r   r   rW   rx   r   r   r   r   rN   )r`   fetcherbatch	batch_idxdataloader_idxra   rV   rc   r   $  s   
z_DataFetcherWrapper.__next__)F)r   r   r   rx   r   rX   r   r   r   r   r   r   r   r   r   rV   rV   ra   rc   r     s$    

r   strategyrU   c                 C   s   t di d| jd| jd| jd| jd| jd| jd| jd| jd	| j	d
| j
d| jd| jd| jd| jd| jd| jd| jd| jS )z?
    Convert the Megatron strategy to the Fabric strategy.
    r3   r4   r6   r5   r7   r8   r9   r:   r<   r;   r=   r>   rO   rQ   rF   rG   rH   rI   NrV   )r0   r3   r4   r6   r5   r7   r8   r9   r:   r<   r;   r=   r>   rO   rQ   r\   rG   _timeout_start_method)r   rV   rV   rc   convert_megatron_strategy4  sJ   	
r   )P
contextlibr   r   datetimer   typingr   r   r   r   r	   r
   r   r   r   r   r   rl   lightning.fabric.acceleratorsr   )lightning.fabric.accelerators.acceleratorr   5lightning.fabric.plugins.collectives.torch_collectiver   9lightning.fabric.plugins.environments.cluster_environmentr   )lightning.fabric.plugins.io.checkpoint_ior   "lightning.fabric.plugins.precisionr   lightning.fabric.strategiesr   $lightning.fabric.strategies.strategyr    lightning.fabric.utilities.typesr   r   lightning.pytorchr    lightning.pytorch.loops.fetchersr   $lightning.pytorch.plugins.io.wrapperr   +lightning.pytorch.utilities.combined_loaderr   megatron.core.distributedr   megatron.core.optimizerr   r    r!   ;torch.distributed.algorithms.ddp_comm_hooks.debugging_hooksr"   torch.nnr#   torch.optimr$   torch.utils.datar%   typing_extensionsr&   nemo.lightningr'    nemo.lightning.fabric.conversionr(   nemo.lightning.io.plr)    nemo.lightning.megatron_parallelr*   r+   !nemo.lightning.pytorch.strategiesr,   +nemo.lightning.pytorch.plugins.data_samplerr-   r   r0   rx   r   registerr   rV   rV   rV   rc   <module>   sR   4   #5