最近在关注Rainforest Connection Species Audio Detection,然后看到CCF的练习项目,借此机会学习一下音频相关的处理方案。

github仓库链接:https://github.com/blueloveTH/speech_commands_recognition

任务简介

训练集有57886条,测试集有6835条,每条数据都是一段1秒左右的语音,其中包含一个单词,一共有30种可能,因此是一个30分类任务。

还有一位同学对赛题进行了更详细的介绍,对任务还不太理解的朋友,我强烈建议你看一下↓

2020CCFBDCI通用音频分类CNN方案(0.90+方案)_wherewegogo的博客-CSDN博客

特征工程

特征部分采用1x32x32的对数梅尔频谱图(Log-Melspectrogram),关于它的介绍网上已有很多资料。这是一种很常规的特征,几乎所有音频任务都有用到。

nn类方法通常不用MFCC,因为MFCC是频谱图经有损变换得到的,去除了相关性。而nn具有强大的端到端学习能力,直接使用原始频谱图能提取出更多信息。(事实上,直接喂16000维的原始波形也能达到90+准确率)

librosa包提供了melspec的实现,下面是提取特征的核心代码。先将所有音频截取到定长,即1秒16000次采样,再用librosa提取频谱图,最后转换到对数域,以放大低频部分的信息。

1
2
3
4
5
6
7
8
9
10
11
def crop_or_pad(y, length=16000):
if len(y) < length:
y = np.concatenate([y, np.zeros(length - len(y))])
elif len(y) > length:
y = y[: length]
return y

def extract_features(wav):
wav = crop_or_pad(wav)
melspec = librosa.feature.melspectrogram(wav, sr=16000, n_mels=32)
return librosa.power_to_db(melspec)

归一化

由于缺乏对频谱图意义的理解,查阅文章 How to normalize spectrograms ,其探讨了频谱图应该如何进行归一化的问题。方法有下列几种:

  • 不归一化
  • 逐样本归一化
  • 逐通道归一化
  • 全局归一化

本文测试了不归一化、逐样本和全局归一化的效果,后二者略好一点。github仓库中用的是全局归一化,即预计算训练集上的均值和方差,如下:

1
2
3
mean, std = -34.256863, 17.706715
def normalize(x):
return (x - mean) / std

模型架构(resnet)

本文使用了pytorch配合keras4torch搭建模型和训练管线。

keras4torch是我们正在开发的pypi包,它是keras接口的一个子集,后端基于pytorch,其代码对于torch用户和keras用户都较容易理解。

残差块定义

首先是BN+ReLU+Conv2d的三连

1
2
3
4
5
6
7
8
9
import keras4torch as k4t
import torch.nn as nn

def bn_relu_conv(channels, stride):
return nn.Sequential(
k4t.layers.BatchNorm2d(),
nn.ReLU(inplace=True),
k4t.layers.Conv2d(channels, kernel_size=3, stride=stride, padding=1, bias=False)
)

这里使用了keras4torch提供的Conv2d,它是torch.nn.Conv2d的封装,实现了自动尺寸推断,无需设定输入通道数。

下面是残差连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class _ResidualBlock(nn.Module):
def __init__(self, in_channels, out_channels, stride):
super(_ResidualBlock, self).__init__()
self.sequential = nn.Sequential(
bn_relu_conv(out_channels, stride=stride),
bn_relu_conv(out_channels, stride=1)
)

self.equalInOut = (in_channels == out_channels)

if not self.equalInOut:
self.conv_shortcut = k4t.layers.Conv2d(out_channels, kernel_size=1, stride=stride, padding=0, bias=False)

def forward(self, x):
if not self.equalInOut:
return self.conv_shortcut(x) + self.sequential(x)
else:
return x + self.sequential(x)

如果输入和输出通道不一致,我们需要一个额外的线性卷积层使它们的通道数相同。

这一步操作判断了输入通道数,因此继承KerasLayer以获取in_shape,从而使残差块具有自动尺寸推断的特性。

1
2
3
class ResidualBlock(k4t.layers.KerasLayer):
def build(self, in_shape: torch.Size):
return _ResidualBlock(in_shape[1], *self.args, **self.kwargs)

多个残差块

stack_blocks将n个残差块叠起来。

1
2
3
4
def stack_blocks(n, channels, stride):
return nn.Sequential(
*[ResidualBlock(channels, stride if i == 0 else 1) for i in range(n)]
)

最后用Sequential构建整个模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def wideresnet(depth, num_classes, widen_factor=10):
nChannels = [16, 16*widen_factor, 32*widen_factor, 64*widen_factor]
n = (depth - 4) // 6

