|
|
import os
|
|
|
import sys
|
|
|
sys.path.append(os.getcwd())
|
|
|
import pandas as pd
|
|
|
import numpy as np
|
|
|
from tqdm import tqdm
|
|
|
import faiss
|
|
|
import os
|
|
|
import warnings
|
|
|
import pickle
|
|
|
import collections
|
|
|
import random
|
|
|
from sklearn.preprocessing import LabelEncoder
|
|
|
import tensorflow as tf
|
|
|
from tensorflow.python.keras import backend as K
|
|
|
from tensorflow.python.keras.models import Model
|
|
|
from tensorflow.keras.preprocessing.sequence import pad_sequences
|
|
|
from deepmatch.models import FM
|
|
|
from deepmatch.utils import sampledsoftmaxloss, NegativeSampler
|
|
|
from deepctr.feature_column import SparseFeat, VarLenSparseFeat
|
|
|
from collections import Counter
|
|
|
|
|
|
from config import logger, offline_mode
|
|
|
from config import need_metric_recall
|
|
|
from config import embedding_dim, ef_construction, M
|
|
|
from config import shixun_fm_recall_dict
|
|
|
from config import shixun_fm_user_emb_dict
|
|
|
from config import shixun_fm_item_emb_dict
|
|
|
from config import shixun_fm_item_faiss_model_path
|
|
|
from config import shixun_fm_user_faiss_model_path
|
|
|
from config import shixun_fm_user_embedding_index_dict
|
|
|
from config import shixun_fm_item_embedding_index_dict
|
|
|
from config import shixun_fm_train_input_data
|
|
|
from config import shixun_fm_train_label_data
|
|
|
from config import shixun_fm_test_input_data
|
|
|
from config import shixun_fm_test_label_data
|
|
|
from config import shixun_fm_train_set_data
|
|
|
from config import shixun_fm_test_set_data
|
|
|
from config import shixun_fm_user_embedding_data
|
|
|
from config import shixun_fm_item_embedding_data
|
|
|
from config import samples_mode
|
|
|
from matching.shixun.recall_comm import get_all_select_df
|
|
|
from matching.shixun.recall_comm import get_user_info_df,get_item_info_df
|
|
|
from matching.shixun.recall_comm import metrics_recall,get_hist_and_last_select
|
|
|
|
|
|
|
|
|
# Put the Keras backend into training phase for graphs built by this module.
K.set_learning_phase(True)

# Compare the major version numerically. A plain string comparison is
# lexicographic, so e.g. '10.0.0' >= '2.0.0' would evaluate to False and the
# eager-execution switch would be silently skipped.
if int(tf.__version__.split('.')[0]) >= 2:
    # Run TensorFlow 2.x in graph (non-eager) mode.
    tf.compat.v1.disable_eager_execution()

# Register the tqdm progress helpers on pandas objects.
tqdm.pandas()

