Spaces:
Running
on
Zero
Running
on
Zero
| import csv | |
| import os | |
| import pickle | |
| import sys | |
| import numpy as np | |
| import torch | |
| import random | |
| import math | |
| import librosa | |
class audio_video_spec_fullset_Dataset(torch.utils.data.Dataset):
    """VGGSound dataset of mel-spectrogram crops paired with video features.

    Each item is a dict with three entries:

    * ``mix_spec``: spectrogram crop, time along the second axis.
    * ``mix_video_feat``: video-feature crop covering the same time span,
      time along the first axis.
    * ``mix_info_dict``: sample names, video paths and the cropped frame
      ranges (``"<start>_<end>"`` strings).

    With probability ``concat_prob`` an item is assembled from crops of two
    different samples ("concat" mode); otherwise it is a single crop.
    """

    # Probability that __getitem__ concatenates two samples.
    concat_prob = 0.5
    # Number of leading entries of the list file kept for the "Test" split.
    test_subset_size = 200

    def __init__(self, split, dataset1, feat_type='clip', transforms=None, sr=22050, duration=10,
                 truncate=220000, fps=21.5, drop=0.0, fix_frames=False, hop_len=256):
        """Index the sample list for one split.

        Args:
            split: ``"train"``, ``"valid"`` or ``"test"``. ``"valid"`` and
                ``"test"`` both read ``Test.txt``.
            dataset1: config object exposing ``dataset_name`` (must be
                ``"VGGSound"``), ``data_dir``, ``video_dir`` and
                ``split_txt_path``.
            feat_type: accepted for config compatibility; not used here.
            transforms: accepted for config compatibility; not used here.
            sr: audio sample rate.
            duration: clip length in seconds that every sample is padded or
                trimmed to.
            truncate: crop length in audio samples.
            fps: video-feature frames per second.
            drop: probability of replacing the video feature with the
                ``empty_vid.npz`` placeholder.
            fix_frames: if true, crops always start at the beginning.
            hop_len: audio samples per spectrogram frame.

        Raises:
            ValueError: if ``split`` is not one of the supported names.
        """
        super().__init__()
        if split == "train":
            self.split = "Train"
        elif split in ("valid", "test"):
            self.split = "Test"
        else:
            # Previously self.split stayed unset and failed later with an
            # unrelated AttributeError.
            raise ValueError("Unknown split {!r}; expected 'train', 'valid' or 'test'".format(split))

        # Default params:
        self.min_duration = 2
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fps = fps
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        self.drop = drop
        print("Fix Frames: {}".format(self.fix_frames))
        print("Use Drop: {}".format(self.drop))

        # Dataset1: (VGGSound)
        assert dataset1.dataset_name == "VGGSound"

        spec_dir = os.path.join(dataset1.data_dir, "mel_maa2", "npy")  # spectrograms
        feat_dir = os.path.join(dataset1.data_dir, "cavp")             # CAVP features
        video_dir = os.path.join(dataset1.video_dir, "tmp_vid")        # videos

        list_path = os.path.join(dataset1.split_txt_path, '{}.txt'.format(self.split))
        with open(list_path, "r") as f:
            names = [line.strip() for line in f]
        if self.split == "Test":
            names = names[:self.test_subset_size]

        # Shuffle once; the three path lists are derived from the shuffled
        # names so they stay aligned by construction.
        order = np.random.permutation(np.arange(len(names)))
        self.data_list = [names[i] for i in order]
        self.spec_list = [os.path.join(spec_dir, name) + "_mel.npy" for name in self.data_list]
        self.feat_list = [os.path.join(feat_dir, name) + ".npz" for name in self.data_list]
        self.video_list = [os.path.join(video_dir, name) + "_new_fps_21.5_truncate_0_10.0.mp4"
                           for name in self.data_list]
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.data_list)

    @staticmethod
    def _read_feat(npz_path):
        """Read the ``feat`` array of an ``.npz`` archive as float32 and close the archive."""
        with np.load(npz_path) as archive:
            return archive['feat'].astype(np.float32)

    def load_spec_and_feat(self, spec_path, video_feat_path):
        """Load one spectrogram and its video feature, padded/trimmed to ``duration``.

        An unreadable spectrogram is replaced by zeros. An unreadable or
        randomly dropped video feature (see ``drop``) is replaced by
        ``empty_vid.npz`` from the feature's directory. Arrays that are too
        short are repeated along time before trimming.

        Returns:
            ``(spec_raw, video_feat)`` as float32 arrays.
        """
        try:
            spec_raw = np.load(spec_path).astype(np.float32)
        except Exception:
            print(f"corrupted mel: {spec_path}", flush=True)
            spec_raw = np.zeros((80, 625), dtype=np.float32)  # [C, T]

        empty_feat_path = os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz')
        if np.random.uniform(0, 1) > self.drop:
            try:
                video_feat = self._read_feat(video_feat_path)
            except Exception:
                print(f"corrupted video: {video_feat_path}", flush=True)
                video_feat = self._read_feat(empty_feat_path)
        else:
            video_feat = self._read_feat(empty_feat_path)

        spec_len = self.sr * self.duration / self.hop_len
        if spec_raw.shape[1] < spec_len:
            spec_raw = np.tile(spec_raw, math.ceil(spec_len / spec_raw.shape[1]))
        spec_raw = spec_raw[:, :int(spec_len)]

        feat_len = self.fps * self.duration
        if video_feat.shape[0] < feat_len:
            video_feat = np.tile(video_feat, (math.ceil(feat_len / video_feat.shape[0]), 1))
        video_feat = video_feat[:int(feat_len)]
        return spec_raw, video_feat

    def mix_audio_and_feat(self, spec1=None, spec2=None, video_feat1=None, video_feat2=None,
                           video_info_dict=None, mode='single'):
        """Crop (``mode="single"``) or crop-and-concatenate (``mode="concat"``) samples.

        Args:
            spec1, spec2: spectrograms, time along axis 1. ``spec2`` is only
                used in concat mode.
            video_feat1, video_feat2: video features, time along axis 0.
            video_info_dict: dict that receives ``video_time1`` and
                ``video_time2``; a fresh dict is created when omitted.
            mode: ``"single"`` or ``"concat"``.

        Returns:
            ``(spec, video_feat, video_info_dict)``.

        Raises:
            ValueError: for an unknown ``mode``.
        """
        if video_info_dict is None:
            # A literal {} default would be shared (and mutated) across calls.
            video_info_dict = {}

        if mode == "single":
            if self.fix_frames:
                start_idx = 0
            else:
                # Clamp so truncate == sr * duration yields start 0 instead of
                # an empty randint range.
                start_idx = random.randint(0, max(0, int(self.sr * self.duration - self.truncate - 1)))
            start_frame = int(self.fps * start_idx / self.sr)
            truncate_frame = int(self.fps * self.truncate / self.sr)
            # Same window expressed in spectrogram frames.
            spec_start = int(start_idx / self.hop_len)
            spec_truncate = int(self.truncate / self.hop_len)
            spec1 = spec1[:, spec_start: spec_start + spec_truncate]
            video_feat1 = video_feat1[start_frame: start_frame + truncate_frame]
            video_info_dict['video_time1'] = str(start_frame) + '_' + str(start_frame + truncate_frame)
            video_info_dict['video_time2'] = ""
            return spec1, video_feat1, video_info_dict

        if mode == "concat":
            total_spec_len = int(self.truncate / self.hop_len)
            min_spec_len = self.min_duration * self.sr // self.hop_len
            # Split the output length between the two clips, each getting at
            # least min_duration seconds.
            spec1_truncate_len = random.randint(min_spec_len, total_spec_len - min_spec_len - 1)
            spec2_truncate_len = total_spec_len - spec1_truncate_len
            spec_start1 = random.randint(0, total_spec_len - spec1_truncate_len - 1)
            spec_start2 = random.randint(0, total_spec_len - spec2_truncate_len - 1)
            spec_end1 = spec_start1 + spec1_truncate_len
            spec_end2 = spec_start2 + spec2_truncate_len
            concat_audio_spec = np.concatenate(
                [spec1[:, spec_start1: spec_end1], spec2[:, spec_start2: spec_end2]], axis=1)

            # Matching video windows; the second length is the remainder so
            # both parts always add up to the full crop.
            start1_frame = int(self.fps * spec_start1 * self.hop_len / self.sr)
            truncate1_frame = int(self.fps * spec1_truncate_len * self.hop_len / self.sr)
            start2_frame = int(self.fps * spec_start2 * self.hop_len / self.sr)
            truncate2_frame = int(self.fps * self.truncate / self.sr) - truncate1_frame
            concat_video_feat = np.concatenate([
                video_feat1[start1_frame: start1_frame + truncate1_frame],
                video_feat2[start2_frame: start2_frame + truncate2_frame],
            ])
            video_info_dict['video_time1'] = str(start1_frame) + '_' + str(start1_frame + truncate1_frame)
            video_info_dict['video_time2'] = str(start2_frame) + '_' + str(start2_frame + truncate2_frame)
            return concat_audio_spec, concat_video_feat, video_info_dict

        raise ValueError("Unknown mode {!r}; expected 'single' or 'concat'".format(mode))

    def __getitem__(self, idx):
        """Return the item dict for sample ``idx`` (see the class docstring)."""
        audio_name1 = self.data_list[idx]
        spec_npy_path1 = self.spec_list[idx]
        video_feat_path1 = self.feat_list[idx]
        video_path1 = self.video_list[idx]

        # Decide whether to concatenate a second sample. A partner distinct
        # from idx only exists with more than one sample; without this guard
        # the rejection loop below would never terminate.
        use_concat = random.uniform(0, 1) < self.concat_prob
        use_concat = use_concat and len(self.data_list) > 1
        if use_concat:
            random_idx = idx
            while random_idx == idx:
                random_idx = random.randint(0, len(self.data_list) - 1)
            audio_name2 = self.data_list[random_idx]
            spec_npy_path2 = self.spec_list[random_idx]
            video_feat_path2 = self.feat_list[random_idx]
            video_path2 = self.video_list[random_idx]

        spec1, video_feat1 = self.load_spec_and_feat(spec_npy_path1, video_feat_path1)
        if use_concat:
            spec2, video_feat2 = self.load_spec_and_feat(spec_npy_path2, video_feat_path2)
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': audio_name2,
                               'video_path1': video_path1, 'video_path2': video_path2}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1, spec2, video_feat1, video_feat2, video_info_dict, mode='concat')
        else:
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': "",
                               'video_path1': video_path1, 'video_path2': ""}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1=spec1, video_feat1=video_feat1, video_info_dict=video_info_dict, mode='single')

        # TODO (translated from the original note): the 3-channel variant
        # `mix_spec[None].repeat(3, axis=0)` has to be changed, otherwise it
        # does not fit maa's autoencoder; the single-channel array is used.
        return {
            'mix_spec': mix_spec,              # [mel bins, spec frames]
            'mix_video_feat': mix_video_feat,  # [video frames, feature dim]
            'mix_info_dict': mix_info,
        }
