Finished coding, not yet tested

main
HuangJintao 7 months ago
parent 747ee968b2
commit d79453e5c3

.gitignore vendored

@@ -0,0 +1 @@
/md_discovery/output/mds.txt

@@ -1,2 +0,0 @@
# md_bayesian_er_deepmatcher

@@ -0,0 +1,84 @@
import json
from ConfigSpace import Categorical, Configuration, ConfigurationSpace, Integer, Float
from ConfigSpace.conditions import InCondition, EqualsCondition, AndConjunction
from ConfigSpace.read_and_write import json as csj
from smac import Scenario, HyperparameterOptimizationFacade
from ml_er.deepmatcher_er import matching
from setting import hpo_output_dir
class Optimization:
@property
def configspace(self) -> ConfigurationSpace:
cs = ConfigurationSpace(seed=0)
attr_summarizer = Categorical('attr_summarizer', ['sif', 'rnn', 'attention', 'hybrid'], default='hybrid')
attr_comparator = Categorical('attr_comparator', ['concat', 'diff', 'abs-diff', 'concat-diff', 'concat-abs-diff', 'mul'])
word_contextualizer = Categorical('word_contextualizer', ['gru', 'lstm', 'rnn', 'self-attention'])
word_comparator = Categorical('word_comparator', ['decomposable-attention', 'general-attention', 'dot-attention'])
word_aggregator = Categorical('word_aggregator', ['avg-pool', 'divsqrt-pool', 'inv-freq-avg-pool',
'sif-pool', 'max-pool', 'last-pool', 'last-simple-pool',
'birnn-last-pool', 'birnn-last-simple-pool', 'attention-with-rnn'])
classifier_layers = Integer('classifier_layers', (1, 4))
classifier_nonlinear = Categorical('classifier_nonlinear', ['leaky_relu', 'relu', 'elu', 'selu', 'glu', 'tanh', 'sigmoid'])
classifier_bypass = Categorical('classifier_bypass', ['residual', 'highway'])
embeddings = Categorical('embeddings', ['fasttext.en.bin', 'fasttext.wiki.vec', 'fasttext.crawl.vec',
'glove.6B.300d', 'glove.42B.300d', 'glove.840B.300d'])
cs.add_hyperparameters([attr_comparator, attr_summarizer, word_comparator, word_aggregator, word_contextualizer,
classifier_bypass, classifier_nonlinear, classifier_layers, embeddings])
return cs
def train(self, config: Configuration, seed: int = 0) -> float:
indicators = matching(config)
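# SMAC minimizes the returned cost, so return 1 - performance (the weighted F1/interpretability score computed in matching)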
return 1 - indicators['performance']
def ml_er_hpo():
optimization = Optimization()
cs = optimization.configspace
str_configspace = csj.write(cs)
dict_configspace = json.loads(str_configspace)
# Save the hyperparameter search space locally
with open(hpo_output_dir + r"\configspace.json", "w") as f:
json.dump(dict_configspace, f, indent=4)
scenario = Scenario(
cs,
crash_cost=1.0,
deterministic=True,
n_trials=20,
n_workers=1
)
initial_design = HyperparameterOptimizationFacade.get_initial_design(scenario, n_configs=5)
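# seed the search with 5 initial configurations before the Bayesian optimization loop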
smac = HyperparameterOptimizationFacade(
scenario,
optimization.train,
initial_design=initial_design,
overwrite=True, # If the run exists, we overwrite it; alternatively, we can continue from last state
)
incumbent = smac.optimize()
incumbent_cost = smac.validate(incumbent)
default = cs.get_default_configuration()
default_cost = smac.validate(default)
print(f"Default Cost: {default_cost}")
print(f"Incumbent Cost: {incumbent_cost}")
if incumbent_cost > default_cost:
incumbent = default
print(f"Updated Incumbent Cost: {default_cost}")
print(f"Optimized Configuration:{incumbent.values()}")
with open(hpo_output_dir + r"\incumbent.json", "w") as f:
json.dump(dict(incumbent), f, indent=4)
return incumbent
if __name__ == '__main__':
ml_er_hpo()

@@ -0,0 +1,221 @@
import itertools
import pickle
import random
import operator
from operator import itemgetter
import pandas as pd
import torch
import matplotlib.pyplot as plt
from torch import LongTensor
import torch.nn.functional
from tqdm import tqdm
from setting import *
# note: cells containing empty values are located while embedding the tables; their similarity is forced to -1.0000
def mining(train: pd.DataFrame):
# data is train set, in which each row represents a tuple pair
train = train.astype(str)
train = pd.concat([train, pd.DataFrame({'label': train.pop('label')})], axis=1)
# Instead of manually aligning the left/right table keys, rely only on whether the gold label is 1,
# so the left/right table keys are simply dropped
data = train.drop(columns=['_id', 'ltable_id', 'rtable_id'], inplace=False)
# data now holds the left/right table attributes (keys excluded) plus the label; _id is not included
columns = data.columns.values.tolist()
columns_without_prefix = [_.replace('ltable_', '') for _ in columns if _.startswith('ltable_')]
# A list of 2-tuples, each holding the indices of a corresponding left/right column pair
col_tuple_list = build_col_tuple_list(columns)
length = data.shape[0]
width = data.shape[1]
# Embed every cell of data, traversing column by column
# note: the index has been reset here
data = data.reset_index(drop=True)
sentences = data.values.flatten(order='F').tolist()
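# order='F' flattens column by column, so each consecutive block of `length` sentences corresponds to one column of data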
embedding = model.encode(sentences, convert_to_tensor=True, device="cuda", batch_size=256, show_progress_bar=True)
split_embedding = torch.split(embedding, length, dim=0)
table_tensor = torch.stack(split_embedding, dim=0, out=None)
norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
# sim_tensor_dict = {}
sim_tensor_list = []
for col_tuple in col_tuple_list:
mask = ((data[columns[col_tuple[0]]].isin([''])) | (data[columns[col_tuple[1]]].isin([''])))
empty_string_indices = data[mask].index.tolist()  # indices of rows with an empty string
lattr_tensor = norm_table_tensor[col_tuple[0]]
rattr_tensor = norm_table_tensor[col_tuple[1]]
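# the dot product of the L2-normalized embeddings below is the cosine similarity between corresponding left/right cells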
mul_tensor = lattr_tensor * rattr_tensor
sim_tensor = torch.sum(mul_tensor, 1)  # sum over the embedding dimension to get one similarity column for the attribute pair
# Force positions containing an empty string to -1.0000
sim_tensor = sim_tensor.scatter(0, torch.tensor(empty_string_indices, device='cuda').long(), -1.0000)
sim_tensor = torch.round(sim_tensor * 100) / 100
sim_tensor_list.append(sim_tensor.unsqueeze(1))
# sim_tensor_dict[columns[col_tuple[0]].replace('ltable_', '')] = sim_tensor
sim_table_tensor = torch.cat(sim_tensor_list, dim=1)
# Create a single-column tensor of the same length as the similarity tensor, initialized to all zeros
label_tensor = torch.zeros((sim_table_tensor.size(0), 1), device='cuda')
# Build the labeled similarity tensor
sim_table_tensor_labeled = torch.cat((sim_table_tensor, label_tensor), 1)
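# sim_table_tensor_labeled: one row per tuple pair, one similarity column per attribute pair, plus a trailing 0/1 label column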
# Find the row indices of matching tuple pairs
mask = (data['label'].isin(['1']))
match_pair_indices = data[mask].index.tolist()
# Use those indices to set the label of matching rows to 1
sim_table_tensor_labeled[match_pair_indices, -1] = 1.00
sorted_unique_value_tensor_list = []
for _ in range(len(col_tuple_list)):
# Sort each sim_table_tensor column's unique values in ascending order and append to the list
sorted_unique_value_tensor = torch.sort(sim_table_tensor[:, _].unique()).values
# Drop all candidate similarity values below 0 for each column
sorted_unique_value_tensor = sorted_unique_value_tensor[sorted_unique_value_tensor >= 0]
sorted_unique_value_tensor_list.append(sorted_unique_value_tensor)
# Randomly generate candidate MDs as a 2-D tensor; each row represents one candidate MD
candidate_mds_tensor = build_candidate_md_matrix(sorted_unique_value_tensor_list)
result_list = []
# Iterate over every candidate MD
for _ in tqdm(range(candidate_mds_tensor.shape[0])):
# Append a 0.5 marker to each MD, meaning 'match'
md_tensor_labeled = torch.cat((candidate_mds_tensor[_], torch.tensor([0.5], device='cuda')), 0)
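# in get_metrics the trailing 0.5 is compared against the 0/1 label column, so only matched pairs count toward strict support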
abs_support, confidence = get_metrics(md_tensor_labeled, sim_table_tensor_labeled)
if abs_support >= support_threshold and confidence >= confidence_threshold:
md_list_format = [round(i, 2) for i in candidate_mds_tensor[_].tolist()]
md_dict_format = {}
for k in range(0, len(columns_without_prefix)):
md_dict_format[columns_without_prefix[k]] = md_list_format[k]
result_list.append((md_dict_format, abs_support, confidence))
# result_list.sort(key=itemgetter(2), reverse=True)
# Sort by confidence first, then by support
result_list.sort(key=itemgetter(2, 1), reverse=True)
result_list = merge_mds(result_list)
result_list.sort(key=itemgetter(2, 1), reverse=True)
# Save to disk
mds_to_txt(result_list)
return result_list
def build_col_tuple_list(columns_):
col_tuple_list_ = []
for _ in columns_:
if _.startswith('ltable'):
left_index = columns_.index(_)
right_index = columns_.index(_.replace('ltable_', 'rtable_'))
col_tuple_list_.append((left_index, right_index))
return col_tuple_list_
def get_metrics(md_tensor_labeled_, sim_table_tensor_labeled_):
table_tensor_length = sim_table_tensor_labeled_.size()[0]
# The MD starts as a column vector; transpose it into a row vector
md_tensor_labeled_2d = md_tensor_labeled_.unsqueeze(1).transpose(0, 1)
# Tile the single row down the rows to match the similarity table's length; the column count stays the same
md_tensor_labeled_2d = md_tensor_labeled_2d.repeat(table_tensor_length, 1)
# Drop the label column and check whether each row's similarities are >= the MD thresholds; the result has the same number of rows as sim_table_tensor_labeled_ but one fewer column
support_tensor = torch.ge(sim_table_tensor_labeled_[:, :-1], md_tensor_labeled_2d[:, :-1])
# Check whether every entry in each row of support_tensor is True; the row count is unchanged, collapsed to a single column
support_tensor = torch.all(support_tensor, dim=1, keepdim=True)
# Count the True entries in this tensor, which gives the absolute support
abs_support_ = torch.sum(support_tensor).item()
# Keep the label column and check whether each row's similarities (and label) are >= the MD requirements
support_tensor = torch.ge(sim_table_tensor_labeled_, md_tensor_labeled_2d)
# Count rows that satisfy both the similarity thresholds and the match label; abs_strict_support is the number satisfying both
support_tensor = torch.all(support_tensor, dim=1, keepdim=True)
abs_strict_support_ = torch.sum(support_tensor).item()
# Compute confidence
confidence_ = abs_strict_support_ / abs_support_ if abs_support_ > 0 else 0
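# Illustrative example: with thresholds [0.9, 0.8], the row [0.92, 0.85, label 1] meets both thresholds while
# the row [0.95, 0.70, label 1] does not, so abs_support = 1; the supporting row is a match, so confidence = 1/1 = 1.0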
return abs_support_, confidence_
# Randomly generate MDs and stack them into a matrix; each row represents one MD
def build_candidate_md_matrix(sorted_unique_value_tensor_list_: list):
# Tentatively draw 20000 random candidates first
length_ = len(sorted_unique_value_tensor_list_)
N = 20000
# For the similarity values of the first column, sample N row indices at random with replacement
indices = torch.randint(0, len(sorted_unique_value_tensor_list_[0]), (N, 1))
# Generate an index tensor for every column, giving the indices of values randomly chosen from that column's tensor
for _ in range(1, length_):
indices = torch.cat((indices, torch.randint(0, len(sorted_unique_value_tensor_list_[_]), (N, 1))), dim=1)
# Use the generated indices to pick values from each column's similarity tensor, forming a new tensor
candidate_md_matrix_list = []
for _ in range(length_):
candidate_md_matrix_list.append(sorted_unique_value_tensor_list_[_][indices[:, _].long()].unsqueeze(1))
candidate_md_matrix_ = torch.cat(candidate_md_matrix_list, dim=1)
# This tensor will be concatenated with the variants whose columns are set to -1
joint_candidate_md_matrix_ = candidate_md_matrix_.clone()
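# a threshold of -1 acts as 'no constraint' on that attribute: every similarity, including the -1.0 assigned to empty cells, satisfies >= -1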
# Randomly set 1, 2, ..., M-1 columns to -1
for i in range(length_ - 1):
index_list_format = []
for j in range(candidate_md_matrix_.shape[0]):
# For each MD, randomly choose the column indices to be set to -1
index_list_format.append(random.sample([_ for _ in range(0, length_)], i + 1))
index = torch.tensor(index_list_format, device='cuda')
# The MD set after randomly setting columns to -1
modified_candidate = candidate_md_matrix_.scatter(1, index, -1)
joint_candidate_md_matrix_ = torch.cat((joint_candidate_md_matrix_, modified_candidate), 0)
joint_candidate_md_matrix_ = joint_candidate_md_matrix_.unique(dim=0)
return joint_candidate_md_matrix_
def mds_to_txt(result_list_):
p = md_output_dir + r"\mds.txt"
with open(p, 'w') as f:
for _ in result_list_:
f.write(f'MD: {str(_[0])}\tAbsolute Support: {str(_[1])}\tConfidence: {str(_[2])}')
f.write('\n')
# Merge some MDs
def merge_mds(md_list_):
# Create an empty dict for grouping
grouped_md_tuples = {}
# Iterate over the triples and group them
for md_tuple in md_list_:
# Use the support and confidence values as the dict key
key = (md_tuple[1], md_tuple[2])
# Check whether the key already exists in the grouping dict
if key in grouped_md_tuples:
# If it does, append the triple to the corresponding list
grouped_md_tuples[key].append(md_tuple)
else:
# Otherwise, create a new key-value pair
grouped_md_tuples[key] = [md_tuple]
# Keep only the values, not the keys
# A nested list; within each sub-list, every MD tuple has the same support and confidence
grouped_md_tuples = list(grouped_md_tuples.values())
for same_sc_list in grouped_md_tuples:
# Create an index list marking the tuples to be removed
indices_to_remove = []
# Length of the tuple list
length = len(same_sc_list)
# Iterate over the tuple list, comparing and removing
for i in range(length):
for j in range(length):
# Compare the threshold dicts of the two tuples
if i != j and all(same_sc_list[i][0][key_] >= same_sc_list[j][0][key_] for key_ in same_sc_list[i][0]):
# If every similarity threshold of one MD in the group is >= another MD's, the former is redundant and can be removed
indices_to_remove.append(i)
break  # the list size will change, so exit the inner loop
# Delete tuples in reverse index order so deletions do not shift the remaining positions
for index in sorted(indices_to_remove, reverse=True):
del same_sc_list[index]
# Flatten the nested list into a flat list
return list(itertools.chain.from_iterable(grouped_md_tuples))
if __name__ == '__main__':
_train = pd.read_csv(directory_path + r'\train_whole.csv')
result = mining(_train)
with open(md_output_dir + r"\mds.pickle", "wb") as file_:
pickle.dump(result, file_)

@@ -0,0 +1,173 @@
import json
import os
import pickle
import time
import deepmatcher as dm
import torch
import pandas as pd
import ConfigSpace
from ConfigSpace import Configuration
from ConfigSpace.read_and_write import json as csj
import torch.nn.functional
from tqdm import tqdm
from setting import *
def matching(config):
print(f'\033[33mConfig: {config}\033[0m')
start = time.time()
with open(md_output_dir + r"\mds.pickle", "rb") as file:
md_list = pickle.load(file)
train, valid, test = dm.data.process(
path=directory_path,
train='train_whole.csv',
validation='valid_whole.csv',
test='test_whole.csv',
use_magellan_convention=True,  # follow the same naming convention as Magellan
embeddings=config['embeddings'])
train_table = train.get_raw_table()
test_table = test.get_raw_table()
valid_table = valid.get_raw_table()
attr_summarizer = config['attr_summarizer']
if attr_summarizer == 'sif':
model_ = dm.MatchingModel(
attr_comparator=config['attr_comparator'],
classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[
'classifier_nonlinear'],
attr_summarizer=dm.attr_summarizers.SIF(word_contextualizer=config['word_contextualizer'],
word_comparator=config['word_comparator'],
word_aggregator=config['word_aggregator']))
elif attr_summarizer == 'rnn':
model_ = dm.MatchingModel(
attr_comparator=config['attr_comparator'],
classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[
'classifier_nonlinear'],
attr_summarizer=dm.attr_summarizers.RNN(word_contextualizer=config['word_contextualizer'],
word_comparator=config['word_comparator'],
word_aggregator=config['word_aggregator']))
elif attr_summarizer == 'attention':
model_ = dm.MatchingModel(
attr_comparator=config['attr_comparator'],
classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[
'classifier_nonlinear'],
attr_summarizer=dm.attr_summarizers.Attention(word_contextualizer=config['word_contextualizer'],
word_comparator=config['word_comparator'],
word_aggregator=config['word_aggregator']))
else: # 'hybrid'
model_ = dm.MatchingModel(
attr_comparator=config['attr_comparator'],
classifier=str(config['classifier_layers']) + '-layer-' + config['classifier_bypass'] + '-' + config[
'classifier_nonlinear'],
attr_summarizer=dm.attr_summarizers.Hybrid(word_contextualizer=config['word_contextualizer'],
word_comparator=config['word_comparator'],
word_aggregator=config['word_aggregator']))
model_.run_train(
train,
valid,
device='cuda',
epochs=10,
batch_size=16,
best_save_path=attr_summarizer + '_model.pth',
pos_neg_ratio=3)
indicators = {}
f1_score = model_.run_eval(test, device='cuda')
indicators["F1"] = f1_score
predictions = model_.run_prediction(test, device='cuda', output_attributes=True)
# predictions has no 'predicted' column, so add it manually from match_score; deepmatcher uses a 0.5 threshold when computing F1
predictions['predicted'] = predictions['match_score'].apply(lambda score: 1 if score >= 0.5 else 0)
# predictions now contains all left/right table attributes plus label, predicted, match_score and _id
sim_tensor_dict = build_col_pairs_sim_tensor_dict(predictions)
predictions['confidence'] = 0
epl_match = 0  # number of predicted matches that are explainable
if len(md_list) > 0:
for row in tqdm(predictions.itertuples()):
x = is_explicable(row, md_list, sim_tensor_dict)
if x > 0 and str(getattr(row, 'predicted')) == str(1):
predictions.loc[row[0], 'confidence'] = x
epl_match += 1
df = predictions[predictions['predicted'] == 1]
interpretability = epl_match / len(df) if len(df) > 0 else 0  # interpretability
indicators['interpretability'] = interpretability
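# with interpre_weight = 0 in setting.py, performance reduces to the F1 score alone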
performance = interpre_weight * interpretability + (1 - interpre_weight) * indicators["F1"]
indicators['performance'] = performance
print(f'ER Indicators: {indicators}')
predictions.to_csv(er_output_dir + r'\predictions.csv', sep=',', index=False, header=True)
print(f'\033[33mTime consumed by matching in seconds: {time.time() - start}\033[0m')
return indicators
def build_col_pairs_sim_tensor_dict(predictions: pd.DataFrame):
predictions_attrs = predictions.columns.values.tolist()
col_tuple_list = []
for _ in predictions_attrs:
if _.startswith('ltable'):
left_index = predictions_attrs.index(_)
right_index = predictions_attrs.index(_.replace('ltable_', 'rtable_'))
col_tuple_list.append((left_index, right_index))
length = predictions.shape[0]
# width = predictions.shape[1]
predictions = predictions.reset_index(drop=True)
sentences = predictions.values.flatten(order='F').tolist()
embedding = model.encode(sentences, convert_to_tensor=True, device="cuda", batch_size=256, show_progress_bar=True)
split_embedding = torch.split(embedding, length, dim=0)
table_tensor = torch.stack(split_embedding, dim=0, out=None)
# normalized embedding tensor of the predictions table
norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2)
sim_tensor_dict = {}
for col_tuple in col_tuple_list:
lattr_tensor = norm_table_tensor[col_tuple[0]]
rattr_tensor = norm_table_tensor[col_tuple[1]]
mul_tensor = lattr_tensor * rattr_tensor
sim_tensor = torch.sum(mul_tensor, 1)
sim_tensor = torch.round(sim_tensor * 100) / 100
sim_tensor_dict[predictions_attrs[col_tuple[0]].replace('ltable_', '')] = sim_tensor
return sim_tensor_dict
def is_explicable(row, all_mds: list, st_dict):
attrs = all_mds[0][0].keys()  # read all attribute names from the MD dict of the first md_tuple
for md_tuple in all_mds:
explicable = True  # assume this MD can explain the current tuple pair
for a in attrs:
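# note: row[0] is the DataFrame index used to index the similarity tensors positionally; this assumes predictions has a default 0..n-1 RangeIndex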
if st_dict[a][row[0]].item() < md_tuple[0][a]:
explicable = False  # if any attribute's similarity falls below the threshold, this MD cannot explain the current tuple pair
break  # stop checking this MD's remaining thresholds and move on to the next MD
if explicable:
return md_tuple[2]  # return as soon as any MD explains the pair
return -1.0  # loop finished: the pair is not explainable
def ml_er(config: Configuration):
indicators = matching(config)
output_path = er_output_dir + r"\eval_result.txt"
with open(output_path, 'w') as _f:
_f.write('F1:' + str(indicators["F1"]) + '\n')
_f.write('interpretability:' + str(indicators['interpretability']) + '\n')
_f.write('performance:' + str(indicators['performance']) + '\n')
if __name__ == '__main__':
if os.path.isfile(hpo_output_dir + r"\incumbent.json"):
with open(hpo_output_dir + r"\configspace.json", 'r') as f:
dict_configspace = json.load(f)
str_configspace = json.dumps(dict_configspace)
configspace = csj.read(str_configspace)
with open(hpo_output_dir + r"\incumbent.json", 'r') as f:
dic = json.load(f)
configuration = ConfigSpace.Configuration(configspace, values=dic)
ml_er(configuration)

@@ -0,0 +1,12 @@
from sentence_transformers import SentenceTransformer
directory_path = r'E:\Data\Research\Datasets\DeepMatcher dataset\Textual\Abt-Buy'
er_output_dir = r'E:\Data\Research\Projects\md_bayesian_er_deepmatcher\ml_er\output'
md_output_dir = r'E:\Data\Research\Projects\md_bayesian_er_deepmatcher\md_discovery\output'
hpo_output_dir = r'E:\Data\Research\Projects\md_bayesian_er_deepmatcher\hpo\output'
model = SentenceTransformer('E:\\Data\\Research\\Models\\all-MiniLM-L6-v2')
interpre_weight = 0  # weight of interpretability in the combined performance score
support_threshold = 1
confidence_threshold = 0.75
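# support_threshold: minimum absolute support (tuple pairs meeting all of an MD's thresholds); confidence_threshold: minimum fraction of those pairs that are true matches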

@@ -0,0 +1,29 @@
# The train/valid/test sets only contain left/right id mappings; this script builds the complete sets from the two source tables and the ids
import pandas as pd
from setting import directory_path
def build_whole_X_set(X_set, _ltable, _rtable):
merged_set = pd.merge(X_set, _ltable, on='ltable_id', how='left')
merged_set = pd.merge(merged_set, _rtable, on='rtable_id', how='left')
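# left joins keep every id pair from X_set and pull in the full left/right tuples via ltable_id / rtable_id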
merged_set.insert(0, '_id', range(len(merged_set)))
return merged_set
if __name__ == '__main__':
# Read the two tables and add prefixes to their columns
ltable = pd.read_csv(directory_path + r'\tableA.csv', encoding='ISO-8859-1').rename(columns=lambda x: f'ltable_{x}')
rtable = pd.read_csv(directory_path + r'\tableB.csv', encoding='ISO-8859-1').rename(columns=lambda x: f'rtable_{x}')
train = pd.read_csv(directory_path + r'\train.csv', encoding='ISO-8859-1')
valid = pd.read_csv(directory_path + r'\valid.csv', encoding='ISO-8859-1')
test = pd.read_csv(directory_path + r'\test.csv', encoding='ISO-8859-1')
train = build_whole_X_set(train, ltable, rtable)
valid = build_whole_X_set(valid, ltable, rtable)
test = build_whole_X_set(test, ltable, rtable)
train.to_csv(directory_path + r'\train_whole.csv', sep=',', index=False, header=True)
valid.to_csv(directory_path + r'\valid_whole.csv', sep=',', index=False, header=True)
test.to_csv(directory_path + r'\test_whole.csv', sep=',', index=False, header=True)