You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
matching_dependency/hpo/er_model_hpo.py

258 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import numpy as np
import torch
import json
from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer
from ConfigSpace.conditions import InCondition
from ConfigSpace.read_and_write import json as csj
import py_entitymatching as em
import py_entitymatching.catalog.catalog_manager as cm
import pandas as pd
from smac import HyperparameterOptimizationFacade, Scenario
from settings import *
from ml_er.ml_entity_resolver import evaluate_prediction, load_mds, is_explicable, build_col_pairs_sim_tensor_dict
# 数据在外部加载
########################################################################################################################
ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1')
# ltable.fillna("", inplace=True)
rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1')
# rtable.fillna("", inplace=True)
mappings = pd.read_csv(mapping_path)
lid_mapping_list = []
rid_mapping_list = []
# 全部转为字符串
# ltable = ltable.astype(str)
# rtable = rtable.astype(str)
# mappings = mappings.astype(str)
matching_number = len(mappings) # 所有阳性样本数商品数据集应为1300
for index, row in mappings.iterrows():
lid_mapping_list.append(row[mapping_lid])
rid_mapping_list.append(row[mapping_rid])
# 仅保留两表中出现在映射表中的行,增大正样本比例
selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)]
if len(lr_attrs_map) > 0:
selected_ltable = selected_ltable.rename(columns=lr_attrs_map) # 参照右表,修改左表中与右表对应但不同名的字段
tables_id = rtable_id # 不论左表右表ID字段名是否一致经上一行调整统一以右表为准
selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)]
selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名
########################################################################################################################
class Classifier:
@property
def configspace(self) -> ConfigurationSpace:
# Build Configuration Space which defines all parameters and their ranges
cs = ConfigurationSpace(seed=0)
block_attr_items = selected_attrs[:]
block_attr_items.remove(tables_id)
block_attr = Categorical("block_attr", block_attr_items)
overlap_size = Integer("overlap_size", (1, 3), default=1)
ml_matcher = Categorical("ml_matcher", ["dt", "svm", "rf", "lg", "ln", "nb"], default="rf")
ml_blocker = Categorical("ml_blocker", ["over_lap", "attr_equiv"], default="over_lap")
use_overlap_size = InCondition(child=overlap_size, parent=ml_blocker, values=["over_lap"])
cs.add_hyperparameters([block_attr, overlap_size, ml_matcher, ml_blocker])
cs.add_conditions([use_overlap_size])
return cs
# train 就是整个函数 只需将返回结果由预测变成预测结果的评估
def train(self, config: Configuration, seed: int = 0) -> float:
# print(f"BRAINFUCK:{config.values()}")
cm.del_catalog()
attrs_with_l_prefix = ['ltable_' + i for i in selected_attrs] # 字段名加左前缀
attrs_with_r_prefix = ['rtable_' + i for i in selected_attrs] # 字段名加右前缀
cm.set_key(selected_ltable, tables_id)
cm.set_key(selected_rtable, tables_id)
if config["ml_blocker"] == "over_lap":
blocker = em.OverlapBlocker()
candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"],
l_output_attrs=selected_attrs, r_output_attrs=selected_attrs,
overlap_size=config["overlap_size"], show_progress=False,
allow_missing=True)
elif config["ml_blocker"] == "attr_equiv":
blocker = em.AttrEquivalenceBlocker()
candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"],
l_output_attrs=selected_attrs, r_output_attrs=selected_attrs,
allow_missing=True)
candidate['gold'] = 0
candidate_match_rows = []
for index_num, line in candidate.iterrows():
l_id = line['ltable_' + tables_id]
map_row = mappings[mappings[mapping_lid] == l_id]
if map_row is not None:
r_id = map_row[mapping_rid]
for value in r_id:
if value == line['rtable_' + tables_id]:
candidate_match_rows.append(line["_id"])
else:
continue
for line in candidate_match_rows:
candidate.loc[line, 'gold'] = 1
# 裁剪负样本,保持正负样本数量一致
candidate_mismatch = candidate[candidate['gold'] == 0]
candidate_match = candidate[candidate['gold'] == 1]
if len(candidate_mismatch) > len(candidate_match):
candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match))
# 拼接正负样本
candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match])
if len(candidate_for_train_test) == 0:
return 1
cm.set_key(candidate_for_train_test, '_id')
cm.set_fk_ltable(candidate_for_train_test, 'ltable_' + tables_id)
cm.set_fk_rtable(candidate_for_train_test, 'rtable_' + tables_id)
cm.set_ltable(candidate_for_train_test, selected_ltable)
cm.set_rtable(candidate_for_train_test, selected_rtable)
# 分为训练测试集
train_proportion = 0.7
test_proportion = 0.3
sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0)
train_set = sets['train']
test_set = sets['test']
cm.set_key(train_set, '_id')
cm.set_fk_ltable(train_set, 'ltable_' + tables_id)
cm.set_fk_rtable(train_set, 'rtable_' + tables_id)
cm.set_ltable(train_set, selected_ltable)
cm.set_rtable(train_set, selected_rtable)
cm.set_key(test_set, '_id')
cm.set_fk_ltable(test_set, 'ltable_' + tables_id)
cm.set_fk_rtable(test_set, 'rtable_' + tables_id)
cm.set_ltable(test_set, selected_ltable)
cm.set_rtable(test_set, selected_rtable)
if config["ml_matcher"] == "dt":
matcher = em.DTMatcher(name='DecisionTree', random_state=0)
elif config["ml_matcher"] == "svm":
matcher = em.SVMMatcher(name='SVM', random_state=0)
elif config["ml_matcher"] == "rf":
matcher = em.RFMatcher(name='RF', random_state=0)
elif config["ml_matcher"] == "lg":
matcher = em.LogRegMatcher(name='LogReg', random_state=0)
elif config["ml_matcher"] == "ln":
matcher = em.LinRegMatcher(name='LinReg')
elif config["ml_matcher"] == "nb":
matcher = em.NBMatcher(name='NaiveBayes')
feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False)
train_feature_vecs = em.extract_feature_vecs(train_set,
feature_table=feature_table,
attrs_after=['gold'],
show_progress=False)
test_feature_after = attrs_with_l_prefix[:]
test_feature_after.extend(attrs_with_r_prefix)
for _ in test_feature_after:
if _.endswith(tables_id):
test_feature_after.remove(_)
test_feature_after.append('gold')
test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
attrs_after=test_feature_after, show_progress=False)
fit_exclude = ['_id', 'ltable_' + tables_id, 'rtable_' + tables_id, 'gold']
train_feature_vecs.fillna(0, inplace=True)
test_feature_vecs.fillna(0, inplace=True)
matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold')
test_feature_after.extend(['_id', 'ltable_' + tables_id, 'rtable_' + tables_id])
predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after,
append=True, target_attr='predicted', inplace=False)
eval_result = em.eval_matches(predictions, 'gold', 'predicted')
em.print_eval_summary(eval_result)
indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, test_proportion)
print(indicators)
# 计算可解释性
predictions_attrs = []
predictions_attrs.extend(attrs_with_l_prefix)
predictions_attrs.extend(attrs_with_r_prefix)
predictions_attrs.extend(['gold', 'predicted'])
predictions = predictions[predictions_attrs]
predictions = predictions.reset_index(drop=True)
# predictions = predictions.astype(str)
sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions)
# 默认路径为 "../md_discovery/output/xxx.txt"
# mds/vio 共2个md文件
md_paths = [md_output_dir + 'mds.txt', md_output_dir + 'vio.txt']
md_list = load_mds(md_paths) # 从全局变量中读取所有的md
epl_match = 0 # 可解释预测match
if len(md_list) > 0:
for line in predictions.itertuples():
if is_explicable(line, md_list, sim_tensor_dict) and str(getattr(line, 'predicted')) == str(1):
epl_match += 1
ppre = predictions[predictions['predicted'] == str(1)]
interpretability = epl_match / len(ppre) # 可解释性
if indicators["block_recall"] >= 0.8:
f1 = indicators["F1"]
else:
f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (indicators["precision"] + indicators["block_recall"])
# if indicators["block_recall"] < 0.8:
# return 1
# f1 = indicators["F1"]
performance = interpre_weight * interpretability + (1 - interpre_weight) * f1
return 1 - performance
def ml_er_hpo():
classifier = Classifier()
cs = classifier.configspace
str_configspace = csj.write(cs)
dict_configspace = json.loads(str_configspace)
with open(hpo_output_dir + "configspace.json", "w") as f:
json.dump(dict_configspace, f)
# Next, we create an object, holding general information about the run
scenario = Scenario(
cs,
deterministic=True,
n_trials=10, # We want to run max 50 trials (combination of config and seed)
n_workers=1
)
initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=5)
# Now we use SMAC to find the best hyperparameters
smac = HyperparameterOptimizationFacade(
scenario,
classifier.train,
initial_design=initial_design,
overwrite=True, # If the run exists, we overwrite it; alternatively, we can continue from last state
)
incumbent = smac.optimize()
incumbent_cost = smac.validate(incumbent)
default = cs.get_default_configuration()
default_cost = smac.validate(default)
print(f"Default Cost: {default_cost}")
print(f"Incumbent Cost: {incumbent_cost}")
if incumbent_cost > default_cost:
incumbent = default
print(f"Updated Incumbent Cost: {default_cost}")
print(f"Optimized Configuration:{incumbent.values()}")
incumbent_ndarray = incumbent.get_array()
np.save(hpo_output_dir + 'incumbent.npy', incumbent_ndarray)
return incumbent
if __name__ == '__main__':
ml_er_hpo()