import os
import numpy as np
import torch
import json
from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer, Float
from ConfigSpace.conditions import InCondition
from ConfigSpace.read_and_write import json as csj
import py_entitymatching as em
import py_entitymatching.catalog.catalog_manager as cm
import pandas as pd
from smac import HyperparameterOptimizationFacade, Scenario
from md_discovery.md_discover import md_discover
# NOTE(review): names such as `ltable_path`, `ltable_id` and `hpo_output_dir`
# are not defined in this file; presumably they come from this wildcard import.
from settings import *
from ml_er.ml_entity_resolver import (
    evaluate_prediction,
    load_mds,
    is_explicable,
    build_col_pairs_sim_tensor_dict,
    process_prediction_for_md_discovery,
    er_process,
)


class Classifier:
    """SMAC target wrapper for the entity-resolution (ER) pipeline.

    Exposes the search space (``configspace``) and the objective (``train``)
    that the hyperparameter optimizer calls for every trial.
    """

    @property
    def configspace(self) -> ConfigurationSpace:
        """Build the configuration space describing every tunable parameter.

        The candidate blocking attributes are the columns of the left table
        (read from ``ltable_path``) with the ID column ``ltable_id`` removed.

        Returns:
            A ``ConfigurationSpace`` (seeded with 0) containing the blocker,
            matcher and threshold hyperparameters.

        Raises:
            ValueError: If ``ltable_id`` is not a column of the left table.
        """
        cs = ConfigurationSpace(seed=0)

        ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1')
        block_attr_items = ltable.columns.values.tolist()
        try:
            # The ID column must not be offered as a blocking attribute.
            block_attr_items.remove(ltable_id)
        except ValueError:
            raise ValueError(
                f"ID column {ltable_id!r} not found in left table {ltable_path!r}; "
                f"available columns: {block_attr_items}"
            ) from None

        block_attr = Categorical("block_attr", block_attr_items)
        overlap_size = Integer("overlap_size", (1, 3), default=1)
        # Matcher / blocker codes are interpreted by the ER pipeline; in the
        # former inline implementation they selected py_entitymatching's
        # DT/SVM/RF/LogReg/LinReg/NB matchers and Overlap/AttrEquivalence blockers.
        ml_matcher = Categorical("ml_matcher", ["dt", "svm", "rf", "lg", "ln", "nb"], default="rf")
        ml_blocker = Categorical("ml_blocker", ["over_lap", "attr_equiv"], default="over_lap")
        # NOTE(review): the three thresholds below are presumably consumed by
        # the matching-dependency discovery step -- confirm against er_process.
        similarity_thresh = Float("similarity_thresh", (0.2, 0.21))
        support_thresh = Integer("support_thresh", (1, 1000))
        confidence_thresh = Float("confidence_thresh", (0.25, 0.5))

        # `overlap_size` is only active when the overlap blocker is selected.
        use_overlap_size = InCondition(child=overlap_size, parent=ml_blocker, values=["over_lap"])

        cs.add_hyperparameters([block_attr, overlap_size, ml_matcher, ml_blocker,
                                similarity_thresh, support_thresh, confidence_thresh])
        cs.add_conditions([use_overlap_size])
        return cs

    def train(self, config: Configuration, seed: int = 0) -> float:
        """Run one ER trial and return its cost for SMAC to minimize.

        Args:
            config: Hyperparameter configuration sampled from ``configspace``.
            seed: Seed supplied by the optimizer; accepted for interface
                compatibility but not forwarded to the pipeline.

        Returns:
            ``1 - indicators['performance']``, where ``indicators`` is the
            mapping returned by ``er_process(config)``.
        """
        # Clear py_entitymatching's catalog so metadata from a previous trial
        # does not leak into this one.
        cm.del_catalog()
        indicators = er_process(config)
        return 1 - indicators['performance']

    # NOTE: a commented-out inline implementation of `train` used to live here
    # and was removed as dead code (it remains available in version control).
    # It performed, in order:
    #   1. load the left/right/mapping CSVs and keep only rows whose IDs occur
    #      in the mapping table (to raise the share of positive samples);
    #   2. block the tables with an Overlap or AttrEquivalence blocker;
    #   3. label candidate pairs with a `gold` column from the mapping table
    #      and down-sample negatives to the number of positives
    #      (returning cost 1 when no candidates remained);
    #   4. split 70/30 into train/test, fit the selected matcher, predict;
    #   5. evaluate the predictions, discover matching dependencies and compute
    #      interpretability as the share of predicted matches they explain;
    #   6. combine: performance = interpre_weight * interpretability
    #      + (1 - interpre_weight) * F1, with block recall substituted for
    #      recall in F1 when block recall < 0.8 and below recall; cost was
    #      1 - performance.
    # NOTE(review): `er_process` presumably implements the same steps -- confirm.
def ml_er_hpo() -> Configuration:
    """Search the ER hyperparameters with SMAC and persist the result.

    Writes the configuration space to ``configspace.json`` and the selected
    configuration to ``incumbent.json``; both paths are built by prefixing
    the file name with ``hpo_output_dir``. The configuration found by the
    optimizer is compared against the default configuration, and the default
    is kept if it has the lower validated cost.

    Returns:
        The selected configuration (optimizer incumbent or the default).
    """
    classifier = Classifier()
    cs = classifier.configspace

    # Paths are built by plain concatenation, exactly as before, so the
    # resulting file locations are unchanged.
    configspace_path = hpo_output_dir + "configspace.json"
    incumbent_path = hpo_output_dir + "incumbent.json"
    # Create the output directory up front so a missing directory does not
    # abort the run (dirname is '' when the prefix has no directory part).
    os.makedirs(os.path.dirname(configspace_path) or ".", exist_ok=True)

    # Round-trip through json.loads so the file is written with indentation.
    str_configspace = csj.write(cs)
    dict_configspace = json.loads(str_configspace)
    with open(configspace_path, "w") as f:
        json.dump(dict_configspace, f, indent=4)

    # Next, we create an object holding general information about the run.
    scenario = Scenario(
        cs,
        deterministic=True,
        n_trials=50,  # We want to run max 50 trials (combination of config and seed)
        n_workers=1,
    )

    initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=5)

    # Now we use SMAC to find the best hyperparameters.
    smac = HyperparameterOptimizationFacade(
        scenario,
        classifier.train,
        initial_design=initial_design,
        overwrite=True,  # If the run exists, we overwrite it; alternatively, we can continue from last state
    )

    incumbent = smac.optimize()
    incumbent_cost = smac.validate(incumbent)

    default = cs.get_default_configuration()
    default_cost = smac.validate(default)
    print(f"Default Cost: {default_cost}")
    print(f"Incumbent Cost: {incumbent_cost}")

    # Fall back to the default configuration when the search did not beat it.
    if incumbent_cost > default_cost:
        incumbent = default
        print(f"Updated Incumbent Cost: {default_cost}")

    print(f"Optimized Configuration:{incumbent.values()}")

    # Hyperparameter values may be NumPy scalars (e.g. np.int64), which the
    # json module cannot serialize; convert them to builtin Python types.
    incumbent_values = {
        name: value.item() if isinstance(value, np.generic) else value
        for name, value in dict(incumbent).items()
    }
    with open(incumbent_path, "w") as f:
        json.dump(incumbent_values, f, indent=4)
    return incumbent


if __name__ == '__main__':
    ml_er_hpo()