You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
matching_dependency/ml_er/ml_entity_resolver.py

317 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import sys
from py_entitymatching.debugmatcher.debug_gui_utils import _get_metric
import py_entitymatching as em
import py_entitymatching.catalog.catalog_manager as cm
import pandas as pd
import six
from ConfigSpace import Configuration
from md_discovery.multi_process_infer_by_pairs import my_Levenshtein_ratio, norm_cos_sim
from settings import *
def process_prediction_for_md_discovery(pred: pd.DataFrame,
tp_single_tuple_path: str = er_output_dir + "tp_single_tuple.csv",
fn_single_tuple_path: str = er_output_dir + "fn_single_tuple.csv"):
# 提取预测表中真阳和假阴部分
tp = pred[(pred['gold'] == 1) & (pred['predicted'] == 1)]
fn = pred[(pred['gold'] == 1) & (pred['predicted'] == 0)]
# 将真阳/假阴表中左右ID调整一致
for index, row in tp.iterrows():
tp.loc[index, "rtable_" + rtable_id] = row["ltable_" + rtable_id]
for index, row in fn.iterrows():
fn.loc[index, "rtable_" + rtable_id] = row["ltable_" + rtable_id]
pred_columns = pred.columns.values.tolist()
l_columns = []
r_columns = []
columns = []
# 将预测表中左表和右表字段名分别加入两个列表
for _ in pred_columns:
if _.startswith('ltable'):
l_columns.append(_)
elif _.startswith('rtable'):
r_columns.append(_)
# 将左表中字段名去掉前缀,作为统一的字段名列表(前提是两张表内对应字段名调整一致)
for _ in l_columns:
columns.append(_.replace('ltable_', ''))
# 将表拆分成左右两部分
tpl = tp[l_columns]
tpr = tp[r_columns]
# 将左右两部分字段名统一
tpl.columns = columns
tpr.columns = columns
fnl = fn[l_columns]
fnr = fn[r_columns]
fnl.columns = columns
fnr.columns = columns
tp_single_tuple = pd.concat([tpl, tpr])
fn_single_tuple = pd.concat([fnl, fnr])
tp_single_tuple.to_csv(tp_single_tuple_path, sep=',', index=False, header=True)
fn_single_tuple.to_csv(fn_single_tuple_path, sep=',', index=False, header=True)
def evaluate_prediction(df: pd.DataFrame, labeled_attr: str, predicted_attr: str, matching_number: int,
test_proportion: float) -> dict:
new_df = df.reset_index(drop=False, inplace=False)
gold = new_df[labeled_attr]
predicted = new_df[predicted_attr]
gold_negative = gold[gold == 0].index.values
gold_positive = gold[gold == 1].index.values
predicted_negative = predicted[predicted == 0].index.values
predicted_positive = predicted[predicted == 1].index.values
false_positive_indices = list(set(gold_negative).intersection(predicted_positive))
true_positive_indices = list(set(gold_positive).intersection(predicted_positive))
false_negative_indices = list(set(gold_positive).intersection(predicted_negative))
num_true_positives = float(len(true_positive_indices))
num_false_positives = float(len(false_positive_indices))
num_false_negatives = float(len(false_negative_indices))
precision_denominator = num_true_positives + num_false_positives
recall_denominator = num_true_positives + num_false_negatives
precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator
recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator
F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall)
block_recall = num_true_positives / (matching_number * test_proportion)
return {"precision": precision, "recall": recall, "F1": F1, "block_recall": block_recall}
def load_mds(paths: list) -> list:
if len(paths) == 0:
return []
all_mds = []
# 传入md路径列表
for md_path in paths:
if not os.path.exists(md_path):
continue
mds = []
# 打开每一个md文件
with open(md_path, 'r') as f:
# 读取每一行的md加入该文件的md列表
for line in f.readlines():
md_metadata = line.strip().split('\t')
md = eval(md_metadata[0].replace('md:', ''))
confidence = eval(md_metadata[2].replace('confidence:', ''))
if confidence > 0:
mds.append(md)
all_mds.extend(mds)
return all_mds
def is_explicable(row, all_mds: list) -> bool:
attrs = all_mds[0].keys() # 从第一条md中读取所有字段
for md in all_mds:
explicable = True # 假设这条md能解释当前元组
for a in attrs:
threshold = md[a]
if norm_cos_sim(embedding_dict[str(getattr(row, 'ltable_'+a))],
embedding_dict[str(getattr(row, 'rtable_'+a))]) < threshold:
explicable = False # 任意一个字段的相似度达不到阈值这条md就不能解释当前元组
break # 不再与当前md的其他相似度阈值比较跳转到下一条md
if explicable:
return True # 任意一条md能解释直接返回
return False # 遍历结束,不能解释
def load_data(left_path: str, right_path: str, mapping_path: str):
left = pd.read_csv(left_path, encoding='ISO-8859-1')
cm.set_key(left, left.columns.values.tolist()[0])
left.fillna("", inplace=True)
left = left.astype(str)
right = pd.read_csv(right_path, encoding='ISO-8859-1')
cm.set_key(right, right.columns.values.tolist()[0])
right.fillna("", inplace=True)
right = right.astype(str)
mapping = pd.read_csv(mapping_path)
mapping = mapping.astype(str)
return left, right, mapping
def ml_er(iter_round: int, config: Configuration = None, ):
# todo:
# if config is not None -> load configs
# else -> use default configs
ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1')
cm.set_key(ltable, ltable_id)
ltable.fillna("", inplace=True)
rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1')
cm.set_key(rtable, rtable_id)
rtable.fillna("", inplace=True)
mappings = pd.read_csv(mapping_path)
# 仅保留两表中出现在映射表中的行,增大正样本比例
lid_mapping_list = []
rid_mapping_list = []
# 全部转为字符串
ltable = ltable.astype(str)
rtable = rtable.astype(str)
mappings = mappings.astype(str)
matching_number = len(mappings) # 所有阳性样本数商品数据集应为1300
for index, row in mappings.iterrows():
lid_mapping_list.append(row[mapping_lid])
rid_mapping_list.append(row[mapping_rid])
selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)]
selected_ltable = selected_ltable.rename(columns=lr_attrs_map) # 参照右表,修改左表中与右表对应但不同名的字段
tables_id = rtable_id
selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)]
selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名
items_but_id = selected_attrs[:]
items_but_id.remove(tables_id) # 两张表中除了id的字段名
attrs_with_l_prefix = ['ltable_'+i for i in selected_attrs]
attrs_with_r_prefix = ['rtable_'+i for i in selected_attrs]
cm.set_key(selected_ltable, tables_id)
cm.set_key(selected_rtable, tables_id)
if config is not None:
ml_matcher = config["ml_matcher"]
if ml_matcher == "dt":
matcher = em.DTMatcher(name='DecisionTree', random_state=0)
elif ml_matcher == "svm":
matcher = em.SVMMatcher(name='SVM', random_state=0)
elif ml_matcher == "rf":
matcher = em.RFMatcher(name='RF', random_state=0)
elif ml_matcher == "lg":
matcher = em.LogRegMatcher(name='LogReg', random_state=0)
elif ml_matcher == "ln":
matcher = em.LinRegMatcher(name='LinReg')
elif ml_matcher == "nb":
matcher = em.NBMatcher(name='NaiveBayes')
if config["ml_blocker"] == "over_lap":
blocker = em.OverlapBlocker()
candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"],
l_output_attrs=selected_attrs, r_output_attrs=selected_attrs,
overlap_size=config["overlap_size"], show_progress=False)
elif config["ml_blocker"] == "attr_equiv":
blocker = em.AttrEquivalenceBlocker()
candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"],
l_output_attrs=selected_attrs, r_output_attrs=selected_attrs)
else:
matcher = em.RFMatcher(name='RF', random_state=0)
blocker = em.OverlapBlocker()
candidate = blocker.block_tables(selected_ltable, selected_rtable, items_but_id[0], items_but_id[0],
l_output_attrs=selected_attrs, r_output_attrs=selected_attrs,
overlap_size=1, show_progress=False)
candidate['gold'] = 0
candidate_match_rows = []
for index, row in candidate.iterrows():
l_id = row['ltable_' + tables_id]
map_row = mappings[mappings[mapping_lid] == l_id]
if map_row is not None:
r_id = map_row[mapping_rid]
for value in r_id:
if value == row['rtable_' + tables_id]:
candidate_match_rows.append(row["_id"])
else:
continue
for row in candidate_match_rows:
candidate.loc[row, 'gold'] = 1
# 裁剪负样本,保持正负样本数量一致
candidate_mismatch = candidate[candidate['gold'] == 0]
candidate_match = candidate[candidate['gold'] == 1]
if len(candidate_mismatch) > len(candidate_match):
candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match))
# 拼接正负样本
candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match])
cm.set_key(candidate_for_train_test, '_id')
cm.set_fk_ltable(candidate_for_train_test, 'ltable_' + tables_id)
cm.set_fk_rtable(candidate_for_train_test, 'rtable_' + tables_id)
cm.set_ltable(candidate_for_train_test, selected_ltable)
cm.set_rtable(candidate_for_train_test, selected_rtable)
# 分为训练测试集
train_proportion = 0.7
test_proportion = 0.3
sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0)
train_set = sets['train']
test_set = sets['test']
feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False)
train_feature_vecs = em.extract_feature_vecs(train_set,
feature_table=feature_table,
attrs_after=['gold'],
show_progress=False)
test_feature_after = attrs_with_l_prefix[:]
test_feature_after.extend(attrs_with_r_prefix)
for _ in test_feature_after:
if _.endswith(tables_id):
test_feature_after.remove(_)
test_feature_after.append('gold')
test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
attrs_after=test_feature_after, show_progress=False)
fit_exclude = ['_id', 'ltable_' + tables_id, 'rtable_' + tables_id, 'gold']
matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold')
test_feature_after.extend(['_id', 'ltable_' + tables_id, 'rtable_' + tables_id])
predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after,
append=True, target_attr='predicted', inplace=False)
eval_result = em.eval_matches(predictions, 'gold', 'predicted')
em.print_eval_summary(eval_result)
indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, test_proportion)
print(indicators)
# 计算可解释性
################################################################################################################
predictions_attrs = []
predictions_attrs.extend(attrs_with_l_prefix)
predictions_attrs.extend(attrs_with_r_prefix)
predictions_attrs.extend(['gold', 'predicted'])
predictions = predictions[predictions_attrs]
md_paths = [md_output_dir + 'tp_mds.txt', md_output_dir + 'tp_vio.txt',
md_output_dir + 'fn_mds.txt', md_output_dir + 'fn_vio.txt']
epl_match = 0 # 可解释预测match
nepl_mismatch = 0 # 不可解释预测mismatch
md_list = load_mds(md_paths) # 从全局变量中读取所有的md
if len(md_list) > 0:
for row in predictions.itertuples():
if is_explicable(row, md_list):
if getattr(row, 'predicted') == 1:
epl_match += 1
else:
if getattr(row, 'predicted') == 0:
nepl_mismatch += 1
interpretability = (epl_match + nepl_mismatch) / len(predictions) # 可解释性
if indicators["block_recall"] >= 0.8:
f1 = indicators["F1"]
else:
f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (indicators["precision"] + indicators["block_recall"])
performance = interpre_weight * interpretability + (1 - interpre_weight) * f1
################################################################################################################
process_prediction_for_md_discovery(predictions)
output_path = er_output_dir + "eval_result_" + str(iter_round) + ".txt"
with open(output_path, 'w') as f:
for key, value in six.iteritems(_get_metric(eval_result)):
f.write(key + " : " + value)
f.write('\n')
f.write('block_recall:' + str(indicators["block_recall"]) + '\n')
f.write('interpretability:' + str(interpretability) + '\n')
f.write('performance:' + str(performance) + '\n')
if __name__ == '__main__':
ml_er(1)