diff --git a/multi_process_infer_by_pairs.py b/functions/multi_process_infer_by_pairs.py similarity index 90% rename from multi_process_infer_by_pairs.py rename to functions/multi_process_infer_by_pairs.py index 911b402..199d4ef 100644 --- a/multi_process_infer_by_pairs.py +++ b/functions/multi_process_infer_by_pairs.py @@ -6,13 +6,18 @@ import copy conf_thresh = 0.8 + def my_Levenshtein_ratio(str1, str2): + if max(len(str1), len(str2)) == 0: + return 1 return 1 - Levenshtein.distance(str1, str2) / max(len(str1), len(str2)) def if_minimal(md, md_list, target_col): # 假设这个md是minimal minimal = True + if md_list.count(md) > 1: + return False for _ in md_list: if _ != md: # 假设列表中每一个md都使当前md不minimal @@ -48,6 +53,7 @@ def remove_by_confidence(md, l, relation, target_col, lock): def inference_from_record_pairs(path, threshold, target_col): data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1') + data.fillna("", inplace=True) data = data.astype(str) columns = data.columns.values.tolist() @@ -71,7 +77,8 @@ def inference_from_record_pairs(path, threshold, target_col): sims[col] = similarity # 寻找violated md,从md列表中删除并加入vio列表 - for md in md_list: + # tmp_md_list = copy.deepcopy(md_list) + for md in md_list[:]: lhs_satis = True rhs_satis = True for col in list(set(columns) - {target_col}): @@ -101,33 +108,39 @@ def inference_from_record_pairs(path, threshold, target_col): if if_minimal(spec_l_md, md_list, target_col): md_list.append(spec_l_md) - for vio in minimal_vio: + # tmp_minimal_vio = copy.deepcopy(minimal_vio) + for vio in minimal_vio[:]: if not if_minimal(vio, md_list, target_col): minimal_vio.remove(vio) manager = multiprocessing.Manager() lock = manager.Lock() if len(minimal_vio) == 0: - return [], [] + return md_list, [] pool = multiprocessing.Pool(len(minimal_vio)) - tmp = copy.deepcopy(minimal_vio) + # tmp = copy.deepcopy(minimal_vio) with manager: proxy_minimal_vio = manager.list(minimal_vio) - for _ in tmp: + for _ in minimal_vio[:]: pool.apply_async(remove_by_confidence, args=(_, proxy_minimal_vio, data, target_col, lock)) pool.close() pool.join() minimal_vio = list(proxy_minimal_vio) - for _ in tmp: + for _ in minimal_vio[:]: if not if_minimal(_, minimal_vio, target_col): minimal_vio.remove(_) + for _ in md_list[:]: + if not if_minimal(_, md_list, target_col): + md_list.remove(_) + return md_list, minimal_vio def get_mds_metadata(md_list, dataset_path, target_col): data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1') + data.fillna("", inplace=True) data = data.astype(str) manager = multiprocessing.Manager()