o
    wi%g                  	   @   sj  d dl mZmZmZ d dlmZ d dlmZmZm	Z	m
Z
mZmZmZmZmZmZmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlm Z  d dl!m"Z" d dl#m$Z$m%Z% d dl&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z- zd dl.m/Z/ d dl0m1Z1 dZ2W n e3e4fy   e5Z/e5Z1dZ2Y nw d dlm6Z6m7Z7 d dl8m9Z9 d dl:m;Z; d dl<m=Z= d dl>m?Z? d dl@mAZA d dlBmCZC d dlDmEZE d dlFmGZG d dlHmIZImJZJ d d lKmLZLmMZM d d!lNmOZO d d"lPmQZQ d d#lRmSZS eQd$\ZTZUer	d d%lVmWZW ed& ZXG d'd( d(e ZYG d)d* d*e)ZZG d+d, d,eZ[eG\eOd-eOd.eYfd/d0Z]dS )1    )	ExitStackcontextmanagernullcontext)	timedelta)TYPE_CHECKINGAnyCallableContextManagerDict	GeneratorIteratorListLiteralOptionalUnionN)CPUAccelerator)Accelerator)default_pg_timeout)ClusterEnvironment)CheckpointIO)	Precision)DDPStrategy)!_validate_keys_for_strict_loading)_PATH	_Stateful)LightningDataModule)_DataFetcher)_WrappingCheckpointIO)CombinedLoader)DistributedDataParallelConfig)OptimizerConfigTF)Tensornn)	noop_hook)Module)DistributedDataParallel)	Optimizer)
DataLoader)override)_strategy_lib)	to_fabric)MegatronCheckpointIOckpt_to_weights_subdir)CallbackConnectorMegatronParallel)MegatronStrategy)safe_import)unwrap_modelzmodelopt.torch.opt)DataSampler)megatronpytorchc                A       s  e Zd ZdZdddddddddddddddddddddeddddddddfd	ed
edee dedee dedededededee dee ded dee	 dee
ej  dee dee dee dee deeef dee dee d ed! d"ed#ed$eej d%ed&ed'ed(ee d)ed*df@ fd+d,Zed_ fd-d.Zd/ed*efd0d1Zed2ed*efd3d4Z			5d`d6e d7e!d8ee" d9ee" d:e#d*e$fd;d<Z%ed=e$d*e$fd>d?Z&ed@e'd*e f fdAdBZ(dadCee d*e)fdDdEZ*d@e+j'd*dfdFdGZ,e		dbdHe-dIe.eee'e$ef f dJee dKee.ee"eegef f  d*df
dLdMZ/		dcdHe-dIeee'e$e.eee'e$ef f f  dNed*e.eef fdOdPZ0e	ddd@e'dQe.eeee1f f dNed*dfdRdSZ2e3dTdU Z4e5d*e6dV fdWdXZ7e3ed*efdYdZZ8e3d*efd[d\Z9e3d]d^ Z:  Z;S )eFabricMegatronStrategyz'
    Fabric strategy for Megatron.
       NFr   r3   popenTtensor_model_parallel_sizepipeline_model_parallel_size$virtual_pipeline_model_parallel_size$pipeline_model_parallel_comm_backend"microbatch_group_size_per_vp_stagecontext_parallel_sizesequence_parallelexpert_model_parallel_sizemoe_extended_tpexpert_tensor_parallel_size"encoder_tensor_model_parallel_size$encoder_pipeline_model_parallel_sizedata_samplerr2   acceleratorparallel_devicescluster_environmentcheckpoint_io	precisionmegatron_callbacksddpprocess_group_backendtimeoutstart_method)r7   spawnfork
forkserverno_ddp_communication_hookoutput_data_idxpipeline_dtypeinit_model_paralleluse_tp_pp_dp_mapping#num_distributed_optimizer_instancesnccl_communicator_config_pathkwargsreturnc                     s*  t  jd||||||||d| t | _|| _|| _|| _|| _|d ur(|n|| _|| _	|| _
|
| _|	| _|| _|| _|| _|| _|| _|| _|| _|| _|| _|| _t | _|ra| j| || _|| _t  || _|dkrxt | _d S t|tr|| _d S |dkrd | _d| _d S td| )N)rE   rF   rG   rH   rI   rL   rM   rN   r3   r4   FzInvalid DDP type:  ) super__init__r-   rJ   rD   r8   r9   r;   r<   r=   r?   rA   r@   r:   r>   rB   rC   rT   _init_model_parallelrV   rW   rX   rR   addrS   r)   enable_nvidia_optimizations_ddpr   
ddp_config
isinstance
ValueError) selfr8   r9   r:   r;   r<   r=   r>   r?   r@   rA   rB   rC   rD   rE   rF   rG   rH   rI   rJ   rK   rL   rM   rN   rR   rS   rT   rU   rV   rW   rX   rY   	__class__r[   ]/home/ubuntu/sommelier/.venv/lib/python3.10/site-packages/nemo/lightning/fabric/strategies.pyr]   X   sd   "	


