o
    }oih                     @   sX  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZ d dl	m
Z
mZmZmZmZmZmZmZmZmZ d dlmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZm Z m!Z! d d
l"m#Z# d dl$m%Z% d dlm&Z&m'Z' d dl(m!Z) d dl*m+Z+m,Z,m-Z- d dl.m/Z/m0Z0 d dl1m2Z2m3Z3m4Z4m5Z5 d dl6m7Z7 d dl8m9Z9m:Z: d dl;m<Z< d dl=m>Z> e>dddd\Z?Z@e>dddd\ZAZBe>dddd\ZCZDeddG dd dZEdejFjGfd d!ZHd"ejIfd#d$ZJd%ejKfd&d'ZLdYd%ejKd)eMd*eNd+dfd,d-ZOd.eePef d+efd/d0ZQdZd1d2ZR	3	d[d4eePeej& f d5eePeee! e f f d6eMd7e/d+eePee)ejSf f f
d8d9ZT	d\d;eePe
f d<ePd7e/d+eePee f fd=d>ZU			3		d]d7e/d?e?d@eMdAeeePee3e2e4f f  dBdf
dCdDZVdEdF ZWedGePfdHdIZXdJdK ZYd^dLdMZZdNe/dOeej& dPeeN dQeej& dRePf
dSdTZ[dUeMdVeMfdWdXZ\dS )_    N)	dataclass)	lru_cache)Path)
AnyDict	GeneratorIterableListOptionalSetTupleUnioncast)ClusterEnvironment)TQDMProgressBarparallel_state)ShardedBaseShardedObjectShardedTensor)&sharded_tensor_to_torch_sharded_tensor)_get_extra_state_offsets)Tensornn)r   )DTensor	ReplicateShard)
DeviceMesh_mesh_resources)ColwiseParallelRowwiseParallelSequenceParallelparallelize_module)_strategy_lib)MegatronProgressBarProgressPrinter)AsyncFinalizableCheckpointIO)safe_import_fromztorch.distributed.fsdpMixedPrecisionPolicyz"torch.distributed._composable.fsdp)fallback_modulefully_shardCPUOffloadPolicyT)kw_onlyc                   @   s>   e Zd ZU dZeed< dZeed< dZeed< dZ	eed< dS )	RestoreConfigad  
    Configuration for restoring model state from a checkpoint.

    Attributes:
        path (str): Path to the checkpoint directory.
        load_model_state (bool): Whether to load model weights.
        load_optim_state (bool): Whether to load optimizer state.
        load_artifacts (bool): Whether to load additional artifacts (e.g., tokenizer).
    pathTload_model_stateFload_optim_stateload_artifactsN)
__name__
__module____qualname____doc__str__annotations__r/   boolr0   r1    r9   r9   [/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/pytorch/strategies/utils.pyr-   5   s   
 
r-   strategyc                 C   sF   ddl m} tt| j}t| d| }t| |	 |
 | dS )z
    Sets up parallel ranks for distributed training.

    Args:
        strategy (pl.strategies.Strategy): The Lightning strategy being used for training.
    r   )ModelParallelConfigparallelismN)#megatron.core.model_parallel_configr<   r   r   cluster_environmentgetattrr#   init_parallel_ranks
world_sizeglobal_rank
local_rank)r;   r<   envr=   r9   r9   r:   setup_parallel_ranksH   s    rF   	pl_modulec                 C   sF   ddl m} ddlm} | s| }|jdur!t|  dS dS dS )z
    Initializes model parallelism for distributed training.

    Args:
        pl_module (pl.LightningModule): The PyTorch Lightning module.
    r   r   )AppStateN)megatron.corer   
nemo.utilsrH   model_parallel_is_initializedmodel_parallel_sizer#   init_model_parallel)rG   r   rH   	app_stater9   r9   r:   rM   V   s   
rM   trainerc                 C   s   t | dd}|dur&t| jdr| jjdur| jj|_n
t|dr&|j| j_| jjdur>| jj| jj  | jj|  t|drI|  dS dS )z
    Configures the data sampler for distributed training.

    Args:
        trainer (pl.Trainer): The PyTorch Lightning trainer instance.
    
datamoduleNdata_samplerreconfigure_limit_batches)	r@   hasattrr;   rQ   setupr?   rC   connectrR   )rO   rP   r9   r9   r:   setup_data_samplerh   s   


