forked from hanqingguo/infocom2021
-
Notifications
You must be signed in to change notification settings - Fork 0
/
generator.py
167 lines (133 loc) · 6.68 KB
/
generator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
########################################################
# Perfrom STFT for each files before training
########################################################
import os
import glob
import tqdm
import torch
import random
import librosa
import argparse
import numpy as np
from multiprocessing import Pool, cpu_count
from utils.audio import Audio
from utils.hparams import HParam
def formatter(dir_, form, num):
return os.path.join(dir_, form.replace('*', '%06d' % num))
def vad_merge(w):
intervals = librosa.effects.split(w, top_db=20)
temp = list()
for s, e in intervals:
temp.append(w[s:e])
return np.concatenate(temp, axis=None)
def mix(hp, args, audio, num, s1_dvec, s1_target, s2, train):
srate = hp.audio.sample_rate
dir_ = os.path.join(args.out_dir, 'train' if train else 'test')
d, _ = librosa.load(s1_dvec, sr=srate)
w1, _ = librosa.load(s1_target, sr=srate)
w2, _ = librosa.load(s2, sr=srate)
assert len(d.shape) == len(w1.shape) == len(w2.shape) == 1, \
'wav files must be mono, not stereo'
d, _ = librosa.effects.trim(d, top_db=20)
w1, _ = librosa.effects.trim(w1, top_db=20)
w2, _ = librosa.effects.trim(w2, top_db=20)
# if reference for d-vector is too short, discard it
if d.shape[0] < 1.1 * hp.embedder.window * hp.audio.hop_length:
return
# LibriSpeech dataset have many silent interval, so let's vad-merge them
# VoiceFilter paper didn't do that. To test SDR in same way, don't vad-merge.
if args.vad == 1:
w1, w2 = vad_merge(w1), vad_merge(w2)
# I think random segment length will be better, but let's follow the paper first
# fit audio to `hp.data.audio_len` seconds.
# if merged audio is shorter than `L`, discard it
L = int(srate * hp.data.audio_len)
if w1.shape[0] < L or w2.shape[0] < L:
return
w1, w2 = w1[:L], w2[:L]
mixed = w1 + w2
norm = np.max(np.abs(mixed)) * 1.1
w1, w2, mixed = w1/norm, w2/norm, mixed/norm
# save vad & normalized wav files
target_wav_path = formatter(dir_, hp.form.target.wav, num)
mixed_wav_path = formatter(dir_, hp.form.mixed.wav, num)
librosa.output.write_wav(target_wav_path, w1, srate)
librosa.output.write_wav(mixed_wav_path, mixed, srate)
# save magnitude & phase spectrograms
target_mag, target_phase = audio.wav2spec(w1)
mixed_mag, mixed_phase = audio.wav2spec(mixed)
target_mag_path = formatter(dir_, hp.form.target.mag, num)
target_phase_path = formatter(dir_, hp.form.target.phase, num)
mixed_mag_path = formatter(dir_, hp.form.mixed.mag, num)
mixed_phase_path = formatter(dir_, hp.form.mixed.phase, num)
torch.save(torch.from_numpy(target_mag), target_mag_path)
torch.save(torch.from_numpy(target_phase), target_phase_path)
torch.save(torch.from_numpy(mixed_mag), mixed_mag_path)
torch.save(torch.from_numpy(mixed_phase), mixed_phase_path)
# save selected sample as text file. d-vec will be calculated soon
dvec_text_path = formatter(dir_, hp.form.dvec, num)
with open(dvec_text_path, 'w') as f:
f.write(s1_dvec)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--config', type=str, required=True,
help="yaml file for configuration")
parser.add_argument('-d', '--libri_dir', type=str, default=None,
help="Directory of LibriSpeech dataset, containing folders of train-clean-100, train-clean-360, dev-clean.")
parser.add_argument('-v', '--voxceleb_dir', type=str, default=None,
help="Directory of VoxCeleb2 dataset, ends with 'aac'")
parser.add_argument('-o', '--out_dir', type=str, required=True,
help="Directory of output training triplet")
parser.add_argument('-p', '--process_num', type=int, default=None,
help='number of processes to run. default: cpu_count')
parser.add_argument('--vad', type=int, default=0,
help='apply vad to wav file. yes(1) or no(0, default)')
args = parser.parse_args()
os.makedirs(args.out_dir, exist_ok=True)
os.makedirs(os.path.join(args.out_dir, 'train'), exist_ok=True)
os.makedirs(os.path.join(args.out_dir, 'test'), exist_ok=True)
hp = HParam(args.config)
cpu_num = cpu_count() if args.process_num is None else args.process_num
if args.libri_dir is None and args.voxceleb_dir is None:
raise Exception("Please provide directory of data")
if args.libri_dir is not None:
train_folders = [x for x in glob.glob(os.path.join(args.libri_dir, 'train-clean-360', '*'))
if os.path.isdir(x)]
# [x for x in glob.glob(os.path.join(args.libri_dir, 'train-clean-100', '*'))
# if os.path.isdir(x)] + \
# [x for x in glob.glob(os.path.join(args.libri_dir, 'train-clean-360', '*'))
# if os.path.isdir(x)]
# we recommned to exclude train-other-500
# See https://github.com/mindslab-ai/voicefilter/issues/5#issuecomment-497746793
# + \
#[x for x in glob.glob(os.path.join(args.libri_dir, 'train-other-500', '*'))
# if os.path.isdir(x)]
test_folders = [x for x in glob.glob(os.path.join(args.libri_dir, 'train-clean-100', '*'))]
elif args.voxceleb_dir is not None:
all_folders = [x for x in glob.glob(os.path.join(args.voxceleb_dir, '*'))
if os.path.isdir(x)]
train_folders = all_folders[:-20]
test_folders = all_folders[-20:]
train_spk = [glob.glob(os.path.join(spk, '**', hp.form.input), recursive=True)
for spk in train_folders]
train_spk = [x for x in train_spk if len(x) >= 2]
test_spk = [glob.glob(os.path.join(spk, '**', hp.form.input), recursive=True)
for spk in test_folders]
test_spk = [x for x in test_spk if len(x) >= 2]
audio = Audio(hp)
def train_wrapper(num):
spk1, spk2 = random.sample(train_spk, 2)
s1_dvec, s1_target = random.sample(spk1, 2)
s2 = random.choice(spk2)
mix(hp, args, audio, num, s1_dvec, s1_target, s2, train=True)
def test_wrapper(num):
spk1, spk2 = random.sample(test_spk, 2)
s1_dvec, s1_target = random.sample(spk1, 2)
s2 = random.choice(spk2)
mix(hp, args, audio, num, s1_dvec, s1_target, s2, train=False)
arr = list(range(10**4))
with Pool(cpu_num) as p:
r = list(tqdm.tqdm(p.imap(train_wrapper, arr), total=len(arr)))
arr = list(range(10**2))
with Pool(cpu_num) as p:
r = list(tqdm.tqdm(p.imap(test_wrapper, arr), total=len(arr)))