最近在关注Rainforest Connection Species Audio Detection ,然后看到CCF的练习项目 ,借此机会学习一下音频相关的处理方案。
github仓库链接:https://github.com/blueloveTH/speech_commands_recognition
任务简介 训练集有57886条,测试集有6835条,每条数据都是一段1秒左右的语音,其中包含一个单词,一共有30种可能,因此是一个30分类任务。
还有一位同学对赛题进行了更详细的介绍,对任务还不太理解的朋友,我强烈建议你看一下↓
2020CCFBDCI通用音频分类CNN方案(0.90+方案)_wherewegogo的博客-CSDN博客
特征工程 特征部分采用1x32x32的对数梅尔频谱图(Log-Melspectrogram) ,关于它的介绍网上已有很多资料。这是一种很常规的特征,几乎所有音频任务都有用到。
nn类方法通常不用MFCC,因为MFCC是频谱图经有损变换得到的,去除了相关性。而nn具有强大的端到端学习能力,直接使用原始频谱图能提取出更多信息。(事实上,直接喂16000维的原始波形也能达到90+准确率)
librosa包提供了melspec的实现,下面是提取特征的核心代码。先将所有音频截取到定长,即1秒16000次采样,再用librosa提取频谱图,最后转换到对数域,以放大低频部分的信息。
1 2 3 4 5 6 7 8 9 10 11 def crop_or_pad (y, length=16000 ): if len (y) < length: y = np.concatenate([y, np.zeros(length - len (y))]) elif len (y) > length: y = y[: length] return y def extract_features (wav ): wav = crop_or_pad(wav) melspec = librosa.feature.melspectrogram(wav, sr=16000 , n_mels=32 ) return librosa.power_to_db(melspec)
归一化 由于缺乏对频谱图意义的理解,查阅文章 How to normalize spectrograms ,其探讨了频谱图应该如何进行归一化的问题。方法有下列几种:
本文测试了不归一化、逐样本和全局归一化的效果,后二者略好一点。github仓库中用的是全局归一化,即预计算训练集上的均值和方差,如下:
1 2 3 mean, std = -34.256863 , 17.706715 def normalize (x ): return (x - mean) / std
模型架构(resnet)
本文使用了pytorch配合keras4torch 搭建模型和训练管线。
keras4torch是我们正在开发的pypi包,它是keras接口的一个子集,后端基于pytorch,其代码对于torch用户和keras用户都较容易理解。
残差块定义 首先是BN+ReLU+Conv2d的三连
1 2 3 4 5 6 7 8 9 import keras4torch as k4timport torch.nn as nndef bn_relu_conv (channels, stride ): return nn.Sequential( k4t.layers.BatchNorm2d(), nn.ReLU(inplace=True ), k4t.layers.Conv2d(channels, kernel_size=3 , stride=stride, padding=1 , bias=False ) )
这里使用了keras4torch提供的Conv2d,它是torch.nn.Conv2d的封装,实现了自动尺寸推断,无需设定输入通道数。
下面是残差连接。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class _ResidualBlock (nn.Module): def __init__ (self, in_channels, out_channels, stride ): super (_ResidualBlock, self ).__init__() self .sequential = nn.Sequential( bn_relu_conv(out_channels, stride=stride), bn_relu_conv(out_channels, stride=1 ) ) self .equalInOut = (in_channels == out_channels) if not self .equalInOut: self .conv_shortcut = k4t.layers.Conv2d(out_channels, kernel_size=1 , stride=stride, padding=0 , bias=False ) def forward (self, x ): if not self .equalInOut: return self .conv_shortcut(x) + self .sequential(x) else : return x + self .sequential(x)
如果输入和输出通道不一致,我们需要一个额外的线性卷积层使它们的通道数相同。
这一步操作判断了输入通道数,因此继承KerasLayer以获取in_shape,从而使残差块具有自动尺寸推断的特性。
1 2 3 class ResidualBlock (k4t.layers.KerasLayer): def build (self, in_shape: torch.Size ): return _ResidualBlock(in_shape[1 ], *self .args, **self .kwargs)
多个残差块 stack_blocks将n个残差块叠起来。
1 2 3 4 def stack_blocks (n, channels, stride ): return nn.Sequential( *[ResidualBlock(channels, stride if i == 0 else 1 ) for i in range (n)] )
最后用Sequential构建整个模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def wideresnet (depth, num_classes, widen_factor=10 ): nChannels = [16 , 16 *widen_factor, 32 *widen_factor, 64 *widen_factor] n = (depth - 4 ) // 6 model = nn.Sequential( k4t.layers.Conv2d(nChannels[0 ], kernel_size=3 , stride=1 , padding=1 , bias=False ), stack_blocks(n, nChannels[1 ], stride=1 ), stack_blocks(n, nChannels[2 ], stride=2 ), stack_blocks(n, nChannels[3 ], stride=2 ), k4t.layers.BatchNorm2d(), nn.ReLU(inplace=True ), nn.AdaptiveAvgPool2d(1 ), nn.Flatten(), k4t.layers.Linear(num_classes) ) return model
训练设定 优化器使用SGD,动量为0.9,L2正则系数为0.01。
初始学习率为1e-2,然后按下述公式调整。
$$ \begin{equation} learning_rate(epoch) = \begin{cases} 10^{-2} & epoch\le13 \ 3\times10^{-3} & 13 \lt epoch \le 20 \ 9\times10^{-4} & 20 \lt epoch \le 27 \ 2.7\times10^{-4} & 27 \lt epoch \le 34 \ 8.1\times10^{-5} & epoch \gt 34 \ \end{cases} \end{equation} $$
动态学习率非常重要,有利于优化器找到损失更低的点。如果使用固定学习率,效果会比较差。
接着使用keras4torch.Model 封装torch模型以集成keras风格的训练api,包括.compile(), .fit(), .evaluate()和.predict()。
支持DataLoader的相应版本为 .fit_dl(), .evaluate_dl(), 和.predict_dl()。
1 2 3 4 5 6 7 8 9 10 def build_model (): model = wideresnet(depth=28 , widen_factor=10 , num_classes=NUM_CLASSES) model = k4t.Model(model).build([1 , 32 , 32 ]) model.compile (optimizer=torch.optim.SGD(model.parameters(), lr=1e-2 , momentum=0.9 , weight_decay=1e-2 ), loss=k4t.losses.CELoss(label_smoothing=0.1 ), metrics=['acc' ], device='cuda' ) return model
数据增强 使用在线增强,对原始波形应用三种简单变换:
每种变换都以50%概率应用于每个波形,即每个epoch有约12.5%的数据未经变换,其余数据经历不同程度的变换,这样整个训练过程模型仍以接近1的概率看到全部原始数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 class StochasticTransform (object ): def __call__ (self, x ): return self .transform(x) if random.random() < 0.5 else x @abstractclassmethod def transform (self, x ): pass class ChangeAmplitude (StochasticTransform ): def __init__ (self, min =0.7 , max =1.2 ): self .amplitude_range = (min , max ) def transform (self, x ): x = x * random.uniform(*self .amplitude_range) return x class ChangeSpeedAndPitch (StochasticTransform ): def __init__ (self, max_scale=0.2 ): self .max_scale = max_scale def transform (self, x ): scale = random.uniform(-self .max_scale, self .max_scale) speed_fac = 1.0 / (1 + scale) x = np.interp(np.arange(0 , len (x), speed_fac), np.arange(0 ,len (x)), x).astype(np.float32) return x class TimeShift (StochasticTransform ): def __init__ (self, frac_0=8 , frac_1=3 ): self .frac_0 = frac_0 self .frac_1 = frac_1 def transform (self, x ): a = np.arange(len (x)) a = np.roll(a, np.random.randint(len (x)//self .frac_0, len (x)//self .frac_1)) return x[a]
此外,https://github.com/johnmartinsson/bird-species-classification/wiki/Data-Augmentation 也提供了很棒的参考。
在线增强 能使模型看到数十倍于原始数据量的增强实例,从而使学习到的函数比较平滑。另一种策略是离线增强 ,即事先预计算增强实例,混合后与原始数据一块喂给模型,优点是速度比较快。
最后是五折交叉验证,ModelCheckpoint将保存验证集上表现最好的模型用于预测。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from sklearn.model_selection import StratifiedKFoldfrom torch.optim.lr_scheduler import MultiStepLRkfold = StratifiedKFold(n_splits=5 , shuffle=True , random_state=2020 ) y_proba = np.zeros([len (x_test), NUM_CLASSES]).astype(np.float32) model_name = 'wideresnet28' for i, (trn, val) in enumerate (kfold.split(x_train, y_train)): print (f'Processing fold {i} :' ) model = build_model() lr_scheduler = MultiStepLR(model.trainer.optimizer, milestones=[13 , 20 , 27 , 34 ], gamma=0.3 ) lr_scheduler = k4t.callbacks.LRScheduler(lr_scheduler) model_checkpoint = k4t.callbacks.ModelCheckpoint(f'best_{model_name} _{i} .pt' , monitor='val_acc' ) trn_loader, val_loader = make_dataloader(x_train[trn], y_train[trn], x_train[val], y_train[val]) history = model.fit_dl(trn_loader, epochs=40 , val_loader=val_loader, callbacks=[model_checkpoint, lr_scheduler] ) model.load_weights(f'best_{model_name} _{i} .pt' ) y_proba += model.predict(x_test, activation=nn.Softmax(-1 )) y_proba /= kfold.n_splits
实验结果