# matching_dependency/ml_er/ml_entity_resolver.py
#
# NOTE: the hosting page this file was copied from flagged it as containing
# ambiguous Unicode characters (characters that may be confused with others
# in the current locale).

import json
import os
import sys
import ConfigSpace
import pandas
import torch
from py_entitymatching.debugmatcher.debug_gui_utils import _get_metric
from ConfigSpace.read_and_write import json as csj
import py_entitymatching as em
import py_entitymatching.catalog.catalog_manager as cm
import pandas as pd
import six
from ConfigSpace import Configuration
from settings import *
def process_prediction_for_md_discovery(pred: pd.DataFrame,
                                        t_single_tuple_path: str = er_output_dir + "t_single_tuple.csv"):
    """Write the gold-positive pairs of `pred` as a single stacked table (CSV).

    Rows whose ``gold`` is 1 (true positives followed by false negatives) are
    selected, the right-hand ID column is overwritten with the left-hand ID,
    and the ``ltable_*`` / ``rtable_*`` halves are stacked under the shared,
    prefix-free column names.

    Args:
        pred: prediction table with ``gold`` and ``predicted`` columns plus
            ``ltable_<attr>`` / ``rtable_<attr>`` column pairs. Every
            ``ltable_<attr>`` column needs an ``rtable_<attr>`` counterpart
            (a missing one raises ``KeyError``); right-hand columns without a
            left-hand counterpart are ignored.
        t_single_tuple_path: destination CSV path.

    Reads the module-level name ``rtable_id`` to locate the ID columns.
    """
    # Keep the true-positive and false-negative parts of the prediction table.
    tp = pred[(pred['gold'] == 1) & (pred['predicted'] == 1)]
    fn = pred[(pred['gold'] == 1) & (pred['predicted'] == 0)]
    # Stack them into one table.
    df = pd.concat([tp, fn])
    # Make the left and right IDs agree. A whole-column assignment does this in
    # one step and stays correct even if index labels repeat, unlike a
    # label-based row-by-row write.
    df["rtable_" + rtable_id] = df["ltable_" + rtable_id]

    l_columns = [name for name in pred.columns.values.tolist() if name.startswith('ltable')]
    # Prefix-free names shared by both halves (assumes corresponding fields of
    # the two tables have already been given identical names).
    cols = [name.replace('ltable_', '') for name in l_columns]
    # Pick the right-hand columns by name so each one lines up with its
    # left-hand partner regardless of the column order in `pred`.
    r_columns = ['rtable_' + name for name in cols]

    ldf = df[l_columns].copy()
    rdf = df[r_columns].copy()
    ldf.columns = cols
    rdf.columns = cols
    t_single_tuple = pd.concat([ldf, rdf])
    # quoting=1 is csv.QUOTE_ALL.
    t_single_tuple.to_csv(t_single_tuple_path, sep=',', index=False, header=True, quoting=1)
def evaluate_prediction(df: pd.DataFrame, labeled_attr: str, predicted_attr: str, matching_number: int,
                        test_proportion: float) -> dict:
    """Compute precision, recall, F1 and block recall for a prediction table.

    Args:
        df: table holding one row per evaluated pair.
        labeled_attr: name of the column with the gold label (1 / 0).
        predicted_attr: name of the column with the predicted label (1 / 0).
        matching_number: total number of known matching pairs.
        test_proportion: fraction of the data used as the test set.

    Returns:
        A dict with keys ``"precision"``, ``"recall"``, ``"F1"`` and
        ``"block_recall"``. ``block_recall`` is the number of true positives
        divided by ``matching_number * test_proportion``. Every metric whose
        denominator is zero is reported as ``0.0``.
    """
    # Work on positional boolean masks so the result does not depend on the
    # frame's index (duplicate or non-default labels are harmless).
    gold_positive = (df[labeled_attr] == 1).values
    gold_negative = (df[labeled_attr] == 0).values
    predicted_positive = (df[predicted_attr] == 1).values
    predicted_negative = (df[predicted_attr] == 0).values

    num_true_positives = float((gold_positive & predicted_positive).sum())
    num_false_positives = float((gold_negative & predicted_positive).sum())
    num_false_negatives = float((gold_positive & predicted_negative).sum())

    precision_denominator = num_true_positives + num_false_positives
    recall_denominator = num_true_positives + num_false_negatives
    precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator
    recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator
    F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall)

    # Guard the same way as the other metrics: with no expected matches in the
    # test split there is nothing to recall.
    block_recall_denominator = matching_number * test_proportion
    block_recall = 0.0 if block_recall_denominator == 0 else num_true_positives / block_recall_denominator
    return {"precision": precision, "recall": recall, "F1": F1, "block_recall": block_recall}
