You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

555 lines
22 KiB

5 months ago
import pandas as pd
import numpy as np
from tqdm import tqdm
import warnings
import pickle
from sklearn.preprocessing import MinMaxScaler
import os
import sys
sys.path.append(os.getcwd())
from config import myshixuns_train_data, myshixuns_test_data
from config import shixuns_data_path, shixuns_embed_path, shixun_save_path
from config import shixun_user_item_time_dict_data,shixun_user_item_dict_data
from config import shixun_item_user_time_dict, data_parent_path
from config import shixuns_emb_dict,shixuns_bert_em_path
from config import users_data_path,shixun_merge_emb_path
from utils import reduce_mem
from config import logger
from joblib import Parallel, delayed
# Register tqdm's pandas integration so that `progress_apply` is available
# on the DataFrame / GroupBy objects used throughout this module.
tqdm.pandas()
# Ignore all warnings; this changes the global warnings filter for the whole
# interpreter, not just this module.
warnings.filterwarnings('ignore')
def get_all_select_sample(sample_nums=10000):
    """
    Draw a subset of the training data for debugging.

    If a cached sample exists at ``data_parent_path + 'myshixuns_train_sample.csv'``
    it is loaded and returned as-is (``sample_nums`` is ignored in that case).
    Otherwise ``sample_nums`` distinct users are drawn at random (without
    replacement) from the training set, all of their selections are kept,
    duplicate (user_id, shixun_id) pairs are dropped, and the result is written
    to that cache path before being returned.

    :param sample_nums: number of users to sample; clamped to the number of
        distinct users available so the draw cannot fail on small data.
    :return: DataFrame with the sampled users' selections.
    """
    sample_path = data_parent_path + 'myshixuns_train_sample.csv'
    if os.path.exists(sample_path):
        all_select = pd.read_csv(sample_path, sep='\t', encoding='utf-8')
        reduce_mem(all_select)
        return all_select
    all_select = pd.read_csv(myshixuns_train_data, sep='\t', encoding='utf-8')
    # Distinct user ids in the training set
    all_user_ids = all_select['user_id'].unique()
    # np.random.choice(replace=False) raises ValueError when asked for more
    # elements than exist, so never request more users than there are.
    sample_size = min(sample_nums, len(all_user_ids))
    sample_user_ids = np.random.choice(all_user_ids, size=sample_size, replace=False)
    # Keep every selection made by the sampled users
    all_select = all_select[all_select['user_id'].isin(sample_user_ids)]
    # One row per (user_id, shixun_id) pair
    all_select = all_select.drop_duplicates(['user_id', 'shixun_id'])
    all_select.to_csv(sample_path, sep='\t', index=False, header=True)
    reduce_mem(all_select)
    return all_select
def get_all_select_df(offline=True):
    """
    Load user-item selection records.

    Offline mode (validating a model or features) reads only the training set.
    Online mode (prediction) also appends the test set, so its records become
    part of the overall data.

    In both modes duplicate (user_id, shixun_id) pairs are dropped, keeping the
    first occurrence (training rows come first).

    :param offline: True to read only the training data, False for train + test.
    :return: DataFrame of selection records.
    """
    all_select = pd.read_csv(myshixuns_train_data, sep='\t', encoding='utf-8')
    if not offline:
        test_myshixuns = pd.read_csv(myshixuns_test_data, sep='\t', encoding='utf-8')
        # DataFrame.append was removed in pandas 2.0; pd.concat is the
        # equivalent (original row indices are preserved, as append did).
        all_select = pd.concat([all_select, test_myshixuns])
    all_select = all_select.drop_duplicates(['user_id', 'shixun_id'])
    reduce_mem(all_select)
    return all_select
def get_user_info_df():
    """
    Load the table of basic user attributes.

    :return: DataFrame read from ``users_data_path`` (tab-separated, UTF-8,
        ``quoting=3`` i.e. csv.QUOTE_NONE), passed through ``reduce_mem``.
    """
    users = pd.read_csv(
        users_data_path,
        sep='\t',
        encoding='utf-8',
        quoting=3,
    )
    reduce_mem(users)
    return users
