import pandas as pd from tqdm import tqdm import warnings import pickle import json import random import time import os import sys sys.path.append(os.getcwd()) from datetime import datetime from config import logger from config import test_user_id from config import subject_cold_start_recall_dict from config import subjects_data_path from config import subject_cold_start_user_subject_dict from config import cold_start_subjects_parent_path from matching.subject.recall_comm import get_all_select_df from matching.subject.recall_comm import get_item_info_df from matching.subject.recall_comm import get_user_hist_item_info_dict from utils import is_number tqdm.pandas() warnings.filterwarnings('ignore') def make_disciplines_list(data): results = [] results.clear() data = str(data).replace('[SEP]', ',') disciplines_list = data.split(',') for discipline_id in disciplines_list: if 'nan' not in discipline_id: discipline_id = discipline_id.replace("'", ''). \ replace('{', ''). replace('}', '').replace(' ', '') if is_number(discipline_id): discipline_id = int(float(discipline_id)) if discipline_id not in results: results.append(discipline_id) return results def build_user_sel_discipline_dict(): """ 构建用户选择课程的所属学科的字典 """ logger.info("加载选课行为数据") all_select_df = get_all_select_df(offline=False) logger.info("获取实践课程信息数据") item_info_df = get_item_info_df() logger.info('生成用户选课记录列表') all_select_df = all_select_df.merge(item_info_df, on='subject_id') all_select_df = all_select_df.groupby('user_id').progress_aggregate(set).reset_index() user_sel_subject_df = pd.DataFrame() user_sel_subject_df['user_id'] = all_select_df['user_id'] user_sel_subject_df['disciplines_list'] = all_select_df['disciplines_id'].progress_apply(make_disciplines_list) user_sel_discipline_dict = dict(zip(user_sel_subject_df['user_id'], user_sel_subject_df['disciplines_list'])) pickle.dump(user_sel_discipline_dict, open(subject_cold_start_user_subject_dict, 'wb')) return user_sel_discipline_dict def build_cold_start_recall_dict(topk=100): """ 按兴趣标签ID获取对应的实践课程,并按业务规则进行排序,方便冷启动召回时直接获取TopK """ # 先按兴趣标签ID进行实践课程分组 df_grouped = df_cold_start_df.groupby('disciplines_id') all_groups = tqdm(df_grouped) # 定义召回物品的字典 cold_start_user_items_dict = {} logger.info(f"开始生成实践课程冷启动召回字典") for group_id, df_group in all_groups: if is_number(group_id): group_id = str(int(group_id)) cold_start_user_items_dict.setdefault(int(group_id), []) id_file = cold_start_subjects_parent_path + str(group_id) + '.csv' # 生成时间戳,排序时使用 df_group['created_at'].fillna('2016-01-01 00:00:00', inplace=True) df_group['created_at_ts'] = df_group['created_at'].progress_apply(lambda x:time.mktime(time.strptime(x,'%Y-%m-%d %H:%M:%S'))) # 最热的课程按学习人数降序,难度降序 df_group_hottest = df_group.sort_values(by=['study_count', 'averge_star'], axis=0, ascending=[False, False], inplace=False) df_group_hottest.drop_duplicates(['subject_id'], inplace=True) # 最新的创建时间降序,难度降序 df_group_newtest = df_group.sort_values(by=['created_at_ts', 'averge_star'], axis=0, ascending=[False, False], inplace=False) df_group_newtest.drop_duplicates(['subject_id'], inplace=True) # 最新和最热各取一半 df_recall = df_group_hottest[: topk // 2] df_recall = df_recall.append(df_group_newtest[: topk // 2]) df_recall.drop_duplicates(['subject_id'], inplace=True) # 随机打乱顺序 df_recall.sample(frac=1).reset_index(drop=True) # 生成每个兴趣标签下的冷启动召回字典 for subject_id, subject_name in zip(df_recall['subject_id'], df_recall['subject_name']): cold_start_user_items_dict[int(group_id)].append((subject_id, subject_name)) # 保存每个兴趣标签对应的实践课程 df_recall.to_csv(id_file, columns=['subject_id', 'subject_name', 'visits', 'study_count', 'course_study_count', 'passed_count', 'course_used_count', 'school_used_count', 'challenge_count', 'evaluate_count', 'video_study_time', 'study_pdf_attachment_count', 'averge_star', 'created_at', 'updated_at'], sep='\t', index=False) # 保存生成的冷启动召回字典 pickle.dump(cold_start_user_items_dict, open(subject_cold_start_recall_dict, 'wb')) return cold_start_user_items_dict logger.info("加载物品行为数据") all_select_df = get_all_select_df(offline=False) logger.info("获取物品信息") item_info_df = get_item_info_df() logger.info("获取用户历史选择物品信息字典") all_select_df = all_select_df.merge(item_info_df, how='left', on='subject_id') user_hist_item_info_dict = get_user_hist_item_info_dict(all_select_df) logger.info('加载冷启动召回数据') df_cold_start_df = pd.read_csv(subjects_data_path, sep='\t', encoding='utf-8') df_cold_start_df.fillna('-1', inplace=True) if os.path.exists(subject_cold_start_user_subject_dict): logger.info('加载用户选择课程所属学科字典') cold_start_user_subject_dict = pickle.load(open(subject_cold_start_user_subject_dict, 'rb')) else: logger.info('生成用户选择课程所属学科字典') cold_start_user_subject_dict = build_user_sel_discipline_dict() if os.path.exists(subject_cold_start_recall_dict): logger.info('加载用户冷启动召回字典') cold_start_items_dict = pickle.load(open(subject_cold_start_recall_dict, 'rb')) else: logger.info('生成用户冷启动召回字典') cold_start_items_dict = build_cold_start_recall_dict(topk=300) def cold_start_user_recall(user_id, disciplines_id_list, topk=100): """ 用户冷启动召回推荐 :param disciplines_id_list: 课程大类ID列表 :param topk: 需要召回的数量 :return 召回的字典 """ start_time = datetime.now() logger.info(f"本次需要进行冷启动召回的用户ID: {user_id}") rank_list = [] user_disciplines_id_list = [] user_disciplines_id_list.clear() # 获取用户选择的课程所属的学科列表 if user_id in cold_start_user_subject_dict: user_disciplines_id_list = list(cold_start_user_subject_dict[user_id]) # 用户没有兴趣标签时随机推荐所有标签下最新和最热的 if (disciplines_id_list is None) or (len(disciplines_id_list) == 0): # 也没有用户的选课行为时,随机推荐所有标签下最新和最热的 if len(user_disciplines_id_list) == 0: disciplines_id_list = df_cold_start_df['disciplines_id'].unique().tolist() # 有用户选课行为时推荐课程所属学科最新和最热的课程 else: disciplines_id_list = user_disciplines_id_list else: # 传递了兴趣学科标签时和用户选过的学科一起随机推荐最新和最热的课程 if len(disciplines_id_list) > 0 and len(user_disciplines_id_list) > 0: disciplines_id_list = disciplines_id_list + user_disciplines_id_list # 获取每个兴趣标签下的召回 for disciplines_id in set(disciplines_id_list): disciplines_id = int(disciplines_id) if disciplines_id in cold_start_items_dict: rank_list.append(cold_start_items_dict[disciplines_id]) # 二维列表展成一维列表 rank_list = [item for row_item in rank_list for item in row_item] # 过滤历史选择的物品 if user_id in user_hist_item_info_dict: # 获取用户历史选择的物品 user_hist_item_list = list(user_hist_item_info_dict[user_id]['hist_item_ids']) # 从召回列表中删除历史选择的物品 for subject_id, subject_name in rank_list: if subject_id in user_hist_item_list: rank_list.remove((subject_id, subject_name)) filtered_subject_name_list = [] filtered_subject_name_list.clear() filtered_rank_list = [] filtered_rank_list.clear() # 过滤课程名称相同的 for subject_id, subject_name in rank_list: if subject_name not in filtered_subject_name_list: filtered_subject_name_list.append(subject_name) filtered_rank_list.append((subject_id, subject_name)) rank_list = filtered_rank_list.copy() if topk > len(rank_list): topk = len(rank_list) # 多个兴趣标签冷启动召回的再随机打乱一次 random.shuffle(rank_list) # 取topk个返回 rank_list = rank_list[:topk] # 以字典的形式返回 recommend_results = {subject_id: subject_name for subject_id, subject_name in rank_list} # 计算耗时毫秒 end_time = datetime.utcnow() cost_time_millisecond = round(float((end_time - start_time).microseconds / 1000.0), 3) logger.info(f"本次冷启动召回耗时: {cost_time_millisecond} 毫秒") return recommend_results if __name__ == '__main__': # 冷启动推荐测试 recommend_results = cold_start_user_recall(user_id=test_user_id, disciplines_id_list=[2], topk=100) print(json.dumps(recommend_results, ensure_ascii=False, indent=4))