import json import os import sys import time import ConfigSpace import pandas import torch from py_entitymatching.debugmatcher.debug_gui_utils import _get_metric from ConfigSpace.read_and_write import json as csj import py_entitymatching as em import py_entitymatching.catalog.catalog_manager as cm import pandas as pd import six from ConfigSpace import Configuration from tqdm import tqdm from md_discovery.md_discover import md_discover from settings import * def prepare_file_for_md_discovery(train, t_single_tuple_path=er_output_dir + "t_single_tuple.csv"): df = train[train['gold'] == 1] # 元组对左右ID调整一致 for index, row in df.iterrows(): df.loc[index, "rtable_" + rtable_id] = row["ltable_" + rtable_id] train_columns = train.columns.values.tolist() l_columns = [] r_columns = [] cols = [] # 左表和右表字段名分别加入两个列表 for _ in train_columns: if _.startswith('ltable'): l_columns.append(_) elif _.startswith('rtable'): r_columns.append(_) # 将左表中字段名去掉前缀,作为统一的字段名列表(前提是两张表内对应字段名调整一致) for _ in l_columns: cols.append(_.replace('ltable_', '')) ldf = df[l_columns] rdf = df[r_columns] ldf.columns = cols rdf.columns = cols t_single_tuple = pd.concat([ldf, rdf]) t_single_tuple = t_single_tuple.reset_index(drop=True) t_single_tuple.to_csv(t_single_tuple_path, sep=',', index=False, header=True, quoting=1) def evaluate_prediction(df: pd.DataFrame, labeled_attr: str, predicted_attr: str, matching_number: int, candidate: pd.DataFrame) -> dict: new_df = df.reset_index(drop=False, inplace=False) gold = new_df[labeled_attr] predicted = new_df[predicted_attr] gold_negative = gold[gold == 0].index.values gold_positive = gold[gold == 1].index.values predicted_negative = predicted[predicted == 0].index.values predicted_positive = predicted[predicted == 1].index.values false_positive_indices = list(set(gold_negative).intersection(predicted_positive)) true_positive_indices = list(set(gold_positive).intersection(predicted_positive)) false_negative_indices = list(set(gold_positive).intersection(predicted_negative)) num_true_positives = float(len(true_positive_indices)) num_false_positives = float(len(false_positive_indices)) num_false_negatives = float(len(false_negative_indices)) precision_denominator = num_true_positives + num_false_positives recall_denominator = num_true_positives + num_false_negatives precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall) block_recall = len(candidate[candidate['gold'] == 1]) / matching_number return {"precision": precision, "recall": recall, "F1": F1, "block_recall": block_recall} def load_mds(paths: list) -> list: if len(paths) == 0: return [] all_mds = [] # 传入md路径列表 for md_path in paths: if not os.path.exists(md_path): continue mds = [] # 打开每一个md文件 with open(md_path, 'r') as f: # 读取每一行的md,加入该文件的md列表 for line in f.readlines(): md_metadata = line.strip().split('\t') # todo 如果MD文件的形式改了 这里也要改 md = eval(md_metadata[1]) mds.append(md) all_mds.extend(mds) return all_mds def is_explicable(row, all_mds: list, st_dict): attrs = all_mds[0][0].keys() # 从第一条md_tuple中的md字典中读取所有字段 for md_tuple in all_mds: explicable = True # 假设这条md能解释当前元组 for a in attrs: if a != target_attr: if st_dict[a][row[0]].item() < md_tuple[0][a]: explicable = False # 任意一个字段的相似度达不到阈值,这条md就不能解释当前元组 break # 不再与当前md的其他相似度阈值比较,跳转到下一条md if explicable: return md_tuple[2] # 任意一条md能解释,直接返回 return -1.0 # 遍历结束,不能解释 # 形成一个字典,key为字段名称,value为一维张量,记录了预测表中这一字段每行的左右属性的相似度 def build_col_pairs_sim_tensor_dict(predictions: pandas.DataFrame): predictions_attrs = predictions.columns.values.tolist() col_tuple_list = [] for _ in predictions_attrs: if _.startswith('ltable'): left_index = predictions_attrs.index(_) right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_')) col_tuple_list.append((left_index, right_index)) length = predictions.shape[0] width = predictions.shape[1] sentences = [] for col in range(0, width): for row in range(0, length): cell_value = predictions.values[row, col] sentences.append(cell_value) embedding = model.encode(sentences, convert_to_tensor=True, device="cuda") split_embedding = torch.split(embedding, length, dim=0) table_tensor = torch.stack(split_embedding, dim=0, out=None) # prediction的归一化嵌入张量 norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2) sim_tensor_dict = {} for col_tuple in col_tuple_list: lattr_tensor = norm_table_tensor[col_tuple[0]] rattr_tensor = norm_table_tensor[col_tuple[1]] mul_tensor = lattr_tensor * rattr_tensor sim_tensor = torch.sum(mul_tensor, 1) sim_tensor = torch.round(sim_tensor, decimals=4) sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor return sim_tensor_dict def er_process(config: Configuration): start = time.time() ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1') cm.set_key(ltable, ltable_id) # ltable.fillna("", inplace=True) rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1') cm.set_key(rtable, rtable_id) # rtable.fillna("", inplace=True) mappings = pd.read_csv(mapping_path, encoding='ISO-8859-1') # 仅保留两表中出现在映射表中的行,增大正样本比例 lid_mapping_list = [] rid_mapping_list = [] # 全部转为字符串 # ltable = ltable.astype(str) # rtable = rtable.astype(str) # mappings = mappings.astype(str) matching_number = len(mappings) # 所有阳性样本数 for index, row in mappings.iterrows(): lid_mapping_list.append(row[mapping_lid]) rid_mapping_list.append(row[mapping_rid]) selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)] tables_id = rtable_id selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)] selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名 attrs_with_l_prefix = ['ltable_' + i for i in selected_attrs] attrs_with_r_prefix = ['rtable_' + i for i in selected_attrs] cm.set_key(selected_ltable, tables_id) cm.set_key(selected_rtable, tables_id) blocker = em.OverlapBlocker() candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"], allow_missing=True, l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, overlap_size=1, show_progress=False) candidate['gold'] = 0 candidate = candidate.reset_index(drop=True) candidate_match_rows = [] for row in candidate.itertuples(): l_id = getattr(row, 'ltable_' + tables_id) map_row = mappings[mappings[mapping_lid] == l_id] if map_row is not None: r_id = map_row[mapping_rid] for value in r_id: if value == getattr(row, 'rtable_' + tables_id): candidate_match_rows.append(row[0]) else: continue for _ in candidate_match_rows: candidate.loc[_, 'gold'] = 1 candidate.fillna(value="", inplace=True) # 裁剪负样本,保持正负样本数量一致 candidate_mismatch = candidate[candidate['gold'] == 0] candidate_match = candidate[candidate['gold'] == 1] if len(candidate_mismatch) > len(candidate_match): candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match)) # 拼接正负样本 candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match]) # if len(candidate_for_train_test) == 0: # return 0 # 如果拼接后不重设索引可能导致索引重复 candidate_for_train_test = candidate_for_train_test.reset_index(drop=True) cm.set_key(candidate_for_train_test, '_id') cm.set_fk_ltable(candidate_for_train_test, 'ltable_' + tables_id) cm.set_fk_rtable(candidate_for_train_test, 'rtable_' + tables_id) cm.set_ltable(candidate_for_train_test, selected_ltable) cm.set_rtable(candidate_for_train_test, selected_rtable) block_recall = len(candidate_match) / matching_number # 分为训练测试集 train_proportion = 0.7 sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0) train_set = sets['train'] test_set = sets['test'] # cm.set_key(train_set, '_id') # cm.set_fk_ltable(train_set, 'ltable_' + tables_id) # cm.set_fk_rtable(train_set, 'rtable_' + tables_id) # cm.set_ltable(train_set, selected_ltable) # cm.set_rtable(train_set, selected_rtable) # # cm.set_key(test_set, '_id') # cm.set_fk_ltable(test_set, 'ltable_' + tables_id) # cm.set_fk_rtable(test_set, 'rtable_' + tables_id) # cm.set_ltable(test_set, selected_ltable) # cm.set_rtable(test_set, selected_rtable) ml_matcher = config["ml_matcher"] if ml_matcher == "dt": matcher = em.DTMatcher(name='DecisionTree', random_state=0) elif ml_matcher == "svm": matcher = em.SVMMatcher(name='SVM', random_state=0) elif ml_matcher == "rf": matcher = em.RFMatcher(name='RF', random_state=0) elif ml_matcher == "lg": matcher = em.LogRegMatcher(name='LogReg', random_state=0) elif ml_matcher == "ln": matcher = em.LinRegMatcher(name='LinReg') elif ml_matcher == "nb": matcher = em.NBMatcher(name='NaiveBayes') feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False) train_feature_vecs = em.extract_feature_vecs(train_set, feature_table=feature_table, attrs_after=['gold'], show_progress=False) train_feature_vecs.fillna(value=0, inplace=True) test_feature_after = attrs_with_l_prefix[:] test_feature_after.extend(attrs_with_r_prefix) for _ in test_feature_after: if _.endswith(tables_id): test_feature_after.remove(_) test_feature_after.append('gold') test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table, attrs_after=test_feature_after, show_progress=False) test_feature_vecs.fillna(value=0, inplace=True) fit_exclude = ['_id', 'ltable_' + tables_id, 'rtable_' + tables_id, 'gold'] matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold') test_feature_after.extend(['_id', 'ltable_' + tables_id, 'rtable_' + tables_id]) predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after, append=True, target_attr='predicted', inplace=False) eval_result = em.eval_matches(predictions, 'gold', 'predicted') em.print_eval_summary(eval_result) indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, candidate_for_train_test) # 计算可解释性 ################################################################################################################ predictions_attrs = [] predictions_attrs.extend(attrs_with_l_prefix) predictions_attrs.extend(attrs_with_r_prefix) predictions_attrs.extend(['gold', 'predicted']) predictions = predictions[predictions_attrs] # 必须从训练集内挖MD train_attrs = predictions_attrs[:] train_attrs.remove('predicted') train_set = train_set[train_attrs] prepare_file_for_md_discovery(train_set) predictions = predictions.reset_index(drop=True) predictions = predictions.astype(str) sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions) predictions['confidence'] = 0 md_discover(config, er_output_dir + "t_single_tuple.csv", md_output_dir + "mds.txt") md_paths = [md_output_dir + 'mds.txt'] md_list = load_mds(md_paths) # 从全局变量中读取所有的md epl_match = 0 # 可解释,预测match unexplainable = pd.DataFrame() if len(md_list) > 0: for row in tqdm(predictions.itertuples()): x = is_explicable(row, md_list, sim_tensor_dict) if x > 0 and str(getattr(row, 'predicted')) == str(1): predictions.loc[row[0], 'confidence'] = x epl_match += 1 # else: # series = pd.Series(row) # unexplainable = unexplainable._append(series, ignore_index=True) # unexplainable.drop(columns=unexplainable.columns[[-1, 0]], inplace=True) # unexplainable.columns = predictions_attrs # unexplainable = unexplainable[train_attrs] # if len(unexplainable[unexplainable['gold'] == str(1)]) > 0: # prepare_file_for_md_discovery(unexplainable, t_single_tuple_path=er_output_dir + 'unexplainable_tst.csv') # md_discover(config, er_output_dir + 'unexplainable_tst.csv', md_output_dir + "from_unexplainable.txt") df = predictions[predictions['predicted'] == str(1)] interpretability = epl_match / len(df) # 可解释性 indicators['interpretability'] = interpretability if indicators["block_recall"] < indicators["recall"]: f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / ( indicators["precision"] + indicators["block_recall"]) else: f1 = indicators["F1"] performance = interpre_weight * interpretability + (1 - interpre_weight) * f1 indicators['performance'] = performance indicators['eval_result'] = eval_result print(indicators) predictions.to_csv(er_output_dir + 'predictions.csv', sep=',', index=False, header=True) ################################################################################################################ print(f'\033[33mTime consumed by ML-ER in seconds: {time.time() - start}\033[0m') return indicators def ml_er(config: Configuration = None): indicators = er_process(config) output_path = er_output_dir + "eval_result.txt" with open(output_path, 'w') as f: for key, value in six.iteritems(_get_metric(indicators['eval_result'])): f.write(key + " : " + value) f.write('\n') f.write('block_recall:' + str(indicators["block_recall"]) + '\n') f.write('interpretability:' + str(indicators['interpretability']) + '\n') f.write('performance:' + str(indicators['performance']) + '\n') if __name__ == '__main__': if os.path.isfile(hpo_output_dir + "incumbent.json"): with open(hpo_output_dir + "configspace.json", 'r') as f: dict_configspace = json.load(f) str_configspace = json.dumps(dict_configspace) configspace = csj.read(str_configspace) with open(hpo_output_dir + "incumbent.json", 'r') as f: dic = json.load(f) configuration = ConfigSpace.Configuration(configspace, values=dic) ml_er(configuration)