o
    }oih                     @   s  d dl Z d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZmZmZmZmZ d dlmZ d dlZd dlmZ d dlmZ d dlmZ d dlmZ d dlmZ d d	l m!Z! d d
l"m#Z# d dl$m%Z%m&Z&m'Z' d dl(m)Z) d dl*m+Z+ d dl,m-Z-m.Z.m/Z/m0Z0 d dl1mZ d dl2m3Z3 zd dl4m5Z5 d dl6m7Z7m8Z8 W n e9y   d dl:m5Z5 d dl;m7Z7m8Z8 Y nw e3dddd\Z<Z=e3dddd\Z>Z?e@eAZBG dd dee+jCZDdS )    N)contextmanager)Path)AnyDictLiteralOptionalUnion)CheckpointIO)rank_zero_info)
reset_seed)ModelParallelStrategy)	TrainerFn)STEP_OUTPUT)ColwiseParallelRowwiseParallelSequenceParallel)override)io)_destroy_dist_connectionckpt_to_dircreate_checkpoint_iofsdp2_strategy_parallelize)logging)safe_import_from)distribute_tensor)	ReplicateShardztorch.distributed.fsdpMixedPrecisionPolicyz"torch.distributed._composable.fsdp)fallback_moduleCPUOffloadPolicyc                       sh  e Zd ZdZddddddddeddfdeed ef deed ef d	ee d
e	ddde	dee
eeeeef f  f fddZeede	fddZdGddZdGddZedGddZedejddf fddZdd ZedGdd Zd!efd"d#ZdHd$d%Zed&d' ZedHdefd(d)ZedHde fd*d+Z!edHdefd,d-Z"edHdefd.d/Z#e$edHd0ee	 fd1d2Z%eede&fd3d4Z'e'j(d5e&ddfd6d4Z'edefd7d8Z)ed9eee*f ddfd:d;Z+ede
ee f fd<d=Z,e	dHd>e
ee f d9eee*f d?ee  ddfd@dAZ-edBee*B de
ee f fdCdDZ.ee/j0	dIdEdFZ1  Z2S )JFSDP2StrategyziFSDP2Strategy implementing FSDP via FSDP 2.

    Notes:
    - TP + FSDP2 is currently not supported.
    auto   FNTdata_parallel_sizetensor_parallel_sizecontext_parallel_sizesequence_paralleloffload_policyr   use_hf_tp_plancustom_tp_planc                    sd  t  jd||d| || _|| _|| _d| _|| _| jdu r4tdus(J dtt	j
t	j
t	j
dd| _d| _|	| _|| _|| _|
| _d| _|durU|| _tdd dS | jr`tdd	 dS tt d
t t t t t t t tt dd	}tt tddt t ttddt ttddttdt dd}| jr|| || _td dS )ar  Initializes the FSDP2Strategy with specified parallelization settings.

        Args:
            data_parallel_size (Union[Literal["auto"], int]): Size of data parallel. Defaults to "auto".
            tensor_parallel_size (Union[Literal["auto"], int]): Size of tensor parallel. Defaults to "auto".
            context_parallel_size (optional): Number of context-parallel groups. Defaults to 1.
            sequence_parallel (bool): Whether to enable sequence parallelism when use_hf_tp_plan is False and
                custom_tp_plan is not provided. Defaults to False. Only effective when tensor_parallel_size > 1.
            data_sampler (optional): Custom data sampler to process dataloaders.
            mp_policy (optional): Mixed precision policy for parameter and operation casting.
                Defaults to:
                ```python
                MixedPrecisionPolicy(
                    param_dtype=torch.bfloat16,
                    reduce_dtype=torch.float32,
                    output_dtype=None,
                    cast_forward_inputs=True,
                )
                ```
            parallelize_fn (callable, optional): Function for parallelizing the model. Defaults to None.
            use_hf_tp_plan (bool, optional): Whether to use the huggingface TP plan. This will be used if
                custom_tp_plan is not provided. Also, sequence_parallel option will be ignored if use_hf_tp_plan
                is set to True. Defaults to True.
            custom_tp_plan (Optional[Dict[str, Any]], optional): Custom tensor parallel plan for the model.
                tensor_parallel_size need to be > 1 to use this option. If provided, it overrides the
                default tensor parallel plan. sequence_parallel option will be ignored if custom_tp_plan
                is provided.
            **kwargs: Additional arguments for base class initialization.
        )r#   r$   Nz%Expected to have MixedPrecisionPolicyT)param_dtypereduce_dtypeoutput_dtypecast_forward_inputszaYou are using a custom TP plan. Make sure it is compatible with the model. Parallelization would zYnot raise errors if the custom TP plan is not compatible. SP option will also be ignored.zZYou are using a huggingface TP plan. Make sure your model is a huggingface model. Certain zHparallelizations might not be supported. SP option will also be ignored.)input_layouts)output_layouts)	model.embed_tokenszmodel.layers.*.self_attn.q_projzmodel.layers.*.self_attn.k_projzmodel.layers.*.self_attn.v_projmodel.layers.*.self_attn.o_projzmodel.layers.*.mlp.up_projzmodel.layers.*.mlp.gate_projmodel.layers.*.mlp.down_projlm_headr"   )r.   r/   )r0   z
model.normzmodel.layers.*.input_layernormr1   z'model.layers.*.post_attention_layernormr2   r3   zaUsing default TP plan for parallelization. It is compatible with huggingface llama3-style models. )super__init___checkpoint_ior%   data_sampler
checkpoint	mp_policyHAS_MIXED_PRECISION_POLICYr   torchbfloat16storeparallelize_fnr'   r&   r(   tp_shard_planr   infor   r   r   r   r   update)selfr#   r$   r%   r&   r'   r8   checkpoint_ior:   r?   r(   r)   kwargsbase_model_tp_planbase_model_sp_plan	__class__r4   d/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/pytorch/strategies/fsdp2_strategy.pyr6   F   sn   ,




zFSDP2Strategy.__init__returnc                 C   s   dS )z"Optim state restoration is enabledTr4   rC   r4   r4   rJ   lightning_restore_optimizer   s   z)FSDP2Strategy.lightning_restore_optimizerc                 C   
   || _ dS )a  Stores a reference to the optimizer state-dict for later restoration.

        Instead of immediately restoring the optimizer's state, this method saves the checkpoint
        reference and defers the restoration until the first training step. This is necessary
        because, in NeMo 2.0, PeFT adapters are added dynamically just before the first training
        step. Attempting to restore the optimizer state-dict before the adapters are initialized
        would result in an error.

        Args:
            checkpoint (dict): A dictionary containing the trainer's checkpoint,
                            including the optimizer state-dict.
        N)r9   )rC   r9   r4   r4   rJ   load_optimizer_state_dict   s   
z'FSDP2Strategy.load_optimizer_state_dictc                 C   sR   ddl m} | jdu r't| j| jd D ]\}}|| j||i d qd| _dS dS )a  Restores the optimizer state-dict from the stored checkpoint.

        This method applies the optimizer state stored in the checkpoint to the corresponding
        optimizers. It ensures that the optimizer states are correctly restored after the
        PeFT adapters have been added in the first training step.

        If no checkpoint is stored, the method exits without performing any restoration.

        Note: This operation runs only once, as the checkpoint reference is cleared after execution.
        r   )set_optimizer_state_dictNoptimizer_states)optim_state_dictoptions)'torch.distributed.checkpoint.state_dictrP   r9   zip
optimizerslightning_module)rC   rP   	optimizer	opt_stater4   r4   rJ   _load_optimizer_state_dict   s   

z(FSDP2Strategy._load_optimizer_state_dictc                 C   s   ddl m} | j| j |   | jdkr| j| _| jdkr#| j	| _g }g }t
| j| j| jgg dD ]\}}|t| || q4|| jjt||d| _| jd  dkrc| jd jd	d
 | j| j_dS )z-setup distributed environment and device meshr   )init_device_meshr!   )data_parallelcontext_paralleltensor_parallel)device_type
mesh_shapemesh_dim_namesr]   r"   r\   r]   dp_cp)mesh_dim_nameN)torch.distributed.device_meshr[   acceleratorsetup_deviceroot_device_setup_distributed_data_parallel_size	num_nodes_tensor_parallel_sizenum_processesrU   r%   appendinttypetuple_device_meshsize_flattenrW   )rC   r[   r`   ra   dimnamer4   r4   rJ   setup_environment   s.   


zFSDP2Strategy.setup_environmenttrainerc                    sr   || _ | j| t| ddr|   | jdkr | j| j| _t| ddr5|j	j
tjkr7t | dS dS dS )zConfigures the strategy within the PyTorch Lightning trainer.

        Args:
            trainer (pl.Trainer): The PyTorch Lightning trainer instance.
        _init_model_parallelTr"   _setup_optimizersN)rx   rf   setupgetattrparallelizerj   _lightning_moduletorh   statefnr   FITTINGr5   setup_optimizers)rC   rx   rH   r4   rJ   r{   	  s   
