import json
import os
from time import time

import ConfigSpace
import pandas as pd
import torch
from ConfigSpace import Configuration
from ConfigSpace.read_and_write import json as csj
from tqdm import tqdm

from md_discovery.md_discover import md_discover
from settings import *  # provides paths, table ids, model, target_attr, interpre_weight, etc.
from pyjedai.datamodel import Data
from pyjedai.block_cleaning import BlockPurging, BlockFiltering
from pyjedai.matching import EntityMatching
from pyjedai.block_building import (
    StandardBlocking,
    QGramsBlocking,
    ExtendedQGramsBlocking,
    SuffixArraysBlocking,
    ExtendedSuffixArraysBlocking,
)
from pyjedai.comparison_cleaning import (
    WeightedEdgePruning,
    WeightedNodePruning,
    CardinalityEdgePruning,
    CardinalityNodePruning,
    BLAST,
    ReciprocalCardinalityNodePruning,
    ReciprocalWeightedNodePruning,
    ComparisonPropagation,
)
from pyjedai.clustering import *  # also covers UniqueMappingClustering


def prepare_file_for_md_discovery(train: pd.DataFrame,
                                  t_single_tuple_path=er_output_dir + r"\t_single_tuple.csv"):
    # Keep only the tuple pairs that actually match
    df = train[train['gold'] == '1'].copy()
    # Overwrite the right-table id with the left-table id so both sides of a match share a key
    for index, row in df.iterrows():
        df.loc[index, "rtable_" + rtable_id] = row["ltable_" + ltable_id]

    train_columns = train.columns.values.tolist()
    l_columns = []
    r_columns = []
    cols = []
    # Collect the left-table and right-table column names into separate lists
    for column in train_columns:
        if column.startswith('ltable'):
            l_columns.append(column)
        elif column.startswith('rtable'):
            r_columns.append(column)
    # Strip the prefix from the left-table column names to get the unified column list
    # (this assumes corresponding columns in the two tables are named consistently)
    for column in l_columns:
        cols.append(column.replace('ltable_', ''))

    ldf = df[l_columns]
    rdf = df[r_columns]
    ldf.columns = cols
    rdf.columns = cols
    t_single_tuple = pd.concat([ldf, rdf])
    t_single_tuple = t_single_tuple.reset_index(drop=True)

    t_single_tuple.to_csv(t_single_tuple_path, sep=',', index=False, header=True, quoting=1)


# Build a dict keyed by column name whose values are 1-D tensors recording, for every
# row of the prediction table, the similarity between the left and right attribute values.
def build_col_pairs_sim_tensor_dict(predictions: pd.DataFrame):
    predictions_attrs = predictions.columns.values.tolist()
    col_tuple_list = []
    for attr in predictions_attrs:
        if attr.startswith('ltable'):
            left_index = predictions_attrs.index(attr)
            right_index = predictions_attrs.index(attr.replace('ltable_', 'rtable_'))
            col_tuple_list.append((left_index, right_index))

    df = predictions.drop(columns=['gold', 'predicted', 'confidence'], inplace=False)
    length = df.shape[0]
    width = df.shape[1]
    values = df.values
    # Flatten the table column by column, so the embeddings can be split back
    # into one tensor per column below
    sentences = []
    for col in range(width):
        for row in range(length):
            sentences.append(values[row, col])
    if len(sentences) == 0:
        return {}

    embedding = model.encode(sentences, convert_to_tensor=True, device="cuda")
    split_embedding = torch.split(embedding, length, dim=0)
    table_tensor = torch.stack(split_embedding, dim=0)
    # L2-normalized embedding tensor of the prediction table
    norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
    sim_tensor_dict = {}
    for left_index, right_index in col_tuple_list:
        lattr_tensor = norm_table_tensor[left_index]
        rattr_tensor = norm_table_tensor[right_index]
        # For normalized vectors, the dot product is the cosine similarity
        sim_tensor = torch.sum(lattr_tensor * rattr_tensor, 1)
        sim_tensor = torch.round(sim_tensor, decimals=4)
        sim_tensor_dict[predictions_attrs[left_index].replace('ltable_', '')] = sim_tensor
    return sim_tensor_dict


def load_mds(paths: list) -> list:
    if len(paths) == 0:
        return []
    all_mds = []
    # Iterate over the given MD file paths
    for md_path in paths:
        if not os.path.exists(md_path):
            continue
        mds = []
        # Read the MD on each line and add it to this file's MD list
        with open(md_path, 'r') as f_:
            for line in f_.readlines():
                md_metadata = line.strip().split('\t')
                # todo: if the MD file format changes, update this parsing too
                md = eval(md_metadata[1])
                mds.append(md)
        all_mds.extend(mds)
    return all_mds
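# Assumed on-disk MD format (see the todo in load_mds): one MD per line,
# tab-separated, where the second field is a Python-literal tuple whose
# element 0 maps attributes to similarity thresholds and whose element 2 is
# the MD's confidence, matching the md_tuple[0] / md_tuple[2] accesses below.
# Illustrative example only (attribute names and values are placeholders):
#   <id>\t({'title': 0.85, 'price': 0.90}, ..., 0.95)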
def is_explicable(row, all_mds: list, st_dict):
    attrs = all_mds[0][0].keys()  # read all attribute names from the first MD's threshold dict
    for md_tuple in all_mds:
        explicable = True  # assume this MD can explain the current tuple pair
        for a in attrs:
            if a != target_attr:
                if st_dict[a][row[0]].item() < md_tuple[0][a]:
                    explicable = False  # one attribute similarity below its threshold: this MD cannot explain the pair
                    break  # skip this MD's remaining thresholds and move on to the next MD
        if explicable:
            return md_tuple[2]  # any MD that explains the pair is enough; return its confidence
    return -1.0  # no MD could explain the pair


def er_process(config: Configuration):
    print(f'\033[33mConfig: {config}\033[0m')
    start = time()
    ltable = pd.read_csv(ltable_path, sep='|', engine='python', na_filter=False)
    rtable = pd.read_csv(rtable_path, sep='|', engine='python', na_filter=False)
    mapping = pd.read_csv(mapping_path, sep='|', engine='python')

    data = Data(dataset_1=ltable,
                id_column_name_1=ltable_id,
                dataset_2=rtable,
                id_column_name_2=rtable_id,
                ground_truth=mapping)

    # clean data (optional)
    data.clean_dataset(remove_stopwords=False, remove_punctuation=False,
                       remove_numbers=False, remove_unicodes=False)

    # block building
    blocker_name = config["jed_blocker"]
    block_attribute = config["block_attr"]
    match blocker_name:
        case "Standard":
            blocker = StandardBlocking()
        case "QGrams":
            blocker = QGramsBlocking()
        case "ExtendedQG":
            blocker = ExtendedQGramsBlocking()
        case "SuffixArrays":
            blocker = SuffixArraysBlocking()
        case "ExtendedSA":
            blocker = ExtendedSuffixArraysBlocking()
        case _:
            raise ValueError(f"Unknown blocker: {blocker_name}")
    blocks = blocker.build_blocks(data, attributes_1=[block_attribute],
                                  attributes_2=[block_attribute])

    # block purging (optional)
    bp = BlockPurging()
    cleaned_blocks = bp.process(blocks, data, tqdm_disable=False)

    # block cleaning (optional)
    bf = BlockFiltering(ratio=0.8)  # todo: what is ratio for?
    filtered_blocks = bf.process(cleaned_blocks, data, tqdm_disable=False)

    # comparison cleaning - meta blocking (optional)
    pruning_method = config["meta_blocker"]
    weighting_scheme_name = config["weighting_scheme"]
    match pruning_method:
        case "WEP":
            meta_blocker = WeightedEdgePruning(weighting_scheme=weighting_scheme_name)
        case "WNP":
            meta_blocker = WeightedNodePruning(weighting_scheme=weighting_scheme_name)
        case "CEP":
            meta_blocker = CardinalityEdgePruning(weighting_scheme=weighting_scheme_name)
        case "CNP":
            meta_blocker = CardinalityNodePruning(weighting_scheme=weighting_scheme_name)
        case "BLAST":
            meta_blocker = BLAST(weighting_scheme=weighting_scheme_name)
        case "RCNP":
            meta_blocker = ReciprocalCardinalityNodePruning(weighting_scheme=weighting_scheme_name)
        case "RWNP":
            meta_blocker = ReciprocalWeightedNodePruning(weighting_scheme=weighting_scheme_name)
        case "CP":
            meta_blocker = ComparisonPropagation()
        case _:
            raise ValueError(f"Unknown meta blocker: {pruning_method}")
    candidate_pairs_blocks = meta_blocker.process(blocks=filtered_blocks, data=data, tqdm_disable=True)

    # entity matching
    em = EntityMatching(
        metric=config["matching_metric"],
        tokenizer=config["matching_tokenizer"],
        vectorizer=config["matching_vectorizer"],
        qgram=3,
        similarity_threshold=0.0
    )
    pairs_graph = em.predict(candidate_pairs_blocks, data, tqdm_disable=True)
    # draw(pairs_graph)

    # entity clustering
    clusteror_name = config["clusteror_name"]
    match clusteror_name:
        case "CCC":
            clusteror = ConnectedComponentsClustering()
        case "UMC":
            clusteror = UniqueMappingClustering()
        case "EC":
            clusteror = ExactClustering()
        case "CenterC":
            clusteror = CenterClustering()
        case "BMC":
            clusteror = BestMatchClustering()
        case "MCC":
            clusteror = MergeCenterClustering()
        case "CC":
            clusteror = CorrelationClustering()
        case "CTC":
            clusteror = CutClustering()
        case "MCL":
            clusteror = MarkovClustering()
        case "KMAC":
            clusteror = KiralyMSMApproximateClustering()
        case "RSRC":
            clusteror = RicochetSRClustering()
        case _:
            raise ValueError(f"Unknown clusteror: {clusteror_name}")
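    # The clusteror condenses the weighted pairwise-similarity graph produced by
    # EntityMatching into entity clusters. UniqueMappingClustering, for example,
    # greedily keeps the highest-weighted edges while enforcing a one-to-one
    # mapping between the two tables, which fits this clean-clean ER setup.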
    # Obtain the predictions and the evaluation metrics
    clusters = clusteror.process(pairs_graph, data, similarity_threshold=0.17)
    matches_dataframe = clusteror.export_to_df(clusters)
    matches_dataframe_path = er_output_dir + r'\matches_dataframe.csv'
    matches_dataframe.to_csv(matches_dataframe_path, sep=',', index=False, header=True, quoting=1)
    evaluation = clusteror.evaluate(clusters)
    # evaluation.pop('True Positives')
    # evaluation.pop('False Positives')
    # evaluation.pop('True Negatives')
    # evaluation.pop('False Negatives')

    ltable = ltable.astype(str)
    rtable = rtable.astype(str)
    mapping = mapping.astype(str)
    result_df = matches_dataframe.astype(str)

    lcolumns_dict = {}
    rcolumns_dict = {}
    ltable_attrs = ltable.columns.values.tolist()
    rtable_attrs = rtable.columns.values.tolist()
    for attr in ltable_attrs:
        lcolumns_dict[attr] = 'ltable_' + attr
    for attr in rtable_attrs:
        rcolumns_dict[attr] = 'rtable_' + attr

    result_lid_list = result_df['id1'].tolist()
    selected_ltable = ltable[ltable[ltable_id].isin(result_lid_list)]
    selected_ltable = selected_ltable.rename(columns=lcolumns_dict)
    selected_ltable['key'] = 1
    result_rid_list = result_df['id2'].tolist()
    selected_rtable = rtable[rtable[rtable_id].isin(result_rid_list)]
    selected_rtable = selected_rtable.rename(columns=rcolumns_dict)
    selected_rtable['key'] = 1

    # Cross join of the matched left and right rows via the dummy 'key' column
    predictions = pd.merge(selected_ltable, selected_rtable, on='key')
    predictions.drop(columns='key', inplace=True)
    predictions = predictions.reset_index(drop=True)
    predictions['gold'] = '0'
    predictions['predicted'] = '0'

    gold_match_rows = []
    predicted_match_rows = []
    for tuple_ in tqdm(predictions.itertuples()):
        lid = getattr(tuple_, 'ltable_' + ltable_id)
        map_row = mapping[mapping[mapping_lid] == lid]
        result_row = result_df[result_df['id1'] == lid]
        if not map_row.empty:
            for value in map_row[mapping_rid]:
                if value == getattr(tuple_, 'rtable_' + rtable_id):
                    gold_match_rows.append(tuple_[0])
        if not result_row.empty:
            for value in result_row['id2']:
                if value == getattr(tuple_, 'rtable_' + rtable_id):
                    predicted_match_rows.append(tuple_[0])
    for row_index in gold_match_rows:
        predictions.loc[row_index, 'gold'] = '1'
    for row_index in predicted_match_rows:
        predictions.loc[row_index, 'predicted'] = '1'

    # Mine MDs and compute interpretability
    prepare_file_for_md_discovery(predictions)
    predictions['confidence'] = 0
    predicted_match = predictions[predictions['predicted'] == '1']
    predicted_match = predicted_match.reset_index(drop=True)

    sim_tensor_dict = build_col_pairs_sim_tensor_dict(predicted_match)

    md_discover(config, er_output_dir + r"\t_single_tuple.csv", md_output_dir + r"\mds.txt")
    md_paths = [md_output_dir + r'\mds.txt']
    md_list = load_mds(md_paths)  # load all mined MDs
    epl_match = 0  # pairs predicted as matches that an MD can explain
    if sim_tensor_dict and len(md_list) > 0 and len(predicted_match) > 0:
        for row in tqdm(predicted_match.itertuples()):
            x = is_explicable(row, md_list, sim_tensor_dict)
            if x > 0:
                predicted_match.loc[row[0], 'confidence'] = x
                epl_match += 1
        interpretability = epl_match / len(predicted_match)  # interpretability score
    else:
        interpretability = 0
    evaluation['interpretability'] = interpretability
    f1 = evaluation['F1 %'] / 100
    performance = interpre_weight * interpretability + (1 - interpre_weight) * f1
    evaluation['performance'] = performance
    print(f'\033[33mEvaluation: {evaluation}\033[0m')
    predicted_match.to_csv(er_output_dir + r'\predicted_match.csv', sep=',', index=False, header=True)
    ################################################################################################################
    print(f'\033[33mTime consumed by ML-ER in seconds: {time() - start}\033[0m')

    return evaluation
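# Worked example of the scoring rule above (illustrative numbers only): with
# interpre_weight = 0.5, interpretability = 0.8 and F1 % = 70, we get
# performance = 0.5 * 0.8 + (1 - 0.5) * 0.70 = 0.75.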
def ml_er(config: Configuration = None):
    indicators = er_process(config)
    output_path = er_output_dir + r"\eval_result.txt"
    with open(output_path, 'w') as f_:
        for key, value in indicators.items():
            f_.write(str(key) + " : " + str(value))
            f_.write('\n')


if __name__ == '__main__':
    if os.path.isfile(hpo_output_dir + r"\incumbent.json"):
        with open(hpo_output_dir + r"\configspace.json", 'r') as f:
            dict_configspace = json.load(f)
        str_configspace = json.dumps(dict_configspace)
        configspace = csj.read(str_configspace)
        with open(hpo_output_dir + r"\incumbent.json", 'r') as f:
            dic = json.load(f)
        configuration = ConfigSpace.Configuration(configspace, values=dic)
        ml_er(configuration)
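# Hypothetical shape of the incumbent JSON consumed above. The keys must match
# the hyperparameters read in er_process; the values here are illustrative only:
# {
#     "jed_blocker": "Standard",
#     "block_attr": "title",
#     "meta_blocker": "WEP",
#     "weighting_scheme": "CBS",
#     "matching_metric": "cosine",
#     "matching_tokenizer": "white_space_tokenizer",
#     "matching_vectorizer": "tfidf",
#     "clusteror_name": "UMC"
# }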