o
    }oiz]                  	   @   s  d dl Z d dlZd dlZd dlmZmZ z
d dlmZ dZ	W n e
efy+   dZ	Y nw z d dlmZ d dlmZmZmZmZmZmZmZmZmZ dZW n e
efyY   dZY nw zd dlmZmZmZmZmZ dZW n( e
efy   e d	 d d
l!m"Z d dl#mZmZmZ d dl#m$Z dZY nw 									 	 													dde%fddZ&dd Z'dd Z(						 	 	dddZ)dS )    N)AppStatelogging)set_logging_levelTF)tensor_parallel)	RankGenerator get_pipeline_model_parallel_rankset_expert_model_parallel_rank$set_expert_model_parallel_world_size set_pipeline_model_parallel_rank&set_pipeline_model_parallel_split_rank&set_pipeline_model_parallel_world_sizeset_tensor_model_parallel_rank$set_tensor_model_parallel_world_size)!ConstantNumMicroBatchesCalculatorget_current_global_batch_sizeget_micro_batch_sizeget_num_microbatches init_num_microbatches_calculatorzCMegatron num_microbatches_calculator not found, using Apex version.)ConstantNumMicroBatches)r   r   r   )setup_microbatch_calculator        use_gloo_process_groupsc                 C   s>  t  }||_| |_||_||_||_||_||_||_|
|_	||_
||_|	|_||_||_||_||_||_||_||_t| ||||||
|||||d\|_|_|_|_|_|_|_|_t|j t|j t|j t |j t!|j|j  t"|j t#|j t$j%j&|d |durt'| |r|durt(rddl)m*} |du rt+||||j|dd n`t,|t-rt. |ksJ t/ |ksJ t0 |||j  ksJ n@t1dddl2m*} |du rt+||||j|dd n&t,|t-rt. |ksJ t/ |ksJ t0 |||j  ksJ nt1dd	|_3t4rt5| dS dS )
z)Initialize model parallel groups in NeMo.)
world_sizeranktensor_model_parallel_size_pipeline_model_parallel_size_%virtual_pipeline_model_parallel_size_#pipeline_model_parallel_split_rank_context_parallel_size_expert_model_parallel_size_expert_tensor_parallel_size_#encoder_tensor_model_parallel_size_%encoder_pipeline_model_parallel_size_use_tp_pp_dp_mapping)use_te_rng_trackerNr   )#_GLOBAL_NUM_MICROBATCHES_CALCULATORF)r   global_batch_sizemicro_batch_sizedata_parallel_sizerampup_batch_sizedecrease_batch_size_if_neededz*Microbatch calculator already initialized.T)6r   global_rankr   
local_rankr%   expert_model_parallel_sizetensor_model_parallel_sizepipeline_model_parallel_size$virtual_pipeline_model_parallel_sizecontext_parallel_size"encoder_tensor_model_parallel_size$encoder_pipeline_model_parallel_size$pipeline_model_parallel_comm_backenduse_fp8	use_sharpinit_mpi_proc_groupexpert_tensor_parallel_size#num_distributed_optimizer_instancesnccl_communicator_config_pathr   fake_initialize_model_paralleltensor_model_parallel_rankpipeline_model_parallel_rankexpert_model_parallel_rankexpert_tensor_parallel_rankmodel_parallel_sizer*   "pipeline_model_parallel_split_rank$virtual_pipeline_model_parallel_rankr   r   r	   r   r   r   r
   r   randominitialize_rng_tracker_set_random_seedMCORE_MB_CALCULATOR)megatron.core.num_microbatches_calculatorr'   r   