def get_item_eminfo_df():
    """
    Load the item (shixun) attribute table merged with embeddings.

    :return: DataFrame read from ``shixun_merge_emb_path`` (tab-separated,
        UTF-8, ``quoting=3``), passed through ``reduce_mem``.
    """
    read_opts = dict(sep='\t', encoding='utf-8', quoting=3)
    item_eminfo = pd.read_csv(shixun_merge_emb_path, **read_opts)
    reduce_mem(item_eminfo)
    return item_eminfo
def get_item_info_df():
    """
    Load the table of basic item (shixun) attributes.

    :return: DataFrame read from ``shixuns_data_path`` (tab-separated, UTF-8),
        passed through ``reduce_mem``.
    """
    items = pd.read_csv(
        shixuns_data_path,
        sep='\t',
        encoding='utf-8',
    )
    reduce_mem(items)
    return items
def get_item_bert_info_df():
    """
    Load item (shixun) basic attributes together with their BERT encodings.

    :return: DataFrame read from ``shixuns_bert_em_path`` (tab-separated,
        UTF-8), passed through ``reduce_mem``.
    """
    read_opts = dict(sep='\t', encoding='utf-8')
    bert_items = pd.read_csv(shixuns_bert_em_path, **read_opts)
    reduce_mem(bert_items)
    return bert_items
def get_recall_item_info_df(item_info_df, shixun_id_list):
    """
    Keep only the attribute rows of the recalled items.

    :param item_info_df: item attribute DataFrame with a ``shixun_id`` column.
    :param shixun_id_list: iterable of recalled item ids.
    :return: the rows of ``item_info_df`` whose ``shixun_id`` is in
        ``shixun_id_list`` (original index preserved).
    """
    in_recall = item_info_df['shixun_id'].isin(shixun_id_list)
    return item_info_df.loc[in_recall]
def get_select_item_info(all_select_df, user_id):
    """
    Return all selection records that belong to one user.

    :param all_select_df: selection DataFrame with a ``user_id`` column.
    :param user_id: the user to filter on.
    :return: the matching rows (original index preserved).
    """
    belongs_to_user = all_select_df['user_id'] == user_id
    return all_select_df.loc[belongs_to_user]
def get_item_emb_dict():
    """
    Load, or build and cache, the item embedding dictionary.

    If a pickle exists at ``shixuns_emb_dict`` it is loaded and returned.
    Otherwise the table at ``shixuns_embed_path`` is read, and every column
    whose name contains ``'emb'`` is treated as an embedding dimension. Each
    row is L2-normalised, and the resulting ``{shixun_id: np.ndarray}`` mapping
    is pickled to ``shixuns_emb_dict`` before being returned.

    NOTE: the cache is read with ``pickle``; only point ``shixuns_emb_dict``
    at files produced by this pipeline, never at untrusted input.
    """
    # Load the previously saved embedding dict
    if os.path.exists(shixuns_emb_dict):
        with open(shixuns_emb_dict, 'rb') as f:
            return pickle.load(f)
    # Build the embedding dict from the raw table
    item_emb_df = pd.read_csv(shixuns_embed_path, sep='\t', encoding='utf-8')
    item_emb_cols = [col for col in item_emb_df.columns if 'emb' in col]
    item_emb_np = np.ascontiguousarray(item_emb_df[item_emb_cols])
    # L2-normalise each row. An all-zero row would give 0/0 = NaN, so its
    # norm is replaced by 1 and the row stays all zeros.
    norms = np.linalg.norm(item_emb_np, axis=1, keepdims=True)
    norms[norms == 0] = 1.0
    item_emb_np = item_emb_np / norms
    item_emb_dict = dict(zip(item_emb_df['shixun_id'], item_emb_np))
    with open(shixuns_emb_dict, 'wb') as f:
        pickle.dump(item_emb_dict, f)
    return item_emb_dict
def get_user_item_time_dict(select_df):
    """
    Build each user's time-ordered sequence of selected items.

    Result shape: ``{user1: [(item1, time1), (item2, time2), ...], ...}``,
    where each list is ordered by ``created_timestamp`` ascending.

    The dictionary is cached with pickle at ``shixun_user_item_time_dict_data``.
    When that file exists it is returned directly and ``select_df`` is ignored.
    """
    def make_item_time_pair(df):
        # (shixun_id, created_timestamp) tuples for one user's rows
        return list(zip(df['shixun_id'], df['created_timestamp']))

    # Reuse the previously pickled result
    if os.path.exists(shixun_user_item_time_dict_data):
        with open(shixun_user_item_time_dict_data, 'rb') as f:
            return pickle.load(f)
    # Sort by selection time so each per-user list is chronological
    select_df = select_df.sort_values('created_timestamp')
    # Select the columns with a list: tuple-style multi-column selection on a
    # GroupBy was deprecated and raises in pandas >= 2.0.
    user_item_time_df = select_df.groupby('user_id')[['shixun_id', 'created_timestamp']] \
        .progress_apply(make_item_time_pair).reset_index() \
        .rename(columns={0: 'item_time_list'})
    user_item_time_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
    with open(shixun_user_item_time_dict_data, 'wb') as f:
        pickle.dump(user_item_time_dict, f)
    return user_item_time_dict
