You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

298 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
sys.path.append(os.getcwd())
import pandas as pd
import numpy as np
from tqdm import tqdm
import faiss
import warnings
import pickle
import collections
from datetime import datetime
from utils import get_file_size
from config import logger
from config import subjects_embed_path, subject_save_path
from config import need_metric_recall
from config import subject_dssm_usercf_recall_dict
from config import subject_dssm_user_faiss_model_path
from config import subject_dssm_user_embedding_data
from config import subject_dssm_user_embedding_index_dict
from config import samples_mode
from config import test_user_id
from matching.subject.recall_comm import get_all_select_df
from matching.subject.recall_comm import get_user_info_df, get_item_info_df
from matching.subject.recall_comm import get_user_item_time_dict
from matching.subject.recall_comm import get_all_hist_and_last_select
from matching.subject.recall_comm import get_recall_item_info_dict
from matching.subject.recall_comm import get_item_topk_select
from matching.subject.recall_comm import metrics_recall
from matching.subject.item_embedding_recall import embdding_i2i_sim
tqdm.pandas()
warnings.filterwarnings('ignore')
def dssm_u2u_embedding_sim(user_embedding_index_dict,
user_emb,
user_index,
topk):
"""
获取dssm user embedding用户相似性矩阵
topk指的是每个user, faiss返回最相似的topk个user
"""
# 加载之前保存的dssm user embedding相似性矩阵
if os.path.exists(subject_save_path + 'dssm_u2u_sim.pkl') and \
(get_file_size(subject_save_path + 'dssm_u2u_sim.pkl') > 1):
user_sim_dict = pickle.load(open(subject_save_path + 'dssm_u2u_sim.pkl', 'rb'))
return user_sim_dict
# 相似度查询给每个索引位置上的向量返回topk个item以及相似度
sim, idx = user_index.search(user_emb, topk)
# 将向量检索的结果保存成原始id的对应关系
user_sim_dict = collections.defaultdict(dict)
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(range(len(user_emb)), sim, idx)):
target_raw_id = user_embedding_index_dict[target_idx]
# 从1开始是为了去掉物品本身, 所以最终获得的相似物品只有topk-1
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
rele_raw_id = user_embedding_index_dict[rele_idx]
user_sim_dict[target_raw_id][rele_raw_id] = user_sim_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
# 保存dssm_u2u_sim相似度矩阵
pickle.dump(user_sim_dict, open(subject_save_path + 'dssm_u2u_sim.pkl', 'wb'))
return user_sim_dict
def init_dssm_usercf_recall():
"""
初始化召回用到的一些数据
"""
global train_hist_select_df
global user_item_time_dict
global u2u_sim, sim_user_topk
global recall_item_num
global item_topk_select
global item_info_dict
global emb_i2i_sim
global train_last_select_df
global dssm_user_embedding_index_dict
global dssm_user_emb
global dssm_user_index
logger.info("加载物品行为数据")
all_select_df = get_all_select_df(offline=False)
logger.info("获取用户信息数据")
users_info = get_user_info_df()
all_select_df = all_select_df.merge(users_info, how='left', on='user_id')
sim_user_topk = 120
recall_item_num = 100
logger.info('获取物品基本信息')
item_info_df = get_item_info_df()
logger.info('获取物品信息字典')
item_info_dict = get_recall_item_info_dict(item_info_df)
logger.info('获取物品embedding相似度矩阵')
item_emb_df = pd.read_csv(subjects_embed_path, sep='\t', encoding='utf-8')
emb_i2i_sim = embdding_i2i_sim(item_emb_df, topk=recall_item_num)
# 为了召回评估,提取最后一次选择作为召回评估
# 如果不需要做召回评估直接使用全量的训练集进行召
if need_metric_recall:
logger.info('获取物品行为数据历史和最后一次选择')
train_hist_select_df, train_last_select_df = get_all_hist_and_last_select(all_select_df)
else:
train_hist_select_df = all_select_df
train_hist_select_df['user_id'].dropna(inplace=True)
# 使用dssm user embedding, 使用faiss计算用户相似度
# dssm中使用的是用户行为序列训练的user embedding
# 如果用户行为序列普遍比较短的话user embedding的效果可能不是很好
logger.info('获取dssm user embedding相似度矩阵')
dssm_user_embedding_index_dict = pickle.load(open(subject_dssm_user_embedding_index_dict, 'rb'))
dssm_user_emb = pickle.load(open(subject_dssm_user_embedding_data, 'rb'))
dssm_user_index = faiss.read_index(subject_dssm_user_faiss_model_path)
u2u_sim = dssm_u2u_embedding_sim(dssm_user_embedding_index_dict,
dssm_user_emb,
dssm_user_index,
topk=recall_item_num)
logger.info('获取用户选择物品列表')
user_item_time_dict = get_user_item_time_dict(train_hist_select_df)
logger.info('获取选择次数最多的物品')
item_topk_select = get_item_topk_select(train_hist_select_df, k=recall_item_num)
def dssm_usercf_recall(user_id, topk):
"""
dssm usercf召回调用接口
"""
start_time = datetime.now()
logger.info(f"本次需要进行dssm usercf召回的用户ID: {user_id}")
recall_results = {}
recall_results.clear()
if user_id not in user_item_time_dict:
return recall_results
recall_results = user_based_recommend(user_id,
user_item_time_dict,
u2u_sim,
topk,
topk,
item_topk_select,
item_info_dict,
emb_i2i_sim)
# 计算耗时毫秒
end_time = datetime.utcnow()
cost_time_millisecond = round(float((end_time - start_time).microseconds / 1000.0), 3)
logger.info(f"本次召回耗时: {cost_time_millisecond} 毫秒")
return recall_results
def user_based_recommend(user_id,
user_item_time_dict,
u2u_sim,
sim_user_topk,
recall_item_num,
item_topk_select,
item_info_dict,
emb_i2i_sim):
"""
基于用户协同过滤+关联规则的召回
:param user_id: 用户id
:param user_item_time_dict: 字典, 根据选择时间获取用户的选择物品序列 {user1: [(item1, time1), (item2, time2)..]...}
:param u2u_sim: 字典,用户相似性矩阵
:param sim_user_topk: 整数选择与当前用户最相似的k个用户
:param recall_item_num: 整数,需要召回的物品数量
:param item_topk_select: 列表,选择次数最多的物品列表,用于召回补全
:param item_info_dict: 字典,物品信息字典
:param emb_i2i_sim: 字典物品embedding相似度矩阵
:return: 召回的物品列表 [(item1, score1), (item2, score2)...]
"""
# 获取用户选择的物品
user_item_time_list = user_item_time_dict[user_id]
# 存在一个用户多次选择某个物品,去重
user_hist_items = set([i for i, t in user_item_time_list])
filtered_item_name_list = []
filtered_item_name_list.clear()
items_rank = {}
# 根据用户相似度矩阵取sim_user_topk个用户选择的物品
for sim_u, wuv in sorted(u2u_sim[user_id].items(), key=lambda x: x[1], reverse=True)[:sim_user_topk]:
for cur_item, select_time in user_item_time_dict[sim_u]:
# 过滤历史选择和不在物品信息字典中的物品
if (cur_item in user_hist_items) or (cur_item not in item_info_dict):
continue
cur_item_name = item_info_dict[cur_item]['subject_name']
# 过滤物品名称重复的
if cur_item_name in filtered_item_name_list:
continue
filtered_item_name_list.append(cur_item_name)
items_rank.setdefault(cur_item, 0)
content_weight = 1.0
loc_weight = 0.9
created_time_weight = 0.8
# 当前物品与该用户选择的历史物品进行一个权重选择
for loc, (hist_item, select_time) in enumerate(user_item_time_list):
if hist_item not in item_info_dict:
continue
# 选择时的相对位置权重
loc_weight += 0.9 ** (len(user_item_time_list) - loc)
# 物品embedding相似性权重
if emb_i2i_sim.get(cur_item, {}).get(hist_item, None) is not None:
content_weight += emb_i2i_sim[cur_item][hist_item]
if emb_i2i_sim.get(hist_item, {}).get(cur_item, None) is not None:
content_weight += emb_i2i_sim[hist_item][cur_item]
# 创建时间差权重
cur_item_created_time = item_info_dict[cur_item]['created_at_ts']
hist_item_created_time = item_info_dict[hist_item]['created_at_ts']
created_time_weight += np.exp(0.8 * np.abs(cur_item_created_time - hist_item_created_time))
items_rank[cur_item] += loc_weight * content_weight * created_time_weight * wuv
# 提前结束计算,没必要计算所有用户,否则在线召回将耗时很长
# 只计算最相似用户选择的物品。等于是捕捉最相似用户最近的兴趣
if len(items_rank) >= (recall_item_num * 2):
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return items_rank
# 热度补全
if len(items_rank) < recall_item_num:
for index, item in enumerate(item_topk_select):
# 填充的item应该不在原来的列表中
if item in items_rank.items():
continue
items_rank[item] = - index - 100 # 随便给个复数就行
# 达到召回的数量
if len(items_rank) == recall_item_num:
break
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return items_rank
def dssm_usercf_recall_train():
"""
dssm usercf召回训练和评估
"""
# 调用初始化召回用到的一些数据
init_dssm_usercf_recall()
# 只在采样模式下计算所有用户的召回数据并进行召回效果评估
# 如果用全量数据计算所有用户的召回数据会非常耗时
if samples_mode == True and need_metric_recall:
logger.info('生成dssm usercf所有用户的召回列表')
user_recall_items_dict = collections.defaultdict(dict)
for user_id in tqdm(train_hist_select_df['user_id'].unique()):
user_recall_items_dict[user_id] = user_based_recommend(user_id,
user_item_time_dict,
u2u_sim,
sim_user_topk,
recall_item_num,
item_topk_select,
item_info_dict,
emb_i2i_sim)
logger.info('保存dssm usercf召回结果')
pickle.dump(user_recall_items_dict, open(subject_dssm_usercf_recall_dict, 'wb'))
logger.info('dssm usercf召回效果评估')
metrics_recall(user_recall_items_dict, train_last_select_df, topk=recall_item_num)
if __name__ == '__main__':
dssm_usercf_recall_train()
recall_results = dssm_usercf_recall(user_id=test_user_id, topk=20)
print(recall_results)