rV      replace_progress_barprogress_intervalreturnc           	      C   s   t ttj t| d}d\}}|D ]}t|trd}|jtkr!d}q|sR|rTt	|D ]+\}}t|trQ|rKt
|d}| |_| jsD|  |||<  dS t|_ dS q*dS dS dS )a>  
    Fixes or replaces the progress bar callback in the PyTorch Lightning trainer.

    Args:
        trainer (pl.Trainer): The PyTorch Lightning trainer instance.
        replace_progress_bar (bool): Whether to replace the default progress bar.
        progress_interval (int): Interval at which to log progress.
    	callbacks)FFT)log_intervalN)r   r	   plCallbackr@   
isinstancer$   	__class__r   	enumerater%   _traineris_global_zerodisable)	rO   rX   rY   r[   contains_megatron_progresscontains_progresscallbackiprinterr9   r9   r:   fix_progress_bar~   s.   	



rj   filepathc                 C   s"   t | } | jdkr| | jS | S )a  PTL considers checkpoints as .ckpt files.
    This method removes the extension and returns a path
    to be used as a directory for distributed checkpoints.

    Converts a checkpoint file path to a directory path by removing the `.ckpt` extension.

    Args:
        filepath (Union[str, Path]): The checkpoint file path.

    Returns:
        Path: The directory path where the checkpoint will be stored.

    z.ckpt)r   suffix	with_namestem)rk   r9   r9   r:   ckpt_to_dir   s   
ro   c                 K   sn   | dddkrddlm} || ddd}ndd	lm} |di |}| r+| |}| d
dr5t|}|S )a  
    Creates a checkpoint IO handler for saving/loading checkpoints.

    Args:
        wrapping_ckpt_io: An optional wrapper for checkpoint IO.
        **kwargs: Additional arguments to configure checkpoint IO.

    Returns:
        Checkpoint IO handler instance.
    model_libraryNhuggingfacer   )HFCheckpointIOloraF)adapter_only)MegatronCheckpointIO
async_saver9   )getnemo.lightning.io.hfrr   nemo.lightning.io.plru   r&   )wrapping_ckpt_iokwargsrr   checkpoint_ioru   r9   r9   r:   create_checkpoint_io   s   r}   F
checkpointsharded_state_dictdtensordevice_meshc                    sv   dt tj dt t dtdtfdddt tj dt t dtfddd fd
d	 | D ]
} | |||d q.| S )a  
    Converts a Megatron-Core sharded state dictionary into a PyTorch-compatible format.

    Args:
        checkpoint (Dict[str, List[torch.Tensor]]):
            The Megatron-Core checkpoint containing a list of tensors for each key.
        sharded_state_dict (Dict[str, Union[List[ShardedTensor], ShardedObject]]):
            The corresponding PyTorch sharded state dictionary.
        dtensor (bool, optional):
            Whether to use DTensor for the conversion. Defaults to False.
        device_mesh (DeviceMesh, optional):
            The device mesh configuration for distributed tensors.

    Returns:
        Dict[str, Union[TorchShardedTensor, io.BytesIO]]:
            A PyTorch-compatible state dictionary with properly formatted sharded tensors.
    tenssh_tensr   rZ   c                 S   s>   t | dkrt |dksJ t| d |t tddf}|S )z7Converts a Megatron-Core tensor into a PyTorch DTensor.rW   r   )dim)lenr   
from_localr   r   )r   r   r   dtenr9   r9   r:   _mcore_to_pyt_dtensor   s   z>mcore_to_pyt_sharded_state_dict.<locals>._mcore_to_pyt_dtensorc                 S   sf   t | |D ])\}}|j|jd |_|j|jd |_|j|jd |_d|_||_|  qt|S )z>Converts a Megatron-Core tensor into a PyTorch sharded tensor.Nr   )zipglobal_shapeprepend_axis_numglobal_offsetaxis_fragmentationsdatavalidate_metadata_integrityr   )r   r   tensh_tenr9   r9   r:   _mcore_to_pyt_sharded_tensor   s   
zEmcore_to_pyt_sharded_state_dict.<locals>._mcore_to_pyt_sharded_tensorNc                    s   ||v sJ | dt || tr'|| D ]} | | || ||d qdS t || tr0dS t || trUrH| | || |d| |< dS | | || | |< dS dS )zHRecursively converts checkpoint tensors into PyTorch-compatible formats.z not in sharded_state_dictr   N)r_   r   r   r	   )r~   r   kr   kk_convertr   r   r   r9   r:   r     s   z1mcore_to_pyt_sharded_state_dict.<locals>._convertr   N)r	   torchr   r   r   r   TorchShardedTensor)r~   r   r   r   r   r9   r   r:   mcore_to_pyt_sharded_state_dict   s   
 r    
