|
|
import os
|
|
|
import sys
|
|
|
sys.path.append(os.getcwd())
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
from collections import defaultdict
|
|
|
from tqdm import tqdm
|
|
|
import math
|
|
|
import warnings
|
|
|
import pickle
|
|
|
import collections
|
|
|
from datetime import datetime
|
|
|
from utils import get_file_size
|
|
|
from config import logger
|
|
|
from config import subject_itemcf_i2i_sim_data, need_metric_recall
|
|
|
from config import subject_itemcf_recall_dict
|
|
|
from config import subjects_embed_path, offline_mode
|
|
|
from config import samples_mode
|
|
|
from config import test_user_id
|
|
|
from matching.subject.recall_comm import get_all_select_df
|
|
|
from matching.subject.recall_comm import get_user_info_df, get_item_info_df
|
|
|
from matching.subject.recall_comm import get_user_item_time_dict
|
|
|
from matching.subject.recall_comm import get_hist_and_last_select, get_recall_item_info_dict
|
|
|
from matching.subject.recall_comm import get_item_topk_select, metrics_recall
|
|
|
from matching.subject.item_embedding_recall import embdding_i2i_sim
|
|
|
|
|
|
tqdm.pandas()
|
|
|
warnings.filterwarnings('ignore')
|
|
|
|
|
|
def itemcf_sim(df, item_info_dict):
|
|
|
"""
|
|
|
物品相似性矩阵计算,基于物品的协同过滤+关联规则
|
|
|
:param df: 物品行为数据
|
|
|
:param item_created_time_dict: 物品创建时间的字典
|
|
|
|
|
|
:return: 物品与物品的相似性矩阵
|
|
|
"""
|
|
|
if os.path.exists(subject_itemcf_i2i_sim_data) and (get_file_size(subject_itemcf_i2i_sim_data) > 1):
|
|
|
i2i_sim_ = pickle.load(open(subject_itemcf_i2i_sim_data, 'rb'))
|
|
|
return i2i_sim_
|
|
|
|
|
|
# 获取用户选择的item字典
|
|
|
user_item_time_dict = get_user_item_time_dict(df)
|
|
|
|
|
|
# 计算物品相似度
|
|
|
i2i_sim = {}
|
|
|
item_cnt = defaultdict(int)
|
|
|
|
|
|
for user_id, item_time_list in tqdm(user_item_time_dict.items()):
|
|
|
|
|
|
# 在基于物品的协同过滤优化的时候考虑时间因素
|
|
|
for loc1, (item_i, i_select_time) in enumerate(item_time_list):
|
|
|
item_cnt[item_i] += 1
|
|
|
i2i_sim.setdefault(item_i, {})
|
|
|
|
|
|
for loc2, (item_j, j_select_time) in enumerate(item_time_list):
|
|
|
if (item_i == item_j) or \
|
|
|
(item_i not in item_info_dict) or \
|
|
|
(item_j not in item_info_dict):
|
|
|
continue
|
|
|
|
|
|
# 考虑物品的正向顺序选择和反向顺序选择
|
|
|
loc_alpha = 1.0 if loc2 > loc1 else 0.7
|
|
|
|
|
|
# 位置信息权重,其中的参数可以调节
|
|
|
loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
|
|
|
|
|
|
# 选择时间权重,其中的参数可以调节
|
|
|
select_time_weight = np.exp(0.7 ** np.abs(i_select_time - j_select_time))
|
|
|
|
|
|
# 两个物品创建时间差的权重,其中的参数可以调节
|
|
|
item_i_created_time = item_info_dict[item_i]['created_at_ts']
|
|
|
item_j_created_time = item_info_dict[item_j]['created_at_ts']
|
|
|
created_time_weight = np.exp(0.8 ** np.abs(item_i_created_time - item_j_created_time))
|
|
|
|
|
|
i2i_sim[item_i].setdefault(item_j, 0)
|
|
|
|
|
|
# 考虑多种因素的权重计算最终的物品之间的相似度
|
|
|
i2i_sim[item_i][item_j] += loc_weight * select_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
|
|
|
|
|
|
i2i_sim_ = i2i_sim.copy()
|
|
|
|
|
|
# 余弦相似度分母部分计算
|
|
|
for item_i, related_items in i2i_sim.items():
|
|
|
for item_j, wij in related_items.items():
|
|
|
i2i_sim_[item_i][item_j] = wij / math.sqrt(item_cnt[item_i] * item_cnt[item_j])
|
|
|
|
|
|
# 将得到的相似性矩阵保存到本地
|
|
|
pickle.dump(i2i_sim_, open(subject_itemcf_i2i_sim_data, 'wb'))
|
|
|
|
|
|
return i2i_sim_
|
|
|
|
|
|
|
|
|
# 基于物品协同过滤进行召回
|
|
|
def item_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
i2i_sim,
|
|
|
sim_item_topk,
|
|
|
recall_item_num,
|
|
|
item_topk_select,
|
|
|
item_info_dict,
|
|
|
emb_i2i_sim):
|
|
|
"""
|
|
|
基于物品协同过滤+关联规则的召回
|
|
|
:param user_id: 用户id
|
|
|
:param user_item_time_dict: 字典, 根据选择时间获取用户的选择物品序列 {user1: [(item1, time1), (item2, time2)..]...}
|
|
|
:param i2i_sim: 字典,物品相似度矩阵
|
|
|
:param sim_item_topk: 整数,选择与当前物品最相似的k个物品
|
|
|
:param recall_item_num: 整数,需要召回的物品数量
|
|
|
:param item_topk_select: 列表,选择次数最多的物品列表,用于召回补全
|
|
|
:param item_info_dict: 字典,物品信息字典
|
|
|
:param emb_i2i_sim: 字典,物品embedding相似度矩阵
|
|
|
|
|
|
:return: 召回的物品列表 [(item1, score1), (item2, score2)...]
|
|
|
"""
|
|
|
# 获取用户历史选择的物品, 包括物品ID, 选择时间
|
|
|
user_hist_items = user_item_time_dict[user_id]
|
|
|
|
|
|
# 获取用户历史选择的物品, 只包括物品ID
|
|
|
user_hist_items_ = {item_id for item_id, _ in user_hist_items}
|
|
|
|
|
|
filtered_item_name_list = []
|
|
|
filtered_item_name_list.clear()
|
|
|
|
|
|
items_rank = {}
|
|
|
for loc, (hist_item, select_time) in enumerate(user_hist_items):
|
|
|
|
|
|
# 过滤物品信息字典中没有的物品
|
|
|
if hist_item not in item_info_dict:
|
|
|
continue
|
|
|
|
|
|
hist_item_created_time = item_info_dict[hist_item]['created_at_ts']
|
|
|
|
|
|
for sim_item, wij in sorted(i2i_sim[hist_item].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
|
|
|
|
|
|
# 过滤历史选择的物品
|
|
|
if sim_item in user_hist_items_:
|
|
|
continue
|
|
|
|
|
|
# 过滤物品信息字典中没有的物品
|
|
|
if sim_item not in item_info_dict:
|
|
|
continue
|
|
|
|
|
|
sim_item_created_time = item_info_dict[sim_item]['created_at_ts']
|
|
|
sim_item_name = item_info_dict[sim_item]['subject_name']
|
|
|
|
|
|
# 过滤物品名称重复的
|
|
|
if sim_item_name in filtered_item_name_list:
|
|
|
continue
|
|
|
|
|
|
filtered_item_name_list.append(sim_item_name)
|
|
|
|
|
|
# 物品创建时间差权重
|
|
|
created_time_weight = np.exp(0.8 ** np.abs(hist_item_created_time - sim_item_created_time))
|
|
|
|
|
|
# 相似物品和历史选择物品序列中历史物品所在的位置权重
|
|
|
loc_weight = (0.9 ** (len(user_hist_items) - loc))
|
|
|
|
|
|
content_weight = 1.0
|
|
|
if emb_i2i_sim.get(hist_item, {}).get(sim_item, None) is not None:
|
|
|
content_weight += emb_i2i_sim[hist_item][sim_item]
|
|
|
|
|
|
if emb_i2i_sim.get(sim_item, {}).get(hist_item, None) is not None:
|
|
|
content_weight += emb_i2i_sim[sim_item][hist_item]
|
|
|
|
|
|
items_rank.setdefault(sim_item, 0)
|
|
|
items_rank[sim_item] += created_time_weight * loc_weight * content_weight * wij
|
|
|
|
|
|
# 提前结束计算,没必要计算全部,否则在线召回将耗时很长
|
|
|
# 只计算最近选择的物品相似的物品。等于是捕捉用户最近的兴趣
|
|
|
if len(items_rank) >= (recall_item_num * 3):
|
|
|
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
|
|
|
return items_rank
|
|
|
|
|
|
# 不足的用热门物品补全
|
|
|
if len(items_rank) < recall_item_num:
|
|
|
for index, item in enumerate(item_topk_select):
|
|
|
|
|
|
# 填充的item应该不在原来的列表中
|
|
|
if item in items_rank.items():
|
|
|
continue
|
|
|
|
|
|
# 随便给个负数就行
|
|
|
items_rank[item] = - index - 100
|
|
|
|
|
|
# 达到召回的数量
|
|
|
if len(items_rank) == recall_item_num:
|
|
|
break
|
|
|
|
|
|
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
|
|
|
return items_rank
|
|
|
|
|
|
|
|
|
def init_itemcf_recall():
|
|
|
"""
|
|
|
初始化召回用到的一些数据
|
|
|
"""
|
|
|
global train_hist_select_df
|
|
|
global user_item_time_dict
|
|
|
global i2i_sim, sim_item_topk
|
|
|
global recall_item_num
|
|
|
global item_topk_select
|
|
|
global item_info_dict
|
|
|
global emb_i2i_sim
|
|
|
global train_last_select_df
|
|
|
|
|
|
logger.info("加载物品行为数据")
|
|
|
all_select_df = get_all_select_df(offline=False)
|
|
|
|
|
|
logger.info("加载物品信息数据")
|
|
|
item_info = get_item_info_df()
|
|
|
logger.info("获取用户信息数据")
|
|
|
users_info = get_user_info_df()
|
|
|
|
|
|
all_select_df = all_select_df.merge(users_info, how='left', on='user_id')
|
|
|
all_select_df = all_select_df.merge(item_info, on='subject_id')
|
|
|
|
|
|
sim_item_topk = 120
|
|
|
recall_item_num = 100
|
|
|
|
|
|
logger.info('获取物品信息')
|
|
|
item_info_df = get_item_info_df()
|
|
|
|
|
|
logger.info('生成物品信息字典')
|
|
|
item_info_dict = get_recall_item_info_dict(item_info_df)
|
|
|
|
|
|
logger.info('生成itemcf相似度矩阵')
|
|
|
i2i_sim = itemcf_sim(all_select_df, item_info_dict)
|
|
|
|
|
|
logger.info('生成物品embedding相似度矩阵')
|
|
|
item_emb_df = pd.read_csv(subjects_embed_path, sep='\t', encoding='utf-8')
|
|
|
emb_i2i_sim = embdding_i2i_sim(item_emb_df, topk=recall_item_num)
|
|
|
|
|
|
# 为了召回评估,提取最后一次选择作为召回评估
|
|
|
# 如果不需要做召回评估直接使用全量的训练集进行召
|
|
|
|
|
|
if need_metric_recall:
|
|
|
logger.info('获取物品行为数据历史和最后一次选择')
|
|
|
train_hist_select_df, train_last_select_df = get_hist_and_last_select(all_select_df)
|
|
|
else:
|
|
|
train_hist_select_df = all_select_df
|
|
|
|
|
|
logger.info('获取用户选择的物品列表')
|
|
|
user_item_time_dict = get_user_item_time_dict(train_hist_select_df)
|
|
|
|
|
|
logger.info('获取选择次数最多的物品')
|
|
|
item_topk_select = get_item_topk_select(train_hist_select_df, k=recall_item_num)
|
|
|
|
|
|
|
|
|
def itemcf_recall(user_id, topk):
|
|
|
"""
|
|
|
itemcf召回调用接口
|
|
|
"""
|
|
|
start_time = datetime.now()
|
|
|
|
|
|
logger.info(f"本次需要进行itemcf召回的用户ID: {user_id}")
|
|
|
|
|
|
recall_results = {}
|
|
|
recall_results.clear()
|
|
|
|
|
|
if user_id not in user_item_time_dict:
|
|
|
return recall_results
|
|
|
|
|
|
recall_results = item_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
i2i_sim,
|
|
|
topk + (topk // 3),
|
|
|
topk,
|
|
|
item_topk_select,
|
|
|
item_info_dict,
|
|
|
emb_i2i_sim)
|
|
|
|
|
|
# 计算耗时毫秒
|
|
|
end_time = datetime.utcnow()
|
|
|
cost_time_millisecond = round(float((end_time - start_time).microseconds / 1000.0), 3)
|
|
|
|
|
|
logger.info(f"本次召回耗时: {cost_time_millisecond} 毫秒")
|
|
|
|
|
|
return recall_results
|
|
|
|
|
|
|
|
|
def itemcf_recall_train():
|
|
|
"""
|
|
|
itemcf召回训练和评估
|
|
|
"""
|
|
|
|
|
|
# 调用初始化召回用到的一些数据
|
|
|
init_itemcf_recall()
|
|
|
|
|
|
# 只在采样模式下计算所有用户的召回数据并进行召回效果评估
|
|
|
# 如果用全量数据计算所有用户的召回数据会非常耗时
|
|
|
|
|
|
if samples_mode == True and need_metric_recall:
|
|
|
|
|
|
# 定义召回物品的字典
|
|
|
user_recall_items_dict = collections.defaultdict(dict)
|
|
|
|
|
|
logger.info('生成itemcf所有用户的召回列表')
|
|
|
for user_id in tqdm(train_hist_select_df['user_id'].unique()):
|
|
|
user_recall_items_dict[user_id] = item_based_recommend(user_id,
|
|
|
user_item_time_dict,
|
|
|
i2i_sim,
|
|
|
sim_item_topk,
|
|
|
recall_item_num,
|
|
|
item_topk_select,
|
|
|
item_info_dict,
|
|
|
emb_i2i_sim)
|
|
|
|
|
|
logger.info('保存itemcf召回结果')
|
|
|
pickle.dump(user_recall_items_dict, open(subject_itemcf_recall_dict, 'wb'))
|
|
|
|
|
|
logger.info('itemcf召回效果评估')
|
|
|
metrics_recall(user_recall_items_dict, train_last_select_df, topk=recall_item_num)
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
itemcf_recall_train()
|
|
|
|
|
|
recall_results = itemcf_recall(user_id=test_user_id, topk=20)
|
|
|
print(recall_results) |