o
    }oi#%                     @   s  d dl Z d dlZd dlmZ d dlZd dlZd dlZ	d dl
Z
d dlmZ d dlZd dlmZ d dlmZmZ d dlmZ d dlmZmZ dd Ze	d	Zed
ZdZee\Z Z!G dd dZ"e
# 	d'de$e% de&fddZ'd(ddZ(dd Z)dd Z*e
# ej+j,d)dede%fddZ-ej+j.e-dd ej/fd!d"Z0e1d#kre2d$ e3e j4d% Z5e
j67e5 ej+j8e-e0d& dS dS )*    N)Callable)	rearrange)T5EncoderModelT5TokenizerFast)CausalVideoTokenizer)
read_imageresize_videoc                 C   s6   t jd| d}tjd| d}|d |  ||fS )a.  
    Initializes the T5 tokenizer and encoder model, loading them from a specified cache directory.

    Args:
        t5_cache_dir (str): Path to the cache directory for storing the pretrained model files.

    Returns:
        tuple: A tuple containing the tokenizer and encoder model instances.
    zgoogle-t5/t5-11b)	cache_dircuda)r   from_pretrainedr   toeval)t5_cache_dir	tokenizertext_encoder r   k/home/ubuntu/.local/lib/python3.10/site-packages/nemo/collections/diffusion/data/prepare_energon_dataset.pyinitialize_text_encoder    s
   
r   zUhf://datasets/huggan/smithsonian_butterflies_subset/data/train-00000-of-00001.parquetCosmosCausalCV_f4x8x8 c                   @   s:   e Zd ZdZdejdedejdejfddZdd
dZd	S )EncodedSamplea|  
    A class representing an encoded sample, containing the text encoding, length,
    attention mask, and offset mappings.

    Attributes:
        encoded_text (np.ndarray): Encoded text array.
        length (int): Length of the encoding.
        attn_mask (np.ndarray): Attention mask for the encoding.
        offset_mappings (np.ndarray): Mappings for offset positions.
    encoded_textlength	attn_maskoffset_mappingsc                 C   s   || _ || _|| _|| _d S )N)r   r   r   r   )selfr   r   r   r   r   r   r   __init__I   s   
