diff --git a/.gitignore b/.gitignore
index 76cc2a5..dcb64f0 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,3 +2,9 @@
 /ml_er/output/*
 /md_discovery/output/*
 /hpo/output/*
+tfile.py
+table_embedding.py
+set_none.py
+generate_matches.py
+ml_er/fuck.py
+
diff --git a/hpo/er_model_hpo.py b/hpo/er_model_hpo.py
deleted file mode 100644
index af8b14c..0000000
--- a/hpo/er_model_hpo.py
+++ /dev/null
@@ -1,73 +0,0 @@
-import json
-from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer, Float
-from ConfigSpace.conditions import InCondition
-from ConfigSpace.read_and_write import json as csj
-import py_entitymatching.catalog.catalog_manager as cm
-import pandas as pd
-from smac import HyperparameterOptimizationFacade, Scenario
-from settings import *
-from ml_er.ml_entity_resolver import er_process
-
-
-class Classifier:
-    @property
-    def configspace(self) -> ConfigurationSpace:
-        cs = ConfigurationSpace(seed=0)
-        ml_matcher = Categorical("ml_matcher", ["dt", "svm", "rf", "lg", "ln", "nb"], default="rf")
-        # todo: hyperparameters for each classifier
-        tree_criterion = Categorical("dt_criterion", ["gini", "entropy", "log_loss"], default="gini")
-
-        cs.add_hyperparameters([ml_matcher])
-        return cs
-
-    def train(self, config: Configuration, seed: int = 0) -> float:
-        cm.del_catalog()
-        indicators = er_process(config)
-        return 1 - indicators['performance']
-
-
-def ml_er_hpo():
-    classifier = Classifier()
-    cs = classifier.configspace
-    str_configspace = csj.write(cs)
-    dict_configspace = json.loads(str_configspace)
-    with open(hpo_output_dir + "configspace.json", "w") as f:
-        json.dump(dict_configspace, f, indent=4)
-
-    scenario = Scenario(
-        cs,
-        deterministic=True,
-        n_trials=12,  # run at most 12 trials (combinations of config and seed)
-        n_workers=1
-    )
-
-    initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=5)
-
-    smac = HyperparameterOptimizationFacade(
-        scenario,
-        classifier.train,
-        initial_design=initial_design,
-        overwrite=True,  # If the run exists, we overwrite it; alternatively, we can continue from the last state
-    )
-
-    incumbent = smac.optimize()
-    incumbent_cost = smac.validate(incumbent)
-    default = cs.get_default_configuration()
-    default_cost = smac.validate(default)
-    print(f"Default Cost: {default_cost}")
-    print(f"Incumbent Cost: {incumbent_cost}")
-
-    if incumbent_cost > default_cost:
-        incumbent = default
-        print(f"Updated Incumbent Cost: {default_cost}")
-
-    print(f"Optimized Configuration:{incumbent.values()}")
-
-    with open(hpo_output_dir + "incumbent.json", "w") as f:
-        json.dump(dict(incumbent), f, indent=4)
-    return incumbent
-
-
-if __name__ == '__main__':
-    ml_er_hpo()
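The deleted tuner above only ever registered the matcher-choice hyperparameter; its replacement below attaches per-matcher hyperparameters and activates them with ConfigSpace conditions. A minimal, self-contained sketch of that mechanism (hypothetical names only; the real space is defined in magellan_hpo.py below):

```python
# A minimal sketch of conditional hyperparameters in ConfigSpace
# (hypothetical "matcher"/"dt_depth" names, not the repo's actual space).
from ConfigSpace import Categorical, ConfigurationSpace, Integer
from ConfigSpace.conditions import EqualsCondition

cs = ConfigurationSpace(seed=0)
matcher = Categorical("matcher", ["dt", "svm"])
dt_depth = Integer("dt_depth", (2, 10))
cs.add_hyperparameters([matcher, dt_depth])
# dt_depth is only active (and only sampled) when matcher == "dt"
cs.add_conditions([EqualsCondition(child=dt_depth, parent=matcher, value="dt")])

for cfg in cs.sample_configuration(4):
    print(dict(cfg))  # configs with matcher == "svm" carry no dt_depth key
```

Inactive children are simply absent from a sampled configuration, which is why matching() in ml_er/magellan_new.py can safely index, say, config['tree_max_depth'] only on the dt/rf branches.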
"log_loss"], default="gini") + rf_n_estimators = Integer('number_of_tree', (10, 150)) + tree_max_depth = Integer('tree_max_depth', (15, 30), default=None) + rf_max_features = Categorical('rf_max_features', ["sqrt", "log2", "auto"], default='sqrt') + + svm_kernel = Categorical('svm_kernel', ['linear', 'poly', 'rbf', 'sigmoid', 'precomputed'], default='rbf') + svm_C = Integer('svm_C', (1, 100), default=1) + svm_gamma = Categorical('svm_gamma', ['scale', 'auto'], default='scale') + svm_degree = Integer('svm_degree', (1, 5), default=3) + svm_constant = Float('svm_constant', (0.0, 5.0), default=0.0) + + dt_splitter = Categorical('dt_splitter', ["best", "random"], default='best') + dt_max_features = Categorical('dt_max_features', ["auto", "sqrt", "log2"], default=None) + + cs.add_hyperparameters([ml_matcher, tree_criterion, rf_n_estimators, tree_max_depth, rf_max_features, + svm_kernel, svm_C, svm_gamma, svm_degree, svm_constant, dt_splitter, dt_max_features]) + + active_tree_criterion = InCondition(child=tree_criterion, parent=ml_matcher, values=['dt', 'rf']) + active_tree_max_depth = InCondition(child=tree_max_depth, parent=ml_matcher, values=['dt', 'rf']) + active_rf_n_estimators = EqualsCondition(child=rf_n_estimators, parent=ml_matcher, value="rf") + active_rf_max_features = EqualsCondition(child=rf_max_features, parent=ml_matcher, value="rf") + active_dt_splitter = EqualsCondition(child=dt_splitter, parent=ml_matcher, value="dt") + active_dt_max_features = EqualsCondition(child=dt_max_features, parent=ml_matcher, value="dt") + active_svm_kernel = EqualsCondition(child=svm_kernel, parent=ml_matcher, value="svm") + active_svm_gamma = EqualsCondition(child=svm_gamma, parent=ml_matcher, value="svm") + active_svm_degree = EqualsCondition(child=svm_degree, parent=ml_matcher, value="svm") + active_svm_constant = EqualsCondition(child=svm_constant, parent=ml_matcher, value="svm") + active_svm_C = EqualsCondition(child=svm_C, parent=ml_matcher, value="svm") + + cs.add_conditions([active_svm_C, active_svm_constant, active_svm_degree, active_svm_gamma, active_svm_kernel, + active_dt_splitter, active_rf_n_estimators, active_dt_max_features, active_rf_max_features, + active_tree_max_depth, active_tree_criterion]) + + return cs + + def train(self, config: Configuration, seed: int = 0) -> float: + cm.del_catalog() + with open(er_output_dir + "blocking_result.pickle", "rb") as file: + blocking_result = pickle.load(file) + indicators = matching(config, blocking_result) + return 1 - indicators['performance'] + + +def ml_er_hpo(): + classifier = Classifier() + cs = classifier.configspace + str_configspace = csj.write(cs) + dict_configspace = json.loads(str_configspace) + # 将超参数空间保存本地 + with open(hpo_output_dir + "configspace.json", "w") as f: + json.dump(dict_configspace, f, indent=4) + + scenario = Scenario( + cs, + crash_cost=1.0, + deterministic=True, + n_trials=50, + n_workers=1 + ) + + initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=5) + + smac = HyperparameterOptimizationFacade( + scenario, + classifier.train, + initial_design=initial_design, + overwrite=True, # If the run exists, we overwrite it; alternatively, we can continue from last state + ) + + incumbent = smac.optimize() + incumbent_cost = smac.validate(incumbent) + default = cs.get_default_configuration() + default_cost = smac.validate(default) + print(f"Default Cost: {default_cost}") + print(f"Incumbent Cost: {incumbent_cost}") + + if incumbent_cost > default_cost: + incumbent = default + 
diff --git a/md_discovery/md_mining.py b/md_discovery/md_mining.py
index f949926..0a5a84e 100644
--- a/md_discovery/md_mining.py
+++ b/md_discovery/md_mining.py
@@ -1,5 +1,7 @@
 import random
 import operator
+from operator import itemgetter
+
 import pandas as pd
 import torch
 import matplotlib.pyplot as plt
@@ -41,12 +43,12 @@ def mining(train: pd.DataFrame):
     sim_tensor_list = []
     for col_tuple in col_tuple_list:
         mask = ((data[columns[col_tuple[0]]].isin([''])) | (data[columns[col_tuple[1]]].isin([''])))
-        empty_string_indices = data[mask].index.tolist()
+        empty_string_indices = data[mask].index.tolist()  # indices of rows containing empty strings
 
         lattr_tensor = norm_table_tensor[col_tuple[0]]
         rattr_tensor = norm_table_tensor[col_tuple[1]]
         mul_tensor = lattr_tensor * rattr_tensor
-        sim_tensor = torch.sum(mul_tensor, 1)
+        sim_tensor = torch.sum(mul_tensor, 1)  # sum over the embedding dim: the two attribute columns reduce to one similarity column
         # force positions containing empty strings to -1.0000
         sim_tensor = sim_tensor.scatter(0, torch.tensor(empty_string_indices, device='cuda').long(), -1.0000)
         sim_tensor = torch.round(sim_tensor, decimals=2)
@@ -86,7 +88,9 @@ def mining(train: pd.DataFrame):
         for k in range(0, len(columns_without_prefix)):
             md_dict_format[columns_without_prefix[k]] = md_list_format[k]
         result_list.append((md_dict_format, abs_support, confidence))
-    result_list.sort(key=operator.itemgetter(2), reverse=True)
+    # result_list.sort(key=itemgetter(2), reverse=True)
+    # sort by confidence first, then by support
+    result_list.sort(key=itemgetter(2, 1), reverse=True)
     mds_to_txt(result_list)
     return result_list
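The sort change above replaces the single-key ordering with a composite key: itemgetter(2, 1) pulls (confidence, abs_support) out of each (md_dict, abs_support, confidence) tuple, so reverse=True ranks MDs by confidence first and breaks ties by support. A quick illustration with made-up MD tuples:

```python
# Made-up (md_dict, abs_support, confidence) tuples for illustration only.
from operator import itemgetter

mds = [({'md': 'a'}, 40, 0.90), ({'md': 'b'}, 75, 0.95), ({'md': 'c'}, 60, 0.90)]
mds.sort(key=itemgetter(2, 1), reverse=True)
print([(support, conf) for _, support, conf in mds])
# [(75, 0.95), (60, 0.9), (40, 0.9)] -- confidence desc, support breaks the tie
```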
diff --git a/ml_er/magellan_new.py b/ml_er/magellan_new.py
index 7b43ff0..61c4926 100644
--- a/ml_er/magellan_new.py
+++ b/ml_er/magellan_new.py
@@ -1,6 +1,14 @@
+import json
+import os
+import pickle
 import time
+
+import ConfigSpace
 import pandas as pd
 import py_entitymatching as em
+import torch
+from ConfigSpace import Configuration
+from ConfigSpace.read_and_write import json as csj
 import py_entitymatching.catalog.catalog_manager as cm
 from tqdm import tqdm
@@ -16,11 +24,11 @@ def blocking_mining():
     cm.set_key(rtable, rtable_id)
     mappings = pd.read_csv(mapping_path, encoding='ISO-8859-1')
     matching_number = len(mappings)
-    if ltable_id == rtable_id:
-        tables_id = rtable_id
+    # if ltable_id == rtable_id:
+    #     tables_id = rtable_id
     attributes = ltable.columns.values.tolist()
-    lattributes = ['ltable_' + i for i in attributes]
-    rattributes = ['rtable_' + i for i in attributes]
+    # lattributes = ['ltable_' + i for i in attributes]
+    # rattributes = ['rtable_' + i for i in attributes]
     cm.set_key(ltable, ltable_id)
     cm.set_key(rtable, rtable_id)
@@ -65,15 +73,208 @@ def blocking_mining():
     label_and_split_time = time.time()
     print(f'Label and Split Time: {label_and_split_time - block_time}')
 
-    mining(train_set)
+    # mine MDs and save them locally
+    md_list = mining(train_set)
     mining_time = time.time()
     print(f'Mining Time: {mining_time - label_and_split_time}')
-    return 1
+    blocking_results = (ltable, rtable, train_set, test_set, md_list, block_recall)
+    # save the blocking result locally
+    with open(er_output_dir + "blocking_result.pickle", "wb") as file_:
+        pickle.dump(blocking_results, file_)
+    return blocking_results
+
+
+def matching(config: Configuration, blocking_result_):
+    print(f'\033[33mConfig: {config}\033[0m')
+    start = time.time()
+    ltable = blocking_result_[0]
+    rtable = blocking_result_[1]
+    train_set = blocking_result_[2]
+    test_set = blocking_result_[3]
+    md_list = blocking_result_[4]
+    block_recall = blocking_result_[5]
+    ml_matcher = config["ml_matcher"]
+    match ml_matcher:
+        case "dt":
+            matcher = em.DTMatcher(name='DecisionTree', random_state=0, criterion=config['tree_criterion'],
+                                   max_depth=config['tree_max_depth'], splitter=config['dt_splitter'],
+                                   max_features=config['dt_max_features'])
+        case "svm":
+            matcher = em.SVMMatcher(name='SVM', random_state=0, kernel=config['svm_kernel'], degree=config['svm_degree'],
+                                    gamma=config['svm_gamma'], C=config['svm_C'], coef0=config['svm_constant'])
+        case "rf":
+            matcher = em.RFMatcher(name='RandomForest', random_state=0, criterion=config['tree_criterion'],
+                                   max_depth=config['tree_max_depth'], n_estimators=config['number_of_tree'],
+                                   max_features=config['rf_max_features'])
+
+    cm.set_key(train_set, '_id')
+    cm.set_fk_ltable(train_set, 'ltable_' + ltable_id)
+    cm.set_fk_rtable(train_set, 'rtable_' + rtable_id)
+    cm.set_ltable(train_set, ltable)
+    cm.set_rtable(train_set, rtable)
+    cm.set_key(ltable, ltable_id)
+    cm.set_key(rtable, rtable_id)
+
+    cm.set_key(test_set, '_id')
+    cm.set_fk_ltable(test_set, 'ltable_' + ltable_id)
+    cm.set_fk_rtable(test_set, 'rtable_' + rtable_id)
+    cm.set_ltable(test_set, ltable)
+    cm.set_rtable(test_set, rtable)
+    feature_table = em.get_features_for_matching(ltable, rtable, validate_inferred_attr_types=False)
+    train_feature_vecs = em.extract_feature_vecs(train_set,
+                                                 feature_table=feature_table,
+                                                 attrs_after=['gold'],
+                                                 show_progress=False)
+    train_feature_vecs.fillna(value=0, inplace=True)
+
+    test_feature_after = ['ltable_' + i for i in ltable.columns.values.tolist()]
+    for _ in test_feature_after[:]:
+        test_feature_after.append(_.replace('ltable_', 'rtable_'))
+    for _ in test_feature_after[:]:  # iterate over a copy; removing while iterating the list itself skips elements
+        if _.endswith(ltable_id) or _.endswith(rtable_id):
+            test_feature_after.remove(_)
+    test_feature_after.append('gold')
+    test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
+                                                attrs_after=test_feature_after, show_progress=False)
+    test_feature_vecs.fillna(value=0, inplace=True)
+
+    fit_exclude = ['_id', 'ltable_' + ltable_id, 'rtable_' + rtable_id, 'gold']
+    matcher.fit(table=train_feature_vecs, exclude_attrs=fit_exclude, target_attr='gold')
+    test_feature_after.extend(['_id', 'ltable_' + ltable_id, 'rtable_' + rtable_id])
+    predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=test_feature_after,
+                                  append=True, target_attr='predicted', inplace=False)
+    eval_result = em.eval_matches(predictions, 'gold', 'predicted')
+    em.print_eval_summary(eval_result)
+    indicators = evaluate_prediction(predictions, 'gold', 'predicted')
+    indicators['block_recall'] = block_recall
+
+    test_feature_after.remove('_id')
+    test_feature_after.append('predicted')
+    predictions = predictions[test_feature_after]
+
+    predictions = predictions.reset_index(drop=True)
+    predictions = predictions.astype(str)
+    sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions)
+    predictions['confidence'] = 0
+
+    epl_match = 0  # count of pairs that are explicable and predicted as matches
+    if len(md_list) > 0:
+        for row in tqdm(predictions.itertuples()):
+            x = is_explicable(row, md_list, sim_tensor_dict)
+            if x > 0 and str(getattr(row, 'predicted')) == str(1):
+                predictions.loc[row[0], 'confidence'] = x
+                epl_match += 1
+
+    df = predictions[predictions['predicted'] == str(1)]
+    # interpretability: share of predicted matches explained by some MD (guard against zero predicted matches)
+    interpretability = epl_match / len(df) if len(df) > 0 else 0.0
+    indicators['interpretability'] = interpretability
+
+    # note: since blocking parameters are not tuned, assume block_recall is high and leave it out
+    # if indicators["block_recall"] < indicators["recall"]:
+    #     f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (
+    #             indicators["precision"] + indicators["block_recall"])
+    # else:
+    #     f1 = indicators["F1"]
+
+    performance = interpre_weight * interpretability + (1 - interpre_weight) * indicators["F1"]
+    indicators['performance'] = performance
+    print(f'ER Indicators: {indicators}')
+    predictions.to_csv(er_output_dir + 'predictions.csv', sep=',', index=False, header=True)
+    print(f'\033[33mTime consumed by matching in seconds: {time.time() - start}\033[0m')
+    return indicators
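matching() folds two signals into the single number SMAC optimizes: the share of predicted matches explained by some MD (interpretability) and classification F1, blended by interpre_weight. A minimal standalone sketch of that blend, with an assumed weight of 0.3 (the real value is set in settings.py, outside the hunk shown in this diff):

```python
# Hypothetical standalone version of the blend at the end of matching();
# the weight 0.3 is illustrative only.
def blended_performance(interpretability: float, f1: float, interpre_weight: float) -> float:
    return interpre_weight * interpretability + (1 - interpre_weight) * f1

print(blended_performance(interpretability=0.8, f1=0.9, interpre_weight=0.3))
# ~0.87 (up to float rounding); SMAC then minimizes 1 - this value
```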
+
+
+def evaluate_prediction(prediction_: pd.DataFrame, labeled_attr: str, predicted_attr: str) -> dict:
+    new_df = prediction_.reset_index(drop=False, inplace=False)
+    gold = new_df[labeled_attr]
+    predicted = new_df[predicted_attr]
+    gold_negative = gold[gold == 0].index.values
+    gold_positive = gold[gold == 1].index.values
+    predicted_negative = predicted[predicted == 0].index.values
+    predicted_positive = predicted[predicted == 1].index.values
+
+    false_positive_indices = list(set(gold_negative).intersection(predicted_positive))
+    true_positive_indices = list(set(gold_positive).intersection(predicted_positive))
+    false_negative_indices = list(set(gold_positive).intersection(predicted_negative))
+
+    num_true_positives = float(len(true_positive_indices))
+    num_false_positives = float(len(false_positive_indices))
+    num_false_negatives = float(len(false_negative_indices))
+
+    precision_denominator = num_true_positives + num_false_positives
+    recall_denominator = num_true_positives + num_false_negatives
+
+    precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator
+    recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator
+    F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall)
+
+    return {"precision": precision, "recall": recall, "F1": F1}
+
+
+def build_col_pairs_sim_tensor_dict(predictions: pd.DataFrame):
+    predictions_attrs = predictions.columns.values.tolist()
+    col_tuple_list = []
+    for _ in predictions_attrs:
+        if _.startswith('ltable'):
+            left_index = predictions_attrs.index(_)
+            right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_'))
+            col_tuple_list.append((left_index, right_index))
+
+    length = predictions.shape[0]
+    width = predictions.shape[1]
+    predictions = predictions.reset_index(drop=True)
+    sentences = predictions.values.flatten(order='F').tolist()
+
+    embedding = model.encode(sentences, convert_to_tensor=True, device="cuda", batch_size=256, show_progress_bar=True)
+    split_embedding = torch.split(embedding, length, dim=0)
+    table_tensor = torch.stack(split_embedding, dim=0, out=None)
+    # normalized embedding tensor of the predictions table
+    norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
+    sim_tensor_dict = {}
+    for col_tuple in col_tuple_list:
+        lattr_tensor = norm_table_tensor[col_tuple[0]]
+        rattr_tensor = norm_table_tensor[col_tuple[1]]
+        mul_tensor = lattr_tensor * rattr_tensor
+        sim_tensor = torch.sum(mul_tensor, 1)
+        sim_tensor = torch.round(sim_tensor, decimals=4)
+        sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor
+    return sim_tensor_dict
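build_col_pairs_sim_tensor_dict (like the analogous loop in md_mining.py) computes cosine similarity as a normalized dot product: L2-normalize the two embedding tensors, multiply elementwise, and sum over the embedding dimension. A small CPU check against torch's built-in, just to confirm the equivalence:

```python
import torch
import torch.nn.functional as F

# Five row pairs with 8-dim embeddings; any shapes work.
left, right = torch.randn(5, 8), torch.randn(5, 8)
sim = torch.sum(F.normalize(left, dim=1) * F.normalize(right, dim=1), dim=1)
assert torch.allclose(sim, F.cosine_similarity(left, right, dim=1), atol=1e-6)
```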
+
+
+def is_explicable(row, all_mds: list, st_dict):
+    attrs = all_mds[0][0].keys()  # read the attribute names from the MD dict of the first md_tuple
+    for md_tuple in all_mds:
+        explicable = True  # assume this MD explains the current tuple pair
+        for a in attrs:
+            if st_dict[a][row[0]].item() < md_tuple[0][a]:
+                explicable = False  # any attribute similarity below its threshold means this MD cannot explain the pair
+                break  # skip this MD's remaining thresholds and move on to the next MD
+        if explicable:
+            return md_tuple[2]  # explained by at least one MD: return its confidence immediately
+    return -1.0  # no MD explains the pair
 
-def matching():
-    return 1
+def ml_er(config: Configuration, blocking_result_):
+    indicators = matching(config, blocking_result_)
+    output_path = er_output_dir + "eval_result.txt"
+    with open(output_path, 'w') as _f:
+        _f.write('Precision:' + str(indicators["precision"]) + '\n')
+        _f.write('Recall:' + str(indicators["recall"]) + '\n')
+        _f.write('F1:' + str(indicators["F1"]) + '\n')
+        _f.write('block_recall:' + str(indicators["block_recall"]) + '\n')
+        _f.write('interpretability:' + str(indicators['interpretability']) + '\n')
+        _f.write('performance:' + str(indicators['performance']) + '\n')
 
 
 if __name__ == '__main__':
-    blocking_mining()
+    if os.path.isfile(hpo_output_dir + "incumbent.json"):
+        with open(hpo_output_dir + "configspace.json", 'r') as f:
+            dict_configspace = json.load(f)
+        str_configspace = json.dumps(dict_configspace)
+        configspace = csj.read(str_configspace)
+        with open(hpo_output_dir + "incumbent.json", 'r') as f:
+            dic = json.load(f)
+        configuration = ConfigSpace.Configuration(configspace, values=dic)
+        with open(er_output_dir + "blocking_result.pickle", "rb") as file:
+            blocking_result = pickle.load(file)
+        ml_er(configuration, blocking_result)
diff --git a/ml_er/magellan_start.py b/ml_er/magellan_start.py
new file mode 100644
index 0000000..567909d
--- /dev/null
+++ b/ml_er/magellan_start.py
@@ -0,0 +1,4 @@
+from ml_er.magellan_new import blocking_mining
+
+if __name__ == '__main__':
+    blocking_mining()
diff --git a/settings.py b/settings.py
index 3219095..e550c5d 100644
--- a/settings.py
+++ b/settings.py
@@ -1,12 +1,12 @@
 from sentence_transformers import SentenceTransformer
 
-ltable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\DBLP-GoogleScholar\tableA.csv'
-rtable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\DBLP-GoogleScholar\tableB.csv'
-mapping_path = r'E:\Data\Research\Projects\matching_dependency\datasets\DBLP-GoogleScholar\matches.csv'
-mapping_lid = 'idDBLP'  # name of the left-table id column in the mapping table
-mapping_rid = 'idScholar'  # name of the right-table id column in the mapping table
-ltable_block_attr = 'title'
-rtable_block_attr = 'title'
+ltable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\Abt-Buy\tableA.csv'
+rtable_path = r'E:\Data\Research\Projects\matching_dependency\datasets\Abt-Buy\tableB.csv'
+mapping_path = r'E:\Data\Research\Projects\matching_dependency\datasets\Abt-Buy\matches.csv'
+mapping_lid = 'idAbt'  # name of the left-table id column in the mapping table
+mapping_rid = 'idBuy'  # name of the right-table id column in the mapping table
+ltable_block_attr = 'name'
+rtable_block_attr = 'name'
 ltable_id = 'id'  # left-table id column name
 rtable_id = 'id'  # right-table id column name
 target_attr = 'id'  # target attribute for MD mining
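Taken together, the new entry points imply this run order: magellan_start.py performs blocking and MD mining once and pickles the result, magellan_hpo.py tunes the matcher against that pickle, and magellan_new.py's __main__ evaluates the saved incumbent. A sketch of the same flow driven from one script (a possible convenience wrapper, not a file in this diff):

```python
# Possible end-to-end driver (hypothetical; not part of this change set).
# Assumes the repo root is on PYTHONPATH and CUDA is available for embeddings.
from ml_er.magellan_new import blocking_mining, ml_er
from hpo.magellan_hpo import ml_er_hpo

blocking_result = blocking_mining()  # block, label, split, mine MDs; also pickled to er_output_dir
incumbent = ml_er_hpo()              # SMAC tunes the matcher against the pickled blocking result
ml_er(incumbent, blocking_result)    # evaluate the tuned configuration and write eval_result.txt
```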