import os
import sys

# Make project-local packages (ranking, config, matching) importable when the
# script is launched from the repository root.
sys.path.append(os.getcwd())

import pickle
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import MinMaxScaler
import warnings
from ranking.subject.rank_comm import save_rank_results
from deepctr.models import BST
from deepctr.feature_column import SparseFeat, VarLenSparseFeat, DenseFeat, get_feature_names
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing import text
from tensorflow.keras import backend as K
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
from tensorflow.keras.callbacks import *
from tensorflow.keras.models import save_model
import tensorflow as tf
from config import offline_mode, subject_features_save_path
from config import subject_train_user_item_feats
from config import subject_val_user_item_feats
from config import subject_test_user_item_feats
from config import subject_model_save_path
from config import subject_all_user_item_feats
from config import logger
from config import subject_rank_dense_fea
from config import subject_rank_sparse_fea
from config import subject_max_seq_len
from config import subject_rank_feats_columns
from matching.subject.recall_comm import get_all_select_df
from matching.subject.recall_comm import get_item_info_df

os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "1"

# NOTE(review): this pins the Keras learning phase to "training" for the whole
# process, which also covers the model.predict call in main() -- confirm that
# this is intended before removing or changing it.
K.set_learning_phase(True)

# Compare the numeric major version; comparing version strings
# lexicographically would misorder e.g. '10.0.0' and '2.0.0'.
if int(tf.__version__.split('.')[0]) >= 2:
    tf.compat.v1.disable_eager_execution()

tqdm.pandas()
warnings.filterwarnings('ignore')


def get_bst_feats_columns(df, dense_fea, sparse_fea, behavior_fea, his_behavior_fea,
                          emb_dim=32, max_len=100, vocab_sizes=None):
    """Build the DeepCTR feature columns and the model input dict for ``df``.

    Args:
        df: Data set holding one column per feature name.
        dense_fea: Names of the numeric feature columns.
        sparse_fea: Names of the categorical feature columns.
        behavior_fea: Names of the candidate-behaviour columns. Not used by
            this function; kept so existing callers keep working.
        his_behavior_fea: Names of the history-sequence columns; each cell is
            a sequence of ids.
        emb_dim: Embedding dimension shared by every sparse feature.
        max_len: Length the history sequences are padded/truncated to.
        vocab_sizes: Optional mapping ``feature name -> vocabulary size``.
            When a feature is missing (or the mapping is omitted) the size
            falls back to ``df[feature].nunique() + 1``. Passing explicit
            sizes keeps the embedding tables independent of which split
            ``df`` happens to be.

    Returns:
        A tuple ``(x, dnn_feature_columns)`` where ``x`` maps every feature
        name to its input array and ``dnn_feature_columns`` is the list of
        sparse, dense and variable-length feature columns (in that order).
    """
    known_sizes = vocab_sizes or {}

    def _vocab_size(column):
        # Prefer the caller-supplied size; otherwise derive it from df.
        if column in known_sizes:
            return known_sizes[column]
        return df[column].nunique() + 1

    sparse_feature_columns = [
        SparseFeat(feat, vocabulary_size=_vocab_size(feat), embedding_dim=emb_dim)
        for feat in sparse_fea
    ]
    dense_feature_columns = [DenseFeat(feat, 1) for feat in dense_fea]
    # History sequences reuse the 'subject_id' embedding table, so their
    # vocabulary size is the one of 'subject_id'.
    var_feature_columns = [
        VarLenSparseFeat(
            SparseFeat(feat,
                       vocabulary_size=_vocab_size('subject_id'),
                       embedding_dim=emb_dim,
                       embedding_name='subject_id'),
            maxlen=max_len)
        for feat in his_behavior_fea
    ]
    dnn_feature_columns = sparse_feature_columns + dense_feature_columns + var_feature_columns

    # Model input: a dict keyed by feature name.
    x = {}
    for name in get_feature_names(dnn_feature_columns):
        if name in his_behavior_fea:
            # History sequences are ragged; pad them at the end to max_len.
            x[name] = pad_sequences(list(df[name]), maxlen=max_len, padding='post')
        else:
            x[name] = df[name].values
    return x, dnn_feature_columns


