import multiprocessing
import copy

import Levenshtein
import pandas as pd

conf_thresh = 0.8


def my_Levenshtein_ratio(str1, str2):
    # Normalized Levenshtein similarity in [0, 1]; two empty strings count as identical.
    if max(len(str1), len(str2)) == 0:
        return 1
    return 1 - Levenshtein.distance(str1, str2) / max(len(str1), len(str2))


def if_minimal(md, md_list, target_col):
    # Assume this md is minimal.
    minimal = True
    if md_list.count(md) > 1:
        return False
    for other in md_list:
        if other != md:
            # Assume the other md makes the current md non-minimal.
            exist = True
            # If any LHS threshold of the other md is larger, the assumption fails.
            for col in list(set(other.keys()) - {target_col}):
                if other[col] > md[col]:
                    exist = False
            # If the RHS threshold of the other md is smaller, the assumption also fails.
            if other[target_col] < md[target_col]:
                exist = False
            # If the assumption holds for any other md, the current md is not minimal.
            if exist:
                minimal = False
                break
    return minimal


def remove_by_confidence(md, md_list_proxy, relation, target_col, lock):
    # Drop an md from the shared proxy list if its confidence falls below the threshold.
    support, confidence = get_one_md_metadata(md, relation, target_col)
    if confidence < conf_thresh:
        with lock:
            md_list_proxy.remove(md)


def inference_from_record_pairs(path, threshold, target_col):
    data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1')
    data.fillna("", inplace=True)
    data = data.astype(str)
    columns = data.columns.values.tolist()

    md_list = []
    minimal_vio = []
    # Start from the most general md: no LHS requirement, exact match on the target column.
    init_md = {}
    for col in columns:
        init_md[col] = 1 if col == target_col else 0
    md_list.append(init_md)

    for row1 in data.itertuples():
        # Take the index of the current row and slice from the next row onward.
        i = row1[0]
        data1 = data[i + 1:]
        for row2 in data1.itertuples():
            violated_mds = []
            # sims holds the per-column similarity between the two rows.
            sims = {}
            for col in columns:
                similarity = my_Levenshtein_ratio(getattr(row1, col), getattr(row2, col))
                sims[col] = similarity

            # Find violated mds, remove them from md_list and collect them for specialization.
            for md in md_list[:]:
                lhs_satis = True
                rhs_satis = True
                for col in list(set(columns) - {target_col}):
                    if sims[col] < md[col]:
                        lhs_satis = False
                if sims[target_col] < md[target_col]:
                    rhs_satis = False
                if lhs_satis and not rhs_satis:
                    md_list.remove(md)
                    violated_mds.append(md)
            minimal_vio.extend(violated_mds)

            for vio_md in violated_mds:
                # Specialize the RHS: we require full similarity on the RHS,
                # so there is no point in lowering its threshold.
                # if sims[target_col] >= threshold:
                #     new_rhs = sims[target_col]
                #     spec_r_md = copy.deepcopy(vio_md)
                #     spec_r_md[target_col] = new_rhs
                #     if if_minimal(spec_r_md, md_list, target_col):
                #         md_list.append(spec_r_md)
                # Specialize the LHS: raise one column's threshold just above the observed similarity.
                for col in list(set(columns) - {target_col}):
                    if sims[col] + 0.001 <= 1:
                        spec_l_md = copy.deepcopy(vio_md)
                        spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.001
                        if if_minimal(spec_l_md, md_list, target_col):
                            md_list.append(spec_l_md)

            for vio in minimal_vio[:]:
                if not if_minimal(vio, md_list, target_col):
                    minimal_vio.remove(vio)

    if len(minimal_vio) == 0:
        return md_list, []

    # Filter the violated mds by confidence in parallel, sharing the list through a manager proxy.
    manager = multiprocessing.Manager()
    lock = manager.Lock()
    pool = multiprocessing.Pool(min(len(minimal_vio), multiprocessing.cpu_count()))
    with manager:
        proxy_minimal_vio = manager.list(minimal_vio)
        for md in minimal_vio:
            pool.apply_async(remove_by_confidence,
                             args=(md, proxy_minimal_vio, data, target_col, lock))
        pool.close()
        pool.join()
        minimal_vio = list(proxy_minimal_vio)

    for md in minimal_vio[:]:
        if not if_minimal(md, minimal_vio, target_col):
            minimal_vio.remove(md)

    for md in md_list[:]:
        if not if_minimal(md, md_list, target_col):
            md_list.remove(md)

    return md_list, minimal_vio


def get_mds_metadata(md_list, dataset_path, target_col):
    data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1')
    data.fillna("", inplace=True)
    data = data.astype(str)

    if len(md_list) == 0:
        return []
    pool = multiprocessing.Pool(min(len(md_list), multiprocessing.cpu_count()))
    # Submit all tasks before collecting results, so the workers actually run in parallel.
    tasks = [pool.apply_async(get_one_md_metadata, args=(md, data, target_col)) for md in md_list]
    pool.close()
    pool.join()

    result = []
    for md, task in zip(md_list, tasks):
        support, confidence = task.get()
        result.append({"md": md, "support": support, "confidence": confidence})
    return result


def get_one_md_metadata(md, dataframe, target_col):
    # Count supporting pairs (LHS satisfied) and confirming pairs (LHS satisfied and target column identical).
    support = 0
    pre_confidence = 0
    for row1 in dataframe.itertuples():
        i = row1[0]
        df_slice = dataframe[i + 1:]
        for row2 in df_slice.itertuples():
            left_satisfy = True
            both_satisfy = True
            for col in dataframe.columns.values.tolist():
                sim = my_Levenshtein_ratio(getattr(row1, col), getattr(row2, col))
                if col == target_col:
                    if sim < 1:
                        both_satisfy = False
                else:
                    if sim < md[col]:
                        left_satisfy = False
                        both_satisfy = False
            if left_satisfy:
                support += 1
            if both_satisfy:
                pre_confidence += 1

    confidence = 0 if support == 0 else pre_confidence / support
    return support, confidence
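# Usage sketch (illustrative only, not part of the original module): the file name,
# threshold, and target column below are hypothetical placeholders. The __main__
# guard is needed because the functions above create multiprocessing pools, which
# re-import this module in worker processes on spawn-based platforms.
if __name__ == "__main__":
    # Mine minimal MDs and the confidence-filtered violated MDs from record pairs.
    mds, violated = inference_from_record_pairs("pairs.csv", threshold=0.9, target_col="title")
    # Compute support and confidence for the surviving MDs on the same dataset.
    for meta in get_mds_metadata(mds, "pairs.csv", target_col="title"):
        print(meta["md"], meta["support"], meta["confidence"])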