zFSDP2Strategy.setupc                 C   sF   | j dur| j | jj| j| j| j| j| jd d| _ dS t	d dS )zApplies fully_shard on modelN)device_meshr:   r(   r@   r'   z"Called parallelize more than once.)
r?   rW   modelrr   r:   r(   r@   r'   r   warningrL   r4   r4   rJ   r}     s   

	zFSDP2Strategy.parallelizec              	   C   s  t   |   |  | _| jdurd| _| jdusJ tj s$t	dtj
 r0td dS | j }| j }| jjtjd< t| jjtjd< td| d|d	  d
|  tjj| j||| jd | jdkrptt td d| j d| dd d dS )z-Initializes process group for communications.Nzcuda:nccl,cpu:gloozOtorch.distributed is not available. Cannot initialize distributed process groupz7torch.distributed is already initialized. Exiting earlyMASTER_ADDRMASTER_PORTz'Initializing distributed: GLOBAL_RANK: z
, MEMBER: r"   /)rank
world_sizer>   ncclzd----------------------------------------------------------------------------------------------------z
distributed_backend=z5
All distributed processes registered. Starting with z processes

)r   set_world_ranks_get_process_group_backend_process_group_backendr'   cluster_environmentr<   distributedis_availableRuntimeErroris_initialized_loggerdebugglobal_rankr   main_addressosenvironstr	main_portrA   init_process_groupr>   atexitregisterr   r
   )rC   r   r   r4   r4   rJ   ri   /  s<   






 

