import os import sys sys.path.append(os.getcwd()) import numpy as np import pandas as pd import pickle from tqdm import tqdm from sklearn.preprocessing import MinMaxScaler import warnings from utils import reduce_mem from config import logger from config import samples_mode from config import subject_features_save_path from config import mysubjects_train_data, mysubjects_test_data from config import subject_itemcf_recall_dict from config import subject_item_embedding_recall_dict from config import subject_youtubednn_recall_dict from config import subject_youtubednn_usercf_recall_dict from config import subject_dssm_recall_dict from config import subject_pinsage_recall_dict from config import subject_final_recall_items_dict from config import subject_bert_emb_dict from config import subject_item_w2v_emb_dict from config import subject_train_user_item_feats from config import subject_val_user_item_feats from config import subject_test_user_item_feats from config import subject_youtube_user_emb_dict from config import subject_youtube_item_emb_dict from config import subject_all_user_item_feats from config import subject_dssm_item_emb_dict from config import subject_dssm_user_emb_dict from config import offline_mode from matching.subject.recall_comm import get_item_info_df from matching.subject.recall_comm import get_hist_and_last_select from matching.subject.recall_comm import get_user_info_df from ranking.subject.rank_comm import fill_is_disciplines_hab from ranking.subject.rank_comm import get_rank_item_info_dict from ranking.subject.rank_comm import get_item_bert_emb_dict tqdm.pandas() warnings.filterwarnings('ignore') def train_val_split(all_select_df, sample_rate=0.2): """ 划分训练集和验证集 :param all_select_df: 指的是训练集 :param sample_rate: 采样作为验证集的用户比率 """ all_select = all_select_df all_user_ids = all_select.user_id.unique() # replace = True表示可以重复抽样,反之不可以 sample_user_ids = np.random.choice(all_user_ids, size=int(len(all_user_ids) * sample_rate), replace=False) select_val = all_select[all_select['user_id'].isin(sample_user_ids)] select_train = all_select[~all_select['user_id'].isin(sample_user_ids)] # 将验证集中的最后一次选择给抽取出来作为答案 select_val = select_val.sort_values(['user_id', 'created_timestamp']) val_ans = select_val.groupby('user_id').tail(1) select_val = select_val.groupby('user_id').progress_apply(lambda x: x[:-1]).reset_index(drop=True) # 如果该用户只有一个选择数据,又被分到ans中 # 止方法保证答案中出现的用户再验证集中还有 val_ans = val_ans[val_ans.user_id.isin(select_val.user_id.unique())] select_val = select_val[select_val.user_id.isin(val_ans.user_id.unique())] return select_train, select_val, val_ans def get_train_val_test_data(offline=True): """ 读取训练、验证、测试集 """ if offline: select_train_data = pd.read_csv(mysubjects_train_data, sep='\t', encoding='utf-8') select_train, select_val, val_ans = train_val_split(select_train_data, sample_rate=0.3) else: select_train = pd.read_csv(mysubjects_train_data, sep='\t', encoding='utf-8') select_val = None val_ans = None select_test = pd.read_csv(mysubjects_test_data, sep='\t', encoding='utf-8') return select_train, select_val, select_test, val_ans def get_recall_list(single_recall_model=None, multi_recall=False): """ 返回多路召回列表或者部分单路召回列表 """ if multi_recall: return pickle.load(open(subject_final_recall_items_dict, 'rb')) if single_recall_model == 'i2i_itemcf': return pickle.load(open(subject_itemcf_recall_dict, 'rb')) elif single_recall_model == 'i2i_emb_itemcf': return pickle.load(open(subject_item_embedding_recall_dict, 'rb')) elif single_recall_model == 'user_cf': return pickle.load(open(subject_youtubednn_usercf_recall_dict, 'rb')) elif single_recall_model == 'youtubednn': return pickle.load(open(subject_youtubednn_recall_dict, 'rb')) elif single_recall_model == 'dssm': return pickle.load(open(subject_dssm_recall_dict, 'rb')) elif single_recall_model == 'pinsage': return pickle.load(open(subject_pinsage_recall_dict, 'rb')) def get_embedding(): """ 通过字典查询对应的item的Embedding """ #获取subject的bert embedding字典 if os.path.exists(subject_bert_emb_dict): item_bert_emb_dict = pickle.load(open(subject_bert_emb_dict, 'rb')) else: item_bert_emb_dict = get_item_bert_emb_dict() # w2v Embedding是需要提前训练好的 if os.path.exists(subject_item_w2v_emb_dict): item_word2vec_emb_dict = pickle.load(open(subject_item_w2v_emb_dict, 'rb')) else: print(os.path.basename(subject_item_w2v_emb_dict) + ' file not exist.') if os.path.exists(subject_youtube_item_emb_dict): item_youtube_emb_dict = pickle.load(open(subject_youtube_item_emb_dict, 'rb')) else: print(os.path.basename(subject_youtube_item_emb_dict) + 'file not exist.') if os.path.exists(subject_youtube_user_emb_dict): user_youtube_emb_dict = pickle.load(open(subject_youtube_user_emb_dict, 'rb')) else: print(os.path.basename(subject_youtube_user_emb_dict) + 'file not exist.') if os.path.exists(subject_dssm_item_emb_dict): item_dssm_emb_dict = pickle.load(open(subject_dssm_item_emb_dict, 'rb')) else: print(os.path.basename(subject_dssm_item_emb_dict) + 'file not exist.') if os.path.exists(subject_dssm_user_emb_dict): user_dssm_emb_dict = pickle.load(open(subject_dssm_user_emb_dict, 'rb')) else: print(os.path.basename(subject_dssm_user_emb_dict) + 'file not exist.') return item_bert_emb_dict, item_word2vec_emb_dict, item_youtube_emb_dict, user_youtube_emb_dict,item_dssm_emb_dict,user_dssm_emb_dict def recall_dict_2_df(recall_list_dict): """ 召回列表转换成DataFrame形式, [user, item, score] """ df_row_list = [] for user, recall_list in tqdm(recall_list_dict.items()): for item, score in recall_list: df_row_list.append([user, item, score]) col_names = ['user_id', 'sim_item', 'score'] recall_list_df = pd.DataFrame(df_row_list, columns=col_names) return recall_list_df def neg_sample_recall_data(recall_items_df, sample_rate=0.05): """ 负采样函数,可以控制负采样时的比例, 这里给了一个默认的值 """ logger.info('采样之前数据') pos_data = recall_items_df[recall_items_df['label'] == 1] neg_data = recall_items_df[recall_items_df['label'] == 0] print('正样本数量:', len(pos_data), '负样本数量:', len(neg_data), '正样本比率:', round(len(pos_data)/(len(pos_data) + len(neg_data)), 6)) # 分组采样函数 def neg_sample_func(group_df): neg_num = len(group_df) # 保证最少有一个 sample_num = max(int(neg_num * sample_rate), 1) # 保证最多不超过20个,这里可以根据实际情况进行选择 sample_num = min(sample_num, 20) return group_df.sample(n=sample_num, replace=True) # 对用户进行负采样,保证所有用户都在采样后的数据中 neg_data_user_sample = neg_data.groupby('user_id', group_keys=False). \ progress_apply(neg_sample_func) # 对物品进行负采样,保证所有物品都在采样后的数据中 neg_data_item_sample = neg_data.groupby('sim_item', group_keys=False). \ progress_apply(neg_sample_func) # 将上述两种情况下的采样数据合并 neg_data_new = neg_data_user_sample.append(neg_data_item_sample) # 由于上述两个操作是分开的,可能将两个相同的数据给重复选择了,所以需要对合并后的数据进行去重 neg_data_new = neg_data_new.sort_values(['user_id', 'score']).drop_duplicates( ['user_id', 'sim_item'], keep='last') # 将正样本数据合并 data_new = pd.concat([pos_data, neg_data_new], ignore_index=True) logger.info('采样之后数据') pos_data = data_new[data_new['label'] == 1] neg_data = data_new[data_new['label'] == 0] print('正样本数量:', len(pos_data), '负样本数量:', len(neg_data), '正样本比率:', round(len(pos_data)/(len(pos_data) + len(neg_data)), 4)) return data_new def sample_test_recall_data(recall_items_df, sample_rate=0.05): """ 测试样采样函数,可以控制采样的比例, 这里给了一个默认的值 """ logger.info('采样之前样本数量:' + str(len(recall_items_df))) # 分组采样函数 def neg_sample_func(group_df): neg_num = len(group_df) # 保证最少有一个 sample_num = max(int(neg_num * sample_rate), 1) # 保证最多不超过20个,这里可以根据实际情况进行选择 sample_num = min(sample_num, 20) return group_df.sample(n=sample_num, replace=True) # 对用户进行负采样,保证所有用户都在采样后的数据中 data_user_sample = recall_items_df.groupby('user_id', group_keys=False). \ progress_apply(neg_sample_func) # 对物品进行负采样,保证所有物品都在采样后的数据中 data_item_sample = recall_items_df.groupby('sim_item', group_keys=False). \ progress_apply(neg_sample_func) # 将上述两种情况下的采样数据合并 data_new = data_user_sample.append(data_item_sample) # 由于上述两个操作是分开的,可能将两个相同的数据给重复选择了,所以需要对合并后的数据进行去重 data_new = data_new.sort_values(['user_id', 'score']).drop_duplicates( ['user_id', 'sim_item'], keep='last') logger.info('采样之后样本数量:' + str(len(data_new))) return data_new def get_rank_label_df(recall_list_df, label_df, is_test=False): """ 召回数据打标签 """ # 测试集是没有标签了,为了后面代码统一一些,这里直接给一个负数替代 if is_test: recall_list_df['label'] = -1 return recall_list_df label_df = label_df.rename(columns={'subject_id': 'sim_item'}) recall_list_df_ = recall_list_df.merge(label_df[['user_id', 'sim_item', 'created_timestamp']], \ how='left', on=['user_id', 'sim_item']) recall_list_df_['label'] = recall_list_df_['created_timestamp'].progress_apply( lambda x: 0 if np.isnan(x) else 1) del recall_list_df_['created_timestamp'] return recall_list_df_ def get_user_recall_item_label_df(select_train_hist, select_val_hist, select_test_hist, select_train_last, select_val_last, recall_list_df): """ 获取用户召回列表训练,验证,测试集的标签 """ # 获取训练数据的召回列表 train_user_items_df = recall_list_df[recall_list_df['user_id'].isin(select_train_hist['user_id'].unique())] logger.info('训练集数据打标签') train_user_item_label_df = get_rank_label_df(train_user_items_df, select_train_last, is_test=False) logger.info('训练集数据负采样') train_user_item_label_df = neg_sample_recall_data(train_user_item_label_df) if select_val_hist is not None: val_user_items_df = recall_list_df[recall_list_df['user_id'].isin(select_val_hist['user_id'].unique())] logger.info('验证集数据打标签') val_user_item_label_df = get_rank_label_df(val_user_items_df, select_val_last, is_test=False) logger.info('验证集数据负采样') val_user_item_label_df = neg_sample_recall_data(val_user_item_label_df) else: val_user_item_label_df = None # 测试集数据进行随机采样,减少生成特征的时间 test_user_items_df = recall_list_df[recall_list_df['user_id'].isin(select_test_hist['user_id'].unique())] logger.info('测试集数据打标签') test_user_item_label_df = get_rank_label_df(test_user_items_df, None, is_test=True) logger.info('测试集数据随机采样') test_user_item_label_df = sample_test_recall_data(test_user_item_label_df) return train_user_item_label_df, val_user_item_label_df, test_user_item_label_df def make_tuple_func(group_df): """ 将最终的召回的df数据转换成字典的形式做排序特征 """ row_data = [] for name, row_df in group_df.iterrows(): row_data.append((row_df['sim_item'], row_df['score'], row_df['label'])) return row_data def get_cos_similar_matrix(v1, v2): #获取两个向量的余弦相似度 num = np.dot(v1, v2) # 向量点乘 denom = np.linalg.norm(v1).reshape(-1) * np.linalg.norm(v2).reshape(-1) # 求模长的乘积 res = num / denom res[np.isneginf(res)] = 0.0 #负无穷大的赋值0 # return num return float(0.5 + 0.5 * res) def create_behavior_feature(users_id, recall_list, select_hist_df, subjects_info, subject_info_dict, subjects_emb, user_emb=None, N=1): """ 基于用户历史行为生成相关特征 :param users_id: 用户id :param recall_list: 对于每个用户召回的候选物品列表 :param select_hist_df: 用户历史选择的物品 :param subjects_info: 物品信息 :param subjects_emb: 物品的embedding向量,可以用item_content_emb, item_w2v_emb, youtube_item_emb :param user_emb: 用户的embedding向量,可以是youtube_user_emb, 也可以不用, 如果要传的话,subjects_emb就要用youtube_item_emb,保持维度一样 :param N: 最近的N次选择,由于行为日志里面很多用户只存在一次历史选择,为了不产生空值,默认为1 """ subjects_info['subject_id'] = subjects_info['subject_id'].astype(int) select_hist_df['user_id'] = select_hist_df['user_id'].astype(int) # 建立一个二维列表保存结果, 后面要转成DataFrame all_user_feas = [] subject_id_list = subjects_info['subject_id'].values.tolist() for user_id in tqdm(users_id): # 该用户的最后N次选择 hist_user_items = select_hist_df[select_hist_df['user_id']==user_id]['subject_id'][-N:] # 遍历该用户的召回列表 for rank, (subject_id, score, label) in enumerate(recall_list[user_id]): # 不在物品信息中的跳过,以免报错 if subject_id not in subject_id_list: continue subject_id = int(subject_id) cur_subjects_info = subject_info_dict[subject_id] # 课程建立时间, 访问次数,学习人数,课堂学习人数... a_create_time = cur_subjects_info[0][0] a_visits_count = cur_subjects_info[0][1] a_stages_count = cur_subjects_info[0][2] a_stages_shixuns_count = cur_subjects_info[0][3] a_study_count = cur_subjects_info[0][4] a_course_study_count = cur_subjects_info[0][5] a_passed_count = cur_subjects_info[0][6] a_course_used_count = cur_subjects_info[0][7] a_school_used_count = cur_subjects_info[0][8] a_challenge_count = cur_subjects_info[0][9] a_evaluate_count = cur_subjects_info[0][10] a_video_study_time = cur_subjects_info[0][11] a_study_pdf_attachment_count = cur_subjects_info[0][12] a_averge_star = cur_subjects_info[0][14] single_user_fea = [user_id, subject_id] # 计算与最后选择的物品的相似度的和,最大值、最小值、均值 sim_fea = [] time_fea = [] visits_fea = [] stages_count_fea = [] stages_shixuns_count_fea = [] study_count_fea = [] course_study_count_fea = [] passed_count_fea = [] course_used_count_fea = [] school_used_count_fea = [] challenge_count_fea = [] evaluate_count_fea = [] video_study_time_fea = [] study_pdf_attachment_count_fea = [] averge_star_fea = [] # 遍历用户的最后N次选择物品 for hist_item in hist_user_items: if (hist_item not in subject_id_list): continue hist_item = int(hist_item) hist_subjects_info = subject_info_dict[hist_item] b_create_time = hist_subjects_info[0][0] b_visits_count = hist_subjects_info[0][1] b_stages_count = hist_subjects_info[0][2] b_stages_shixuns_count = hist_subjects_info[0][3] b_study_count = hist_subjects_info[0][4] b_course_study_count = hist_subjects_info[0][5] b_passed_count = hist_subjects_info[0][6] b_course_used_count = hist_subjects_info[0][7] b_school_used_count = hist_subjects_info[0][8] b_challenge_count = hist_subjects_info[0][9] b_evaluate_count = hist_subjects_info[0][10] b_video_study_time = hist_subjects_info[0][11] b_study_pdf_attachment_count = hist_subjects_info[0][12] b_averge_star = hist_subjects_info[0][14] if (hist_item not in subjects_emb) or (subject_id not in subjects_emb): sim_fea.append(0.0) else: sim_fea.append(np.dot(subjects_emb[hist_item], subjects_emb[subject_id])) time_fea.append(abs(a_create_time - b_create_time)) visits_fea.append(abs(a_visits_count - b_visits_count)) stages_count_fea.append(abs(a_stages_count - b_stages_count)) stages_shixuns_count_fea.append(abs(a_stages_shixuns_count - b_stages_shixuns_count)) study_count_fea.append(abs(a_study_count - b_study_count)) course_study_count_fea.append(abs(a_course_study_count - b_course_study_count)) passed_count_fea.append(abs(a_passed_count - b_passed_count)) course_used_count_fea.append(abs(a_course_used_count - b_course_used_count)) school_used_count_fea.append(abs(a_school_used_count - b_school_used_count)) challenge_count_fea.append(abs(a_challenge_count - b_challenge_count)) evaluate_count_fea.append(abs(a_evaluate_count - b_evaluate_count)) video_study_time_fea.append(abs(a_video_study_time - b_video_study_time)) study_pdf_attachment_count_fea.append(abs(a_study_pdf_attachment_count - b_study_pdf_attachment_count)) averge_star_fea.append(abs(a_averge_star - b_averge_star)) if (len(sim_fea) != 0) and (len(time_fea) != 0) and (len(visits_fea) != 0) and \ (len(stages_count_fea) != 0) and(len(stages_shixuns_count_fea) != 0) and(len(study_count_fea) != 0) and (len(course_study_count_fea) != 0) and \ (len(passed_count_fea) != 0) and (len(course_used_count_fea) != 0) and \ (len(school_used_count_fea) != 0) and (len(challenge_count_fea) != 0) and \ (len(evaluate_count_fea) != 0) and (len(video_study_time_fea) != 0) and \ (len(study_pdf_attachment_count_fea) != 0) and (len(averge_star_fea) != 0): # 相似性特征 single_user_fea.extend(sim_fea) # 时间差特征 single_user_fea.extend(time_fea) # 访问次数差特征 single_user_fea.extend(visits_fea) # 章节数量差特征 single_user_fea.extend(stages_count_fea) # 章节实训数量差特征 single_user_fea.extend(stages_shixuns_count_fea) # 学生人数差特征 single_user_fea.extend(study_count_fea) # 课堂学习人数差特征 single_user_fea.extend(course_study_count_fea) # 课程通过人数差特征 single_user_fea.extend(passed_count_fea) # 课堂使用次数差特征 single_user_fea.extend(course_used_count_fea) # 学校使用次数差特征 single_user_fea.extend(school_used_count_fea) # 关卡数量差特征 single_user_fea.extend(challenge_count_fea) # 学校使用次数差特征 single_user_fea.extend(evaluate_count_fea) # 视频学习时长差特征 single_user_fea.extend(video_study_time_fea) # PDF附件数量差特征 single_user_fea.extend(study_pdf_attachment_count_fea) # 平均星数差特征 single_user_fea.extend(averge_star_fea) # 相似性的统计特征 single_user_fea.extend([max(sim_fea), min(sim_fea), sum(sim_fea), sum(sim_fea) / len(sim_fea)]) if user_emb: # 如果用户向量有的话,这里计算该召回物品与用户的相似性特征 if (user_id not in user_emb) or (subject_id not in subjects_emb): single_user_fea.append(0.0) else: single_user_fea.append(np.dot(user_emb[user_id], subjects_emb[subject_id])) single_user_fea.extend([score, rank, label]) # 加入到总的表中 all_user_feas.append(single_user_fea) # 定义交叉特征 id_cols = ['user_id', 'subject_id'] sim_cols = ['sim' + str(i) for i in range(N)] time_cols = ['time_diff' + str(i) for i in range(N)] vists_cols = ['visit_diff' + str(i) for i in range(N)] stages_count_cols = ['stages_count_diff' + str(i) for i in range(N)] stages_shixuns_count_cols = ['stages_shixuns_count_diff' + str(i) for i in range(N)] study_count_cols = ['study_count_diff' + str(i) for i in range(N)] course_study_count_cols = ['course_study_count_diff' + str(i) for i in range(N)] passed_count_cols = ['passed_count_diff' + str(i) for i in range(N)] course_used_count_cols = ['course_used_count_diff' + str(i) for i in range(N)] school_used_count_cols = ['school_used_count_diff' + str(i) for i in range(N)] challenge_count_cols = ['challenge_count_diff' + str(i) for i in range(N)] evaluate_count_cols = ['evaluate_count_diff' + str(i) for i in range(N)] video_study_time_cols = ['video_study_time_diff' + str(i) for i in range(N)] study_pdf_attachment_count_cols = ['study_pdf_attachment_count_diff' + str(i) for i in range(N)] averge_star_cols = ['averge_star_diff' + str(i) for i in range(N)] sat_cols = ['sim_max', 'sim_min', 'sim_sum', 'sim_mean'] user_item_sim_cols = ['user_item_sim'] if user_emb else [] user_score_rank_label = ['score', 'rank', 'label'] # 交叉特征列表 cols = id_cols + sim_cols + time_cols + vists_cols + study_count_cols + course_study_count_cols \ +stages_count_cols+stages_shixuns_count_cols+passed_count_cols + course_used_count_cols \ + school_used_count_cols + challenge_count_cols+ evaluate_count_cols + video_study_time_cols \ + study_pdf_attachment_count_cols+ averge_star_cols + sat_cols + user_item_sim_cols + user_score_rank_label # 转成DataFrame features_df = pd.DataFrame(all_user_feas, columns=cols) return features_df def active_level(all_data, cols): """ 生成用户活跃度的特征 根据用户选择物品时间和选择物品的次数生成用户活跃度 如果用户选择物品之间的时间间隔比较小,同时选择的物品次数很多,就认为此用户是活跃用户 1. 首先根据user_id分组, 对于每个用户计算选择物品的次数,两两选择物品时间间隔的均值 2. 把选择次数取倒数和时间间隔的均值统一归一化,然后两者相加合并,该值越小说明用户越活跃 3. 注意:上面两两选择物品的时间间隔均值,会出现如果用户只选择了一次的情况, 这时候时间间隔均值那里会出现空值,对于这种情况最后特征那里给个大数进行区分 """ if os.path.exists(subject_features_save_path + 'user_active_level.csv'): user_act = pd.read_csv(subject_features_save_path + 'user_active_level.csv', sep='\t', encoding='utf-8') return user_act data = all_data[cols] data.sort_values(['user_id', 'created_timestamp'], inplace=True) user_act = pd.DataFrame(data.groupby('user_id', as_index=False)[['subject_id', 'created_timestamp']].\ agg({'subject_id':np.size, 'created_timestamp': {list}}).values, \ columns=['user_id', 'select_size', 'created_timestamp']) # 计算时间间隔的均值 def time_diff_mean(l): if len(l) == 1: return 1 else: return np.mean([j-i for i, j in list(zip(l[:-1], l[1:]))]) user_act['user_time_diff_mean'] = user_act['created_timestamp'].progress_apply(lambda x: time_diff_mean(x)) # 选择次数取倒数 user_act['select_size'] = 1 / user_act['select_size'] # 两者归一化 user_act['select_size'] = (user_act['select_size'] - user_act['select_size'].min()) / \ (user_act['select_size'].max() - user_act['select_size'].min()) user_act['user_time_diff_mean'] = (user_act['user_time_diff_mean'] - user_act['user_time_diff_mean'].min()) / \ (user_act['user_time_diff_mean'].max() - user_act['user_time_diff_mean'].min()) user_act['active_level'] = user_act['select_size'] + user_act['user_time_diff_mean'] user_act['user_id'] = user_act['user_id'].astype('int') del user_act['created_timestamp'] user_act.to_csv(subject_features_save_path + 'user_active_level.csv', index=False, header=True, sep='\t') return user_act def hot_level(all_data, cols): """ 生成物品热度的特征 根据物品选择时间和被选择物品的次数来衡量物品热度特征 如果物品在很短的时间间隔之内被选择了很多次,说明物品比较热门 1. 根据物品进行分组,对于每个物品的用户,计算选择的时间间隔 2. 将用户的数量取倒数,然后用户的数量和时间间隔归一化,相加得到热度特征 该值越小说明被选择的次数越大且时间间隔越短,物品比较热门 """ if os.path.exists(subject_features_save_path + 'subject_hot_level.csv'): subject_hot = pd.read_csv(subject_features_save_path + 'subject_hot_level.csv', sep='\t', encoding='utf-8') return subject_hot data = all_data[cols] data.sort_values(['subject_id', 'created_timestamp'], inplace=True) subject_hot = pd.DataFrame(data.groupby('subject_id', as_index=False) \ [['user_id', 'created_timestamp']]. \ agg({'user_id': np.size, 'created_timestamp': {list}}).values, \ columns=['subject_id', 'user_num', 'created_timestamp']) # 计算被选择时间间隔的均值 def time_diff_mean(l): if len(l) == 1: return 1 else: return np.mean([j-i for i, j in list(zip(l[:-1], l[1:]))]) subject_hot['item_time_diff_mean'] = subject_hot['created_timestamp']. \ progress_apply(lambda x: time_diff_mean(x)) # 选择次数取倒数 subject_hot['user_num'] = 1 / subject_hot['user_num'] # 两者归一化 subject_hot['user_num'] = (subject_hot['user_num'] - subject_hot['user_num'].min()) /\ (subject_hot['user_num'].max() - subject_hot['user_num'].min()) subject_hot['item_time_diff_mean'] = (subject_hot['item_time_diff_mean'] - subject_hot['item_time_diff_mean'].min()) /\ (subject_hot['item_time_diff_mean'].max() - subject_hot['item_time_diff_mean'].min()) subject_hot['hot_level'] = subject_hot['user_num'] + subject_hot['item_time_diff_mean'] subject_hot['subject_id'] = subject_hot['subject_id'].astype('int') del subject_hot['created_timestamp'] subject_hot.to_csv(subject_features_save_path + 'subject_hot_level.csv', index=False, header=True, sep='\t') return subject_hot def user_time_hob_fea(all_data, cols): """ 生成用户的时间习惯特征 根据用户选择的历史物品的时间做统计求均值 可以看出用户习惯一天什么时候选择物品 """ user_time_hob_info = all_data[cols] # 先把时间戳进行归一化 mm = MinMaxScaler() user_time_hob_info['created_timestamp'] = mm.fit_transform(user_time_hob_info[['created_timestamp']]) user_time_hob_info['created_at_ts'] = mm.fit_transform(user_time_hob_info[['created_at_ts']]) user_time_hob_info = user_time_hob_info.groupby('user_id').agg('mean').reset_index() user_time_hob_info.rename(columns={'created_timestamp': 'user_time_hob1', 'created_at_ts': 'user_time_hob2'}, inplace=True) return user_time_hob_info def user_disciplines_id_hob_fea(all_data, cols): """ 用户的选择的课程难易爱好 根据用户选择的课程难易度转成一个列表 后面汇总的时候再单独制作一个特征,如果难度在这里面为1否则为0 """ user_category_hob_info = all_data[cols] user_category_hob_info['disciplines_id'] = user_category_hob_info['disciplines_id'].astype(str) user_category_hob_info = user_category_hob_info.groupby('user_id').agg({set}).reset_index() user_disciplines_id_hob_info = pd.DataFrame() user_disciplines_id_hob_info['user_id'] = user_category_hob_info['user_id'] user_disciplines_id_hob_info['disciplines_list'] = user_category_hob_info['disciplines_id'] return user_disciplines_id_hob_info def build_rank_features_engineering(): """ 排序模型特征工程 """ logger.info('获取训练验证测试数据集') # offline和online的区别就是验证集是否为空 # online时select_val, val_ans为空 select_train, select_val, select_test, val_ans = get_train_val_test_data(offline=offline_mode) logger.info('获取用户历史选择和最后一次选择') select_train_hist, select_train_last = get_hist_and_last_select(select_train) if select_val is not None: select_val_hist, select_val_last = select_val, val_ans else: select_val_hist, select_val_last = None, None select_test_hist = select_test # 读取离线召回数据 # 全量数据时只选择pinsage召回的结果 # 增量数据时选择多路召回合并的结果 logger.info('获取召回列表数据') recall_list_dict = get_recall_list(single_recall_model='pinsage', multi_recall=samples_mode) logger.info('召回数据转换成DataFrame...') recall_list_df = recall_dict_2_df(recall_list_dict) logger.info('给训练验证测试数据集打标签,负采样...') train_user_item_label_df, val_user_item_label_df, test_user_item_label_df = \ get_user_recall_item_label_df(select_train_hist, select_val_hist, select_test_hist, select_train_last, select_val_last, recall_list_df) logger.info('召回数据转换成字典') train_user_item_label_tuples = train_user_item_label_df.groupby('user_id'). \ progress_apply(make_tuple_func).reset_index() train_user_item_label_tuples_dict = dict(zip(train_user_item_label_tuples['user_id'], train_user_item_label_tuples[0])) if val_user_item_label_df is not None: val_user_item_label_tuples = val_user_item_label_df.groupby('user_id'). \ progress_apply(make_tuple_func).reset_index() val_user_item_label_tuples_dict = dict(zip(val_user_item_label_tuples['user_id'], val_user_item_label_tuples[0])) else: val_user_item_label_tuples_dict = None test_user_item_label_tuples = test_user_item_label_df.groupby('user_id'). \ progress_apply(make_tuple_func).reset_index() test_user_item_label_tuples_dict = dict(zip(test_user_item_label_tuples['user_id'], test_user_item_label_tuples[0])) logger.info("获取用户信息") users_info = get_user_info_df() # 用到的用户信息特征 users_info = users_info[['user_id', 'gender', 'school_id', 'identity','edu_background', 'logins', 'grade', 'experience']] logger.info('获取物品信息') subject_info_df = get_item_info_df() # 用到的物品信息特征 subject_info_df = subject_info_df[['subject_id', 'visits', 'disciplines_id', 'stages_count','stage_shixuns_count', 'study_count', 'course_study_count', 'passed_count', 'course_used_count', 'school_used_count', 'challenge_count', 'evaluate_count', 'video_study_time', 'study_pdf_attachment_count', 'averge_star', 'created_at_ts']] logger.info('生成物品信息字典') subject_info_dict = get_rank_item_info_dict(subject_info_df) logger.info('获取物品向量化特征') item_bert_emb_dict, item_word2vec_emb_dict, item_youtube_emb_dict, user_youtube_emb_dict,item_dssm_emb_dict,user_dssm_emb_dict = get_embedding() if os.path.exists(subject_features_save_path + 'train_user_item_behavior_feats_df.csv'): train_user_item_feats_df = pd.read_csv(subject_features_save_path + 'train_user_item_behavior_feats_df.csv', sep='\t', encoding='utf-8') reduce_mem(train_user_item_feats_df) else: logger.info('生成训练数据集中物品交叉特征') train_user_item_feats_df = create_behavior_feature(train_user_item_label_tuples_dict.keys(), train_user_item_label_tuples_dict, select_train_hist, subject_info_df, subject_info_dict, item_bert_emb_dict) train_user_item_feats_df.to_csv(subject_features_save_path + 'train_user_item_behavior_feats_df.csv', sep='\t', index=False, header=True) reduce_mem(train_user_item_feats_df) if os.path.exists(subject_features_save_path + 'val_user_item_behavior_feats_df.csv'): val_user_item_feats_df = pd.read_csv(subject_features_save_path + 'val_user_item_behavior_feats_df.csv', sep='\t', encoding='utf-8') reduce_mem(val_user_item_feats_df) else: if val_user_item_label_tuples_dict is not None: logger.info('生成验证数据集中物品交叉特征') val_user_item_feats_df = create_behavior_feature(val_user_item_label_tuples_dict.keys(), val_user_item_label_tuples_dict, select_val_hist, subject_info_df, subject_info_dict, item_bert_emb_dict) else: val_user_item_feats_df = None if val_user_item_feats_df is not None: val_user_item_feats_df.to_csv(subject_features_save_path + 'val_user_item_behavior_feats_df.csv', sep='\t', index=False, header=True) reduce_mem(val_user_item_feats_df) if os.path.exists(subject_features_save_path + 'test_user_item_behavior_feats_df.csv'): test_user_item_feats_df = pd.read_csv(subject_features_save_path + 'test_user_item_behavior_feats_df.csv', sep='\t', encoding='utf-8') reduce_mem(test_user_item_feats_df) else: logger.info('生成测试数据集中物品交叉特征') test_user_item_feats_df = create_behavior_feature(test_user_item_label_tuples_dict.keys(), test_user_item_label_tuples_dict, select_test_hist, subject_info_df, subject_info_dict, item_bert_emb_dict) test_user_item_feats_df.to_csv(subject_features_save_path + 'test_user_item_behavior_feats_df.csv', sep='\t', index=False, header=True) reduce_mem(test_user_item_feats_df) # 物品行为数据,就是前面的所有数据 if select_val is not None: all_data = select_train.append(select_val) all_data = select_train.append(select_test) # 拼上物品信息 all_data = all_data.merge(subject_info_df, on='subject_id', how='left') logger.info('生成用户活跃度特征') user_act_fea = active_level(all_data, ['user_id', 'subject_id', 'created_timestamp']) logger.info('生成物品热度特征') subject_hot_fea = hot_level(all_data, ['user_id', 'subject_id', 'created_timestamp']) # 用户时间特征 user_time_hob_cols = ['user_id', 'created_timestamp', 'created_at_ts'] user_time_hob_info = user_time_hob_fea(all_data, user_time_hob_cols) # 用户选择的课程难度特征 user_category_hob_cols = ['user_id', 'disciplines_id'] user_disciplines_id_hob_info = user_disciplines_id_hob_fea(all_data, user_category_hob_cols) # 用户选择的课程访问次数特征 user_visits_count_info = all_data.groupby('user_id')['visits'].agg('mean').reset_index() user_visits_count_info.rename(columns = {'visits': 'visits_hbo'}, inplace=True) # 用户选择的课程学生人数特征 user_study_count_info = all_data.groupby('user_id')['study_count'].agg('mean').reset_index() user_study_count_info.rename(columns = {'study_count': 'study_count_hbo'}, inplace=True) # 用户选择的课程课堂学习人数特征 user_course_study_count_info = all_data.groupby('user_id')['course_study_count'].agg('mean').reset_index() user_course_study_count_info.rename(columns = {'course_study_count': 'course_study_count_hbo'}, inplace=True) # 用户选择的课程通过人数特征 user_passed_count_info = all_data.groupby('user_id')['passed_count'].agg('mean').reset_index() user_passed_count_info.rename(columns={'passed_count': 'passed_count_hbo'}, inplace=True) # 用户选择的课程课堂使用数量特征 user_course_used_count_info = all_data.groupby('user_id')['course_used_count'].agg('mean').reset_index() user_course_used_count_info.rename(columns={'course_used_count': 'course_used_count_hbo'}, inplace=True) # 用户选择的课程学校使用次数特征 user_school_used_count_info = all_data.groupby('user_id')['school_used_count'].agg('mean').reset_index() user_school_used_count_info.rename(columns={'school_used_count': 'school_used_count_hbo'}, inplace=True) # 用户选择的课程关卡数量特征 user_challenge_count_info = all_data.groupby('user_id')['challenge_count'].agg('mean').reset_index() user_challenge_count_info.rename(columns={'challenge_count': 'challenge_count_hbo'}, inplace=True) # 用户选择的课程评测次数特征 user_evaluate_count_info = all_data.groupby('user_id')['evaluate_count'].agg('mean').reset_index() user_evaluate_count_info.rename(columns={'evaluate_count': 'evaluate_count_hbo'}, inplace=True) # 用户选择的课程视频学习时长特征 user_video_study_time_info = all_data.groupby('user_id')['video_study_time'].agg('mean').reset_index() user_video_study_time_info.rename(columns={'video_study_time': 'video_study_time_hbo'}, inplace=True) # 用户选择的课程PDF附件数量特征 user_study_pdf_attachment_count_info = all_data.groupby('user_id')['study_pdf_attachment_count'].agg('mean').reset_index() user_study_pdf_attachment_count_info.rename(columns={'study_pdf_attachment_count': 'study_pdf_attachment_count_hbo'}, inplace=True) # 用户选择的课程评价星数特征 user_averge_star_info = all_data.groupby('user_id')['averge_star'].agg('mean').reset_index() user_averge_star_info.rename(columns={'averge_star': 'averge_star_hbo'}, inplace=True) # 用户选择的实训数量特征 user_subject_num_info = all_data.groupby('user_id')['subject_id'].agg([('subject_num', 'count')]).reset_index() user_subject_num_info.rename(columns={'subject_num': 'seq_length'}, inplace=True) logger.info('合并用户特征') user_features = pd.merge(user_act_fea, user_time_hob_info, on='user_id') user_features = user_features.merge(user_disciplines_id_hob_info, on='user_id') user_features = user_features.merge(user_visits_count_info, on='user_id') user_features = user_features.merge(user_study_count_info, on='user_id') user_features = user_features.merge(user_course_study_count_info, on='user_id') user_features = user_features.merge(user_passed_count_info, on='user_id') user_features = user_features.merge(user_course_used_count_info, on='user_id') user_features = user_features.merge(user_school_used_count_info, on='user_id') user_features = user_features.merge(user_challenge_count_info, on='user_id') user_features = user_features.merge(user_evaluate_count_info, on='user_id') user_features = user_features.merge(user_video_study_time_info, on='user_id') user_features = user_features.merge(user_study_pdf_attachment_count_info, on='user_id') user_features = user_features.merge(user_averge_star_info, on='user_id') user_features = user_features.merge(user_subject_num_info, on='user_id') # 合并用户人口学统计特征 user_features = user_features.merge(users_info, on='user_id', how='left') logger.info('保存用户特征') user_features.to_csv(subject_features_save_path + 'user_features_df.csv', sep='\t', header=True, index=False) logger.info('拼接用户特征') train_user_item_feats_df = train_user_item_feats_df.merge(user_features, on='user_id', how='left') if val_user_item_feats_df is not None: val_user_item_feats_df = val_user_item_feats_df.merge(user_features, on='user_id', how='left') else: val_user_item_feats_df = None test_user_item_feats_df = test_user_item_feats_df.merge(user_features, on='user_id',how='left') logger.info('拼接物品特征') train_user_item_feats_df = train_user_item_feats_df.merge(subject_info_df, on='subject_id', how='left') train_user_item_feats_df = train_user_item_feats_df.merge(subject_hot_fea, on='subject_id', how='left') if val_user_item_feats_df is not None: val_user_item_feats_df = val_user_item_feats_df.merge(subject_info_df, on='subject_id', how='left') val_user_item_feats_df = val_user_item_feats_df.merge(subject_hot_fea, on='subject_id', how='left') else: val_user_item_feats_df = None test_user_item_feats_df = test_user_item_feats_df.merge(subject_info_df, on='subject_id', how='left') test_user_item_feats_df = test_user_item_feats_df.merge(subject_hot_fea, on='subject_id', how='left') # 是否在用户选择的课程难度中 train_user_item_feats_df['is_disciplines_hab'] = train_user_item_feats_df.progress_apply( lambda x: fill_is_disciplines_hab(x), axis=1) if val_user_item_feats_df is not None: val_user_item_feats_df['is_disciplines_hab'] = val_user_item_feats_df.progress_apply( lambda x: fill_is_disciplines_hab(x), axis=1) else: val_user_item_feats_df = None test_user_item_feats_df['is_disciplines_hab'] = test_user_item_feats_df.progress_apply( lambda x: fill_is_disciplines_hab(x), axis=1) # 删除排序模型用不到的特征 del train_user_item_feats_df['disciplines_list'] if val_user_item_feats_df is not None: del val_user_item_feats_df['disciplines_list'] else: val_user_item_feats_df = None del test_user_item_feats_df['disciplines_list'] logger.info('保存所有特征') train_user_item_feats_df.to_csv(subject_train_user_item_feats, sep='\t', index=False, header=True) if val_user_item_feats_df is not None: val_user_item_feats_df.to_csv(subject_val_user_item_feats, sep='\t', index=False, header=True) test_user_item_feats_df.to_csv(subject_test_user_item_feats, sep='\t', index=False, header=True) all_user_item_feats_df = train_user_item_feats_df.append(test_user_item_feats_df) if val_user_item_feats_df is not None: all_user_item_feats_df = all_user_item_feats_df.append(val_user_item_feats_df) all_user_item_feats_df.to_csv(subject_all_user_item_feats, sep='\t', index=False, header=True) logger.info('保存成功') if __name__ == '__main__': build_rank_features_engineering()