import os
import sys

# The project root must be on sys.path before the project-local imports below.
sys.path.append(os.getcwd())

import numpy as np
import pandas as pd
from tqdm import tqdm
import warnings
from datetime import datetime
from config import logger
from config import test_user_id
from config import subject_features_save_path
from config import subject_all_user_item_feats
from matching.subject.recall_comm import get_item_info_df
from matching.subject.recall_comm import get_all_select_df
from matching.subject.recall_comm import get_select_item_info
from matching.subject.recall_comm import get_rank_hist_and_last_select
from matching.subject.multi_recall_predict import multi_recall_predict
from ranking.subject.rank_features_engineering import get_embedding, get_recall_list
from ranking.subject.rank_comm import fill_is_disciplines_hab
from ranking.subject.rank_comm import get_rank_item_info_dict

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# Registers ``progress_apply`` on pandas objects (used when building online features).
tqdm.pandas()
warnings.filterwarnings('ignore')

# Module-level state populated by ``init_rank_features``.
all_select_df, recall_list_dict = None, None
item_content_emb_dict, item_w2v_emb_dict = None, None
item_youtube_emb_dict, user_youtube_emb_dict = None, None
subject_info_df, user_features = None, None
all_user_item_feats_df = None
# These names are assigned through ``global`` in ``init_rank_features``; they
# are given a defined default here so that reading them before initialisation
# yields ``None`` instead of raising ``NameError``.
item_bert_emb_dict, item_word2vec_emb_dict = None, None
item_dssm_emb_dict, user_dssm_emb_dict = None, None
subject_info_dict, subject_hot = None, None

# Positions read from ``subject_info_dict[subject_id][0]`` for the candidate
# and for each history item, in the order their absolute differences are
# written into a feature row. The original code named these values:
# create_time, visits_count, stages_count, stages_shixuns_count, study_count,
# course_study_count, passed_count, course_used_count, school_used_count,
# challenge_count, evaluate_count, video_study_time,
# study_pdf_attachment_count, averge_star (position 13 is not used).
# NOTE(review): the actual layout is defined by ``get_rank_item_info_dict``,
# which is not visible here - confirm it matches.
_INFO_FIELD_INDEXES = (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14)

# Column-name prefixes for the difference features, in the order the columns
# are emitted.
# NOTE(review): this order differs from the value order above: the values at
# positions 2/3 (named stages_count / stages_shixuns_count) land in the
# ``study_count_diff`` / ``course_study_count_diff`` columns and the values at
# positions 4/5 land in the ``stages_count_diff`` /
# ``stages_shixuns_count_diff`` columns. The mapping is kept exactly as it was
# because a trained model and the offline feature pipeline may rely on it;
# fix both sides together if the offline pipeline is confirmed to differ.
_DIFF_COLUMN_PREFIXES = (
    'time_diff',
    'visit_diff',
    'study_count_diff',
    'course_study_count_diff',
    'stages_count_diff',
    'stages_shixuns_count_diff',
    'passed_count_diff',
    'course_used_count_diff',
    'school_used_count_diff',
    'challenge_count_diff',
    'evaluate_count_diff',
    'video_study_time_diff',
    'study_pdf_attachment_count_diff',
    'averge_star_diff',
)

_SIM_STAT_COLUMNS = ['sim_max', 'sim_min', 'sim_sum', 'sim_mean']


def _pad_with_nan(values, size):
    """Return *values* right-padded with NaN so it holds at least *size* items."""
    # A non-positive repeat count yields an empty list, so longer inputs are
    # returned unchanged.
    return values + [np.nan] * (size - len(values))


