|
|
import os
|
|
|
import sys
|
|
|
sys.path.append(os.getcwd())
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
from tqdm import tqdm
|
|
|
import faiss
|
|
|
import warnings
|
|
|
import pickle
|
|
|
import collections
|
|
|
from datetime import datetime
|
|
|
from config import logger
|
|
|
from config import need_metric_recall
|
|
|
from config import subject_emb_i2i_sim_data
|
|
|
from config import subject_item_embedding_recall_dict
|
|
|
from config import subjects_embed_path
|
|
|
from config import samples_mode
|
|
|
from config import test_user_id
|
|
|
from matching.subject.recall_comm import get_all_select_df
|
|
|
from matching.subject.recall_comm import get_item_info_df
|
|
|
from matching.subject.recall_comm import get_user_item_time_dict
|
|
|
from matching.subject.recall_comm import get_hist_and_last_select, get_recall_item_info_dict
|
|
|
from matching.subject.recall_comm import get_item_topk_select
|
|
|
from matching.subject.recall_comm import metrics_recall
|
|
|
from utils import get_file_size
|
|
|
|
|
|
tqdm.pandas()
|
|
|
warnings.filterwarnings('ignore')
|
|
|
|
|
|
def embdding_i2i_sim(item_emb_df, topk):
|
|
|
"""
|
|
|
基于物品embedding计算相似性矩阵,返回topk个与其最相似的物品
|
|
|
"""
|
|
|
# 加载之前保存的
|
|
|
if os.path.exists(subject_emb_i2i_sim_data) and (get_file_size(subject_emb_i2i_sim_data) > 1):
|
|
|
item_sim_dict = pickle.load(open(subject_emb_i2i_sim_data, 'rb'))
|
|
|
return item_sim_dict
|
|
|
|
|
|
# 物品索引与物品id的字典映射
|
|
|
item_idx_2_rawid_dict = dict(zip(item_emb_df.index, item_emb_df['subject_id']))
|
|
|
|
|
|
item_emb_cols = [x for x in item_emb_df.columns if 'subject_id' not in x]
|
|
|
item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols].values, dtype=np.float32)
|
|
|
|
|
|
# 向量进行单位化
|
|
|
item_emb_np = item_emb_np / np.linalg.norm(item_emb_np, axis=1, keepdims=True)
|
|
|
|
|
|
# 建立faiss索引
|
|
|
item_index = faiss.IndexFlatIP(item_emb_np.shape[1])
|
|
|
item_index.add(item_emb_np)
|
|
|
|
|
|
# 相似度查询,给每个索引位置上的向量返回topk个item以及相似度
|
|
|
sim, idx = item_index.search(item_emb_np, topk) # 返回的是列表
|
|
|
|
|
|
# 将向量检索的结果保存成原始id的对应关系
|
|
|
item_sim_dict = collections.defaultdict(dict)
|
|
|
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(item_emb_np)), sim, idx)):
|
|
|
target_raw_id = item_idx_2_rawid_dict[target_idx]
|
|
|
|
|
|
# 从1开始是为了去掉物品本身, 所以最终获得的相似物品只有topk-1
|
|
|
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
|
|
|
rele_raw_id = item_idx_2_rawid_dict[rele_idx]
|
|
|
item_sim_dict[target_raw_id][rele_raw_id] = item_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
|
|
|
|
|
|
# 保存embedding i2i相似度矩阵
|
|
|
pickle.dump(item_sim_dict, open(subject_emb_i2i_sim_data, 'wb'))
|
|
|
|
|
|
return item_sim_dict
|
|
|
|
|
|
# 基于物品的召回i2i
|
|
|
def item_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
i2i_sim,
|
|
|
sim_item_topk,
|
|
|
recall_item_num,
|
|
|
item_topk_select,
|
|
|
item_info_dict,
|
|
|
emb_i2i_sim):
|
|
|
"""
|
|
|
基于物品协同过滤的召回
|
|
|
:param user_id: 用户id
|
|
|
:param user_item_time_dict: 字典, 根据选择时间获取用户的选择物品序列 {user1: [(item1, time1), (item2, time2)..]...}
|
|
|
:param i2i_sim: 字典,物品相似性矩阵
|
|
|
:param sim_item_topk: 整数,选择与当前物品最相似的k个物品
|
|
|
:param recall_item_num: 整数,最后召回的物品数量
|
|
|
:param item_topk_select: 列表,选择次数最多的物品列表,用于召回补全
|
|
|
:param item_info_dict: 字典,物品信息
|
|
|
:param emb_i2i_sim: 物品embedding相似度矩阵
|
|
|
|
|
|
:return: 召回的物品列表 [(item1, score1), (item2, score2)...]
|
|
|
"""
|
|
|
# 获取用户历史交互的物品
|
|
|
user_hist_items = user_item_time_dict[user_id]
|
|
|
user_hist_items_ = {user_id for user_id, _ in user_hist_items}
|
|
|
|
|
|
filtered_item_name_list = []
|
|
|
filtered_item_name_list.clear()
|
|
|
|
|
|
item_rank = {}
|
|
|
for loc, (hist_item, select_time) in enumerate(user_hist_items):
|
|
|
# 过滤物品信息字典中没有的物品
|
|
|
if hist_item not in item_info_dict:
|
|
|
continue
|
|
|
|
|
|
hist_item_created_time = item_info_dict[hist_item]['created_at_ts']
|
|
|
|
|
|
for sim_item, wij in sorted(i2i_sim[hist_item].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
|
|
|
# 过滤历史选择的物品
|
|
|
if sim_item in user_hist_items_:
|
|
|
continue
|
|
|
|
|
|
# 过滤物品信息字典中没有的物品
|
|
|
if sim_item not in item_info_dict:
|
|
|
continue
|
|
|
|
|
|
sim_item_created_time = item_info_dict[sim_item]['created_at_ts']
|
|
|
sim_item_name = item_info_dict[sim_item]['subject_name']
|
|
|
|
|
|
# 过滤物品名称重复的
|
|
|
if sim_item_name in filtered_item_name_list:
|
|
|
continue
|
|
|
|
|
|
filtered_item_name_list.append(sim_item_name)
|
|
|
|
|
|
# 物品创建时间差权重
|
|
|
created_time_weight = np.exp(0.8 ** np.abs(hist_item_created_time - sim_item_created_time))
|
|
|
|
|
|
# 相似物品和历史选择物品序列中历史物品所在的位置权重
|
|
|
loc_weight = (0.9 ** (len(user_hist_items) - loc))
|
|
|
|
|
|
content_weight = 1.0
|
|
|
|
|
|
if emb_i2i_sim.get(hist_item, {}).get(sim_item, None) is not None:
|
|
|
content_weight += emb_i2i_sim[hist_item][sim_item]
|
|
|
|
|
|
if emb_i2i_sim.get(sim_item, {}).get(hist_item, None) is not None:
|
|
|
content_weight += emb_i2i_sim[sim_item][hist_item]
|
|
|
|
|
|
item_rank.setdefault(sim_item, 0)
|
|
|
item_rank[sim_item] += created_time_weight * loc_weight * content_weight * wij
|
|
|
|
|
|
# 不足的,用热门物品补全
|
|
|
if len(item_rank) < recall_item_num:
|
|
|
for index, item in enumerate(item_topk_select):
|
|
|
|
|
|
# 填充的item应该不在原来的列表中
|
|
|
if item in item_rank.items():
|
|
|
continue
|
|
|
|
|
|
# 随便给个负数就行
|
|
|
item_rank[item] = - index - 100
|
|
|
if len(item_rank) == recall_item_num:
|
|
|
break
|
|
|
|
|
|
item_rank = sorted(item_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
|
|
|
return item_rank
|
|
|
|
|
|
|
|
|
def init_item_embedding_recall():
|
|
|
"""
|
|
|
初始化召回用到的一些数据
|
|
|
"""
|
|
|
global train_hist_select_df
|
|
|
global user_item_time_dict
|
|
|
global i2i_sim, sim_item_topk
|
|
|
global recall_item_num
|
|
|
global item_topk_select
|
|
|
global item_info_dict
|
|
|
global emb_i2i_sim
|
|
|
global train_last_select_df
|
|
|
|
|
|
logger.info("加载物品行为数据")
|
|
|
all_select_df = get_all_select_df(offline=False)
|
|
|
|
|
|
sim_item_topk = 120
|
|
|
recall_item_num = 100
|
|
|
|
|
|
logger.info('获取物品基本信息')
|
|
|
item_info_df = get_item_info_df()
|
|
|
|
|
|
logger.info('生成物品信息字典')
|
|
|
item_info_dict = get_recall_item_info_dict(item_info_df)
|
|
|
|
|
|
logger.info('生成物品embedding相似度矩阵')
|
|
|
item_emb_df = pd.read_csv(subjects_embed_path, sep='\t', encoding='utf-8')
|
|
|
emb_i2i_sim = embdding_i2i_sim(item_emb_df, topk=recall_item_num)
|
|
|
|
|
|
# 为了召回评估,提取最后一次选择作为召回评估
|
|
|
# 如果不需要做召回评估直接使用全量的训练集进行召回
|
|
|
if samples_mode and need_metric_recall:
|
|
|
logger.info('获取物品行为数据历史和最后一次选择')
|
|
|
train_hist_select_df, train_last_select_df = get_hist_and_last_select(all_select_df)
|
|
|
else:
|
|
|
train_hist_select_df = all_select_df
|
|
|
|
|
|
logger.info('获取用户选择物品列表')
|
|
|
user_item_time_dict = get_user_item_time_dict(train_hist_select_df)
|
|
|
|
|
|
# 加载物品embedding相似度矩阵
|
|
|
i2i_sim = pickle.load(open(subject_emb_i2i_sim_data, 'rb'))
|
|
|
|
|
|
logger.info('获取选择次数最多的物品')
|
|
|
item_topk_select = get_item_topk_select(train_hist_select_df, k=recall_item_num)
|
|
|
|
|
|
|
|
|
def item_embedding_recall(user_id, topk):
|
|
|
"""
|
|
|
item embedding召回调用接口
|
|
|
"""
|
|
|
start_time = datetime.now()
|
|
|
|
|
|
logger.info(f"本次需要item embedding召回的用户ID: {user_id}")
|
|
|
|
|
|
recall_results = {}
|
|
|
recall_results.clear()
|
|
|
|
|
|
if user_id not in user_item_time_dict:
|
|
|
return recall_results
|
|
|
|
|
|
recall_results = item_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
i2i_sim,
|
|
|
topk + (topk // 3),
|
|
|
topk,
|
|
|
item_topk_select,
|
|
|
item_info_dict,
|
|
|
emb_i2i_sim)
|
|
|
|
|
|
# 计算耗时毫秒
|
|
|
end_time = datetime.utcnow()
|
|
|
cost_time_millisecond = round(float((end_time - start_time).microseconds / 1000.0), 3)
|
|
|
|
|
|
logger.info(f"本次召回耗时: {cost_time_millisecond} 毫秒")
|
|
|
|
|
|
return recall_results
|
|
|
|
|
|
|
|
|
def item_embedding_recall_train():
|
|
|
# 调用初始化召回用到的一些数据
|
|
|
init_item_embedding_recall()
|
|
|
|
|
|
# 只在采样模式下计算所有用户的召回数据并进行召回效果评估
|
|
|
# 如果用全量数据计算所有用户的召回数据会非常耗时
|
|
|
|
|
|
if samples_mode == True and need_metric_recall:
|
|
|
|
|
|
# 定义召回物品的字典
|
|
|
user_recall_items_dict = collections.defaultdict(dict)
|
|
|
|
|
|
logger.info('生成item embedding所有用户的召回列表')
|
|
|
for user_id in tqdm(train_hist_select_df['user_id'].unique()):
|
|
|
user_recall_items_dict[user_id] = item_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
i2i_sim,
|
|
|
sim_item_topk,
|
|
|
recall_item_num,
|
|
|
item_topk_select,
|
|
|
item_info_dict,
|
|
|
emb_i2i_sim)
|
|
|
|
|
|
logger.info('保存item embedding召回结果')
|
|
|
pickle.dump(user_recall_items_dict, open(subject_item_embedding_recall_dict, 'wb'))
|
|
|
|
|
|
logger.info('item embedding召回效果评估')
|
|
|
metrics_recall(user_recall_items_dict, train_last_select_df, topk=recall_item_num)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
item_embedding_recall_train()
|
|
|
recall_results = item_embedding_recall(user_id=test_user_id, topk=100)
|
|
|
print(recall_results) |