import os
import sys

# Project-local packages (config, matching, utils) are resolved relative to
# the working directory, so it must be on sys.path before they are imported.
sys.path.append(os.getcwd())

import collections
import pickle
import warnings
from datetime import datetime

import faiss
import numpy as np
import pandas as pd
from tqdm import tqdm

from config import logger
from config import need_metric_recall
from config import subject_emb_i2i_sim_data
from config import subject_item_embedding_recall_dict
from config import subjects_embed_path
from config import samples_mode
from config import test_user_id
from matching.subject.recall_comm import get_all_select_df
from matching.subject.recall_comm import get_item_info_df
from matching.subject.recall_comm import get_user_item_time_dict
from matching.subject.recall_comm import get_hist_and_last_select, get_recall_item_info_dict
from matching.subject.recall_comm import get_item_topk_select
from matching.subject.recall_comm import metrics_recall
from utils import get_file_size

tqdm.pandas()
warnings.filterwarnings('ignore')


def embdding_i2i_sim(item_emb_df, topk):
    """
    Build an item-to-item similarity mapping from item embeddings.

    If a non-trivial cache file already exists at ``subject_emb_i2i_sim_data``
    it is loaded and returned as-is. Otherwise every embedding is
    L2-normalised, indexed with a faiss inner-product index, and the ``topk``
    nearest neighbours of each item are looked up. The first hit of every
    query is dropped (it is expected to be the item itself), so each item ends
    up with at most ``topk - 1`` neighbours. The result is pickled to
    ``subject_emb_i2i_sim_data`` before being returned.

    :param item_emb_df: DataFrame with a ``subject_id`` column; every column
        whose name does not contain ``subject_id`` is treated as an embedding
        dimension
    :param topk: number of neighbours requested from faiss per item
    :return: mapping ``{item_id: {similar_item_id: similarity}}``
    """
    # Reuse the previously saved similarity matrix when it is available.
    # NOTE(review): pickle is only safe on trusted files; this cache is
    # assumed to be produced by this module.
    if os.path.exists(subject_emb_i2i_sim_data) and (get_file_size(subject_emb_i2i_sim_data) > 1):
        with open(subject_emb_i2i_sim_data, 'rb') as cache_file:
            return pickle.load(cache_file)

    # Row position -> raw item id. faiss reports row positions, so the mapping
    # is positional instead of relying on the DataFrame index being 0..n-1.
    raw_ids = item_emb_df['subject_id'].tolist()

    item_emb_cols = [x for x in item_emb_df.columns if 'subject_id' not in x]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)

    # Normalise to unit length so the inner product equals cosine similarity.
    # All-zero vectors are left untouched instead of being turned into NaN.
    norms = np.linalg.norm(item_emb_np, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    item_emb_np = item_emb_np / norms

    # Build the faiss index and query it with every item vector.
    item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
    item_index.add(item_emb_np)
    sim, idx = item_index.search(item_emb_np, topk)

    # Translate the positional search results back to raw item ids.
    item_sim_dict = collections.defaultdict(dict)
    for target_idx, (sim_value_list, rele_idx_list) in tqdm(enumerate(zip(sim, idx)), total=len(raw_ids)):
        target_raw_id = raw_ids[target_idx]
        # Start at 1 to drop the item itself, hence only topk - 1 neighbours.
        for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
            # faiss pads with -1 when fewer than topk vectors are indexed.
            if rele_idx < 0:
                continue
            rele_raw_id = raw_ids[rele_idx]
            neighbours = item_sim_dict[target_raw_id]
            neighbours[rele_raw_id] = neighbours.get(rele_raw_id, 0) + sim_value

    # Persist the embedding i2i similarity matrix for later runs.
    with open(subject_emb_i2i_sim_data, 'wb') as cache_file:
        pickle.dump(item_sim_dict, cache_file)

    return item_sim_dict


def item_based_recommend(user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num, item_topk_select,
                         item_info_dict, emb_i2i_sim):
    """
    Item-based (i2i) recall for a single user.

    For every item in the user's history the ``sim_item_topk`` most similar
    items are scored with ``created_time_weight * loc_weight * content_weight
    * wij`` and the scores are accumulated per candidate. If fewer than
    ``recall_item_num`` candidates are found, the list is padded with popular
    items carrying negative scores.

    :param user_id: user id
    :param user_item_time_dict: dict of per-user selection sequences ordered
        by time, ``{user1: [(item1, time1), (item2, time2)..]...}``
    :param i2i_sim: dict, item similarity matrix ``{item: {sim_item: wij}}``
    :param sim_item_topk: int, number of most similar items taken per history item
    :param recall_item_num: int, number of items finally recalled
    :param item_topk_select: list of the most frequently selected items, used
        to pad the result
    :param item_info_dict: dict of item info; each value provides
        ``created_at_ts`` and ``subject_name``
    :param emb_i2i_sim: item embedding similarity matrix
    :return: recalled items ``[(item1, score1), (item2, score2)...]`` sorted
        by score in descending order
    :raises KeyError: if ``user_id`` is not in ``user_item_time_dict``
    """
    # Items the user has already interacted with.
    user_hist_items = user_item_time_dict[user_id]
    hist_item_ids = {item_id for item_id, _ in user_hist_items}
    hist_len = len(user_hist_items)

    # subject_name -> first candidate item that claimed the name. A different
    # item with the same name is dropped, but the same item reached again via
    # another history item is still allowed to accumulate score.
    name_owner = {}
    item_rank = {}

    for loc, (hist_item, _select_time) in enumerate(user_hist_items):
        # Skip history items that have no entry in the item info dict.
        if hist_item not in item_info_dict:
            continue
        hist_item_created_time = item_info_dict[hist_item]['created_at_ts']

        # Position weight: more recent history items weigh more.
        loc_weight = 0.9 ** (hist_len - loc)

        # .get() avoids KeyError on plain dicts and key insertion on defaultdicts.
        candidates = sorted(i2i_sim.get(hist_item, {}).items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]
        for sim_item, wij in candidates:
            # Skip items the user has already selected.
            if sim_item in hist_item_ids:
                continue
            # Skip candidates that have no entry in the item info dict.
            if sim_item not in item_info_dict:
                continue

            sim_item_info = item_info_dict[sim_item]
            sim_item_created_time = sim_item_info['created_at_ts']
            sim_item_name = sim_item_info['subject_name']

            # Skip a different item that duplicates an already recalled name.
            if name_owner.setdefault(sim_item_name, sim_item) != sim_item:
                continue

            # Weight based on the creation-time difference of the two items.
            created_time_weight = np.exp(0.8 ** np.abs(hist_item_created_time - sim_item_created_time))

            # Content weight: 1 plus the embedding similarity in both directions.
            content_weight = 1.0
            forward_sim = emb_i2i_sim.get(hist_item, {}).get(sim_item, None)
            if forward_sim is not None:
                content_weight += forward_sim
            backward_sim = emb_i2i_sim.get(sim_item, {}).get(hist_item, None)
            if backward_sim is not None:
                content_weight += backward_sim

            item_rank.setdefault(sim_item, 0)
            item_rank[sim_item] += created_time_weight * loc_weight * content_weight * wij

    # Pad with popular items when there are not enough candidates.
    if len(item_rank) < recall_item_num:
        for index, item in enumerate(item_topk_select):
            # Never overwrite the score of an item that was already recalled.
            if item in item_rank:
                continue
            # Any negative score works; it only has to rank below real candidates.
            item_rank[item] = - index - 100
            if len(item_rank) == recall_item_num:
                break

    item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]

    return item_rank