def load_mds(paths: list) -> list:
    """Load matching dependencies (MDs) from tab-separated text files.

    Each non-blank line is split on tabs and its second field is evaluated as
    a Python expression to obtain the MD object. Paths that do not exist are
    skipped, and blank lines (for example a trailing empty line) are ignored.

    Args:
        paths: list of MD file paths.

    Returns:
        All MDs from all existing files, in file order then line order. An
        empty list when `paths` is empty or no file exists.

    Raises:
        IndexError: if a non-blank line has no second tab-separated field.
    """
    all_mds = []
    for md_path in paths:
        if not os.path.exists(md_path):
            continue
        with open(md_path, 'r') as f:
            # Stream the file line by line instead of loading it whole.
            for line in f:
                stripped = line.strip()
                if not stripped:
                    continue
                md_metadata = stripped.split('\t')
                # TODO: if the MD file format changes, this parsing must change too.
                # NOTE(security): eval() executes whatever the file contains. This is
                # only acceptable while MD files are produced by this project itself;
                # consider ast.literal_eval if they are plain literals.
                all_mds.append(eval(md_metadata[1]))
    return all_mds
def is_explicable(row, all_mds: list, st_dict) -> bool:
    """Return True when at least one MD explains the pair described by `row`.

    An MD explains the pair when, for every attribute other than the
    module-level ``target_attr``, the similarity looked up as
    ``st_dict[attr][row[0]].item()`` is not below the MD's threshold
    ``md[attr]``. The attribute set is taken from the first MD in `all_mds`.

    Args:
        row: indexable record whose element 0 selects the entry inside each
            similarity container.
        all_mds: non-empty list of MDs (mappings from attribute to threshold).
        st_dict: mapping from attribute name to an indexable container whose
            elements expose ``.item()``.
    """
    attrs = all_mds[0].keys()  # attribute names come from the first MD
    for md in all_mds:
        # all() stops at the first attribute whose similarity misses the
        # threshold, so the remaining thresholds of this MD are not compared.
        meets_every_threshold = all(
            not (st_dict[a][row[0]].item() < md[a])
            for a in attrs
            if a != target_attr
        )
        if meets_every_threshold:
            return True  # one explaining MD is enough
    return False  # no MD explains this pair
