You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
matching_dependency/ml_er/magellan_er.py

205 lines
9.5 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import os
import pickle
import time
import ConfigSpace
import pandas as pd
import py_entitymatching as em
import torch
from ConfigSpace import Configuration
from ConfigSpace.read_and_write import json as csj
import py_entitymatching.catalog.catalog_manager as cm
from tqdm import tqdm
from colorama import Fore
from settings import *
def matching(config: Configuration):
print(Fore.BLUE + f'Config: {config}')
with open(md_output_dir + r"\mds.pickle", "rb") as file:
md_list = pickle.load(file)
train_set = pd.read_csv(directory_path + r'\train_whole.csv', encoding='ISO-8859-1')
test_set = pd.read_csv(directory_path + r'\test_whole.csv', encoding='ISO-8859-1')
ltable = pd.read_csv(directory_path + r'\tableA.csv', encoding='ISO-8859-1')
rtable = pd.read_csv(directory_path + r'\tableB.csv', encoding='ISO-8859-1')
ml_matcher = config["ml_matcher"]
match ml_matcher:
case "dt":
matcher = em.DTMatcher(name='DecisionTree', random_state=0, criterion=config['tree_criterion'],
max_depth=config['tree_max_depth'], splitter=config['dt_splitter'],
max_features=config['dt_max_features'])
case "svm":
matcher = em.SVMMatcher(name='SVM', random_state=0, kernel=config['svm_kernel'], degree=config['svm_degree'],
gamma=config['svm_gamma'], C=config['svm_C'], coef0=config['svm_constant'])
case "rf":
matcher = em.RFMatcher(name='RandomForest', random_state=0, criterion=config['tree_criterion'],
max_depth=config['tree_max_depth'], n_estimators=config['number_of_tree'],
max_features=config['rf_max_features'])
cm.set_key(train_set, '_id')
cm.set_fk_ltable(train_set, 'ltable_id')
cm.set_fk_rtable(train_set, 'rtable_id')
cm.set_ltable(train_set, ltable)
cm.set_rtable(train_set, rtable)
cm.set_key(ltable, 'id')
cm.set_key(rtable, 'id')
cm.set_key(test_set, '_id')
cm.set_fk_ltable(test_set, 'ltable_id')
cm.set_fk_rtable(test_set, 'rtable_id')
cm.set_ltable(test_set, ltable)
cm.set_rtable(test_set, rtable)
feature_table = em.get_features_for_matching(ltable, rtable, validate_inferred_attr_types=False)
train_feature_vecs = em.extract_feature_vecs(train_set,
feature_table=feature_table,
attrs_after=['label'],
show_progress=False)
train_feature_vecs.fillna(value=0, inplace=True)
test_feature_after = ['ltable_' + i for i in ltable.columns.values.tolist()]
for _ in test_feature_after[:]:
test_feature_after.append(_.replace('ltable_', 'rtable_'))
for _ in test_feature_after:
if _.endswith('id'):
test_feature_after.remove(_)
test_feature_after.append('label')
test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
attrs_after=test_feature_after, show_progress=False)
test_feature_vecs.fillna(value=0, inplace=True)
fit_exclude = ['_id', 'ltable_id', 'rtable_id', 'label']
matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='label')
test_feature_after.extend(['_id', 'ltable_id', 'rtable_id'])
predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after,
append=True, target_attr='predicted', inplace=False)
eval_result = em.eval_matches(predictions, 'label', 'predicted')
em.print_eval_summary(eval_result)
indicators = evaluate_prediction(predictions, 'label', 'predicted')
test_feature_after.remove('_id')
test_feature_after.append('predicted')
predictions = predictions[test_feature_after]
predictions = predictions.reset_index(drop=True)
predictions = predictions.astype(str)
# 目前predictions包含的属性左右表全部属性+gold+predicted
sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions)
predictions['confidence'] = 0
predictions['md'] = ''
epl_match = 0 # 可解释预测match
if len(md_list) > 0:
for row in tqdm(predictions.itertuples()):
if str(getattr(row, 'predicted')) == str(1):
conf, md_dict = is_explicable(row, md_list, sim_tensor_dict)
if conf > 0:
predictions.loc[row[0], 'confidence'] = conf
predictions.loc[row[0], 'md'] = str(md_dict)
epl_match += 1
df = predictions[predictions['predicted'] == str(1)]
interpretability = epl_match / len(df) # 可解释性
indicators['interpretability'] = interpretability
performance = interpre_weight * interpretability + (1 - interpre_weight) * indicators["F1"]
indicators['performance'] = performance
print(Fore.BLUE + f'ER Indicators: {indicators}')
predictions.to_csv(er_output_dir + r'\predictions.csv', sep=',', index=False, header=True)
print(Fore.CYAN + f'Finish Time: {time.time()}')
return indicators
def evaluate_prediction(prediction_: pd.DataFrame, labeled_attr: str, predicted_attr: str) -> dict:
new_df = prediction_.reset_index(drop=False, inplace=False)
gold = new_df[labeled_attr]
predicted = new_df[predicted_attr]
gold_negative = gold[gold == 0].index.values
gold_positive = gold[gold == 1].index.values
predicted_negative = predicted[predicted == 0].index.values
predicted_positive = predicted[predicted == 1].index.values
false_positive_indices = list(set(gold_negative).intersection(predicted_positive))
true_positive_indices = list(set(gold_positive).intersection(predicted_positive))
false_negative_indices = list(set(gold_positive).intersection(predicted_negative))
num_true_positives = float(len(true_positive_indices))
num_false_positives = float(len(false_positive_indices))
num_false_negatives = float(len(false_negative_indices))
precision_denominator = num_true_positives + num_false_positives
recall_denominator = num_true_positives + num_false_negatives
precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator
recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator
F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall)
return {"precision": precision, "recall": recall, "F1": F1}
def build_col_pairs_sim_tensor_dict(predictions: pd.DataFrame):
predictions_attrs = predictions.columns.values.tolist()
col_tuple_list = []
for _ in predictions_attrs:
if _.startswith('ltable'):
left_index = predictions_attrs.index(_)
right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_'))
col_tuple_list.append((left_index, right_index))
length = predictions.shape[0]
# width = predictions.shape[1]
predictions = predictions.reset_index(drop=True)
sentences = predictions.values.flatten(order='F').tolist()
embedding = model.encode(sentences, convert_to_tensor=True, device="cuda", batch_size=256, show_progress_bar=True)
split_embedding = torch.split(embedding, length, dim=0)
table_tensor = torch.stack(split_embedding, dim=0, out=None)
# prediction的归一化嵌入张量
norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
sim_tensor_dict = {}
for col_tuple in col_tuple_list:
lattr_tensor = norm_table_tensor[col_tuple[0]]
rattr_tensor = norm_table_tensor[col_tuple[1]]
mul_tensor = lattr_tensor * rattr_tensor
sim_tensor = torch.sum(mul_tensor, 1)
sim_tensor = torch.round(sim_tensor, decimals=2)
sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor
return sim_tensor_dict
def is_explicable(row, all_mds: list, st_dict):
attrs = all_mds[0][0].keys() # 从第一条md_tuple中的md字典中读取所有字段
for md_tuple in all_mds:
explicable = True # 假设这条md能解释当前元组
for a in attrs:
if st_dict[a][row[0]].item() < md_tuple[0][a]:
explicable = False # 任意一个字段的相似度达不到阈值这条md就不能解释当前元组
break # 不再与当前md的其他相似度阈值比较跳转到下一条md
if explicable:
return md_tuple[2], md_tuple[0] # 任意一条md能解释直接返回
return -1.0, {} # 遍历结束,不能解释
def ml_er(config: Configuration):
indicators = matching(config)
output_path = er_output_dir + r"\eval_result.txt"
with open(output_path, 'w') as _f:
_f.write('Precision:' + str(indicators["precision"]) + '\n')
_f.write('Recall:' + str(indicators["recall"]) + '\n')
_f.write('F1:' + str(indicators["F1"]) + '\n')
_f.write('interpretability:' + str(indicators['interpretability']) + '\n')
_f.write('performance:' + str(indicators['performance']) + '\n')
if __name__ == '__main__':
if os.path.isfile(hpo_output_dir + r"\incumbent.json"):
with open(hpo_output_dir + r"\configspace.json", 'r') as f:
dict_configspace = json.load(f)
str_configspace = json.dumps(dict_configspace)
configspace = csj.read(str_configspace)
with open(hpo_output_dir + r"\incumbent.json", 'r') as f:
dic = json.load(f)
configuration = ConfigSpace.Configuration(configspace, values=dic)
ml_er(configuration)