# Silence all warnings for the rest of the process.
warnings.filterwarnings('ignore')
|
|
|
|
|
|
|
|
|
def gen_data_set(data, negsample=0):
    """
    Build (or load from cache) the train/test samples used by FM recall.

    Samples are produced per user with a sliding window over the user's
    time-ordered selections. Every sample is a tuple laid out as::

        (user_id, history (most recent first), shixun_id, label, hist_len,
         gender, identity, edu_background, logins, grade, experience,
         visits, challenges_count, averge_star, task_pass)

    For a user with a single selection the same sample goes into both sets so
    the user is not missing from training. Otherwise the last window becomes
    the test sample and the earlier windows become training samples.

    :param data: DataFrame with ``user_id``, ``shixun_id``,
        ``created_timestamp`` and the profile columns listed above.
        It is sorted by ``created_timestamp`` **in place**.
    :param negsample: number of negative samples drawn per positive window.
    :return: ``(train_set, test_set)``, two shuffled lists of sample tuples.
    """
    # The in-place sort is kept from the original behaviour; callers see it.
    data.sort_values("created_timestamp", inplace=True)

    # Reuse previously generated samples when both cache files exist.
    if (os.path.exists(shixun_fm_train_set_data)
            and os.path.exists(shixun_fm_test_set_data)):
        with open(shixun_fm_train_set_data, 'rb') as cache_file:
            train_set = pickle.load(cache_file)
        with open(shixun_fm_test_set_data, 'rb') as cache_file:
            test_set = pickle.load(cache_file)
        return train_set, test_set

    profile_columns = (
        'gender', 'identity', 'edu_background', 'logins', 'grade',
        'experience', 'visits', 'challenges_count', 'averge_star',
        'task_pass',
    )

    # Built once; the original rebuilt this set for every user.
    all_item_ids = set(data['shixun_id'].unique())

    train_set = []
    test_set = []

    for reviewerID, hist_select in tqdm(data.groupby('user_id')):
        pos_list = hist_select['shixun_id'].tolist()

        # Profile values are read from the user's first (earliest) row.
        # NOTE(review): the item-side columns (visits, challenges_count,
        # averge_star, task_pass) therefore describe that first row's item,
        # not the sample's target item -- confirm this is intended.
        first_row = hist_select.iloc[0]
        profile = tuple(first_row[column] for column in profile_columns)

        # Draw negatives from the items this user never selected.
        neg_list = ()
        if negsample > 0:
            candidate_set = list(all_item_ids - set(pos_list))
            # A user who selected every item has no candidates; skip the
            # negatives instead of letting np.random.choice raise.
            if candidate_set:
                neg_list = np.random.choice(
                    candidate_set,
                    size=len(pos_list) * negsample,
                    replace=True,
                )
        negs_per_pos = negsample if len(neg_list) else 0

        # A single selection goes to both sets, otherwise the learned
        # embeddings would be missing this user.
        if len(pos_list) == 1:
            train_set.append(
                (reviewerID, [pos_list[0]], pos_list[0], 1, len(pos_list))
                + profile)
            test_set.append(
                (reviewerID, [pos_list[0]], pos_list[0], 1, len(pos_list))
                + profile)

        # Sliding window over the history.
        for i in range(1, len(pos_list)):
            hist_sel = pos_list[:i]
            if i != len(pos_list) - 1:
                # Positive sample.
                train_set.append(
                    (reviewerID, hist_sel[::-1], pos_list[i], 1,
                     len(hist_sel)) + profile)
                # Negative samples for the same window.
                for negi in range(negs_per_pos):
                    train_set.append(
                        (reviewerID, hist_sel[::-1],
                         neg_list[i * negsample + negi], 0,
                         len(hist_sel)) + profile)
            else:
                # The longest window is held out as the test sample.
                test_set.append(
                    (reviewerID, hist_sel[::-1], pos_list[i], 1,
                     len(hist_sel)) + profile)

    random.shuffle(train_set)
    random.shuffle(test_set)

    with open(shixun_fm_train_set_data, 'wb') as cache_file:
        pickle.dump(train_set, cache_file)
    with open(shixun_fm_test_set_data, 'wb') as cache_file:
        pickle.dump(test_set, cache_file)

    return train_set, test_set
|
|
|
|
|
|
|
|
|
def gen_model_input(train_set, user_profile, seq_max_len):
    """
    Turn sample tuples into the name -> array dict fed to the model.

    The history sequence (tuple position 1) is padded/truncated at the end to
    ``seq_max_len`` with zeros; every other tuple position becomes a 1-D
    array.

    :param train_set: list of sample tuples as produced by ``gen_data_set``.
    :param user_profile: unused; kept so existing callers keep working.
    :param seq_max_len: fixed length of the padded history sequence.
    :return: ``(model_input_dict, label_array)``.
    """
    def column(position):
        # Collect one tuple position across all samples.
        return np.array([sample[position] for sample in train_set])

    padded_history = pad_sequences(
        [sample[1] for sample in train_set],
        maxlen=seq_max_len,
        padding='post',
        truncating='post',
        value=0,
    )

    model_input = {
        "user_id": column(0),
        "shixun_id": column(2),
        "hist_shixun_id": padded_history,
        "hist_len": column(4),
    }

    # Profile features occupy tuple positions 5..14 in this order.
    profile_keys = (
        "gender", "identity", "edu_background", "logins", "grade",
        "experience", "visits", "challenges_count", "averge_star",
        "task_pass",
    )
    for position, key in enumerate(profile_keys, start=5):
        model_input[key] = column(position)

    labels = column(3)
    return model_input, labels