zEncodedSample.__init__returnNc                 C   s`   | j d| j tj| _ | jd| j tj| _| jdur.| jd| j tj| _dS dS )zj
        Truncates the encoded text, attention mask, and offset mappings to the specified length.
        r   N)r   r   astypenpfloat16r   int32r   )r   r   r   r   truncateO   s
   
zEncodedSample.truncate)r   N)	__name__
__module____qualname____doc__r   ndarrayintr   r"   r   r   r   r   r   =   s     r   T   promptsr"   c              	   C   s@  | j |ddd|d|d}|j }|j }|r#|d }	|	  }	nd}	|||d}
|
j}|jdd	 }t|j	d
 D ]}d
|| || d< q=|  }|  }|ddd|f }|ddd|f }g }t|j	d
 D ]}|r||	| }nd}|
t|| tj|| || | qs|r|D ]}|  q|S )a%  
    Encodes a batch of text prompts into T5 embeddings.

    Args:
        tokenizer: Tokenizer instance for encoding.
        encoder: T5 encoder model instance.
        prompts (list[str]): List of text prompts to encode.
        truncate (bool): If True, truncates the output embeddings.
        max_length (int): Maximum length for each encoded prompt.
        output_mapping (bool): If True, returns offset mappings for each prompt.

    Returns:
        list[EncodedSample]: A list of encoded samples containing text encodings and masks.
    ptT
max_length)return_tensors
truncationpaddingr,   return_lengthreturn_offsets_mappingoffset_mappingN)	input_idsattention_mask   )dimr   )batch_encode_plusr3   r
   r4   cpunumpylast_hidden_statesumrangeshapeappendr   r   r   r    r"   )r   encoderr*   r"   r,   output_mappingbatch_encodingr3   r   offsets_mappingoutputsr   lengthsbatch_idoutidxoffsetsxr   r   r   encode_for_batchY   sD   


*
rJ   c           	      C   sV   t | ||gd }tj|jtjd}|j\}}tjd||tjd}||dd|f< |S )a[  
    Generates a T5 embedding for a single text prompt.

    Args:
        tokenizer: T5 tokenizer instance.
        text_encoder: T5 encoder model instance.
        prompt (str): The text prompt to encode.
        t5_embeding_max_length (int): Maximum length for the embedding.

    Returns:
        torch.Tensor: Padded T5 embedding tensor.
    r   dtyper5   N)rJ   torchtensorr   bfloat16r=   zeros)	r   r   promptt5_embeding_max_lengthrF   r   LCt5_embedr   r   r   generate_t5_embed   s   
rV   c                 C   s0   | | }|| }||d kr|| n| }||fS )a6  
    Calculates the start and end indices for distributed processing based on rank.

    Args:
        dataset_size (int): Total dataset size.
        rank (int): Current process rank.
        world_size (int): Total number of processes.

    Returns:
        tuple: (start index, end index) for the rank.
    r5   r   )dataset_sizerank
world_size
split_size	start_idxend_idxr   r   r   get_start_end_idx_for_this_rank   s   r]   c           
      C   s   t j|  }|d }|d }t|}t|ddd}t|dd}|tjdf }t|dd	\}}tt	t
|}| d
|jtjdt||jd |jd dd}	|	S )z
    Generates a sample dictionary with image latent tensor, caption, and metadata.

    Args:
        index (int): Index of the dataset row.

    Returns:
        dict: Dictionary containing processed image latents, embeddings, and metadata.
    	image_urlnamezh w (t c) -> t h w cr5   )tr)   )
short_size.)temporal_window06rK         )image_heightimage_width)__key__z.pthz.picklez.json)dfilocr   r   r   r   newaxisautoencoderrV   r   r   r   rM   rO   pickledumpsr=   )
indexrowr^   image_captionvideobatch_video_image_latenttext_embeddingsampler   r   r   butterfly_process_func   s"   

rx   outputprocess_func
output_dirc           
      C   s   t  }tj }ttt||\}}tj	|dd tj
|d| d}tj|dd}t||D ]}| |}	||	 q2W d   dS 1 sIw   Y  dS )z
    Prepares a WebDataset using the specified processing function, for distributed settings.

    Args:
        process_func (Callable): Function to process each dataset entry.
        output_dir (str): Output directory to save processed dataset.

    T)exist_okrX   z	-%06d.tari'  )maxcountN)distget_rankrM   distributedget_world_sizer]   lenri   osmakedirspathjoinwdsShardWriterr<   write)
rz   r{   rX   rY   r[   r\   
output_tarsinkirw   r   r   r   prepare   s   
"r   )targetr   c                  C   s   t jttdd} | S )z
    Prepares the butterfly dataset for distributed training.

    Returns:
        run.Partial: Partially configured run for WebDataset preparation.
    butterfly_webdataset)rz   r{   )runPartialr   rx   )reciper   r   r   prepare_butterfly_dataset  s   r   __main__nccl
LOCAL_RANK)default_factory)Tr)   T)r)   )ry   )9r   rm   typingr   nemo_runr   r9   r   pandaspdrM   torch.distributedr   r~   
webdatasetr   einopsr   transformersr   r   9nemo.collections.common.video_tokenizers.cosmos_tokenizerr   .nemo.collections.common.video_tokenizers.utilsr   r   r   read_parquetri   r   rl   r   r   r   r   no_gradliststrboolrJ   rV   r]   rx   cli
entrypointr   factoryr   r   r#   init_process_groupr(   environ
local_rankr
   
set_devicemainr   r   r   r   <module>   sP   


?(