isinstancer   r   r   r   	Exception(apex.transformer.pipeline_parallel.utils_is_megatron_initialized	HAVE_APEXr   )r   r-   r.   r0   r/   r:   r1   r2   rC   r6   r3   r4   r5   r)   r(   r+   r7   r9   seedapex_transformer_log_levelr%   r&   r;   r<   r8   r   	app_stater'    rR   P/home/ubuntu/.local/lib/python3.10/site-packages/nemo/lightning/megatron_init.py"initialize_model_parallel_for_nemoM   s   








		rT   c                 C   sl   | dur/| dkr/| dt    }t| tj| t| tj dkr-t	| dS dS t
d| )z$Set random seed for reproducability.Nr   d   z'Seed ({}) should be a positive integer.)r   rE   rO   nptorchmanual_seedcudadevice_countr   model_parallel_cuda_manual_seed
ValueErrorformat)seed_rO   rR   rR   rS   rG      s   

rG   c                   C   sf   t jdkr1t jd t jd t jd t jd t jd t jd t j	d dS dS )z%Set PyTorch JIT layer fusion options.z1.10.0a0+0aef44cTFN)
rW   __version___C_jit_set_profiling_executor_jit_set_profiling_mode_jit_override_can_fuse_on_cpu_jit_override_can_fuse_on_gpu_jit_set_texpr_fuser_enabled_jit_set_nvfuser_enabled%_debug_set_autodiff_subgraph_inliningrR   rR   rR   rS   set_jit_fusion_options   s   
rh   c           2         s  t || }t || }|| }t || }|
du rd}n|
}|	dkr'|
dkr'|}n|	}|dkr;|dks3J ||ks;J d|| | }|| | }|| }| | dksjJ d|  d| d| d| d| d	| d
| | | }|| }|| }|| | ks~J d}|durd}|dkrt|d||||rdndddndt|d||||rdnd|d |du r|}|| | }|| }|| dkrtd| d| dt||||d|rdnd|d|r|dks||ksJ d ddksJ d d dd d6 fdd	}g }|dD ]}||v r)t|}td| d|  q|dD ]}|| ||v rH|} td| d|   q/||}!td|  td| d |!  g }"|d!D ]}|"| ||v r|}#td| d"|#  qh|#|}$td#|"  td| d$|$  g }%|d%D ]}|%| ||v rtd| d&t|  qtd'|%  g }&d}'|d(D ]}|&| ||v r|}'td| d)|'  q|'|}(td*|&  td| d+|(  d})|dur!|dkr!|d,d-d.D ]}||v rt||})qd}*|durC|dkrC|d(d-d.D ]}||v rAt||}*q3g }+g },d}-d}.d}/|dD ]L}|+| ||v rj|}-td| d/|-  t	|dkr|d |d0 g}0|,|0 n	|}0|,t|0 ||0v rt|0}.td| d1|.  qQ|-|}1|.dur|.|}/td2|+  td| d3|1  td4|+  td| d5|/  |(|1|)|*||||fS )7a  
    Fake initialize model data parallel groups so that we can instantiate model parallel
    models before DDP is initialized. This is needed because PTL execution flow is init
    model, init trainer -> call trainer.fit(model). DDP is initialized during .fit.
    This function is taken from megatron.core.parallel_state and modified so that the
    distributed groups are not created.
    We only need the tensor parallel and pipeline parallel ranks to instantiate the model.

    Arguments:
        tensor_model_parallel_size: number of GPUs used to parallelize model tensor.
        pipeline_model_parallel_size: number of GPUs used to parallelize model pipeline.
        context_parallel_size: number of GPUs used to parallelize tokens of each input.

    Let's say we have a total of 16 GPUs denoted by g0 ... g15 and we
    use 2 GPUs to parallelize the model tensor, and 4 GPUs to parallelize
    the model pipeline. The present function will
    create 8 tensor model-parallel groups, 4 pipeline model-parallel groups
    and 8 data-parallel groups as:
        8 data_parallel groups:
            [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]
        8 tensor model-parallel groups:
            [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]
        4 pipeline model-parallel groups:
            [g0, g4, g8, g12], [g1, g5, g9, g13], [g2, g6, g10, g14], [g3, g7, g11, g15]
    Note that for efficiency, the caller should make sure adjacent ranks
    are on the same DGX box. For example if we are using 2 DGX-1 boxes
    with a total of 16 GPUs, rank 0 to 7 belong to the first box and
    ranks 8 to 15 belong to the second box.
    Nr   z9We do not support encoders with more TP than the decoder.zworld_size: zM must be divisible by total world_size: (decoder_)tensor_model_parallel_size z* * (decoder_)pipeline_model_parallel_size z# * (decoder_)context_parallel_size z& + encoder_tensor_model_parallel_size z( * encoder_pipeline_model_parallel_size z * context_parallel_size r   ztp-cp-ep-pp-dpztp-cp-ep-dp-pp)tpepdpppcporderrank_offsetzdecoder world_size (zB) is not divisible by expert_tensor_model_pipeline_parallel size ()zmWhen not using pp-last rank ordering, the data parallel size of the attention and moe layers must be the samerl   zaPipeline parallel groups are expected to be the same for Non-Expert and Expert part,     but got z and Fc                 ;   s    ddl m} 	 |rj| fi |}n	 j| fi |}d u r+|D ]}|V  q#d S j| fi |}| dkrKt|||D ]	\}}|| V  q?d S | dkrjt|t|ksYJ t||D ]	\}}|| V  q^d S |D ]}|V  ql|D ]}|V  qtd S )Nr   )cyclerl   tp-pp)	itertoolsrq   	get_ranksziplen)