def get_user_item_dict(select_df):
    """
    Build each user's time-ordered sequence of selected items, with item stats.

    Result shape::

        {user1: [(shixun_id, visits, challenges_count, averge_star,
                  task_pass, created_timestamp), ...], ...}

    where each list is ordered by ``created_timestamp`` ascending.

    The dictionary is cached with pickle at ``shixun_user_item_dict_data``.
    When that file exists it is returned directly and ``select_df`` is ignored.
    """
    item_cols = ['shixun_id', 'visits', 'challenges_count',
                 'averge_star', 'task_pass', 'created_timestamp']

    def make_item_tuple_list(df):
        # One tuple per selection, with fields in ``item_cols`` order
        return list(zip(*(df[col] for col in item_cols)))

    # Reuse the previously pickled result
    if os.path.exists(shixun_user_item_dict_data):
        with open(shixun_user_item_dict_data, 'rb') as f:
            return pickle.load(f)
    # Sort by selection time so each per-user list is chronological
    select_df = select_df.sort_values('created_timestamp')
    # List-based column selection; a tuple of columns raises in pandas >= 2.0.
    user_item_time_df = select_df.groupby('user_id')[item_cols] \
        .progress_apply(make_item_tuple_list).reset_index() \
        .rename(columns={0: 'item_time_list'})
    user_item_dict = dict(zip(user_item_time_df['user_id'], user_item_time_df['item_time_list']))
    with open(shixun_user_item_dict_data, 'wb') as f:
        pickle.dump(user_item_dict, f)
    return user_item_dict
def get_item_user_time_dict(select_df):
    """
    Build, for each item, the time-ordered sequence of users who selected it.

    Result shape: ``{item1: [(user1, time1), (user2, time2), ...], ...}``,
    where each list is ordered by ``created_timestamp`` ascending.

    The dictionary is cached with pickle at ``shixun_item_user_time_dict``.
    When that file exists it is returned directly and ``select_df`` is ignored.
    """
    def make_user_time_pair(df):
        # (user_id, created_timestamp) tuples for one item's rows
        return list(zip(df['user_id'], df['created_timestamp']))

    # Reuse the previously pickled result
    if os.path.exists(shixun_item_user_time_dict):
        with open(shixun_item_user_time_dict, 'rb') as f:
            return pickle.load(f)
    select_df = select_df.sort_values('created_timestamp')
    # List-based column selection; a tuple of columns raises in pandas >= 2.0.
    item_user_time_df = select_df.groupby('shixun_id')[['user_id', 'created_timestamp']] \
        .progress_apply(make_user_time_pair).reset_index() \
        .rename(columns={0: 'user_time_list'})
    item_user_time_dict = dict(zip(item_user_time_df['shixun_id'], item_user_time_df['user_time_list']))
    with open(shixun_item_user_time_dict, 'wb') as f:
        pickle.dump(item_user_time_dict, f)
    return item_user_time_dict