state_dictprefixc           
         st  				ddt dtdtttttf  dt d	td
tdtt fdd			ddt dt	dtttttf  dt d	tdtt fdd		ddt dt
jdtttttf  dt dtf
ddd  fdd	 d}| D ]}|dr|t|t|dd d }qg|  D ]6\}}g }|}|d}|dr|d}t|d}	d|}|d|	|f  | ||||||| q| S )!a  
    Converts a PyTorch state dictionary into a Megatron-Core compatible format.

    Args:
        state_dict (Dict[str, Any]):
            The PyTorch state dictionary.
        prefix (str, optional):
            A prefix to prepend to all keys. Defaults to "".
        device_mesh (DeviceMesh, optional):
            The device mesh configuration for distributed tensors.

    Returns:
        Dict[str, List[ShardedBase]]:
            A Megatron-Core formatted state dictionary with properly sharded tensors.
    r9   r   FNkeyr   prepend_offsetsr   allow_shape_mismatchr   rZ   c                    s   t |}|d us
J t|tsJ || |j g }d}ttt  }	 fdd|	D }
t|jD ]&\}}t|t	rR|j
}||| |j||
| f q5| r[||}q5tj| |  g||R |||d}|gS )Nr   c                    s   g | ]} | j |  qS r9   )shape.0rh   r   r   r9   r:   
<listcomp>@  s    zUpyt_to_mcore_state_dict.<locals>._dtensor_to_mcore_sharded_tensor.<locals>.<listcomp>
replica_idr   r   )r   r_   r   to_localr   listrangera   
placementsr   r   appendr   get_local_rankis_replicater   from_rank_offsets)r   r   r   r   r   r   r   rank_offsetsr   axis
axis_fragmrh   	placementaxlocal_shardr9   r   r:    _dtensor_to_mcore_sharded_tensor-  s:   
"

	zApyt_to_mcore_state_dict.<locals>._dtensor_to_mcore_sharded_tensorr   c                    s  t |}t|tsJ || }| tfddD s)J dd D |j ttt  } fdd|D }g }	tt D ]E}
|
 }|j	|j}}tt |D ]}|j
| |j|  }|	|| | ||| f qYtj| |  |g||	R d||d|
< qFS )Nc                    s    g | ]}|j j d  j jkqS r   )metadatar   r   ls)local_shardsr9   r:   r   b  s     zSpyt_to_mcore_state_dict.<locals>._torch_to_mcore_sharded_tensor.<locals>.<listcomp>c                 S   s   g | ]}|j jqS r9   )metar   r   r9   r9   r:   r   b  s    c                    s$   g | ]} | d  j j|  qS r   )r   shard_sizesr   r   r   r9   r:   r   i  s   $ r   r   )r   r_   r   r   r   allsizer   r   tensorshard_offsetsr   r   r   r   )r   r   r   r   r   r   sharded_metar   r   r   rh   r   r   r   jaxis_rank_offsetr9   r   r:   _torch_to_mcore_sharded_tensorT  s<   

z?pyt_to_mcore_state_dict.<locals>._torch_to_mcore_sharded_tensorobjsharded_offsetsc                 S   s4   ddt jddf}t| |  |gt||R  S )mcore helperr   T)with_context_parallel)r   get_data_parallel_rankr   r   )r   r   r   r   r   r9   r9   r:   _torch_to_mcore_sharded_object  s
   
"z?pyt_to_mcore_state_dict.<locals>._torch_to_mcore_sharded_objectc           
         s   t |tr!| D ]\}}	 ||||	|| | d||d q	dS t |tr4||||||d| |< dS t |trF|||||d| |< dS t |tjrW||||| |< dS dS )r   .)r   r   r   )r   r   N)r_   r   itemsr   r   ioBytesIO)
r   r   sh_keyvr   r   r   r   r   vvr   r   r   r   r9   r:   r     s:   



