diff --git a/entrance.py b/entrance.py
index 403c4ac..49414d2 100644
--- a/entrance.py
+++ b/entrance.py
@@ -1,5 +1,5 @@
 # this is the entrance of the auto-ER procedure
-from md_discovery.script.md_discover import md_discover
+from md_discovery.md_discover import md_discover
 from ml_er.ml_entity_resolver import ml_er
 from hpo.er_model_hpo import ml_er_hpo
 from settings import *
@@ -7,7 +7,6 @@ from settings import *
 
 def run(rounds: int):
     hp_config = None
-    # while the termination condition is not met
     iter_round = 1
     for i in range(0, rounds):
         ml_er(iter_round, hp_config)
@@ -19,9 +18,7 @@ def run(rounds: int):
 
 
 if __name__ == '__main__':
-    path = 'md_discovery/output'
     # todo
-    # should the distance metric be user-configurable?
     # use drop to remove columns from the feature vectors? (e.g. drop id-related features)
     run(1)  # iterate for 3 rounds
     # ml_er(1)
diff --git a/hpo/er_model_hpo.py b/hpo/er_model_hpo.py
index a1437e5..e44b9d8 100644
--- a/hpo/er_model_hpo.py
+++ b/hpo/er_model_hpo.py
@@ -174,8 +174,8 @@ class Classifier:
 
         # default path is "../md_discovery/output/xxx.txt"
         # true-positive / false-negative mds/vio: 4 md files in total
-        md_paths = ['md_discovery/output/tp_mds.txt', 'md_discovery/output/tp_vio.txt',
-                    'md_discovery/output/fn_mds.txt', 'md_discovery/output/fn_vio.txt']
+        md_paths = [md_output_dir + 'tp_mds.txt', md_output_dir + 'tp_vio.txt',
+                    md_output_dir + 'fn_mds.txt', md_output_dir + 'fn_vio.txt']
         epl_match = 0  # explicable, predicted as match
         nepl_mismatch = 0  # inexplicable, predicted as mismatch
         md_list = load_mds(md_paths)  # read in all the mds
@@ -188,11 +188,11 @@ class Classifier:
             if getattr(line, 'predicted') == 0:
                 nepl_mismatch += 1
         interpretability = (epl_match + nepl_mismatch) / len(predictions)  # interpretability
-        # if indicators["my_recall"] >= 0.8:
+        # if indicators["block_recall"] >= 0.8:
         #     f1 = indicators["F1"]
         # else:
-        #     f1 = (2.0 * indicators["precision"] * indicators["my_recall"]) / (indicators["precision"] + indicators["my_recall"])
-        if indicators["my_recall"] < 0.8:
+        #     f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (indicators["precision"] + indicators["block_recall"])
+        if indicators["block_recall"] < 0.8:
             return 1
         f1 = indicators["F1"]
         performance = interpre_weight * interpretability + (1 - interpre_weight) * f1
@@ -232,3 +232,7 @@ def ml_er_hpo():
     print(f"Optimized_Configuration:{incumbent.values()}")
 
     return incumbent