def create_rank_behavior_feature(user_id, recall_list, select_hist_df, subjects_info, subject_info_dict,
                                 subjects_emb, user_emb=None, N=1):
    """
    Build ranking-model features for one user from their selection history.

    One row is produced per recalled item that is present in ``subjects_info``.
    Each row holds, for each of the user's last ``N`` selected items, the dot
    product of the item embeddings and the absolute differences of the item
    attributes, followed by max/min/sum/mean of the similarities, an optional
    user-item similarity, and the recall score and rank.

    When fewer than ``N`` history items are usable the missing slots (and, if
    there are none at all, the similarity statistics) are filled with NaN so
    that every row has the same width as the column list.

    Note: ``subjects_info['subject_id']`` and ``select_hist_df['user_id']`` are
    cast to ``int`` in place on the passed DataFrames.

    :param user_id: id of the user
    :param recall_list: iterable of ``(subject_id, score)`` candidates recalled for the user
    :param select_hist_df: the user's historical selections (needs ``user_id`` and ``subject_id`` columns)
    :param subjects_info: item information DataFrame (needs a ``subject_id`` column)
    :param subject_info_dict: mapping ``subject_id -> info`` where ``info[0]`` is indexable by position
    :param subjects_emb: mapping ``subject_id -> embedding vector``
    :param user_emb: optional mapping ``user_id -> embedding vector``; when given it must share
                     the dimension of ``subjects_emb``
    :param N: number of most recent selections to compare against (default 1)
    :return: DataFrame with one row per kept candidate
    """
    subjects_info['subject_id'] = subjects_info['subject_id'].astype(int)
    select_hist_df['user_id'] = select_hist_df['user_id'].astype(int)

    # A set gives O(1) membership checks instead of scanning a list per item.
    known_subject_ids = set(subjects_info['subject_id'].values.tolist())

    # The user's last N selections; they are the same for every candidate, so
    # resolve them once up front. Items without item information are skipped.
    hist_user_items = select_hist_df[select_hist_df['user_id'] == user_id]['subject_id'][-N:]
    history = []
    for hist_item in hist_user_items:
        if hist_item not in known_subject_ids:
            continue
        hist_item = int(hist_item)
        history.append((hist_item, subject_info_dict[hist_item][0]))

    all_user_feas = []
    for rank, (subject_id, score) in enumerate(recall_list):
        # Skip candidates that have no item information.
        if subject_id not in known_subject_ids:
            continue
        subject_id = int(subject_id)
        cand_info = subject_info_dict[subject_id][0]
        cand_has_emb = subject_id in subjects_emb

        sim_fea = []
        diff_feas = [[] for _ in _INFO_FIELD_INDEXES]
        for hist_item, hist_info in history:
            if cand_has_emb and hist_item in subjects_emb:
                sim_fea.append(np.dot(subjects_emb[hist_item], subjects_emb[subject_id]))
            else:
                # No embedding for one of the two items: similarity defaults to 0.
                sim_fea.append(0.0)
            for bucket, field_idx in zip(diff_feas, _INFO_FIELD_INDEXES):
                bucket.append(abs(cand_info[field_idx] - hist_info[field_idx]))

        single_user_fea = [user_id, subject_id]
        # Similarity features, then one group of N values per attribute.
        single_user_fea.extend(_pad_with_nan(sim_fea, N))
        for bucket in diff_feas:
            single_user_fea.extend(_pad_with_nan(bucket, N))

        # Summary statistics of the similarities.
        if sim_fea:
            sim_total = sum(sim_fea)
            single_user_fea.extend([max(sim_fea), min(sim_fea), sim_total, sim_total / len(sim_fea)])
        else:
            single_user_fea.extend([np.nan] * len(_SIM_STAT_COLUMNS))

        if user_emb:
            # Similarity between the user vector and the candidate item vector.
            if (user_id not in user_emb) or not cand_has_emb:
                single_user_fea.append(0.0)
            else:
                single_user_fea.append(np.dot(user_emb[user_id], subjects_emb[subject_id]))

        single_user_fea.extend([score, rank])
        all_user_feas.append(single_user_fea)

    # Column names, laid out group by group like the rows above.
    cols = ['user_id', 'subject_id']
    cols += ['sim' + str(i) for i in range(N)]
    for prefix in _DIFF_COLUMN_PREFIXES:
        cols += [prefix + str(i) for i in range(N)]
    cols += _SIM_STAT_COLUMNS
    if user_emb:
        cols.append('user_item_sim')
    cols += ['score', 'rank']

    features_df = pd.DataFrame(all_user_feas, columns=cols)

    return features_df


