|
|
import os
|
|
|
import sys
|
|
|
sys.path.append(os.getcwd())
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
from tqdm import tqdm
|
|
|
import faiss
|
|
|
import warnings
|
|
|
import pickle
|
|
|
import collections
|
|
|
from datetime import datetime
|
|
|
from utils import get_file_size
|
|
|
from config import logger
|
|
|
from config import shixuns_embed_path, shixun_save_path
|
|
|
from config import need_metric_recall
|
|
|
from config import shixun_dssm_usercf_recall_dict
|
|
|
from config import shixun_dssm_user_faiss_model_path
|
|
|
from config import shixun_dssm_user_embedding_data
|
|
|
from config import shixun_dssm_user_embedding_index_dict
|
|
|
from config import samples_mode
|
|
|
from config import test_user_id
|
|
|
from matching.shixun.recall_comm import get_all_select_df
|
|
|
from matching.shixun.recall_comm import get_user_info_df, get_item_info_df
|
|
|
from matching.shixun.recall_comm import get_user_item_time_dict
|
|
|
from matching.shixun.recall_comm import get_hist_and_last_select
|
|
|
from matching.shixun.recall_comm import get_item_info_dict
|
|
|
from matching.shixun.recall_comm import get_item_info_dict
|
|
|
from matching.shixun.recall_comm import get_item_topk_select
|
|
|
from matching.shixun.recall_comm import metrics_recall
|
|
|
from matching.shixun.item_embedding_recall import embdding_i2i_sim
|
|
|
|
|
|
|
|
|
tqdm.pandas()
|
|
|
warnings.filterwarnings('ignore')
|
|
|
|
|
|
def dssm_u2u_embedding_sim(user_embedding_index_dict,
|
|
|
user_emb,
|
|
|
user_index,
|
|
|
topk):
|
|
|
"""
|
|
|
获取dssm user embedding用户相似性矩阵
|
|
|
topk指的是每个user, faiss返回最相似的topk个user
|
|
|
"""
|
|
|
# 加载之前保存的dssm user embedding相似性矩阵
|
|
|
if os.path.exists(shixun_save_path + 'dssm_u2u_sim.pkl') and \
|
|
|
(get_file_size(shixun_save_path + 'dssm_u2u_sim.pkl') > 1):
|
|
|
user_sim_dict = pickle.load(open(shixun_save_path + 'dssm_u2u_sim.pkl', 'rb'))
|
|
|
return user_sim_dict
|
|
|
|
|
|
# 相似度查询,给每个索引位置上的向量返回topk个item以及相似度
|
|
|
sim, idx = user_index.search(user_emb, topk)
|
|
|
|
|
|
# 将向量检索的结果保存成原始id的对应关系
|
|
|
user_sim_dict = collections.defaultdict(dict)
|
|
|
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb)), sim, idx)):
|
|
|
target_raw_id = user_embedding_index_dict[target_idx]
|
|
|
|
|
|
# 从1开始是为了去掉物品本身, 所以最终获得的相似物品只有topk-1
|
|
|
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
|
|
|
rele_raw_id = user_embedding_index_dict[rele_idx]
|
|
|
user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
|
|
|
|
|
|
# 保存dssm_u2u_sim相似度矩阵
|
|
|
pickle.dump(user_sim_dict, open(shixun_save_path + 'dssm_u2u_sim.pkl', 'wb'))
|
|
|
|
|
|
return user_sim_dict
|
|
|
|
|
|
|
|
|
def init_dssm_usercf_recall():
|
|
|
"""
|
|
|
初始化召回用到的一些数据
|
|
|
"""
|
|
|
global train_hist_select_df
|
|
|
global user_item_time_dict
|
|
|
global u2u_sim, sim_user_topk
|
|
|
global recall_item_num
|
|
|
global item_topk_select
|
|
|
global item_created_time_dict
|
|
|
global emb_i2i_sim
|
|
|
global train_last_select_df
|
|
|
global dssm_user_embedding_index_dict
|
|
|
global dssm_user_emb
|
|
|
global dssm_user_index
|
|
|
|
|
|
logger.info("加载物品行为数据")
|
|
|
all_select_df = get_all_select_df(offline=False)
|
|
|
|
|
|
logger.info("获取用户信息数据")
|
|
|
users_info = get_user_info_df()
|
|
|
|
|
|
all_select_df = all_select_df.merge(users_info, on='user_id')
|
|
|
|
|
|
sim_user_topk = 120
|
|
|
recall_item_num = 100
|
|
|
|
|
|
logger.info('获取物品基本信息')
|
|
|
item_info_df = get_item_info_df()
|
|
|
|
|
|
logger.info('获取物品信息字典')
|
|
|
item_visists_dict, item_trainee_dict, item_created_time_dict, \
|
|
|
item_averge_star_dict, item_myshixuns_count_dict, item_challenges_count_dict = \
|
|
|
get_item_info_dict(item_info_df)
|
|
|
|
|
|
logger.info('获取物品embedding相似度矩阵')
|
|
|
item_emb_df = pd.read_csv(shixuns_embed_path, sep='\t', encoding='utf-8')
|
|
|
emb_i2i_sim = embdding_i2i_sim(item_emb_df, topk=recall_item_num)
|
|
|
|
|
|
# 为了召回评估,提取最后一次选择作为召回评估
|
|
|
# 如果不需要做召回评估直接使用全量的训练集进行召
|
|
|
if need_metric_recall:
|
|
|
logger.info('获取物品行为数据历史和最后一次选择')
|
|
|
train_hist_select_df, train_last_select_df = get_hist_and_last_select(all_select_df)
|
|
|
else:
|
|
|
train_hist_select_df = all_select_df
|
|
|
|
|
|
train_hist_select_df['user_id'].dropna(inplace=True)
|
|
|
|
|
|
# 使用dssm user embedding, 使用faiss计算用户相似度
|
|
|
# dssm中使用的是用户行为序列训练的user embedding
|
|
|
# 如果用户行为序列普遍比较短的话,user embedding的效果可能不是很好
|
|
|
|
|
|
logger.info('获取dssm user embedding相似度矩阵')
|
|
|
dssm_user_embedding_index_dict = pickle.load(open(shixun_dssm_user_embedding_index_dict, 'rb'))
|
|
|
dssm_user_emb = pickle.load(open(shixun_dssm_user_embedding_data, 'rb'))
|
|
|
dssm_user_index = faiss.read_index(shixun_dssm_user_faiss_model_path)
|
|
|
|
|
|
u2u_sim = dssm_u2u_embedding_sim(dssm_user_embedding_index_dict,
|
|
|
dssm_user_emb,
|
|
|
dssm_user_index,
|
|
|
topk=recall_item_num)
|
|
|
|
|
|
logger.info('获取用户选择物品列表')
|
|
|
user_item_time_dict = get_user_item_time_dict(train_hist_select_df)
|
|
|
|
|
|
logger.info('获取选择次数最多的物品')
|
|
|
item_topk_select = get_item_topk_select(train_hist_select_df, k=recall_item_num)
|
|
|
|
|
|
def user_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
u2u_sim,
|
|
|
sim_user_topk,
|
|
|
recall_item_num,
|
|
|
item_topk_select,
|
|
|
item_created_time_dict,
|
|
|
emb_i2i_sim):
|
|
|
"""
|
|
|
基于用户协同过滤+关联规则的召回
|
|
|
:param user_id: 用户id
|
|
|
:param user_item_time_dict: 字典, 根据选择时间获取用户的选择物品序列 {user1: [(item1, time1), (item2, time2)..]...}
|
|
|
:param u2u_sim: 字典,用户相似性矩阵
|
|
|
:param sim_user_topk: 整数,选择与当前用户最相似的k个用户
|
|
|
:param recall_item_num: 整数,需要召回的物品数量
|
|
|
:param item_topk_select: 列表,选择次数最多的物品列表,用于召回补全
|
|
|
:param item_created_time_dict: 字典,物品创建时间列表
|
|
|
:param emb_i2i_sim: 字典,物品embedding相似度矩阵
|
|
|
|
|
|
:return: 召回的物品列表 [(item1, score1), (item2, score2)...]
|
|
|
"""
|
|
|
# 获取用户选择的物品
|
|
|
user_item_time_list = user_item_time_dict[user_id]
|
|
|
|
|
|
# 存在一个用户多次选择某个物品,去重
|
|
|
user_hist_items = set([i for i, t in user_item_time_list])
|
|
|
|
|
|
items_rank = {}
|
|
|
# 根据用户相似度矩阵取sim_user_topk个用户选择的物品
|
|
|
for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]:
|
|
|
for i, select_time in user_item_time_dict[sim_u]:
|
|
|
if (i in user_hist_items) or (i not in item_created_time_dict):
|
|
|
continue
|
|
|
items_rank.setdefault(i, 0)
|
|
|
|
|
|
content_weight = 1.0
|
|
|
loc_weight = 0.9
|
|
|
created_time_weight = 0.8
|
|
|
|
|
|
# 当前物品与该用户选择的历史物品进行一个权重选择
|
|
|
for loc, (j, select_time) in enumerate(user_item_time_list):
|
|
|
if j not in item_created_time_dict:
|
|
|
continue
|
|
|
|
|
|
# 选择时的相对位置权重
|
|
|
loc_weight += 0.9 ** (len(user_item_time_list) - loc)
|
|
|
|
|
|
# 物品embedding相似性权重
|
|
|
if emb_i2i_sim.get(i, {}).get(j, None) is not None:
|
|
|
content_weight += emb_i2i_sim[i][j]
|
|
|
|
|
|
if emb_i2i_sim.get(j, {}).get(i, None) is not None:
|
|
|
content_weight += emb_i2i_sim[j][i]
|
|
|
|
|
|
# 创建时间差权重
|
|
|
created_time_weight += np.exp(0.8 * np.abs(item_created_time_dict[i] - item_created_time_dict[j]))
|
|
|
|
|
|
items_rank[i] += loc_weight * content_weight * created_time_weight * wuv
|
|
|
|
|
|
# 热度补全
|
|
|
if len(items_rank) < recall_item_num:
|
|
|
for i, item in enumerate(item_topk_select):
|
|
|
|
|
|
# 填充的item应该不在原来的列表中
|
|
|
if item in items_rank.items():
|
|
|
continue
|
|
|
items_rank[item] = - i - 100 # 随便给个复数就行
|
|
|
|
|
|
# 达到召回的数量
|
|
|
if len(items_rank) == recall_item_num:
|
|
|
break
|
|
|
|
|
|
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
|
|
|
|
|
|
return items_rank
|
|
|
|
|
|
|
|
|
def dssm_usercf_recall(user_id, topk):
|
|
|
"""
|
|
|
dssm usercf召回调用接口
|
|
|
"""
|
|
|
start_time = datetime.now()
|
|
|
|
|
|
logger.info(f"本次需要进行dssm usercf召回的用户ID: {user_id}")
|
|
|
|
|
|
recall_results = {}
|
|
|
recall_results.clear()
|
|
|
|
|
|
if user_id not in user_item_time_dict:
|
|
|
return recall_results
|
|
|
|
|
|
recall_results = user_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
u2u_sim,
|
|
|
topk + (topk // 2),
|
|
|
topk,
|
|
|
item_topk_select,
|
|
|
item_created_time_dict,
|
|
|
emb_i2i_sim)
|
|
|
|
|
|
# 计算耗时毫秒
|
|
|
end_time = datetime.utcnow()
|
|
|
cost_time_millisecond = round(float((end_time - start_time).microseconds / 1000.0), 3)
|
|
|
|
|
|
logger.info(f"本次召回耗时: {cost_time_millisecond} 毫秒")
|
|
|
|
|
|
return recall_results
|
|
|
|
|
|
|
|
|
def dssm_usercf_recall_train():
|
|
|
"""
|
|
|
dssm usercf召回训练和评估
|
|
|
"""
|
|
|
|
|
|
# 调用初始化召回用到的一些数据
|
|
|
init_dssm_usercf_recall()
|
|
|
|
|
|
# 只在采样模式下计算所有用户的召回数据并进行召回效果评估
|
|
|
# 如果用全量数据计算所有用户的召回数据会非常耗时
|
|
|
|
|
|
if samples_mode == True and need_metric_recall:
|
|
|
|
|
|
logger.info('生成dssm usercf所有用户的召回列表')
|
|
|
user_recall_items_dict = collections.defaultdict(dict)
|
|
|
|
|
|
for user_id in tqdm(train_hist_select_df['user_id'].unique()):
|
|
|
user_recall_items_dict[user_id] = user_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
u2u_sim,
|
|
|
sim_user_topk,
|
|
|
recall_item_num,
|
|
|
item_topk_select,
|
|
|
item_created_time_dict,
|
|
|
emb_i2i_sim)
|
|
|
|
|
|
logger.info('保存dssm usercf召回结果')
|
|
|
pickle.dump(user_recall_items_dict, open(shixun_dssm_usercf_recall_dict, 'wb'))
|
|
|
|
|
|
logger.info('dssm usercf召回效果评估')
|
|
|
metrics_recall(user_recall_items_dict, train_last_select_df, topk=recall_item_num)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
dssm_usercf_recall_train()
|
|
|
|
|
|
recall_results = dssm_usercf_recall(user_id=test_user_id, topk=20)
|
|
|
# print(recall_results) |