-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathdata_loader.py
273 lines (232 loc) · 10.5 KB
/
data_loader.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
import os
import subprocess
import codecs
from tempfile import NamedTemporaryFile
from torch.utils.data.sampler import Sampler
import librosa
import numpy as np
import scipy.signal
import torch
import torchaudio
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
windows = {'hamming': scipy.signal.hamming, 'hann': scipy.signal.hann, 'blackman': scipy.signal.blackman,
'bartlett': scipy.signal.bartlett}
def load_audio(path):
sound, _ = torchaudio.load(path)
sound = sound.numpy()
if len(sound.shape) > 1:
if sound.shape[1] == 1:
sound = sound.squeeze()
else:
sound = sound.mean(axis=1) # multiple channels, average
return sound
class AudioParser(object):
def parse_transcript(self, transcript_path):
"""
:param transcript_path: Path where transcript is stored from the manifest file
:return: Transcript in training/testing format
"""
raise NotImplementedError
def parse_audio(self, audio_path):
"""
:param audio_path: Path where audio is stored from the manifest file
:return: Audio in training/testing format
"""
raise NotImplementedError
class NoiseInjection(object):
def __init__(self,
path=None,
sample_rate=16000,
noise_levels=(0, 0.5)):
"""
Adds noise to an input signal with specific SNR. Higher the noise level, the more noise added.
Modified code from https://github.com/willfrey/audio/blob/master/torchaudio/transforms.py
"""
if not os.path.exists(path):
print("Directory doesn't exist: {}".format(path))
raise IOError
self.paths = path is not None and librosa.util.find_files(path)
self.sample_rate = sample_rate
self.noise_levels = noise_levels
def inject_noise(self, data):
noise_path = np.random.choice(self.paths)
noise_level = np.random.uniform(*self.noise_levels)
return self.inject_noise_sample(data, noise_path, noise_level)
def inject_noise_sample(self, data, noise_path, noise_level):
noise_len = get_audio_length(noise_path)
data_len = len(data) / self.sample_rate
noise_start = np.random.rand() * (noise_len - data_len)
noise_end = noise_start + data_len
noise_dst = audio_with_sox(noise_path, self.sample_rate, noise_start, noise_end)
assert len(data) == len(noise_dst)
noise_energy = np.sqrt(noise_dst.dot(noise_dst) / noise_dst.size)
data_energy = np.sqrt(data.dot(data) / data.size)
data += noise_level * noise_dst * data_energy / noise_energy
return data
class SpectrogramParser(AudioParser):
def __init__(self, audio_conf, normalize=False, augment=False):
"""
Parses audio file into spectrogram with optional normalization and various augmentations
:param audio_conf: Dictionary containing the sample rate, window and the window length/stride in seconds
:param normalize(default False): Apply standard mean and deviation normalization to audio tensor
:param augment(default False): Apply random tempo and gain perturbations
"""
super(SpectrogramParser, self).__init__()
self.window_stride = audio_conf['window_stride']
self.window_size = audio_conf['window_size']
self.sample_rate = audio_conf['sample_rate']
self.window = windows.get(audio_conf['window'], windows['hamming'])
self.normalize = normalize
self.augment = augment
self.noiseInjector = NoiseInjection(audio_conf['noise_dir'], self.sample_rate,
audio_conf['noise_levels']) if audio_conf.get(
'noise_dir') is not None else None
self.noise_prob = audio_conf.get('noise_prob')
def parse_audio(self, audio_path):
if self.augment:
y = load_randomly_augmented_audio(audio_path, self.sample_rate)
else:
y = load_audio(audio_path)
if self.noiseInjector:
add_noise = np.random.binomial(1, self.noise_prob)
if add_noise:
y = self.noiseInjector.inject_noise(y)
n_fft = int(self.sample_rate * self.window_size)
win_length = n_fft
hop_length = int(self.sample_rate * self.window_stride)
# STFT
D = librosa.stft(y, n_fft=n_fft, hop_length=hop_length,
win_length=win_length, window=self.window)
spect, phase = librosa.magphase(D)
# S = log(S+1)
spect = np.log1p(spect)
spect = torch.FloatTensor(spect)
if self.normalize:
mean = spect.mean()
std = spect.std()
spect.add_(-mean)
spect.div_(std)
return spect
def parse_transcript(self, transcript_path):
raise NotImplementedError
class SpectrogramDataset(Dataset, SpectrogramParser):
def __init__(self, audio_conf, manifest_filepath, labels, normalize=False, augment=False):
"""
Dataset that loads tensors via a csv containing file paths to audio files and transcripts separated by
a comma. Each new line is a different sample. Example below:
/path/to/audio.wav,/path/to/audio.txt
...
:param audio_conf: Dictionary containing the sample rate, window and the window length/stride in seconds
:param manifest_filepath: Path to manifest csv as describe above
:param labels: String containing all the possible characters to map to
:param normalize: Apply standard mean and deviation normalization to audio tensor
:param augment(default False): Apply random tempo and gain perturbations
"""
with open(manifest_filepath) as f:
ids = f.readlines()
ids = [x.strip().split(',') for x in ids]
self.ids = ids
self.size = len(ids)
self.labels_map = dict([(labels[i], i) for i in range(len(labels))])
super(SpectrogramDataset, self).__init__(audio_conf, normalize, augment)
def __getitem__(self, index):
sample = self.ids[index]
audio_path, transcript_path = sample[0], sample[1]
spect = self.parse_audio(audio_path)
transcript = self.parse_transcript(transcript_path)
return spect, transcript
def parse_transcript(self, transcript_path):
with codecs.open(transcript_path, 'r',encoding='utf-8') as transcript_file:
transcript = transcript_file.read().replace('\n', '')
transcript = list(filter(None, [self.labels_map.get(x) for x in list(transcript)]))
return transcript
def __len__(self):
return self.size
def _collate_fn(batch):
def func(p):
return p[0].size(1)
longest_sample = max(batch, key=func)[0]
freq_size = longest_sample.size(0)
minibatch_size = len(batch)
max_seqlength = longest_sample.size(1)
inputs = torch.zeros(minibatch_size, 1, freq_size, max_seqlength)
input_percentages = torch.FloatTensor(minibatch_size)
target_sizes = torch.IntTensor(minibatch_size)
targets = []
for x in range(minibatch_size):
sample = batch[x]
tensor = sample[0]
target = sample[1]
seq_length = tensor.size(1)
inputs[x][0].narrow(1, 0, seq_length).copy_(tensor)
input_percentages[x] = seq_length / float(max_seqlength)
target_sizes[x] = len(target)
targets.extend(target)
targets = torch.IntTensor(targets)
return inputs, targets, input_percentages, target_sizes
class AudioDataLoader(DataLoader):
def __init__(self, *args, **kwargs):
"""
Creates a data loader for AudioDatasets.
"""
super(AudioDataLoader, self).__init__(*args, **kwargs)
self.collate_fn = _collate_fn
class BucketingSampler(Sampler):
def __init__(self, data_source, batch_size=1):
"""
Samples batches assuming they are in order of size to batch similarly sized samples together.
"""
super(BucketingSampler, self).__init__(data_source)
self.data_source = data_source
ids = list(range(0, len(data_source)))
self.bins = [ids[i:i + batch_size] for i in range(0, len(ids), batch_size)]
def __iter__(self):
for ids in self.bins:
np.random.shuffle(ids)
yield ids
def __len__(self):
return len(self.bins)
def shuffle(self):
np.random.shuffle(self.bins)
def get_audio_length(path):
output = subprocess.check_output(['soxi -D \"%s\"' % path.strip()], shell=True)
return float(output)
def audio_with_sox(path, sample_rate, start_time, end_time):
"""
crop and resample the recording with sox and loads it.
"""
with NamedTemporaryFile(suffix=".wav") as tar_file:
tar_filename = tar_file.name
sox_params = "sox \"{}\" -r {} -c 1 -b 16 -e si {} trim {} ={} >/dev/null 2>&1".format(path, sample_rate,
tar_filename, start_time,
end_time)
os.system(sox_params)
y = load_audio(tar_filename)
return y
def augment_audio_with_sox(path, sample_rate, tempo, gain):
"""
Changes tempo and gain of the recording with sox and loads it.
"""
with NamedTemporaryFile(suffix=".wav") as augmented_file:
augmented_filename = augmented_file.name
sox_augment_params = ["tempo", "{:.3f}".format(tempo), "gain", "{:.3f}".format(gain)]
sox_params = "sox \"{}\" -r {} -c 1 -b 16 -e si {} {} >/dev/null 2>&1".format(path, sample_rate,
augmented_filename,
" ".join(sox_augment_params))
os.system(sox_params)
y = load_audio(augmented_filename)
return y
def load_randomly_augmented_audio(path, sample_rate=16000, tempo_range=(0.85, 1.15),
gain_range=(-6, 8)):
"""
Picks tempo and gain uniformly, applies it to the utterance by using sox utility.
Returns the augmented utterance.
"""
low_tempo, high_tempo = tempo_range
tempo_value = np.random.uniform(low=low_tempo, high=high_tempo)
low_gain, high_gain = gain_range
gain_value = np.random.uniform(low=low_gain, high=high_gain)
audio = augment_audio_with_sox(path=path, sample_rate=sample_rate,
tempo=tempo_value, gain=gain_value)
return audio