def _load_user_item_feats(path):
    """Read a tab-separated user/item feature file and cast subject_id to int."""
    feats_df = pd.read_csv(path, sep='\t', encoding='utf-8')
    feats_df['subject_id'] = feats_df['subject_id'].astype(int)
    return feats_df


def _clean_numeric(feats_df, dense_fea):
    """Replace missing dense values and +/-inf with 0, in place."""
    feats_df[dense_fea] = feats_df[dense_fea].fillna(0)
    feats_df.replace([np.inf, -np.inf], 0, inplace=True)


def main():
    """Train the BST ranking model, score the test set and save the ranking."""
    logger.info('加载物品行为数据')
    all_data = get_all_select_df()

    logger.info('生成用户历史选择物品数据')
    # One row per user with the list of subject ids the user selected.
    # NOTE(review): these are the raw ids from get_all_select_df, whereas the
    # 'subject_id' column is label-encoded below and both share the
    # 'subject_id' embedding -- confirm the two id spaces line up.
    his_behavior_df = (
        all_data.groupby('user_id')['subject_id']
        .agg(list)
        .reset_index()
        .rename(columns={'subject_id': 'hist_subject_id'})
    )

    logger.info('获取物品信息')
    subject_info_df = get_item_info_df()

    logger.info('加载用户行为特征')
    # Features of all user/item pairs; used only to fit scalers and encoders.
    all_user_item_feats_df = pd.read_csv(subject_all_user_item_feats, sep='\t', encoding='utf-8')

    train_user_item_feats_df = _load_user_item_feats(subject_train_user_item_feats)
    if offline_mode:
        val_user_item_feats_df = _load_user_item_feats(subject_val_user_item_feats)
    else:
        val_user_item_feats_df = None
    test_user_item_feats_df = _load_user_item_feats(subject_test_user_item_feats)
    # The test set carries a placeholder label from feature generation; drop it.
    del test_user_item_feats_df['label']

    # Attach each user's selection history.
    train_user_item_feats_df = train_user_item_feats_df.merge(his_behavior_df, on='user_id')
    if val_user_item_feats_df is not None:
        val_user_item_feats_df = val_user_item_feats_df.merge(his_behavior_df, on='user_id')
    test_user_item_feats_df = test_user_item_feats_df.merge(his_behavior_df, on='user_id')

    # Feature groups.
    sparse_fea = subject_rank_sparse_fea
    behavior_fea = ['subject_id']
    hist_behavior_fea = ['hist_subject_id']
    dense_fea = subject_rank_dense_fea

    # Every split that exists, so each preprocessing step treats them alike.
    frames = [train_user_item_feats_df]
    if val_user_item_feats_df is not None:
        frames.append(val_user_item_feats_df)
    frames.append(test_user_item_feats_df)

    # Missing and infinite values -> 0 (the validation split included).
    for frame in frames:
        _clean_numeric(frame, dense_fea)

    # Min-max scale the dense features; scalers are fitted on the full data
    # and persisted so the scaling can be undone / reused later.
    for feat in dense_fea:
        min_max_scaler = MinMaxScaler()
        min_max_scaler.fit(all_user_item_feats_df[[feat]])
        with open(subject_model_save_path + 'min_max_scaler_' + feat + '.model', 'wb') as fh:
            pickle.dump(min_max_scaler, fh)
        for frame in frames:
            frame[feat] = min_max_scaler.transform(frame[[feat]]).ravel()

    # Label-encode the sparse features. subject_id is fitted on the item
    # catalogue, everything else on the full feature table.
    label_encoders = {}
    vocab_sizes = {}
    for feat in sparse_fea:
        label_encoder = LabelEncoder()
        fit_source = subject_info_df if feat == 'subject_id' else all_user_item_feats_df
        label_encoder.fit(fit_source[feat])
        with open(subject_model_save_path + feat + '_label_encoder.model', 'wb') as fh:
            pickle.dump(label_encoder, fh)
        label_encoders[feat] = label_encoder
        # Size the embedding table from the encoder, not from one split, so
        # every encoded id is guaranteed to be in range.
        vocab_sizes[feat] = len(label_encoder.classes_) + 1
        for frame in frames:
            frame[feat] = label_encoder.transform(frame[feat])

    # Training data.
    x_train, _ = get_bst_feats_columns(
        train_user_item_feats_df, dense_fea, sparse_fea, behavior_fea, hist_behavior_fea,
        max_len=subject_max_seq_len, vocab_sizes=vocab_sizes)
    y_train = train_user_item_feats_df['label'].values

    # Validation data (offline mode only).
    if val_user_item_feats_df is not None:
        x_val, _ = get_bst_feats_columns(
            val_user_item_feats_df, dense_fea, sparse_fea, behavior_fea, hist_behavior_fea,
            max_len=subject_max_seq_len, vocab_sizes=vocab_sizes)
        y_val = val_user_item_feats_df['label'].values

    # Test data; the label column was dropped above, so never request it.
    dense_fea = [feat for feat in dense_fea if feat != 'label']
    x_test, dnn_feature_columns = get_bst_feats_columns(
        test_user_item_feats_df, dense_fea, sparse_fea, behavior_fea, hist_behavior_fea,
        max_len=subject_max_seq_len, vocab_sizes=vocab_sizes)

    # Build and inspect the model.
    model = BST(dnn_feature_columns, behavior_fea)
    model.summary()

    # BST is built with its default binary task, whose output layer already
    # applies a sigmoid, so the loss must treat predictions as probabilities.
    model.compile(optimizer="Adam",
                  loss=tf.keras.losses.BinaryCrossentropy(from_logits=False),
                  metrics=['AUC'])

    # Train.
    if val_user_item_feats_df is not None:
        history = model.fit(x_train, y_train, verbose=1, epochs=5,
                            validation_data=(x_val, y_val), batch_size=256)
    else:
        # No separate validation split: hold out 30% of the training data.
        history = model.fit(x_train, y_train, verbose=1, epochs=5,
                            validation_split=0.3, batch_size=256)

    # Persist the trained BST model.
    model.save(subject_model_save_path + 'bst_model.h5')

    # Score the test set; flatten the model output to one score per row.
    test_user_item_feats_df['pred_score'] = model.predict(
        x_test, verbose=1, batch_size=256).ravel()

    # Map the encoded ids back to the original user_id / subject_id values.
    test_user_item_feats_df['user_id'] = label_encoders['user_id'].inverse_transform(
        test_user_item_feats_df['user_id'])
    test_user_item_feats_df['subject_id'] = label_encoders['subject_id'].inverse_transform(
        test_user_item_feats_df['subject_id'])

    # Attach the subject names and rank the candidates per user by score.
    item_info_df = subject_info_df[['subject_id', 'subject_name']]
    test_user_item_feats_df = test_user_item_feats_df.merge(item_info_df, how='left', on='subject_id')
    test_user_item_feats_df.sort_values(by=['user_id', 'pred_score'],
                                        ascending=[True, False], inplace=True)
    test_user_item_feats_df['pred_rank'] = (
        test_user_item_feats_df.groupby(['user_id'])['pred_score']
        .rank(ascending=False, method='first')
        .astype(int)
    )

    # Undo the min-max scaling on the exported feature columns.
    for feat in subject_rank_feats_columns:
        with open(subject_model_save_path + 'min_max_scaler_' + feat + '.model', 'rb') as fh:
            min_max_scaler = pickle.load(fh)
        test_user_item_feats_df[feat] = min_max_scaler.inverse_transform(
            test_user_item_feats_df[[feat]]).ravel()

    output_columns = (['user_id', 'subject_id', 'subject_name']
                      + subject_rank_feats_columns
                      + ['pred_score', 'pred_rank'])
    test_user_item_feats_df[output_columns].to_csv(
        subject_features_save_path + 'bst_rank_score.csv', sep='\t', index=False, header=True)


if __name__ == '__main__':
    main()