def get_hist_and_last_em_select(all_select):
    """
    Split each user's selections into history and the most recent selection.

    This is the embedding-recall variant; results are cached under
    ``shixun_save_path`` as ``select_last_em_df.csv`` / ``select_hist_em_df.csv``.

    NOTE(review): this module defines ``get_hist_and_last_em_select`` twice.
    The later definition rebinds the name at import time, so this one is only
    used if the later one is removed.

    Users with a single selection keep it in the history as well (a deliberate
    leak); otherwise they would be invisible during training.

    :param all_select: selection DataFrame with ``user_id`` and
        ``created_timestamp`` columns.
    :return: ``(select_hist_df, select_last_df)``.
    """
    # The cache is read from exactly the files written below. Previously this
    # checked the non-"_em" file names, so the cache written here was never
    # reused and another function's cache files could be loaded instead.
    last_path = shixun_save_path + 'select_last_em_df.csv'
    hist_path = shixun_save_path + 'select_hist_em_df.csv'

    def hist_func(user_df):
        # Drop the user's last row, unless it is their only row.
        if len(user_df) == 1:
            return user_df
        return user_df[:-1]

    def apply_parallel(df_grouped, func):
        # Run func on every group with joblib (20 jobs) and concatenate the
        # results in group order.
        results = Parallel(n_jobs=20)(delayed(func)(group) for _, group in df_grouped)
        return pd.concat(results)

    if os.path.exists(last_path) and os.path.exists(hist_path):
        select_last_df = pd.read_csv(last_path, sep='\t', encoding='utf-8')
        select_hist_df = pd.read_csv(hist_path, sep='\t', encoding='utf-8')
    else:
        all_select = all_select.sort_values(by=['user_id', 'created_timestamp'])
        select_last_df = all_select.groupby('user_id').tail(1)
        # Per-user history extraction, parallelised for speed
        select_hist_df = apply_parallel(all_select.groupby('user_id'), hist_func)
        select_hist_df = select_hist_df.reset_index(drop=True)
        select_last_df.to_csv(last_path, sep='\t', index=False, header=True)
        select_hist_df.to_csv(hist_path, sep='\t', index=False, header=True)
    reduce_mem(select_last_df)
    reduce_mem(select_hist_df)
    return select_hist_df, select_last_df
def get_hist_and_last_select(all_select):
    """
    Split every user's selections into a history part and the latest selection.

    Results are cached as ``select_last_df.csv`` / ``select_hist_df.csv`` under
    ``shixun_save_path``. When both files exist they are loaded instead of
    being recomputed.

    A user with only one selection keeps it in the history too (intentional
    leakage); otherwise such users would be invisible at training time.

    :param all_select: selection DataFrame containing ``user_id`` and
        ``created_timestamp``.
    :return: tuple ``(select_hist_df, select_last_df)``.
    """
    last_csv = shixun_save_path + 'select_last_df.csv'
    hist_csv = shixun_save_path + 'select_hist_df.csv'
    cached = os.path.exists(last_csv) and os.path.exists(hist_csv)

    if cached:
        select_last_df = pd.read_csv(last_csv, sep='\t', encoding='utf-8')
        select_hist_df = pd.read_csv(hist_csv, sep='\t', encoding='utf-8')
    else:
        def drop_latest(group_df):
            # Everything except the newest row; a single-row group is kept whole.
            return group_df if len(group_df) == 1 else group_df[:-1]

        ordered = all_select.sort_values(by=['user_id', 'created_timestamp'])
        select_last_df = ordered.groupby('user_id').tail(1)
        # Each user's group is processed via joblib with 20 jobs.
        pieces = Parallel(n_jobs=20)(
            delayed(drop_latest)(group_df) for _, group_df in ordered.groupby('user_id')
        )
        select_hist_df = pd.concat(pieces).reset_index(drop=True)
        select_last_df.to_csv(last_csv, sep='\t', index=False, header=True)
        select_hist_df.to_csv(hist_csv, sep='\t', index=False, header=True)

    reduce_mem(select_last_df)
    reduce_mem(select_hist_df)
    return select_hist_df, select_last_df
def get_cf_hist_and_last_select(all_select):
    """
    Split each user's selections into history and last selection for the CF recall.

    Cached under ``shixun_save_path`` as ``cf_select_last_df.csv`` and
    ``cf_select_hist_df.csv``; if both exist they are read back instead of
    being recomputed.

    Users with exactly one selection keep it in the history too (intentional
    leakage), so they still appear during training.

    :param all_select: selection DataFrame with ``user_id`` and ``created_timestamp``.
    :return: ``(select_hist_df, select_last_df)``.
    """
    def split_history(frame):
        # All rows but the newest; a single-row frame is returned whole.
        return frame if len(frame) == 1 else frame[:-1]

    cache_last = shixun_save_path + 'cf_select_last_df.csv'
    cache_hist = shixun_save_path + 'cf_select_hist_df.csv'

    if os.path.exists(cache_last) and os.path.exists(cache_hist):
        select_last_df, select_hist_df = (
            pd.read_csv(path, sep='\t', encoding='utf-8') for path in (cache_last, cache_hist)
        )
    else:
        ordered = all_select.sort_values(by=['user_id', 'created_timestamp'])
        select_last_df = ordered.groupby('user_id').tail(1)
        # Per-user history extraction runs through joblib with 20 jobs.
        per_user = Parallel(n_jobs=20)(
            delayed(split_history)(frame) for _, frame in ordered.groupby('user_id')
        )
        select_hist_df = pd.concat(per_user).reset_index(drop=True)
        for frame, path in ((select_last_df, cache_last), (select_hist_df, cache_hist)):
            frame.to_csv(path, sep='\t', index=False, header=True)

    reduce_mem(select_last_df)
    reduce_mem(select_hist_df)
    return select_hist_df, select_last_df