+
+
+if __name__ == '__main__':
+    print(1)
diff --git a/md_discovery/functions/__init__.py b/md_discovery/functions/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/md_discovery/functions/multi_process_infer_by_pairs.py b/md_discovery/functions/multi_process_infer_by_pairs.py
deleted file mode 100644
index 4100214..0000000
--- a/md_discovery/functions/multi_process_infer_by_pairs.py
+++ /dev/null
@@ -1,187 +0,0 @@
-import multiprocessing
-import pandas as pd
-import Levenshtein
-import copy
-
-
-conf_thresh = 0.8
-
-
-def my_Levenshtein_ratio(str1, str2):
-    if max(len(str1), len(str2)) == 0:
-        return 1
-    return 1 - Levenshtein.distance(str1, str2) / max(len(str1), len(str2))
-
-
-def if_minimal(md, md_list, target_col):
-    # assume this md is minimal
-    minimal = True
-    if md_list.count(md) > 1:
-        return False
-    for _ in md_list:
-        if _ != md:
-            # assume every md in the list makes the current md non-minimal
-            exist = True
-            # if any LHS threshold is greater, the assumption fails
-            for col in list(set(_.keys()) - {target_col}):
-                if _[col] > md[col]:
-                    exist = False
-            # if the RHS threshold is smaller, the assumption also fails
-            if _[target_col] < md[target_col]:
-                exist = False
-            # if the assumption holds for any md, the current md is not minimal
-            if exist:
-                minimal = False
-                break
-    return minimal
-
-
-def remove_by_confidence(md, l, relation, target_col, lock):
-    support, confidence = get_one_md_metadata(md, relation, target_col)
-    if confidence < 0.8:
-        with lock:
-            l.remove(md)
-
-
-# def remove_by_confidence(md, l, relation, target_col):
-#     boolean, conf = satisfy_confidence(md, relation, 0.8, target_col)
-#     if not boolean:
-#         l.remove(md)
-#         print(md, '\t', conf)
-
-
-def inference_from_record_pairs(path, threshold, target_col):
-    data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1')
-    data.fillna("", inplace=True)
-    data = data.astype(str)
-    columns = data.columns.values.tolist()
-
-    md_list = []
-    minimal_vio = []
-    init_md = {}
-    for col in columns:
-        init_md[col] = 1 if col == target_col else 0
-    md_list.append(init_md)
-
-    for row1 in data.itertuples():
-        # take the index of the current row and slice from the next row on
-        i = row1[0]
-        data1 = data[i + 1:]
-        for row2 in data1.itertuples():
-            violated_mds = []
-            # sims holds the similarities between the two rows
-            sims = {}
-            for col in columns:
-                similarity = my_Levenshtein_ratio(getattr(row1, col), getattr(row2, col))
-                sims[col] = similarity
-
-            # find violated mds, remove them from the md list and add them to the vio list
-            # tmp_md_list = copy.deepcopy(md_list)
-            for md in md_list[:]:
-                lhs_satis = True
-                rhs_satis = True
-                for col in list(set(columns) - {target_col}):
-                    if sims[col] < md[col]:
-                        lhs_satis = False
-                if sims[target_col] < md[target_col]:
-                    rhs_satis = False
-                if lhs_satis == True and rhs_satis == False:
-                    md_list.remove(md)
-                    violated_mds.append(md)
-            minimal_vio.extend(violated_mds)
-
-            for vio_md in violated_mds:
-                # specialize the RHS; we require the RHS to be 100% similar, so there is no need to lower the RHS threshold
-                # if sims[target_col] >= threshold:
-                #     new_rhs = sims[target_col]
-                #     spec_r_md = copy.deepcopy(vio_md)
-                #     spec_r_md[target_col] = new_rhs
-                #     if if_minimal(spec_r_md, md_list, target_col):
-                #         md_list.append(spec_r_md)
-
-                # specialize the LHS
-                for col in list(set(columns) - {target_col}):
-                    if sims[col] + 0.001 <= 1:
-                        spec_l_md = copy.deepcopy(vio_md)
-                        spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.001
-                        if if_minimal(spec_l_md, md_list, target_col):
-                            md_list.append(spec_l_md)
-
-            # tmp_minimal_vio = copy.deepcopy(minimal_vio)
-            for vio in minimal_vio[:]:
-                if not if_minimal(vio, md_list, target_col):
-                    minimal_vio.remove(vio)
-
-    manager = multiprocessing.Manager()
-    lock = manager.Lock()
-    if len(minimal_vio) == 0:
-        return md_list, []
-    pool_size = len(minimal_vio) if len(minimal_vio) < 61 else 60
-    pool = multiprocessing.Pool(pool_size)
-    # tmp = copy.deepcopy(minimal_vio)
-    with manager:
-        proxy_minimal_vio = manager.list(minimal_vio)
-        for _ in minimal_vio[:]:
-            pool.apply_async(remove_by_confidence, args=(_, proxy_minimal_vio, data, target_col, lock))
-        pool.close()
-        pool.join()
-        minimal_vio = list(proxy_minimal_vio)
-
-    for _ in minimal_vio[:]:
-        if not if_minimal(_, minimal_vio, target_col):
-            minimal_vio.remove(_)
-
-    for _ in md_list[:]:
-        if not if_minimal(_, md_list, target_col):
-            md_list.remove(_)
-
-    return md_list, minimal_vio
-
-
-def get_mds_metadata(md_list, dataset_path, target_col):
-    data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1')
-    data.fillna("", inplace=True)
-    data = data.astype(str)
-
-    manager = multiprocessing.Manager()
-    if len(md_list) == 0:
-        return []
-    pool_size = len(md_list) if len(md_list) < 61 else 60
-    pool = multiprocessing.Pool(pool_size)
-    result = []
-    with manager:
-        for _ in md_list:
-            task = pool.apply_async(get_one_md_metadata, args=(_, data, target_col))
-            support, confidence = task.get()
-            result.append({"md": _, "support": support, "confidence": confidence})
-    pool.close()
-    pool.join()
-    return result
-
-
-def get_one_md_metadata(md, dataframe, target_col):
-    support = 0
-    pre_confidence = 0
-    for row1 in dataframe.itertuples():
-        i = row1[0]
-        df_slice = dataframe[i + 1:]
-        for row2 in df_slice.itertuples():
-            left_satisfy = True
-            both_satisfy = True
-            for col in dataframe.columns.values.tolist():
-                sim = my_Levenshtein_ratio(getattr(row1, col), getattr(row2, col))
-                if col == target_col:
-                    if sim < 1:
-                        both_satisfy = False
-                else:
-                    if sim < md[col]:
-                        left_satisfy = False
-                        both_satisfy = False
-            if left_satisfy:
-                support += 1
-            if both_satisfy:
-                pre_confidence += 1
-
-    confidence = 0 if support == 0 else pre_confidence / support
-    # return {"md": md, "support": support, "confidence": confidence}
-    return support, confidence
diff --git a/md_discovery/script/md_discover.py b/md_discovery/md_discover.py
similarity index 81%
rename from md_discovery/script/md_discover.py
rename to md_discovery/md_discover.py
index c3b9908..043bcdd 100644
--- a/md_discovery/script/md_discover.py
+++ b/md_discovery/md_discover.py
@@ -1,6 +1,5 @@
-import time
-from md_discovery.functions.multi_process_infer_by_pairs import inference_from_record_pairs
-from md_discovery.functions.multi_process_infer_by_pairs import get_mds_metadata
+from md_discovery.multi_process_infer_by_pairs import inference_from_record_pairs
+from md_discovery.multi_process_infer_by_pairs import get_mds_metadata
 from settings import *
 
 # # if support and confidence are not to be output, use the following two blocks