z FSDP2Strategy._setup_distributed	step_typec                 C   s4   | ddfD ]}t | j|rt| j|  S qdS )a  Retrieves the loss reduction method for a given step type.

        Args:
            step_type (str): The type of step (e.g., "training", "validation").

        Returns:
            Callable: The loss reduction function, if defined; otherwise, None.
        _loss_reductionloss_reductionN)hasattrrW   r|   )rC   r   fn_namer4   r4   rJ   _get_loss_reductionV  s
   	z!FSDP2Strategy._get_loss_reductionc                 C   sf   | d}| j | jkr| | j | j|||}n	t| j|||}| |}|r-|||S |d|ifS )a  Executes a training, validation, or test step and applies loss reduction if available.

        Args:
            step_type (str): The step type ("training", "validation", "test", "predict").
            batch: The input batch.
            batch_idx (optional): Index of the batch.

        Returns:
            Tuple: The computed loss and a dictionary with reduced loss metrics.
        _stepavg)r   rW   _forward_redirectionr|   r   forward)rC   r   batch	batch_idxmethod_namelossr   r4   r4   rJ   _step_proxyd  s   

zFSDP2Strategy._step_proxyc                 C   s   |  S )zreturns the sharded optim state)
state_dict)rC   rX   r4   r4   rJ   optimizer_statez  s   zFSDP2Strategy.optimizer_statec                 C   s   | j dur	|   | jdusJ | jdusJ | jdkr&| jj||dd}n| j||}| jjd| jjdddd | jd| jj |S )zDefines the training step, logging relevant metrics.

        Args:
            batch: The input batch.
            batch_idx (optional): The index of the batch.

        Returns:
            STEP_OUTPUT: The loss for backpropagation.
        Nr"   T)r]   global_step)prog_barrank_zero_only
