diff --git a/src/voiceprint/create_data.py b/src/voiceprint/create_data.py
new file mode 100644
index 0000000..26c2a53
--- /dev/null
+++ b/src/voiceprint/create_data.py
@@ -0,0 +1,111 @@
+import os
+import random
+import sys
+
+import librosa
+import numpy as np
+import tensorflow as tf
+from tqdm import tqdm
+
+
+# Build a float-list feature for a TFRecord example.
+def _float_feature(value):
+    if not isinstance(value, list):
+        value = [value]
+    return tf.train.Feature(float_list=tf.train.FloatList(value=value))
+
+
+# Build an int64-list feature for a TFRecord example.
+def _int64_feature(value):
+    if not isinstance(value, list):
+        value = [value]
+    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))
+
+
+# Wrap one (data, label) pair as a TFRecord Example.
+def data_example(data, label):
+    feature = {
+        'data': _float_feature(data),
+        'label': _int64_feature(label),
+    }
+    return tf.train.Example(features=tf.train.Features(feature=feature))
+
+
+# Build the tfrecord dataset from a "path\tlabel" list file.
+def create_data_tfrecord(data_list_path, save_path):
+    with open(data_list_path, 'r') as f:
+        data = f.readlines()
+    with tf.io.TFRecordWriter(save_path) as writer:
+        for d in data:
+            try:
+                path, label = d.replace('\n', '').split('\t')
+                wav, sr = librosa.load(path, sr=16000)
+                intervals = librosa.effects.split(wav, top_db=20)
+                wav_output = []
+                # [tunable] clip length = 16000 samples/second * seconds
+                wav_len = int(16000 * 2.04)
+                for sliced in intervals:
+                    wav_output.extend(wav[sliced[0]:sliced[1]])
+                wav_output = np.array(wav_output, dtype=np.float32)
+                for i in range(20):
+                    # Randomly crop over-long audio; zero-pad short audio.
+                    # Crop into a separate variable so every pass re-crops the
+                    # full-length audio instead of the previous crop (bugfix).
+                    if len(wav_output) > wav_len:
+                        l = len(wav_output) - wav_len
+                        r = random.randint(0, l)
+                        wav_crop = wav_output[r:wav_len + r]
+                    else:
+                        wav_crop = np.concatenate([wav_output, np.zeros(shape=[wav_len - len(wav_output)], dtype=np.float32)])
+                    # Convert to a mel spectrogram.
+                    ps = librosa.feature.melspectrogram(y=wav_crop, sr=sr, hop_length=256).reshape(-1).tolist()
+                    # [tunable] mel-spectrogram shape, cf. librosa.feature.melspectrogram(y=wav_crop, sr=sr, hop_length=256).shape
+                    if len(ps) != 128 * 128: continue
+                    tf_example = data_example(ps, int(label))
+                    writer.write(tf_example.SerializeToString())
+                    # Audio shorter than wav_len yields a single padded sample.
+                    if len(wav_output) <= wav_len:
+                        break
+            except Exception as e:
+                print(e)
+
+
+# Generate the train/test list files from a directory of wav files.
+def get_data_list(audio_path, list_path):
+    files = os.listdir(audio_path)
+
+    f_train = open(os.path.join(list_path, 'train_list.txt'), 'w')
+    f_test = open(os.path.join(list_path, 'test_list.txt'), 'w')
+
+    sound_sum = 0
+    s = set()
+    label = {}
+    for file in files:
+        if '.wav' not in file:
+            continue
+        index = file.find('-')
+        name = file[0:index]  # speaker name is the wav filename prefix
+        print(name)
+        if name not in s:
+            label[name] = len(s)
+        s.add(name)
+        sound_path = os.path.join(audio_path, file)
+        if sound_sum % 10 == 0:  # starting from file 0, every 10th file goes to the test list
+            f_test.write('%s\t%d\n' % (sound_path.replace('\\', '/'), label[name]))
+        else:
+            f_train.write('%s\t%d\n' % (sound_path.replace('\\', '/'), label[name]))
+        sound_sum += 1
+
+    f_test.close()
+    f_train.close()
+
+
+if __name__ == '__main__':
+    trainername = sys.argv[1]
+    print('trainername = ' + trainername)
+    print(sys.path[0])
+    get_data_list(sys.path[0] + '/' + 'dataset/audio' + '/' + trainername, sys.path[0] + '/' + 'dataset')
+    create_data_tfrecord(sys.path[0] + '/' + 'dataset/train_list.txt', sys.path[0] + '/' + 'dataset/train.tfrecord')
+    create_data_tfrecord(sys.path[0] + '/' + 'dataset/test_list.txt', sys.path[0] + '/' + 'dataset/test.tfrecord')
+    command = "python " + sys.path[0] + "/" + "train.py" + " " + trainername
+    os.system(command)
diff --git a/src/voiceprint/infer_contrast.py b/src/voiceprint/infer_contrast.py
new file mode 100644
index 0000000..782400f
--- /dev/null
+++ b/src/voiceprint/infer_contrast.py
@@ -0,0 +1,101 @@
+import os
+import random
+
+import librosa
+import numpy as np
+import tensorflow as tf
+from tensorflow.keras.models import Model
+
+
+layer_name = 'global_max_pooling2d'
+#model = tf.keras.models.load_model('models/resnet.h5')
+#intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)
+
+
+# Load an audio file and convert it to a model-ready mel-spectrogram tensor.
+def load_data(data_path):
+    wav, sr = librosa.load(data_path, sr=16000)
+    intervals = librosa.effects.split(wav, top_db=20)
+    wav_output = []
+    for sliced in intervals:
+        wav_output.extend(wav[sliced[0]:sliced[1]])
+    assert len(wav_output) >= 8000, "有效音频小于0.5s"
+    wav_output = np.array(wav_output)
+    ps = librosa.feature.melspectrogram(y=wav_output, sr=sr, hop_length=256).astype(np.float32)
+    ps = ps[np.newaxis, ..., np.newaxis]
+    return ps
+
+
+def infer(audio_path):
+    data = load_data(audio_path)
+    feature = intermediate_layer_model.predict(data)
+    return feature
+
+
+if __name__ == '__main__':
+    # The two speakers' audio files to compare
+    # person1 = 'dataset/wc-5.wav'
+    # person2 = 'dataset/wc-1.wav'
+    # feature1 = infer(person1)[0]
+    # feature2 = infer(person2)[0]
+    # # cosine similarity
+    # dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))
+    # if dist > 0.92:
+    #     print("%s 和 %s 为同一个人,相似度为:%f" % (person1, person2, dist))
+    # else:
+    #     print("%s 和 %s 不是同一个人,相似度为:%f" % (person1, person2, dist))
+
+    dirs = os.listdir('dataset/predict')
+    for dir in dirs:
+        dist_sum = 0
+        print(dir)
+        model_path = "models" + '/' + dir + '/resnet.h5'
+        #model_path = os.path.join(model_path, 'resnet.h5')
+        print(model_path + " is running")
+        model = tf.keras.models.load_model(model_path)
+        intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(layer_name).output)
+
+        person2 = 'dataset/wctest.wav'
+        feature2 = infer(person2)[0]
+
+        dir = os.path.join('dataset/predict', dir)
+        print(dir)
+        files = os.listdir(dir)
+        length = len(files)
+        predict_num = 20
+        for i in range(0, predict_num):
+            index = random.randint(0, length - 1)
+            # print(files[index])
+            person1 = os.path.join(dir, files[index])
+            feature1 = infer(person1)[0]
+            dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))
+            dist_sum = dist + dist_sum
+            if dist > 0.92:
+                print("%s 和 %s 为同一个人,相似度为:%f" % (person1, person2, dist))
+            else:
+                print("%s 和 %s 不是同一个人,相似度为:%f" % (person1, person2, dist))
+
+        dist_avg = dist_sum / predict_num
+        if dist_avg > 0.925:
+            print("为同一个人,相似度为:%f" % dist_avg)
+        else:
+            print("不是同一个人,相似度为:%f" % dist_avg)
+
+        # dist_sum = 0
+        # for file in files:
+        #     person1 = os.path.join('dataset/audio', file)
+        #     feature1 = infer(person1)[0]
+        #     dist = np.dot(feature1, feature2) / (np.linalg.norm(feature1) * np.linalg.norm(feature2))
+        #     dist_sum = dist + dist_sum
+        #     if dist > 0.92:
+        #         print("%s 和 %s 为同一个人,相似度为:%f" % (person1, person2, dist))
+        #     else:
+        #         print("%s 和 %s 不是同一个人,相似度为:%f" % (person1, person2, dist))
+        #
+        # num = len(files)
+        # dist_avg = dist_sum / num
+        # print(dist_avg)
+        # if dist_avg > 0.925:
+        #     print("为同一个人,相似度为:%f" % (dist_avg))
+        # else:
+        #     print("不是同一个人,相似度为:%f" % (dist_avg))
diff --git a/src/voiceprint/reader.py b/src/voiceprint/reader.py
new file mode 100644
index 0000000..4aada16
--- /dev/null
+++ b/src/voiceprint/reader.py
@@ -0,0 +1,27 @@
+import tensorflow as tf
+
+
+def _parse_data_function(example):
+    # [tunable] product of the configured mel-spectrogram shape (128 * 128)
+    data_feature_description = {
+        'data': tf.io.FixedLenFeature([16384], tf.float32),
+        'label': tf.io.FixedLenFeature([], tf.int64),
+    }
+    return tf.io.parse_single_example(example, data_feature_description)
+
+
+def train_reader_tfrecord(data_path, num_epochs, batch_size):
+    raw_dataset = tf.data.TFRecordDataset(data_path)
+    train_dataset = raw_dataset.map(_parse_data_function)
+    train_dataset = train_dataset.shuffle(buffer_size=1000) \
+        .repeat(count=num_epochs) \
+        .batch(batch_size=batch_size) \
+        .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
+    return train_dataset
+
+
+def test_reader_tfrecord(data_path, batch_size):
+    raw_dataset = tf.data.TFRecordDataset(data_path)
+    test_dataset = raw_dataset.map(_parse_data_function)
+    test_dataset = test_dataset.batch(batch_size=batch_size)
+    return test_dataset
diff --git a/src/voiceprint/train.py b/src/voiceprint/train.py
new file mode 100644
index 0000000..842f0ff
--- /dev/null
+++ b/src/voiceprint/train.py
@@ -0,0 +1,95 @@
+import os
+import sys
+
+import tensorflow as tf
+import reader
+import numpy as np
+
+class_dim = 1  # number of classes in train_list; must equal the largest label in the txt + 1
+EPOCHS = 500
+BATCH_SIZE = 32
+init_model = None
+
+model = tf.keras.models.Sequential([
+    tf.keras.applications.ResNet50V2(include_top=False, weights=None, input_shape=(128, None, 1)),
+    tf.keras.layers.ActivityRegularization(l2=0.5),
+    tf.keras.layers.Dropout(rate=0.5),
+    tf.keras.layers.GlobalMaxPooling2D(),
+    tf.keras.layers.Dense(units=class_dim, activation=tf.nn.softmax)
+])
+
+model.summary()
+
+# Optimizer
+optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
+
+workdir = sys.path[0]
+
+train_dataset = reader.train_reader_tfrecord(workdir + '/' + 'dataset/train.tfrecord', EPOCHS, batch_size=BATCH_SIZE)
+test_dataset = reader.test_reader_tfrecord(workdir + '/' + 'dataset/test.tfrecord', batch_size=BATCH_SIZE)
+
+if init_model:
+    model.load_weights(init_model)
+
+modelname = sys.argv[1]
+modeldir = workdir + '/' + 'models' + '/' + modelname
+print(modeldir)
+isExists = os.path.exists(modeldir)
+if not isExists:
+    os.makedirs(modeldir)
+    print(modeldir + ' 创建成功')
+else:
+    print(modeldir + ' 已存在')
+
+
+for batch_id, data in enumerate(train_dataset):
+    # [tunable] the configured mel-spectrogram shape
+    sounds = data['data'].numpy().reshape((-1, 128, 128, 1))
+    labels = data['label']
+    # Forward pass with training=True so Dropout is active in this custom loop (bugfix)
+    with tf.GradientTape() as tape:
+        predictions = model(sounds, training=True)
+        # loss
+        train_loss = tf.keras.losses.sparse_categorical_crossentropy(labels, predictions)
+        train_loss = tf.reduce_mean(train_loss)
+        # accuracy
+        train_accuracy = tf.keras.metrics.sparse_categorical_accuracy(labels, predictions)
+        train_accuracy = np.sum(train_accuracy.numpy()) / len(train_accuracy.numpy())
+
+    # apply gradients
+    gradients = tape.gradient(train_loss, model.trainable_variables)
+    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
+
+    print(batch_id)
+    if batch_id % 20 == 0:
+        print("Batch %d, Loss %f, Accuracy %f" % (batch_id, train_loss.numpy(), train_accuracy))
+
+    if batch_id % 50 == 0 and batch_id != 0:
+        test_losses = list()
+        test_accuracies = list()
+        for d in test_dataset:
+            # [tunable] the configured mel-spectrogram shape
+            test_sounds = d['data'].numpy().reshape((-1, 128, 128, 1))
+            test_labels = d['label']
+
+            test_result = model(test_sounds)
+            # loss
+            test_loss = tf.keras.losses.sparse_categorical_crossentropy(test_labels, test_result)
+            test_loss = tf.reduce_mean(test_loss)
+            test_losses.append(test_loss)
+            # accuracy
+            test_accuracy = tf.keras.metrics.sparse_categorical_accuracy(test_labels, test_result)
+            test_accuracy = np.sum(test_accuracy.numpy()) / len(test_accuracy.numpy())
+            test_accuracies.append(test_accuracy)
+
+        print('=================================================')
+        print("Test, Loss %f, Accuracy %f" % (
+            sum(test_losses) / len(test_losses), sum(test_accuracies) / len(test_accuracies)))
+        print("models is creating")
+        print('=================================================')
+
+        # save the model
+        model.save(filepath=modeldir + '/resnet.h5')
+        model.save_weights(filepath=modeldir + '/model_weights.h5')
+
+print("训练完成")