|
|
|
|
|
|
def fm_recall(data, topk=100):
    """
    Train the FM recall model and build a top-k item list for every user.

    Steps: label-encode the sparse feature columns of ``data`` (in place),
    build or load the cached samples and model inputs, train a deepmatch
    ``FM`` model, export L2-normalised user/item embeddings, write the faiss
    indexes and lookup dictionaries, then query the item index with every
    user embedding.

    :param data: DataFrame of selection records containing the user and item
        feature columns listed below plus ``created_timestamp`` (read by
        ``gen_data_set``). The sparse feature columns are overwritten with
        their encoded values.
    :param topk: number of items recalled per user.
    :return: ``{raw_user_id: [(raw_item_id, score), ...]}`` with each list
        sorted by score, highest first.
    """
    # Feature definitions.
    user_sparse_features = ['user_id', 'gender', 'identity', 'edu_background',
                            'logins', 'grade', 'experience']
    item_sparse_features = ['shixun_id', 'visits', 'challenges_count',
                            'averge_star', 'task_pass']

    # Length of the history sequence: shorter ones are padded, longer cut.
    if samples_mode == True:
        SEQ_LEN = 100
    else:
        SEQ_LEN = 200

    # Raw ids, captured before the columns are label-encoded below.
    user_profile_dup = data[["user_id"]].drop_duplicates('user_id')
    item_profile_dup = data[["shixun_id"]].drop_duplicates('shixun_id')

    sparse_features = user_sparse_features + item_sparse_features

    feature_max_idx = {}
    for feature in sparse_features:
        lbe = LabelEncoder()
        data[feature] = lbe.fit_transform(data[feature])
        feature_max_idx[feature] = data[feature].max() + 1

    # Per-user and per-item feature rows (encoded values).
    user_profile = data[user_sparse_features].drop_duplicates('user_id')
    item_profile = data[item_sparse_features].drop_duplicates('shixun_id')

    # encoded id -> raw id; both frames were de-duplicated in the same row
    # order, so zipping them pairs each encoded id with its raw id.
    user_index_2_rawid = dict(zip(user_profile['user_id'],
                                  user_profile_dup['user_id']))
    item_index_2_rawid = dict(zip(item_profile['shixun_id'],
                                  item_profile_dup['shixun_id']))

    # Build the train/test samples. The sliding window enlarges the training
    # data, which the model needs for good recall quality.
    train_set, test_set = gen_data_set(data, 1)

    if (os.path.exists(shixun_fm_train_input_data)
            and os.path.exists(shixun_fm_train_label_data)):
        # Load the cached training input.
        with open(shixun_fm_train_input_data, 'rb') as cache_file:
            train_model_input = pickle.load(cache_file)
        with open(shixun_fm_train_label_data, 'rb') as cache_file:
            train_label = pickle.load(cache_file)
    else:
        # Generate and cache the training input.
        train_model_input, train_label = gen_model_input(
            train_set, user_profile, SEQ_LEN)
        with open(shixun_fm_train_input_data, 'wb') as cache_file:
            pickle.dump(train_model_input, cache_file)
        with open(shixun_fm_train_label_data, 'wb') as cache_file:
            pickle.dump(train_label, cache_file)

    if (os.path.exists(shixun_fm_test_input_data)
            and os.path.exists(shixun_fm_test_label_data)):
        # Load the cached test input.
        with open(shixun_fm_test_input_data, 'rb') as cache_file:
            test_model_input = pickle.load(cache_file)
        with open(shixun_fm_test_label_data, 'rb') as cache_file:
            test_label = pickle.load(cache_file)
    else:
        # Generate and cache the test input.
        test_model_input, test_label = gen_model_input(
            test_set, user_profile, SEQ_LEN)
        with open(shixun_fm_test_input_data, 'wb') as cache_file:
            pickle.dump(test_model_input, cache_file)
        with open(shixun_fm_test_label_data, 'wb') as cache_file:
            pickle.dump(test_label, cache_file)

    # Local override: this assignment shadows the ``embedding_dim`` imported
    # from config for the whole function.
    embedding_dim = 32

    # User side: sparse user features plus the variable-length history, which
    # shares the "shixun_id" embedding table.
    user_feature_columns = [
        SparseFeat('user_id', feature_max_idx['user_id'], embedding_dim),
        SparseFeat('gender', feature_max_idx['gender'], embedding_dim),
        SparseFeat('identity', feature_max_idx['identity'], embedding_dim),
        SparseFeat('edu_background', feature_max_idx['edu_background'],
                   embedding_dim),
        SparseFeat('logins', feature_max_idx['logins'], embedding_dim),
        SparseFeat('grade', feature_max_idx['grade'], embedding_dim),
        SparseFeat('experience', feature_max_idx['experience'],
                   embedding_dim),
        VarLenSparseFeat(
            SparseFeat('hist_shixun_id', feature_max_idx['shixun_id'],
                       embedding_dim, embedding_name="shixun_id"),
            SEQ_LEN, 'mean', 'hist_len'),
    ]

    # Item side: sparse item features.
    item_feature_columns = [
        SparseFeat('shixun_id', feature_max_idx['shixun_id'], embedding_dim),
        SparseFeat('visits', feature_max_idx['visits'], embedding_dim),
        SparseFeat('challenges_count', feature_max_idx['challenges_count'],
                   embedding_dim),
        SparseFeat('averge_star', feature_max_idx['averge_star'],
                   embedding_dim),
        SparseFeat('task_pass', feature_max_idx['task_pass'], embedding_dim),
    ]

    # Frequency-based negative sampling configuration.
    train_counter = Counter(train_model_input['shixun_id'])
    item_count = [train_counter.get(i, 0)
                  for i in range(item_feature_columns[0].vocabulary_size)]
    sampler_config = NegativeSampler('frequency', num_sampled=5,
                                     item_name='shixun_id',
                                     item_count=item_count)

    # Model definition and compilation.
    model = FM(user_feature_columns, item_feature_columns,
               loss_type="softmax", sampler_config=sampler_config)
    model.compile(optimizer="adam", loss=sampledsoftmaxloss)

    # validation_split=0.0 trains on the full training data.
    history = model.fit(train_model_input,
                        train_label,
                        batch_size=256,
                        epochs=3,
                        verbose=1,
                        validation_split=0.0)

    # Extract the trained embeddings for both the user and the item side.
    logger.info('训练完模型之后,提取训练的Embedding,包括user端和item端')
    test_user_model_input = test_model_input
    all_item_model_input = {
        "shixun_id": item_profile['shixun_id'].values,
        "visits": item_profile['visits'].values,
        "challenges_count": item_profile['challenges_count'].values,
        # Fixed: this input was previously fed from 'challenges_count'.
        "averge_star": item_profile['averge_star'].values,
        "task_pass": item_profile['task_pass'].values,
    }

    user_embedding_model = Model(inputs=model.user_input,
                                 outputs=model.user_embedding)
    item_embedding_model = Model(inputs=model.item_input,
                                 outputs=model.item_embedding)

    # Save the embeddings; ranking uses them later, so they must stay
    # aligned with the raw ids.
    logger.info('保存输出的item_embedding和user_embedding')
    user_embs = user_embedding_model.predict(test_user_model_input,
                                             batch_size=2 ** 12)
    item_embs = item_embedding_model.predict(all_item_model_input,
                                             batch_size=2 ** 12)

    # L2-normalise before saving; the floor avoids NaN for an all-zero row.
    user_embs = user_embs / np.maximum(
        np.linalg.norm(user_embs, axis=1, keepdims=True), 1e-12)
    item_embs = item_embs / np.maximum(
        np.linalg.norm(item_embs, axis=1, keepdims=True), 1e-12)

    with open(shixun_fm_user_embedding_data, 'wb') as out_file:
        pickle.dump(user_embs, out_file)
    with open(shixun_fm_item_embedding_data, 'wb') as out_file:
        pickle.dump(item_embs, out_file)

    logger.info('构建fm user embedding HNSW索引')
    build_hnsw(user_embs, shixun_fm_user_faiss_model_path,
               ef_construction, M, embedding_dim)

    logger.info('构建fm item embedding HNSW索引')
    build_hnsw(item_embs, shixun_fm_item_faiss_model_path,
               ef_construction, M, embedding_dim)

    # Row i of user_embs belongs to test_user_model_input['user_id'][i].
    # gen_data_set shuffles the test set, so this order differs from
    # user_profile['user_id']; pairing with user_profile would attach
    # embeddings to the wrong users.
    encoded_user_ids = test_user_model_input['user_id']
    # Row i of item_embs belongs to item_profile row i.
    encoded_item_ids = item_profile['shixun_id']

    # raw id -> embedding, for easy lookup.
    raw_user_id_emb_dict = {user_index_2_rawid[k]: v
                            for k, v in zip(encoded_user_ids, user_embs)}
    raw_item_id_emb_dict = {item_index_2_rawid[k]: v
                            for k, v in zip(encoded_item_ids, item_embs)}

    with open(shixun_fm_user_emb_dict, 'wb') as out_file:
        pickle.dump(raw_user_id_emb_dict, out_file)
    with open(shixun_fm_item_emb_dict, 'wb') as out_file:
        pickle.dump(raw_item_id_emb_dict, out_file)

    # Row position inside the faiss index -> raw id.
    fm_user_embedding_index_dict = {
        k: user_index_2_rawid[v] for k, v in enumerate(encoded_user_ids)}
    fm_item_embedding_index_dict = {
        k: item_index_2_rawid[v] for k, v in enumerate(encoded_item_ids)}

    with open(shixun_fm_user_embedding_index_dict, 'wb') as out_file:
        pickle.dump(fm_user_embedding_index_dict, out_file)
    with open(shixun_fm_item_embedding_index_dict, 'wb') as out_file:
        pickle.dump(fm_item_embedding_index_dict, out_file)

    if os.path.exists(shixun_fm_recall_dict):
        # A recall result is already cached: return it rather than an empty
        # dict. Delete the cache file to force recomputation.
        with open(shixun_fm_recall_dict, 'rb') as cache_file:
            return pickle.load(cache_file)

    user_recall_items_dict = collections.defaultdict(dict)

    # Load the saved item index.
    index = faiss.read_index(shixun_fm_item_faiss_model_path)

    # Query the item index with every user embedding. Users are not stored in
    # the item index, so there is no self-match to discard: exactly topk
    # results are requested and all of them are kept.
    sim, idx = index.search(
        np.ascontiguousarray(user_embs, dtype='float32'), topk)

    logger.info('生成fm所有用户的召回列表')
    for target_idx, sim_value_list, rele_idx_list in tqdm(
            zip(encoded_user_ids, sim, idx)):
        target_raw_id = user_index_2_rawid[target_idx]

        for rele_idx, sim_value in zip(rele_idx_list, sim_value_list):
            # faiss pads with -1 when the index holds fewer than k vectors.
            if rele_idx < 0:
                continue
            # rele_idx is a row position in item_embs, not a label-encoded
            # id, so it must go through the position -> raw id mapping.
            rele_raw_id = fm_item_embedding_index_dict[rele_idx]
            recalled = user_recall_items_dict[target_raw_id]
            recalled[rele_raw_id] = recalled.get(rele_raw_id, 0) + sim_value

    # Sort each user's candidates by score to get the final recall list.
    user_recall_items_dict = {
        k: sorted(v.items(), key=lambda x: x[1], reverse=True)
        for k, v in user_recall_items_dict.items()}

    with open(shixun_fm_recall_dict, 'wb') as out_file:
        pickle.dump(user_recall_items_dict, out_file)

    return user_recall_items_dict
