import itertools import pickle import random import operator from operator import itemgetter import pandas as pd import torch import matplotlib.pyplot as plt from torch import LongTensor from tqdm import tqdm from settings import * # note 对表进行嵌入时定位了有空值的cell, 计算相似度时有空值则置为-1.0000 def mining(train: pd.DataFrame): # data is train set, in which each row represents a tuple pair train = train.astype(str) # 将label列移到最后 train = pd.concat([train, pd.DataFrame({'label': train.pop('label')})], axis=1) # 尝试不将左右表key手动调整相同,而是只看gold属性是否为1 # 故将左右表key直接去除 data = train.drop(columns=['_id', 'ltable_id', 'rtable_id'], inplace=False) # data中现存属性:除key以外左右表属性和gold, 不含_id columns = data.columns.values.tolist() columns_without_prefix = [_.replace('ltable_', '') for _ in columns if _.startswith('ltable_')] # 列表, 每个元素为二元组, 包含对应列的索引 col_tuple_list = build_col_tuple_list(columns) length = data.shape[0] width = data.shape[1] # 嵌入data每一个cell, 纵向遍历 # note 此处已重设索引 data = data.reset_index(drop=True) sentences = data.values.flatten(order='F').tolist() embedding = model.encode(sentences, convert_to_tensor=True, device="cuda", batch_size=256, show_progress_bar=True) split_embedding = torch.split(embedding, length, dim=0) table_tensor = torch.stack(split_embedding, dim=0, out=None) norm_table_tensor = torch.nn.functional.normalize(table_tensor, dim=2) # sim_tensor_dict = {} sim_tensor_list = [] for col_tuple in col_tuple_list: mask = ((data[columns[col_tuple[0]]].isin([''])) | (data[columns[col_tuple[1]]].isin(['']))) empty_string_indices = data[mask].index.tolist() # 空字符串索引 lattr_tensor = norm_table_tensor[col_tuple[0]] rattr_tensor = norm_table_tensor[col_tuple[1]] mul_tensor = lattr_tensor * rattr_tensor sim_tensor = torch.sum(mul_tensor, 1) # 求和得到对应属性2列张量相似度, 2列变1列 # 将有空字符串的位置强制置为-1.0000 sim_tensor = sim_tensor.scatter(0, torch.tensor(empty_string_indices, device='cuda').long(), -1.0000) sim_tensor = torch.round(sim_tensor, decimals=2) sim_tensor_list.append(sim_tensor.unsqueeze(1)) # sim_tensor_dict[columns[col_tuple[0]].replace('ltable_', '')] = sim_tensor sim_table_tensor = torch.cat(sim_tensor_list, dim=1) # 创建一个1列的tensor,长度与相似度张量相同,先初始化为全0 label_tensor = torch.zeros((sim_table_tensor.size(0), 1), device='cuda') # 生成带标签的相似度张量 sim_table_tensor_labeled = torch.cat((sim_table_tensor, label_tensor), 1) # 找到匹配元组对的行索引 mask = (data['label'].isin(['1'])) match_pair_indices = data[mask].index.tolist() # 根据索引将匹配的行标签置为1 sim_table_tensor_labeled[match_pair_indices, -1] = 1.00 sorted_unique_value_tensor_list = [] for _ in range(len(col_tuple_list)): # 将sim_table_tensor每一列的值从小到大排列,加入列表 sorted_unique_value_tensor = torch.sort(sim_table_tensor[:, _].unique()).values # 将每一列可能的相似度取值中小于0的都删掉 sorted_unique_value_tensor = sorted_unique_value_tensor[sorted_unique_value_tensor >= 0] sorted_unique_value_tensor_list.append(sorted_unique_value_tensor) # 随机生成候选MD, 形成一个二维张量, 每一行代表一个候选MD candidate_mds_tensor = build_candidate_md_matrix(sorted_unique_value_tensor_list) result_list = [] # 遍历每一个MD for _ in tqdm(range(candidate_mds_tensor.shape[0])): # 对每一个MD加一个0.5的标记, 意为match md_tensor_labeled = torch.cat((candidate_mds_tensor[_], torch.tensor([0.5], device='cuda')), 0) abs_support, confidence = get_metrics(md_tensor_labeled, sim_table_tensor_labeled) if abs_support >= support_threshold and confidence >= confidence_threshold: md_list_format = [round(i, 2) for i in candidate_mds_tensor[_].tolist()] md_dict_format = {} for k in range(0, len(columns_without_prefix)): md_dict_format[columns_without_prefix[k]] = md_list_format[k] result_list.append((md_dict_format, abs_support, confidence)) # result_list.sort(key=itemgetter(2), reverse=True) # 按confidence->support的优先级排序 result_list.sort(key=itemgetter(2, 1), reverse=True) result_list = merge_mds(result_list) result_list.sort(key=itemgetter(2, 1), reverse=True) # 保存到本地 mds_to_txt(result_list) return result_list # 遍历MD列表, 将满足的直接加入结果列表, 不满足的看能否收紧, 不能收紧直接跳过 # 若能收紧则将收紧后的一个个加入暂存列表, 并在该轮遍历结束后替换MD列表, 直到MD列表为空 # while len(md_list) > 0: # tmp_list = [] # for md_tensor in tqdm(md_list): # md_tensor_labeled = torch.cat((md_tensor, torch.tensor([0.5], device='cuda')), 0) # abs_support, confidence = get_metrics(md_tensor_labeled, sim_table_tensor_labeled) # # 如果support小于1, 没必要收紧阈值, 跳过 # if abs_support >= 1: # # 如果support满足但confidence不满足, 需要收紧阈值 # if confidence < confidence_threshold: # for _ in range(len(md_tensor)): # new_md_tensor = md_tensor.clone() # if new_md_tensor[_] == -1.00: # new_md_tensor[_] = sorted_unique_value_tensor_list[_][0] # if len(tmp_list) == 0: # tmp_list.append(new_md_tensor) # else: # stacked_tmp_tensors = torch.stack(tmp_list) # is_contained = (stacked_tmp_tensors == new_md_tensor) .all(dim=1).any() # if not is_contained: # tmp_list.append(new_md_tensor) # else: # a_tensor = sorted_unique_value_tensor_list[_] # b_value = new_md_tensor[_] # next_index = torch.where(a_tensor == b_value)[0].item() + 1 # if next_index < len(a_tensor): # new_md_tensor[_] = a_tensor[next_index] # tmp_list.append(new_md_tensor) # # torch.where(sorted_unique_value_tensor_list[2] == 0.16)[0].item() # # 如果都满足, 直接加进结果列表 # else: # result_list.append(md_tensor) # md_list = tmp_list def build_col_tuple_list(columns_): col_tuple_list_ = [] for _ in columns_: if _.startswith('ltable'): left_index = columns_.index(_) right_index = columns_.index(_.replace('ltable_', 'rtable_')) col_tuple_list_.append((left_index, right_index)) return col_tuple_list_ # def init_md_list(md_dimension: int): # md_list_ = [] # # 创建全为-1的初始MD, 保留两位小数 # init_md_tensor = torch.full((md_dimension, ), -1.0, device='cuda') # init_md_tensor = torch.round(init_md_tensor, decimals=2) # md_list_.append(init_md_tensor) # return md_list_ def get_metrics(md_tensor_labeled_, sim_table_tensor_labeled_): table_tensor_length = sim_table_tensor_labeled_.size()[0] # MD原本为列向量, 转置为行向量 md_tensor_labeled_2d = md_tensor_labeled_.unsqueeze(1).transpose(0, 1) # 沿行扩展1倍(不扩展), 沿列扩展至与相似度表同样长 md_tensor_labeled_2d = md_tensor_labeled_2d.repeat(table_tensor_length, 1) # 去掉标签列, 判断每一行相似度是否大于等于MD要求, 该张量行数与sim_table_tensor_labeled_相同, 少一列标签列 support_tensor = torch.ge(sim_table_tensor_labeled_[:, :-1], md_tensor_labeled_2d[:, :-1]) # 沿行方向判断support_tensor每一行是否都为True, 行数不变, 压缩为1列 support_tensor = torch.all(support_tensor, dim=1, keepdim=True) # 统计这个tensor中True的个数, 即为absolute support abs_support_ = torch.sum(support_tensor).item() # 保留标签列, 判断每一行相似度是否大于等于MD要求 support_tensor = torch.ge(sim_table_tensor_labeled_, md_tensor_labeled_2d) # 统计既满足相似度要求也匹配的, abs_strict_support表示左右都满足的个数 support_tensor = torch.all(support_tensor, dim=1, keepdim=True) abs_strict_support_ = torch.sum(support_tensor).item() # 计算confidence confidence_ = abs_strict_support_ / abs_support_ if abs_support_ > 0 else 0 return abs_support_, confidence_ # 随机生成MD, 拼成一个矩阵, 每一行代表一条MD def build_candidate_md_matrix(sorted_unique_value_tensor_list_: list): # 假设先随机抽取20000条 length_ = len(sorted_unique_value_tensor_list_) N = 20000 # 对于第一列所有相似度取值, 随机有放回地抽取N个, 生成行索引 indices = torch.randint(0, len(sorted_unique_value_tensor_list_[0]), (N, 1)) # 为每一列生成一个索引张量, 表示从相应列张量中随机选择的值的索引 for _ in range(1, length_): indices = torch.cat((indices, torch.randint(0, len(sorted_unique_value_tensor_list_[_]), (N, 1))), dim=1) # 使用生成的索引从每个列相似度张量中选取值, 构成新的张量 candidate_md_matrix_list = [] for _ in range(length_): candidate_md_matrix_list.append(sorted_unique_value_tensor_list_[_][indices[:, _].long()].unsqueeze(1)) candidate_md_matrix_ = torch.cat(candidate_md_matrix_list, dim=1) # 此tensor将与其他置为-1的tensor拼接 joint_candidate_md_matrix_ = candidate_md_matrix_.clone() # 随机将1列, 2列......, M-1列置为-1 for i in range(length_ - 1): index_list_format = [] for j in range(candidate_md_matrix_.shape[0]): # 对每条MD,随机选择将要置为-1的列索引 index_list_format.append(random.sample([_ for _ in range(0, length_)], i + 1)) index = torch.tensor(index_list_format, device='cuda') # 随机调整为-1后的MD集合 modified_candidate = candidate_md_matrix_.scatter(1, index, -1) joint_candidate_md_matrix_ = torch.cat((joint_candidate_md_matrix_, modified_candidate), 0) joint_candidate_md_matrix_ = joint_candidate_md_matrix_.unique(dim=0) return joint_candidate_md_matrix_ def mds_to_txt(result_list_): p = md_output_dir + r"\mds.txt" with open(p, 'w') as f: for _ in result_list_: f.write(f'MD: {str(_[0])}\tAbsolute Support: {str(_[1])}\tConfidence: {str(_[2])}') f.write('\n') # 合并一些MD def merge_mds(md_list_): # 创建一个空字典用于分组 grouped_md_tuples = {} # 遍历三元组并对它们进行分组 for md_tuple in md_list_: # 提取Support和Confidence的值作为字典的键 key = (md_tuple[1], md_tuple[2]) # 检查键是否已经存在于分组字典中 if key in grouped_md_tuples: # 如果存在,将三元组添加到对应的列表中 grouped_md_tuples[key].append(md_tuple) else: # 如果不存在,创建一个新的键值对 grouped_md_tuples[key] = [md_tuple] # 不要键只要值 # 一个二级列表, 每个子列表中MD tuple的support和confidence一样 grouped_md_tuples = list(grouped_md_tuples.values()) for same_sc_list in grouped_md_tuples: # 创建一个索引列表,用于标记需要删除的元组 indices_to_remove = [] # 获取元组列表的长度 length = len(same_sc_list) # 遍历元组列表,进行比较和删除操作 for i in range(length): for j in range(length): # 比较两个元组的字典值 if i != j and all(same_sc_list[i][0][key_] >= same_sc_list[j][0][key_] for key_ in same_sc_list[i][0]): # 如果同组内一个MD的所有相似度阈值都大于等于另一个MD, 则前者可以删除 indices_to_remove.append(i) break # 由于列表大小会变化,跳出内层循环 # 根据索引列表逆序删除元组,以避免在删除时改变列表大小 for index in sorted(indices_to_remove, reverse=True): del same_sc_list[index] # 二级列表转一级列表 return list(itertools.chain.from_iterable(grouped_md_tuples)) if __name__ == '__main__': _train = pd.read_csv(directory_path + r'\train_whole.csv') result = mining(_train) with open(md_output_dir + r"\mds.pickle", "wb") as file_: pickle.dump(result, file_)