并行化confidence筛选部分

增加输出support和confidence功能
pull/2/head
HuangJintao 1 year ago
parent b7034820eb
commit 542486fc26

@ -0,0 +1,48 @@
import time
from multi_process_infer_by_pairs import inference_from_record_pairs
from multi_process_infer_by_pairs import get_mds_metadata
if __name__ == '__main__':
    # Example driver: copy and adapt this block for other datasets.
    path = "/home/w/PycharmProjects/py_entitymatching/py_entitymatching/datasets/end-to-end/Amazon-GoogleProducts/output/8.14/TP_single_tuple.csv"
    start = time.time()

    # Arguments: CSV file path, similarity threshold for the MD left-hand
    # side, and the target column on the MD right-hand side.
    # Result: two MD lists. The first holds MDs without any violation, the
    # second holds MDs that are violated but still meet the confidence
    # threshold (0.8).
    # Here the left-hand-side similarities must be at least 0.7 and the
    # right-hand side points at the 'id' column.
    mds, mds_vio = inference_from_record_pairs(path, 0.7, 'id')

    # Drop the next two calls when support and confidence are not needed.
    mds_meta = get_mds_metadata(mds, path, 'id')
    mds_vio_meta = get_mds_metadata(mds_vio, path, 'id')

    # Without support/confidence, write the plain MD lists instead, e.g.:
    #     with open(md_path, 'w') as f:
    #         for md in mds:
    #             f.write(str(md) + '\n')
    # and likewise for mds_vio / vio_path.

    # Output locations for list 1 and list 2; adjust them to the local machine.
    md_path = '/home/w/A-New Folder/8.14/Goods Dataset/TP_md_list.txt'
    vio_path = '/home/w/A-New Folder/8.14/Goods Dataset/TP_vio_list.txt'

    # One line per MD record, every field written as "key:value" plus a tab.
    for records, out_path in ((mds_meta, md_path), (mds_vio_meta, vio_path)):
        with open(out_path, 'w') as f:
            for record in records:
                for key in record:
                    f.write(key + ':' + str(record[key]) + '\t')
                f.write('\n')

    print(time.time() - start)