z)pyt_to_mcore_state_dict.<locals>._convertr   zmodule.decoder.layers.r      rW   z.word_embeddings.weight)r9   r   FN)r9   r   F)r9   r   )r   FN)r6   r   r   r   intr8   r   r	   r   r   r   r   r   
startswithmaxsplitr   endswithpopjoinr   )
r   r   r   
num_layersr   r   r   r   r   global_layer_offsetr9   r   r:   pyt_to_mcore_state_dict  s   
*
/





r   	mp_policyuse_hf_tp_plantp_shard_planoffload_policyc                    s   |st dus
J dttjtjd} fdd|dtj|i v r%dnd }|d }| d	kr;|j	d	ks;J d
| d	krQ|du rK|rKt
| }t| || |j	d	ksZJ d
tdusbJ d| || t| ||d d} | S )a  Apply parallelisms and activation checkpointing to the model.

    Args:
        model: The model to be parallelized.
        device_mesh (DeviceMesh): The device mesh for distributed training.
        mp_policy (MixedPrecisionPolicy): Mixed precision policy for model parallelism.
        tp_shard_plan (Optional[Dict[str, Union[RowwiseParallel, ColwiseParallel, SequenceParallel]]]):
            A tensor parallel sharding plan. The keys should be the module names and the values should be the
            corresponding parallel styles (e.g., RowwiseParallel, ColwiseParallel, SequenceParallel).
        offload_policy (CPUOffloadPolicy): The offload policy for FSDP. If None, it will use the default policy.

    NOTE: The passed-in model preferably should be on meta device. Otherwise,
    the model must fit on GPU or CPU memory.
    NOTE: Currently, the user is required to manually handle precision settings such as the `mp_policy` here
    because the model parallel strategy does not respect all settings of `Fabric(precision=...)` at the moment.
    NOTE: Currently, the user should make sure that custom_tp_plan is compatible with the model architecture.
    Nz%Expected to have MixedPrecisionPolicy)param_dtypereduce_dtypec                    sr   t | tjr(t| D ]\}}t|t| d k }t|||| d || |< q
d S |  D ]
\}}||| q,d S )NrW   meshr   reshard_after_forwardr   )r_   r   
ModuleListra   r   r   r*   named_children)moduler   r   layer_idtransformer_blockr   name
sub_moduler   parallelize_helperr9   r:   r     s   
z6fsdp2_strategy_parallelize.<locals>.parallelize_helperdp_cpdata_paralleltensor_parallelrW   zHybrid-sharding not supportedzExpected to have fully_shardTr   )HAS_MIXED_PRECISION_POLICYr(   r   bfloat16float32r   root_to_flatten_mappingrw   r   ndimget_hf_tp_shard_planr"   HAS_FULLY_SHARDr*   )modelr   r   r   r   r   dp_meshtp_meshr9   r   r:   fsdp2_strategy_parallelize  s*   
r   c                 C   sn   i }t | dr| jdur|| j t | jdr,| jjdur,|dd | jj D  dd | D }|S )z?
    Get the tensor parallel sharding plan from the model.
    _tp_planNc                 S   s   i | ]
\}}d | |qS )zmodel.r9   r   r   r   r9   r9   r:   
<dictcomp>  s    z(get_hf_tp_shard_plan.<locals>.<dictcomp>c                 S   s   i | ]	\}}|t |qS r9   )!translate_to_torch_parallel_styler  r9   r9   r:   r    s    )rS   r  updater   r   )r   hf_tp_shard_planr9   r9   r:   r     s   r   stylec                 C   s~   t | tstdt|  d| dkrt S | dkrt S | dkr'tt dS | dkr1tt dS | d	kr8t S td
|  )z
    In model configurations, we use a neutral type (string) to specify parallel
    styles, here we translate them into torch.distributed tensor-parallel
    types.
    z Unsupported parallel style type z, expected strcolwiserowwisecolwise_rep)output_layoutsrowwise_rep)input_layoutssequence_parallelz"Unsupported parallel style value: )r_   r6   
