# coding=utf-8
# Copyright 2025 AIDAS Lab
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Text-to-speech (t2s) generation script: loads a trained MMaDA checkpoint and a
# speech tokenizer/detokenizer, generates speech-token sequences from validation
# text prompts, decodes them to waveforms, and logs the audio to Weights & Biases.

import os
import sys

os.environ["TOKENIZERS_PARALLELISM"] = "true"

from PIL import Image
from tqdm import tqdm
import numpy as np
import torch
import wandb

from models import MAGVITv2, get_mask_schedule, MMadaModelLM, MMadaConfig
from models.modeling_emova_speech_tokenizer import EMOVASpeechTokenizer
from training.prompting_utils import UniversalPrompting
from training.utils import get_config, flatten_omega_conf
from transformers import AutoTokenizer
import argparse


def resize_vocab(model, config):
    print(f"Resizing token embeddings to {config.model.mmada.new_vocab_size}")
    model.resize_token_embeddings(config.model.mmada.new_vocab_size)


def get_vq_model_class(model_type):
    # Return the tokenizer class; the caller loads weights via `.from_pretrained(...)`
    # with the checkpoint configured for the run.
    if model_type == "magvitv2":
        return MAGVITv2
    elif model_type == "emova":
        return EMOVASpeechTokenizer
    else:
        raise ValueError(f"model_type {model_type} not supported.")


if __name__ == '__main__':
    config = get_config()

    resume_wandb_run = config.wandb.resume
    run_id = config.wandb.get("run_id", None)
    if run_id is None:
        resume_wandb_run = False
        run_id = wandb.util.generate_id()
        config.wandb.run_id = run_id

    wandb_config = {k: v for k, v in flatten_omega_conf(config, resolve=True)}

    wandb.init(
        project="demo",
        name=config.experiment.name + '_t2s',
        config=wandb_config,
    )

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # a) Load text tokenizer and universal prompting helper
    text_tokenizer = AutoTokenizer.from_pretrained(config.model.mmada.pretrained_model_path, padding_side="left")
    uni_prompting = UniversalPrompting(
        text_tokenizer,
        max_text_len=config.dataset.preprocessing.max_seq_length,
        special_tokens=(
            "<|s2t|>", "<|soa|>", "<|eoa|>", "<|soi|>", "<|eoi|>", "<|sov|>", "<|eov|>",
            "<|t2i|>", "<|mmu|>", "<|t2v|>", "<|v2v|>", "<|lvg|>", "<|t2s|>",
        ),
        ignore_id=-100,
        cond_dropout_prob=config.training.cond_dropout_prob,
        use_reserved_token=True,
    )

    # b) Load speech tokenizer/detokenizer
    vq_model_class = get_vq_model_class(config.model.speech_model.type)
    vq_model = vq_model_class.from_pretrained(config.model.speech_model.speech_model_name).to(device)
    vq_model.requires_grad_(False)
    vq_model.eval()

    # c) Load main MMaDA model
    train_step = config.model.mmada.train_step
    trained_checkpoint_path = f"/home/work/AIDAS/ckpts/omada/omada-training-stage1/checkpoint-{train_step}/unwrapped_model"
    # trained_checkpoint_path = "/home/work/AIDAS/omada-training-stage1/checkpoint-10000/unwrapped_model"
    print(f"Loading trained model from: {trained_checkpoint_path}")
    model = MMadaModelLM.from_pretrained(
        trained_checkpoint_path,
        trust_remote_code=True,
        torch_dtype=torch.bfloat16,
        config='/home/work/AIDAS/ommda-training-s2t-mmada/config.json',  # switch to the t2s config once that training run finishes
    )
    print("✅ Trained model loaded successfully!")
    # model = MMadaModelLM.from_pretrained(config.model.mmada.pretrained_model_path, trust_remote_code=True, torch_dtype=torch.bfloat16)

    # d) Extend vocabulary for speech tokens
    num_speech_tokens = 4096
    image_vocab_size = config.model.mmada.codebook_size  # 8192
    # text_vocab_size = len(uni_prompting.text_tokenizer)
    # resize_vocab(model, config)

    model.to(device).eval()

    mask_token_id = model.config.mask_token_id

    # Allow command-line overrides of the validation settings
    if config.get("validation_prompts_file", None) is not None:
        config.dataset.params.validation_prompts_file = config.validation_prompts_file
    config.training.batch_size = config.batch_size
    config.training.guidance_scale = config.guidance_scale
    config.training.generation_timesteps = config.generation_timesteps

    with open(config.dataset.params.validation_prompts_file, "r") as f:
        validation_prompts = f.read().splitlines()

    for step in tqdm(range(0, len(validation_prompts), config.training.batch_size)):
        prompts = validation_prompts[step:step + config.training.batch_size]

        # Start from fully masked speech-token sequences
        audio_tokens = torch.ones((len(prompts), config.model.mmada.num_speech_vq_tokens),
                                  dtype=torch.long, device=device) * mask_token_id
        input_ids, attention_mask = uni_prompting((prompts, audio_tokens), 't2s_gen')

        if config.training.guidance_scale > 0:
            # Unconditional branch (empty prompts) for classifier-free guidance
            uncond_input_ids, uncond_attention_mask = uni_prompting(([''] * len(prompts), audio_tokens), 't2s_gen')
        else:
            uncond_input_ids = None
            uncond_attention_mask = None

        if config.get("mask_schedule", None) is not None:
            schedule = config.mask_schedule.schedule
            args = config.mask_schedule.get("params", {})
            mask_schedule = get_mask_schedule(schedule, **args)
        else:
            mask_schedule = get_mask_schedule(config.training.get("mask_schedule", "cosine"))

        with torch.no_grad():
            # TODO: Implement t2s_generate
            gen_token_ids = model.t2s_generate(
                input_ids=input_ids,
                uncond_input_ids=uncond_input_ids,
                attention_mask=attention_mask,
                uncond_attention_mask=uncond_attention_mask,
                guidance_scale=config.training.guidance_scale,
                temperature=config.training.get("generation_temperature", 1.0),
                timesteps=config.training.generation_timesteps,
                noise_schedule=mask_schedule,
                noise_type=config.training.get("noise_type", "mask"),
                seq_len=config.model.mmada.num_speech_vq_tokens,
                uni_prompting=uni_prompting,
                config=config,
            )

        # Clamp generated ids into the valid speech codebook range
        gen_token_ids = torch.clamp(gen_token_ids, max=config.model.mmada.speech_codebook_size - 1, min=0)

        # Only the first item of the batch is decoded and logged
        id_list = gen_token_ids[0].cpu().tolist()
        print(f"Generated {len(id_list)} speech tokens")
        speech_unit_str = " ".join(map(str, id_list))
        speech_unit_for_decode = "".join([f"<|speech_{unit}|>" for unit in speech_unit_str.split(" ")])

        output_wav_path = f"/home/work/AIDAS/output/omada_tmp/generated_audio_step_{train_step}_{step}_item.wav"
        # Using a default speaking-style condition; this can be made more dynamic if needed
        condition = 'gender-female_emotion-neutral_speed-normal_pitch-normal'
        vq_model.decode(
            speech_unit_for_decode,
            condition=condition,
            output_wav_file=output_wav_path,
        )

        wandb.log({
            f"Generated Audio/{step * config.training.batch_size}": wandb.Audio(output_wav_path, caption=prompts[0])
        }, step=step)