dataset.py
import os
import numpy as np
import pandas as pd
import random
from glob import glob
from scipy.io import wavfile
from scipy.signal import stft
from sklearn.model_selection import train_test_split
from keras.utils import to_categorical

class DatasetGenerator():
    def __init__(self, label_set, sample_rate=16000):
        self.label_set = label_set
        self.sample_rate = sample_rate

    # Convert a label string to its numerical class index
    def text_to_labels(self, text):
        return self.label_set.index(text)

    # Reverse translation of numerical class indices back to label strings
    def labels_to_text(self, labels):
        return self.label_set[labels]

    def load_data(self, DIR):
        # Get all .wav paths inside DIR (expected layout: DIR/<label>/<file>.wav)
        wav_files = glob(os.path.join(DIR, '*', '*.wav'))
        # Loop over files to collect samples
        data = []
        for fle in wav_files:
            # Extract the label (parent folder) and file name in an OS-portable way
            label = os.path.basename(os.path.dirname(fle))
            name = os.path.basename(fle)
            if label in self.label_set:
                label_id = self.text_to_labels(label)
                sample = (label, label_id, name, fle)
                data.append(sample)
        # DataFrame with the samples' labels and paths
        df = pd.DataFrame(data, columns=['label', 'label_id', 'user_id', 'wav_file'])
        self.df = df
        return self.df

    def apply_train_test_split(self, test_size, random_state):
        self.df_train, self.df_test = train_test_split(self.df,
                                                       test_size=test_size,
                                                       random_state=random_state)

    def apply_train_val_split(self, val_size, random_state):
        self.df_train, self.df_val = train_test_split(self.df_train,
                                                      test_size=val_size,
                                                      random_state=random_state)

    def read_wav_file(self, x):
        # Read the wav file using scipy wavfile.read
        _, wav = wavfile.read(x)
        # Normalize 16-bit PCM samples to the range [-1, 1]
        wav = wav.astype(np.float32) / np.iinfo(np.int16).max
        return wav

    def process_wav_file(self, x, threshold_freq=5500, eps=1e-10):
        # Read the wav file into an array
        wav = self.read_wav_file(x)
        # Target length in samples (one second at the configured sample rate)
        L = self.sample_rate
        # If longer, randomly truncate to L samples
        if len(wav) > L:
            i = np.random.randint(0, len(wav) - L)
            wav = wav[i:(i + L)]
        # If shorter, randomly pad both sides with low-amplitude noise ("silence")
        elif len(wav) < L:
            rem_len = L - len(wav)
            silence_part = np.random.randint(-100, 100, L).astype(np.float32) / np.iinfo(np.int16).max
            j = np.random.randint(0, rem_len)
            silence_part_left = silence_part[0:j]
            silence_part_right = silence_part[j:rem_len]
            wav = np.concatenate([silence_part_left, wav, silence_part_right])
        # Create a spectrogram with the short-time Fourier transform
        freqs, times, spec = stft(wav, L, nperseg=400, noverlap=240, nfft=512,
                                  padded=False, boundary=None)
        # Cut off high frequencies
        if threshold_freq is not None:
            spec = spec[freqs <= threshold_freq, :]
            freqs = freqs[freqs <= threshold_freq]
        # Log-amplitude spectrogram
        amp = np.log(np.abs(spec) + eps)
        # Add a channel dimension for Keras convolutional layers
        return np.expand_dims(amp, axis=2)

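    # Shape sketch (illustrative, assuming the default sample_rate=16000 and the
    # STFT parameters used above):
    #   hop = nperseg - noverlap = 160 samples
    #   time frames = (16000 - 240) // 160 = 98
    #   one-sided frequency bins = nfft // 2 + 1 = 257, spaced 16000 / 512 = 31.25 Hz
    #   bins kept at or below threshold_freq = 5500 Hz -> 177
    # so process_wav_file() returns an array of shape (177, 98, 1).
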
    def generator(self, batch_size, mode):
        while True:
            # Depending on mode, select the DataFrame with file paths
            if mode == 'train':
                df = self.df_train
                ids = random.sample(range(df.shape[0]), df.shape[0])
            elif mode == 'val':
                df = self.df_val
                ids = list(range(df.shape[0]))
            elif mode == 'test':
                df = self.df_test
                ids = list(range(df.shape[0]))
            else:
                raise ValueError('The mode should be either train, val or test.')
            # Create batches (for training data the batches are randomly permuted)
            for start in range(0, len(ids), batch_size):
                X_batch = []
                if mode != 'test':
                    y_batch = []
                end = min(start + batch_size, len(ids))
                i_batch = ids[start:end]
                for i in i_batch:
                    X_batch.append(self.process_wav_file(df.wav_file.values[i]))
                    if mode != 'test':
                        y_batch.append(df.label_id.values[i])
                X_batch = np.array(X_batch)
                if mode != 'test':
                    y_batch = to_categorical(y_batch, num_classes=len(self.label_set))
                    yield (X_batch, y_batch)
                else:
                    yield X_batch
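
# A minimal usage sketch, not part of the original module: the label list, the
# data directory, and the `model` object below are illustrative assumptions, and
# the training call uses the old Keras fit_generator API that matches the
# `keras.utils.to_categorical` import above.
#
#   labels = ['yes', 'no', 'up', 'down']
#   dsGen = DatasetGenerator(label_set=labels)
#   dsGen.load_data('train/audio')   # assumed layout: train/audio/<label>/<file>.wav
#   dsGen.apply_train_test_split(test_size=0.3, random_state=2018)
#   dsGen.apply_train_val_split(val_size=0.2, random_state=2018)
#
#   train_gen = dsGen.generator(batch_size=64, mode='train')
#   val_gen = dsGen.generator(batch_size=64, mode='val')
#   model.fit_generator(generator=train_gen,
#                       steps_per_epoch=int(np.ceil(len(dsGen.df_train) / 64)),
#                       epochs=5,
#                       validation_data=val_gen,
#                       validation_steps=int(np.ceil(len(dsGen.df_val) / 64)))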