import json
import os
from time import time
import ConfigSpace
import pandas as pd
import torch
from ConfigSpace import Configuration
from ConfigSpace.read_and_write import json as csj
from pyjedai.clustering import UniqueMappingClustering
from tqdm import tqdm
from md_discovery.md_discover import md_discover
from settings import *
from pyjedai.datamodel import Data
from pyjedai.block_cleaning import BlockPurging
from pyjedai.block_cleaning import BlockFiltering
from pyjedai.matching import EntityMatching
from pyjedai.block_building import (
    StandardBlocking,
    QGramsBlocking,
    ExtendedQGramsBlocking,
    SuffixArraysBlocking,
    ExtendedSuffixArraysBlocking,
)
from pyjedai.comparison_cleaning import (
    WeightedEdgePruning,
    WeightedNodePruning,
    CardinalityEdgePruning,
    CardinalityNodePruning,
    BLAST,
    ReciprocalCardinalityNodePruning,
    ReciprocalWeightedNodePruning,
    ComparisonPropagation
)
from pyjedai.clustering import *


def prepare_file_for_md_discovery(train: pd.DataFrame, t_single_tuple_path=er_output_dir + r"\t_single_tuple.csv"):
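    """Build the single-tuple CSV consumed by MD discovery: keep only the matched
    pairs of the training data, align left/right ids, and stack the left-hand
    and right-hand tuples into one table written to t_single_tuple_path."""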
    # Select the tuple pairs that are actual matches
    df = train[train['gold'] == '1'].copy()
    # Copy the left-table id into the right-table id column so both halves share one key
    for index, row in df.iterrows():
        df.loc[index, "rtable_" + rtable_id] = row["ltable_" + ltable_id]
    train_columns = train.columns.values.tolist()
    l_columns = []
    r_columns = []
    cols = []
    # Collect the left-table and right-table column names into separate lists
    for _ in train_columns:
        if _.startswith('ltable'):
            l_columns.append(_)
        elif _.startswith('rtable'):
            r_columns.append(_)
    # Strip the prefix from the left-table column names to get a unified column list
    # (assumes corresponding columns in the two tables share the same names)
    for _ in l_columns:
        cols.append(_.replace('ltable_', ''))
    ldf = df[l_columns]
    rdf = df[r_columns]
    ldf.columns = cols
    rdf.columns = cols
    t_single_tuple = pd.concat([ldf, rdf])
    t_single_tuple = t_single_tuple.reset_index(drop=True)
    t_single_tuple.to_csv(t_single_tuple_path, sep=',', index=False, header=True, quoting=1)


# Build a dict whose keys are attribute names and whose values are 1-D tensors
# recording, for each row of the predictions table, the similarity between the
# left and right attribute values
def build_col_pairs_sim_tensor_dict(predictions: pd.DataFrame):
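    """Return a dict mapping each attribute name (without the 'ltable_' prefix)
    to a 1-D tensor of rounded cosine similarities between the embeddings of the
    left and right attribute values, one entry per prediction row."""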
    predictions_attrs = predictions.columns.values.tolist()
    # Pair up the column indices of each ltable_/rtable_ attribute
    col_tuple_list = []
    for _ in predictions_attrs:
        if _.startswith('ltable'):
            left_index = predictions_attrs.index(_)
            right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_'))
            col_tuple_list.append((left_index, right_index))
    df = predictions.drop(columns=['gold', 'predicted', 'confidence'], inplace=False)
    length = df.shape[0]
    width = df.shape[1]
    # Flatten the table column by column into one list of cell values
    sentences = []
    for col in range(0, width):
        for row in range(0, length):
            cell_value = df.values[row, col]
            sentences.append(cell_value)
    if len(sentences) == 0:
        return {}
    # 'model' is the sentence embedding model provided via `from settings import *`
    embedding = model.encode(sentences, convert_to_tensor=True, device="cuda")
    split_embedding = torch.split(embedding, length, dim=0)
    table_tensor = torch.stack(split_embedding, dim=0, out=None)
    # Normalized embedding tensor of the predictions table, shape (width, length, dim)
    norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
    sim_tensor_dict = {}
    for col_tuple in col_tuple_list:
        lattr_tensor = norm_table_tensor[col_tuple[0]]
        rattr_tensor = norm_table_tensor[col_tuple[1]]
        # Cosine similarity of normalized vectors = row-wise dot product
        mul_tensor = lattr_tensor * rattr_tensor
        sim_tensor = torch.sum(mul_tensor, 1)
        sim_tensor = torch.round(sim_tensor, decimals=4)
        sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor
    return sim_tensor_dict

def load_mds(paths: list) -> list:
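    """Load all matching dependencies (MDs) from the given files; each line is
    expected to be a tab-separated record with the MD literal in the second field."""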
    if len(paths) == 0:
        return []
    all_mds = []
    # Iterate over the list of MD file paths
    for md_path in paths:
        if not os.path.exists(md_path):
            continue
        mds = []
        # Open each MD file
        with open(md_path, 'r') as f_:
            # Parse the MD on each line and append it to this file's MD list
            for line in f_.readlines():
                md_metadata = line.strip().split('\t')
                # todo: if the MD file format changes, update this accordingly
                md = eval(md_metadata[1])  # note: eval trusts the file content
                mds.append(md)
        all_mds.extend(mds)
    return all_mds

def is_explicable(row, all_mds: list, st_dict):
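    """Return the confidence value (md_tuple[2]) of the first MD all of whose
    similarity thresholds are met by this row (target_attr excluded), or -1.0
    if no MD explains the tuple."""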
    attrs = all_mds[0][0].keys()  # read all attribute names from the MD dict of the first md_tuple
    for md_tuple in all_mds:
        explicable = True  # assume this MD can explain the current tuple
        for a in attrs:
            if a != target_attr:
                if st_dict[a][row[0]].item() < md_tuple[0][a]:
                    explicable = False  # if any attribute's similarity falls below its threshold, this MD cannot explain the tuple
                    break  # skip the remaining thresholds of this MD and move on to the next one
        if explicable:
            return md_tuple[2]  # return as soon as any single MD explains the tuple
    return -1.0  # no MD explains the tuple

def er_process(config: Configuration):
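    """Run the full pyJedAI ER pipeline (blocking, block cleaning, comparison
    cleaning, matching, clustering) under the given configuration, then mine MDs
    and score interpretability; returns the evaluation dict extended with
    'interpretability' and 'performance'."""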
    print(f'\033[33mConfig: {config}\033[0m')
    start = time()
    ltable = pd.read_csv(ltable_path, sep='|', engine='python', na_filter=False)
    rtable = pd.read_csv(rtable_path, sep='|', engine='python', na_filter=False)
    mapping = pd.read_csv(mapping_path, sep='|', engine='python')
    data = Data(dataset_1=ltable,
                id_column_name_1=ltable_id,
                dataset_2=rtable,
                id_column_name_2=rtable_id,
                ground_truth=mapping)
    # clean data (optional)
    data.clean_dataset(remove_stopwords=False,
                       remove_punctuation=False,
                       remove_numbers=False,
                       remove_unicodes=False)
    # block building
    blocker_name = config["jed_blocker"]
    block_attribute = config["block_attr"]
    match blocker_name:
        case "Standard":
            blocker = StandardBlocking()
        case "QGrams":
            blocker = QGramsBlocking(config["qgrams"])
        case "ExtendedQG":
            blocker = ExtendedQGramsBlocking()
        case "SuffixArrays":
            blocker = SuffixArraysBlocking()
        case "ExtendedSA":
            blocker = ExtendedSuffixArraysBlocking()
        case _:
            raise ValueError(f'Unknown blocker: {blocker_name}')
    blocks = blocker.build_blocks(data, attributes_1=[block_attribute], attributes_2=[block_attribute])
    # block purging (optional)
    bp = BlockPurging()
    cleaned_blocks = bp.process(blocks, data, tqdm_disable=False)
    # block cleaning (optional)
    bf = BlockFiltering(ratio=config["block_filtering_ratio"])  # todo: what is ratio for?
    filtered_blocks = bf.process(cleaned_blocks, data, tqdm_disable=False)
    # Comparison Cleaning - Meta Blocking (optional)
    pruning_method = config["meta_blocker"]
    weighting_scheme_name = config["weighting_scheme"]
    match pruning_method:
        case "WEP":
            meta_blocker = WeightedEdgePruning(weighting_scheme=weighting_scheme_name)
        case "WNP":
            meta_blocker = WeightedNodePruning(weighting_scheme=weighting_scheme_name)
        case "CEP":
            meta_blocker = CardinalityEdgePruning(weighting_scheme=weighting_scheme_name)
        case "CNP":
            meta_blocker = CardinalityNodePruning(weighting_scheme=weighting_scheme_name)
        case "BLAST":
            meta_blocker = BLAST(weighting_scheme=weighting_scheme_name)
        case "RCNP":
            meta_blocker = ReciprocalCardinalityNodePruning(weighting_scheme=weighting_scheme_name)
        case "RWNP":
            meta_blocker = ReciprocalWeightedNodePruning(weighting_scheme=weighting_scheme_name)
        case "CP":
            meta_blocker = ComparisonPropagation()
        case _:
            raise ValueError(f'Unknown meta blocker: {pruning_method}')
    candidate_pairs_blocks = meta_blocker.process(blocks=filtered_blocks, data=data, tqdm_disable=True)
    # entity matching
    em = EntityMatching(
        metric=config["matching_metric"],
        tokenizer=config["matching_tokenizer"],
        vectorizer=config["matching_vectorizer"],
        qgram=3,
        similarity_threshold=config["similarity_threshold"]
    )
    pairs_graph = em.predict(candidate_pairs_blocks, data, tqdm_disable=True)
    # draw(pairs_graph)
    # entity clustering
    clusteror_name = config["clusteror_name"]
    match clusteror_name:
        case "CCC":
            clusteror = ConnectedComponentsClustering()
        case "UMC":
            clusteror = UniqueMappingClustering()
        case "CenterC":
            clusteror = CenterClustering()
        case "BMC":
            clusteror = BestMatchClustering()
        case "MCC":
            clusteror = MergeCenterClustering()
        case "CC":
            clusteror = CorrelationClustering()
        case "CTC":
            clusteror = CutClustering()
        case "MCL":
            clusteror = MarkovClustering()
        case "KMAC":
            clusteror = KiralyMSMApproximateClustering()
        case "RSRC":
            clusteror = RicochetSRClustering()
        case _:
            raise ValueError(f'Unknown clusteror: {clusteror_name}')
    # obtain the predictions and evaluation metrics
    clusters = clusteror.process(pairs_graph, data, similarity_threshold=0.17)  # todo: cluster similarity threshold
    matches_dataframe = clusteror.export_to_df(clusters)
    matches_dataframe_path = er_output_dir + r'\matches_dataframe.csv'
    matches_dataframe.to_csv(matches_dataframe_path, sep=',', index=False, header=True, quoting=1)
    evaluation = clusteror.evaluate(clusters)
    # evaluation.pop('True Positives')
    # evaluation.pop('False Positives')
    # evaluation.pop('True Negatives')
    # evaluation.pop('False Negatives')
    ltable = ltable.astype(str)
    rtable = rtable.astype(str)
    mapping = mapping.astype(str)
    result_df = matches_dataframe.astype(str)
    lcolumns_dict = {}
    rcolumns_dict = {}
    ltable_attrs = ltable.columns.values.tolist()
    rtable_attrs = rtable.columns.values.tolist()
    for _ in ltable_attrs:
        lcolumns_dict[_] = 'ltable_' + _
    for _ in rtable_attrs:
        rcolumns_dict[_] = 'rtable_' + _
    result_lid_list = result_df['id1'].tolist()
    selected_ltable = ltable[ltable[ltable_id].isin(result_lid_list)]
    selected_ltable = selected_ltable.rename(columns=lcolumns_dict)
    selected_ltable['key'] = 1
    result_rid_list = result_df['id2'].tolist()
    selected_rtable = rtable[rtable[rtable_id].isin(result_rid_list)]
    selected_rtable = selected_rtable.rename(columns=rcolumns_dict)
    selected_rtable['key'] = 1
    # cross join the selected left and right tuples via the dummy 'key' column
    predictions = pd.merge(selected_ltable, selected_rtable, on='key')
    predictions.drop(columns='key', inplace=True)
    predictions = predictions.reset_index(drop=True)
    predictions['gold'] = '0'
    predictions['predicted'] = '0'
    gold_match_rows = []
    predicted_match_rows = []
    for tuple_ in tqdm(predictions.itertuples()):
        lid = getattr(tuple_, 'ltable_' + ltable_id)
        map_row = mapping[mapping[mapping_lid] == lid]
        result_row = result_df[result_df['id1'] == lid]
        if not map_row.empty:
            rid = map_row[mapping_rid]
            for value in rid:
                if value == getattr(tuple_, 'rtable_' + rtable_id):
                    gold_match_rows.append(tuple_[0])
        if not result_row.empty:
            rid = result_row['id2']
            for value in rid:
                if value == getattr(tuple_, 'rtable_' + rtable_id):
                    predicted_match_rows.append(tuple_[0])
    for _ in gold_match_rows:
        predictions.loc[_, 'gold'] = '1'
    for _ in predicted_match_rows:
        predictions.loc[_, 'predicted'] = '1'
    # mine MDs and compute interpretability
    prepare_file_for_md_discovery(predictions)
    predictions['confidence'] = 0
    predicted_match = predictions[predictions['predicted'] == '1']
    predicted_match = predicted_match.reset_index(drop=True)
    sim_tensor_dict = build_col_pairs_sim_tensor_dict(predicted_match)
    md_discover(config, er_output_dir + r"\t_single_tuple.csv", md_output_dir + r"\mds.txt")
    md_paths = [md_output_dir + r'\mds.txt']
    md_list = load_mds(md_paths)  # load all MDs from the MD output files
    epl_match = 0  # number of explicable predicted matches
    if sim_tensor_dict and len(md_list) > 0:
        for row in tqdm(predicted_match.itertuples()):
            x = is_explicable(row, md_list, sim_tensor_dict)
            if x > 0:
                predicted_match.loc[row[0], 'confidence'] = x
                epl_match += 1
        interpretability = epl_match / len(predicted_match)  # interpretability score
    else:
        interpretability = 0
    evaluation['interpretability'] = interpretability
    f1 = evaluation['F1 %'] / 100
    performance = interpre_weight * interpretability + (1 - interpre_weight) * f1
    evaluation['performance'] = performance
    print(f'\033[33mEvaluation: {evaluation}\033[0m')
    predicted_match.to_csv(er_output_dir + r'\predicted_match.csv', sep=',', index=False, header=True)
    print(f'\033[33mTime consumed by ML-ER in seconds: {time() - start}\033[0m')
    return evaluation

def ml_er(config: Configuration = None):
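    """Run the ER pipeline and write the evaluation indicators to eval_result.txt."""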
    indicators = er_process(config)
    output_path = er_output_dir + r"\eval_result.txt"
    with open(output_path, 'w') as f_:
        for key, value in indicators.items():
            f_.write(key + " : " + str(value))
            f_.write('\n')

# if __name__ == '__main__':
#     if os.path.isfile(hpo_output_dir + r"\incumbent.json"):
#         with open(hpo_output_dir + r"\configspace.json", 'r') as f:
#             dict_configspace = json.load(f)
#         str_configspace = json.dumps(dict_configspace)
#         configspace = csj.read(str_configspace)
#         with open(hpo_output_dir + r"\incumbent.json", 'r') as f:
#             dic = json.load(f)
#         configuration = ConfigSpace.Configuration(configspace, values=dic)
#         ml_er(configuration)
if __name__ == '__main__':
    with open(hpo_output_dir + r"\configspace.json", 'r') as f:
        dict_configspace = json.load(f)
    str_configspace = json.dumps(dict_configspace)
    configspace = csj.read(str_configspace)
    with open(hpo_output_dir + r"\fuck.json", 'r') as f:
        dic = json.load(f)
    configuration = ConfigSpace.Configuration(configspace, values=dic)
    ml_er(configuration)