def init_item_embedding_recall():
    """
    Initialise the module-level data used by the recall functions.

    Populates the globals ``train_hist_select_df``, ``train_last_select_df``,
    ``user_item_time_dict``, ``i2i_sim``, ``emb_i2i_sim``, ``sim_item_topk``,
    ``recall_item_num``, ``item_topk_select`` and ``item_info_dict``.
    """
    global train_hist_select_df
    global user_item_time_dict
    global i2i_sim, sim_item_topk
    global recall_item_num
    global item_topk_select
    global item_info_dict
    global emb_i2i_sim
    global train_last_select_df

    logger.info("加载物品行为数据")
    all_select_df = get_all_select_df(offline=False)

    sim_item_topk = 120
    recall_item_num = 100

    logger.info('获取物品基本信息')
    item_info_df = get_item_info_df()

    logger.info('生成物品信息字典')
    item_info_dict = get_recall_item_info_dict(item_info_df)

    logger.info('生成物品embedding相似度矩阵')
    item_emb_df = pd.read_csv(subjects_embed_path, sep='\t', encoding='utf-8')
    emb_i2i_sim = embdding_i2i_sim(item_emb_df, topk=recall_item_num)

    # For recall evaluation the last selection of every user is held out.
    # Without evaluation the full training set is used for recall.
    if samples_mode and need_metric_recall:
        logger.info('获取物品行为数据历史和最后一次选择')
        train_hist_select_df, train_last_select_df = get_hist_and_last_select(all_select_df)
    else:
        train_hist_select_df = all_select_df
        # Keep the global defined even when no hold-out set is built.
        train_last_select_df = None

    logger.info('获取用户选择物品列表')
    user_item_time_dict = get_user_item_time_dict(train_hist_select_df)

    # Load the item embedding similarity matrix that was saved above.
    with open(subject_emb_i2i_sim_data, 'rb') as cache_file:
        i2i_sim = pickle.load(cache_file)

    logger.info('获取选择次数最多的物品')
    item_topk_select = get_item_topk_select(train_hist_select_df, k=recall_item_num)