def get_hist_and_last_em_select(all_select):
    """
    Split each user's selections into history and last selection for the
    embedding-based recall.

    Both parts are cached under ``shixun_save_path`` as
    ``select_last_em_df.csv`` / ``select_hist_em_df.csv`` and reloaded from
    there when both files exist.

    Users with exactly one selection keep it in the history as well
    (intentional leakage), so they are not invisible during training.

    :param all_select: selection DataFrame with ``user_id`` and ``created_timestamp``.
    :return: ``(select_hist_df, select_last_df)``.
    """
    last_path = shixun_save_path + 'select_last_em_df.csv'
    hist_path = shixun_save_path + 'select_hist_em_df.csv'
    have_cache = os.path.exists(last_path) and os.path.exists(hist_path)

    if not have_cache:
        sorted_select = all_select.sort_values(by=['user_id', 'created_timestamp'])
        select_last_df = sorted_select.groupby('user_id').tail(1)

        def history_of(user_rows):
            if len(user_rows) > 1:
                return user_rows[:-1]
            # A lone selection is deliberately kept in the history.
            return user_rows

        # Parallel per-user processing (joblib, 20 jobs)
        jobs = (delayed(history_of)(rows) for _, rows in sorted_select.groupby('user_id'))
        select_hist_df = pd.concat(Parallel(n_jobs=20)(jobs))
        select_hist_df = select_hist_df.reset_index(drop=True)
        select_last_df.to_csv(last_path, sep='\t', index=False, header=True)
        select_hist_df.to_csv(hist_path, sep='\t', index=False, header=True)
    else:
        select_last_df = pd.read_csv(last_path, sep='\t', encoding='utf-8')
        select_hist_df = pd.read_csv(hist_path, sep='\t', encoding='utf-8')

    reduce_mem(select_last_df)
    reduce_mem(select_hist_df)
    return select_hist_df, select_last_df
def get_rank_hist_and_last_select(all_select):
    """
    Split each user's selections into history and last selection for ranking.

    Unlike the recall variants, nothing is cached to disk.

    Users with a single selection keep it in the history as well (intentional
    leakage), so they remain visible during training.

    :param all_select: selection DataFrame with ``user_id`` and ``created_timestamp``.
    :return: ``(select_hist_df, select_last_df)``.
    """
    def keep_history(user_df):
        # Drop the newest row unless it is the only one.
        return user_df if len(user_df) == 1 else user_df[:-1]

    ordered = all_select.sort_values(by=['user_id', 'created_timestamp'])
    select_last_df = ordered.groupby('user_id').tail(1)
    select_hist_df = ordered.groupby('user_id') \
        .progress_apply(keep_history) \
        .reset_index(drop=True)
    return select_hist_df, select_last_df