def build_col_pairs_sim_tensor_dict(predictions: pandas.DataFrame, device: str = None):
    """Compute per-row similarities between each ``ltable_*``/``rtable_*`` column pair.

    Every cell of `predictions` is encoded with the module-level ``model``
    (its ``encode`` method is called with ``convert_to_tensor=True``). The
    embeddings are L2-normalised, and for each ``ltable_<attr>`` column the
    row-wise dot product with the matching ``rtable_<attr>`` column is taken,
    i.e. the cosine similarity of the two cell embeddings.

    Args:
        predictions: table whose ``ltable_<attr>`` columns each have an
            ``rtable_<attr>`` counterpart (``ValueError`` from ``list.index``
            otherwise). Cells are passed to ``model.encode`` unchanged.
        device: device string forwarded to ``model.encode``. When omitted,
            ``"cuda"`` is used if available, otherwise ``"cpu"``.

    Returns:
        dict mapping ``<attr>`` (column name without the ``ltable_`` prefix)
        to a tensor holding one similarity value per row of `predictions`.
    """
    if device is None:
        # Fall back to the CPU instead of failing on machines without CUDA.
        device = "cuda" if torch.cuda.is_available() else "cpu"

    predictions_attrs = predictions.columns.values.tolist()
    # (left column position, right column position) for every attribute pair.
    col_tuple_list = [
        (predictions_attrs.index(attr), predictions_attrs.index(attr.replace('ltable_', 'rtable_')))
        for attr in predictions_attrs
        if attr.startswith('ltable')
    ]

    length = predictions.shape[0]
    width = predictions.shape[1]
    # Materialise the cell array once; evaluating `predictions.values` per cell
    # rebuilds the whole array on every access.
    cells = predictions.values
    # Column-major order: all rows of column 0, then all rows of column 1, ...
    sentences = [cells[row, col] for col in range(width) for row in range(length)]

    # assumes model.encode returns a 2-D tensor (len(sentences), dim) — TODO confirm
    embedding = model.encode(sentences, convert_to_tensor=True, device=device)
    # Cut the flat embedding list back into one chunk of `length` rows per column.
    split_embedding = torch.split(embedding, length, dim=0)
    table_tensor = torch.stack(split_embedding, dim=0)
    # Normalised embedding tensor of the prediction table.
    norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)

    sim_tensor_dict = {}
    for left_index, right_index in col_tuple_list:
        # Dot product of unit vectors, summed over the embedding dimension.
        sim_tensor = torch.sum(norm_table_tensor[left_index] * norm_table_tensor[right_index], 1)
        sim_tensor_dict[predictions_attrs[left_index].replace('ltable_', '')] = sim_tensor
    return sim_tensor_dict
def _build_matcher(ml_matcher):
    """Instantiate the py_entitymatching matcher selected by its short code."""
    factories = {
        "dt": lambda: em.DTMatcher(name='DecisionTree', random_state=0),
        "svm": lambda: em.SVMMatcher(name='SVM', random_state=0),
        "rf": lambda: em.RFMatcher(name='RF', random_state=0),
        "lg": lambda: em.LogRegMatcher(name='LogReg', random_state=0),
        "ln": lambda: em.LinRegMatcher(name='LinReg'),
        "nb": lambda: em.NBMatcher(name='NaiveBayes'),
    }
    factory = factories.get(ml_matcher)
    if factory is None:
        raise ValueError("unsupported ml_matcher: {!r}".format(ml_matcher))
    return factory()


def _block_candidates(selected_ltable, selected_rtable, selected_attrs, config):
    """Run the blocker chosen by `config` (default: overlap on the last attribute)."""
    if config is None:
        return em.OverlapBlocker().block_tables(
            selected_ltable, selected_rtable, selected_attrs[-1], selected_attrs[-1],
            l_output_attrs=selected_attrs, r_output_attrs=selected_attrs,
            overlap_size=1, show_progress=False, allow_missing=True)
    if config["ml_blocker"] == "over_lap":
        return em.OverlapBlocker().block_tables(
            selected_ltable, selected_rtable, config["block_attr"],
            config["block_attr"], allow_missing=True,
            l_output_attrs=selected_attrs, r_output_attrs=selected_attrs,
            overlap_size=config["overlap_size"], show_progress=False)
    if config["ml_blocker"] == "attr_equiv":
        return em.AttrEquivalenceBlocker().block_tables(
            selected_ltable, selected_rtable, config["block_attr"],
            config["block_attr"], allow_missing=True,
            l_output_attrs=selected_attrs, r_output_attrs=selected_attrs)
    raise ValueError("unsupported ml_blocker: {!r}".format(config["ml_blocker"]))


