from typing import Any

import torch
from lightning import LightningModule
from omegaconf import DictConfig
from peft import PeftModel
from torch import Tensor
from torch.distributed.fsdp import fully_shard
from torch.distributed.tensor import Replicate, Shard
from torch.distributed.tensor.parallel import (
    ColwiseParallel,
    PrepareModuleInput,
    RowwiseParallel,
    SequenceParallel,
    loss_parallel,
    parallelize_module,
)
from transformers import DynamicCache

from nemo.collections.audio.parts.utils.resampling import resample
from nemo.collections.common.tokenizers import AutoTokenizer
from nemo.collections.speechlm2.data.utils import get_pad_id
from nemo.collections.speechlm2.parts.hf_hub import HFHubMixin
from nemo.collections.speechlm2.parts.lora import maybe_install_lora
from nemo.collections.speechlm2.parts.metrics.asr_bleu import ASRBLEU
from nemo.collections.speechlm2.parts.metrics.bleu import BLEU
from nemo.collections.speechlm2.parts.optim_setup import configure_optimizers, is_frozen
from nemo.collections.speechlm2.parts.precision import fp32_precision
from nemo.collections.speechlm2.parts.pretrained import load_pretrained_hf, setup_audio_codec, setup_speech_encoder
from nemo.core.neural_types import AudioSignal, LabelsType, LengthsType, NeuralType
from nemo.utils import logging
edd ZedefddZedefddZedefddZ	d@dedeeef fddZdefddZdedefddZdAdd ZdAd!d"ZdBdAd$d%Zdedefd&d'ZdAd(d)ZdAd*d+Zd,ed-efd.d/Zdejfd0d1Ze 	2dCd3ejd4ejd5edeeejf fd6d7Z  fd8d9Z!d:d; Z"edefd<d=Z#dAd>d?Z$  Z%S )DDuplexS2SModelcfgreturnNc                    s(  t |tsJ dt|dt      t| _t   j	j
j _ j	j
j _t jjdd _t jj jjd }|j _|j _ jj _ j`t  t  tj fddt jD  _tj  jj!j" j# j  _$ %dtj& j' j( j)g j*d	 d
 _+d
 _,d S )NzYou must pass the config to DuplexS2SModel as a Python dict to support hyperparameter serialization in PTL checkpoints (we got: 'type(cfg)=z').T)use_fast)pretrained_weightsc                    s    g | ]}t j j jjqS  )torchnn	Embeddingspeech_vocab_sizeembed_tokensembedding_dim).0_selfr(   f/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/speechlm2/models/duplex_s2s_model.py
<listcomp>O   s    z+DuplexS2SModel.__init__.<locals>.<listcomp>_control_codesdeviceF)-
isinstancedicttypesuper__init__save_hyperparametersr   r$   r   audio_codecvector_quantizercodebook_size_per_group_codebook_size
num_groups_num_codebooksr   pretrained_llm	tokenizerr   r'   trainmodelllmlm_headr-   r   r   r)   r*   
ModuleListrangeembed_audio_tokensLinearconfighidden_sizer,   
audio_headregister_buffertensorspeech_bos_idspeech_eos_idspeech_delay_idr7   	_use_fsdp_use_tp)r2   r$   rH   	__class__r1   r3   r<   0   s>   




zDuplexS2SModel.__init__c                 C   
   | j d S )zVReturn the size of the audio codec codebook including extra speech BOS and EOS tokens.   rA   r1   r(   r(   r3   r,   _      
z DuplexS2SModel.speech_vocab_sizec                 C   s   | j S )zBIndicates start of utterance generation (not start of inference!).r\   r1   r(   r(   r3   rS   d   s   zDuplexS2SModel.speech_bos_idc                 C   rZ   )z&Indicates end of utterance generation.   r\   r1   r(   r(   r3   rT   i   r]   zDuplexS2SModel.speech_eos_idc                 C   rZ   )z4Indicates start of inference (the very first frame).   r\   r1   r(   r(   r3   rU   n   r]   zDuplexS2SModel.speech_delay_idc                 C      | j jS )z&Return the size of the text tokenizer.)rE   
vocab_sizer1   r(   r(   r3   text_vocab_sizes   s   zDuplexS2SModel.text_vocab_sizec                 C   r`   N)rE   bos_idr1   r(   r(   r3   text_bos_idx      zDuplexS2SModel.text_bos_idc                 C   r`   rc   )rE   eos_idr1   r(   r(   r3   text_eos_id|   rf   zDuplexS2SModel.text_eos_idc                 C   s
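    # Control-token ID layout (worked example, assuming a codec codebook of size 1024;
    # follows directly from the properties above):
    #   valid codec codes:  0 .. 1023
    #   speech_bos_id     = 1024  (codebook_size)
    #   speech_eos_id     = 1025  (codebook_size + 1)
    #   speech_delay_id   = 1026  (codebook_size + 2)
    #   speech_vocab_size = 1027  (codebook_size + 3)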
   t | jS )a   
        Text pad ID is used as a 'blank' for frames when the model is not speaking
        and for frames where the model is speaking but has already predicted the
        entire text channel's content.

        Example:

            flow:         |---user---||-------assistant--------||-user-|
            text channel:  0000000000  1xxxxxxx0000000000000002  000000

        Where 0 indicates PAD ID, 1 indicates BOS ID, 2 indacates EOS ID,
        and x indicates tokens corresponding to actual text

        )r   rE   r1   r(   r(   r3   text_pad_id   s   
zDuplexS2SModel.text_pad_idinput_embedsc           	      C   sv   | j |||dudd}|jdd \}}| |d }| |d ||| j| j}||d}|dur9|d |d< |S )	a\  
        Implements a fully offline forward pass through the entire model.
        The flow is the following:

                                                     |-> |audio_head| -> |audio codes|
        |source speech + prev target text| -> |llm| -|
                                                     |-> |lm_head|    -> |token ids  |
        NT)inputs_embedspast_key_values	use_cachereturn_dictr_   last_hidden_state)text_logitsaudio_logitsrl   cache)rH   shaperI   rP   viewrC   r,   )	r2   rj   rr   outBTrp   rq   ansr(   r(   r3   forward   s   zDuplexS2SModel.forwardbatchc              	   C   s  | j |d |d d\}}|d }|jd |jd   }dk r<tj|tj|jd t||jd| j tj	gdd	}n|dkrM|d
    def prepare_inputs(self, batch: dict):
        """
        Performs additional processing on the mini-batch collected from dataloader.
        Notably:
        * Convert source audio to speech representations.
        * Convert target audio to target audio tokens.
        * Convert target text to embeddings.
        * Combine the input audio and target text embeddings.
        * Take care of any necessary slicing to align the shapes of source audio,
            target audio, and target token ids.
        """
        source_encoded, source_encoded_lens = self.perception(
            input_signal=batch["source_audio"], input_signal_length=batch["source_audio_lens"]
        )

        target_tokens = batch["target_tokens"]
        if (diff := target_tokens.shape[1] - source_encoded.shape[1]) < 0:
            # Pad the target text channel with PAD ID to match the source frame count.
            target_tokens = torch.cat(
                [
                    target_tokens,
                    (
                        torch.ones(target_tokens.shape[0], abs(diff), device=target_tokens.device) * self.text_pad_id
                    ).to(torch.long),
                ],
                dim=-1,
            )
        elif diff > 0:
            target_tokens = target_tokens[:, : source_encoded.shape[1]]

        with fp32_precision(), torch.no_grad():
            target_codes, target_codes_lens = self.audio_codec.encode(
                audio=batch["target_audio"], audio_len=batch["target_audio_lens"]
            )
        target_codes = target_codes.transpose(1, 2)  # (B, K, T) -> (B, T, K)

        # Align the sequence lengths of source frames, target codes, and target tokens.
        if (tl := target_codes.shape[1]) != (sl := source_encoded.shape[1]):
            if tl < sl:
                diff = sl - tl
                source_encoded = source_encoded[:, :tl]
                target_tokens = target_tokens[:, :tl]
                torch.clamp_(source_encoded_lens, max=tl)
            else:
                diff = tl - sl
                target_codes = target_codes[:, :sl]
                torch.clamp_(target_codes_lens, max=sl)
            if diff > 2:
                logging.warning(
                    f"A mismatch between source ({sl}) and target ({tl}) sequence length greater than 2 detected. "
                    "This may indicate significant desynchronization in longer sessions."
                )

        # Mark speech BOS/EOS on frames where the text channel has BOS/EOS,
        # and delay the audio channel by one frame filled with the speech delay code.
        target_codes = torch.where((target_tokens == self.text_bos_id)[..., None], self.speech_bos_id, target_codes)
        target_codes = torch.where((target_tokens == self.text_eos_id)[..., None], self.speech_eos_id, target_codes)
        target_codes = torch.cat(
            [
                torch.full(
                    [target_codes.shape[0], 1, target_codes.shape[-1]],
                    fill_value=self.speech_delay_id,
                    device=self.device,
                    dtype=torch.long,
                ),
                target_codes[:, :-1],
            ],
            dim=1,
        )
        # Stack audio codes and text tokens into frames of shape (B, T, K + 1).
        input_ids = torch.cat([target_codes, target_tokens[..., None]], dim=-1)

        if self._use_tp:
            tp_world_size = self.device_mesh["tensor_parallel"].size()
            if (remainder := (input_ids.shape[1] - 1) % tp_world_size) != 0:
                # Truncate a few frames so sequence parallelism can split the sequence evenly.
                input_ids = input_ids[:, :-remainder]
                source_encoded = source_encoded[:, :-remainder]

        # Shift by one frame for next-frame prediction.
        text_inputs = input_ids[:, :-1, -1]
        text_labels = input_ids[:, 1:, -1]
        audio_inputs = input_ids[:, :-1, :-1]
        audio_labels = input_ids[:, 1:, :-1]

        # Combine source speech, previous target audio tokens, and previous target text into LLM inputs.
        input_embeds = self.embed_tokens(text_inputs)
        for cbidx in range(self._num_codebooks):
            input_embeds.add_(self.embed_audio_tokens[cbidx](audio_inputs[..., cbidx]))
        input_embeds.add_(source_encoded[:, :-1] * self.cfg.get("duplex_user_channel_weight", 1.0))

        return {
            "input_embeds": input_embeds,
            "input_lens": source_encoded_lens - 1,
            "output_lens": target_codes_lens - 1,
            "text_labels": text_labels,
            "audio_labels": audio_labels,
        }
   C   sd  | j j| j j| jfD ]
}t|r|  q
| |}| |d }|d  }t ; t	j
jj|d dd|d dddd| }t	j
jj|d	 dd
|d dd
dd|| j  }W d    n1 sgw   Y  | jj| | jj|  }	|d jd d
 \}
}|	t	| jd ur| jjd jd d nd|||
||t	j||
|  d}| j|dd |S )Nrj   r   rp   r   r^   r   sum)	reductionrq   r_   r   lr)losslearning_rate	text_loss
audio_loss
batch_sizesequence_length
num_framespadding_ratioT)on_step)r   preprocessorencoderrH   r   evalr   r   r   r)   r*   
functionalcross_entropyflattenrC   r$   text_loss_weightaudio_loss_weightrs   	as_tensor_trainertrainer
optimizersparam_groupsr   float32log_dict)r2   rz   r   minputsforward_outputsr   r   r   r   rv   rw   rx   r(   r(   r3   training_step#  sJ   
(

zDuplexS2SModel.training_stepc                 C   s   t |  d S rc   )r   r1   r(   r(   r3   on_train_epoch_startJ     z#DuplexS2SModel.on_train_epoch_startc                 C   s*   |    t| jj | _t  | _d S rc   )r   r   r$   scoring_asrresetasr_bleur   bleur1   r(   r(   r3   on_validation_epoch_startM  s   z(DuplexS2SModel.on_validation_epoch_startvalc                 C   s   | j  }| D ]\}}| j| d| || jddd q	| j }| D ]\}}| j| d| || jddd q)d S )Nr0   T)on_epoch	sync_dist)r   computeitemslogr   r7   r   )r2   prefixr   kr   r   r(   r(   r3   on_validation_epoch_endR  s   
&
&z&DuplexS2SModel.on_validation_epoch_endc              	   C   s   |  D ]O\}}|d u rq| |d |d }t $ | jj||d t|d dd|d d d tjd W d    n1 sAw   Y  | j	j||d |d	 d
 qd S )Nr{   r|   target_textsr   i"V  i>  r   )namerefs
pred_audiopred_audio_lenstext)r   r   hyps)
r   offline_inferencer   r   updater   r   r)   r   r   )r2   rz   r   r   dataset_batchresultsr(   r(   r3   validation_stepZ  s"   zDuplexS2SModel.validation_stepc                 C   s   |   S rc   )r   r1   r(   r(   r3   on_test_epoch_startn     z"DuplexS2SModel.on_test_epoch_startc                 C   s   | j ddS )Ntest)r   )r   r1   r(   r(   r3   on_test_epoch_endq  r   z DuplexS2SModel.on_test_epoch_endargskwargsc                 O   s   | j |i |S rc   )r   r2   r   r   r(   r(   r3   	test_stept  s   zDuplexS2SModel.test_stepc                 C   sh   t jd| j| jd}t jd| jf| j| jd}| |}t| jD ]}|	| j
    def _get_bos_embedding(self) -> torch.Tensor:
        """
        Return the partial embedding corresponding to the start frame of the model.
        It consists of the sum of text embedding of pad ID, and sum of audio token embeddings
        corresponding to an all-zero frame. This is consistent with how the model is trained.
        The returned shape is (1, embedding_dim).
        """
        text_bos = torch.full((1,), fill_value=self.text_pad_id, device=self.device)
        audio_bos = torch.full((1, self._num_codebooks), fill_value=self.speech_delay_id, device=self.device)
        input_embeds = self.embed_tokens(text_bos)
        for cbidx in range(self._num_codebooks):
            input_embeds.add_(self.embed_audio_tokens[cbidx](audio_bos[..., cbidx]))
        return input_embeds
   C   s`  | j ||d\}}|j\}}}| jrRtj|g|jd}	tjj|	tjjj	d t
|	 }
|
|krQ|dd|d |ddf }|d|
| d}tj||gdd}n|}
|| jdd9 }t }tj||
| j| jtjd	}tj||
| jtjd	}|ddd
f  |  7  < | |ddddf |d}|d jdddddf |ddd
f< |d jdddddf |ddd
f< td|jd D ]u}|dd|f  | |dd|d f 7  < t| jD ]}|dd|f  | j| |dd|d |f 7  < q| |dd||d f |d d}|d jdddddf |dd|f< |d jdddddf |dd|f< q| jrW|
|krW|ddd|f }|ddd|f }t||| j| jd|||d}|rt|| j}t + t  | j j!|"dd|d\}}W d   n	1 sw   Y  W d   n	1 sw   Y  ||d< ||d< |S )a  
        Autoregressive prediction.

        Args:
            input_signal: a batch of waveforms with shape (B, T) with source sampling rate.
            input_signal_lens: example lengths as number of samples of shape (B,).
            decode_audio: bool, whether to decode audio codes to waveform.

        Returns:
            A dict with keys:
                * "text": generated text, de-tokenized to strings, properly skipping text_pad_id; list of length B.
                * "audio": generated waveform of shape (B, T3) (`decode_audio=True`).
                * "audio_len" output lengths as number of waveform samples of shape (B,) (when `decode_audio=True`).
                * "tokens_text": generated text tokens of shape (B, T2).
                * "tokens_len" output lengths as number of tokens of shape (B,).
                * "tokens_audio": generated audio codes of shape (B, T2, K) where `K=num_codebooks`.
        r}   r6   )opNr^   r   r   r   )r7   r   r   )rr   rp   r   rq   rr   )rE   pad_id)r   tokens_texttokens_audio
tokens_lenr_   )tokensr   r   r   )#r   rs   rV   r)   rR   r7   distributed
all_reduceReduceOpMAXintitemrepeatr   r$   r   r   emptyrC   r   r   argmaxrK   r-   rL   tokens_to_strrE   ri   replace_control_speech_codesr5   r   r   r>   decoder   )r2   r~   r   r   rj   lengthsrv   T_localHT_tensorrw   
last_framepadrr   	gen_audiogen_textrx   tr   gen_audio_codespredicted_audiopredicted_audio_lensr(   r(   r3   r     sd   
((.6$(* z DuplexS2SModel.offline_inferencec                    s>   t   t j|i | W d    d S 1 sw   Y  d S rc   )r   r;   backwardr   rX   r(   r3   r    s   "zDuplexS2SModel.backwardc                 C   s   t | S rc   )r   r1   r(   r(   r3   r     r   z#DuplexS2SModel.configure_optimizersc              
   C   sj   t dtdt dddtdt dddtdt dddtdt ddd	tdt d
| jjdgdS )z
        Return a typing schema for optimal batch size calibration for various
        sequence lengths using OOMptimizer.
        r{   )rv   rw   input)r   r:   
seq_lengthr|   )rv   r   r   r   output)r   r:   r  ra   )clsr   )r9   r!   r   r    r   rE   ra   r1   r(   r(   r3   oomptimizer_schema  s   
z!DuplexS2SModel.oomptimizer_schemac                 C   sZ  | j }|d u r	d S | j}t|tr|jj}|d  } dkrd| _tt	 ft
dfddt d}t||| |jD ]_}t t t t tt
ddt tt
dft	 fdt t tt
ddd
}|j}d	D ]*}t||}||  d
krtd| d| d| d t||||   qit||| q;| j| jfD ]}	t|	|tt
dt
ddd q|d  }
 dkr+|
jdksJ d| _d|
i}t|jD ]\}}t|fi ||j|< qt| jfi || _t| jD ]}t| j| fi || j|< qt| jfi || _t| jfi || _t| jfi || _t| jfi || _d S d S )Nr   r^   T)input_layoutsdesired_input_layoutsuse_local_output)zlayers.0norm)output_layouts)r  r   )
input_layernormzself_attn.q_projzself_attn.k_projzself_attn.v_projzself_attn.o_projpost_attention_layernormmlpzmlp.gate_projzmlp.up_projzmlp.down_proj)	num_headsnum_key_value_headsrO   r   zattn_layer.=z$ is not divisible by tp_mesh.size()=z:: set a different tensor parallelism size to avoid errors.r   F)r  r#  r!  data_parallelmesh) r   rH   r8   r   
base_modelrG   r   rW   r   r   r	   r   r   layersr
   r   	self_attngetattrr"   r   setattrrI   rP   ndimrV   	enumerater   r-   rK   rC   rL   r   )r2   r   rH   tp_meshplantransformer_block
attn_layerattrr   r   dp_meshfsdp_configidxlayerr(   r(   r3   configure_model  s   


zDuplexS2SModel.configure_modelrc   )r%   N)r   )T)&__name__
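    # Mesh construction sketch (illustrative; the trainer strategy -- e.g. Lightning's
    # ModelParallelStrategy -- is expected to populate `self.device_mesh` with the two
    # dimension names used above):
    #
    #   from torch.distributed.device_mesh import init_device_mesh
    #   mesh = init_device_mesh(
    #       "cuda", (world_size // tp, tp), mesh_dim_names=("data_parallel", "tensor_parallel")
    #   )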

def replace_control_speech_codes(speech_codes: Tensor, control_codes: Tensor) -> Tensor:
    """
    Replaces control codes (speech BOS, EOS, etc) in `speech_codes` with the first frame which is
    assumed to consist of 'valid' codes representing silence.
    """
    return torch.where(torch.isin(speech_codes, control_codes), speech_codes[:, :1], speech_codes)


def tokens_to_str(gen_text: Tensor, tokens_len: Tensor, tokenizer, pad_id: int) -> list[str]:
    ans = []
    for hyp_ids, hyp_len in zip(gen_text.cpu(), tokens_len.cpu()):
        hyp_ids = hyp_ids[:hyp_len]
        hyp_ids = hyp_ids[hyp_ids != pad_id]  # skip text PAD ID frames
        ans.append(tokenizer.ids_to_text(hyp_ids))
    return ans
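# Usage sketch (illustrative only; the config keys below mirror the attributes read in
# __init__/training_step, but `pretrained_audio_codec`, `pretrained_asr`, and the exact
# checkpoint names are assumptions -- real recipes live in the NeMo speechlm2 examples):
#
#   model = DuplexS2SModel(
#       {
#           "pretrained_llm": "<hf-llm-id>",
#           "pretrained_weights": True,
#           "pretrained_audio_codec": "<codec-checkpoint>",
#           "pretrained_asr": "<streaming-asr-checkpoint>",
#           "scoring_asr": "<asr-model-for-validation>",
#           "text_loss_weight": 1.0,
#           "audio_loss_weight": 1.0,
#           "duplex_user_channel_weight": 1.0,
#       }
#   )
#   out = model.offline_inference(waveforms, waveform_lens)
#   print(out["text"])  # list of B decoded strings; out["audio"] holds the waveforms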