class audio_video_spec_fullset_Dataset_Train(audio_video_spec_fullset_Dataset):
    """``audio_video_spec_fullset_Dataset`` bound to the ``'train'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='train'``."""
        super().__init__(**dataset_cfg, split='train')
class audio_video_spec_fullset_Dataset_Valid(audio_video_spec_fullset_Dataset):
    """``audio_video_spec_fullset_Dataset`` bound to the ``'valid'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='valid'``."""
        super().__init__(**dataset_cfg, split='valid')
class audio_video_spec_fullset_Dataset_Test(audio_video_spec_fullset_Dataset):
    """``audio_video_spec_fullset_Dataset`` bound to the ``'test'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='test'``."""
        super().__init__(**dataset_cfg, split='test')
class audio_video_spec_fullset_Dataset_inpaint(audio_video_spec_fullset_Dataset):
    """Inpainting variant: each item also carries a partially zeroed-out copy.

    Items are dicts with ``mix_spec`` (the clean spectrogram crop),
    ``hybrid_feat`` (video features and spectrogram, each with a random time
    span set to zero) and ``mix_info_dict``.
    """

    def __getitem__(self, idx):
        """Return the clean crop plus its masked conditioning for sample ``idx``."""
        audio_name1 = self.data_list[idx]
        spec_npy_path1 = self.spec_list[idx]
        video_feat_path1 = self.feat_list[idx]
        video_path1 = self.video_list[idx]

        spec1, video_feat1 = self.load_spec_and_feat(spec_npy_path1, video_feat_path1)
        video_info_dict = {'audio_name1': audio_name1, 'audio_name2': "",
                           'video_path1': video_path1, 'video_path2': ""}
        mix_spec, mix_masked_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
            spec1=spec1, video_feat1=video_feat1, video_info_dict=video_info_dict)

        # TODO (translated from the original note): the 3-channel variant
        # `mix_spec[None].repeat(3, axis=0)` has to be changed, otherwise it
        # does not fit maa's autoencoder; the single-channel array is used.
        return {
            'mix_spec': mix_spec,
            'hybrid_feat': {'mix_video_feat': mix_video_feat, 'mix_spec': mix_masked_spec},
            'mix_info_dict': mix_info,
        }

    def mix_audio_and_feat(self, spec1=None, video_feat1=None, video_info_dict=None):
        """Crop one sample and zero out a random time span in spec and video feature.

        Args:
            spec1: spectrogram, time along axis 1.
            video_feat1: video feature, time along axis 0. It is not modified;
                masking is applied to a copy of the crop.
            video_info_dict: dict that receives ``video_time1`` and
                ``video_time2``; a fresh dict is created when omitted.

        Returns:
            ``(spec, masked_spec, masked_video_feat, video_info_dict)``.
        """
        if video_info_dict is None:
            # A literal {} default would be shared (and mutated) across calls.
            video_info_dict = {}

        if self.fix_frames:
            start_idx = 0
        else:
            start_idx = random.randint(0, self.sr * self.duration - self.truncate - 1)  # audio start
        start_frame = int(self.fps * start_idx / self.sr)
        truncate_frame = int(self.fps * self.truncate / self.sr)
        # Same window expressed in spectrogram frames.
        spec_start = int(start_idx / self.hop_len)
        spec_truncate = int(self.truncate / self.hop_len)
        spec1 = spec1[:, spec_start: spec_start + spec_truncate]
        # Copy the crop: masking a slice view would also zero the caller's array.
        video_feat1 = video_feat1[start_frame: start_frame + truncate_frame].copy()

        # Masked length: a multiple of 16 spectrogram frames, at most 50% of the crop.
        masked_spec = random.randint(1, int(spec_truncate * 0.5 // 16)) * 16
        masked_truncate = int(masked_spec * self.hop_len)
        masked_frame = int(self.fps * masked_truncate / self.sr)
        start_masked_idx = random.randint(0, self.truncate - masked_truncate - 1)
        start_masked_frame = int(self.fps * start_masked_idx / self.sr)
        start_masked_spec = int(start_masked_idx / self.hop_len)

        # astype() copies, so the clean crop stays untouched. Scalar assignment
        # works for any number of mel bins / feature dimensions.
        masked_spec1 = spec1.astype(np.float32)
        masked_spec1[:, start_masked_spec:start_masked_spec + masked_spec] = 0
        video_feat1[start_masked_frame:start_masked_frame + masked_frame, :] = 0

        video_info_dict['video_time1'] = str(start_frame) + '_' + str(start_frame + truncate_frame)
        video_info_dict['video_time2'] = ""
        return spec1, masked_spec1, video_feat1, video_info_dict
class audio_video_spec_fullset_Dataset_inpaint_Train(audio_video_spec_fullset_Dataset_inpaint):
    """``audio_video_spec_fullset_Dataset_inpaint`` bound to the ``'train'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='train'``."""
        super().__init__(**dataset_cfg, split='train')
class audio_video_spec_fullset_Dataset_inpaint_Valid(audio_video_spec_fullset_Dataset_inpaint):
    """``audio_video_spec_fullset_Dataset_inpaint`` bound to the ``'valid'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='valid'``."""
        super().__init__(**dataset_cfg, split='valid')
class audio_video_spec_fullset_Dataset_inpaint_Test(audio_video_spec_fullset_Dataset_inpaint):
    """``audio_video_spec_fullset_Dataset_inpaint`` bound to the ``'test'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='test'``."""
        super().__init__(**dataset_cfg, split='test')
class audio_Dataset(torch.utils.data.Dataset):
    """VGGSound waveform dataset yielding fixed-length raw-audio crops.

    Each item is a dict with ``mix_wav`` (1-D float32 crop) and
    ``mix_info_dict`` (``audio_name1`` / ``audio_name2``).
    """

    def __init__(self, split, dataset1, sr=22050, duration=10, truncate=220000, debug_num=False,
                 fix_frames=False, hop_len=256):
        """Index the wav list for one split.

        Args:
            split: ``"train"``, ``"valid"`` or ``"test"``. ``"valid"`` and
                ``"test"`` both read ``Test.txt``.
            dataset1: config object exposing ``dataset_name`` (must be
                ``"VGGSound"``), ``wav_dir`` and ``split_txt_path``.
            sr: sample rate the audio is loaded at.
            duration: clip length in seconds every waveform is padded or
                trimmed to.
            truncate: crop length in audio samples.
            debug_num: if truthy, keep only this many (shuffled) samples.
            fix_frames: if true, crops always start at sample 0.
            hop_len: stored for config compatibility.

        Raises:
            ValueError: if ``split`` is not one of the supported names.
        """
        super().__init__()
        if split == "train":
            self.split = "Train"
        elif split in ("valid", "test"):
            self.split = "Test"
        else:
            # Previously self.split stayed unset and failed later with an
            # unrelated AttributeError.
            raise ValueError("Unknown split {!r}; expected 'train', 'valid' or 'test'".format(split))

        # Default params:
        self.min_duration = 2
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        print("Fix Frames: {}".format(self.fix_frames))

        # Dataset1: (VGGSound)
        assert dataset1.dataset_name == "VGGSound"
        wav_dir = os.path.join(dataset1.wav_dir, "wav")

        list_path = os.path.join(dataset1.split_txt_path, '{}.txt'.format(self.split))
        with open(list_path, "r") as f:
            names = [line.strip() for line in f]

        order = np.random.permutation(np.arange(len(names)))
        self.data_list = [names[i] for i in order]
        self.wav_list = [os.path.join(wav_dir, name) + ".wav" for name in self.data_list]
        if debug_num:
            self.data_list = self.data_list[:debug_num]
            self.wav_list = self.wav_list[:debug_num]
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.data_list)

    def load_spec_and_feat(self, wav_path):
        """Load a waveform at ``self.sr`` and pad/trim it to ``sr * duration`` samples.

        Unreadable or empty audio is replaced by zeros; short audio is
        repeated before trimming.
        """
        try:
            wav_raw, _ = librosa.load(wav_path, sr=self.sr)  # channel: 1
            if wav_raw.shape[0] == 0:
                # An empty file would otherwise divide by zero below.
                raise ValueError("empty audio")
        except Exception:
            print(f"corrupted wav: {wav_path}", flush=True)
            wav_raw = np.zeros((160000,), dtype=np.float32)  # [T]

        wav_len = self.sr * self.duration
        if wav_raw.shape[0] < wav_len:
            wav_raw = np.tile(wav_raw, math.ceil(wav_len / wav_raw.shape[0]))
        return wav_raw[:int(wav_len)]

    def mix_audio_and_feat(self, wav_raw1=None, video_info_dict=None, mode='single'):
        """Crop ``truncate`` samples from ``wav_raw1``.

        Args:
            wav_raw1: 1-D waveform.
            video_info_dict: metadata dict passed through unchanged; a fresh
                dict is created when omitted.
            mode: only ``"single"`` is implemented.

        Returns:
            ``(wav_crop, video_info_dict)``.

        Raises:
            NotImplementedError: for ``mode="concat"``. The original code only
                contained an unfinished stub for it that returned a value of a
                different arity.
            ValueError: for any other unknown ``mode``.
        """
        if video_info_dict is None:
            # A literal {} default would be shared across calls.
            video_info_dict = {}

        if mode == "single":
            if self.fix_frames:
                start_idx = 0
            else:
                # Clamp so truncate == sr * duration yields start 0 instead of
                # an empty randint range.
                start_idx = random.randint(0, max(0, int(self.sr * self.duration - self.truncate - 1)))
            return wav_raw1[start_idx: start_idx + self.truncate], video_info_dict

        if mode == "concat":
            raise NotImplementedError("mode='concat' is not implemented for waveform datasets")
        raise ValueError("Unknown mode {!r}; expected 'single'".format(mode))

    def __getitem__(self, idx):
        """Return ``{'mix_wav': crop, 'mix_info_dict': info}`` for sample ``idx``."""
        audio_name1 = self.data_list[idx]
        wav_path1 = self.wav_list[idx]

        # The original drew one random number here to gate a two-sample concat
        # branch that was disabled (threshold -1) and could not have run: it
        # referenced attributes this class does not have. The branch is gone;
        # the draw is kept so seeded runs still sample the same crops.
        random.uniform(0, 1)

        wav_raw1 = self.load_spec_and_feat(wav_path1)
        video_info_dict = {'audio_name1': audio_name1, 'audio_name2': ""}
        mix_wav, mix_info = self.mix_audio_and_feat(
            wav_raw1=wav_raw1, video_info_dict=video_info_dict, mode='single')
        return {
            'mix_wav': mix_wav,  # 1-D crop, up to `truncate` samples
            'mix_info_dict': mix_info,
        }
class audio_Dataset_Train(audio_Dataset):
    """``audio_Dataset`` bound to the ``'train'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='train'``."""
        super().__init__(**dataset_cfg, split='train')
class audio_Dataset_Test(audio_Dataset):
    """``audio_Dataset`` bound to the ``'test'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='test'``."""
        super().__init__(**dataset_cfg, split='test')
class audio_Dataset_Valid(audio_Dataset):
    """``audio_Dataset`` bound to the ``'valid'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='valid'``."""
        super().__init__(**dataset_cfg, split='valid')
class video_codec_Dataset(torch.utils.data.Dataset):
    """VGGSound dataset yielding raw-audio crops with time-aligned video features.

    Each item is a dict with ``mix_wav`` (1-D float32 crop),
    ``mix_video_feat`` (video-feature crop, time along axis 0) and
    ``mix_info_dict``.
    """

    def __init__(self, split, dataset1, sr=22050, duration=10, truncate=220000, fps=21.5,
                 debug_num=False, fix_frames=False, hop_len=256):
        """Index the wav and feature lists for one split.

        Args:
            split: ``"train"``, ``"valid"`` or ``"test"``. ``"valid"`` and
                ``"test"`` both read ``Test.txt``.
            dataset1: config object exposing ``dataset_name`` (must be
                ``"VGGSound"``), ``data_dir``, ``wav_dir`` and
                ``split_txt_path``.
            sr: sample rate the audio is loaded at.
            duration: clip length in seconds every sample is padded or
                trimmed to.
            truncate: crop length in audio samples.
            fps: video-feature frames per second.
            debug_num: if truthy, keep only this many (shuffled) samples.
            fix_frames: if true, crops always start at the beginning.
            hop_len: stored for config compatibility.

        Raises:
            ValueError: if ``split`` is not one of the supported names.
        """
        super().__init__()
        if split == "train":
            self.split = "Train"
        elif split in ("valid", "test"):
            self.split = "Test"
        else:
            # Previously self.split stayed unset and failed later with an
            # unrelated AttributeError.
            raise ValueError("Unknown split {!r}; expected 'train', 'valid' or 'test'".format(split))

        # Default params:
        self.min_duration = 2
        self.fps = fps
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        print("Fix Frames: {}".format(self.fix_frames))

        # Dataset1: (VGGSound)
        assert dataset1.dataset_name == "VGGSound"
        feat_dir = os.path.join(dataset1.data_dir, "cavp")
        wav_dir = os.path.join(dataset1.wav_dir, "wav")

        list_path = os.path.join(dataset1.split_txt_path, '{}.txt'.format(self.split))
        with open(list_path, "r") as f:
            names = [line.strip() for line in f]

        # Shuffle once; both path lists are derived from the shuffled names so
        # they stay aligned by construction.
        order = np.random.permutation(np.arange(len(names)))
        self.data_list = [names[i] for i in order]
        self.wav_list = [os.path.join(wav_dir, name) + ".wav" for name in self.data_list]
        self.feat_list = [os.path.join(feat_dir, name) + ".npz" for name in self.data_list]
        if debug_num:
            self.data_list = self.data_list[:debug_num]
            self.wav_list = self.wav_list[:debug_num]
            self.feat_list = self.feat_list[:debug_num]
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.data_list)

    @staticmethod
    def _read_feat(npz_path):
        """Read the ``feat`` array of an ``.npz`` archive as float32 and close the archive."""
        with np.load(npz_path) as archive:
            return archive['feat'].astype(np.float32)

    def load_spec_and_feat(self, wav_path, video_feat_path):
        """Load one waveform and its video feature, padded/trimmed to ``duration``.

        Unreadable or empty audio is replaced by zeros. An unreadable video
        feature is replaced by ``empty_vid.npz`` from the feature's directory.
        Arrays that are too short are repeated along time before trimming.

        Returns:
            ``(wav_raw, video_feat)``.
        """
        try:
            wav_raw, _ = librosa.load(wav_path, sr=self.sr)  # channel: 1
            if wav_raw.shape[0] == 0:
                # An empty file would otherwise divide by zero below.
                raise ValueError("empty audio")
        except Exception:
            print(f"corrupted wav: {wav_path}", flush=True)
            wav_raw = np.zeros((160000,), dtype=np.float32)  # [T]

        try:
            video_feat = self._read_feat(video_feat_path)
        except Exception:
            print(f"corrupted video: {video_feat_path}", flush=True)
            video_feat = self._read_feat(
                os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz'))

        wav_len = self.sr * self.duration
        if wav_raw.shape[0] < wav_len:
            wav_raw = np.tile(wav_raw, math.ceil(wav_len / wav_raw.shape[0]))
        wav_raw = wav_raw[:int(wav_len)]

        feat_len = self.fps * self.duration
        if video_feat.shape[0] < feat_len:
            video_feat = np.tile(video_feat, (math.ceil(feat_len / video_feat.shape[0]), 1))
        video_feat = video_feat[:int(feat_len)]
        return wav_raw, video_feat

    def mix_audio_and_feat(self, wav_raw1=None, video_feat1=None, video_info_dict=None, mode='single'):
        """Crop ``truncate`` samples of audio and the matching video-feature frames.

        Args:
            wav_raw1: 1-D waveform.
            video_feat1: video feature, time along axis 0.
            video_info_dict: dict that receives ``video_time1`` and
                ``video_time2``; a fresh dict is created when omitted.
            mode: only ``"single"`` is implemented.

        Returns:
            ``(wav_crop, video_feat_crop, video_info_dict)``.

        Raises:
            NotImplementedError: for ``mode="concat"``. The original code only
                contained an unfinished stub for it that returned a value of a
                different arity.
            ValueError: for any other unknown ``mode``.
        """
        if video_info_dict is None:
            # A literal {} default would be shared (and mutated) across calls.
            video_info_dict = {}

        if mode == "single":
            if self.fix_frames:
                start_idx = 0
            else:
                # Clamp so truncate == sr * duration yields start 0 instead of
                # an empty randint range.
                start_idx = random.randint(0, max(0, int(self.sr * self.duration - self.truncate - 1)))
            wav_raw1 = wav_raw1[start_idx: start_idx + self.truncate]

            # Same window expressed in video-feature frames.
            start_frame = int(self.fps * start_idx / self.sr)
            truncate_frame = int(self.fps * self.truncate / self.sr)
            video_feat1 = video_feat1[start_frame: start_frame + truncate_frame]

            video_info_dict['video_time1'] = str(start_frame) + '_' + str(start_frame + truncate_frame)
            video_info_dict['video_time2'] = ""
            return wav_raw1, video_feat1, video_info_dict

        if mode == "concat":
            raise NotImplementedError("mode='concat' is not implemented for waveform datasets")
        raise ValueError("Unknown mode {!r}; expected 'single'".format(mode))

    def __getitem__(self, idx):
        """Return the item dict for sample ``idx`` (see the class docstring)."""
        audio_name1 = self.data_list[idx]
        wav_path1 = self.wav_list[idx]
        video_feat_path1 = self.feat_list[idx]

        # The original drew one random number here to gate a two-sample concat
        # branch that was disabled (threshold -1) and could not have run: it
        # unpacked three values from a call returning one. The branch is gone;
        # the draw is kept so seeded runs still sample the same crops.
        random.uniform(0, 1)

        wav_raw1, video_feat1 = self.load_spec_and_feat(wav_path1, video_feat_path1)
        video_info_dict = {'audio_name1': audio_name1, 'audio_name2': ""}
        mix_wav, mix_video_feat, mix_info = self.mix_audio_and_feat(
            wav_raw1=wav_raw1, video_feat1=video_feat1, video_info_dict=video_info_dict, mode='single')
        return {
            'mix_wav': mix_wav,                # 1-D crop, up to `truncate` samples
            'mix_video_feat': mix_video_feat,  # [video frames, feature dim]
            'mix_info_dict': mix_info,
        }
class video_codec_Dataset_Train(video_codec_Dataset):
    """``video_codec_Dataset`` bound to the ``'train'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='train'``."""
        super().__init__(**dataset_cfg, split='train')
class video_codec_Dataset_Test(video_codec_Dataset):
    """``video_codec_Dataset`` bound to the ``'test'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='test'``."""
        super().__init__(**dataset_cfg, split='test')
class video_codec_Dataset_Valid(video_codec_Dataset):
    """``video_codec_Dataset`` bound to the ``'valid'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='valid'``."""
        super().__init__(**dataset_cfg, split='valid')
class audio_video_spec_fullset_Audioset_Dataset(torch.utils.data.Dataset):
    """VGGSound (+ Audioset for training) spectrogram / video-feature dataset.

    Each item is a dict with ``mix_spec`` (spectrogram crop, time along
    axis 1), ``mix_video_feat`` (video-feature crop, time along axis 0) and
    ``mix_info_dict``. The Audioset list is only merged in for the training
    split.
    """

    # Probability that __getitem__ concatenates two samples. random.uniform
    # never returns a value below -1, so this default keeps concatenation
    # disabled, exactly as the original hard-coded threshold did.
    concat_prob = -1
    # Number of leading entries of the list file kept for the "Test" split.
    test_subset_size = 200

    def __init__(self, split, dataset1, dataset2, sr=22050, duration=10, truncate=220000,
                 fps=21.5, drop=0.0, fix_frames=False, hop_len=256):
        """Index the sample lists for one split.

        Args:
            split: ``"train"``, ``"valid"`` or ``"test"``. ``"valid"`` and
                ``"test"`` both read ``Test.txt``.
            dataset1: config object with ``dataset_name == "VGGSound"``,
                ``data_dir`` and ``split_txt_path``.
            dataset2: config object with ``dataset_name == "Audioset"``,
                ``data_dir`` and ``split_txt_path``; only read for training.
            sr: audio sample rate.
            duration: clip length in seconds every sample is padded or
                trimmed to.
            truncate: crop length in audio samples.
            fps: video-feature frames per second.
            drop: probability of replacing the video feature with the
                ``empty_vid.npz`` placeholder.
            fix_frames: if true, crops always start at the beginning.
            hop_len: audio samples per spectrogram frame.

        Raises:
            ValueError: if ``split`` is not one of the supported names.
        """
        super().__init__()
        if split == "train":
            self.split = "Train"
        elif split in ("valid", "test"):
            self.split = "Test"
        else:
            # Previously self.split stayed unset and failed later with an
            # unrelated AttributeError.
            raise ValueError("Unknown split {!r}; expected 'train', 'valid' or 'test'".format(split))

        # Default params:
        self.min_duration = 2
        self.sr = sr
        self.duration = duration
        self.truncate = truncate
        self.fps = fps
        self.fix_frames = fix_frames
        self.hop_len = hop_len
        self.drop = drop
        print("Fix Frames: {}".format(self.fix_frames))
        print("Use Drop: {}".format(self.drop))

        assert dataset1.dataset_name == "VGGSound"
        assert dataset2.dataset_name == "Audioset"

        # Dataset1: (VGGSound)
        spec_dir1 = os.path.join(dataset1.data_dir, "mel_maa2", "npy")
        feat_dir1 = os.path.join(dataset1.data_dir, "cavp")
        with open(os.path.join(dataset1.split_txt_path, '{}.txt'.format(self.split)), "r") as f:
            names = [line.strip() for line in f]
        specs = [os.path.join(spec_dir1, name) + "_mel.npy" for name in names]
        feats = [os.path.join(feat_dir1, name) + ".npz" for name in names]

        # Dataset2: (Audioset), training only. Its spectrogram files carry a
        # 'Y' prefix in front of the listed name.
        if split == "train":
            spec_dir2 = os.path.join(dataset2.data_dir, "mel")
            feat_dir2 = os.path.join(dataset2.data_dir, "cavp_renamed")
            with open(os.path.join(dataset2.split_txt_path, '{}.txt'.format(self.split)), "r") as f:
                names2 = [line.strip() for line in f]
            specs += [os.path.join(spec_dir2, f'Y{name}') + "_mel.npy" for name in names2]
            feats += [os.path.join(feat_dir2, name) + ".npz" for name in names2]
            names += names2

        if self.split == "Test":
            keep = self.test_subset_size
            names, specs, feats = names[:keep], specs[:keep], feats[:keep]

        # One permutation applied to all three lists keeps them aligned.
        order = np.random.permutation(np.arange(len(names)))
        self.data_list = [names[i] for i in order]
        self.spec_list = [specs[i] for i in order]
        self.feat_list = [feats[i] for i in order]
        print('Split: {} Sample Num: {}'.format(split, len(self.data_list)))
        # self.check(self.spec_list)

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.data_list)

    def check(self, feat_list):
        """Debug helper: load every spectrogram and open ``ipdb`` on one without 80 rows."""
        from tqdm import tqdm
        for spec_path in tqdm(feat_list):
            mel = np.load(spec_path).astype(np.float32)
            if mel.shape[0] != 80:
                import ipdb
                ipdb.set_trace()

    @staticmethod
    def _read_feat(npz_path):
        """Read the ``feat`` array of an ``.npz`` archive as float32 and close the archive."""
        with np.load(npz_path) as archive:
            return archive['feat'].astype(np.float32)

    def load_spec_and_feat(self, spec_path, video_feat_path):
        """Load one spectrogram and its video feature, padded/trimmed to ``duration``.

        A spectrogram that cannot be read, or whose first dimension is not 80,
        is replaced by zeros. An unreadable or randomly dropped video feature
        (see ``drop``) is replaced by ``empty_vid.npz`` from the feature's
        directory. Arrays that are too short are repeated along time before
        trimming.

        Returns:
            ``(spec_raw, video_feat)`` as float32 arrays.
        """
        # Guard the load itself as well as the shape, so a missing or
        # unreadable file no longer crashes the data-loader worker.
        try:
            spec_raw = np.load(spec_path).astype(np.float32)
            spec_ok = spec_raw.shape[0] == 80
        except Exception:
            spec_ok = False
        if not spec_ok:
            print(f"corrupted mel: {spec_path}", flush=True)
            spec_raw = np.zeros((80, 625), dtype=np.float32)  # [C, T]

        empty_feat_path = os.path.join(os.path.dirname(video_feat_path), 'empty_vid.npz')
        if np.random.uniform(0, 1) > self.drop:
            try:
                video_feat = self._read_feat(video_feat_path)
            except Exception:
                print(f"corrupted video: {video_feat_path}", flush=True)
                video_feat = self._read_feat(empty_feat_path)
        else:
            video_feat = self._read_feat(empty_feat_path)

        spec_len = self.sr * self.duration / self.hop_len
        if spec_raw.shape[1] < spec_len:
            spec_raw = np.tile(spec_raw, math.ceil(spec_len / spec_raw.shape[1]))
        spec_raw = spec_raw[:, :int(spec_len)]

        feat_len = self.fps * self.duration
        if video_feat.shape[0] < feat_len:
            video_feat = np.tile(video_feat, (math.ceil(feat_len / video_feat.shape[0]), 1))
        video_feat = video_feat[:int(feat_len)]
        return spec_raw, video_feat

    def mix_audio_and_feat(self, spec1=None, spec2=None, video_feat1=None, video_feat2=None,
                           video_info_dict=None, mode='single'):
        """Crop (``mode="single"``) or crop-and-concatenate (``mode="concat"``) samples.

        Args:
            spec1, spec2: spectrograms, time along axis 1. ``spec2`` is only
                used in concat mode.
            video_feat1, video_feat2: video features, time along axis 0.
            video_info_dict: dict that receives ``video_time1`` and
                ``video_time2``; a fresh dict is created when omitted.
            mode: ``"single"`` or ``"concat"``.

        Returns:
            ``(spec, video_feat, video_info_dict)``.

        Raises:
            ValueError: for an unknown ``mode``.
        """
        if video_info_dict is None:
            # A literal {} default would be shared (and mutated) across calls.
            video_info_dict = {}

        if mode == "single":
            if self.fix_frames:
                start_idx = 0
            else:
                # Clamp so truncate == sr * duration yields start 0 instead of
                # an empty randint range.
                start_idx = random.randint(0, max(0, int(self.sr * self.duration - self.truncate - 1)))
            start_frame = int(self.fps * start_idx / self.sr)
            truncate_frame = int(self.fps * self.truncate / self.sr)
            # Same window expressed in spectrogram frames.
            spec_start = int(start_idx / self.hop_len)
            spec_truncate = int(self.truncate / self.hop_len)
            spec1 = spec1[:, spec_start: spec_start + spec_truncate]
            video_feat1 = video_feat1[start_frame: start_frame + truncate_frame]
            video_info_dict['video_time1'] = str(start_frame) + '_' + str(start_frame + truncate_frame)
            video_info_dict['video_time2'] = ""
            return spec1, video_feat1, video_info_dict

        if mode == "concat":
            total_spec_len = int(self.truncate / self.hop_len)
            min_spec_len = self.min_duration * self.sr // self.hop_len
            # Split the output length between the two clips, each getting at
            # least min_duration seconds.
            spec1_truncate_len = random.randint(min_spec_len, total_spec_len - min_spec_len - 1)
            spec2_truncate_len = total_spec_len - spec1_truncate_len
            spec_start1 = random.randint(0, total_spec_len - spec1_truncate_len - 1)
            spec_start2 = random.randint(0, total_spec_len - spec2_truncate_len - 1)
            spec_end1 = spec_start1 + spec1_truncate_len
            spec_end2 = spec_start2 + spec2_truncate_len
            concat_audio_spec = np.concatenate(
                [spec1[:, spec_start1: spec_end1], spec2[:, spec_start2: spec_end2]], axis=1)

            # Matching video windows; the second length is the remainder so
            # both parts always add up to the full crop.
            start1_frame = int(self.fps * spec_start1 * self.hop_len / self.sr)
            truncate1_frame = int(self.fps * spec1_truncate_len * self.hop_len / self.sr)
            start2_frame = int(self.fps * spec_start2 * self.hop_len / self.sr)
            truncate2_frame = int(self.fps * self.truncate / self.sr) - truncate1_frame
            concat_video_feat = np.concatenate([
                video_feat1[start1_frame: start1_frame + truncate1_frame],
                video_feat2[start2_frame: start2_frame + truncate2_frame],
            ])
            video_info_dict['video_time1'] = str(start1_frame) + '_' + str(start1_frame + truncate1_frame)
            video_info_dict['video_time2'] = str(start2_frame) + '_' + str(start2_frame + truncate2_frame)
            return concat_audio_spec, concat_video_feat, video_info_dict

        raise ValueError("Unknown mode {!r}; expected 'single' or 'concat'".format(mode))

    def __getitem__(self, idx):
        """Return the item dict for sample ``idx`` (see the class docstring)."""
        audio_name1 = self.data_list[idx]
        spec_npy_path1 = self.spec_list[idx]
        video_feat_path1 = self.feat_list[idx]

        # The draw happens unconditionally, as before, so seeded runs are
        # unchanged. A partner distinct from idx only exists with more than
        # one sample; without that guard the rejection loop would not end.
        use_concat = random.uniform(0, 1) < self.concat_prob
        use_concat = use_concat and len(self.data_list) > 1
        if use_concat:
            random_idx = idx
            while random_idx == idx:
                random_idx = random.randint(0, len(self.data_list) - 1)
            audio_name2 = self.data_list[random_idx]
            spec_npy_path2 = self.spec_list[random_idx]
            video_feat_path2 = self.feat_list[random_idx]

        spec1, video_feat1 = self.load_spec_and_feat(spec_npy_path1, video_feat_path1)
        if use_concat:
            spec2, video_feat2 = self.load_spec_and_feat(spec_npy_path2, video_feat_path2)
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': audio_name2}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1, spec2, video_feat1, video_feat2, video_info_dict, mode='concat')
        else:
            video_info_dict = {'audio_name1': audio_name1, 'audio_name2': ""}
            mix_spec, mix_video_feat, mix_info = self.mix_audio_and_feat(
                spec1=spec1, video_feat1=video_feat1, video_info_dict=video_info_dict, mode='single')

        return {
            'mix_spec': mix_spec,              # [mel bins, spec frames]
            'mix_video_feat': mix_video_feat,  # [video frames, feature dim]
            'mix_info_dict': mix_info,
        }
class audio_video_spec_fullset_Audioset_Train(audio_video_spec_fullset_Audioset_Dataset):
    """``audio_video_spec_fullset_Audioset_Dataset`` bound to the ``'train'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='train'``."""
        super().__init__(**dataset_cfg, split='train')
class audio_video_spec_fullset_Audioset_Valid(audio_video_spec_fullset_Audioset_Dataset):
    """``audio_video_spec_fullset_Audioset_Dataset`` bound to the ``'valid'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='valid'``."""
        super().__init__(**dataset_cfg, split='valid')
class audio_video_spec_fullset_Audioset_Test(audio_video_spec_fullset_Audioset_Dataset):
    """``audio_video_spec_fullset_Audioset_Dataset`` bound to the ``'test'`` split."""

    def __init__(self, dataset_cfg):
        """Forward ``dataset_cfg`` as keyword arguments with ``split='test'``."""
        super().__init__(**dataset_cfg, split='test')