def ml_er(iter_round: int, config: Configuration = None, ):
    """Run one round of ML-based entity resolution and record its evaluation.

    The function reads the left table, right table and mapping file, blocks
    the tables into candidate pairs, labels the candidates with the mapping,
    balances positives and negatives, trains a matcher on 70% of the pairs and
    predicts on the rest. It then computes precision/recall/F1/block recall
    and an interpretability score (share of predicted matches that some loaded
    MD explains), and writes the metrics to
    ``er_output_dir + "eval_result_<iter_round>.txt"``. As a side effect the
    gold-positive test pairs are exported for MD discovery.

    Args:
        iter_round: round number, used only in the output file name.
        config: optional configuration supplying ``ml_matcher``,
            ``ml_blocker``, ``block_attr`` and (for the overlap blocker)
            ``overlap_size``. Without it an SVM matcher and an overlap blocker
            on the last attribute are used.

    Raises:
        ValueError: if `config` names an unsupported matcher or blocker.

    Paths, ID column names, ``lr_attrs_map``, ``interpre_weight`` and the
    output directories are module-level names not defined in this function.
    """
    ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1')
    cm.set_key(ltable, ltable_id)
    rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1')
    cm.set_key(rtable, rtable_id)
    mappings = pd.read_csv(mapping_path, encoding='ISO-8859-1')

    # Convert everything to strings so IDs compare consistently.
    ltable = ltable.astype(str)
    rtable = rtable.astype(str)
    mappings = mappings.astype(str)
    matching_number = len(mappings)  # number of all positive pairs

    # Keep only rows that occur in the mapping, raising the share of positives.
    lid_mapping_list = mappings[mapping_lid].tolist()
    rid_mapping_list = mappings[mapping_rid].tolist()
    selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)]
    if len(lr_attrs_map) > 0:
        # Following the right table, rename left-table fields that correspond
        # to right-table fields but carry a different name.
        selected_ltable = selected_ltable.rename(columns=lr_attrs_map)
    tables_id = rtable_id
    selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)]
    selected_attrs = selected_ltable.columns.values.tolist()  # field names of both tables
    attrs_with_l_prefix = ['ltable_' + i for i in selected_attrs]
    attrs_with_r_prefix = ['rtable_' + i for i in selected_attrs]
    l_id_column = 'ltable_' + tables_id
    r_id_column = 'rtable_' + tables_id
    cm.set_key(selected_ltable, tables_id)
    cm.set_key(selected_rtable, tables_id)

    if config is not None:
        matcher = _build_matcher(config["ml_matcher"])
    else:
        matcher = em.SVMMatcher(name='SVM', random_state=0)
    candidate = _block_candidates(selected_ltable, selected_rtable, selected_attrs, config)

    # Label candidates: a pair is gold-positive when it appears in the mapping.
    # A set lookup per candidate replaces a scan of the mapping table per row.
    true_pairs = set(zip(mappings[mapping_lid], mappings[mapping_rid]))
    candidate['gold'] = 0
    is_match = [pair in true_pairs for pair in zip(candidate[l_id_column], candidate[r_id_column])]
    candidate.loc[is_match, 'gold'] = 1
    candidate.fillna("", inplace=True)

    # Trim negatives so that positives and negatives are balanced.
    # NOTE: the sample is unseeded, so the chosen negatives vary between runs.
    candidate_mismatch = candidate[candidate['gold'] == 0]
    candidate_match = candidate[candidate['gold'] == 1]
    if len(candidate_mismatch) > len(candidate_match):
        candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match))
    candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match])
    cm.set_key(candidate_for_train_test, '_id')
    cm.set_fk_ltable(candidate_for_train_test, l_id_column)
    cm.set_fk_rtable(candidate_for_train_test, r_id_column)
    cm.set_ltable(candidate_for_train_test, selected_ltable)
    cm.set_rtable(candidate_for_train_test, selected_rtable)

    # Split into training and test sets.
    train_proportion = 0.7
    test_proportion = 0.3
    sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0)
    train_set = sets['train']
    test_set = sets['test']

    feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False)
    train_feature_vecs = em.extract_feature_vecs(train_set,
                                                 feature_table=feature_table,
                                                 attrs_after=['gold'],
                                                 show_progress=False)

    # Carry every prefixed attribute except the two ID columns through to the
    # test feature vectors. The IDs are matched by exact name (a suffix test
    # would also drop unrelated attributes whose names merely end with the ID
    # name) and a new list is built rather than removing while iterating.
    test_feature_after = [
        attr for attr in attrs_with_l_prefix + attrs_with_r_prefix
        if attr not in (l_id_column, r_id_column)
    ]
    test_feature_after.append('gold')
    test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
                                                attrs_after=test_feature_after, show_progress=False)

    fit_exclude = ['_id', l_id_column, r_id_column, 'gold']
    matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold')
    test_feature_after.extend(['_id', l_id_column, r_id_column])
    predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after,
                                  append=True, target_attr='predicted', inplace=False)

    eval_result = em.eval_matches(predictions, 'gold', 'predicted')
    em.print_eval_summary(eval_result)
    indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, test_proportion)
    print(indicators)

    # ---- Interpretability -------------------------------------------------
    predictions_attrs = attrs_with_l_prefix + attrs_with_r_prefix + ['gold', 'predicted']
    predictions = predictions[predictions_attrs]
    process_prediction_for_md_discovery(predictions)
    predictions = predictions.reset_index(drop=True)
    predictions = predictions.astype(str)

    md_paths = [md_output_dir + 'mds.txt', md_output_dir + 'vio.txt']
    md_list = load_mds(md_paths)
    epl_match = 0  # predicted matches that some MD explains
    if len(md_list) > 0:
        # The embedding pass is only needed when there are MDs to test against.
        sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions)
        for row in predictions.itertuples():
            # Cheap label check first; the MD test only matters for predicted matches.
            if str(getattr(row, 'predicted')) == str(1) and is_explicable(row, md_list, sim_tensor_dict):
                epl_match += 1
    predicted_match_count = int((predictions['predicted'] == str(1)).sum())
    # With no predicted matches there is nothing to explain; report 0 rather
    # than dividing by zero.
    interpretability = epl_match / predicted_match_count if predicted_match_count else 0.0

    # When blocking lost many true matches, base F1 on block recall instead of
    # the matcher's recall.
    if (indicators["block_recall"] < 0.8) and (indicators["block_recall"] < indicators["recall"]):
        f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (
                indicators["precision"] + indicators["block_recall"])
    else:
        f1 = indicators["F1"]
    performance = interpre_weight * interpretability + (1 - interpre_weight) * f1

    # ---- Report -----------------------------------------------------------
    output_path = er_output_dir + "eval_result_" + str(iter_round) + ".txt"
    with open(output_path, 'w') as f:
        for key, value in six.iteritems(_get_metric(eval_result)):
            f.write(key + " : " + value)
            f.write('\n')
        f.write('block_recall:' + str(indicators["block_recall"]) + '\n')
        f.write('interpretability:' + str(interpretability) + '\n')
        f.write('performance:' + str(performance) + '\n')
if __name__ == '__main__':
    # Continue numbering after the highest round that already has a result
    # file named "eval_result_<round>.txt" in er_output_dir. The whole number
    # is parsed (so rounds >= 10 work) and the maximum is taken, because
    # os.listdir returns entries in arbitrary order.
    result_prefix = 'eval_result_'
    finished_rounds = []
    for filename in os.listdir(er_output_dir):
        if not filename.startswith(result_prefix):
            continue
        round_text = os.path.splitext(filename)[0][len(result_prefix):]
        if round_text.isdigit():
            finished_rounds.append(int(round_text))
    iterations = max(finished_rounds) + 1 if finished_rounds else 1

    if iterations > 1:
        # Later rounds reuse the incumbent configuration stored in hpo_output_dir.
        incumbent_array = np.load(hpo_output_dir + 'incumbent.npy')
        with open(hpo_output_dir + "configspace.json", 'r') as f:
            dict_configspace = json.load(f)
        str_configspace = json.dumps(dict_configspace)
        configspace = csj.read(str_configspace)
        configuration = ConfigSpace.Configuration(configspace, vector=incumbent_array)
        ml_er(iterations, configuration)
    else:
        # First round: no stored configuration yet, run with the defaults.
        ml_er(1)