def get_item_info_dict(item_info_df):
    """
    Build per-item attribute lookup dicts keyed by ``shixun_id``.

    Intended for direct use later in the recall / cold-start stages.

    Side effect: ``item_info_df['created_at_ts']`` is overwritten in place with
    its min-max normalised value in [0, 1]. When every value is equal the
    column becomes 0.0 (previously 0/0 produced NaN).

    :param item_info_df: item DataFrame with ``shixun_id``, ``visits``,
        ``trainee``, ``created_at_ts``, ``averge_star``, ``myshixuns_count``
        and ``challenges_count`` columns.
    :return: tuple of dicts, each mapping ``shixun_id`` to a value:
        ``(visits, trainee (difficulty), normalised created_at_ts,
        averge_star, myshixuns_count, challenges_count)``.
    """
    # Min-max normalise the creation time, guarding against a zero range.
    created = item_info_df['created_at_ts']
    lo = created.min()
    span = created.max() - lo
    if span == 0:
        item_info_df['created_at_ts'] = 0.0
    else:
        item_info_df['created_at_ts'] = (created - lo) / span

    ids = item_info_df['shixun_id']
    # Visit count
    item_visists_dict = dict(zip(ids, item_info_df['visits']))
    # Difficulty
    item_trainee_dict = dict(zip(ids, item_info_df['trainee']))
    # Normalised creation time
    item_created_time_dict = dict(zip(ids, item_info_df['created_at_ts']))
    # Star rating
    item_averge_star_dict = dict(zip(ids, item_info_df['averge_star']))
    # Number of users who selected the item
    item_myshixuns_count_dict = dict(zip(ids, item_info_df['myshixuns_count']))
    # Number of challenges
    item_challenges_count_dict = dict(zip(ids, item_info_df['challenges_count']))
    return item_visists_dict, item_trainee_dict, item_created_time_dict, \
        item_averge_star_dict, item_myshixuns_count_dict, item_challenges_count_dict
def get_user_hist_item_info_dict(all_select):
    """
    Aggregate, per user, attributes of the items in their selection history.

    :param all_select: selection records joined with item attributes. Must
        contain ``user_id``, ``shixun_id``, ``visits``, ``trainee``,
        ``averge_star``, ``myshixuns_count``, ``challenges_count`` and
        ``created_at_ts``. If a ``created_timestamp`` (selection time) column
        is present, it decides which selection is the user's last one.
    :return: tuple of dicts keyed by ``user_id``:
        ``(mean visits, set of shixun_ids, set of trainee values,
        min-max normalised created_at_ts of the last selected item,
        set of averge_star values, mean myshixuns_count,
        mean challenges_count)``.
    """
    logger.info("获取用户历史选择的物品信息")
    by_user = all_select.groupby('user_id')

    def to_user_dict(agg_series):
        # {user_id: aggregated value} from a groupby aggregation result
        agg_df = agg_series.reset_index()
        return dict(zip(agg_df['user_id'], agg_df[agg_series.name]))

    # Mean visit count of the user's selected items
    user_hist_item_visits_dict = to_user_dict(by_user['visits'].agg('mean'))
    # Set of item ids the user selected
    user_hist_item_ids_dict = to_user_dict(by_user['shixun_id'].agg(set))
    # Set of difficulty ('trainee') values of the user's items
    user_hist_item_trainee_dict = to_user_dict(by_user['trainee'].agg(set))
    # Set of star ratings of the user's items
    user_averge_star_dict = to_user_dict(by_user['averge_star'].agg(set))
    # Mean selection count of the user's items
    user_hist_item_myshixuns_count_dict = to_user_dict(by_user['myshixuns_count'].agg('mean'))
    # Mean number of challenges of the user's items
    user_hist_item_challenges_count_dict = to_user_dict(by_user['challenges_count'].agg('mean'))

    # Creation time of the item each user selected most recently. Order by
    # the selection time when it is available. Ordering by 'created_at_ts'
    # itself yields the newest item the user ever selected, not the last one
    # they selected.
    order_col = 'created_timestamp' if 'created_timestamp' in all_select.columns else 'created_at_ts'
    last_rows = all_select.sort_values(order_col).groupby('user_id').tail(1)
    last_created = last_rows['created_at_ts']
    span = last_created.max() - last_created.min()
    if span == 0:
        # All equal: avoid 0/0 = NaN
        last_created = last_created * 0.0
    else:
        last_created = (last_created - last_created.min()) / span
    user_last_item_created_time_dict = dict(zip(last_rows['user_id'], last_created))

    return user_hist_item_visits_dict, user_hist_item_ids_dict, \
        user_hist_item_trainee_dict, user_last_item_created_time_dict, \
        user_averge_star_dict, user_hist_item_myshixuns_count_dict, \
        user_hist_item_challenges_count_dict
def get_item_topk_select(select_df, k):
    """
    Return the ids of the ``k`` most frequently selected items.

    Used to pad recall results with popular items.

    :param select_df: selection DataFrame with a ``shixun_id`` column.
    :param k: number of ids to return (fewer if there are fewer distinct items).
    :return: pandas Index of item ids, most selected first.
    """
    selection_counts = select_df['shixun_id'].value_counts()
    most_popular = selection_counts.index
    return most_popular[:k]
