import os import sys sys.path.append(os.getcwd()) import time import numpy as np import pandas as pd from gensim.models import KeyedVectors import faiss import config from tqdm import tqdm import jieba from ltp import LTP from config import data_path, word2vec_dim from config import ltp_model_path, shixuns_keywords_path from config import JIEBA_TOKEN, LTP_TOKEN, logger, user_dict_path from datetime import datetime import torch # 使用Faiss训练hnsw模型 tqdm.pandas() ltp = LTP(ltp_model_path) if torch.cuda.is_available(): ltp.to("cuda") # 加载用户自定义词典 if os.path.exists(shixuns_keywords_path): jieba.load_userdict(shixuns_keywords_path) with open(shixuns_keywords_path, 'r', encoding='utf-8') as f: user_dict_words = f.read().split() ltp.add_words(user_dict_words) if os.path.exists(user_dict_path): with open(user_dict_path, 'r', encoding='utf-8') as f: user_dict_words = f.read().split() ltp.add_words(user_dict_words) for word in user_dict_words: jieba.add_word(word) def tokenizer(sent, token_method=JIEBA_TOKEN, verbose=False): """ 中文分词,支持jieba和ltp两种方式 """ if token_method == JIEBA_TOKEN: seg = jieba.cut(sent) result = ' '.join(seg) elif token_method == LTP_TOKEN: content = [] content.append(sent) seg = ltp.pipeline(content, tasks=['cws'])['cws'] result = '' for word in seg[0]: if result == '': result = word else: result = result + ' ' + word if verbose == True: logger.info(f"分词方式:{token_method}, 分词结果:{result}") return result def sentence_embedding(sentence, w2v_model, fassi_w2v_model, verbose=False): ''' 通过词向量均值的方式生成句向量 sentence: 待生成句向量的句子 w2v_model: word2vec模型 return: 句子中所有词向量的均值 ''' sentence = tokenizer(sentence, JIEBA_TOKEN,verbose) embedding = [] for word in sentence.split(): if (word not in w2v_model.wv.index_to_key) and (word not in fassi_w2v_model.wv.index_to_key): embedding.append(np.random.randn(1, word2vec_dim)) else: if word in fassi_w2v_model.wv.index_to_key: embedding.append(fassi_w2v_model.wv.get_vector(word)) else: embedding.append(w2v_model.wv.get_vector(word)) # 所有词向量的均值为句向量 return np.mean(np.array(embedding), axis=0).reshape(1, -1) class HNSW(object): def __init__(self, w2v_path, # 通用word2vec词向量 faiss_w2v_path, # 自己训练的word2vec词向量 ef=config.ef_construction, # 搜索时保存最近邻的动态列表大小 M=config.M, # 节点的邻结点的数量 model_path=None, # hnsw模型保存路径 data_path=None): # 数据文件路径 # 加载词向量 logger.info("加载Word2Vec词向量") self.w2v_model = KeyedVectors.load(w2v_path) self.fassi_w2v_model = KeyedVectors.load(faiss_w2v_path) # 加载hnsw模型 if model_path and os.path.exists(model_path): logger.info("加载HNSW快速召回模型") self.data = pd.read_csv(data_path, sep='\t', encoding='utf-8') self.index = self.load_hnsw(model_path) # 训练hnsw模型 elif data_path: logger.info("训练HNSW快速召回模型") self.data = self.load_data(data_path) self.index = self.build_hnsw(model_path, ef=ef, m=M) else: logger.error('No existing model and no building data provided.') def load_data(self, data_path): ''' 读取数据,并生成句向量 :param data_path:数据所在路径 :return: 包含句向量的dataframe ''' data = pd.read_csv(data_path, sep='\t', encoding='utf-8') logger.info('生成所有实训向量') data['shixun_name_vec'] = data['shixun_name'].progress_apply( lambda x: sentence_embedding(x, self.w2v_model, self.fassi_w2v_model)) logger.info('检测所有实训向量的维度') data['shixun_name_vec'] = data['shixun_name_vec'].progress_apply( lambda x: x[0][0] if x.shape[1] != word2vec_dim else x) # 保存生成好的句向量 data['shixun_id'] = data['shixun_id'].astype(int) return data def evaluate(self, vecs): ''' 验证模型 ''' logger.info('Evaluating hnsw model') nq, d = vecs.shape t0 = time.time # 找top1个相似的 D, I = self.index.search(vecs, 1) t1 = time.time missing_rate = (I == -1).sum() / float(nq) recall_at_1 = (I == np.arange(nq)).sum() / float(nq) print("\t %7.3f ms per query, R@1 %.4f, missing rate %.4f" % ( (t1 - t0) * 1000.0 / nq, recall_at_1, missing_rate)) def build_hnsw(self, to_file, ef=2000, m=64): """ 训练hnsw模型 """ logger.info('构建 HNSW 索引') # 所有的句向量拼接 vecs = np.stack(self.data['shixun_name_vec'].values).reshape(-1, word2vec_dim) vecs = vecs.astype('float32') dim = self.w2v_model.vector_size # 构建索引 index = faiss.IndexHNSWFlat(dim, m) # 使用单个GPU资源 res = faiss.StandardGpuResources() # res.setTempMemory(20 * 1024 * 1024 * 1024) faiss.index_cpu_to_gpu(res, 0, index) index.hnsw.ef_construction = ef index.verbose = True index.add(vecs) # 保存hnsw模型 faiss.write_index(index, to_file) return index def load_hnsw(self, model_path): hnsw = faiss.read_index(model_path) return hnsw def search(self, text, k=10): """ 通过hnsw检索topk """ logger.info(f"Searching top {k} similarity for {text}.") # 转换句向量 test_vec = sentence_embedding(text, self.w2v_model, self.fassi_w2v_model, verbose=True) test_vec = test_vec.astype('float32') # 搜索相似度最高的k个句向量 # 多召回一些避免通过status过滤后不足k个 D, I = self.index.search(test_vec, k * 2 ) top_k_index = list(I.ravel()) top_k_Item = [] for index in top_k_index: shixun_id = int(self.data.iloc[index]['shixun_id']) shixun_name = self.data.iloc[index]['shixun_name'] shixun_status = self.data.iloc[index]['status'] # 只取已发布的实训 if shixun_status == 2: top_k_Item.append((shixun_id, shixun_name)) # 只返回topk个 return top_k_index[:k], top_k_Item[:k] def hnsw_search_test(hnsw): while True: topk = 20 item_name = input('请输入您的实训名称:').strip() top_k_index, top_k_Item = hnsw.search(item_name, k=topk) df_top_k = pd.DataFrame(columns=['index', 'shixun_recommend']) pd.set_option('colheader_justify', 'center') for i in range(topk): df_top_k.loc[i] = [top_k_index[i], top_k_Item[i]] print(df_top_k) if __name__ == '__main__': hnsw = HNSW(config.word2vec_model_path, config.shixun_faiss_w2v_path, config.ef_construction, config.M, config.shixuns_fassi_model_path, config.shixuns_data_path)