from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer, Float import py_entitymatching as em import py_entitymatching.catalog.catalog_manager as cm import pandas as pd from py_entitymatching.blocker.blocker import Blocker from py_entitymatching.matcher.mlmatcher import MLMatcher from smac import HyperparameterOptimizationFacade, Scenario from md_discovery.functions.multi_process_infer_by_pairs import my_Levenshtein_ratio from entrance import * # todo 距离度量用户可设置 # 全局变量,每次迭代后清空列表,加入新的md路径 # todo: # 默认路径为 "../md_discovery/output/xxx.txt" # 真阳/假阴 mds/vio 共4个md文件 def evaluate_prediction(df: pd.DataFrame, labeled_attr: str, predicted_attr: str, matching_number: int, test_proportion: float) -> dict: new_df = df.reset_index(drop=False, inplace=False) gold = new_df[labeled_attr] predicted = new_df[predicted_attr] gold_negative = gold[gold == 0].index.values gold_positive = gold[gold == 1].index.values predicted_negative = predicted[predicted == 0].index.values predicted_positive = predicted[predicted == 1].index.values false_positive_indices = list(set(gold_negative).intersection(predicted_positive)) true_positive_indices = list(set(gold_positive).intersection(predicted_positive)) false_negative_indices = list(set(gold_positive).intersection(predicted_negative)) num_true_positives = float(len(true_positive_indices)) num_false_positives = float(len(false_positive_indices)) num_false_negatives = float(len(false_negative_indices)) precision_denominator = num_true_positives + num_false_positives recall_denominator = num_true_positives + num_false_negatives precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall) my_recall = num_true_positives / (matching_number * test_proportion) return {"precision": precision, "recall": recall, "F1": F1, "my_recall": my_recall} def load_mds(paths: list) -> list: if len(paths) == 0: return [] all_mds = [] # 传入md路径列表 for md_path in paths: mds = [] # 打开每一个md文件 with open(md_path, 'r') as f: # 读取每一行的md,加入该文件的md列表 for line in f.readlines(): md_metadata = line.strip().split('\t') md = eval(md_metadata[0].replace('md:', '')) confidence = eval(md_metadata[2].replace('confidence:', '')) if confidence > 0: mds.append(md) all_mds.extend(mds) return all_mds def is_explicable(row, all_mds: list) -> bool: attrs = all_mds[0].keys() # 从第一条md中读取所有字段 for md in all_mds: explicable = True # 假设这条md能解释当前元组 for a in attrs: threshold = md[a] if my_Levenshtein_ratio(str(getattr(row, 'ltable_'+a)), str(getattr(row, 'rtable_'+a))) < threshold: explicable = False # 任意一个字段的相似度达不到阈值,这条md就不能解释当前元组 break # 不再与当前md的其他相似度阈值比较,跳转到下一条md if explicable: return True # 任意一条md能解释,直接返回 return False # 遍历结束,不能解释 class SVM: @property def configspace(self) -> ConfigurationSpace: # Build Configuration Space which defines all parameters and their ranges cs = ConfigurationSpace(seed=0) # todo # block_attr 取消打桩 block_attr = Categorical("block_attr", ["name", "description", "manufacturer", "price"], default="title") overlap_size = Integer("overlap_size", (1, 3), default=1) ml_matcher = Categorical("ml_matcher", ["dt", "svm", "rf", "lg", "ln", "nb"], default="rf") ml_blocker = Categorical("ml_blocker", ["over_lap", "attr_equiv"], default="over_lap") cs.add_hyperparameters([block_attr, overlap_size, ml_matcher, ml_blocker]) return cs # train 就是整个函数 只需将返回结果由预测变成预测结果的评估 def train(self, config: Configuration) -> float: ltable = pd.read_csv(ltable_path, encoding='ISO-8859-1') cm.set_key(ltable, ltable_id) ltable.fillna("", inplace=True) rtable = pd.read_csv(rtable_path, encoding='ISO-8859-1') cm.set_key(rtable, rtable_id) rtable.fillna("", inplace=True) mappings = pd.read_csv(mapping_path) # 仅保留两表中出现在映射表中的行,增大正样本比例 lid_mapping_list = [] rid_mapping_list = [] # 全部转为字符串 ltable = ltable.astype(str) rtable = rtable.astype(str) mappings = mappings.astype(str) matching_number = len(mappings) # 所有阳性样本数,商品数据集应为1300 for index, row in mappings.iterrows(): lid_mapping_list.append(row[mapping_lid]) rid_mapping_list.append(row[mapping_rid]) selected_ltable = ltable[ltable[ltable_id].isin(lid_mapping_list)] selected_ltable = selected_ltable.rename(columns=lr_attrs_map) # 参照右表,修改左表中与右表对应但不同名的字段 selected_rtable = rtable[rtable[rtable_id].isin(rid_mapping_list)] selected_attrs = selected_ltable.columns.values.tolist() # 两张表中的字段名 attrs_with_l_prefix = ['ltable_'+i for i in selected_attrs] attrs_with_r_prefix = ['rtable_'+i for i in selected_attrs] cm.set_key(selected_ltable, ltable_id) cm.set_key(selected_rtable, rtable_id) blocker = None if config["ml_blocker"] == "over_lap": blocker = em.OverlapBlocker() candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"], l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, overlap_size=config["overlap_size"], show_progress=False, n_jobs=-1) elif config["ml_blocker"] == "attr_equiv": blocker = em.AttrEquivalenceBlocker() candidate = blocker.block_tables(selected_ltable, selected_rtable, config["block_attr"], config["block_attr"], l_output_attrs=selected_attrs, r_output_attrs=selected_attrs, n_jobs=-1) candidate['gold'] = 0 candidate_match_rows = [] for index, row in candidate.iterrows(): l_id = row['ltable_' + ltable_id] map_row = mappings[mappings[mapping_lid] == l_id] if map_row is not None: r_id = map_row[mapping_rid] for value in r_id: if value == row['rtable_' + rtable_id]: candidate_match_rows.append(row["_id"]) else: continue for row in candidate_match_rows: candidate.loc[row, 'gold'] = 1 # 裁剪负样本,保持正负样本数量一致 candidate_mismatch = candidate[candidate['gold'] == 0] candidate_match = candidate[candidate['gold'] == 1] if len(candidate_mismatch) > len(candidate_match): candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match)) # 拼接正负样本 candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match]) cm.set_key(candidate_for_train_test, '_id') cm.set_fk_ltable(candidate_for_train_test, 'ltable_' + ltable_id) cm.set_fk_rtable(candidate_for_train_test, 'rtable_' + rtable_id) cm.set_ltable(candidate_for_train_test, selected_ltable) cm.set_rtable(candidate_for_train_test, selected_rtable) # 分为训练测试集 train_proportion = 0.7 test_proportion = 0.3 sets = em.split_train_test(candidate_for_train_test, train_proportion=train_proportion, random_state=0) train_set = sets['train'] test_set = sets['test'] matcher = None if config["ml_matcher"] == "dt": matcher = em.DTMatcher(name='DecisionTree', random_state=0) elif config["ml_matcher"] == "svm": matcher = em.SVMMatcher(name='SVM', random_state=0) elif config["ml_matcher"] == "rf": matcher = em.RFMatcher(name='RF', random_state=0) elif config["ml_matcher"] == "lg": matcher = em.LogRegMatcher(name='LogReg', random_state=0) elif config["ml_matcher"] == "ln": matcher = em.LinRegMatcher(name='LinReg') elif config["ml_matcher"] == "nb": matcher = em.NBMatcher(name='NaiveBayes') feature_table = em.get_features_for_matching(selected_ltable, selected_rtable, validate_inferred_attr_types=False) train_feature_vecs = em.extract_feature_vecs(train_set, feature_table=feature_table, attrs_after=['gold'], show_progress=False) test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table, attrs_after=['ltable_title', 'ltable_description', 'ltable_manufacturer', 'ltable_price', 'rtable_name', 'rtable_description', 'rtable_manufacturer', 'rtable_price', 'gold'], show_progress=False) # todo 参数可调 用drop删除特征向量中的列? # 1.exclude_attrs # 去掉id相关的相似度 matcher.fit(table=train_feature_vecs, exclude_attrs=['_id', 'ltable_id', 'rtable_id', 'gold'], target_attr='gold') # 1.exclude_attrs predictions = matcher.predict(table=test_feature_vecs, exclude_attrs=['_id', 'ltable_id', 'rtable_id', 'ltable_title', 'ltable_description', 'ltable_manufacturer', 'ltable_price', 'rtable_name', 'rtable_description', 'rtable_manufacturer', 'rtable_price', 'gold'], append=True, target_attr='predicted', inplace=False) eval_result = em.eval_matches(predictions, 'gold', 'predicted') em.print_eval_summary(eval_result) indicators = evaluate_prediction(predictions, 'gold', 'predicted', matching_number, test_proportion) print(indicators) # 计算可解释性 predictions_attrs = [] predictions_attrs.extend(attrs_with_l_prefix) predictions_attrs.extend(attrs_with_r_prefix) predictions_attrs.extend(['gold', 'predicted']) predictions = predictions[predictions_attrs] md_paths = ['../md_discovery/output/tp_mds.txt', '../md_discovery/output/tp_vio.txt', '../md_discovery/output/fn_mds.txt', '../md_discovery/output/fn_vio.txt'] epl_match = 0 # 可解释,预测match nepl_mismatch = 0 # 不可解释,预测mismatch md_list = load_mds(md_paths) # 从全局变量中读取所有的md for row in predictions.itertuples(): if is_explicable(row, md_list): if getattr(row, 'predicted') == 1: epl_match += 1 else: if getattr(row, 'predicted') == 0: nepl_mismatch += 1 epl_ability = (epl_match + nepl_mismatch) / len(predictions) # 可解释性 f1 = indicators['F1'] performance = interpretability_weight * epl_ability + (1 - interpretability_weight) * f1 return 1 - performance if __name__ == "__main__": classifier = SVM() # Next, we create an object, holding general information about the run scenario = Scenario( classifier.configspace, n_trials=12, # We want to run max 50 trials (combination of config and seed) ) initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=3) # Now we use SMAC to find the best hyperparameters smac = HyperparameterOptimizationFacade( scenario, classifier.train, initial_design=initial_design, overwrite=True, # If the run exists, we overwrite it; alternatively, we can continue from last state ) # todo # 如果new_recall过低则避免其成为最优解 # 将损失函数置为1/用new_recall降低F1从而提高损失函数 incumbent = smac.optimize() # Get cost of default configuration default_cost = smac.validate(classifier.configspace.get_default_configuration()) print(f"Default cost: {default_cost}") # Let's calculate the cost of the incumbent incumbent_cost = smac.validate(incumbent) print(f"Incumbent cost: {incumbent_cost}") print(f"Configuration:{incumbent.values()}") print(f"MAX_F1:{1-classifier.train(incumbent)}")