From 9a4dfde0470573add852ceb74778b9f06eacaf40 Mon Sep 17 00:00:00 2001 From: HuangJintao <1447537163@qq.com> Date: Tue, 14 Nov 2023 07:48:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=E7=9B=B8=E4=BC=BC=E5=BA=A6=E9=98=88?= =?UTF-8?q?=E5=80=BC=E3=80=81support=E9=98=88=E5=80=BC=E3=80=81confidence?= =?UTF-8?q?=E9=98=88=E5=80=BC=E4=BA=A4=E7=BB=99smac=E8=B0=83=E8=8A=82=20?= =?UTF-8?q?=E5=B0=81=E8=A3=85ER=E5=87=BD=E6=95=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hpo/er_model_hpo.py | 374 ++++++++++--------- md_discovery/md_discover.py | 10 +- md_discovery/multi_process_infer_by_pairs.py | 276 -------------- md_discovery/tmp_discover.py | 17 +- ml_er/ml_entity_resolver.py | 111 +++--- settings.py | 6 +- tfile.py | 31 +- 7 files changed, 293 insertions(+), 532 deletions(-) delete mode 100644 md_discovery/multi_process_infer_by_pairs.py diff --git a/hpo/er_model_hpo.py b/hpo/er_model_hpo.py index 486bfc0..8253146 100644 --- a/hpo/er_model_hpo.py +++ b/hpo/er_model_hpo.py @@ -3,7 +3,7 @@ import os import numpy as np import torch import json -from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer +from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer, Float from ConfigSpace.conditions import InCondition from ConfigSpace.read_and_write import json as csj import py_entitymatching as em @@ -11,36 +11,11 @@ import py_entitymatching.catalog.catalog_manager as cm import pandas as pd from smac import HyperparameterOptimizationFacade, Scenario -from settings import * -from ml_er.ml_entity_resolver import evaluate_prediction, load_mds, is_explicable, build_col_pairs_sim_tensor_dict - -# 数据在外部加载 -######################################################################################################################## -ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1') -# ltable.fillna("", inplace=True) -rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1') -# rtable.fillna("", inplace=True) -mappings = pd.read_csv(mapping_path) - -lid_mapping_list = [] -rid_mapping_list = [] -# 全部转为字符串 -# ltable = ltable.astype(str) -# rtable = rtable.astype(str) -# mappings = mappings.astype(str) -matching_number = len(mappings) # 所有阳性样本数,商品数据集应为1300 -for index, row in mappings.iterrows(): - lid_mapping_list.append(row[mapping_lid]) - rid_mapping_list.append(row[mapping_rid]) -# 仅保留两表中出现在映射表中的行,增大正样本比例 -selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)] -# if len(lr_attrs_map) > 0: -# selected_ltable = selected_ltable.rename(columns=lr_attrs_map) # 参照右表,修改左表中与右表对应但不同名的字段 -tables_id = rtable_id # 不论左表右表ID字段名是否一致,经上一行调整,统一以右表为准 -selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)] -selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名 -######################################################################################################################## +from md_discovery.md_discover import md_discover +from settings import * +from ml_er.ml_entity_resolver import evaluate_prediction, load_mds, is_explicable, build_col_pairs_sim_tensor_dict, \ + process_prediction_for_md_discovery, er_process class Classifier: @@ -48,165 +23,207 @@ class Classifier: def configspace(self) -> ConfigurationSpace: # Build Configuration Space which defines all parameters and their ranges cs = ConfigurationSpace(seed=0) + ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1') + selected_attrs = ltable.columns.values.tolist() block_attr_items = selected_attrs[:] - block_attr_items.remove(tables_id) + block_attr_items.remove(ltable_id) block_attr = Categorical("block_attr", block_attr_items) overlap_size = Integer("overlap_size", (1, 3), default=1) ml_matcher = Categorical("ml_matcher", ["dt", "svm", "rf", "lg", "ln", "nb"], default="rf") ml_blocker = Categorical("ml_blocker", ["over_lap", "attr_equiv"], default="over_lap") + similarity_thresh = Float("similarity_thresh", (0.2, 0.21)) + support_thresh = Integer("support_thresh", (1, 1000)) + confidence_thresh = Float("confidence_thresh", (0.25, 0.5)) use_overlap_size = InCondition(child=overlap_size, parent=ml_blocker, values=["over_lap"]) - cs.add_hyperparameters([block_attr, overlap_size, ml_matcher, ml_blocker]) + cs.add_hyperparameters([block_attr, overlap_size, ml_matcher, ml_blocker, + similarity_thresh, support_thresh, confidence_thresh]) cs.add_conditions([use_overlap_size]) return cs # train 就是整个函数 只需将返回结果由预测变成预测结果的评估 def train(self, config: Configuration, seed: int = 0) -> float: cm.del_catalog() - attrs_with_l_prefix = ['ltable_' + i for i in selected_attrs] # 字段名加左前缀 - attrs_with_r_prefix = ['rtable_' + i for i in selected_attrs] # 字段名加右前缀 - cm.set_key(selected_ltable, tables_id) - cm.set_key(selected_rtable, tables_id) - if config["ml_blocker"] == "over_lap": - blocker = em.OverlapBlocker() - candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"], - l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, - overlap_size=config["overlap_size"], show_progress=False, - allow_missing=True) - elif config["ml_blocker"] == "attr_equiv": - blocker = em.AttrEquivalenceBlocker() - candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"], - l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, - allow_missing=True) - - candidate['gold'] = 0 - candidate = candidate.reset_index(drop=True) - candidate_match_rows = [] - for line in candidate.itertuples(): - l_id = getattr(line, 'ltable_' + tables_id) - map_row = mappings[mappings[mapping_lid] == l_id] - - if map_row is not None: - r_id = map_row[mapping_rid] - for value in r_id: - if value == getattr(line, 'rtable_' + tables_id): - candidate_match_rows.append(line[0]) - else: - continue - for _ in candidate_match_rows: - candidate.loc[_, 'gold'] = 1 - - candidate.fillna("", inplace=True) - - # 裁剪负样本,保持正负样本数量一致 - candidate_mismatch = candidate[candidate['gold'] == 0] - candidate_match = candidate[candidate['gold'] == 1] - if len(candidate_mismatch) > len(candidate_match): - candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match)) - # 拼接正负样本 - candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match]) - if len(candidate_for_train_test) == 0: - return 1 - candidate_for_train_test = candidate_for_train_test.reset_index(drop=True) - cm.set_key(candidate_for_train_test, '_id') - cm.set_fk_ltable(candidate_for_train_test, 'ltable_' + tables_id) - cm.set_fk_rtable(candidate_for_train_test, 'rtable_' + tables_id) - cm.set_ltable(candidate_for_train_test, selected_ltable) - cm.set_rtable(candidate_for_train_test, selected_rtable) - - # 分为训练测试集 - train_proportion = 0.7 - test_proportion = 0.3 - sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0) - train_set = sets['train'] - test_set = sets['test'] - - cm.set_key(train_set, '_id') - cm.set_fk_ltable(train_set, 'ltable_' + tables_id) - cm.set_fk_rtable(train_set, 'rtable_' + tables_id) - cm.set_ltable(train_set, selected_ltable) - cm.set_rtable(train_set, selected_rtable) - - cm.set_key(test_set, '_id') - cm.set_fk_ltable(test_set, 'ltable_' + tables_id) - cm.set_fk_rtable(test_set, 'rtable_' + tables_id) - cm.set_ltable(test_set, selected_ltable) - cm.set_rtable(test_set, selected_rtable) - - if config["ml_matcher"] == "dt": - matcher = em.DTMatcher(name='DecisionTree', random_state=0) - elif config["ml_matcher"] == "svm": - matcher = em.SVMMatcher(name='SVM', random_state=0) - elif config["ml_matcher"] == "rf": - matcher = em.RFMatcher(name='RF', random_state=0) - elif config["ml_matcher"] == "lg": - matcher = em.LogRegMatcher(name='LogReg', random_state=0) - elif config["ml_matcher"] == "ln": - matcher = em.LinRegMatcher(name='LinReg') - elif config["ml_matcher"] == "nb": - matcher = em.NBMatcher(name='NaiveBayes') - feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False) - - train_feature_vecs = em.extract_feature_vecs(train_set, - feature_table=feature_table, - attrs_after=['gold'], - show_progress=False) - train_feature_vecs.fillna(value=0, inplace=True) - test_feature_after = attrs_with_l_prefix[:] - test_feature_after.extend(attrs_with_r_prefix) - for _ in test_feature_after: - if _.endswith(tables_id): - test_feature_after.remove(_) - test_feature_after.append('gold') - test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table, - attrs_after=test_feature_after, show_progress=False) - test_feature_vecs.fillna(value=0, inplace=True) - fit_exclude = ['_id', 'ltable_' + tables_id, 'rtable_' + tables_id, 'gold'] - matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold') - - test_feature_after.extend(['_id', 'ltable_' + tables_id, 'rtable_' + tables_id]) - predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after, - append=True, target_attr='predicted', inplace=False) - eval_result = em.eval_matches(predictions, 'gold', 'predicted') - em.print_eval_summary(eval_result) - indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, candidate_for_train_test) - print(indicators) - - # 计算可解释性 - predictions_attrs = [] - predictions_attrs.extend(attrs_with_l_prefix) - predictions_attrs.extend(attrs_with_r_prefix) - predictions_attrs.extend(['gold', 'predicted']) - predictions = predictions[predictions_attrs] - predictions = predictions.reset_index(drop=True) - predictions = predictions.astype(str) - sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions) - - # 默认路径为 "../md_discovery/output/xxx.txt" - # mds/vio 共2个md文件 - md_paths = [md_output_dir + 'mds.txt', md_output_dir + 'vio.txt'] - md_list = load_mds(md_paths) # 从全局变量中读取所有的md - epl_match = 0 # 可解释,预测match - if len(md_list) > 0: - for line in predictions.itertuples(): - if is_explicable(line, md_list, sim_tensor_dict) and str(getattr(line, 'predicted')) == str(1): - epl_match += 1 - - ppre = predictions[predictions['predicted'] == str(1)] - interpretability = epl_match / len(ppre) # 可解释性 - - if (indicators["block_recall"] < 0.8) and (indicators["block_recall"] < indicators["recall"]): - f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / ( - indicators["precision"] + indicators["block_recall"]) - else: - f1 = indicators["F1"] - # if indicators["block_recall"] < 0.8: + indicators = er_process(config) + return 1-indicators['performance'] + + # # 数据在外部加载 + # ######################################################################################################################## + # ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1') + # cm.set_key(ltable, ltable_id) + # # ltable.fillna("", inplace=True) + # rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1') + # cm.set_key(rtable, rtable_id) + # # rtable.fillna("", inplace=True) + # mappings = pd.read_csv(mapping_path, encoding='ISO-8859-1') + # + # lid_mapping_list = [] + # rid_mapping_list = [] + # # 全部转为字符串 + # # ltable = ltable.astype(str) + # # rtable = rtable.astype(str) + # # mappings = mappings.astype(str) + # matching_number = len(mappings) # 所有阳性样本数,商品数据集应为1300 + # + # for index, row in mappings.iterrows(): + # lid_mapping_list.append(row[mapping_lid]) + # rid_mapping_list.append(row[mapping_rid]) + # # 仅保留两表中出现在映射表中的行,增大正样本比例 + # selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)] + # # if len(lr_attrs_map) > 0: + # # selected_ltable = selected_ltable.rename(columns=lr_attrs_map) # 参照右表,修改左表中与右表对应但不同名的字段 + # tables_id = rtable_id # 不论左表右表ID字段名是否一致,经上一行调整,统一以右表为准 + # selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)] + # selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名 + # ######################################################################################################################## + # + # attrs_with_l_prefix = ['ltable_' + i for i in selected_attrs] # 字段名加左前缀 + # attrs_with_r_prefix = ['rtable_' + i for i in selected_attrs] # 字段名加右前缀 + # cm.set_key(selected_ltable, tables_id) + # cm.set_key(selected_rtable, tables_id) + # + # if config["ml_blocker"] == "over_lap": + # blocker = em.OverlapBlocker() + # candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"], + # l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, + # overlap_size=config["overlap_size"], show_progress=False, + # allow_missing=True) + # elif config["ml_blocker"] == "attr_equiv": + # blocker = em.AttrEquivalenceBlocker() + # candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"], + # l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, + # allow_missing=True) + # + # candidate['gold'] = 0 + # candidate = candidate.reset_index(drop=True) + # candidate_match_rows = [] + # for line in candidate.itertuples(): + # l_id = getattr(line, 'ltable_' + tables_id) + # map_row = mappings[mappings[mapping_lid] == l_id] + # + # if map_row is not None: + # r_id = map_row[mapping_rid] + # for value in r_id: + # if value == getattr(line, 'rtable_' + tables_id): + # candidate_match_rows.append(line[0]) + # else: + # continue + # for _ in candidate_match_rows: + # candidate.loc[_, 'gold'] = 1 + # + # candidate.fillna("", inplace=True) + # + # # 裁剪负样本,保持正负样本数量一致 + # candidate_mismatch = candidate[candidate['gold'] == 0] + # candidate_match = candidate[candidate['gold'] == 1] + # if len(candidate_mismatch) > len(candidate_match): + # candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match)) + # # 拼接正负样本 + # candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match]) + # if len(candidate_for_train_test) == 0: # return 1 - # f1 = indicators["F1"] - performance = interpre_weight * interpretability + (1 - interpre_weight) * f1 - print('Interpretability: ', interpretability) - return 1 - performance + # candidate_for_train_test = candidate_for_train_test.reset_index(drop=True) + # cm.set_key(candidate_for_train_test, '_id') + # cm.set_fk_ltable(candidate_for_train_test, 'ltable_' + tables_id) + # cm.set_fk_rtable(candidate_for_train_test, 'rtable_' + tables_id) + # cm.set_ltable(candidate_for_train_test, selected_ltable) + # cm.set_rtable(candidate_for_train_test, selected_rtable) + # + # # 分为训练测试集 + # train_proportion = 0.7 + # test_proportion = 0.3 + # sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0) + # train_set = sets['train'] + # test_set = sets['test'] + # + # cm.set_key(train_set, '_id') + # cm.set_fk_ltable(train_set, 'ltable_' + tables_id) + # cm.set_fk_rtable(train_set, 'rtable_' + tables_id) + # cm.set_ltable(train_set, selected_ltable) + # cm.set_rtable(train_set, selected_rtable) + # + # cm.set_key(test_set, '_id') + # cm.set_fk_ltable(test_set, 'ltable_' + tables_id) + # cm.set_fk_rtable(test_set, 'rtable_' + tables_id) + # cm.set_ltable(test_set, selected_ltable) + # cm.set_rtable(test_set, selected_rtable) + # + # if config["ml_matcher"] == "dt": + # matcher = em.DTMatcher(name='DecisionTree', random_state=0) + # elif config["ml_matcher"] == "svm": + # matcher = em.SVMMatcher(name='SVM', random_state=0) + # elif config["ml_matcher"] == "rf": + # matcher = em.RFMatcher(name='RF', random_state=0) + # elif config["ml_matcher"] == "lg": + # matcher = em.LogRegMatcher(name='LogReg', random_state=0) + # elif config["ml_matcher"] == "ln": + # matcher = em.LinRegMatcher(name='LinReg') + # elif config["ml_matcher"] == "nb": + # matcher = em.NBMatcher(name='NaiveBayes') + # feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False) + # + # train_feature_vecs = em.extract_feature_vecs(train_set, + # feature_table=feature_table, + # attrs_after=['gold'], + # show_progress=False) + # train_feature_vecs.fillna(value=0, inplace=True) + # test_feature_after = attrs_with_l_prefix[:] + # test_feature_after.extend(attrs_with_r_prefix) + # for _ in test_feature_after: + # if _.endswith(tables_id): + # test_feature_after.remove(_) + # test_feature_after.append('gold') + # test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table, + # attrs_after=test_feature_after, show_progress=False) + # test_feature_vecs.fillna(value=0, inplace=True) + # fit_exclude = ['_id', 'ltable_' + tables_id, 'rtable_' + tables_id, 'gold'] + # matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold') + # + # test_feature_after.extend(['_id', 'ltable_' + tables_id, 'rtable_' + tables_id]) + # predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after, + # append=True, target_attr='predicted', inplace=False) + # eval_result = em.eval_matches(predictions, 'gold', 'predicted') + # em.print_eval_summary(eval_result) + # indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, candidate_for_train_test) + # print(indicators) + # + # # 计算可解释性 + # predictions_attrs = [] + # predictions_attrs.extend(attrs_with_l_prefix) + # predictions_attrs.extend(attrs_with_r_prefix) + # predictions_attrs.extend(['gold', 'predicted']) + # predictions = predictions[predictions_attrs] + # process_prediction_for_md_discovery(predictions) + # predictions = predictions.reset_index(drop=True) + # predictions = predictions.astype(str) + # sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions) + # + # # 默认路径为 "../md_discovery/output/xxx.txt" + # # mds/vio 共2个md文件 + # md_discover(config) + # md_paths = [md_output_dir + 'mds.txt', md_output_dir + 'vio.txt'] + # md_list = load_mds(md_paths) # 从全局变量中读取所有的md + # epl_match = 0 # 可解释,预测match + # if len(md_list) > 0: + # for line in predictions.itertuples(): + # if is_explicable(line, md_list, sim_tensor_dict) and str(getattr(line, 'predicted')) == str(1): + # epl_match += 1 + # + # ppre = predictions[predictions['predicted'] == str(1)] + # interpretability = epl_match / len(ppre) # 可解释性 + # + # if (indicators["block_recall"] < 0.8) and (indicators["block_recall"] < indicators["recall"]): + # f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / ( + # indicators["precision"] + indicators["block_recall"]) + # else: + # f1 = indicators["F1"] + # # if indicators["block_recall"] < 0.8: + # # return 1 + # # f1 = indicators["F1"] + # performance = interpre_weight * interpretability + (1 - interpre_weight) * f1 + # print('Interpretability: ', interpretability) + # return 1 - performance def ml_er_hpo(): @@ -215,13 +232,13 @@ def ml_er_hpo(): str_configspace = csj.write(cs) dict_configspace = json.loads(str_configspace) with open(hpo_output_dir + "configspace.json", "w") as f: - json.dump(dict_configspace, f) + json.dump(dict_configspace, f, indent=4) # Next, we create an object, holding general information about the run scenario = Scenario( cs, deterministic=True, - n_trials=12, # We want to run max 50 trials (combination of config and seed) + n_trials=50, # We want to run max 50 trials (combination of config and seed) n_workers=1 ) @@ -248,9 +265,8 @@ def ml_er_hpo(): print(f"Optimized Configuration:{incumbent.values()}") - incumbent_ndarray = incumbent.get_array() - np.save(hpo_output_dir + 'incumbent.npy', incumbent_ndarray) - + with open(hpo_output_dir + "incumbent.json", "w") as f: + json.dump(dict(incumbent), f, indent=4) return incumbent diff --git a/md_discovery/md_discover.py b/md_discovery/md_discover.py index ae5d50b..102b497 100644 --- a/md_discovery/md_discover.py +++ b/md_discovery/md_discover.py @@ -1,3 +1,5 @@ +from ConfigSpace import Configuration + from md_discovery import tmp_discover from settings import * @@ -15,11 +17,11 @@ from settings import * # f.write(str(_) + '\n') -def md_discover(): +def md_discover(config: Configuration): t_single_tuple_path = er_output_dir + "t_single_tuple.csv" # 输入:csv文件路径,md左侧相似度阈值,md右侧目标字段 # 输出:2个md列表,列表1中md无violation,列表2中md有violation但confidence满足阈值 - mds_list, vio_list = tmp_discover.pairs_inference(t_single_tuple_path, similarity_threshold, target_attr) + mds_list, vio_list = tmp_discover.pairs_inference(t_single_tuple_path, target_attr, config) # 将列表1写入本地,路径需自己修改 mds_path = md_output_dir + "mds.txt" @@ -38,5 +40,5 @@ def md_discover(): f.write('\n') -if __name__ == '__main__': - md_discover() +# if __name__ == '__main__': +# md_discover() diff --git a/md_discovery/multi_process_infer_by_pairs.py b/md_discovery/multi_process_infer_by_pairs.py deleted file mode 100644 index 319c24d..0000000 --- a/md_discovery/multi_process_infer_by_pairs.py +++ /dev/null @@ -1,276 +0,0 @@ -import multiprocessing -import pandas as pd -import Levenshtein -import copy -import numpy as np -import time -import torch -from tqdm import tqdm -from transformers import AutoTokenizer, AutoModel -from settings import model, er_output_dir -from sentence_transformers.util import cos_sim - -conf_thresh = 0.8 - - -def my_Levenshtein_ratio(str1, str2): - if max(len(str1), len(str2)) == 0: - return 1 - return 1 - Levenshtein.distance(str1, str2) / max(len(str1), len(str2)) - - -def norm_cos_sim(embed1, embed2): - sim = cos_sim(embed1, embed2) - return sim.tolist()[0][0]/2 + 0.5 - - -def table_encode(tp_path, fn_path): - embedding_dic = {} - - tp_data = pd.read_csv(tp_path, low_memory=False, encoding='ISO-8859-1') - tp_data.fillna("", inplace=True) - tp_data = tp_data.astype(str) - tp_length = tp_data.shape[0] - tp_width = tp_data.shape[1] - tp_sentences = [] - - for row in range(0, tp_length): - for col in range(0, tp_width): - cell_value = tp_data.values[row, col] - tp_sentences.append(cell_value) - tp_embedding = model.encode(tp_sentences, convert_to_tensor=True, device="cuda") - - list_tp_embedding = tp_embedding.tolist() - for row in range(0, tp_length): - for col in range(0, tp_width): - cell_value = tp_data.values[row, col] - embedding_dic[cell_value] = list_tp_embedding[row * tp_width + col] - - - fn_data = pd.read_csv(fn_path, low_memory=False, encoding='ISO-8859-1') - fn_data.fillna("", inplace=True) - fn_data = fn_data.astype(str) - fn_length = fn_data.shape[0] - fn_width = fn_data.shape[1] - fn_sentences = [] - - for row in range(0, fn_length): - for col in range(0, fn_width): - cell_value = fn_data.values[row, col] - fn_sentences.append(cell_value) - fn_embedding = model.encode(fn_sentences, convert_to_tensor=True, device="cuda") - - list_fn_embedding = fn_embedding.tolist() - for row in range(0, fn_length): - for col in range(0, fn_width): - cell_value = fn_data.values[row, col] - embedding_dic[cell_value] = list_fn_embedding[row * fn_width + col] - - np.save('embedding_dic.npy', embedding_dic) - - -def test_table_encode(): - start = time.time() - table_encode(er_output_dir+'tp_single_tuple.csv', er_output_dir+'fn_single_tuple.csv') - print(time.time()-start) - - -def test_load(): - load_dict = np.load('embedding_dic.npy', allow_pickle=True).item() - a = load_dict['model- bdcd00105wi venor- bitdefender features- bitdefender antivirus v10- small box antivirus v10 delivers a one-two security punch integrating todays most powerful antivirus and antispyware modules into one convenient package. its easy to use and updates itself automatically making it truly an install and forget solution. * antivirus the purpose of the antivirus module is to ensure detection and removal of all viruses in the wild. bitdefender antivirus uses robust scan engines certified by icsa labs virus bulletin checkmark checkvir and tuv. - improved proactive detection b-have (behavioral heuristic analyzer in virtual environments) emulates a virtual computer-inside-a-computer where pieces of software are run in order to check for potential malware behavior. this bitdefender proprietary technology represents a new security layer that keeps the operating system safe from unknown viruses by detecting malicious pieces of code for which signatures have not yet been released. - permanent antivirus protection the new and improved bitdefender scanning engines will scan and disinfect infected files on access minimizing data loss. infected documents can now be recovered instead of being deleted. - new rootkit detection and removal a new bitdefender module looks for rootkits (malicious programs designed to control victim computers while staying hidden) and removes them on detection. - new web scanning web traffic is now filtered in real-time even before reaching your browser providing a safe and enjoyable web experience. - peer-2-peer and im applications protection filters against viruses that spread'] - print(a) - print(1) -# def test_lm_similarity(): -# print(time.time()) -# sentences = ['fun with reading & writing! is designed to help kids learn to read and write better through exercises puzzle-solving creative writing decoding and more!', -# 'based on the tween lifestyle brand launched in 2004 this action/adventure game will contain loads of adventures tailored specifically to the player\'s personality type. the evergirl brand features a clothing and accessories line with a companion web ...'] -# embeddings = model.encode(sentences, convert_to_tensor=True) -# print(time.time()) -# sim = cos_sim(embeddings[0], embeddings[1]) -# print(time.time()) -# # print(sim.tolist()[0][0]/2 + 0.5) - - -def is_minimal(md, md_list, target_col): - # 假设这个md是minimal - minimal = True - if len(md_list) == 0: - return True - if md_list.count(md) > 1: - return False - for _ in md_list: - if _ != md: - # 假设列表中每一个md都使当前md不minimal - exist = True - # 如果左边任何一个大于,则假设不成立 - for col in list(set(_.keys()) - {target_col}): - if _[col] > md[col]: - exist = False - # 如果右边小于,假设也不成立 - if _[target_col] < md[target_col]: - exist = False - # 任何一次假设成立,当前md不minimal - if exist: - minimal = False - break - return minimal - - -def remove_by_confidence(md, l, relation, target_col, lock): - support, confidence = get_one_md_metadata(md, relation, target_col) - if confidence < 0.8: - with lock: - l.remove(md) - - -# def remove_by_confidence(md, l, relation, target_col): -# boolean, conf = satisfy_confidence(md, relation, 0.8, target_col) -# if not boolean: -# l.remove(md) -# print(md, '\t', conf) - - -def inference_from_record_pairs(path, threshold, target_col): - data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1') - data.fillna("", inplace=True) - data = data.astype(str) - columns = data.columns.values.tolist() - other_columns = list(set(columns) - {target_col}) - - md_list = [] - minimal_vio = [] - init_md = {} - for col in columns: - init_md[col] = 1 if col == target_col else 0 - md_list.append(init_md) - - for row1 in tqdm(data.itertuples()): - # 获取当前行的索引,从后一行开始切片 - i = row1[0] - data1 = data[i + 1:] - for row2 in data1.itertuples(): - violated_mds = [] - # sims是两行的相似度 - sims = {} - for col in columns: - similarity = norm_cos_sim(getattr(row1, col), getattr(row2, col)) - sims[col] = similarity - - # 寻找violated md,从md列表中删除并加入vio列表 - # tmp_md_list = copy.deepcopy(md_list) - for md in md_list[:]: - lhs_satis = True - rhs_satis = True - for col in other_columns: - if sims[col] + 0.0000001 < md[col]: - lhs_satis = False - break - if sims[target_col] + 0.0000001 < md[target_col]: - rhs_satis = False - if lhs_satis == True and rhs_satis == False: - md_list.remove(md) - violated_mds.append(md) - # minimal_vio.extend(violated_mds) - - for vio_md in violated_mds: - # 特殊化右侧,我们需要右侧百分百相似,其实不需要降低右侧阈值 - # if sims[target_col] >= threshold: - # new_rhs = sims[target_col] - # spec_r_md = copy.deepcopy(vio_md) - # spec_r_md[target_col] = new_rhs - # if is_minimal(spec_r_md, md_list, target_col): - # md_list.append(spec_r_md) - - # 特殊化左侧 - for col in other_columns: - if sims[col] + 0.01 <= 1: - spec_l_md = copy.deepcopy(vio_md) - spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.01 - if is_minimal(spec_l_md, md_list, target_col): - md_list.append(spec_l_md) - - # for vio in minimal_vio[:]: - # if not is_minimal(vio, md_list, target_col): - # minimal_vio.remove(vio) - - # fuck = len(minimal_vio) - # tmp = [] - # for _ in minimal_vio: - # if _ not in tmp: - # tmp.append(_) - # minimal_vio = tmp - - # manager = multiprocessing.Manager() - # lock = manager.Lock() - # if len(minimal_vio) == 0: - # return md_list, [] - # pool_size = len(minimal_vio) if len(minimal_vio) < 16 else 16 - # pool = multiprocessing.Pool(pool_size) - # # tmp = copy.deepcopy(minimal_vio) - # with manager: - # proxy_minimal_vio = manager.list(minimal_vio) - # for _ in minimal_vio[:]: - # pool.apply_async(remove_by_confidence, args=(_, proxy_minimal_vio, data, target_col, lock)) - # pool.close() - # pool.join() - # minimal_vio = list(proxy_minimal_vio) - # - # for _ in minimal_vio[:]: - # if not is_minimal(_, minimal_vio, target_col): - # minimal_vio.remove(_) - # - for _ in md_list[:]: - if not is_minimal(_, md_list, target_col): - md_list.remove(_) - - return md_list, minimal_vio - - -def get_mds_metadata(md_list, dataset_path, target_col): - data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1') - data.fillna("", inplace=True) - data = data.astype(str) - - manager = multiprocessing.Manager() - if len(md_list) == 0: - return [] - pool_size = len(md_list) if len(md_list) < 16 else 16 - pool = multiprocessing.Pool(pool_size) - result = [] - with manager: - for _ in md_list: - task = pool.apply_async(get_one_md_metadata, args=(_, data, target_col)) - support, confidence = task.get() - result.append({"md": _, "support": support, "confidence": confidence}) - pool.close() - pool.join() - return result - - -def get_one_md_metadata(md, dataframe, target_col): - support = 0 - pre_confidence = 0 - columns = dataframe.columns.values.tolist() - for row1 in dataframe.itertuples(): - i = row1[0] - df_slice = dataframe[i + 1:] - for row2 in df_slice.itertuples(): - left_satisfy = True - both_satisfy = True - for col in columns: - sim = norm_cos_sim(getattr(row1, col), getattr(row2, col)) - if col == target_col: - if sim + 0.0000001 < 1: - both_satisfy = False - else: - if sim + 0.0000001 < md[col]: - left_satisfy = False - both_satisfy = False - if left_satisfy: - support += 1 - if both_satisfy: - pre_confidence += 1 - - confidence = 0 if support == 0 else pre_confidence / support - # return {"md": md, "support": support, "confidence": confidence} - return support, confidence diff --git a/md_discovery/tmp_discover.py b/md_discovery/tmp_discover.py index d4d7870..0421075 100644 --- a/md_discovery/tmp_discover.py +++ b/md_discovery/tmp_discover.py @@ -8,9 +8,10 @@ import pandas as pd import copy import torch +from ConfigSpace import Configuration from tqdm import tqdm -from settings import model, md_output_dir, confidence_threshold, support_threshold +from settings import model def is_minimal(md, md_list, target_col): @@ -38,7 +39,10 @@ def is_minimal(md, md_list, target_col): return minimal -def pairs_inference(path, threshold, target_col): +def pairs_inference(path, target_col, conf: Configuration): + simt = conf["similarity_thresh"] + supt = conf["support_thresh"] + cont = conf["confidence_thresh"] data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1') data.fillna("", inplace=True) data = data.astype(str) @@ -98,7 +102,7 @@ def pairs_inference(path, threshold, target_col): for col in cols_but_target: if sims[col] + 0.01 <= 1: spec_l_md = copy.deepcopy(vio_md) - spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.01 + spec_l_md[col] = simt if sims[col] < simt else sims[col] + 0.01 if is_minimal(spec_l_md, md_list, target_col): md_list.append(spec_l_md) if vio_md not in minimal_vio: @@ -149,16 +153,16 @@ def pairs_inference(path, threshold, target_col): md_rm_list = [] for _ in md_list: support, confidence = get_metrics(_, data, sim_tensor, target_col, target_index) - if support < support_threshold: + if support < supt: md_rm_list.append(_) for _ in md_rm_list: md_list.remove(_) for md in minimal_vio: support, confidence = get_metrics(md, data, sim_tensor, target_col, target_index) # fuck.append((support, confidence)) - if support < support_threshold: + if support < supt: remove_list.append(md) - if confidence < confidence_threshold and md not in remove_list: + if confidence < cont and md not in remove_list: remove_list.append(md) # fuck_me = sorted(fuck, key=lambda x: x[1], reverse=True) for _ in remove_list: @@ -169,6 +173,7 @@ def pairs_inference(path, threshold, target_col): minimal_vio.remove(_) print('\033[31m' + 'vio_length\t' + str(len(minimal_vio)) + '\033[0m') + print(f"Support: {supt}\tConfidence: {cont}") return md_list, minimal_vio diff --git a/ml_er/ml_entity_resolver.py b/ml_er/ml_entity_resolver.py index 7d2db26..848656b 100644 --- a/ml_er/ml_entity_resolver.py +++ b/ml_er/ml_entity_resolver.py @@ -12,6 +12,8 @@ import py_entitymatching.catalog.catalog_manager as cm import pandas as pd import six from ConfigSpace import Configuration + +from md_discovery.md_discover import md_discover from settings import * @@ -144,7 +146,7 @@ def build_col_pairs_sim_tensor_dict(predictions: pandas.DataFrame): return sim_tensor_dict -def ml_er(iter_round: int, config: Configuration = None, ): +def er_process(config: Configuration): ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1') cm.set_key(ltable, ltable_id) # ltable.fillna("", inplace=True) @@ -172,45 +174,22 @@ def ml_er(iter_round: int, config: Configuration = None, ): tables_id = rtable_id selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)] selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名 - items_but_id = selected_attrs[:] - items_but_id.remove(tables_id) # 两张表中除了id的字段名 attrs_with_l_prefix = ['ltable_' + i for i in selected_attrs] attrs_with_r_prefix = ['rtable_' + i for i in selected_attrs] cm.set_key(selected_ltable, tables_id) cm.set_key(selected_rtable, tables_id) - if config is not None: - ml_matcher = config["ml_matcher"] - if ml_matcher == "dt": - matcher = em.DTMatcher(name='DecisionTree', random_state=0) - elif ml_matcher == "svm": - matcher = em.SVMMatcher(name='SVM', random_state=0) - elif ml_matcher == "rf": - matcher = em.RFMatcher(name='RF', random_state=0) - elif ml_matcher == "lg": - matcher = em.LogRegMatcher(name='LogReg', random_state=0) - elif ml_matcher == "ln": - matcher = em.LinRegMatcher(name='LinReg') - elif ml_matcher == "nb": - matcher = em.NBMatcher(name='NaiveBayes') - - if config["ml_blocker"] == "over_lap": - blocker = em.OverlapBlocker() - candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], - config["block_attr"], allow_missing=True, - l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, - overlap_size=config["overlap_size"], show_progress=False) - elif config["ml_blocker"] == "attr_equiv": - blocker = em.AttrEquivalenceBlocker() - candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], - config["block_attr"], allow_missing=True, - l_output_attrs=selected_attrs, r_output_attrs=selected_attrs) - else: - matcher = em.SVMMatcher(name='SVM', random_state=0) + if config["ml_blocker"] == "over_lap": blocker = em.OverlapBlocker() - candidate = blocker.block_tables(selected_ltable, selected_rtable, selected_attrs[-1], selected_attrs[-1], + candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], + config["block_attr"], allow_missing=True, l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, - overlap_size=1, show_progress=False, allow_missing=True) + overlap_size=config["overlap_size"], show_progress=False) + elif config["ml_blocker"] == "attr_equiv": + blocker = em.AttrEquivalenceBlocker() + candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], + config["block_attr"], allow_missing=True, + l_output_attrs=selected_attrs, r_output_attrs=selected_attrs) candidate['gold'] = 0 candidate = candidate.reset_index(drop=True) @@ -239,6 +218,8 @@ def ml_er(iter_round: int, config: Configuration = None, ): candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match)) # 拼接正负样本 candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match]) + # if len(candidate_for_train_test) == 0: + # return 0 # 如果拼接后不重设索引可能导致索引重复 candidate_for_train_test = candidate_for_train_test.reset_index(drop=True) cm.set_key(candidate_for_train_test, '_id') @@ -249,11 +230,36 @@ def ml_er(iter_round: int, config: Configuration = None, ): # 分为训练测试集 train_proportion = 0.7 - test_proportion = 0.3 sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0) train_set = sets['train'] test_set = sets['test'] + # cm.set_key(train_set, '_id') + # cm.set_fk_ltable(train_set, 'ltable_' + tables_id) + # cm.set_fk_rtable(train_set, 'rtable_' + tables_id) + # cm.set_ltable(train_set, selected_ltable) + # cm.set_rtable(train_set, selected_rtable) + # + # cm.set_key(test_set, '_id') + # cm.set_fk_ltable(test_set, 'ltable_' + tables_id) + # cm.set_fk_rtable(test_set, 'rtable_' + tables_id) + # cm.set_ltable(test_set, selected_ltable) + # cm.set_rtable(test_set, selected_rtable) + + ml_matcher = config["ml_matcher"] + if ml_matcher == "dt": + matcher = em.DTMatcher(name='DecisionTree', random_state=0) + elif ml_matcher == "svm": + matcher = em.SVMMatcher(name='SVM', random_state=0) + elif ml_matcher == "rf": + matcher = em.RFMatcher(name='RF', random_state=0) + elif ml_matcher == "lg": + matcher = em.LogRegMatcher(name='LogReg', random_state=0) + elif ml_matcher == "ln": + matcher = em.LinRegMatcher(name='LinReg') + elif ml_matcher == "nb": + matcher = em.NBMatcher(name='NaiveBayes') + feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False) train_feature_vecs = em.extract_feature_vecs(train_set, @@ -280,7 +286,6 @@ def ml_er(iter_round: int, config: Configuration = None, ): eval_result = em.eval_matches(predictions, 'gold', 'predicted') em.print_eval_summary(eval_result) indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, candidate_for_train_test) - print(indicators) # 计算可解释性 ################################################################################################################ @@ -294,6 +299,7 @@ def ml_er(iter_round: int, config: Configuration = None, ): predictions = predictions.astype(str) sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions) + md_discover(config) md_paths = [md_output_dir + 'mds.txt', md_output_dir + 'vio.txt'] md_list = load_mds(md_paths) # 从全局变量中读取所有的md epl_match = 0 # 可解释,预测match @@ -304,39 +310,40 @@ def ml_er(iter_round: int, config: Configuration = None, ): df = predictions[predictions['predicted'] == str(1)] interpretability = epl_match / len(df) # 可解释性 + indicators['interpretability'] = interpretability if (indicators["block_recall"] < 0.8) and (indicators["block_recall"] < indicators["recall"]): f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / ( indicators["precision"] + indicators["block_recall"]) else: f1 = indicators["F1"] performance = interpre_weight * interpretability + (1 - interpre_weight) * f1 + indicators['performance'] = performance + indicators['eval_result'] = eval_result + print(indicators) ################################################################################################################ + return indicators + - output_path = er_output_dir + "eval_result_" + str(iter_round) + ".txt" +def ml_er(config: Configuration = None): + indicators = er_process(config) + output_path = er_output_dir + "eval_result.txt" with open(output_path, 'w') as f: - for key, value in six.iteritems(_get_metric(eval_result)): + for key, value in six.iteritems(_get_metric(indicators['eval_result'])): f.write(key + " : " + value) f.write('\n') f.write('block_recall:' + str(indicators["block_recall"]) + '\n') - f.write('interpretability:' + str(interpretability) + '\n') - f.write('performance:' + str(performance) + '\n') + f.write('interpretability:' + str(indicators['interpretability']) + '\n') + f.write('performance:' + str(indicators['performance']) + '\n') + if __name__ == '__main__': - iterations = 1 - filename_list = os.listdir(er_output_dir) - if len(filename_list) > 0: - for _ in filename_list: - if _.startswith('eval_result'): - iterations = int(_[12:13]) + 1 - - if iterations > 1: - incumbent_array = np.load(hpo_output_dir + 'incumbent.npy') + if os.path.isfile(hpo_output_dir + "incumbent.json"): with open(hpo_output_dir + "configspace.json", 'r') as f: dict_configspace = json.load(f) str_configspace = json.dumps(dict_configspace) configspace = csj.read(str_configspace) - configuration = ConfigSpace.Configuration(configspace, vector=incumbent_array) - ml_er(iterations, configuration) - else: - ml_er(1) + with open(hpo_output_dir + "incumbent.json", 'r') as f: + dic = json.load(f) + configuration = ConfigSpace.Configuration(configspace, values=dic) + ml_er(configuration) diff --git a/settings.py b/settings.py index 9537ded..ff43696 100644 --- a/settings.py +++ b/settings.py @@ -12,10 +12,10 @@ target_attr = 'id' # 进行md挖掘时的目标字段 # lr_attrs_map = {} # 如果两个表中存在对应字段名称不一样的情况,将名称加入列表便于调整一致 model = SentenceTransformer('E:\\Data\\Research\\Models\\roberta-large-nli-stsb-mean-tokens') -similarity_threshold = 0.2 -support_threshold = 100 -confidence_threshold = 0.4 interpre_weight = 0.5 # 可解释性权重 +# similarity_threshold = 0.2 +# support_threshold = 100 +# confidence_threshold = 0.4 er_output_dir = 'E:\\Data\\Research\\Projects\\matching_dependency\\ml_er\\output\\' md_output_dir = 'E:\\Data\\Research\\Projects\\matching_dependency\\md_discovery\\output\\' diff --git a/tfile.py b/tfile.py index ca630aa..63d77ee 100644 --- a/tfile.py +++ b/tfile.py @@ -10,7 +10,7 @@ import torch from tqdm import tqdm from ConfigSpace.read_and_write import json as csj from md_discovery import tmp_discover -from settings import er_output_dir, similarity_threshold, target_attr, hpo_output_dir +from settings import er_output_dir, hpo_output_dir def fuck(i): @@ -107,13 +107,10 @@ def test7(): def test8(): - cum = np.load(hpo_output_dir + 'incumbent.npy') with open(hpo_output_dir + "configspace.json", 'r') as load_f: dict_configspace = json.load(load_f) str_configspace = json.dumps(dict_configspace) configspace = csj.read(str_configspace) - config = ConfigSpace.Configuration(configspace, vector=cum) - print(cum) def test9(): @@ -131,11 +128,21 @@ def test10(): sep=',', index=False, header=True, quoting=1) -if __name__ == '__main__': - start = time.time() - t_single_tuple_path = er_output_dir + "t_single_tuple.csv" - # tp_mds, tp_vio = inference_from_record_pairs(tp_single_tuple_path, similarity_threshold, target_attr) - tp_mds, tp_vio = tmp_discover.pairs_inference(t_single_tuple_path, similarity_threshold, target_attr) - print(time.time() - start) - - +def test11(): + values = { + 'block_attr': 'class', + 'confidence_thresh': 0.2717823249253852, + 'ml_blocker': 'attr_equiv', + 'ml_matcher': 'ln', + 'similarity_thresh': 0.20681820299103484, + 'support_thresh': 129, + } + with open(hpo_output_dir + "incumbent.json", "w") as f: + json.dump(values, f, indent=4) + + +def test12(): + with open(hpo_output_dir + "incumbent.json", 'r') as f: + dic = json.load(f) + for _ in dic.keys(): + print(f'Key:{_}\tValue:{dic[_]}\tType:{type(dic[_])}')