group_type	is_expertkwargsrq   d_ranksxe_ranksydecoder_rank_generatorencoder_rank_generatorexpert_decoder_rank_generatorrR   rS   generator_wrapper  s2   z9fake_initialize_model_parallel.<locals>.generator_wrapperrk   zRank z has data parallel group : zdp-cpz< has combined group of data parallel and context parallel : z>All data parallel group ranks with context parallel combined: zRanks z has data parallel rank: rm   z has context parallel group: z"All context parallel group ranks: z has context parallel rank: rr   z has model parallel group: z All model parallel group ranks: ri   z" has tensor model parallel group: z'All tensor model parallel group ranks: z! has tensor model parallel rank: rj   T)rx   z$ has pipeline model parallel group: z has embedding group: z)All pipeline model parallel group ranks: z" has pipeline model parallel rank zAll embedding group ranks: z has embedding rank: )F)
minr   RuntimeErrorrt   listr   infoappendindexrv   )2r   r   r   r   r   r   r!   r"   r    r#   r$   r%   r0   r1   rB   r3   r5   r4   encoder_model_sizedecoder_model_sizetotal_model_sizer*   encoder_world_sizedecoder_world_sizerD   *expert_tensor_model_pipeline_parallel_sizeexpert_data_parallel_sizer   %all_data_parallel_group_ranks_with_cpranksdata_parallel_groupranks_with_cpdata_parallel_group_with_cpdata_parallel_rank all_context_parallel_group_rankscontext_parallel_groupcontext_parallel_rankall_model_parallel_group_ranks%all_tensor_model_parallel_group_rankstensor_model_parallel_groupr>   r@   rA   'all_pipeline_model_parallel_group_ranksall_embedding_group_rankspipeline_model_parallel_groupembedding_groupembedding_rankembedding_ranksr?   rR   r~   rS   r=      s~  
-



	







#



















r=   )r   r   Nr   NNNr   r   r   NNNFFr   r   FFr   NFT)NNr   Nr   r   r   F)*rE   numpyrV   rW   
nemo.utilsr   r   apex.transformer.log_utilr   rN   ImportErrorModuleNotFoundErrormegatron.corer   megatron.core.parallel_stater   r   r   r	   r
   r   r   r   r   HAVE_MEGATRON_CORErI   r   r   r   r   r   rH   warningapex.transformer.microbatchesr   rL   r   boolrT   rG   rh   r=   rR   rR   rR   rS   <module>   s   ,

 