import json import os import pickle import time import deepmatcher as dm import torch import pandas as pd import ConfigSpace from ConfigSpace import Configuration from ConfigSpace.read_and_write import json as csj import torch.nn.functional from colorama import init, Fore from tqdm import tqdm from setting import * def matching(config): # init(autoreset=True) print(Fore.BLUE + f'Config: {config}') with open(md_output_dir + r"\mds.pickle", "rb") as file: md_list = pickle.load(file) train, valid, test = dm.data.process( path=directory_path, train='train_whole.csv', validation='valid_whole.csv', test='test_whole.csv', use_magellan_convention=True, # 与Magellan命名风格相同 embeddings=config['embeddings']) # train_table = train.get_raw_table() # test_table = test.get_raw_table() # valid_table = valid.get_raw_table() attr_summarizer = config['attr_summarizer'] if attr_summarizer == 'sif': model_ = dm.MatchingModel( attr_comparator=config['attr_comparator'], classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[ 'classifier_nonlinear'], attr_summarizer=dm.attr_summarizers.SIF(word_contextualizer=config['word_contextualizer'], word_comparator=config['word_comparator'], word_aggregator=config['word_aggregator'])) elif attr_summarizer == 'rnn': model_ = dm.MatchingModel( attr_comparator=config['attr_comparator'], classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[ 'classifier_nonlinear'], attr_summarizer=dm.attr_summarizers.RNN(word_contextualizer=config['word_contextualizer'], word_comparator=config['word_comparator'], word_aggregator=config['word_aggregator'])) elif attr_summarizer == 'attention': model_ = dm.MatchingModel( attr_comparator=config['attr_comparator'], classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[ 'classifier_nonlinear'], attr_summarizer=dm.attr_summarizers.Attention(word_contextualizer=config['word_contextualizer'], word_comparator=config['word_comparator'], word_aggregator=config['word_aggregator'])) else: # 'hybrid' model_ = dm.MatchingModel( attr_comparator=config['attr_comparator'], classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[ 'classifier_nonlinear'], attr_summarizer=dm.attr_summarizers.Hybrid(word_contextualizer=config['word_contextualizer'], word_comparator=config['word_comparator'], word_aggregator=config['word_aggregator'])) model_.run_train( train, valid, device='cuda', epochs=10, batch_size=16, best_save_path=attr_summarizer + '_model.pth', pos_neg_ratio=3) indicators = {} f1_score = model_.run_eval(test, device='cuda') indicators["F1"] = f1_score.item() / 100 predictions = model_.run_prediction(test, device='cuda', output_attributes=True) # predictions中没有predicted列, 根据match_score手动新增 deepmatcher在计算F1时的阈值为0.5 predictions['predicted'] = predictions['match_score'].apply(lambda score: 1 if score >= 0.5 else 0) predictions = predictions.reset_index(drop=True) predictions = predictions.astype(str) # 目前predictions包含的属性:左右表全部属性+label+predicted+match_score+_id sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions) predictions['confidence'] = 0 predictions['md'] = '' epl_match = 0 # 可解释,预测match if len(md_list) > 0: for row in tqdm(predictions.itertuples()): conf, md_dict = is_explicable(row, md_list, sim_tensor_dict) if conf > 0 and str(getattr(row, 'predicted')) == str(1): predictions.loc[row[0], 'confidence'] = conf predictions.loc[row[0], 'md'] = str(md_dict) epl_match += 1 df = predictions[predictions['predicted'] == str(1)] interpretability = epl_match / len(df) # 可解释性 indicators['interpretability'] = interpretability performance = interpre_weight * interpretability + (1 - interpre_weight) * indicators["F1"] indicators['performance'] = performance print(Fore.BLUE + f'ER Indicators: {indicators}') predictions.to_csv(er_output_dir + r'\predictions.csv', sep=',', index=False, header=True) print(Fore.CYAN + f'Finish Time: {time.time()}') return indicators def build_col_pairs_sim_tensor_dict(predictions: pd.DataFrame): predictions_attrs = predictions.columns.values.tolist() col_tuple_list = [] for _ in predictions_attrs: if _.startswith('ltable'): left_index = predictions_attrs.index(_) right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_')) col_tuple_list.append((left_index, right_index)) length = predictions.shape[0] # width = predictions.shape[1] predictions = predictions.reset_index(drop=True) sentences = predictions.values.flatten(order='F').tolist() embedding = model.encode(sentences, convert_to_tensor=True, device="cuda", batch_size=256, show_progress_bar=True) split_embedding = torch.split(embedding, length, dim=0) table_tensor = torch.stack(split_embedding, dim=0, out=None) # prediction的归一化嵌入张量 norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2) sim_tensor_dict = {} for col_tuple in col_tuple_list: lattr_tensor = norm_table_tensor[col_tuple[0]] rattr_tensor = norm_table_tensor[col_tuple[1]] mul_tensor = lattr_tensor * rattr_tensor sim_tensor = torch.sum(mul_tensor, 1) sim_tensor = torch.round(sim_tensor * 100) / 100 sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor return sim_tensor_dict def is_explicable(row, all_mds: list, st_dict): attrs = all_mds[0][0].keys() # 从第一条md_tuple中的md字典中读取所有字段 for md_tuple in all_mds: explicable = True # 假设这条md能解释当前元组 for a in attrs: if st_dict[a][row[0]].item() < md_tuple[0][a]: explicable = False # 任意一个字段的相似度达不到阈值,这条md就不能解释当前元组 break # 不再与当前md的其他相似度阈值比较,跳转到下一条md if explicable: return md_tuple[2], md_tuple[0] # 任意一条md能解释,直接返回 return -1.0, {} # 遍历结束,不能解释 def ml_er(config: Configuration): indicators = matching(config) output_path = er_output_dir + r"\eval_result.txt" with open(output_path, 'w') as _f: _f.write('F1:' + str(indicators["F1"]) + '\n') _f.write('interpretability:' + str(indicators['interpretability']) + '\n') _f.write('performance:' + str(indicators['performance']) + '\n') if __name__ == '__main__': if os.path.isfile(hpo_output_dir + r"\incumbent.json"): with open(hpo_output_dir + r"\configspace.json", 'r') as f: dict_configspace = json.load(f) str_configspace = json.dumps(dict_configspace) configspace = csj.read(str_configspace) with open(hpo_output_dir + r"\incumbent.json", 'r') as f: dic = json.load(f) configuration = ConfigSpace.Configuration(configspace, values=dic) ml_er(configuration)