zFabricMegatronStrategy.__init__c                    sd   |    | jd usJ tj| j | j | j | jd t 	  t
j| j  t  d S )N)
world_sizeglobal_rank
local_rankparallel_config)_set_world_ranksrG   r)   init_parallel_ranksri   rj   rk   parallelismr\   _setup_distributedtorchcuda
set_devicerU   re   rf   r[   rh   rp      s   
z)FabricMegatronStrategy._setup_distributed
datamodulec                 C   s<   |   | jst|dr|j| _| jr| j | j  |S )z)
        Process the datamodule.
        rD   )setuprD   hasattrrG   rj   )re   ru   r[   r[   rh   process_datamodule   s   z)FabricMegatronStrategy.process_datamodule
dataloaderc                 C   s:   | j r	| j |}t| jd}|t|d t| |S )z>
        Process the dataloader. Returns an iterator.
        rS   max_size_cycle)rD   transform_dataloader"_MegatronDataLoaderIterDataFetcherrS   rv   r   iter)re   ry   outputr[   r[   rh   process_dataloader   s   z)FabricMegatronStrategy.process_dataloader      ?modeloptimizer_configno_weight_decay_condscale_lr_condlr_multc                 C   s>   t | jdr| j|}|jdusJ dtj|||||dS )z/
        Setup the Megatron optimizer.
        convert_configNz-Learning rate must be set in optimizer config)r   r   r   )rw   rI   r   lrr)   setup_megatron_optimizer)re   r   r   r   r   r   r[   r[   rh   r      s   z/FabricMegatronStrategy.setup_megatron_optimizer	optimizerc                 C   s(   t | jdr| j|}| j| |S )zJPass the optimizer to the precision-plugin if needed & add it as callback.setup_optimizer)rw   
_precisionr   rJ   r_   )re   r   r[   r[   rh   r      s   z&FabricMegatronStrategy.setup_optimizermodulec           	         s   ddl m} t|| j d}t| jdr| jj}t| jdr1| j|| | j	r1| j| j	 t|dr:|
  t|| j| jt| jt| j	|d}| jrR|  | jr\|j| j | j	sddlm} dd	lm} | }|jdurz| | jd
< t |}| jrd|_| dt! |S |S )zL
        Setup the torch module. Returns a MegatronParallel object.
        r   )get_model_configNconvert_moduler   configure_model)precision_pluginvp_sizecpurb   convert_module_fn)mpu)AppStateprocess_groupF)"megatron.core.utilsr   r)   set_model_parallel_attributesro   rw   rI   r   r   rb   r   r.   r:   rc   rE   r   r^   rU   rD   	callbacksr_   megatron.corer   
nemo.utilsr   model_parallel_sizeget_data_parallel_group_ddp_kwargsr\   setup_modulerR   require_backward_grad_syncregister_comm_hookr#   )	re   r   r   r   megatron_parallelr   r   	app_statedist_data_parallelrf   r[   rh   r     sF   

	
z#FabricMegatronStrategy.setup_module
empty_initc                 C   sD   | j  }|  }t }|r|td || || |S )zK
        Get the context manager used for initializing the module.
        meta)rI   module_init_contextmegatron_contextr   enter_contextrq   device)re   r   precision_init_ctxmodule_sharded_ctxstackr[   r[   rh   r   C  s   


z*FabricMegatronStrategy.module_init_contextc                 C   s   dS )z0
        Move the module to the device.
        Nr[   )re   r   r[   r[   rh   module_to_deviceT  s   z'FabricMegatronStrategy.module_to_devicepathstatestorage_optionsfilter_dictc                 C   s:   |si }| j |d< | j||pi d}| jj|||d dS )a  Save model, optimizer, and other state as a checkpoint file.

        Args:
            path: A path to where the file(s) should be saved
            state: A dictionary with contents to be saved. If the dict contains modules or optimizers, their
                state-dict will be retrieved and converted automatically.
            storage_options: Additional options for the ``CheckpointIO`` plugin
            filter: An optional dictionary containing filter callables that return a boolean indicating whether the
                given item should be saved (``True``) or filtered out (``False``). Each filter key should match a
                state key, where its filter will be applied to the ``state_dict`` generated.

        content_metadata)filter)
checkpointr   r   N)sharded_state_dict_metadata"_convert_stateful_objects_in_staterH   save_checkpoint)re   r   r   r   r   r[   r[   rh   r   Z  s
   