@ -1,4 +1,3 @@
import numpy as np
import pandas as pd import pandas as pd
import time import time
import Levenshtein import Levenshtein
@ -17,7 +16,7 @@ def if_minimal(md, md_list, target_col):
# 假设列表中每一个md都使当前md不minimal # 假设列表中每一个md都使当前md不minimal
exist = True exist = True
# 如果左边任何一个大于,则假设不成立 # 如果左边任何一个大于,则假设不成立
for col in list(set(_.keys()) - set([target_col])): for col in list(set(_.keys()) - {target_col}):
if _[col] > md[col]: if _[col] > md[col]:
exist = False exist = False
# 如果右边小于,假设也不成立 # 如果右边小于,假设也不成立
@ -52,8 +51,10 @@ def satisfy_confidence(md, df, conf_thresh, target_col):
support += 1 support += 1
if both_satisfy: if both_satisfy:
support_plus += 1 support_plus += 1
if support == 0:
return False, 0.0
confidence = support_plus / support confidence = support_plus / support
return confidence >= conf_thresh return confidence >= conf_thresh, confidence
def inference_from_record_pairs(path, threshold, target_col): def inference_from_record_pairs(path, threshold, target_col):
@ -84,7 +85,7 @@ def inference_from_record_pairs(path, threshold, target_col):
for md in md_list: for md in md_list:
lhs_satis = True lhs_satis = True
rhs_satis = True rhs_satis = True
for col in list(set(columns) - set([target_col])): for col in list(set(columns) - {target_col}):
if sims[col] < md[col]: if sims[col] < md[col]:
lhs_satis = False lhs_satis = False
if sims[target_col] < md[target_col]: if sims[target_col] < md[target_col]:
@ -104,7 +105,7 @@ def inference_from_record_pairs(path, threshold, target_col):
# md_list.append(spec_r_md) # md_list.append(spec_r_md)
# 特殊化左侧 # 特殊化左侧
for col in list(set(columns) - set([target_col])): for col in list(set(columns) - {target_col}):
if sims[col] + 0.001 <= 1: if sims[col] + 0.001 <= 1:
spec_l_md = copy.deepcopy(vio_md) spec_l_md = copy.deepcopy(vio_md)
spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.001 spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.001
@ -115,12 +116,13 @@ def inference_from_record_pairs(path, threshold, target_col):
if not if_minimal(vio, md_list, target_col): if not if_minimal(vio, md_list, target_col):
minimal_vio.remove(vio) minimal_vio.remove(vio)
for _ in minimal_vio: tmp = copy.deepcopy(minimal_vio)
if not satisfy_confidence(_, data, 0.8, target_col): for _ in tmp:
satis, conf = satisfy_confidence(_, data, 0.8, target_col)
if not satis:
minimal_vio.remove(_) minimal_vio.remove(_)
list1 = copy.deepcopy(minimal_vio) for _ in tmp:
for _ in list1:
if not if_minimal(_, minimal_vio, target_col): if not if_minimal(_, minimal_vio, target_col):
minimal_vio.remove(_) minimal_vio.remove(_)
@ -129,7 +131,7 @@ def inference_from_record_pairs(path, threshold, target_col):
if __name__ == '__main__': if __name__ == '__main__':
# 目前可以仿照这个main函数写 # 目前可以仿照这个main函数写
path = "/home/w/PycharmProjects/py_entitymatching/py_entitymatching/datasets/end-to-end/Amazon-GoogleProducts/output/8.14/TP_single_tuple.csv" path = "/home/w/PycharmProjects/py_entitymatching/py_entitymatching/datasets/end-to-end/DBLP-ACM/output/7.6/TP_single_tuple.csv"
start = time.time() start = time.time()
# 输入csv文件路径md左侧相似度阈值md右侧目标字段 # 输入csv文件路径md左侧相似度阈值md右侧目标字段
# 输出2个md列表列表1中md无violation,列表2中md有violation但confidence满足阈值(0.8) # 输出2个md列表列表1中md无violation,列表2中md有violation但confidence满足阈值(0.8)
@ -137,13 +139,13 @@ if __name__ == '__main__':
mds, mds_vio = inference_from_record_pairs(path, 0.7, 'id') mds, mds_vio = inference_from_record_pairs(path, 0.7, 'id')
# 将列表1写入本地路径需自己修改 # 将列表1写入本地路径需自己修改
md_path = '/home/w/A-New Folder/8.14/Goods Dataset/TP_md_list.txt' md_path = '/home/w/A-New Folder/8.14/Paper Dataset/TP_md_list.txt'
with open(md_path, 'w') as f: with open(md_path, 'w') as f:
for _ in mds: for _ in mds:
f.write(str(_) + '\n') f.write(str(_) + '\n')
# 将列表2写入本地路径需自己修改 # 将列表2写入本地路径需自己修改
vio_path = '/home/w/A-New Folder/8.14/Goods Dataset/TP_vio_list.txt' vio_path = '/home/w/A-New Folder/8.14/Paper Dataset/TP_vio_list.txt'
with open(vio_path, 'w') as f: with open(vio_path, 'w') as f:
for _ in mds_vio: for _ in mds_vio:
f.write(str(_) + '\n') f.write(str(_) + '\n')

@ -0,0 +1,173 @@
import multiprocessing
import pandas as pd
import Levenshtein
import copy
conf_thresh = 0.8
def my_Levenshtein_ratio(str1, str2):
    """Return the length-normalized Levenshtein similarity of two strings.

    The score is ``1 - distance / max(len(str1), len(str2))``, so identical
    strings score 1.

    Args:
        str1: First string.
        str2: Second string.

    Returns:
        The similarity score; ``1.0`` when both strings are empty.
    """
    longest = max(len(str1), len(str2))
    if longest == 0:
        # Two empty strings are identical; without this guard the division
        # below raises ZeroDivisionError.
        return 1.0
    return 1 - Levenshtein.distance(str1, str2) / longest
