|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import sys
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
import ConfigSpace
|
|
|
|
|
import pandas
|
|
|
|
|
import torch
|
|
|
|
|
from py_entitymatching.debugmatcher.debug_gui_utils import _get_metric
|
|
|
|
|
from ConfigSpace.read_and_write import json as csj
|
|
|
|
|
import py_entitymatching as em
|
|
|
|
|
import py_entitymatching.catalog.catalog_manager as cm
|
|
|
|
|
import pandas as pd
|
|
|
|
|
import six
|
|
|
|
|
from ConfigSpace import Configuration
|
|
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
|
|
|
|
from md_discovery.md_discover import md_discover
|
|
|
|
|
from settings import *
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def prepare_file_for_md_discovery(train, t_single_tuple_path=er_output_dir + "t_single_tuple.csv"):
|
|
|
|
|
df = train[train['gold'] == 1]
|
|
|
|
|
# 元组对左右ID调整一致
|
|
|
|
|
for index, row in df.iterrows():
|
|
|
|
|
df.loc[index, "rtable_" + rtable_id] = row["ltable_" + rtable_id]
|
|
|
|
|
|
|
|
|
|
train_columns = train.columns.values.tolist()
|
|
|
|
|
l_columns = []
|
|
|
|
|
r_columns = []
|
|
|
|
|
cols = []
|
|
|
|
|
# 左表和右表字段名分别加入两个列表
|
|
|
|
|
for _ in train_columns:
|
|
|
|
|
if _.startswith('ltable'):
|
|
|
|
|
l_columns.append(_)
|
|
|
|
|
elif _.startswith('rtable'):
|
|
|
|
|
r_columns.append(_)
|
|
|
|
|
# 将左表中字段名去掉前缀,作为统一的字段名列表(前提是两张表内对应字段名调整一致)
|
|
|
|
|
for _ in l_columns:
|
|
|
|
|
cols.append(_.replace('ltable_', ''))
|
|
|
|
|
|
|
|
|
|
ldf = df[l_columns]
|
|
|
|
|
rdf = df[r_columns]
|
|
|
|
|
ldf.columns = cols
|
|
|
|
|
rdf.columns = cols
|
|
|
|
|
t_single_tuple = pd.concat([ldf, rdf])
|
|
|
|
|
t_single_tuple = t_single_tuple.reset_index(drop=True)
|
|
|
|
|
|
|
|
|
|
t_single_tuple.to_csv(t_single_tuple_path, sep=',', index=False, header=True, quoting=1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def evaluate_prediction(df: pd.DataFrame, labeled_attr: str, predicted_attr: str, matching_number: int,
|
|
|
|
|
candidate: pd.DataFrame) -> dict:
|
|
|
|
|
new_df = df.reset_index(drop=False, inplace=False)
|
|
|
|
|
gold = new_df[labeled_attr]
|
|
|
|
|
predicted = new_df[predicted_attr]
|
|
|
|
|
gold_negative = gold[gold == 0].index.values
|
|
|
|
|
gold_positive = gold[gold == 1].index.values
|
|
|
|
|
predicted_negative = predicted[predicted == 0].index.values
|
|
|
|
|
predicted_positive = predicted[predicted == 1].index.values
|
|
|
|
|
|
|
|
|
|
false_positive_indices = list(set(gold_negative).intersection(predicted_positive))
|
|
|
|
|
true_positive_indices = list(set(gold_positive).intersection(predicted_positive))
|
|
|
|
|
false_negative_indices = list(set(gold_positive).intersection(predicted_negative))
|
|
|
|
|
|
|
|
|
|
num_true_positives = float(len(true_positive_indices))
|
|
|
|
|
num_false_positives = float(len(false_positive_indices))
|
|
|
|
|
num_false_negatives = float(len(false_negative_indices))
|
|
|
|
|
|
|
|
|
|
precision_denominator = num_true_positives + num_false_positives
|
|
|
|
|
recall_denominator = num_true_positives + num_false_negatives
|
|
|
|
|
|
|
|
|
|
precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator
|
|
|
|
|
recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator
|
|
|
|
|
F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall)
|
|
|
|
|
block_recall = len(candidate[candidate['gold'] == 1]) / matching_number
|
|
|
|
|
|
|
|
|
|
return {"precision": precision, "recall": recall, "F1": F1, "block_recall": block_recall}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_mds(paths: list) -> list:
|
|
|
|
|
if len(paths) == 0:
|
|
|
|
|
return []
|
|
|
|
|
all_mds = []
|
|
|
|
|
# 传入md路径列表
|
|
|
|
|
for md_path in paths:
|
|
|
|
|
if not os.path.exists(md_path):
|
|
|
|
|
continue
|
|
|
|
|
mds = []
|
|
|
|
|
# 打开每一个md文件
|
|
|
|
|
with open(md_path, 'r') as f:
|
|
|
|
|
# 读取每一行的md,加入该文件的md列表
|
|
|
|
|
for line in f.readlines():
|
|
|
|
|
md_metadata = line.strip().split('\t')
|
|
|
|
|
# todo 如果MD文件的形式改了 这里也要改
|
|
|
|
|
md = eval(md_metadata[1])
|
|
|
|
|
mds.append(md)
|
|
|
|
|
all_mds.extend(mds)
|
|
|
|
|
return all_mds
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def is_explicable(row, all_mds: list, st_dict):
|
|
|
|
|
attrs = all_mds[0][0].keys() # 从第一条md_tuple中的md字典中读取所有字段
|
|
|
|
|
for md_tuple in all_mds:
|
|
|
|
|
explicable = True # 假设这条md能解释当前元组
|
|
|
|
|
for a in attrs:
|
|
|
|
|
if a != target_attr:
|
|
|
|
|
if st_dict[a][row[0]].item() < md_tuple[0][a]:
|
|
|
|
|
explicable = False # 任意一个字段的相似度达不到阈值,这条md就不能解释当前元组
|
|
|
|
|
break # 不再与当前md的其他相似度阈值比较,跳转到下一条md
|
|
|
|
|
if explicable:
|
|
|
|
|
return md_tuple[2] # 任意一条md能解释,直接返回
|
|
|
|
|
return -1.0 # 遍历结束,不能解释
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def build_col_pairs_sim_tensor_dict(predictions: pandas.DataFrame):
|
|
|
|
|
predictions_attrs = predictions.columns.values.tolist()
|
|
|
|
|
col_tuple_list = []
|
|
|
|
|
for _ in predictions_attrs:
|
|
|
|
|
if _.startswith('ltable'):
|
|
|
|
|
left_index = predictions_attrs.index(_)
|
|
|
|
|
right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_'))
|
|
|
|
|
col_tuple_list.append((left_index, right_index))
|
|
|
|
|
|
|
|
|
|
length = predictions.shape[0]
|
|
|
|
|
width = predictions.shape[1]
|
|
|
|
|
sentences = []
|
|
|
|
|
for col in range(0, width):
|
|
|
|
|
for row in range(0, length):
|
|
|
|
|
cell_value = predictions.values[row, col]
|
|
|
|
|
sentences.append(cell_value)
|
|
|
|
|
embedding = model.encode(sentences, convert_to_tensor=True, device="cuda")
|
|
|
|
|
split_embedding = torch.split(embedding, length, dim=0)
|
|
|
|
|
table_tensor = torch.stack(split_embedding, dim=0, out=None)
|
|
|
|
|
# prediction的归一化嵌入张量
|
|
|
|
|
norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
|
|
|
|
|
sim_tensor_dict = {}
|
|
|
|
|
for col_tuple in col_tuple_list:
|
|
|
|
|
lattr_tensor = norm_table_tensor[col_tuple[0]]
|
|
|
|
|
rattr_tensor = norm_table_tensor[col_tuple[1]]
|
|
|
|
|
mul_tensor = lattr_tensor * rattr_tensor
|
|
|
|
|
sim_tensor = torch.sum(mul_tensor, 1)
|
|
|
|
|
sim_tensor = torch.round(sim_tensor, decimals=4)
|
|
|
|
|
sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor
|
|
|
|
|
return sim_tensor_dict
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def er_process(config: Configuration):
|
|
|
|
|
start = time.time()
|
|
|
|
|
ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1')
|
|
|
|
|
cm.set_key(ltable, ltable_id)
|
|
|
|
|
# ltable.fillna("", inplace=True)
|
|
|
|
|
rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1')
|
|
|
|
|
cm.set_key(rtable, rtable_id)
|
|
|
|
|
# rtable.fillna("", inplace=True)
|
|
|
|
|
mappings = pd.read_csv(mapping_path, encoding='ISO-8859-1')
|
|
|
|
|
|
|
|
|
|
# 仅保留两表中出现在映射表中的行,增大正样本比例
|
|
|
|
|
lid_mapping_list = []
|
|
|
|
|
rid_mapping_list = []
|
|
|
|
|
# 全部转为字符串
|
|
|
|
|
# ltable = ltable.astype(str)
|
|
|
|
|
# rtable = rtable.astype(str)
|
|
|
|
|
# mappings = mappings.astype(str)
|
|
|
|
|
matching_number = len(mappings) # 所有阳性样本数
|
|
|
|
|
|
|
|
|
|
for index, row in mappings.iterrows():
|
|
|
|
|
lid_mapping_list.append(row[mapping_lid])
|
|
|
|
|
rid_mapping_list.append(row[mapping_rid])
|
|
|
|
|
|
|
|
|
|
selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)]
|
|
|
|
|
# if len(lr_attrs_map) > 0:
|
|
|
|
|
# selected_ltable = selected_ltable.rename(columns=lr_attrs_map) # 参照右表,修改左表中与右表对应但不同名的字段
|
|
|
|
|
tables_id = rtable_id
|
|
|
|
|
selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)]
|
|
|
|
|
selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名
|
|
|
|
|
attrs_with_l_prefix = ['ltable_' + i for i in selected_attrs]
|
|
|
|
|
attrs_with_r_prefix = ['rtable_' + i for i in selected_attrs]
|
|
|
|
|
cm.set_key(selected_ltable, tables_id)
|
|
|
|
|
cm.set_key(selected_rtable, tables_id)
|
|
|
|
|
|
|
|
|
|
if config["ml_blocker"] == "over_lap":
|
|
|
|
|
blocker = em.OverlapBlocker()
|
|
|
|
|
candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"],
|
|
|
|
|
config["block_attr"], allow_missing=True,
|
|
|
|
|
l_output_attrs=selected_attrs, r_output_attrs=selected_attrs,
|
|
|
|
|
overlap_size=1, show_progress=False)
|
|
|
|
|
elif config["ml_blocker"] == "attr_equiv":
|
|
|
|
|
blocker = em.AttrEquivalenceBlocker()
|
|
|
|
|
candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"],
|
|
|
|
|
config["block_attr"], allow_missing=True,
|
|
|
|
|
l_output_attrs=selected_attrs, r_output_attrs=selected_attrs)
|
|
|
|
|
|
|
|
|
|
candidate['gold'] = 0
|
|
|
|
|
candidate = candidate.reset_index(drop=True)
|
|
|
|
|
candidate_match_rows = []
|
|
|
|
|
for row in candidate.itertuples():
|
|
|
|
|
l_id = getattr(row, 'ltable_' + tables_id)
|
|
|
|
|
map_row = mappings[mappings[mapping_lid] == l_id]
|
|
|
|
|
|
|
|
|
|
if map_row is not None:
|
|
|
|
|
r_id = map_row[mapping_rid]
|
|
|
|
|
for value in r_id:
|
|
|
|
|
if value == getattr(row, 'rtable_' + tables_id):
|
|
|
|
|
candidate_match_rows.append(row[0])
|
|
|
|
|
else:
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
for _ in candidate_match_rows:
|
|
|
|
|
candidate.loc[_, 'gold'] = 1
|
|
|
|
|
|
|
|
|
|
candidate.fillna(value="", inplace=True)
|
|
|
|
|
|
|
|
|
|
# 裁剪负样本,保持正负样本数量一致
|
|
|
|
|
candidate_mismatch = candidate[candidate['gold'] == 0]
|
|
|
|
|
candidate_match = candidate[candidate['gold'] == 1]
|
|
|
|
|
if len(candidate_mismatch) > len(candidate_match):
|
|
|
|
|
candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match))
|
|
|
|
|
# 拼接正负样本
|
|
|
|
|
candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match])
|
|
|
|
|
# if len(candidate_for_train_test) == 0:
|
|
|
|
|
# return 0
|
|
|
|
|
# 如果拼接后不重设索引可能导致索引重复
|
|
|
|
|
candidate_for_train_test = candidate_for_train_test.reset_index(drop=True)
|
|
|
|
|
cm.set_key(candidate_for_train_test, '_id')
|
|
|
|
|
cm.set_fk_ltable(candidate_for_train_test, 'ltable_' + tables_id)
|
|
|
|
|
cm.set_fk_rtable(candidate_for_train_test, 'rtable_' + tables_id)
|
|
|
|
|
cm.set_ltable(candidate_for_train_test, selected_ltable)
|
|
|
|
|
cm.set_rtable(candidate_for_train_test, selected_rtable)
|
|
|
|
|
|
|
|
|
|
# 分为训练测试集
|
|
|
|
|
train_proportion = 0.7
|
|
|
|
|
sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0)
|
|
|
|
|
train_set = sets['train']
|
|
|
|
|
test_set = sets['test']
|
|
|
|
|
|
|
|
|
|
# cm.set_key(train_set, '_id')
|
|
|
|
|
# cm.set_fk_ltable(train_set, 'ltable_' + tables_id)
|
|
|
|
|
# cm.set_fk_rtable(train_set, 'rtable_' + tables_id)
|
|
|
|
|
# cm.set_ltable(train_set, selected_ltable)
|
|
|
|
|
# cm.set_rtable(train_set, selected_rtable)
|
|
|
|
|
#
|
|
|
|
|
# cm.set_key(test_set, '_id')
|
|
|
|
|
# cm.set_fk_ltable(test_set, 'ltable_' + tables_id)
|
|
|
|
|
# cm.set_fk_rtable(test_set, 'rtable_' + tables_id)
|
|
|
|
|
# cm.set_ltable(test_set, selected_ltable)
|
|
|
|
|
# cm.set_rtable(test_set, selected_rtable)
|
|
|
|
|
|
|
|
|
|
ml_matcher = config["ml_matcher"]
|
|
|
|
|
if ml_matcher == "dt":
|
|
|
|
|
matcher = em.DTMatcher(name='DecisionTree', random_state=0)
|
|
|
|
|
elif ml_matcher == "svm":
|
|
|
|
|
matcher = em.SVMMatcher(name='SVM', random_state=0)
|
|
|
|
|
elif ml_matcher == "rf":
|
|
|
|
|
matcher = em.RFMatcher(name='RF', random_state=0)
|
|
|
|
|
elif ml_matcher == "lg":
|
|
|
|
|
matcher = em.LogRegMatcher(name='LogReg', random_state=0)
|
|
|
|
|
elif ml_matcher == "ln":
|
|
|
|
|
matcher = em.LinRegMatcher(name='LinReg')
|
|
|
|
|
elif ml_matcher == "nb":
|
|
|
|
|
matcher = em.NBMatcher(name='NaiveBayes')
|
|
|
|
|
|
|
|
|
|
feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False)
|
|
|
|
|
|
|
|
|
|
train_feature_vecs = em.extract_feature_vecs(train_set,
|
|
|
|
|
feature_table=feature_table,
|
|
|
|
|
attrs_after=['gold'],
|
|
|
|
|
show_progress=False)
|
|
|
|
|
train_feature_vecs.fillna(value=0, inplace=True)
|
|
|
|
|
|
|
|
|
|
test_feature_after = attrs_with_l_prefix[:]
|
|
|
|
|
test_feature_after.extend(attrs_with_r_prefix)
|
|
|
|
|
for _ in test_feature_after:
|
|
|
|
|
if _.endswith(tables_id):
|
|
|
|
|
test_feature_after.remove(_)
|
|
|
|
|
test_feature_after.append('gold')
|
|
|
|
|
test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
|
|
|
|
|
attrs_after=test_feature_after, show_progress=False)
|
|
|
|
|
test_feature_vecs.fillna(value=0, inplace=True)
|
|
|
|
|
|
|
|
|
|
fit_exclude = ['_id', 'ltable_' + tables_id, 'rtable_' + tables_id, 'gold']
|
|
|
|
|
matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold')
|
|
|
|
|
test_feature_after.extend(['_id', 'ltable_' + tables_id, 'rtable_' + tables_id])
|
|
|
|
|
predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after,
|
|
|
|
|
append=True, target_attr='predicted', inplace=False)
|
|
|
|
|
eval_result = em.eval_matches(predictions, 'gold', 'predicted')
|
|
|
|
|
em.print_eval_summary(eval_result)
|
|
|
|
|
indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, candidate_for_train_test)
|
|
|
|
|
|
|
|
|
|
# 计算可解释性
|
|
|
|
|
################################################################################################################
|
|
|
|
|
predictions_attrs = []
|
|
|
|
|
predictions_attrs.extend(attrs_with_l_prefix)
|
|
|
|
|
predictions_attrs.extend(attrs_with_r_prefix)
|
|
|
|
|
predictions_attrs.extend(['gold', 'predicted'])
|
|
|
|
|
predictions = predictions[predictions_attrs]
|
|
|
|
|
train_attrs = predictions_attrs[:]
|
|
|
|
|
train_attrs.remove('predicted')
|
|
|
|
|
train_set = train_set[train_attrs]
|
|
|
|
|
prepare_file_for_md_discovery(train_set)
|
|
|
|
|
predictions = predictions.reset_index(drop=True)
|
|
|
|
|
predictions = predictions.astype(str)
|
|
|
|
|
sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions)
|
|
|
|
|
predictions['confidence'] = 0
|
|
|
|
|
|
|
|
|
|
md_discover(config, er_output_dir + "t_single_tuple.csv", md_output_dir + "mds.txt")
|
|
|
|
|
md_paths = [md_output_dir + 'mds.txt']
|
|
|
|
|
md_list = load_mds(md_paths) # 从全局变量中读取所有的md
|
|
|
|
|
epl_match = 0 # 可解释,预测match
|
|
|
|
|
|
|
|
|
|
unexplainable = pd.DataFrame()
|
|
|
|
|
|
|
|
|
|
if len(md_list) > 0:
|
|
|
|
|
for row in tqdm(predictions.itertuples()):
|
|
|
|
|
x = is_explicable(row, md_list, sim_tensor_dict)
|
|
|
|
|
if x > 0 and str(getattr(row, 'predicted')) == str(1):
|
|
|
|
|
predictions.loc[row[0], 'confidence'] = x
|
|
|
|
|
epl_match += 1
|
|
|
|
|
# else:
|
|
|
|
|
# series = pd.Series(row)
|
|
|
|
|
# unexplainable = unexplainable._append(series, ignore_index=True)
|
|
|
|
|
# unexplainable.drop(columns=unexplainable.columns[[-1, 0]], inplace=True)
|
|
|
|
|
# unexplainable.columns = predictions_attrs
|
|
|
|
|
# unexplainable = unexplainable[train_attrs]
|
|
|
|
|
# if len(unexplainable[unexplainable['gold'] == str(1)]) > 0:
|
|
|
|
|
# prepare_file_for_md_discovery(unexplainable, t_single_tuple_path=er_output_dir + 'unexplainable_tst.csv')
|
|
|
|
|
# md_discover(config, er_output_dir + 'unexplainable_tst.csv', md_output_dir + "from_unexplainable.txt")
|
|
|
|
|
|
|
|
|
|
df = predictions[predictions['predicted'] == str(1)]
|
|
|
|
|
interpretability = epl_match / len(df) # 可解释性
|
|
|
|
|
indicators['interpretability'] = interpretability
|
|
|
|
|
if indicators["block_recall"] < indicators["recall"]:
|
|
|
|
|
f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (
|
|
|
|
|
indicators["precision"] + indicators["block_recall"])
|
|
|
|
|
else:
|
|
|
|
|
f1 = indicators["F1"]
|
|
|
|
|
performance = interpre_weight * interpretability + (1 - interpre_weight) * f1
|
|
|
|
|
indicators['performance'] = performance
|
|
|
|
|
indicators['eval_result'] = eval_result
|
|
|
|
|
print(indicators)
|
|
|
|
|
predictions.to_csv(er_output_dir + 'predictions.csv', sep=',', index=False, header=True)
|
|
|
|
|
################################################################################################################
|
|
|
|
|
print(f'\033[33mTime consumed by ML-ER in seconds: {time.time() - start}\033[0m')
|
|
|
|
|
return indicators
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ml_er(config: Configuration = None):
|
|
|
|
|
indicators = er_process(config)
|
|
|
|
|
output_path = er_output_dir + "eval_result.txt"
|
|
|
|
|
with open(output_path, 'w') as f:
|
|
|
|
|
for key, value in six.iteritems(_get_metric(indicators['eval_result'])):
|
|
|
|
|
f.write(key + " : " + value)
|
|
|
|
|
f.write('\n')
|
|
|
|
|
f.write('block_recall:' + str(indicators["block_recall"]) + '\n')
|
|
|
|
|
f.write('interpretability:' + str(indicators['interpretability']) + '\n')
|
|
|
|
|
f.write('performance:' + str(indicators['performance']) + '\n')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
if os.path.isfile(hpo_output_dir + "incumbent.json"):
|
|
|
|
|
with open(hpo_output_dir + "configspace.json", 'r') as f:
|
|
|
|
|
dict_configspace = json.load(f)
|
|
|
|
|
str_configspace = json.dumps(dict_configspace)
|
|
|
|
|
configspace = csj.read(str_configspace)
|
|
|
|
|
with open(hpo_output_dir + "incumbent.json", 'r') as f:
|
|
|
|
|
dic = json.load(f)
|
|
|
|
|
configuration = ConfigSpace.Configuration(configspace, values=dic)
|
|
|
|
|
ml_er(configuration)
|