提交voiceprint代码

wc
翁程 4 years ago
parent 6910979d90
commit 8a873cc430

@ -0,0 +1,108 @@
import os
import random
import sys
import librosa
import numpy as np
import tensorflow as tf
from tqdm import tqdm
# 获取浮点数组
def _float_feature(value):
if not isinstance(value, list):
value = [value]
return tf.train.Feature(float_list=tf.train.FloatList(value=value))
# 获取整型数据
def _int64_feature(value):
if not isinstance(value, list):
value = [value]
return tf.train.Feature(int64_list=tf.train.Int64List(value=value))
# 把数据添加到TFRecord中
def data_example(data, label):
feature = {
'data': _float_feature(data),
'label': _int64_feature(label),
}
return tf.train.Example(features=tf.train.Features(feature=feature))
# 开始创建tfrecord数据
def create_data_tfrecord(data_list_path, save_path):
with open(data_list_path, 'r') as f:
data = f.readlines()
with tf.io.TFRecordWriter(save_path) as writer:
for d in data:
try:
path, label = d.replace('\n', '').split('\t')
wav, sr = librosa.load(path, sr=16000)
intervals = librosa.effects.split(wav, top_db=20)
wav_output = []
# [可能需要修改参数] 音频长度 16000 * 秒数
wav_len = int(16000 * 2.04)
for sliced in intervals:
wav_output.extend(wav[sliced[0]:sliced[1]])
for i in range(20):
# 裁剪过长的音频过短的补0
if len(wav_output) > wav_len:
l = len(wav_output) - wav_len
r = random.randint(0, l)
wav_output = wav_output[r:wav_len + r]
else:
wav_output.extend(np.zeros(shape=[wav_len - len(wav_output)], dtype=np.float32))
wav_output = np.array(wav_output)
# 转成梅尔频谱
ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).reshape(-1).tolist()
# [可能需要修改参数] 梅尔频谱shape librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).shape
if len(ps) != 128 * 128: continue
tf_example = data_example(ps, int(label))
writer.write(tf_example.SerializeToString())
if len(wav_output) <= wav_len:
break
except Exception as e:
print(e)
# 生成数据列表
def get_data_list(audio_path, list_path):
files = os.listdir(audio_path)
f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')
f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')
sound_sum = 0
s = set()
label = {}
for file in files:
if '.wav' not in file:
continue
index = file.find('-')
name = file[0:index] # 取出每个wav文件的名字
print(name)
if name not in s:
label[name] = len(s)
s.add(name)
sound_path = os.path.join(audio_path, file)
if sound_sum % 10 == 0: # 从第0个文件开始放入test顺序为10的倍数的文件放入test
f_test.write('%s\t%d\n' % (sound_path.replace('\\', '/'), label[name]))
else:
f_train.write('%s\t%d\n' % (sound_path.replace('\\', '/'), label[name]))
sound_sum += 1
f_test.close()
f_train.close()
if __name__ == '__main__':
trainername = sys.argv[1]
print('trainername = ' + trainername)
print(sys.path[0])
get_data_list(sys.path[0] + '/' + 'dataset/audio' + '/' + trainername, sys.path[0] + '/' + 'dataset')
create_data_tfrecord(sys.path[0] + '/' + 'dataset/train_list.txt', sys.path[0] + '/' + 'dataset/train.tfrecord')
create_data_tfrecord(sys.path[0] + '/' + 'dataset/test_list.txt', sys.path[0] + '/' + 'dataset/test.tfrecord')
command = "python " + sys.path[0] + "/" + "train.py" + " " + trainername
os.system(command)

@ -0,0 +1,101 @@
import os
import random
import librosa
import numpy as np
import tensorflow as tf
from tensorflow.keras.models import Model
layer_name = 'global_max_pooling2d'
#model = tf.keras.models.load_model('models/resnet.h5')
#intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)
# 读取音频数据
def load_data(data_path):
wav, sr = librosa.load(data_path, sr=16000)
intervals = librosa.effects.split(wav, top_db=20)
wav_output = []
for sliced in intervals:
wav_output.extend(wav[sliced[0]:sliced[1]])
assert len(wav_output) >= 8000, "有效音频小于0.5s"
wav_output = np.array(wav_output)
ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
ps = ps[np.newaxis, ..., np.newaxis]
return ps
def infer(audio_path):
data = load_data(audio_path)
feature = intermediate_layer_model.predict(data)
return feature
if __name__ == '__main__':
# 要预测的两个人的音频文件
# person1 = 'dataset/wc-5.wav'
# person2 = 'dataset/wc-1.wav'
# feature1 = infer(person1)[0]
# feature2 = infer(person2)[0]
# # 对角余弦值
# dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))
# if dist > 0.92:
# print("%s 和 %s 为同一个人,相似度为:%f" % (person1, person2, dist))
# else:
# print("%s 和 %s 不是同一个人,相似度为:%f" % (person1, person2, dist))
dirs = os.listdir('dataset/predict')
for dir in dirs:
dist_sum = 0
print(dir)
model_path = "models" + '/' + dir + '/resnet.h5'
#model_path = os.path.join(model_path, 'resnet.h5')
print(model_path + " is running")
model = tf.keras.models.load_model(model_path)
intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)
person2 = 'dataset/wctest.wav'
feature2 = infer(person2)[0]
dir = os.path.join('dataset/predict', dir)
print(dir)
files = os.listdir(dir)
length = len(files)
predict_num = 20
for i in range(0, predict_num):
index = random.randint(0, length - 1)
# print(files[index])
person1 = os.path.join(dir, files[index])
feature1 = infer(person1)[0]
dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))
dist_sum = dist + dist_sum
if dist > 0.92:
print("%s%s 为同一个人,相似度为:%f" % (person1, person2, dist))
else:
print("%s%s 不是同一个人,相似度为:%f" % (person1, person2, dist))
dist_avg = dist_sum / predict_num
if dist_avg > 0.925:
print("为同一个人,相似度为:%f" % dist_avg)
else:
print("不是同一个人,相似度为:%f" % dist_avg)
# dist_sum = 0
# for file in files:
# person1 = os.path.join('dataset/audio', file)
# feature1 = infer(person1)[0]
# dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))
# dist_sum = dist + dist_sum
# if dist > 0.92:
# print("%s 和 %s 为同一个人,相似度为:%f" % (person1, person2, dist))
# else:
# print("%s 和 %s 不是同一个人,相似度为:%f" % (person1, person2, dist))
#
# num = len(files)
# dist_avg = dist_sum / num
# print(dist_avg)
# if dist_avg > 0.925:
# print("为同一个人,相似度为:%f" % (dist_avg))
# else:
# print("不是同一个人,相似度为:%f" % (dist_avg))