def if_minimal(md, md_list, target_col):
    """Tell whether ``md`` is minimal with respect to ``md_list``.

    ``md`` is not minimal when some different MD in ``md_list`` has every
    left-hand-side threshold less than or equal to the one in ``md`` and a
    right-hand-side (``target_col``) threshold greater than or equal to it.

    Args:
        md: Dict mapping column names to similarity thresholds.
        md_list: MDs to compare against; entries equal to ``md`` are ignored.
        target_col: Key of the right-hand-side column.

    Returns:
        ``True`` when no other MD in ``md_list`` makes ``md`` redundant.
    """
    for other in md_list:
        if other == md:
            continue
        lhs_cols = set(other.keys()) - {target_col}
        # A larger left-hand-side threshold anywhere refutes the assumption
        # that `other` makes `md` redundant.
        lhs_exceeds = [other[col] > md[col] for col in lhs_cols]
        # So does a smaller right-hand-side threshold.
        rhs_smaller = other[target_col] < md[target_col]
        if not (any(lhs_exceeds) or rhs_smaller):
            # One such MD is enough: `md` is not minimal.
            return False
    return True
def remove_by_confidence(md, l, relation, target_col, lock, threshold=None):
    """Remove ``md`` from the shared list ``l`` when its confidence is too low.

    Args:
        md: The MD (dict of column name -> similarity threshold) to check.
        l: List (or list proxy) that ``md`` is removed from.
        relation: DataFrame handed to ``get_one_md_metadata``.
        target_col: Name of the right-hand-side column.
        lock: Lock held while ``l`` is modified.
        threshold: Minimum confidence required to keep ``md``; defaults to
            the module-level ``conf_thresh``.
    """
    limit = conf_thresh if threshold is None else threshold
    _, confidence = get_one_md_metadata(md, relation, target_col)
    if confidence < limit:
        with lock:
            l.remove(md)
# def remove_by_confidence(md, l, relation, target_col):
# boolean, conf = satisfy_confidence(md, relation, 0.8, target_col)
# if not boolean:
# l.remove(md)
# print(md, '\t', conf)
def inference_from_record_pairs(path, threshold, target_col):
    """Infer matching dependencies (MDs) from all record pairs of a CSV file.

    An MD is a dict mapping every column to a similarity threshold: the entry
    for ``target_col`` is the right-hand side, all other entries form the
    left-hand side. The search starts from a single MD with left-hand-side
    thresholds 0 and right-hand-side threshold 1. Whenever a record pair
    satisfies the left-hand side of an MD but not its right-hand side, that MD
    is moved to the violated set and replaced by left-hand-side
    specializations that are minimal with respect to the current MD list.

    Args:
        path: Path of the CSV file (read as ISO-8859-1, all values cast to
            ``str``).
        threshold: Lowest left-hand-side threshold a specialized MD receives.
        target_col: Name of the right-hand-side column.

    Returns:
        A tuple ``(md_list, minimal_vio)``: the MDs left in the MD list after
        all pairs were processed, and the violated MDs that are minimal and
        whose confidence is at least ``conf_thresh``.
    """
    from itertools import combinations

    data = pd.read_csv(path, low_memory=False, encoding='ISO-8859-1')
    data = data.astype(str)
    columns = data.columns.values.tolist()
    # Keep the file's column order so results are reproducible between runs.
    lhs_cols = [col for col in columns if col != target_col]

    md_list = [{col: 1 if col == target_col else 0 for col in columns}]
    minimal_vio = []

    # Plain tuples are addressed by position, so column names that are not
    # valid Python identifiers work as well.
    rows = list(data.itertuples(index=False, name=None))
    for row1, row2 in combinations(rows, 2):
        # sims holds the per-column similarity of the two rows.
        sims = {
            col: my_Levenshtein_ratio(left, right)
            for col, left, right in zip(columns, row1, row2)
        }

        # Split the MD list into violated and surviving MDs in one pass;
        # removing entries from the list while iterating it skips elements.
        surviving = []
        violated_mds = []
        for md in md_list:
            lhs_satis = all(sims[col] >= md[col] for col in lhs_cols)
            if lhs_satis and sims[target_col] < md[target_col]:
                violated_mds.append(md)
            else:
                surviving.append(md)
        if not violated_mds:
            continue
        md_list = surviving
        minimal_vio.extend(violated_mds)

        # Only the left-hand side is specialized: the right-hand side has to
        # stay fully similar, so its threshold is never lowered.
        for vio_md in violated_mds:
            for col in lhs_cols:
                if sims[col] + 0.001 <= 1:
                    spec_l_md = dict(vio_md)
                    spec_l_md[col] = threshold if sims[col] < threshold else sims[col] + 0.001
                    if if_minimal(spec_l_md, md_list, target_col):
                        md_list.append(spec_l_md)

    minimal_vio = [vio for vio in minimal_vio if if_minimal(vio, md_list, target_col)]
    if not minimal_vio:
        # The MD list is still a valid result when no violated MD remains.
        return md_list, []

    # Confidence is evaluated in parallel. The pool is capped at the CPU
    # count, and starmap re-raises any exception raised inside a worker.
    workers = min(len(minimal_vio), multiprocessing.cpu_count())
    with multiprocessing.Pool(workers) as pool:
        metadata = pool.starmap(
            get_one_md_metadata,
            [(md, data, target_col) for md in minimal_vio],
        )
    minimal_vio = [
        md
        for md, (_, confidence) in zip(minimal_vio, metadata)
        if confidence >= conf_thresh
    ]

    # Re-check minimality among the MDs that passed the confidence filter,
    # iterating over a snapshot because the list shrinks on the way.
    for md in list(minimal_vio):
        if not if_minimal(md, minimal_vio, target_col):
            minimal_vio.remove(md)
    return md_list, minimal_vio