batch_sizestep)	r9   rZ   rW   r   r%   training_steplogrx   r   )rC   r   r   r   r4   r4   rJ   r     s&   

zFSDP2Strategy.training_stepc                 C   s   | j dusJ | jdusJ | j # | d||\}}|d r-| j jd|d ddd |W  d   S 1 s9w   Y  dS )zDefines the validation step, logging validation loss.

        Args:
            batch: The input batch.
            batch_idx (optional): The index of the batch.

        Returns:
            Any: The validation loss.
        N
validationr   val_lossTr"   r   r   )rW   r   precision_pluginval_step_contextr   r   rC   r   r   r   reducedr4   r4   rJ   validation_step  s   $zFSDP2Strategy.validation_stepc                 C   sx   | j dusJ | jdusJ | j  | d||\}}| j jd|d ddd |W  d   S 1 s5w   Y  dS )zDefines the test step, logging test loss.

        Args:
            batch: The input batch.
            batch_idx (optional): The index of the batch.

        Returns:
            Any: The test loss.
        Ntest	test_lossr   Tr"   r   )rW   r   r   test_step_contextr   r   r   r4   r4   rJ   	test_step  s   $zFSDP2Strategy.test_stepc                 C   s`   | j dusJ | jdusJ | j  | d||\}}|W  d   S 1 s)w   Y  dS )zRuns one predict step.

        Args:
            batch (dict): the batch to use for pred.
            batch_idx (int, optional): the batch index. Defaults to None.

        Returns:
            STEP_OUTPUT: the reduced loss.
        Npredict)rW   r   r   predict_step_contextr   r   r4   r4   rJ   predict_step  s   $zFSDP2Strategy.predict_step
empty_initc                 c   s    dV  dS )z'Context manager used for initializationNr4   )rC   r   r4   r4   rJ   tensor_init_context  s   
z!FSDP2Strategy.tensor_init_contextc                 C   s   | j du r	t | _ | j S )zVCheckpointIO getter

        Returns:
            CheckpointIO: _description_
        N)r7   r   rL   r4   r4   rJ   rD     s   
zFSDP2Strategy.checkpoint_ior   c                 C   rN   )zcCheckpointIO setter

        Args:
            io (CheckpointIO): the checkpointio to use.
        N)r7   )rC   r   r4   r4   rJ   rD     s   
c                 C   s*   t | jjjjjjjjj	| jjjj
jjj	S )zsGets the current step within an epoch.

        Returns:
            int: The step index within the epoch.
        )maxrx   fit_loop
epoch_loopautomatic_optimizationoptim_progressrX   r   current	completedmanual_optimizationoptim_step_progressrL   r4   r4   rJ   current_epoch_step  s   z FSDP2Strategy.current_epoch_stepfilepathc                 C   sJ   t |}| jr!tj|rt| dS tj|r#t| dS dS dS )zRemoves a checkpoint from the filesystem.

        Args:
            filepath (Union[str, Path]): Path to the checkpoint to be removed.
        N)	r   is_global_zeror   pathislinkunlinkexistsshutilrmtree)rC   r   ckptr4   r4   rJ   remove_checkpoint	  s   zFSDP2Strategy.remove_checkpointc           	      C   s   ddl m} | jdusJ | j }t| jdtt| jdi dd}|r2dd }tt|| }nt| }i }|D ]}|	|}||||< q<t
  |S )	z%Collects the state dict of the model.r   )to_cpuNadapter_onlyrD   Fc                 S   s   d|   v S )Nlora)lower)xr4   r4   rJ   <lambda>,  s    z;FSDP2Strategy.lightning_module_state_dict.<locals>.<lambda>)'nemo.lightning.pytorch.strategies.utilsr   rW   r   r|   r7   listfilterkeyspopdistbarrier)	rC   r   tmp_sdis_adapter_onlyname_has_loramodule_namesr   rv   paramr4   r4   rJ   lightning_module_state_dict  s$   