@@ -19,8 +18,8 @@ from settings import *
 
 def md_discover():
     # for now you can follow this main function as a template
-    tp_single_tuple_path = "ml_er/output/tp_single_tuple.csv"
-    fn_single_tuple_path = "ml_er/output/fn_single_tuple.csv"
+    tp_single_tuple_path = er_output_dir + "tp_single_tuple.csv"
+    fn_single_tuple_path = er_output_dir + "fn_single_tuple.csv"
     # input: csv file path, similarity threshold for the md LHS, target field for the md RHS
    # output: two md lists; mds in list 1 have no violation, mds in list 2 have violations but their confidence meets the threshold (0.8)
     # e.g. here the parameters require each LHS similarity to be at least 0.7, with the RHS pointing to the 'id' field
@@ -36,8 +35,8 @@ def md_discover():
 
     # if support and confidence are to be output, use the following two blocks
     # write list 1 to disk; adjust the path as needed
-    tp_mds_path = "md_discovery/output/tp_mds.txt"
-    tp_vio_path = "md_discovery/output/tp_vio.txt"
+    tp_mds_path = md_output_dir + "tp_mds.txt"
+    tp_vio_path = md_output_dir + "tp_vio.txt"
 
     with open(tp_mds_path, 'w') as f:
         for _ in tp_mds_meta:
@@ -51,8 +50,8 @@ def md_discover():
             f.write(i + ':' + str(_[i]) + '\t')
             f.write('\n')
 
-    fn_mds_path = "md_discovery/output/fn_mds.txt"
-    fn_vio_path = "md_discovery/output/fn_vio.txt"
+    fn_mds_path = md_output_dir + "fn_mds.txt"
+    fn_vio_path = md_output_dir + "fn_vio.txt"
 
     with open(fn_mds_path, 'w') as f:
         for _ in fn_mds_meta:
@@ -65,3 +64,7 @@ def md_discover():
             for i in _.keys():
                 f.write(i + ':' + str(_[i]) + '\t')
             f.write('\n')
