You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

320 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
sys.path.append(os.getcwd())
import pandas as pd
import numpy as np
from collections import defaultdict
from tqdm import tqdm
import math
import warnings
import pickle
import collections
from datetime import datetime
from utils import get_file_size
from config import logger
from config import subject_itemcf_i2i_sim_data, need_metric_recall
from config import subject_itemcf_recall_dict
from config import subjects_embed_path, offline_mode
from config import samples_mode
from config import test_user_id
from matching.subject.recall_comm import get_all_select_df
from matching.subject.recall_comm import get_user_info_df, get_item_info_df
from matching.subject.recall_comm import get_user_item_time_dict
from matching.subject.recall_comm import get_hist_and_last_select, get_recall_item_info_dict
from matching.subject.recall_comm import get_item_topk_select, metrics_recall
from matching.subject.item_embedding_recall import embdding_i2i_sim
tqdm.pandas()
warnings.filterwarnings('ignore')
def itemcf_sim(df, item_info_dict):
"""
物品相似性矩阵计算,基于物品的协同过滤+关联规则
:param df: 物品行为数据
:param item_created_time_dict: 物品创建时间的字典
:return: 物品与物品的相似性矩阵
"""
if os.path.exists(subject_itemcf_i2i_sim_data) and (get_file_size(subject_itemcf_i2i_sim_data) > 1):
i2i_sim_ = pickle.load(open(subject_itemcf_i2i_sim_data, 'rb'))
return i2i_sim_
# 获取用户选择的item字典
user_item_time_dict = get_user_item_time_dict(df)
# 计算物品相似度
i2i_sim = {}
item_cnt = defaultdict(int)
for user_id, item_time_list in tqdm(user_item_time_dict.items()):
# 在基于物品的协同过滤优化的时候考虑时间因素
for loc1, (item_i, i_select_time) in enumerate(item_time_list):
item_cnt[item_i] += 1
i2i_sim.setdefault(item_i, {})
for loc2, (item_j, j_select_time) in enumerate(item_time_list):
if (item_i == item_j) or \
(item_i not in item_info_dict) or \
(item_j not in item_info_dict):
continue
# 考虑物品的正向顺序选择和反向顺序选择
loc_alpha = 1.0 if loc2 > loc1 else 0.7
# 位置信息权重,其中的参数可以调节
loc_weight = loc_alpha * (0.9 ** (np.abs(loc2 - loc1) - 1))
# 选择时间权重,其中的参数可以调节
select_time_weight = np.exp(0.7 ** np.abs(i_select_time - j_select_time))
# 两个物品创建时间差的权重,其中的参数可以调节
item_i_created_time = item_info_dict[item_i]['created_at_ts']
item_j_created_time = item_info_dict[item_j]['created_at_ts']
created_time_weight = np.exp(0.8 ** np.abs(item_i_created_time - item_j_created_time))
i2i_sim[item_i].setdefault(item_j, 0)
# 考虑多种因素的权重计算最终的物品之间的相似度
i2i_sim[item_i][item_j] += loc_weight * select_time_weight * created_time_weight / math.log(len(item_time_list) + 1)
i2i_sim_ = i2i_sim.copy()
# 余弦相似度分母部分计算
for item_i, related_items in i2i_sim.items():
for item_j, wij in related_items.items():
i2i_sim_[item_i][item_j] = wij / math.sqrt(item_cnt[item_i] * item_cnt[item_j])
# 将得到的相似性矩阵保存到本地
pickle.dump(i2i_sim_, open(subject_itemcf_i2i_sim_data, 'wb'))
return i2i_sim_
# 基于物品协同过滤进行召回
def item_based_recommend(user_id,
user_item_time_dict,
i2i_sim,
sim_item_topk,
recall_item_num,
item_topk_select,
item_info_dict,
emb_i2i_sim):
"""
基于物品协同过滤+关联规则的召回
:param user_id: 用户id
:param user_item_time_dict: 字典, 根据选择时间获取用户的选择物品序列 {user1: [(item1, time1), (item2, time2)..]...}
:param i2i_sim: 字典,物品相似度矩阵
:param sim_item_topk: 整数选择与当前物品最相似的k个物品
:param recall_item_num: 整数,需要召回的物品数量
:param item_topk_select: 列表,选择次数最多的物品列表,用于召回补全
:param item_info_dict: 字典,物品信息字典
:param emb_i2i_sim: 字典物品embedding相似度矩阵
:return: 召回的物品列表 [(item1, score1), (item2, score2)...]
"""
# 获取用户历史选择的物品, 包括物品ID, 选择时间
user_hist_items = user_item_time_dict[user_id]
# 获取用户历史选择的物品, 只包括物品ID
user_hist_items_ = {item_id for item_id, _ in user_hist_items}
filtered_item_name_list = []
filtered_item_name_list.clear()
items_rank = {}
for loc, (hist_item, select_time) in enumerate(user_hist_items):
# 过滤物品信息字典中没有的物品
if hist_item not in item_info_dict:
continue
hist_item_created_time = item_info_dict[hist_item]['created_at_ts']
for sim_item, wij in sorted(i2i_sim[hist_item].items(), key=lambda x: x[1], reverse=True)[:sim_item_topk]:
# 过滤历史选择的物品
if sim_item in user_hist_items_:
continue
# 过滤物品信息字典中没有的物品
if sim_item not in item_info_dict:
continue
sim_item_created_time = item_info_dict[sim_item]['created_at_ts']
sim_item_name = item_info_dict[sim_item]['subject_name']
# 过滤物品名称重复的
if sim_item_name in filtered_item_name_list:
continue
filtered_item_name_list.append(sim_item_name)
# 物品创建时间差权重
created_time_weight = np.exp(0.8 ** np.abs(hist_item_created_time - sim_item_created_time))
# 相似物品和历史选择物品序列中历史物品所在的位置权重
loc_weight = (0.9 ** (len(user_hist_items) - loc))
content_weight = 1.0
if emb_i2i_sim.get(hist_item, {}).get(sim_item, None) is not None:
content_weight += emb_i2i_sim[hist_item][sim_item]
if emb_i2i_sim.get(sim_item, {}).get(hist_item, None) is not None:
content_weight += emb_i2i_sim[sim_item][hist_item]
items_rank.setdefault(sim_item, 0)
items_rank[sim_item] += created_time_weight * loc_weight * content_weight * wij
# 提前结束计算,没必要计算全部,否则在线召回将耗时很长
# 只计算最近选择的物品相似的物品。等于是捕捉用户最近的兴趣
if len(items_rank) >= (recall_item_num * 3):
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return items_rank
# 不足的用热门物品补全
if len(items_rank) < recall_item_num:
for index, item in enumerate(item_topk_select):
# 填充的item应该不在原来的列表中
if item in items_rank.items():
continue
# 随便给个负数就行
items_rank[item] = - index - 100
# 达到召回的数量
if len(items_rank) == recall_item_num:
break
items_rank = sorted(items_rank.items(), key=lambda x: x[1], reverse=True)[:recall_item_num]
return items_rank
def init_itemcf_recall():
"""
初始化召回用到的一些数据
"""
global train_hist_select_df
global user_item_time_dict
global i2i_sim, sim_item_topk
global recall_item_num
global item_topk_select
global item_info_dict
global emb_i2i_sim
global train_last_select_df
logger.info("加载物品行为数据")
all_select_df = get_all_select_df(offline=False)
logger.info("加载物品信息数据")
item_info = get_item_info_df()
logger.info("获取用户信息数据")
users_info = get_user_info_df()
all_select_df = all_select_df.merge(users_info, how='left', on='user_id')
all_select_df = all_select_df.merge(item_info, on='subject_id')
sim_item_topk = 120
recall_item_num = 100
logger.info('获取物品信息')
item_info_df = get_item_info_df()
logger.info('生成物品信息字典')
item_info_dict = get_recall_item_info_dict(item_info_df)
logger.info('生成itemcf相似度矩阵')
i2i_sim = itemcf_sim(all_select_df, item_info_dict)
logger.info('生成物品embedding相似度矩阵')
item_emb_df = pd.read_csv(subjects_embed_path, sep='\t', encoding='utf-8')
emb_i2i_sim = embdding_i2i_sim(item_emb_df, topk=recall_item_num)
# 为了召回评估,提取最后一次选择作为召回评估
# 如果不需要做召回评估直接使用全量的训练集进行召
if need_metric_recall:
logger.info('获取物品行为数据历史和最后一次选择')
train_hist_select_df, train_last_select_df = get_hist_and_last_select(all_select_df)
else:
train_hist_select_df = all_select_df
logger.info('获取用户选择的物品列表')
user_item_time_dict = get_user_item_time_dict(train_hist_select_df)
logger.info('获取选择次数最多的物品')
item_topk_select = get_item_topk_select(train_hist_select_df, k=recall_item_num)
def itemcf_recall(user_id, topk):
"""
itemcf召回调用接口
"""
start_time = datetime.now()
logger.info(f"本次需要进行itemcf召回的用户ID: {user_id}")
recall_results = {}
recall_results.clear()
if user_id not in user_item_time_dict:
return recall_results
recall_results = item_based_recommend(user_id,
user_item_time_dict,
i2i_sim,
topk + (topk // 3),
topk,
item_topk_select,
item_info_dict,
emb_i2i_sim)
# 计算耗时毫秒
end_time = datetime.utcnow()
cost_time_millisecond = round(float((end_time - start_time).microseconds / 1000.0), 3)
logger.info(f"本次召回耗时: {cost_time_millisecond} 毫秒")
return recall_results
def itemcf_recall_train():
"""
itemcf召回训练和评估
"""
# 调用初始化召回用到的一些数据
init_itemcf_recall()
# 只在采样模式下计算所有用户的召回数据并进行召回效果评估
# 如果用全量数据计算所有用户的召回数据会非常耗时
if samples_mode == True and need_metric_recall:
# 定义召回物品的字典
user_recall_items_dict = collections.defaultdict(dict)
logger.info('生成itemcf所有用户的召回列表')
for user_id in tqdm(train_hist_select_df['user_id'].unique()):
user_recall_items_dict[user_id] = item_based_recommend(user_id,
user_item_time_dict,
i2i_sim,
sim_item_topk,
recall_item_num,
item_topk_select,
item_info_dict,
emb_i2i_sim)
logger.info('保存itemcf召回结果')
pickle.dump(user_recall_items_dict, open(subject_itemcf_recall_dict, 'wb'))
logger.info('itemcf召回效果评估')
metrics_recall(user_recall_items_dict, train_last_select_df, topk=recall_item_num)
if __name__ == '__main__':
itemcf_recall_train()
recall_results = itemcf_recall(user_id=test_user_id, topk=20)
print(recall_results)