MD-metrics-HPO
HuangJintao 7 months ago
parent b21b0aa496
commit 9b06ce3840

6
.gitignore vendored

@ -2,3 +2,9 @@
/ml_er/output/* /ml_er/output/*
/md_discovery/output/* /md_discovery/output/*
/hpo/output/* /hpo/output/*
tfile.py
table_embedding.py
set_none.py
generate_matches.py
ml_er/fuck.py

@ -1,73 +0,0 @@
import json
from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer, Float
from ConfigSpace.conditions import InCondition
from ConfigSpace.read_and_write import json as csj
import py_entitymatching.catalog.catalog_manager as cm
import pandas as pd
from smac import HyperparameterOptimizationFacade, Scenario
from settings import *
from ml_er.ml_entity_resolver import er_process
class Classifier:
@property
def configspace(self) -> ConfigurationSpace:
cs = ConfigurationSpace(seed=0)
ml_matcher = Categorical("ml_matcher", ["dt", "svm", "rf", "lg", "ln", "nb"], default="rf")
# todo 每个分类器的超参数
tree_criterion = Categorical("dt_criterion", ["gini", "entropy", "log_loss"], default="gini")
cs.add_hyperparameters([ml_matcher])
return cs
def train(self, config: Configuration, seed: int = 0) -> float:
cm.del_catalog()
indicators = er_process(config)
return 1-indicators['performance']
def ml_er_hpo():
classifier = Classifier()
cs = classifier.configspace
str_configspace = csj.write(cs)
dict_configspace = json.loads(str_configspace)
with open(hpo_output_dir + "configspace.json", "w") as f:
json.dump(dict_configspace, f, indent=4)
scenario = Scenario(
cs,
deterministic=True,
n_trials=12, # We want to run max 50 trials (combination of config and seed)
n_workers=1
)
initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=5)
smac = HyperparameterOptimizationFacade(
scenario,
classifier.train,
initial_design=initial_design,
overwrite=True, # If the run exists, we overwrite it; alternatively, we can continue from last state
)
incumbent = smac.optimize()
incumbent_cost = smac.validate(incumbent)
default = cs.get_default_configuration()
default_cost = smac.validate(default)
print(f"Default Cost: {default_cost}")
print(f"Incumbent Cost: {incumbent_cost}")
if incumbent_cost > default_cost:
incumbent = default
print(f"Updated Incumbent Cost: {default_cost}")
print(f"Optimized Configuration:{incumbent.values()}")
with open(hpo_output_dir + "incumbent.json", "w") as f:
json.dump(dict(incumbent), f, indent=4)
return incumbent
if __name__ == '__main__':
ml_er_hpo()

@ -0,0 +1,110 @@
import json
import pickle
from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer, Float
from ConfigSpace.conditions import InCondition, EqualsCondition
from ConfigSpace.read_and_write import json as csj
import py_entitymatching.catalog.catalog_manager as cm
import pandas as pd
from smac import HyperparameterOptimizationFacade, Scenario
from ml_er.magellan_new import matching
from settings import *
class Classifier:
@property
def configspace(self) -> ConfigurationSpace:
cs = ConfigurationSpace(seed=0)
ml_matcher = Categorical("ml_matcher", ["dt", "svm", "rf"])
# note 以tree开头的超参数是DT和RF共用的
tree_criterion = Categorical("tree_criterion", ["gini", "entropy", "log_loss"], default="gini")
rf_n_estimators = Integer('number_of_tree', (10, 150))
tree_max_depth = Integer('tree_max_depth', (15, 30), default=None)
rf_max_features = Categorical('rf_max_features', ["sqrt", "log2", "auto"], default='sqrt')
svm_kernel = Categorical('svm_kernel', ['linear', 'poly', 'rbf', 'sigmoid', 'precomputed'], default='rbf')
svm_C = Integer('svm_C', (1, 100), default=1)
svm_gamma = Categorical('svm_gamma', ['scale', 'auto'], default='scale')
svm_degree = Integer('svm_degree', (1, 5), default=3)
svm_constant = Float('svm_constant', (0.0, 5.0), default=0.0)
dt_splitter = Categorical('dt_splitter', ["best", "random"], default='best')
dt_max_features = Categorical('dt_max_features', ["auto", "sqrt", "log2"], default=None)
cs.add_hyperparameters([ml_matcher, tree_criterion, rf_n_estimators, tree_max_depth, rf_max_features,
svm_kernel, svm_C, svm_gamma, svm_degree, svm_constant, dt_splitter, dt_max_features])
active_tree_criterion = InCondition(child=tree_criterion, parent=ml_matcher, values=['dt', 'rf'])
active_tree_max_depth = InCondition(child=tree_max_depth, parent=ml_matcher, values=['dt', 'rf'])
active_rf_n_estimators = EqualsCondition(child=rf_n_estimators, parent=ml_matcher, value="rf")
active_rf_max_features = EqualsCondition(child=rf_max_features, parent=ml_matcher, value="rf")
active_dt_splitter = EqualsCondition(child=dt_splitter, parent=ml_matcher, value="dt")
active_dt_max_features = EqualsCondition(child=dt_max_features, parent=ml_matcher, value="dt")
active_svm_kernel = EqualsCondition(child=svm_kernel, parent=ml_matcher, value="svm")
active_svm_gamma = EqualsCondition(child=svm_gamma, parent=ml_matcher, value="svm")
active_svm_degree = EqualsCondition(child=svm_degree, parent=ml_matcher, value="svm")
active_svm_constant = EqualsCondition(child=svm_constant, parent=ml_matcher, value="svm")
active_svm_C = EqualsCondition(child=svm_C, parent=ml_matcher, value="svm")
cs.add_conditions([active_svm_C, active_svm_constant, active_svm_degree, active_svm_gamma, active_svm_kernel,
active_dt_splitter, active_rf_n_estimators, active_dt_max_features, active_rf_max_features,
active_tree_max_depth, active_tree_criterion])
return cs
def train(self, config: Configuration, seed: int = 0) -> float:
cm.del_catalog()
with open(er_output_dir + "blocking_result.pickle", "rb") as file:
blocking_result = pickle.load(file)
indicators = matching(config, blocking_result)
return 1 - indicators['performance']
def ml_er_hpo():
classifier = Classifier()
cs = classifier.configspace
str_configspace = csj.write(cs)
dict_configspace = json.loads(str_configspace)
# 将超参数空间保存本地
with open(hpo_output_dir + "configspace.json", "w") as f:
json.dump(dict_configspace, f, indent=4)
scenario = Scenario(
cs,
crash_cost=1.0,
deterministic=True,
n_trials=50,
n_workers=1
)
initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=5)
smac = HyperparameterOptimizationFacade(
scenario,
classifier.train,
initial_design=initial_design,
overwrite=True, # If the run exists, we overwrite it; alternatively, we can continue from last state
)
incumbent = smac.optimize()
incumbent_cost = smac.validate(incumbent)
default = cs.get_default_configuration()
default_cost = smac.validate(default)
print(f"Default Cost: {default_cost}")
print(f"Incumbent Cost: {incumbent_cost}")
if incumbent_cost > default_cost:
incumbent = default
print(f"Updated Incumbent Cost: {default_cost}")
print(f"Optimized Configuration:{incumbent.values()}")
with open(hpo_output_dir + "incumbent.json", "w") as f:
json.dump(dict(incumbent), f, indent=4)
return incumbent
if __name__ == '__main__':
ml_er_hpo()

@ -1,5 +1,7 @@
import random import random
import operator import operator
from operator import itemgetter
import pandas as pd import pandas as pd
import torch import torch
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@ -41,12 +43,12 @@ def mining(train: pd.DataFrame):
sim_tensor_list = [] sim_tensor_list = []
for col_tuple in col_tuple_list: for col_tuple in col_tuple_list:
mask = ((data[columns[col_tuple[0]]].isin([''])) | (data[columns[col_tuple[1]]].isin(['']))) mask = ((data[columns[col_tuple[0]]].isin([''])) | (data[columns[col_tuple[1]]].isin([''])))
empty_string_indices = data[mask].index.tolist() empty_string_indices = data[mask].index.tolist() # 空字符串索引
lattr_tensor = norm_table_tensor[col_tuple[0]] lattr_tensor = norm_table_tensor[col_tuple[0]]
rattr_tensor = norm_table_tensor[col_tuple[1]] rattr_tensor = norm_table_tensor[col_tuple[1]]
mul_tensor = lattr_tensor * rattr_tensor mul_tensor = lattr_tensor * rattr_tensor
sim_tensor = torch.sum(mul_tensor, 1) sim_tensor = torch.sum(mul_tensor, 1) # 求和得到对应属性2列张量相似度, 2列变1列
# 将有空字符串的位置强制置为-1.0000 # 将有空字符串的位置强制置为-1.0000
sim_tensor = sim_tensor.scatter(0, torch.tensor(empty_string_indices, device='cuda').long(), -1.0000) sim_tensor = sim_tensor.scatter(0, torch.tensor(empty_string_indices, device='cuda').long(), -1.0000)
sim_tensor = torch.round(sim_tensor, decimals=2) sim_tensor = torch.round(sim_tensor, decimals=2)
@ -86,7 +88,9 @@ def mining(train: pd.DataFrame):
for k in range(0, len(columns_without_prefix)): for k in range(0, len(columns_without_prefix)):
md_dict_format[columns_without_prefix[k]] = md_list_format[k] md_dict_format[columns_without_prefix[k]] = md_list_format[k]
result_list.append((md_dict_format, abs_support, confidence)) result_list.append((md_dict_format, abs_support, confidence))
result_list.sort(key=operator.itemgetter(2), reverse=True) # result_list.sort(key=itemgetter(2), reverse=True)
# 按confidence->support的优先级排序
result_list.sort(key=itemgetter(2, 1), reverse=True)
mds_to_txt(result_list) mds_to_txt(result_list)
return result_list return result_list

@ -1,6 +1,14 @@
import json
import os
import pickle
import time import time
import ConfigSpace
import pandas as pd import pandas as pd
import py_entitymatching as em import py_entitymatching as em
import torch
from ConfigSpace import Configuration
from ConfigSpace.read_and_write import json as csj
import py_entitymatching.catalog.catalog_manager as cm import py_entitymatching.catalog.catalog_manager as cm
from tqdm import tqdm from tqdm import tqdm
@ -16,11 +24,11 @@ def blocking_mining():
cm.set_key(rtable, rtable_id) cm.set_key(rtable, rtable_id)
mappings = pd.read_csv(mapping_path, encoding='ISO-8859-1') mappings = pd.read_csv(mapping_path, encoding='ISO-8859-1')
matching_number = len(mappings) matching_number = len(mappings)
if ltable_id == rtable_id: # if ltable_id == rtable_id:
tables_id = rtable_id # tables_id = rtable_id
attributes = ltable.columns.values.tolist() attributes = ltable.columns.values.tolist()
lattributes = ['ltable_' + i for i in attributes] # lattributes = ['ltable_' + i for i in attributes]
rattributes = ['rtable_' + i for i in attributes] # rattributes = ['rtable_' + i for i in attributes]
cm.set_key(ltable, ltable_id) cm.set_key(ltable, ltable_id)
cm.set_key(rtable, rtable_id) cm.set_key(rtable, rtable_id)
@ -65,15 +73,208 @@ def blocking_mining():
label_and_split_time = time.time() label_and_split_time = time.time()
print(f'Label and Split Time: {label_and_split_time - block_time}') print(f'Label and Split Time: {label_and_split_time - block_time}')
mining(train_set) # 挖掘MD并保存本地
md_list = mining(train_set)
mining_time = time.time() mining_time = time.time()
print(f'Mining Time: {mining_time - label_and_split_time}') print(f'Mining Time: {mining_time - label_and_split_time}')
return 1 blocking_results = (ltable, rtable, train_set, test_set, md_list, block_recall)
# 将blocking结果保存到本地
with open(er_output_dir + "blocking_result.pickle", "wb") as file_:
pickle.dump(blocking_results, file_)
return blocking_results
def matching(config: Configuration, blocking_result_):
print(f'\033[33mConfig: {config}\033[0m')
start = time.time()
ltable = blocking_result_[0]
rtable = blocking_result_[1]
train_set = blocking_result_[2]
test_set = blocking_result_[3]
md_list = blocking_result_[4]
block_recall = blocking_result_[5]
ml_matcher = config["ml_matcher"]
match ml_matcher:
case "dt":
matcher = em.DTMatcher(name='DecisionTree', random_state=0, criterion=config['tree_criterion'],
max_depth=config['tree_max_depth'], splitter=config['dt_splitter'],
max_features=config['dt_max_features'])
case "svm":
matcher = em.SVMMatcher(name='SVM', random_state=0, kernel=config['svm_kernel'], degree=config['svm_degree'],
gamma=config['svm_gamma'], C=config['svm_C'], coef0=config['svm_constant'])
case "rf":
matcher = em.RFMatcher(name='RandomForest', random_state=0, criterion=config['tree_criterion'],
max_depth=config['tree_max_depth'], n_estimators=config['number_of_tree'],
max_features=config['rf_max_features'])
cm.set_key(train_set, '_id')
cm.set_fk_ltable(train_set, 'ltable_' + ltable_id)
cm.set_fk_rtable(train_set, 'rtable_' + rtable_id)
cm.set_ltable(train_set, ltable)
cm.set_rtable(train_set, rtable)
cm.set_key(ltable, ltable_id)
cm.set_key(rtable, rtable_id)
cm.set_key(test_set, '_id')
cm.set_fk_ltable(test_set, 'ltable_' + ltable_id)
cm.set_fk_rtable(test_set, 'rtable_' + rtable_id)
cm.set_ltable(test_set, ltable)
cm.set_rtable(test_set, rtable)
feature_table = em.get_features_for_matching(ltable, rtable, validate_inferred_attr_types=False)
train_feature_vecs = em.extract_feature_vecs(train_set,
feature_table=feature_table,
attrs_after=['gold'],
show_progress=False)
train_feature_vecs.fillna(value=0, inplace=True)
test_feature_after = ['ltable_' + i for i in ltable.columns.values.tolist()]
for _ in test_feature_after[:]:
test_feature_after.append(_.replace('ltable_', 'rtable_'))
for _ in test_feature_after:
if _.endswith(ltable_id) or _.endswith(rtable_id):
test_feature_after.remove(_)
test_feature_after.append('gold')
test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
attrs_after=test_feature_after, show_progress=False)
test_feature_vecs.fillna(value=0, inplace=True)
fit_exclude = ['_id', 'ltable_' + ltable_id, 'rtable_' + rtable_id, 'gold']
matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold')
test_feature_after.extend(['_id', 'ltable_' + ltable_id, 'rtable_' + rtable_id])
predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after,
append=True, target_attr='predicted', inplace=False)
eval_result = em.eval_matches(predictions, 'gold', 'predicted')
em.print_eval_summary(eval_result)
indicators = evaluate_prediction(predictions, 'gold', 'predicted')
indicators['block_recall'] = block_recall
test_feature_after.remove('_id')
test_feature_after.append('predicted')
predictions = predictions[test_feature_after]
predictions = predictions.reset_index(drop=True)
predictions = predictions.astype(str)
sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions)
predictions['confidence'] = 0
epl_match = 0 # 可解释预测match
if len(md_list) > 0:
for row in tqdm(predictions.itertuples()):
x = is_explicable(row, md_list, sim_tensor_dict)
if x > 0 and str(getattr(row, 'predicted')) == str(1):
predictions.loc[row[0], 'confidence'] = x
epl_match += 1
df = predictions[predictions['predicted'] == str(1)]
interpretability = epl_match / len(df) # 可解释性
indicators['interpretability'] = interpretability
# note 既然不调block参数, 不妨假设block_recall很高, 不必考虑
# if indicators["block_recall"] < indicators["recall"]:
# f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (
# indicators["precision"] + indicators["block_recall"])
# else:
# f1 = indicators["F1"]
performance = interpre_weight * interpretability + (1 - interpre_weight) * indicators["F1"]
indicators['performance'] = performance
print(f'ER Indicators: {indicators}')
predictions.to_csv(er_output_dir + 'predictions.csv', sep=',', index=False, header=True)
print(f'\033[33mTime consumed by matching in seconds: {time.time() - start}\033[0m')
return indicators
def evaluate_prediction(prediction_: pd.DataFrame, labeled_attr: str, predicted_attr: str) -> dict:
new_df = prediction_.reset_index(drop=False, inplace=False)
gold = new_df[labeled_attr]
predicted = new_df[predicted_attr]
gold_negative = gold[gold == 0].index.values
gold_positive = gold[gold == 1].index.values
predicted_negative = predicted[predicted == 0].index.values
predicted_positive = predicted[predicted == 1].index.values
false_positive_indices = list(set(gold_negative).intersection(predicted_positive))
true_positive_indices = list(set(gold_positive).intersection(predicted_positive))
false_negative_indices = list(set(gold_positive).intersection(predicted_negative))
num_true_positives = float(len(true_positive_indices))
num_false_positives = float(len(false_positive_indices))
num_false_negatives = float(len(false_negative_indices))
precision_denominator = num_true_positives + num_false_positives
recall_denominator = num_true_positives + num_false_negatives
precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator
recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator
F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall)
return {"precision": precision, "recall": recall, "F1": F1}
def build_col_pairs_sim_tensor_dict(predictions: pd.DataFrame):
predictions_attrs = predictions.columns.values.tolist()
col_tuple_list = []
for _ in predictions_attrs:
if _.startswith('ltable'):
left_index = predictions_attrs.index(_)
right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_'))
col_tuple_list.append((left_index, right_index))
length = predictions.shape[0]
width = predictions.shape[1]
predictions = predictions.reset_index(drop=True)
sentences = predictions.values.flatten(order='F').tolist()
embedding = model.encode(sentences, convert_to_tensor=True, device="cuda", batch_size=256, show_progress_bar=True)
split_embedding = torch.split(embedding, length, dim=0)
table_tensor = torch.stack(split_embedding, dim=0, out=None)
# prediction的归一化嵌入张量
norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
sim_tensor_dict = {}
for col_tuple in col_tuple_list:
lattr_tensor = norm_table_tensor[col_tuple[0]]
rattr_tensor = norm_table_tensor[col_tuple[1]]
mul_tensor = lattr_tensor * rattr_tensor
sim_tensor = torch.sum(mul_tensor, 1)
sim_tensor = torch.round(sim_tensor, decimals=4)
sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor
return sim_tensor_dict
def is_explicable(row, all_mds: list, st_dict):
attrs = all_mds[0][0].keys() # 从第一条md_tuple中的md字典中读取所有字段
for md_tuple in all_mds:
explicable = True # 假设这条md能解释当前元组
for a in attrs:
if st_dict[a][row[0]].item() < md_tuple[0][a]:
explicable = False # 任意一个字段的相似度达不到阈值这条md就不能解释当前元组
break # 不再与当前md的其他相似度阈值比较跳转到下一条md
if explicable:
return md_tuple[2] # 任意一条md能解释直接返回
return -1.0 # 遍历结束,不能解释
def matching(): def ml_er(config: Configuration, blocking_result_):
return 1 indicators = matching(config, blocking_result_)
output_path = er_output_dir + "eval_result.txt"
with open(output_path, 'w') as _f:
_f.write('Precision:' + str(indicators["precision"]) + '\n')
_f.write('Recall:' + str(indicators["recall"]) + '\n')
_f.write('F1:' + str(indicators["F1"]) + '\n')
_f.write('block_recall:' + str(indicators["block_recall"]) + '\n')
_f.write('interpretability:' + str(indicators['interpretability']) + '\n')
_f.write('performance:' + str(indicators['performance']) + '\n')
if __name__ == '__main__': if __name__ == '__main__':
blocking_mining() if os.path.isfile(hpo_output_dir + "incumbent.json"):
with open(hpo_output_dir + "configspace.json", 'r') as f:
dict_configspace = json.load(f)
str_configspace = json.dumps(dict_configspace)
configspace = csj.read(str_configspace)
with open(hpo_output_dir + "incumbent.json", 'r') as f:
dic = json.load(f)
configuration = ConfigSpace.Configuration(configspace, values=dic)
with open(er_output_dir + "blocking_result.pickle", "rb") as file:
blocking_result = pickle.load(file)
ml_er(configuration, blocking_result)

@ -0,0 +1,4 @@
from ml_er.magellan_new import blocking_mining
if __name__ == '__main__':
blocking_mining()

@ -1,12 +1,12 @@
from sentence_transformers import SentenceTransformer from sentence_transformers import SentenceTransformer
ltable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\DBLP-GoogleScholar\tableA.csv' ltable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\Abt-Buy\tableA.csv'
rtable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\DBLP-GoogleScholar\tableB.csv' rtable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\Abt-Buy\tableB.csv'
mapping_path = r'E:\Data\Research\Projects\matching_dependency\datasets\DBLP-GoogleScholar\matches.csv' mapping_path = r'E:\Data\Research\Projects\matching_dependency\datasets\Abt-Buy\matches.csv'
mapping_lid = 'idDBLP' # mapping表中左表id名 mapping_lid = 'idAbt' # mapping表中左表id名
mapping_rid = 'idScholar' # mapping表中右表id名 mapping_rid = 'idBuy' # mapping表中右表id名
ltable_block_attr = 'title' ltable_block_attr = 'name'
rtable_block_attr = 'title' rtable_block_attr = 'name'
ltable_id = 'id' # 左表id字段名称 ltable_id = 'id' # 左表id字段名称
rtable_id = 'id' # 右表id字段名称 rtable_id = 'id' # 右表id字段名称
target_attr = 'id' # 进行md挖掘时的目标字段 target_attr = 'id' # 进行md挖掘时的目标字段

Loading…
Cancel
Save