+
+
+if __name__ == '__main__':
+    md_discover()
diff --git a/md_discovery/multi_process_infer_by_pairs.py b/md_discovery/multi_process_infer_by_pairs.py
new file mode 100644
index 0000000..b40970f
--- /dev/null
+++ b/md_discovery/multi_process_infer_by_pairs.py
@@ -0,0 +1,273 @@
+import multiprocessing
+import pandas as pd
+import Levenshtein
+import copy
+import numpy as np
+import time
+import torch
+from tqdm import tqdm
+from transformers import AutoTokenizer, AutoModel
+from settings import model, embedding_dict
+from sentence_transformers.util import cos_sim
+
+conf_thresh = 0.8
+
+
+def my_Levenshtein_ratio(str1, str2):
+    if max(len(str1), len(str2)) == 0:
+        return 1
+    return 1 - Levenshtein.distance(str1, str2) / max(len(str1), len(str2))
+
+
+def norm_cos_sim(embed1, embed2):
+    sim = cos_sim(embed1, embed2)
+    return sim.tolist()[0][0]/2 + 0.5
+
+
+def table_encode(tp_path, fn_path):
+    embedding_dic = {}
+
+    tp_data = pd.read_csv(tp_path, low_memory=False, encoding='ISO-8859-1')
+    tp_data.fillna("", inplace=True)
+    tp_data = tp_data.astype(str)
+    tp_length = tp_data.shape[0]
+    tp_width = tp_data.shape[1]
+    tp_sentences = []
+
+    for row in range(0, tp_length):
+        for col in range(0, tp_width):
+            cell_value = tp_data.values[row, col]
+            tp_sentences.append(cell_value)
+    tp_embedding = model.encode(tp_sentences, convert_to_tensor=True)
+
+    for row in range(0, tp_length):
+        for col in range(0, tp_width):
+            cell_value = tp_data.values[row, col]
+            embedding_dic[cell_value] = tp_embedding.tolist()[row * tp_width + col]
+
+
+    fn_data = pd.read_csv(fn_path, low_memory=False, encoding='ISO-8859-1')
+    fn_data.fillna("", inplace=True)
+    fn_data = fn_data.astype(str)
+    fn_length = fn_data.shape[0]
+    fn_width = fn_data.shape[1]
+    fn_sentences = []
+
+    for row in range(0, fn_length):
+        for col in range(0, fn_width):
+            cell_value = fn_data.values[row, col]
+            fn_sentences.append(cell_value)
+    fn_embedding = model.encode(fn_sentences, convert_to_tensor=True)
+
+    for row in range(0, fn_length):
+        for col in range(0, fn_width):
+            cell_value = fn_data.values[row, col]
+            embedding_dic[cell_value] = fn_embedding.tolist()[row * fn_width + col]
+
+    np.save('embedding_dic.npy', embedding_dic)
+
+
+def test_table_encode():
+    start = time.time()
+    table_encode('../ml_er/output/tp_single_tuple.csv', '../ml_er/output/fn_single_tuple.csv')
+    print(time.time()-start)
+
+
+def test_load():
+    load_dict = np.load('embedding_dic.npy', allow_pickle=True).item()
+    a = load_dict['model- bdcd00105wi venor- bitdefender features- bitdefender antivirus v10- small box antivirus v10 delivers a one-two security punch integrating todays most powerful antivirus and antispyware modules into one convenient package. its easy to use and updates itself automatically making it truly an install and forget solution. * antivirus the purpose of the antivirus module is to ensure detection and removal of all viruses in the wild. bitdefender antivirus uses robust scan engines certified by icsa labs virus bulletin checkmark checkvir and tuv. - improved proactive detection b-have (behavioral heuristic analyzer in virtual environments) emulates a virtual computer-inside-a-computer where pieces of software are run in order to check for potential malware behavior. this bitdefender proprietary technology represents a new security layer that keeps the operating system safe from unknown viruses by detecting malicious pieces of code for which signatures have not yet been released. - permanent antivirus protection the new and improved bitdefender scanning engines will scan and disinfect infected files on access minimizing data loss. infected documents can now be recovered instead of being deleted. - new rootkit detection and removal a new bitdefender module looks for rootkits (malicious programs designed to control victim computers while staying hidden) and removes them on detection. - new web scanning web traffic is now filtered in real-time even before reaching your browser providing a safe and enjoyable web experience. - peer-2-peer and im applications protection filters against viruses that spread']
+    print(a)
+    print(1)
+# def test_lm_similarity():
+#     print(time.time())
+#     sentences = ['fun with reading & writing! is designed to help kids learn to read and write better through exercises puzzle-solving creative writing decoding and more!',
+#                  'based on the tween lifestyle brand launched in 2004 this action/adventure game will contain loads of adventures tailored specifically to the player\'s personality type. the evergirl brand features a clothing and accessories line with a companion web ...']
+#     embeddings = model.encode(sentences, convert_to_tensor=True)
+#     print(time.time())
+#     sim = cos_sim(embeddings[0], embeddings[1])
+#     print(time.time())
+#     # print(sim.tolist()[0][0]/2 + 0.5)
+
+
+def if_minimal(md, md_list, target_col):
+    # assume this md is minimal
+    minimal = True
+    if len(md_list) == 0:
+        return True
+    if md_list.count(md) > 1:
+        return False
+    for _ in md_list:
+        if _ != md:
+            # assume every md in the list makes the current md non-minimal
+            exist = True
+            # if any LHS threshold is greater, the assumption fails
+            for col in list(set(_.keys()) - {target_col}):
+                if _[col] > md[col]:
+                    exist = False
+            # if the RHS threshold is smaller, the assumption also fails
+            if _[target_col] < md[target_col]:
+                exist = False
+            # if the assumption holds for any md, the current md is not minimal
+            if exist:
+                minimal = False
+                break
+    return minimal
+
+
+def remove_by_confidence(md, l, relation, target_col, lock):
+    support, confidence = get_one_md_metadata(md, relation, target_col)
+    if confidence < 0.8:
+        with lock:
+            l.remove(md)
+
+
+# def remove_by_confidence(md, l, relation, target_col):
+#     boolean, conf = satisfy_confidence(md, relation, 0.8, target_col)
+#     if not boolean:
+#         l.remove(md)
+#         print(md, '\t', conf)
+
+
+def inference_from_record_pairs(path, threshold, target_col):
+    data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1')
+    data.fillna("", inplace=True)
+    data = data.astype(str)
+    columns = data.columns.values.tolist()
+
+    md_list = []
+    minimal_vio = []
+    init_md = {}
+    for col in columns:
+        init_md[col] = 1 if col == target_col else 0
+    md_list.append(init_md)
+
+    for row1 in data.itertuples():
+        # take the index of the current row and slice from the next row on
+        i = row1[0]
+        data1 = data[i + 1:]
+        for row2 in data1.itertuples():
+            violated_mds = []
+            # sims holds the similarities between the two rows
+            sims = {}
+            for col in columns:
+                similarity = norm_cos_sim(embedding_dict[getattr(row1, col)], embedding_dict[getattr(row2, col)])
+                sims[col] = similarity
+
+            # find violated mds, remove them from the md list and add them to the vio list
+            # tmp_md_list = copy.deepcopy(md_list)
+            for md in md_list[:]:
+                lhs_satis = True
+                rhs_satis = True
+                for col in list(set(columns) - {target_col}):
+                    if sims[col] + 0.0000001 < md[col]:
+                        lhs_satis = False
+                        break
+                if sims[target_col] + 0.0000001 < md[target_col]:
+                    rhs_satis = False
+                if lhs_satis == True and rhs_satis == False:
+                    md_list.remove(md)
+                    violated_mds.append(md)
+            minimal_vio.extend(violated_mds)
+
+            for vio_md in violated_mds:
+                # specialize the RHS; we require the RHS to be 100% similar, so there is no need to lower the RHS threshold
+                # if sims[target_col] >= threshold:
+                #     new_rhs = sims[target_col]
+                #     spec_r_md = copy.deepcopy(vio_md)
+                #     spec_r_md[target_col] = new_rhs
+                #     if if_minimal(spec_r_md, md_list, target_col):
+                #         md_list.append(spec_r_md)
+
+                # specialize the LHS
+                for col in list(set(columns) - {target_col}):
+                    if sims[col] + 0.01 <= 1:
+                        spec_l_md = copy.deepcopy(vio_md)
+                        spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.01
+                        if if_minimal(spec_l_md, md_list, target_col):
+                            md_list.append(spec_l_md)
+
+            for vio in minimal_vio[:]:
+                if not if_minimal(vio, md_list, target_col):
+                    minimal_vio.remove(vio)
+
+    # fuck = len(minimal_vio)
+    # tmp = []
+    # for _ in minimal_vio:
+    #     if _ not in tmp:
+    #         tmp.append(_)
+    # minimal_vio = tmp
+
+    # manager = multiprocessing.Manager()
+    # lock = manager.Lock()
+    # if len(minimal_vio) == 0:
+    #     return md_list, []
+    # pool_size = len(minimal_vio) if len(minimal_vio) < 16 else 16
+    # pool = multiprocessing.Pool(pool_size)
+    # # tmp = copy.deepcopy(minimal_vio)
+    # with manager:
+    #     proxy_minimal_vio = manager.list(minimal_vio)
+    #     for _ in minimal_vio[:]:
+    #         pool.apply_async(remove_by_confidence, args=(_, proxy_minimal_vio, data, target_col, lock))
+    #     pool.close()
+    #     pool.join()
+    #     minimal_vio = list(proxy_minimal_vio)
+    #
+    # for _ in minimal_vio[:]:
+    #     if not if_minimal(_, minimal_vio, target_col):
+    #         minimal_vio.remove(_)
+    #
+    # for _ in md_list[:]:
+    #     if not if_minimal(_, md_list, target_col):
+    #         md_list.remove(_)
+
+    return md_list, minimal_vio
+
+
+def get_mds_metadata(md_list, dataset_path, target_col):
+    data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1')
+    data.fillna("", inplace=True)
+    data = data.astype(str)
+
+    manager = multiprocessing.Manager()
+    if len(md_list) == 0:
+        return []
+    pool_size = len(md_list) if len(md_list) < 16 else 16
+    pool = multiprocessing.Pool(pool_size)
+    result = []
+    with manager:
+        for _ in md_list:
+            task = pool.apply_async(get_one_md_metadata, args=(_, data, target_col))
+            support, confidence = task.get()
+            result.append({"md": _, "support": support, "confidence": confidence})
+    pool.close()
+    pool.join()
+    return result
+
+
+def get_one_md_metadata(md, dataframe, target_col):
+    support = 0
+    pre_confidence = 0
+    columns = dataframe.columns.values.tolist()
+    for row1 in dataframe.itertuples():
+        i = row1[0]
+        df_slice = dataframe[i + 1:]
+        for row2 in df_slice.itertuples():
+            left_satisfy = True
+            both_satisfy = True
+            for col in columns:
+                sim = norm_cos_sim(embedding_dict[getattr(row1, col)], embedding_dict[getattr(row2, col)])
+                if col == target_col:
+                    if sim + 0.0000001 < 1:
+                        both_satisfy = False
+                else:
+                    if sim + 0.0000001 < md[col]:
+                        left_satisfy = False
+                        both_satisfy = False
+            if left_satisfy:
+                support += 1
+            if both_satisfy:
+                pre_confidence += 1
+
+    confidence = 0 if support == 0 else pre_confidence / support
+    # return {"md": md, "support": support, "confidence": confidence}
+    return support, confidence
diff --git a/md_discovery/script/__init__.py b/md_discovery/script/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/ml_er/Goods Dataset-8.14.py b/ml_er/Goods Dataset-8.14.py
deleted file mode 100644
index be42eb3..0000000
--- a/ml_er/Goods Dataset-8.14.py
+++ /dev/null
@@ -1,153 +0,0 @@
-import sys
-
-from py_entitymatching.debugmatcher.debug_gui_utils import _get_metric
-
-sys.path.append('/home/w/PycharmProjects/py_entitymatching/py_entitymatching')
-
-import py_entitymatching as em
-import py_entitymatching.catalog.catalog_manager as cm
-import pandas as pd
-import time
-import six
-
-
-def load_data(left_path: str, right_path: str, mapping_path: str):
-    left = pd.read_csv(left_path, encoding='ISO-8859-1')
-    cm.set_key(left, left.columns.values.tolist()[0])
-    left.fillna("", inplace=True)
-    left = left.astype(str)
-
-    right = pd.read_csv(right_path, encoding='ISO-8859-1')
-    cm.set_key(right, right.columns.values.tolist()[0])
-    right.fillna("", inplace=True)
-    right = right.astype(str)
-
-    mapping = pd.read_csv(mapping_path)
-    mapping = mapping.astype(str)
-    return left, right, mapping
-
-
-if __name__ == '__main__':
-    # read the public datasets, register keys and fill missing values
-    path_Amazon = '/home/w/PycharmProjects/py_entitymatching/py_entitymatching/datasets/end-to-end/Amazon-GoogleProducts/Amazon.csv'
-    path_Google = '/home/w/PycharmProjects/py_entitymatching/py_entitymatching/datasets/end-to-end/Amazon-GoogleProducts/GoogleProducts.csv'
-    path_Mappings = '/home/w/PycharmProjects/py_entitymatching/py_entitymatching/datasets/end-to-end/Amazon-GoogleProducts/Amzon_GoogleProducts_perfectMapping.csv'
-    Amazon = pd.read_csv(path_Amazon, encoding='ISO-8859-1')
-    cm.set_key(Amazon, 'id')
-    Amazon.fillna("", inplace=True)
-    Google = pd.read_csv(path_Google, encoding='ISO-8859-1')
-    cm.set_key(Google, 'id')
-    Google.fillna("", inplace=True)
-    Mappings = pd.read_csv(path_Mappings)
-
-    # keep only the rows of both tables that appear in the mapping table, to increase the positive-sample ratio
-    l_id_list = []
-    r_id_list = []
-    # convert everything to strings
-    Amazon = Amazon.astype(str)
-    Google = Google.astype(str)
-    Mappings = Mappings.astype(str)
-    for index, row in Mappings.iterrows():
-        l_id_list.append(row["idAmazon"])
-        r_id_list.append(row["idGoogleBase"])
-    selected_Amazon = Amazon[Amazon['id'].isin(l_id_list)]
-    selected_Amazon = selected_Amazon.rename(columns={'title': 'name'})
-    selected_Google = Google[Google['id'].isin(r_id_list)]
-    cm.set_key(selected_Amazon, 'id')
-    cm.set_key(selected_Google, 'id')
-
-    #########################################################################
-    # False-retain True-remove
-    def match_last_name(ltuple, rtuple):
-        l_last_name = ltuple['name']
-        r_last_name = rtuple['name']
-        if l_last_name != r_last_name:
-            return True
-        else:
-            return False
-
-    bb = em.BlackBoxBlocker()
-    bb.set_black_box_function(match_last_name)
-
-    Candidate = bb.block_tables(selected_Amazon, selected_Google, l_output_attrs=['id', 'name', 'description', 'manufacturer', 'price'], r_output_attrs=['id', 'name', 'description', 'manufacturer', 'price'])
-    #########################################################################
-    # block and initialize gold to 0
-    blocker = em.OverlapBlocker()
-    candidate = blocker.block_tables(selected_Amazon, selected_Google, 'name', 'name',
-                                     l_output_attrs=['id', 'name', 'description', 'manufacturer', 'price'],
-                                     r_output_attrs=['id', 'name', 'description', 'manufacturer', 'price'],
-                                     overlap_size=0, show_progress=False)
-    candidate['gold'] = 0
-
-    start = time.time()
-    candidate_match_rows = []
-    for index, row in candidate.iterrows():
-        l_id = row["ltable_id"]
-        map_row = Mappings[Mappings['idAmazon'] == l_id]
-
-        if map_row is not None:
-            r_id = map_row["idGoogleBase"]
-            for value in r_id:
-                if value == row["rtable_id"]:
-                    candidate_match_rows.append(row["_id"])
-                else:
-                    continue
-    for row in candidate_match_rows:
-        candidate.loc[row, 'gold'] = 1
-
-    # downsample negatives so positive and negative samples are balanced
-    candidate_mismatch = candidate[candidate['gold'] == 0]
-    candidate_match = candidate[candidate['gold'] == 1]
-    candidate_mismatch = candidate_mismatch.sample(n=len(candidate_match))
-    # concatenate positive and negative samples
-    candidate_for_train_test = pd.concat([candidate_mismatch, candidate_match])
-    cm.set_key(candidate_for_train_test, '_id')
-    cm.set_fk_ltable(candidate_for_train_test, 'ltable_id')
-    cm.set_fk_rtable(candidate_for_train_test, 'rtable_id')
-    cm.set_ltable(candidate_for_train_test, selected_Amazon)
-    cm.set_rtable(candidate_for_train_test, selected_Google)
-
-    # split into training and test sets
-    sets = em.split_train_test(candidate_for_train_test, train_proportion=0.7, random_state=0)
-    train_set = sets['train']
-    test_set = sets['test']
-
-    dt = em.DTMatcher(name='DecisionTree', random_state=0)
-    svm = em.SVMMatcher(name='SVM', random_state=0)
-    rf = em.RFMatcher(name='RF', random_state=0)
-    lg = em.LogRegMatcher(name='LogReg', random_state=0)
-    ln = em.LinRegMatcher(name='LinReg')
-    nb = em.NBMatcher(name='NaiveBayes')
-    feature_table = em.get_features_for_matching(selected_Amazon, selected_Google, validate_inferred_attr_types=False)
-
-    train_feature_vecs = em.extract_feature_vecs(train_set,
-                                                 feature_table=feature_table,
-                                                 attrs_after='gold',
-                                                 show_progress=False)
-
-    result = em.select_matcher([dt, rf, svm, ln, lg, nb], table=train_feature_vecs,
-                               exclude_attrs=['_id', 'ltable_id', 'rtable_id', 'gold'],
-                               k=5,
-                               target_attr='gold', metric_to_select_matcher='f1', random_state=0)
-
-    test_feature_vecs = em.extract_feature_vecs(test_set, feature_table=feature_table,
-                                                attrs_after=['ltable_name', 'ltable_description', 'ltable_manufacturer',
-                                                             'ltable_price', 'rtable_name', 'rtable_description',
-                                                             'rtable_manufacturer', 'rtable_price', 'gold'], show_progress=False)
-
-    rf.fit(table=train_feature_vecs,
-           exclude_attrs=['_id', 'ltable_id', 'rtable_id', 'gold'],
-           target_attr='gold')
-    predictions = rf.predict(table=test_feature_vecs, exclude_attrs=['_id', 'ltable_id', 'rtable_id', 'ltable_name',
-                                                                     'ltable_description', 'ltable_manufacturer',
-                                                                     'ltable_price', 'rtable_name', 'rtable_description',
-                                                                     'rtable_manufacturer', 'rtable_price', 'gold'],
-                             append=True, target_attr='predicted', inplace=False)
-    eval_result = em.eval_matches(predictions, 'gold', 'predicted')
-    em.print_eval_summary(eval_result)
-
-    output_path = "output/eval_result" + str(time.time()) + ".txt"
-    with open(output_path, 'w') as f:
-        for key, value in six.iteritems(_get_metric(eval_result)):
-            f.write(key + " : " + value)
-            f.write('\n')
diff --git a/ml_er/ml_entity_resolver.py b/ml_er/ml_entity_resolver.py
index 291c175..1ac0e5e 100644
--- a/ml_er/ml_entity_resolver.py
+++ b/ml_er/ml_entity_resolver.py
@@ -3,18 +3,18 @@ import sys
 from py_entitymatching.debugmatcher.debug_gui_utils import _get_metric
 