z&FabricMegatronStrategy.save_checkpointstrictc                 C   s8  t |tr	tdt|d }ddlm} trSt ||rS|j}t|dr(|	 nt
  tjj|gt|dddd	 W d
   n1 sDw   Y  tj|rStd tj  | j|}i }t |trn|j|d|d< nR|rt |d tr~|d j|d< |d j|d|d< d|v rtj|d |d d|d|d< n$| D ]}	t |	tr|	j|d|d< qt |	trtj|	d|d|d< q| jj||d}
t |tr| j ||
|d i S t!|" |
" |d |#  D ]1\}}	||
vrqt |	t$rt |	tr	| j |	|
%||d q|	&|
%| q|
%|||< q|
S )z&
        Load the checkpoint.
        zIOptimizer loading is not supported, pass it as a dict including the model
state_dictr   )Llama4OmniBaseModelhide_teacher_modelF)	is_savingzmodule.language_model.)prefixNz/Restored Model-Optimizer state from checkpoint.)metadatar   T)
is_loadingr   )sharded_state_dict)r   r   r   r   )'rc   r&   NotImplementedErrorr1   &nemo.collections.vlm.llama4.model.baser   HAVE_MODELOPTlanguage_modelrw   r   r   mtopluginsrestore_sharded_modelopt_stater,   ModeloptStateManageris_convertedprintrq   rr   empty_cacheunwrapped_checkpoint_ioload_content_metadatar$   r   r%   r   r)   optimizer_sharded_state_dictitemsrH   load_checkpointload_module_state_dictr   keyscopyr   popload_state_dict)re   r   r   r   unwrapped_modelr   
core_modelsharded_sd_metadatar   objr   namer[   r[   rh   r   t  sh   
	






z&FabricMegatronStrategy.load_checkpointr   c                 C   s   t j|||d dS )z-
        Load the module state dict.
        r   N)r)   load_model_state_dict)re   r   r   r   r[   r[   rh   r     s   z-FabricMegatronStrategy.load_module_state_dictc                 C   s$   i }t | jtr| jjrd|d< |S )zGMetadata used for sharded_state_dict generation during checkpoint save.fully_sharded_model_spacedistrib_optim_sharding_type)rc   rb   r   use_distributed_optimizer)re   r   r[   r[   rh   r     s   z2FabricMegatronStrategy.sharded_state_dict_metadata)NNNc                 #   sf    ddl m} |j  fdd}||_| jj}| jj}d| j_d| j_dV   |_|| j_|| j_dS )z/
        Context manager for Megatron.
        r   )transformer_enginec                    s    | }d|d< |S )zForces device to metar   r   r[   )crY   originalr[   rh   _get_extra_te_kwargs_meta  s   zJFabricMegatronStrategy.megatron_context.<locals>._get_extra_te_kwargs_metaFTN)megatron.core.extensionsr   _get_extra_te_kwargsro   perform_initializationuse_cpu_initialization)re   _ter   _orig_perform_initialization_orig_use_cpu_initializationr[   r   rh   r     s   z'FabricMegatronStrategy.megatron_contextc                 C   s4   | j du rt | _ | j S t| j trt | j _| j S )z(
        Get the checkpoint IO.
        N)_checkpoint_ior+   rc   r   rH   rt   r[   r[   rh   rH     s   

z$FabricMegatronStrategy.checkpoint_ioc                 C   s$   | j }t|tr|j }t|ts|S )z*Unwraps `checkpoint_io` from all wrappers.)rH   rc   r   )re   rH   r[   r[   rh   r     s
   

z.FabricMegatronStrategy.unwrapped_checkpoint_ioc                 C   s   ddl m} |di d| jd| jd| jd| jd| jd| jd	| jd
| j	d| j
d| jd| jd| jd| jd| jd| jd| jS )z-
        Get the parallelism config.
        r   )ParallelismConfigr8   r9   r;   r:   r<   r=   r>   r?   rA   r@   rB   rC   rT   rV   rW   rX   Nr[   )3nemo.lightning.pytorch.strategies.megatron_strategyr   r8   r9   r;   r:   r<   r=   r>   r?   rA   r@   rB   rC   rT   rV   rW   rX   )re   r   r[   r[   rh   ro     sD   	
z"FabricMegatronStrategy.parallelismrZ   N)NNr   N)NN)NT)T)<__name__
__module____qualname____doc__r   intr   strboolr   r   rq   r   r   r   r   r-   r   
DDPLiteralr   r   r   dtyper   r]   r(   rp   r   rx   r'   r   r   r.   r    r   floatr&   r   r   r$   r   r	   r   r"   r   r   r
   r   r   r!   r   propertyr   r   r   r   rH   r   ro   __classcell__r[   r[   rf   rh   r5   S   sV   	

 !Y
	;"

H
r5   c                	       sb   e Zd Zdddedededdf fdd	Zd fd
dZded fddZd fddZ	  Z
S )r}   Frz   argsrS   rY   rZ   Nc                   s.   t  j|i | || _d | _d| _d| _d S )Nr   )r\   r]   rS   _batch
_batch_idx_dataloader_idx)re   rS   r   rY   rf   r[   rh   r]     s
   