def get_mds_metadata(md_list, dataset_path, target_col):
    """Compute support and confidence for every MD in ``md_list``.

    Args:
        md_list: MDs (dicts of column name -> similarity threshold).
        dataset_path: Path of the CSV file (read as ISO-8859-1, all values
            cast to ``str``).
        target_col: Name of the right-hand-side column.

    Returns:
        A list with one dict per MD, in input order, with the keys ``"md"``,
        ``"support"`` and ``"confidence"``. An empty ``md_list`` yields ``[]``
        without reading the dataset.
    """
    if not md_list:
        return []

    data = pd.read_csv(dataset_path, low_memory=False, encoding='ISO-8859-1')
    data = data.astype(str)

    # Submit all MDs at once so they are evaluated in parallel; the pool is
    # capped at the CPU count instead of one process per MD.
    workers = min(len(md_list), multiprocessing.cpu_count())
    with multiprocessing.Pool(workers) as pool:
        metadata = pool.starmap(
            get_one_md_metadata,
            [(md, data, target_col) for md in md_list],
        )
    return [
        {"md": md, "support": support, "confidence": confidence}
        for md, (support, confidence) in zip(md_list, metadata)
    ]
def get_one_md_metadata(md, dataframe, target_col):
    """Compute support and confidence of one MD over all row pairs.

    A row pair supports ``md`` when, for every column other than
    ``target_col``, the similarity of the two values is at least ``md[col]``.
    A supporting pair also counts towards confidence when the similarity of
    the ``target_col`` values is not below 1.

    Args:
        md: Dict mapping column names to similarity thresholds.
        dataframe: DataFrame whose cell values are strings.
        target_col: Name of the right-hand-side column.

    Returns:
        A tuple ``(support, confidence)``: the number of supporting pairs and
        the fraction of them that also satisfy the right-hand side
        (``0`` when there is no supporting pair).
    """
    from itertools import combinations

    columns = dataframe.columns.values.tolist()
    lhs = [(pos, col) for pos, col in enumerate(columns) if col != target_col]
    rhs = [pos for pos, col in enumerate(columns) if col == target_col]

    # Plain tuples are addressed by position, which works for any column name
    # and does not depend on the DataFrame's index labels.
    rows = list(dataframe.itertuples(index=False, name=None))

    support = 0
    pre_confidence = 0
    for row1, row2 in combinations(rows, 2):
        # Stop at the first left-hand-side column below its threshold; the
        # remaining similarities cannot change the outcome for this pair.
        if any(my_Levenshtein_ratio(row1[pos], row2[pos]) < md[col] for pos, col in lhs):
            continue
        support += 1
        if all(my_Levenshtein_ratio(row1[pos], row2[pos]) >= 1 for pos in rhs):
            pre_confidence += 1

    confidence = 0 if support == 0 else pre_confidence / support
    return support, confidence
Loading…
Cancel
Save