model = nn.Sequential(
k4t.layers.Conv2d(nChannels[0], kernel_size=3, stride=1, padding=1, bias=False),
stack_blocks(n, nChannels[1], stride=1),
stack_blocks(n, nChannels[2], stride=2),
stack_blocks(n, nChannels[3], stride=2),

k4t.layers.BatchNorm2d(), nn.ReLU(inplace=True),
nn.AdaptiveAvgPool2d(1), nn.Flatten(),
k4t.layers.Linear(num_classes)
)

return model

训练设定

优化器使用SGD,动量为0.9,L2正则系数为0.01。

初始学习率为1e-2,然后按下述公式调整。

$$
\begin{equation} learning_rate(epoch) = \begin{cases} 10^{-2} & epoch\le13 \ 3\times10^{-3} & 13 \lt epoch \le 20 \ 9\times10^{-4} & 20 \lt epoch \le 27 \ 2.7\times10^{-4} & 27 \lt epoch \le 34 \ 8.1\times10^{-5} & epoch \gt 34 \ \end{cases} \end{equation}
$$

动态学习率非常重要,有利于优化器找到损失更低的点。如果使用固定学习率,效果会比较差。

接着使用keras4torch.Model 封装torch模型以集成keras风格的训练api,包括.compile(), .fit(), .evaluate()和.predict()。

支持DataLoader的相应版本为 .fit_dl(), .evaluate_dl(), 和.predict_dl()。

1
2
3
4
5
6
7
8
9
10
def build_model():
model = wideresnet(depth=28, widen_factor=10, num_classes=NUM_CLASSES)

model = k4t.Model(model).build([1, 32, 32])

model.compile(optimizer=torch.optim.SGD(model.parameters(), lr=1e-2, momentum=0.9, weight_decay=1e-2),
loss=k4t.losses.CELoss(label_smoothing=0.1),
metrics=['acc'], device='cuda')

return model

数据增强

使用在线增强,对原始波形应用三种简单变换:

  • 改变振幅
  • 平移波形
  • 改变速度和音调

每种变换都以50%概率应用于每个波形,即每个epoch有约12.5%的数据未经变换,其余数据经历不同程度的变换,这样整个训练过程模型仍以接近1的概率看到全部原始数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
class StochasticTransform(object):
def __call__(self, x):
return self.transform(x) if random.random() < 0.5 else x

@abstractclassmethod
def transform(self, x):
pass

class ChangeAmplitude(StochasticTransform):
def __init__(self, min=0.7, max=1.2):
self.amplitude_range = (min, max)

def transform(self, x):
x = x * random.uniform(*self.amplitude_range)
return x

class ChangeSpeedAndPitch(StochasticTransform):
def __init__(self, max_scale=0.2):
self.max_scale = max_scale

def transform(self, x):
scale = random.uniform(-self.max_scale, self.max_scale)
speed_fac = 1.0 / (1 + scale)
x = np.interp(np.arange(0, len(x), speed_fac), np.arange(0,len(x)), x).astype(np.float32)
return x

class TimeShift(StochasticTransform):
def __init__(self, frac_0=8, frac_1=3):
self.frac_0 = frac_0
self.frac_1 = frac_1

def transform(self, x):
a = np.arange(len(x))
a = np.roll(a, np.random.randint(len(x)//self.frac_0, len(x)//self.frac_1))
return x[a]

此外,https://github.com/johnmartinsson/bird-species-classification/wiki/Data-Augmentation 也提供了很棒的参考。

在线增强能使模型看到数十倍于原始数据量的增强实例,从而使学习到的函数比较平滑。另一种策略是离线增强,即事先预计算增强实例,混合后与原始数据一块喂给模型,优点是速度比较快。

最后是五折交叉验证,ModelCheckpoint将保存验证集上表现最好的模型用于预测。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from sklearn.model_selection import StratifiedKFold
from torch.optim.lr_scheduler import MultiStepLR

kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=2020)

y_proba = np.zeros([len(x_test), NUM_CLASSES]).astype(np.float32)
model_name = 'wideresnet28'

for i, (trn, val) in enumerate(kfold.split(x_train, y_train)):
print(f'Processing fold {i}:')

model = build_model()

lr_scheduler = MultiStepLR(model.trainer.optimizer, milestones=[13, 20, 27, 34], gamma=0.3)
lr_scheduler = k4t.callbacks.LRScheduler(lr_scheduler)

model_checkpoint = k4t.callbacks.ModelCheckpoint(f'best_{model_name}_{i}.pt', monitor='val_acc')

trn_loader, val_loader = make_dataloader(x_train[trn], y_train[trn], x_train[val], y_train[val])

history = model.fit_dl(trn_loader,
epochs=40,
val_loader=val_loader,
callbacks=[model_checkpoint, lr_scheduler]
)

model.load_weights(f'best_{model_name}_{i}.pt')
y_proba += model.predict(x_test, activation=nn.Softmax(-1))

y_proba /= kfold.n_splits

实验结果