-sys.path.append('/home/w/PycharmProjects/py_entitymatching/py_entitymatching')
-
 import py_entitymatching as em
 import py_entitymatching.catalog.catalog_manager as cm
 import pandas as pd
 import six
 from ConfigSpace import Configuration
 
-from md_discovery.functions.multi_process_infer_by_pairs import my_Levenshtein_ratio
+from md_discovery.multi_process_infer_by_pairs import my_Levenshtein_ratio, norm_cos_sim
 from settings import *
 
 
-def process_prediction_for_md_discovery(pred: pd.DataFrame, tp_single_tuple_path: str = "ml_er/output/tp_single_tuple.csv", fn_single_tuple_path: str = "ml_er/output/fn_single_tuple.csv"):
+def process_prediction_for_md_discovery(pred: pd.DataFrame,
+                                        tp_single_tuple_path: str = er_output_dir + "tp_single_tuple.csv",
+                                        fn_single_tuple_path: str = er_output_dir + "fn_single_tuple.csv"):
     # extract the true-positive and false-negative parts of the prediction table
     tp = pred[(pred['gold'] == 1) & (pred['predicted'] == 1)]
     fn = pred[(pred['gold'] == 1) & (pred['predicted'] == 0)]
@@ -81,9 +81,9 @@ def evaluate_prediction(df: pd.DataFrame, labeled_attr: str, predicted_attr: str
     precision = 0.0 if precision_denominator == 0.0 else num_true_positives / precision_denominator
     recall = 0.0 if recall_denominator == 0.0 else num_true_positives / recall_denominator
     F1 = 0.0 if precision == 0.0 and recall == 0.0 else (2.0 * precision * recall) / (precision + recall)
-    my_recall = num_true_positives / (matching_number * test_proportion)
+    block_recall = num_true_positives / (matching_number * test_proportion)
 
-    return {"precision": precision, "recall": recall, "F1": F1, "my_recall": my_recall}
+    return {"precision": precision, "recall": recall, "F1": F1, "block_recall": block_recall}
 
 
 def load_mds(paths: list) -> list:
@@ -114,7 +114,8 @@ def is_explicable(row, all_mds: list) -> bool:
         explicable = True  # assume this md can explain the current tuple pair
         for a in attrs:
             threshold = md[a]
-            if my_Levenshtein_ratio(str(getattr(row, 'ltable_'+a)), str(getattr(row, 'rtable_'+a))) < threshold:
+            if norm_cos_sim(embedding_dict[str(getattr(row, 'ltable_'+a))],
+                            embedding_dict[str(getattr(row, 'rtable_'+a))]) < threshold:
                 explicable = False  # if any field's similarity falls below its threshold, this md cannot explain the current tuple pair
                 break  # stop comparing against this md's other thresholds and move on to the next md
         if explicable:
@@ -277,8 +278,8 @@ def ml_er(iter_round: int, config: Configuration = None, ):
     predictions_attrs.extend(['gold', 'predicted'])
     predictions = predictions[predictions_attrs]
 
-    md_paths = ['md_discovery/output/tp_mds.txt', 'md_discovery/output/tp_vio.txt',
-                'md_discovery/output/fn_mds.txt', 'md_discovery/output/fn_vio.txt']
+    md_paths = [md_output_dir + 'tp_mds.txt', md_output_dir + 'tp_vio.txt',
+                md_output_dir + 'fn_mds.txt', md_output_dir + 'fn_vio.txt']
 
     epl_match = 0  # explicable, predicted as match
     nepl_mismatch = 0  # inexplicable, predicted as mismatch
@@ -293,19 +294,23 @@ def ml_er(iter_round: int, config: Configuration = None, ):
             nepl_mismatch += 1
     interpretability = (epl_match + nepl_mismatch) / len(predictions)  # interpretability
-    if indicators["my_recall"] >= 0.8:
+    if indicators["block_recall"] >= 0.8:
         f1 = indicators["F1"]
     else:
-        f1 = (2.0 * indicators["precision"] * indicators["my_recall"]) / (indicators["precision"] + indicators["my_recall"])
+        f1 = (2.0 * indicators["precision"] * indicators["block_recall"]) / (indicators["precision"] + indicators["block_recall"])
 
     performance = interpre_weight * interpretability + (1 - interpre_weight) * f1
     ################################################################################################################
     process_prediction_for_md_discovery(predictions)
 
-    output_path = "ml_er/output/eval_result_" + str(iter_round) + ".txt"
+    output_path = er_output_dir + "eval_result_" + str(iter_round) + ".txt"
     with open(output_path, 'w') as f:
         for key, value in six.iteritems(_get_metric(eval_result)):
             f.write(key + " : " + value)
             f.write('\n')
-        f.write('my_recall:' + str(indicators["my_recall"]) + '\n')
+        f.write('block_recall:' + str(indicators["block_recall"]) + '\n')
         f.write('interpretability:' + str(interpretability) + '\n')
         f.write('performance:' + str(performance) + '\n')
+
+
+if __name__ == '__main__':
+    ml_er(1)
diff --git a/settings.py b/settings.py
index 3319be4..fbcabe8 100644
--- a/settings.py
+++ b/settings.py
@@ -1,6 +1,9 @@
-ltable_path = 'datasets\\Amazon.csv'
-rtable_path = 'datasets\\GoogleProducts.csv'
-mapping_path = 'datasets\\Amzon_GoogleProducts_perfectMapping.csv'
+from sentence_transformers import SentenceTransformer
+import numpy as np
+
+ltable_path = 'E:\\Data\\Research\\Projects\\matching_dependency\\datasets\\Amazon.csv'
+rtable_path = 'E:\\Data\\Research\\Projects\\matching_dependency\\datasets\\GoogleProducts.csv'
+mapping_path = 'E:\\Data\\Research\\Projects\\matching_dependency\\datasets\\Amzon_GoogleProducts_perfectMapping.csv'
 mapping_lid = 'idAmazon'  # name of the left-table id column in the mapping table
 mapping_rid = 'idGoogleBase'  # name of the right-table id column in the mapping table
 ltable_id = 'id'  # id field name of the left table
@@ -8,5 +11,11 @@ rtable_id = 'id'  # id field name of the right table
 target_attr = 'id'  # target field for md discovery
 lr_attrs_map = {'title': 'name'}  # if corresponding fields have different names in the two tables, add them here so they can be aligned
 similarity_threshold = 0.7
+support_threshold = 1
 confidence_threshold = 0.8
 interpre_weight = 0.3  # weight of interpretability
+er_output_dir = 'E:\\Data\\Research\\Projects\\matching_dependency\\ml_er\\output\\'
+md_output_dir = 'E:\\Data\\Research\\Projects\\matching_dependency\\md_discovery\\output\\'
+model = SentenceTransformer('E:\\Data\\Research\\Models\\paraphrase-MiniLM-L6-v2')
+embedding_dict = np.load('E:\\Data\\Research\\Projects\\matching_dependency\\md_discovery\\embedding_dic.npy',
+                         allow_pickle=True).item()