@ -0,0 +1,27 @@
import tensorflow as tf
def _parse_data_function(example):
# [可能需要修改参数】 设置的梅尔频谱的shape相乘的值
data_feature_description = {
'data': tf.io.FixedLenFeature([16384], tf.float32),
'label': tf.io.FixedLenFeature([], tf.int64),
}
return tf.io.parse_single_example(example, data_feature_description)
def train_reader_tfrecord(data_path, num_epochs, batch_size):
raw_dataset = tf.data.TFRecordDataset(data_path)
train_dataset = raw_dataset.map(_parse_data_function)
train_dataset = train_dataset.shuffle(buffer_size=1000) \
.repeat(count=num_epochs) \
.batch(batch_size=batch_size) \
.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
return train_dataset
def test_reader_tfrecord(data_path, batch_size):
raw_dataset = tf.data.TFRecordDataset(data_path)
test_dataset = raw_dataset.map(_parse_data_function)
test_dataset = test_dataset.batch(batch_size=batch_size)
return test_dataset

@ -0,0 +1,95 @@
import os
import sys
import tensorflow as tf
import reader
import numpy as np
class_dim = 1 # train_list里面有几个分类这里的值要等于txt文件中的数字+1
EPOCHS = 500
BATCH_SIZE = 32
init_model = None
model = tf.keras.models.Sequential([
tf.keras.applications.ResNet50V2(include_top=False, weights=None, input_shape=(128, None, 1)),
tf.keras.layers.ActivityRegularization(l2=0.5),
tf.keras.layers.Dropout(rate=0.5),
tf.keras.layers.GlobalMaxPooling2D(),
tf.keras.layers.Dense(units=class_dim, activation=tf.nn.softmax)
])
model.summary()
# 定义优化方法
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
workdir = sys.path[0]
train_dataset = reader.train_reader_tfrecord(workdir + '/' + 'dataset/train.tfrecord', EPOCHS, batch_size=BATCH_SIZE)
test_dataset = reader.test_reader_tfrecord(workdir + '/' + 'dataset/test.tfrecord', batch_size=BATCH_SIZE)
if init_model:
model.load_weights(init_model)
modelname = sys.argv[1]
modeldir = workdir + '/' + 'models' + '/' + modelname
print(modeldir)
isExists = os.path.exists(modeldir)
if not isExists:
os.makedirs(modeldir)
print(modeldir + ' 创建成功')
else:
print(modeldir + ' 已存在')
for batch_id, data in enumerate(train_dataset):
# [可能需要修改参数】 设置的梅尔频谱的shape
sounds = data['data'].numpy().reshape((-1, 128, 128, 1))
labels = data['label']
# 执行训练
with tf.GradientTape() as tape:
predictions = model(sounds)
# 获取损失值
train_loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
train_loss = tf.reduce_mean(train_loss)
# 获取准确率
train_accuracy = tf.keras.metrics.sparse_categorical_accuracy(labels, predictions)
train_accuracy = np.sum(train_accuracy.numpy()) / len(train_accuracy.numpy())
# 更新梯度
gradients = tape.gradient(train_loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
print(batch_id)
if batch_id % 20 == 0:
print("Batch %d, Loss %f, Accuracy %f" % (batch_id, train_loss.numpy(), train_accuracy))
if batch_id % 50 == 0 and batch_id != 0:
test_losses = list()
test_accuracies = list()
for d in test_dataset:
# [可能需要修改参数】 设置的梅尔频谱的shape
test_sounds = d['data'].numpy().reshape((-1, 128, 128, 1))
test_labels = d['label']
test_result = model(test_sounds)
# 获取损失值
test_loss = tf.keras.losses.sparse_categorical_crossentropy(test_labels, test_result)
test_loss = tf.reduce_mean(test_loss)
test_losses.append(test_loss)
# 获取准确率
test_accuracy = tf.keras.metrics.sparse_categorical_accuracy(test_labels, test_result)
test_accuracy = np.sum(test_accuracy.numpy()) / len(test_accuracy.numpy())
test_accuracies.append(test_accuracy)
print('=================================================')
print("Test, Loss %f, Accuracy %f" % (
sum(test_losses) / len(test_losses), sum(test_accuracies) / len(test_accuracies)))
print("models is creating")
print('=================================================')
# 保存模型
model.save(filepath=modeldir + '/resnet.h5')
model.save_weights(filepath=modeldir + '/model_weights.h5')
print("训练完成")
Loading…
Cancel
Save