def item_embedding_recall(user_id, topk):
    """
    Public entry point for item-embedding recall.

    ``init_item_embedding_recall`` must have been called beforehand, because
    this function reads the module-level globals it sets up.

    :param user_id: user to recall items for
    :param topk: number of items to recall; ``topk + topk // 3`` similar items
        are considered per history item
    :return: ``[(item, score), ...]`` for a known user; an empty dict when the
        user has no history (kept as a dict for backward compatibility)
    """
    start_time = datetime.now()
    logger.info(f"本次需要item embedding召回的用户ID: {user_id}")

    recall_results = {}
    if user_id not in user_item_time_dict:
        return recall_results

    recall_results = item_based_recommend(user_id, user_item_time_dict, i2i_sim, topk + (topk // 3), topk,
                                          item_topk_select, item_info_dict, emb_i2i_sim)

    # Elapsed time in milliseconds. Both timestamps come from the same clock,
    # and total_seconds() covers the whole duration rather than only the
    # sub-second component.
    end_time = datetime.now()
    cost_time_millisecond = round((end_time - start_time).total_seconds() * 1000.0, 3)
    logger.info(f"本次召回耗时: {cost_time_millisecond} 毫秒")

    return recall_results


def item_embedding_recall_train():
    """
    Initialise the recall data and, in sampling mode, evaluate the recall.

    When both ``samples_mode`` and ``need_metric_recall`` are set, the recall
    list of every user is computed, pickled to
    ``subject_item_embedding_recall_dict`` and passed to ``metrics_recall``.
    """
    # Initialise the data used by the recall.
    init_item_embedding_recall()

    # Recall for all users is only computed and evaluated in sampling mode;
    # doing so on the full data set would be very time consuming.
    if samples_mode and need_metric_recall:
        # user_id -> recalled items
        user_recall_items_dict = collections.defaultdict(dict)

        logger.info('生成item embedding所有用户的召回列表')
        for user_id in tqdm(train_hist_select_df['user_id'].unique()):
            user_recall_items_dict[user_id] = item_based_recommend(
                user_id, user_item_time_dict, i2i_sim, sim_item_topk, recall_item_num,
                item_topk_select, item_info_dict, emb_i2i_sim)

        logger.info('保存item embedding召回结果')
        with open(subject_item_embedding_recall_dict, 'wb') as out_file:
            pickle.dump(user_recall_items_dict, out_file)

        logger.info('item embedding召回效果评估')
        metrics_recall(user_recall_items_dict, train_last_select_df, topk=recall_item_num)


if __name__ == '__main__':
    item_embedding_recall_train()
    recall_results = item_embedding_recall(user_id=test_user_id, topk=100)
    print(recall_results)