diff --git a/md_discovery/tmp_discover.py b/md_discovery/tmp_discover.py
new file mode 100644
index 0000000..0edf1b9
--- /dev/null
+++ b/md_discovery/tmp_discover.py
@@ -0,0 +1,264 @@
+import copy
+import multiprocessing
+import time
+
+import Levenshtein
+import pandas as pd
+import torch
+
+from settings import model
+
+conf_thresh = 0.8
+
+
+def my_Levenshtein_ratio(str1, str2):
+    if max(len(str1), len(str2)) == 0:
+        return 1
+    return 1 - Levenshtein.distance(str1, str2) / max(len(str1), len(str2))
+
+
+def if_minimal(md, md_list, target_col):
+    # Assume the given md is minimal until proven otherwise.
+    if len(md_list) == 0:
+        return True
+    minimal = True
+    for _ in md_list:
+        if _ != md:
+            other_cols = list(set(_.keys()) - {target_col})
+            # Assume this md in the list makes the current md non-minimal.
+            exist = True
+            # If any LHS threshold of it is larger, the assumption fails.
+            for col in other_cols:
+                if _[col] > md[col]:
+                    exist = False
+                    break
+            # If its RHS threshold is smaller, the assumption also fails.
+            if _[target_col] < md[target_col]:
+                exist = False
+            # If the assumption holds for any md, the current md is not minimal.
+            if exist:
+                minimal = False
+                break
+    return minimal
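+
+# A tiny sanity check for if_minimal (hypothetical mds and thresholds; kept
+# commented out, in the style of the other scratch snippets in this file).
+# mds[0] dominates the first candidate (weaker LHS, equal RHS) but not the second:
+# mds = [{'title': 1, 'name': 0.7}]
+# print(if_minimal({'title': 1, 'name': 0.8}, mds, 'title'))  # False
+# print(if_minimal({'title': 1, 'name': 0.6}, mds, 'title'))  # True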
+
+
+def remove_by_confidence(md, md_list, relation, sim_tensor, target_col, lock):
+    support, confidence = get_one_md_metadata(md, relation, sim_tensor, target_col)
+    if confidence < conf_thresh:
+        with lock:
+            md_list.remove(md)
+
+
+# def remove_by_confidence(md, l, relation, target_col):
+#     boolean, conf = satisfy_confidence(md, relation, 0.8, target_col)
+#     if not boolean:
+#         l.remove(md)
+#         print(md, '\t', conf)
+
+# def build_sim_matrix():
+#     width
+#     return 0
+
+
+def inference_from_record_pairs(path, threshold, target_col):
+    data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1')
+    data.fillna("", inplace=True)
+    data = data.astype(str)
+    columns = data.columns.values.tolist()
+    cols_but_target = list(set(columns) - {target_col})
+    length = data.shape[0]
+    width = data.shape[1]
+
+    # Embed every cell, then build a (width, length, length) tensor of pairwise
+    # row similarities per column, rescaled from [-1, 1] to [0, 1].
+    sentences = []
+    for col in range(0, width):
+        for row in range(0, length):
+            cell_value = data.values[row, col]
+            sentences.append(cell_value)
+    embedding = model.encode(sentences, convert_to_tensor=True, device="cuda")
+    split_embedding = torch.split(embedding, length, dim=0)
+    table_tensor = torch.stack(split_embedding, dim=0, out=None)
+    norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
+    sim_tensor = torch.matmul(norm_table_tensor, norm_table_tensor.transpose(1, 2))
+    sim_tensor = sim_tensor / 2 + 0.5
+
+    # Cache the similarity tensor for offline inspection (hard-coded debug path).
+    torch.save(sim_tensor, "E:\\Data\\Research\\Projects\\matching_dependency\\tensor.pt")
+
+    md_list = []
+    minimal_vio = []
+    init_md = {}
+    for col in columns:
+        init_md[col] = 1 if col == target_col else 0
+    md_list.append(init_md)
+
+    start = time.time()
+    for row1 in range(0, length - 1):
+        terminate = False
+        for row2 in range(row1 + 1, length):
+            violated_mds = []
+            # sims holds the per-column similarity between the two rows.
+            sims = {}
+            for col_index in range(0, width):
+                col = columns[col_index]
+                similarity = sim_tensor[col_index, row1, row2].item()
+                sims[col] = similarity
+
+            # Find violated mds: remove them from md_list and record them.
+            # tmp_md_list = copy.deepcopy(md_list)
+            for md in md_list[:]:
+                lhs_satis = True
+                rhs_satis = True
+                for col in cols_but_target:
+                    if sims[col] < md[col]:
+                        lhs_satis = False
+                        break
+                if sims[target_col] < md[target_col]:
+                    rhs_satis = False
+                if lhs_satis and not rhs_satis:
+                    md_list.remove(md)
+                    violated_mds.append(md)
+
+            for vio_md in violated_mds:
+                # Specializing the RHS: we require the RHS to be fully similar,
+                # so there is actually no need to lower the RHS threshold.
+                # if sims[target_col] >= threshold:
+                #     new_rhs = sims[target_col]
+                #     spec_r_md = copy.deepcopy(vio_md)
+                #     spec_r_md[target_col] = new_rhs
+                #     if if_minimal(spec_r_md, md_list, target_col):
+                #         md_list.append(spec_r_md)
+                # Specialize the LHS.
+                for col in cols_but_target:
+                    if sims[col] + 0.05 <= 1:
+                        spec_l_md = copy.deepcopy(vio_md)
+                        spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.05
+                        if if_minimal(spec_l_md, md_list, target_col):
+                            md_list.append(spec_l_md)
+                if vio_md not in minimal_vio:
+                    minimal_vio.append(vio_md)
+            if len(md_list) == 0:
+                terminate = True
+                break
+
+            # tmp_minimal_vio = copy.deepcopy(minimal_vio)
+        if terminate:
+            break
+    if len(md_list) > 0:
+        for vio in minimal_vio[:]:
+            if not if_minimal(vio, md_list, target_col):
+                minimal_vio.remove(vio)
+
+    print(time.time() - start, '\n')
+    print(len(md_list), '\n')
+    print(len(minimal_vio), '\n')
+
+    if len(minimal_vio) == 0:
+        return md_list, []
+
+    # manager = multiprocessing.Manager()
+    # lock = manager.Lock()
+    # pool_size = 4
+    # pool = multiprocessing.Pool(pool_size)
+    # with manager:
+    #     proxy_minimal_vio = manager.list(minimal_vio)
+    #     for _ in minimal_vio[:]:
+    #         pool.apply_async(remove_by_confidence, args=(_, proxy_minimal_vio, data, sim_tensor, target_col, lock))
+    #     pool.close()
+    #     pool.join()
+    #     minimal_vio = list(proxy_minimal_vio)
+
+    start = time.time()
+    minimal_vio.reverse()
+    i = 0
+    while i < len(minimal_vio):
+        print(i)
+        print(len(minimal_vio))
+        current_md = minimal_vio[i]
+        support, confidence = get_one_md_metadata(current_md, data, sim_tensor, target_col)
+        # The three branches below are mutually exclusive: the earlier version
+        # could call remove() twice for the same md when both support and
+        # confidence were too low, raising ValueError on the second call.
+        if support < 50:
+            # Low support: drop this md and every remaining specialization of
+            # it, since a specialization can only have equal or lower support.
+            j = i + 1
+            while j < len(minimal_vio):
+                specialization = True
+                next_md = minimal_vio[j]
+                for col in cols_but_target:
+                    if current_md[col] > next_md[col]:
+                        specialization = False
+                        break
+                if specialization:
+                    minimal_vio.remove(next_md)
+                else:
+                    j += 1
+            print('sup')
+            minimal_vio.remove(current_md)
+        elif confidence < conf_thresh:
+            print('conf')
+            minimal_vio.remove(current_md)
+        else:
+            i += 1
+    print(time.time() - start)
+
+    t1 = time.time()
+    for _ in minimal_vio[:]:
+        if not if_minimal(_, minimal_vio, target_col):
+            minimal_vio.remove(_)
+    print(time.time() - t1)
+
+    return md_list, minimal_vio
+
+
+def get_mds_metadata(md_list, dataset_path, sim_tensor, target_col):
+    data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1')
+    data.fillna("", inplace=True)
+    data = data.astype(str)
+
+    if len(md_list) == 0:
+        return []
+    pool_size = 16
+    pool = multiprocessing.Pool(pool_size)
+    # Submit every task before collecting results; calling task.get() right
+    # after apply_async would block and serialize the whole pool.
+    tasks = [pool.apply_async(get_one_md_metadata, args=(md, data, sim_tensor, target_col))
+             for md in md_list]
+    pool.close()
+    pool.join()
+    result = []
+    for md, task in zip(md_list, tasks):
+        support, confidence = task.get()
+        result.append({"md": md, "support": support, "confidence": confidence})
+    return result
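+
+
+# Hypothetical end-to-end usage (commented out, like the scratch code above;
+# the CSV path and target column are placeholders, and the cached tensor is
+# the one saved by inference_from_record_pairs):
+# mds, vio = inference_from_record_pairs("pairs.csv", 0.7, "title")
+# sim = torch.load("E:\\Data\\Research\\Projects\\matching_dependency\\tensor.pt")
+# for meta in get_mds_metadata(mds + vio, "pairs.csv", sim, "title"):
+#     print(meta["md"], meta["support"], meta["confidence"])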
+
+
+def get_one_md_metadata(md, dataframe, sim_tensor, target_col):
+    # support: pairs whose similarities meet every LHS threshold of the md;
+    # confidence: fraction of those pairs that fully agree on the target column.
+    support = 0
+    pre_confidence = 0
+    columns = dataframe.columns.values.tolist()
+    length = dataframe.shape[0]
+    width = dataframe.shape[1]
+    for row1 in range(0, length - 1):
+        for row2 in range(row1 + 1, length):
+            left_satisfy = True
+            both_satisfy = True
+            for col_index in range(0, width):
+                col = columns[col_index]
+                sim = sim_tensor[col_index, row1, row2].item()
+                if col == target_col:
+                    if sim < 1:
+                        both_satisfy = False
+                else:
+                    if sim < md[col]:
+                        left_satisfy = False
+                        both_satisfy = False
+            if left_satisfy:
+                support += 1
+            if both_satisfy:
+                pre_confidence += 1
+
+    confidence = 0 if support == 0 else pre_confidence / support
+    # return {"md": md, "support": support, "confidence": confidence}
+    return support, confidence
diff --git a/tfile.py b/tfile.py
new file mode 100644
index 0000000..f6005f7
--- /dev/null
+++ b/tfile.py
@@ -0,0 +1,48 @@
+import multiprocessing
+import time
+
+import torch
+from tqdm import tqdm
+
+from md_discovery.multi_process_infer_by_pairs import inference_from_record_pairs
+from md_discovery import tmp_discover
+from settings import er_output_dir, similarity_threshold, target_attr
+
+
+def square_plus_one(i):
+    # Dummy workload for the commented-out pool test below.
+    return i * i + 1
+
+
+if __name__ == '__main__':
+    start = time.time()
+    tp_single_tuple_path = er_output_dir + "tp_single_tuple.csv"
+    # tp_mds, tp_vio = inference_from_record_pairs(tp_single_tuple_path, similarity_threshold, target_attr)
+    tp_mds, tp_vio = tmp_discover.inference_from_record_pairs(tp_single_tuple_path, similarity_threshold, target_attr)
+    print(time.time() - start)
+
+    # li = [[[6, 6, 2],
+    #        [2, 4, 6],
+    #        [2, 4, 7],
+    #        [3, 6, 4]],
+    #       [[6, 2, 7],
+    #        [3, 2, 4],
+    #        [5, 3, 5],
+    #        [6, 2, 4]],
+    #       [[7, 2, 2],
+    #        [6, 3, 2],
+    #        [6, 4, 3],
+    #        [6, 5, 6]]]
+    # tensor = torch.Tensor(li)
+    # norm_tensor = torch.nn.functional.normalize(tensor, dim=2)
+    # print(norm_tensor, '\n')
+    # sim_ten = torch.matmul(norm_tensor, norm_tensor.transpose(1, 2))
+    # print(sim_ten / 2 + 0.5, '\n')
+    # print(sim_ten.size())
+
+    # multiprocessing.set_start_method("spawn")
+    # manager = multiprocessing.Manager()
+    # lock = manager.Lock()
+    # pool = multiprocessing.Pool(16)
+    # with manager:
+    #     for _ in tqdm(range(0, 1000)):
+    #         result = pool.apply_async(square_plus_one, args=(_,))
+    # print(result)
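
Note on the similarity construction in inference_from_record_pairs: cosine similarity
of the L2-normalized cell embeddings is rescaled from [-1, 1] to [0, 1] via
sim / 2 + 0.5. Below is a minimal self-contained sketch of that step; random
embeddings stand in for the encoder output from settings.model, and the shapes
are assumptions mirroring the code above:

    import torch

    # Stand-in for model.encode(...): 3 columns x 4 rows x 8-dim cell embeddings.
    width, length, dim = 3, 4, 8
    table_tensor = torch.randn(width, length, dim)

    # Normalize each cell embedding; a batched matmul then yields, per column,
    # the cosine similarity between every pair of rows.
    norm = torch.nn.functional.normalize(table_tensor, dim=2)
    sim_tensor = torch.matmul(norm, norm.transpose(1, 2))  # (width, length, length)
    sim_tensor = sim_tensor / 2 + 0.5                      # rescale [-1, 1] -> [0, 1]

    print(sim_tensor.shape)            # torch.Size([3, 4, 4])
    print(sim_tensor[0, 1, 2].item())  # rows 1 and 2 compared on column 0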