|
|
|
|
|
|
|
|
|
def build_hnsw(vecs, to_file, ef=2000, m=64, dim=100):
    """
    Build a faiss inner-product index over ``vecs`` and write it to disk.

    Despite the name, the index created here is ``faiss.IndexFlatIP``;
    ``ef`` and ``m`` are accepted but not used by this function.

    :param vecs: array of vectors; cast to float32 before being added.
    :param to_file: path the index is written to.
    :param ef: unused.
    :param m: unused.
    :param dim: dimensionality passed to the index constructor.
    :return: the populated faiss index.
    """
    flat_index = faiss.IndexFlatIP(dim)
    flat_index.add(vecs.astype('float32'))

    # Persist the index so later stages can read it back.
    faiss.write_index(flat_index, to_file)
    return flat_index
|
|
|
|
|
|
def fm_recall_train():
    """
    Run FM recall training and, when enabled, evaluate the recall.

    Loads the selection records, joins user and item information onto them,
    optionally holds out each user's last selection for evaluation, trains
    via ``fm_recall`` and finally calls ``metrics_recall`` when both
    ``samples_mode`` and ``need_metric_recall`` are set.
    """
    # Number of items to recall per user.
    recall_item_num = 100

    logger.info("加载物品行为数据")
    all_select_df = get_all_select_df(offline=offline_mode)

    logger.info("获取物品信息数据")
    item_info = get_item_info_df()

    logger.info("获取用户信息数据")
    users_info = get_user_info_df()

    # Attach user attributes first, then item attributes.
    with_users = all_select_df.merge(users_info, on='user_id')
    all_select_df = with_users.merge(item_info, on='shixun_id')

    # For evaluation the last selection of each user is held out; without
    # evaluation the full data set is used for recall.
    if not need_metric_recall:
        train_hist_select_df = all_select_df
    else:
        logger.info('获取物品行为数据历史和最后一次选择')
        train_hist_select_df, train_last_select_df = \
            get_hist_and_last_select(all_select_df)

    # For quick debugging, subsample the data (remember to delete the cached
    # temporary files first):
    # train_hist_select_df = train_hist_select_df.sample(frac = 0.001)

    # Train the FM model and produce the recall lists.
    user_recall_items_dict = fm_recall(train_hist_select_df,
                                       topk=recall_item_num)

    # Evaluate the recall quality.
    if need_metric_recall:
        if samples_mode == True:
            logger.info('fm召回效果评估')
            metrics_recall(user_recall_items_dict, train_last_select_df,
                           topk=recall_item_num)
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run the whole FM recall training pipeline.
    fm_recall_train()