z)FSDP2Strategy.lightning_module_state_dictr9   storage_optionsc                 C   s    | j r| jj|||d dS dS )z`
        Unshards FSDP2 checkpoint and passes it to checkpoint_io for saving to a file.
        )r   N)r   rD   save_checkpoint)rC   r9   r   r   r4   r4   rJ   r   9  s   zFSDP2Strategy.save_checkpointcheckpoint_pathc                 C   s   | j |S )z#Loads checkpoint with checkpoint_io)rD   load_checkpoint)rC   r   r4   r4   rJ   r   D  s   zFSDP2Strategy.load_checkpointc                    s  j dkrjd  dkrfddjD }fddjD }fddjD }dd |d	  D }| D ]l\ }t fd
d|D r[t|jtddt	 fd| < q;t fdd|D ryt|jtddtddfd| < q;t fdd|D rt|jtddtddfd| < q;t|jd tddfd| < q;n)j dkrňjd  dkrŇfdd|d	  D }nfdd|d	  D }j
j||d dS )zShards a full state dictr"   r]   c                        g | ]}t  j| tr|qS r4   )
isinstancer@   r   .0krL   r4   rJ   
<listcomp>S       z7FSDP2Strategy.load_model_state_dict.<locals>.<listcomp>c                    r   r4   )r   r@   r   r   rL   r4   rJ   r   T  r   c                    r   r4   )r   r@   r   r   rL   r4   rJ   r   U  r   c                 S   s   i | ]\}}||qS r4   r4   r   r   vr4   r4   rJ   
<dictcomp>W  s    z7FSDP2Strategy.load_model_state_dict.<locals>.<dictcomp>r   c                 3       | ]	}t | V  qd S Nrematchr   r   r   r4   rJ   	<genexpr>[      z6FSDP2Strategy.load_model_state_dict.<locals>.<genexpr>r   ru   
placementsc                 3   r   r   r  r  r  r4   rJ   r  ]  r  c                 3   r   r   r  r  r  r4   rJ   r  _  r  r\   c              
      s2   i | ]\}}|t | jd  t tddfdqS )rb   r   r  r	  )r   rr   r   r   r   rL   r4   rJ   r   j  s    c              	      s.   i | ]\}}|t | jd  tddfdqS )r\   r   r  r	  )r   rr   r   r   rL   r4   rJ   r   r  s    )strictN)rl   rr   rs   r@   itemsanyr   r   r   r   rW   load_state_dict)rC   r   r  colwise_keysrowwise_keysseq_parallel_keyssharded_stater   r4   )r   rC   rJ   load_model_state_dictI  s0   "&&



z#FSDP2Strategy.load_model_state_dict)rK   Nr   )F)3__name__
__module____qualname____doc__r   r   r   ro   r   boolr   r   r   r   r   r6   propertyr   rM   rO   rZ   rw   plTrainerr{   r}   ri   r   r   r   r   r   r   r   r   r   r   r   r	   rD   setterr   r   r   r   r   r   r<   no_gradr  __classcell__r4   r4   rH   rJ   r    ?   s    m

'&

'!


 r    )Er   r   _loggingr   r  r   
contextlibr   pathlibr   typingr   r   r   r   r   lightning.pytorchpytorchr  r<   torch.distributedr   r   lightning.fabric.pluginsr	   $lightning.fabric.utilities.rank_zeror
   lightning.fabric.utilities.seedr   +lightning.pytorch.strategies.model_parallelr   PLModelParallelStrategy lightning.pytorch.trainer.statesr   !lightning.pytorch.utilities.typesr   !torch.distributed.tensor.parallelr   r   r   typing_extensionsr   nemo.lightningr   r   r   r   r   r   
nemo.utilsnemo.utils.import_utilsr   torch.distributed.tensor._apir   (torch.distributed.tensor.placement_typesr   r   ImportErrortorch.distributed._tensor.api)torch.distributed._tensor.placement_typesr   r;   r   HAS_CPU_OFFLOAD_POLICY	getLoggerr  r   IOMixinr    r4   r4   r4   rJ   <module>   sL   