def init_rank_features():
    """
    Load everything the online/offline feature builders read into module globals.

    Populates the selection log, the recall lists, the item/user embeddings,
    the item information (DataFrame and dict form), the user features, the
    item hotness features and the precomputed offline ranking features.
    """
    global all_select_df
    global recall_list_dict
    global item_bert_emb_dict
    global item_word2vec_emb_dict
    global item_youtube_emb_dict
    global user_youtube_emb_dict
    global item_dssm_emb_dict
    global user_dssm_emb_dict
    global subject_info_df
    global user_features
    global all_user_item_feats_df
    global subject_info_dict
    global subject_hot

    logger.info("加载物品行为数据")
    all_select_df = get_all_select_df(offline=False)

    logger.info('获取物品召回数据')
    recall_list_dict = get_recall_list(single_recall_model='youtubednn', multi_recall=True)

    logger.info('获取物品向量化特征')
    (item_bert_emb_dict, item_word2vec_emb_dict,
     item_youtube_emb_dict, user_youtube_emb_dict,
     item_dssm_emb_dict, user_dssm_emb_dict) = get_embedding()

    logger.info('获取物品信息')
    subject_info_df = get_item_info_df()

    # Keep only the item-information columns that are used as features.
    subject_info_df = subject_info_df[['subject_id', 'visits', 'disciplines_id', 'stages_count',
                                       'stage_shixuns_count', 'study_count', 'course_study_count',
                                       'passed_count', 'course_used_count', 'school_used_count',
                                       'challenge_count', 'evaluate_count', 'video_study_time',
                                       'study_pdf_attachment_count', 'averge_star', 'created_at_ts']]

    logger.info('生成物品信息字典')
    subject_info_dict = get_rank_item_info_dict(subject_info_df)

    logger.info('获取用户特征')
    user_features = pd.read_csv(subject_features_save_path + 'user_features_df.csv',
                                sep='\t', encoding='utf-8')

    logger.info('获取物品热度特征')
    subject_hot = pd.read_csv(subject_features_save_path + 'subject_hot_level.csv',
                              sep='\t', encoding='utf-8')

    logger.info('获取排序模型特征')
    all_user_item_feats_df = pd.read_csv(subject_all_user_item_feats, sep='\t', encoding='utf-8')


def build_rank_features_online(user_id, user_recall_item_dict):
    """
    Build the ranking-model features for a user from their recall list.

    Reads the module globals set by ``init_rank_features``, so that function
    must have been called first.

    :param user_id: id of the user
    :param user_recall_item_dict: mapping ``user_id -> recall list``
    :return: feature DataFrame; an empty DataFrame when the user has no recall data
    """
    # No recall data for this user: nothing to rank.
    if user_id not in user_recall_item_dict:
        return pd.DataFrame()

    start_time = datetime.now()

    recall_list = user_recall_item_dict[user_id]

    select_info = get_select_item_info(all_select_df, user_id)
    select_hist, _select_last = get_rank_hist_and_last_select(select_info)

    user_item_feats_df = create_rank_behavior_feature(user_id,
                                                      recall_list,
                                                      select_hist,
                                                      subject_info_df,
                                                      subject_info_dict,
                                                      item_youtube_emb_dict,
                                                      user_youtube_emb_dict)

    if not user_item_feats_df.empty:
        # Join the user features.
        user_item_feats_df = user_item_feats_df.merge(user_features, on='user_id', how='left')

        # Join the item features.
        user_item_feats_df = user_item_feats_df.merge(subject_info_df, on='subject_id', how='left')

        # Join the item hotness features.
        user_item_feats_df = user_item_feats_df.merge(subject_hot, on='subject_id', how='left')

        # Flag computed per row by ``fill_is_disciplines_hab``; the helper
        # column it is presumably derived from is dropped afterwards.
        user_item_feats_df['is_disciplines_hab'] = user_item_feats_df.progress_apply(
            fill_is_disciplines_hab, axis=1)
        del user_item_feats_df['disciplines_list']

    user_item_feats_df = user_item_feats_df.reset_index()

    # Elapsed time in milliseconds. Both timestamps come from the same clock,
    # and total_seconds() covers the whole interval (``.microseconds`` is only
    # the sub-second component of the timedelta).
    end_time = datetime.now()
    cost_time_millisecond = round((end_time - start_time).total_seconds() * 1000.0, 3)
    logger.info(f"在线生成排序模型特征耗时: {cost_time_millisecond} 毫秒")

    return user_item_feats_df


def build_rank_features_offline(user_id):
    """
    Return the precomputed (offline) ranking features of a user.

    :param user_id: id of the user
    :return: the rows of ``all_user_item_feats_df`` whose ``user_id`` matches
    """
    is_user_row = all_user_item_feats_df['user_id'] == user_id
    user_item_feats_df = all_user_item_feats_df[is_user_row]
    return user_item_feats_df


if __name__ == '__main__':
    init_rank_features()
    user_recall_item_dict, only_cold_start_recall = multi_recall_predict(test_user_id, topk=100)

    user_item_feats_df = build_rank_features_online(test_user_id, user_recall_item_dict)
    user_item_feats_df.to_csv(subject_features_save_path + 'user_item_feats_df.csv', sep='\t', index=False)