ValueErrortyper   r    r   r!   )r  r9   r9   r:   r  "  s   
r  c                 C   s\   t | tr#| jjdkr|   S | jjdkr| jS tdt| j t | t	r,|  S | S )a  
    Move a tensor or distributed tensor to the CPU.

    This function takes an input tensor, which can be either a `DTensor` (distributed tensor)
    or a standard `Tensor`, and ensures that it is moved to the CPU.

    Args:
        v (DTensor | Tensor | any): The input value, which can be a `DTensor`, `Tensor`, or
                                    any other object. If `DTensor`, it checks the device and
                                    moves the tensor accordingly.

    Returns:
        Tensor | any: The corresponding CPU tensor if `v` is a `DTensor` or `Tensor`,
                    otherwise returns `v` unchanged.

    Raises:
        ValueError: If `v` is a `DTensor` but its device is neither 'cuda' nor 'cpu'.

    Example:
        >>> t = torch.tensor([1, 2, 3], device='cuda')
        >>> to_cpu(t)  # Moves tensor to CPU
        tensor([1, 2, 3])

        >>> dt = DTensor(torch.tensor([4, 5, 6], device='cuda'))
        >>> to_cpu(dt)  # Moves DTensor to CPU
        tensor([4, 5, 6])
    cudacpuzUnknown device )
r_   r   devicer  full_tensorr  _local_tensorr  r6   r   )r   r9   r9   r:   to_cpu:  s   

r  c                   C   sB   t  t jt j tj rtj rtj  t  t jt j dS )zDestroy process group.N)	signalSIGINTSIG_IGNr   distributedis_availableis_initializeddestroy_process_groupSIG_DFLr9   r9   r9   r:   _destroy_dist_connectionc  s   
r  cp_mesh
cp_bufferscp_seq_dimscp_no_restore_bufferscp_rotate_methodc                 C   s   ddl m} || |||dS )a  
    Create a context parallel context.

    Args:
        cp_mesh (DeviceMesh): The device mesh for context parallel.
        cp_buffers (List[torch.Tensor]): The buffers for context parallel.
        cp_seq_dims (List[int]): The sequence dimensions for context parallel.
        cp_no_restore_buffers (Set[torch.Tensor]): The no restore buffers for context parallel.
        cp_rotate_method (str): The rotation method for context parallel, such as "allgather" or "addtoall".
    r   )context_parallel)buffersbuffer_seq_dimsno_restore_buffers)%torch.distributed.tensor.experimentalr%  )r   r!  r"  r#  r$  r%  r9   r9   r:   create_context_parallel_ctxm  s   r*  enable_loss_parallelenable_compiled_autogradc                    s(   t jddttd  f fdd}|S )z
    Create a train context.

    Args:
        enable_loss_parallel (bool): Whether to enable loss parallelism.
        enable_compiled_autograd (bool): Whether to enable compiled autograd.
    N
cp_context)NNNc                 3   s    t  @}r|tjjj   r|tjj	
d | d ur:ddlm}m} |||j|jg ||  d V  W d    d S 1 sHw   Y  d S )NTr   )
SDPBackendsdpa_kernel)
contextlib	ExitStackenter_contextr   r  r   parallelloss_parallel_dynamoutilsmaybe_enable_compiled_autogradtorch.nn.attentionr.  r/  FLASH_ATTENTIONEFFICIENT_ATTENTION)r-  stackr.  r/  r,  r+  r9   r:   context  s   

"z"get_train_context.<locals>.contextr   )r0  contextmanagerr
   r   )r+  r,  r=  r9   r<  r:   get_train_context  s   	 r?  )TrW   r   )FN)r   N)NNFNN)rZ   N)]r0  r   r  dataclassesr   	functoolsr   pathlibr   typingr   r   r   r   r	   r
   r   r   r   r   lightning.pytorchpytorchr]   r   lightning.fabric.pluginsr   lightning.pytorch.callbacksr   rI   r   (megatron.core.dist_checkpointing.mappingr   r   r   1megatron.core.dist_checkpointing.strategies.torchr   megatron.core.transformer.utilsr   r   r   !torch.distributed._sharded_tensorr   torch.distributed._tensorr   r   r   torch.distributed.device_meshr   r   !torch.distributed.tensor.parallelr   r    r!   r"   nemo.lightningr#    nemo.lightning.pytorch.callbacksr$   r%   !nemo.utils.callbacks.dist_ckpt_ior&   nemo.utils.import_utilsr'   r(   r   r*   r   r+   HAS_CPU_OFFLOAD_POLICYr-   
strategiesStrategyrF   LightningModulerM   TrainerrV   r8   r   rj   r6   ro   r}   r   r   r   r   r   r  r  r  r*  r?  r9   r9   r9   r:   <module>   s   0




M

 ,
Q
)