def get_user_activate_degree_dict(all_select_df):
    """
    Measure user activity as the user's number of selections.

    The per-user count of non-null ``shixun_id`` values is min-max normalised
    with sklearn's ``MinMaxScaler``.

    :param all_select_df: selection DataFrame with ``user_id`` and ``shixun_id``.
    :return: ``{user_id: normalised activity}``.
    """
    per_user = all_select_df.groupby('user_id')['shixun_id'].count().reset_index()
    # The 'shixun_id' column now holds the selection count; normalise it.
    scaler = MinMaxScaler()
    per_user['shixun_id'] = scaler.fit_transform(per_user[['shixun_id']])
    return dict(zip(per_user['user_id'], per_user['shixun_id']))
def metrics_recall(user_recall_items_dict, train_last_select_df, topk=10):
    """
    Evaluate the recall hit rate at cutoffs 10, 20, 30, ..., ``topk``.

    A user is a hit at cutoff k when their last selected item (taken from
    ``train_last_select_df``) is among the first k recalled items.

    :param user_recall_items_dict: ``{user_id: [entry, ...]}`` in rank order,
        where each entry's first element is the recalled item id.
    :param train_last_select_df: DataFrame with ``user_id`` and ``shixun_id``
        columns holding each user's last selection.
    :param topk: largest cutoff; cutoffs step by 10, so nothing is evaluated
        when ``topk < 10``.
    :return: ``{k: hit_rate}`` for every evaluated cutoff. Results are also
        printed, as before.
    """
    # Last selected item per user
    last_select_item_dict = dict(zip(train_last_select_df['user_id'], train_last_select_df['shixun_id']))
    # Number of users evaluated (denominator of the hit rate)
    user_num = len(user_recall_items_dict)
    print(user_num)
    hit_rates = {}
    for k in range(10, topk + 1, 10):
        hit_num = 0
        for user_id, item_list in user_recall_items_dict.items():
            # A user without a known last selection cannot be a hit
            # (previously this raised KeyError).
            if user_id not in last_select_item_dict:
                continue
            tmp_recall_items = {x[0] for x in item_list[:k]}
            if last_select_item_dict[user_id] in tmp_recall_items:
                hit_num += 1
        # Empty recall dict: report 0.0 instead of dividing by zero.
        hit_rate = round(hit_num * 1.0 / user_num, 5) if user_num else 0.0
        hit_rates[k] = hit_rate
        print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)
    return hit_rates
def metrics_pinsage_recall(user_recall_items_dict, train_last_select_df, topk=10):
    """
    Evaluate the PinSage recall hit rate at cutoffs 10, 20, 30, ..., ``topk``.

    Same as the generic recall metric, but the last-selection frame uses
    ``user`` / ``item`` column names. A user is a hit at cutoff k when their
    last selected item is among the first k recalled items.

    :param user_recall_items_dict: ``{user_id: [entry, ...]}`` in rank order,
        where each entry's first element is the recalled item id.
    :param train_last_select_df: DataFrame with ``user`` and ``item`` columns
        holding each user's last selection.
    :param topk: largest cutoff; cutoffs step by 10, so nothing is evaluated
        when ``topk < 10``.
    :return: ``{k: hit_rate}`` for every evaluated cutoff. Results are also
        printed, as before.
    """
    # Last selected item per user
    last_select_item_dict = dict(zip(train_last_select_df['user'], train_last_select_df['item']))
    # Number of users evaluated (denominator of the hit rate)
    user_num = len(user_recall_items_dict)
    print(user_num)
    hit_rates = {}
    for k in range(10, topk + 1, 10):
        hit_num = 0
        for user_id, item_list in user_recall_items_dict.items():
            # A user without a known last selection cannot be a hit
            # (previously this raised KeyError).
            if user_id not in last_select_item_dict:
                continue
            tmp_recall_items = {x[0] for x in item_list[:k]}
            if last_select_item_dict[user_id] in tmp_recall_items:
                hit_num += 1
        # Empty recall dict: report 0.0 instead of dividing by zero.
        hit_rate = round(hit_num * 1.0 / user_num, 5) if user_num else 0.0
        hit_rates[k] = hit_rate
        print(' topk: ', k, ' : ', 'hit_num: ', hit_num, 'hit_rate: ', hit_rate, 'user_num : ', user_num)
    return hit_rates