z+_MegatronDataLoaderIterDataFetcher.__init__c                    s"   t    tt| | jd| _| S )Nrz   )r\   __iter__r~   _DataFetcherWrapperrS   iterator_wrapperrt   rf   r[   rh   r  #  s   
z+_MegatronDataLoaderIterDataFetcher.__iter__r  c                 C   s   | j rt| jS r   )doneStopIterationr  rt   r[   r[   rh   __next__(  s   z+_MegatronDataLoaderIterDataFetcher.__next__c                    s    t    d| _d| _d| _dS )z)
        Reset the data fetcher.
        Nr   )r\   resetr   r  r  rt   rf   r[   rh   r	  -  s   

z(_MegatronDataLoaderIterDataFetcher.reset)rZ   r}   r   )r   r   r   r   r   r]   r  r   r  r	  r   r[   r[   rf   rh   r}     s
    $r}   c                       s|   e Zd Z	ddededdfddZedefdd	Zedefd
dZ	ede
e fddZedd Z fddZ  ZS )r  Fdata_fetcherrS   rZ   Nc                 C   s   || _ || _d S r   )r
  rS   )re   r
  rS   r[   r[   rh   r]   8  s   
z_DataFetcherWrapper.__init__c                 C      | j jS )z4
        Check if the data fetcher is done.
        )r
  r  rt   r[   r[   rh   r  @     z_DataFetcherWrapper.donec                 C   r  )z7
        Check if the data fetcher is fetched.
        )r
  fetchedrt   r[   r[   rh   r  G  r  z_DataFetcherWrapper.fetchedc                 C   r  )z5
        Get the length of the data fetcher.
        )r
  lengthrt   r[   r[   rh   r  N  r  z_DataFetcherWrapper.lengthc                 C   r  )z&
        Get the data config.
        )r
  data_configrt   r[   r[   rh   r  U  r  z_DataFetcherWrapper.data_configc                    sJ   | j }|jrttt| \}}}||_||_||_| j	s |S |||fS r   )
r
  r  r  r\   r}   r  r   r  r  rS   )re   fetcherbatch	batch_idxdataloader_idxrf   r[   rh   r  \  s   
z_DataFetcherWrapper.__next__)F)r   r   r   r}   r   r]   r   r  r   r  r   r  r  r  r   r[   r[   rf   rh   r  7  s$    

r  strategyrZ   c                 C   s   t di d| jd| jd| jd| jd| jd| jd| jd| jd	| j	d
| j
d| jd| jd| jd| jd| jd| jd| jd| jS )z?
    Convert the Megatron strategy to the Fabric strategy.
    r8   r9   r;   r:   r<   r=   r>   r?   rA   r@   rB   rC   rT   rV   rK   rL   rM   rN   Nr[   )r5   r8   r9   r;   r:   r<   r=   r>   r?   rA   r@   rB   rC   rT   rV   ra   rL   _timeout_start_method)r  r[   r[   rh   convert_megatron_strategyl  sJ   	
r  )^
contextlibr   r   r   datetimer   typingr   r   r   r	   r
   r   r   r   r   r   r   rq   lightning.fabric.acceleratorsr   )lightning.fabric.accelerators.acceleratorr   5lightning.fabric.plugins.collectives.torch_collectiver   9lightning.fabric.plugins.environments.cluster_environmentr   )lightning.fabric.plugins.io.checkpoint_ior   "lightning.fabric.plugins.precisionr   lightning.fabric.strategiesr   $lightning.fabric.strategies.strategyr    lightning.fabric.utilities.typesr   r   lightning.pytorchr    lightning.pytorch.loops.fetchersr   $lightning.pytorch.plugins.io.wrapperr   +lightning.pytorch.utilities.combined_loaderr   megatron.core.distributedr   megatron.core.optimizerr    HAVE_MEGATRON_COREImportErrorModuleNotFoundErrorobjectr!   r"   ;torch.distributed.algorithms.ddp_comm_hooks.debugging_hooksr#   torch.nnr$   torch.nn.parallelr%   torch.optimr&   torch.utils.datar'   typing_extensionsr(   nemo.lightningr)    nemo.lightning.fabric.conversionr*   nemo.lightning.io.plr+   r,    nemo.lightning.megatron_parallelr-   r.   !nemo.lightning.pytorch.strategiesr/   nemo.utils.import_utilsr0   nemo.utils.model_utilsr1   r   r   +nemo.lightning.pytorch.plugins.data_samplerr2   r   r5   r}   r  registerr  r[   r[   r[   rh   <module>   sh   4   K5