|
|
import multiprocessing
|
|
|
import time
|
|
|
from concurrent.futures import ProcessPoolExecutor
|
|
|
from multiprocessing.managers import SharedMemoryManager
|
|
|
|
|
|
import numpy as np
|
|
|
import pandas
|
|
|
import pandas as pd
|
|
|
import Levenshtein
|
|
|
import copy
|
|
|
|
|
|
import torch
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
from md_discovery.multi_process_infer_by_pairs import norm_cos_sim
|
|
|
from settings import embedding_dict, model
|
|
|
|
|
|
conf_thresh = 0.8
|
|
|
|
|
|
|
|
|
def my_Levenshtein_ratio(str1, str2):
|
|
|
if max(len(str1), len(str2)) == 0:
|
|
|
return 1
|
|
|
return 1 - Levenshtein.distance(str1, str2) / max(len(str1), len(str2))
|
|
|
|
|
|
|
|
|
def if_minimal(md, md_list, target_col):
|
|
|
# 假设这个md是minimal
|
|
|
if len(md_list) == 0:
|
|
|
return True
|
|
|
minimal = True
|
|
|
for _ in md_list:
|
|
|
if _ != md:
|
|
|
other_cols = list(set(_.keys()) - {target_col})
|
|
|
# 假设列表中每一个md都使当前md不minimal
|
|
|
exist = True
|
|
|
# 如果左边任何一个大于,则假设不成立
|
|
|
for col in other_cols:
|
|
|
if _[col] > md[col]:
|
|
|
exist = False
|
|
|
break
|
|
|
# 如果右边小于,假设也不成立
|
|
|
if _[target_col] < md[target_col]:
|
|
|
exist = False
|
|
|
# 任何一次假设成立,当前md不minimal
|
|
|
if exist:
|
|
|
minimal = False
|
|
|
break
|
|
|
return minimal
|
|
|
|
|
|
|
|
|
def remove_by_confidence(md, md_list, relation, sim_tensor, target_col, lock):
|
|
|
support, confidence = get_one_md_metadata(md, relation, sim_tensor, target_col)
|
|
|
if confidence < 0.8:
|
|
|
with lock:
|
|
|
md_list.remove(md)
|
|
|
|
|
|
|
|
|
# def remove_by_confidence(md, l, relation, target_col):
|
|
|
# boolean, conf = satisfy_confidence(md, relation, 0.8, target_col)
|
|
|
# if not boolean:
|
|
|
# l.remove(md)
|
|
|
# print(md, '\t', conf)
|
|
|
|
|
|
# def build_sim_matrix():
|
|
|
# width
|
|
|
# return 0
|
|
|
def inference_from_record_pairs(path, threshold, target_col):
|
|
|
data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1')
|
|
|
data.fillna("", inplace=True)
|
|
|
data = data.astype(str)
|
|
|
columns = data.columns.values.tolist()
|
|
|
cols_but_target = list(set(columns) - {target_col})
|
|
|
length = data.shape[0]
|
|
|
width = data.shape[1]
|
|
|
|
|
|
sentences = []
|
|
|
for col in range(0, width):
|
|
|
for row in range(0, length):
|
|
|
cell_value = data.values[row, col]
|
|
|
sentences.append(cell_value)
|
|
|
embedding = model.encode(sentences, convert_to_tensor=True, device="cuda")
|
|
|
split_embedding = torch.split(embedding, length, dim=0)
|
|
|
table_tensor = torch.stack(split_embedding, dim=0, out=None)
|
|
|
norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
|
|
|
sim_tensor = torch.matmul(norm_table_tensor, norm_table_tensor.transpose(1, 2))
|
|
|
sim_tensor = sim_tensor/2 + 0.5
|
|
|
|
|
|
torch.save(sim_tensor, "E:\\Data\\Research\\Projects\\matching_dependency\\tensor.pt")
|
|
|
|
|
|
md_list = []
|
|
|
minimal_vio = []
|
|
|
init_md = {}
|
|
|
for col in columns:
|
|
|
init_md[col] = 1 if col == target_col else 0
|
|
|
md_list.append(init_md)
|
|
|
|
|
|
start = time.time()
|
|
|
for row1 in range(0, length - 1):
|
|
|
terminate = False
|
|
|
for row2 in range(row1 + 1, length):
|
|
|
violated_mds = []
|
|
|
# sims是两行的相似度
|
|
|
sims = {}
|
|
|
for col_index in range(0, width):
|
|
|
col = columns[col_index]
|
|
|
similarity = sim_tensor[col_index, row1, row2].item()
|
|
|
sims[col] = similarity
|
|
|
|
|
|
# 寻找violated md,从md列表中删除并加入vio列表
|
|
|
# tmp_md_list = copy.deepcopy(md_list)
|
|
|
for md in md_list[:]:
|
|
|
lhs_satis = True
|
|
|
rhs_satis = True
|
|
|
for col in list(set(columns) - {target_col}):
|
|
|
if sims[col] < md[col]:
|
|
|
lhs_satis = False
|
|
|
break
|
|
|
if sims[target_col] < md[target_col]:
|
|
|
rhs_satis = False
|
|
|
if lhs_satis == True and rhs_satis == False:
|
|
|
md_list.remove(md)
|
|
|
violated_mds.append(md)
|
|
|
|
|
|
for vio_md in violated_mds:
|
|
|
# 特殊化右侧,我们需要右侧百分百相似,其实不需要降低右侧阈值
|
|
|
# if sims[target_col] >= threshold:
|
|
|
# new_rhs = sims[target_col]
|
|
|
# spec_r_md = copy.deepcopy(vio_md)
|
|
|
# spec_r_md[target_col] = new_rhs
|
|
|
# if if_minimal(spec_r_md, md_list, target_col):
|
|
|
# md_list.append(spec_r_md)
|
|
|
# 特殊化左侧
|
|
|
for col in list(set(columns) - {target_col}):
|
|
|
if sims[col] + 0.05 <= 1:
|
|
|
spec_l_md = copy.deepcopy(vio_md)
|
|
|
spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.05
|
|
|
if if_minimal(spec_l_md, md_list, target_col):
|
|
|
md_list.append(spec_l_md)
|
|
|
if vio_md not in minimal_vio:
|
|
|
minimal_vio.append(vio_md)
|
|
|
if len(md_list) == 0:
|
|
|
terminate = True
|
|
|
break
|
|
|
|
|
|
# tmp_minimal_vio = copy.deepcopy(minimal_vio)
|
|
|
if terminate:
|
|
|
break
|
|
|
if len(md_list) > 0:
|
|
|
for vio in minimal_vio[:]:
|
|
|
if not if_minimal(vio, md_list, target_col):
|
|
|
minimal_vio.remove(vio)
|
|
|
|
|
|
print(time.time()-start, '\n')
|
|
|
print(len(md_list), '\n')
|
|
|
print(len(minimal_vio), '\n')
|
|
|
|
|
|
if len(minimal_vio) == 0:
|
|
|
return md_list, []
|
|
|
|
|
|
# manager = multiprocessing.Manager()
|
|
|
# lock = manager.Lock()
|
|
|
# pool_size = 4
|
|
|
# pool = multiprocessing.Pool(pool_size)
|
|
|
# with manager:
|
|
|
# proxy_minimal_vio = manager.list(minimal_vio)
|
|
|
# for _ in minimal_vio[:]:
|
|
|
# pool.apply_async(remove_by_confidence, args=(_, proxy_minimal_vio, data, sim_tensor, target_col, lock))
|
|
|
# pool.close()
|
|
|
# pool.join()
|
|
|
# minimal_vio = list(proxy_minimal_vio)
|
|
|
|
|
|
start = time.time()
|
|
|
minimal_vio.reverse()
|
|
|
i = 0
|
|
|
while i < len(minimal_vio):
|
|
|
print(i)
|
|
|
print(len(minimal_vio))
|
|
|
current_md = minimal_vio[i]
|
|
|
support, confidence = get_one_md_metadata(current_md, data, sim_tensor, target_col)
|
|
|
if support < 50:
|
|
|
minimal_vio_length = len(minimal_vio)
|
|
|
j = i + 1
|
|
|
while j < len(minimal_vio):
|
|
|
specialization = True
|
|
|
next_md = minimal_vio[j]
|
|
|
for col in cols_but_target:
|
|
|
if current_md[col] > next_md[col]:
|
|
|
specialization = False
|
|
|
break
|
|
|
if specialization:
|
|
|
minimal_vio.remove(next_md)
|
|
|
else:
|
|
|
j += 1
|
|
|
print('sup')
|
|
|
minimal_vio.remove(current_md)
|
|
|
if confidence < 0.8:
|
|
|
print('conf')
|
|
|
minimal_vio.remove(current_md)
|
|
|
if support >= 50 and confidence >= 0.8:
|
|
|
i += 1
|
|
|
print(time.time()-start)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
t1 = time.time()
|
|
|
for _ in minimal_vio[:]:
|
|
|
if not if_minimal(_, minimal_vio, target_col):
|
|
|
minimal_vio.remove(_)
|
|
|
print(time.time() - t1)
|
|
|
|
|
|
return md_list, minimal_vio
|
|
|
|
|
|
|
|
|
def get_mds_metadata(md_list, dataset_path, sim_tensor, target_col):
|
|
|
data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1')
|
|
|
data.fillna("", inplace=True)
|
|
|
data = data.astype(str)
|
|
|
|
|
|
manager = multiprocessing.Manager()
|
|
|
if len(md_list) == 0:
|
|
|
return []
|
|
|
pool_size = 16
|
|
|
pool = multiprocessing.Pool(pool_size)
|
|
|
result = []
|
|
|
with manager:
|
|
|
for _ in md_list:
|
|
|
task = pool.apply_async(get_one_md_metadata, args=(_, data, sim_tensor, target_col))
|
|
|
support, confidence = task.get()
|
|
|
result.append({"md": _, "support": support, "confidence": confidence})
|
|
|
pool.close()
|
|
|
pool.join()
|
|
|
return result
|
|
|
|
|
|
|
|
|
def get_one_md_metadata(md, dataframe, sim_tensor, target_col):
|
|
|
support = 0
|
|
|
pre_confidence = 0
|
|
|
columns = dataframe.columns.values.tolist()
|
|
|
length = dataframe.shape[0]
|
|
|
width = dataframe.shape[1]
|
|
|
for row1 in range(0, length - 1):
|
|
|
for row2 in range(row1 + 1, length):
|
|
|
left_satisfy = True
|
|
|
both_satisfy = True
|
|
|
for col_index in range(0, width):
|
|
|
col = columns[col_index]
|
|
|
sim = sim_tensor[col_index, row1, row2].item()
|
|
|
if col == target_col:
|
|
|
if sim < 1:
|
|
|
both_satisfy = False
|
|
|
else:
|
|
|
if sim < md[col]:
|
|
|
left_satisfy = False
|
|
|
both_satisfy = False
|
|
|
if left_satisfy:
|
|
|
support += 1
|
|
|
if both_satisfy:
|
|
|
pre_confidence += 1
|
|
|
|
|
|
confidence = 0 if support == 0 else pre_confidence / support
|
|
|
# return {"md": md, "support": support, "confidence": confidence}
|
|
|
return support, confidence
|