You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

474 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
sys.path.append(os.getcwd())
import pandas as pd
import numpy as np
from tqdm import tqdm
import faiss
import warnings
import pickle
import collections
import random
from sklearn.preprocessing import MinMaxScaler
from sklearn.preprocessing import LabelEncoder
import tensorflow as tf
from tensorflow.python.keras import backend as K
from tensorflow.python.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences
from deepmatch.models import DSSM
from deepmatch.utils import sampledsoftmaxloss, NegativeSampler
from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat
from collections import Counter
from config import logger, offline_mode
from config import need_metric_recall
from config import embedding_dim, ef_construction, M
from config import subject_dssm_recall_dict
from config import subject_dssm_user_emb_dict
from config import subject_dssm_item_emb_dict
from config import subject_dssm_item_faiss_model_path
from config import subject_dssm_user_faiss_model_path
from config import subject_dssm_user_embedding_index_dict
from config import subject_dssm_item_embedding_index_dict
from config import subject_dssm_train_input_data
from config import subject_dssm_train_label_data
from config import subject_dssm_test_input_data
from config import subject_dssm_test_label_data
from config import subject_dssm_train_set_data
from config import subject_dssm_test_set_data
from config import subject_dssm_user_embedding_data
from config import subject_dssm_item_embedding_data
from config import samples_mode
from matching.subject.recall_comm import get_all_select_df
from matching.subject.recall_comm import get_user_info_df,get_item_info_df
from matching.subject.recall_comm import metrics_recall,get_all_hist_and_last_select
K.set_learning_phase(True)
if tf.__version__ >= '2.0.0':
tf.compat.v1.disable_eager_execution()
tqdm.pandas()
warnings.filterwarnings('ignore')
def gen_data_set(data, negsample=0):
"""
获取DSSM召回时的训练验证数据
negsample指的是通过滑窗构建样本的时候,负采集样样本的数量
"""
data.sort_values("created_timestamp", inplace=True)
item_ids = data['subject_id'].unique()
train_set = []
test_set = []
if os.path.exists(subject_dssm_train_set_data) and \
os.path.exists(subject_dssm_test_set_data):
train_set = pickle.load(open(subject_dssm_train_set_data, 'rb'))
test_set = pickle.load(open(subject_dssm_test_set_data, 'rb'))
return train_set, test_set
for reviewerID, hist_select in tqdm(data.groupby('user_id')):
pos_list = hist_select['subject_id'].tolist()
if negsample > 0:
# 用户没选择的物品里面选择负样本
candidate_set = list(set(item_ids) - set(pos_list))
# 对于每个正样本选择n个负样本
neg_list = np.random.choice(candidate_set,size=len(pos_list)*negsample,replace=True)
# 长度只有一个的时候需要把这条数据也放到训练集中不然最终学到的embedding就会有缺失
if len(pos_list) == 1:
train_set.append((reviewerID, [pos_list[0]], pos_list[0], 1, len(pos_list),
hist_select.iloc[0]['gender'], hist_select.iloc[0]['identity'],
hist_select.iloc[0]['edu_background'],hist_select.iloc[0]['logins'],
hist_select.iloc[0]['grade'],hist_select.iloc[0]['experience'],
hist_select.iloc[0]['visits'],hist_select.iloc[0]['stages_count'],
hist_select.iloc[0]['stage_shixuns_count'],hist_select.iloc[0]['shixuns_count'],
hist_select.iloc[0]['study_count'],hist_select.iloc[0]['course_study_count'],
hist_select.iloc[0]['passed_count'],hist_select.iloc[0]['challenge_count'],
hist_select.iloc[0]['evaluate_count'],hist_select.iloc[0]['study_pdf_attachment_count'],
hist_select.iloc[0]['averge_star']))
test_set.append((reviewerID, [pos_list[0]], pos_list[0], 1, len(pos_list),
hist_select.iloc[0]['gender'], hist_select.iloc[0]['identity'],
hist_select.iloc[0]['edu_background'],hist_select.iloc[0]['logins'],
hist_select.iloc[0]['grade'],hist_select.iloc[0]['experience'],
hist_select.iloc[0]['visits'],hist_select.iloc[0]['stages_count'],
hist_select.iloc[0]['stage_shixuns_count'],hist_select.iloc[0]['shixuns_count'],
hist_select.iloc[0]['study_count'],hist_select.iloc[0]['course_study_count'],
hist_select.iloc[0]['passed_count'],hist_select.iloc[0]['challenge_count'],
hist_select.iloc[0]['evaluate_count'],hist_select.iloc[0]['study_pdf_attachment_count'],
hist_select.iloc[0]['averge_star']))
# 滑窗构造正负样本
for i in range(1, len(pos_list)):
hist_sel = pos_list[:i]
if i != len(pos_list) - 1:
# 正样本 [user_id, pos_item, label, len(his_item)...]
train_set.append((reviewerID, hist_sel[::-1], pos_list[i], 1, len(hist_sel[::-1]),
hist_select.iloc[0]['gender'], hist_select.iloc[0]['identity'],
hist_select.iloc[0]['edu_background'],hist_select.iloc[0]['logins'],
hist_select.iloc[0]['grade'],hist_select.iloc[0]['experience'],
hist_select.iloc[0]['visits'],hist_select.iloc[0]['stages_count'],
hist_select.iloc[0]['stage_shixuns_count'],hist_select.iloc[0]['shixuns_count'],
hist_select.iloc[0]['study_count'],hist_select.iloc[0]['course_study_count'],
hist_select.iloc[0]['passed_count'],hist_select.iloc[0]['challenge_count'],
hist_select.iloc[0]['evaluate_count'],hist_select.iloc[0]['study_pdf_attachment_count'],
hist_select.iloc[0]['averge_star']))
# logger.info("滑窗构造负样本")
for negi in range(negsample):
# 负样本 [user_id, his_item, neg_item, label, len(his_item)...]
train_set.append((reviewerID, hist_sel[::-1], neg_list[i*negsample+negi], 0, len(hist_sel[::-1]),
hist_select.iloc[0]['gender'], hist_select.iloc[0]['identity'],
hist_select.iloc[0]['edu_background'],hist_select.iloc[0]['logins'],
hist_select.iloc[0]['grade'],hist_select.iloc[0]['experience'],
hist_select.iloc[0]['visits'],hist_select.iloc[0]['stages_count'],
hist_select.iloc[0]['stage_shixuns_count'],hist_select.iloc[0]['shixuns_count'],
hist_select.iloc[0]['study_count'],hist_select.iloc[0]['course_study_count'],
hist_select.iloc[0]['passed_count'],hist_select.iloc[0]['challenge_count'],
hist_select.iloc[0]['evaluate_count'],hist_select.iloc[0]['study_pdf_attachment_count'],
hist_select.iloc[0]['averge_star']))
else:
# 将最长的那一个序列长度作为测试数据
test_set.append((reviewerID, hist_sel[::-1], pos_list[i], 1, len(hist_sel[::-1]),
hist_select.iloc[0]['gender'], hist_select.iloc[0]['identity'],
hist_select.iloc[0]['edu_background'],hist_select.iloc[0]['logins'],
hist_select.iloc[0]['grade'],hist_select.iloc[0]['experience'],
hist_select.iloc[0]['visits'],hist_select.iloc[0]['stages_count'],
hist_select.iloc[0]['stage_shixuns_count'],hist_select.iloc[0]['shixuns_count'],
hist_select.iloc[0]['study_count'],hist_select.iloc[0]['course_study_count'],
hist_select.iloc[0]['passed_count'],hist_select.iloc[0]['challenge_count'],
hist_select.iloc[0]['evaluate_count'],hist_select.iloc[0]['study_pdf_attachment_count'],
hist_select.iloc[0]['averge_star']))
random.shuffle(train_set)
random.shuffle(test_set)
pickle.dump(train_set, open(subject_dssm_train_set_data, 'wb'))
pickle.dump(test_set, open(subject_dssm_test_set_data, 'wb'))
return train_set, test_set
def gen_model_input(train_set, user_profile, seq_max_len):
"""
将输入的数据进行padding,使得序列特征的长度都一致
"""
train_uid = np.array([line[0] for line in train_set])
train_seq = [line[1] for line in train_set]
train_iid = np.array([line[2] for line in train_set])
train_label = np.array([line[3] for line in train_set])
train_hist_len = np.array([line[4] for line in train_set])
train_gender = np.array([line[5] for line in train_set])
train_identity = np.array([line[6] for line in train_set])
train_edu_background = np.array([line[7] for line in train_set])
train_logins = np.array([line[8] for line in train_set])
train_grade = np.array([line[9] for line in train_set])
train_experience = np.array([line[10] for line in train_set])
train_visits = np.array([line[11] for line in train_set])
train_stages_count = np.array([line[12] for line in train_set])
train_stage_shixuns_count = np.array([line[13] for line in train_set])
train_shixuns_count = np.array([line[14] for line in train_set])
train_study_count = np.array([line[15] for line in train_set])
train_course_study_count = np.array([line[16] for line in train_set])
train_passed_count = np.array([line[17] for line in train_set])
train_challenge_count = np.array([line[18] for line in train_set])
train_evaluate_count = np.array([line[19] for line in train_set])
train_study_pdf_attachment_count = np.array([line[20] for line in train_set])
train_averge_star = np.array([line[21] for line in train_set])
train_seq_pad = pad_sequences(train_seq, maxlen=seq_max_len, padding='post',
truncating='post', value=0)
train_model_input = {"user_id": train_uid,
"subject_id": train_iid,
"hist_subject_id": train_seq_pad,
"hist_len": train_hist_len,
"gender": train_gender,
"identity": train_identity,
"edu_background": train_edu_background,
"logins":train_logins,
"grade": train_grade,
"experience": train_experience,
"visits": train_visits,
"stages_count":train_stages_count,
"stage_shixuns_count":train_stage_shixuns_count,
"shixuns_count":train_shixuns_count,
"study_count":train_study_count,
"course_study_count":train_course_study_count,
"passed_count":train_passed_count,
"challenge_count":train_challenge_count,
"evaluate_count":train_evaluate_count,
"study_pdf_attachment_count":train_study_pdf_attachment_count,
"averge_star":train_averge_star
}
return train_model_input, train_label
def dssm_recall(data, topk=100):
# 定义特征
user_sparse_features = ['user_id']
user_dense_features = ['gender', 'identity', 'edu_background','logins','grade','experience']
item_sparse_features = ['subject_id']
item_dense_features = ['visits','stages_count','stage_shixuns_count','shixuns_count','study_count',
'course_study_count','passed_count','challenge_count','evaluate_count',
'study_pdf_attachment_count','averge_star']
# 用户选择序列的长度,短的填充,长的截断
if samples_mode == True:
SEQ_LEN = 100
else:
SEQ_LEN = 200
user_profile_dup = data[["user_id"]].drop_duplicates('user_id')
item_profile_dup = data[["subject_id"]].drop_duplicates('subject_id')
# 定义特征
sparse_features = user_sparse_features + item_sparse_features
dence_features = user_dense_features+item_dense_features
feature_max_idx = {}
for feature in sparse_features:
lbe = LabelEncoder()
data[feature] = lbe.fit_transform(data[feature])
feature_max_idx[feature] = data[feature].max() + 1
for feature in dence_features:
min_max_scaler = MinMaxScaler()
data[feature] = min_max_scaler.fit_transform(data[[feature]])
# 提取user和item的特征
user_profile = data[user_sparse_features + user_dense_features].drop_duplicates('user_id')
item_profile = data[item_sparse_features + item_dense_features].drop_duplicates('subject_id')
user_index_2_rawid = dict(zip(user_profile['user_id'], user_profile_dup['user_id']))
item_index_2_rawid = dict(zip(item_profile['subject_id'], item_profile_dup['subject_id']))
# 生成训练集和测试集
# 由于深度学习需要的数据量非常大
# 为了保证召回的效果,通过滑窗的形式扩充训练样本
train_set, test_set = gen_data_set(data, 1)
if os.path.exists(subject_dssm_train_input_data) and \
os.path.exists(subject_dssm_train_label_data):
# 加载DSSM模型的训练集数据
train_model_input = pickle.load(open(subject_dssm_train_input_data, 'rb'))
train_label = pickle.load(open(subject_dssm_train_label_data, 'rb'))
else:
# 生成并保存DSSM模型的训练集数据
train_model_input, train_label = gen_model_input(train_set, user_profile, SEQ_LEN)
pickle.dump(train_model_input, open(subject_dssm_train_input_data, 'wb'))
pickle.dump(train_label, open(subject_dssm_train_label_data, 'wb'))
if os.path.exists(subject_dssm_test_input_data) and \
os.path.exists(subject_dssm_test_label_data):
# 加载DSSM模型的测试集数据
test_model_input = pickle.load(open(subject_dssm_test_input_data, 'rb'))
test_label = pickle.load(open(subject_dssm_test_label_data, 'rb'))
else:
# 生成并保存DSSM模型的测试集数据
test_model_input, test_label = gen_model_input(test_set, user_profile, SEQ_LEN)
pickle.dump(test_model_input, open(subject_dssm_test_input_data, 'wb'))
pickle.dump(test_label, open(subject_dssm_test_label_data, 'wb'))
# 将数据整理成模型可以直接输入的形式
# 用户特征由稀疏特征user_id和可变长度特征hist_subject_id组成
user_feature_columns = [SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
DenseFeat('gender', dimension=1),
DenseFeat('identity', dimension=1),
DenseFeat('edu_background', dimension=1),
DenseFeat('logins', dimension=1),
DenseFeat('grade', dimension=1),
DenseFeat('experience', dimension=1),
VarLenSparseFeat(SparseFeat('hist_subject_id', feature_max_idx['subject_id'],
embedding_dim, embedding_name="subject_id"), SEQ_LEN, 'mean', 'hist_len'),]
# 物品循环由稀疏特征sunject_id组成
item_feature_columns = [SparseFeat('subject_id', feature_max_idx['subject_id'], embedding_dim),
DenseFeat('visits', dimension=1),
DenseFeat('stages_count', dimension=1),
DenseFeat('stage_shixuns_count', dimension=1),
DenseFeat('shixuns_count', dimension=1),
DenseFeat('study_count', dimension=1),
DenseFeat('course_study_count', dimension=1),
DenseFeat('passed_count', dimension=1),
DenseFeat('challenge_count', dimension=1),
DenseFeat('evaluate_count', dimension=1),
DenseFeat('study_pdf_attachment_count', dimension=1),
DenseFeat('averge_star', dimension=1)]
# 进行负采样
train_counter = Counter(train_model_input['subject_id'])
item_count = [train_counter.get(i, 0) for i in range(item_feature_columns[0].vocabulary_size)]
sampler_config = NegativeSampler('frequency', num_sampled=5, item_name='subject_id', item_count=item_count)
# 模型的定义
# num_sampled: 负采样时的样本数量
model = DSSM(user_feature_columns,item_feature_columns,
user_dnn_hidden_units=(128,embedding_dim),
item_dnn_hidden_units=(128, embedding_dim),
sampler_config=sampler_config)
# 模型编译
model.compile(optimizer="adam", loss=sampledsoftmaxloss)
# 模型训练这里可以定义验证集的比例如果设置为0的话就是全量数据直接进行训练
history = model.fit(train_model_input,
train_label,
batch_size=256,
epochs=20,
verbose=1,
validation_split=0.0)
# 训练完模型之后,提取训练的Embedding包括user端和item端
logger.info('训练完模型之后,提取训练的Embedding包括user端和item端')
test_user_model_input = test_model_input
all_item_model_input = {"subject_id": item_profile['subject_id'].values,
"visits":item_profile['visits'].values,
"stages_count":item_profile['stages_count'].values,
"stage_shixuns_count":item_profile['stage_shixuns_count'].values,
"shixuns_count":item_profile['shixuns_count'].values,
"study_count":item_profile['study_count'].values,
"course_study_count":item_profile['course_study_count'].values,
"passed_count":item_profile['passed_count'].values,
"challenge_count":item_profile['challenge_count'].values,
"evaluate_count":item_profile['evaluate_count'].values,
"study_pdf_attachment_count":item_profile['study_pdf_attachment_count'].values,
"averge_star":item_profile['averge_star'].values }
user_embedding_model=Model(inputs=model.user_input, outputs=model.user_embedding)
item_embedding_model = Model(inputs=model.item_input, outputs=model.item_embedding)
# 保存输出的item_embedding和user_embedding
# 排序的时候用到保存的时候需要和原始的id对应
logger.info('保存输出的item_embedding和user_embedding')
user_embs = user_embedding_model.predict(test_user_model_input, batch_size=2 ** 12)
item_embs = item_embedding_model.predict(all_item_model_input, batch_size=2 ** 12)
# embedding保存之前归一化一下
user_embs = user_embs / np.linalg.norm(user_embs, axis=1, keepdims=True)
item_embs = item_embs / np.linalg.norm(item_embs, axis=1, keepdims=True)
pickle.dump(user_embs, open(subject_dssm_user_embedding_data, 'wb'))
pickle.dump(item_embs, open(subject_dssm_item_embedding_data, 'wb'))
logger.info('构建DSSM user embedding HNSW索引')
build_hnsw(user_embs, subject_dssm_user_faiss_model_path, ef_construction, M, embedding_dim)
logger.info('构建DSSM item embedding HNSW索引')
build_hnsw(item_embs, subject_dssm_item_faiss_model_path, ef_construction, M, embedding_dim)
# 将embedding转换成字典的形式方便查询
raw_user_id_emb_dict = {user_index_2_rawid[k]: v
for k, v in zip(user_profile['user_id'], user_embs)}
raw_item_id_emb_dict = {item_index_2_rawid[k]: v
for k, v in zip(item_profile['subject_id'], item_embs)}
pickle.dump(raw_user_id_emb_dict, open(subject_dssm_user_emb_dict, 'wb'))
pickle.dump(raw_item_id_emb_dict, open(subject_dssm_item_emb_dict, 'wb'))
# 生成user embedding和item embedding索引字典
dssm_user_embedding_index_dict = {k: user_index_2_rawid[v] for k, v in enumerate(user_profile['user_id'])}
dssm_item_embedding_index_dict = {k: item_index_2_rawid[v] for k, v in enumerate(item_profile['subject_id'])}
pickle.dump(dssm_user_embedding_index_dict, open(subject_dssm_user_embedding_index_dict, 'wb'))
pickle.dump(dssm_item_embedding_index_dict, open(subject_dssm_item_embedding_index_dict, 'wb'))
# 定义用户召回字典
user_recall_items_dict = collections.defaultdict(dict)
if not os.path.exists(subject_dssm_recall_dict):
# 加载保存的faiss索引
index = faiss.read_index(subject_dssm_item_faiss_model_path)
# 通过user_embedding去查询最相似的topk+1个item_embedding
sim, idx = index.search(np.ascontiguousarray(user_embs), topk+1)
logger.info('生成DSSM所有用户的召回列表')
for target_idx, sim_value_list, rele_idx_list in tqdm(zip(test_user_model_input['user_id'], sim, idx)):
target_raw_id = user_index_2_rawid[target_idx]
# 从1开始是为了去掉物品本身
for rele_idx, sim_value in zip(rele_idx_list[1:], sim_value_list[1:]):
rele_raw_id = item_index_2_rawid[rele_idx]
user_recall_items_dict[target_raw_id][rele_raw_id] = \
user_recall_items_dict.get(target_raw_id, {}).get(rele_raw_id, 0) + sim_value
# 将召回的结果进行排序,直接得到召回的结果
user_recall_items_dict = {k: sorted(v.items(), key=lambda x: x[1], reverse=True) \
for k, v in user_recall_items_dict.items()}
pickle.dump(user_recall_items_dict, open(subject_dssm_recall_dict, 'wb'))
return user_recall_items_dict
def build_hnsw(vecs, to_file, ef=2000, m=64, dim=100):
"""
训练hnsw模型
"""
vecs = vecs.astype('float32')
# 构建索引
index = faiss.IndexFlatIP(dim)
index.add(vecs)
# 保存hnsw模型
faiss.write_index(index, to_file)
return index
def dssm_recall_train():
"""
DSSM召回训练和评估
"""
# 需要召回的数量
recall_item_num = 100
logger.info("加载物品行为数据")
all_select_df = get_all_select_df(offline=offline_mode)
logger.info("获取物品信息数据")
item_info = get_item_info_df()
logger.info("获取用户信息数据")
users_info = get_user_info_df()
all_select_df = all_select_df.merge(users_info, on='user_id')
all_select_df = all_select_df.merge(item_info,on='subject_id')
# 为了召回评估,提取最后一次选择作为召回评估
# 如果不需要做召回评估直接使用全量的训练集进行召回
if need_metric_recall:
logger.info('获取物品行为数据历史和最后一次选择')
train_hist_select_df, train_last_select_df = get_all_hist_and_last_select(all_select_df)
else:
train_hist_select_df = all_select_df
#调试程序简单采样,注意删除临时文件
# train_hist_select_df = train_hist_select_df.sample(frac = 0.001)
# DSSM模型训练
user_recall_items_dict = dssm_recall(train_hist_select_df, topk=recall_item_num)
# 召回效果评估
if samples_mode == True and need_metric_recall:
logger.info('DSSM召回效果评估')
metrics_recall(user_recall_items_dict, train_last_select_df, topk=recall_item_num)
if __name__ == '__main__':
dssm_recall_train()