You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

486 lines
25 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import cv2
import numpy as np
import tkinter as tk
from tkinter import filedialog
from read_data import *
from PIL import Image
class ModelObj: # 网络对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
self.ObjID = ObjID # 图元号
self.ObjType = ObjType # 图元类别
self.ObjLable = ObjLable # 对象标签
self.ParaString = ParaString # 参数字符串
self.ObjX = ObjX # 对象位置x坐标
self.ObjY = ObjY # 对象位置y坐标
class Data_Class(ModelObj): # 数据集网络对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.LoadData = self.load_data # 基本操作函数
self.SetDataPara = self.set_data_para # 参数设置函数
# 定义加载数据集load_data()
def load_data(self, DataPara):
global SubFolders
listimages = [] # 存储图片的列表
list_path = []
SubFolders, train_Path = read_folders(DataPara["train_imgPath"])
list_path.append(train_Path)
_, path_list = read_folders(DataPara["test_imgPath"])
list_path.append(path_list)
for data_path in list_path:
images = [] # 存储图片的列表
for path in data_path:
# print(path)
img = self.image_to_array(path, DataPara["img_width"], DataPara["img_height"]) # 读取图片数据
img = img.T # 转置,图像的行和列将互换位置。
images.append(img) # 将图片数组添加到训练集图片列表中
listimages.append(np.array(images)) # 返回转换后的数组
return listimages[0], listimages[1]
def set_data_para(self):# 定义加载数据集的参数SetLoadData()
# 设置数据集路径信息
train_imgPath = 'data_classification/train/' # 训练集文件夹的位置
test_imgPath = 'data_classification/test/' # 测试集文件夹的位置
img_width = 48 # 图片宽度
img_height = 48 # 图片高度
# 设置每批次读入图片的数量
batch_size = 32 # 批次大小
# 返回DataPara参数这里用一个字典来存储
DataPara = {"train_imgPath": train_imgPath,
"test_imgPath": test_imgPath,
"img_width": img_width,
"img_height": img_height,
"batch_size": batch_size}
return DataPara
# 定义一个函数,实现图片转换为数组的功能
def image_to_array(self, path, height, width):
img = Image.open(path).convert("L") # 转换为灰度模式
img = img.resize((height, width)) # 将图片缩放为height*width的固定大小
data = np.array(img) # 将图片转换为数组格式
data = data / 255.0 # 将数组中的数值归一化除以255
return data
def output(self): # 输出方法
# 创建一个空列表
result = [self.ObjID, self.ObjType, self.ObjLable, self.LoadData,
self.SetDataPara, self.ParaString, self.ObjX, self.ObjY]
return result
# if __name__ == '__main__':
# DataSet = Data_Class("DataSet1", 1, "数据集1", ".", 120, 330)
# print(DataSet)
class Conv_Class(ModelObj): # 卷积对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.ConvProc = self.conv_proc # 基本操作函数
self.SetConvPara = self.setconv_para # 参数设置函数
# 定义卷积函数ConvProc()
def conv_proc(self, image, ConvPara):
# 获取输入数据的大小,这里假设是单通道的图片
c, image_h, image_w = image.shape
kernel_h = ConvPara["kernel_h"] # 获取卷积核的大小和卷积核
kernel_w = ConvPara["kernel_w"]
kernel = ConvPara["kernel"]
out_h = (image_h - kernel_h) // ConvPara["stride"] + 1 # 计算输出数据的大小
out_w = (image_w - kernel_w) // ConvPara["stride"] + 1
output = np.zeros((c, out_h, out_w)) # 初始化输出数据为零矩阵
for k in range(c): # 遍历每个通道
for i in range(out_h): # 遍历每个输出位置
for j in range(out_w):
stride = ConvPara["stride"] # 获得步长
output[k, i, j] = np.sum(
image[k, i * stride:i * stride + 3, j * stride:j * stride + 3] * kernel) # 计算卷积操作
return output # 返回卷积计算后的数组(特征向量)
def setconv_para(self):# 定义设置卷积参数的函数SetConvPara()
kernel_h = 3 # 设置卷积核大小这里假设是3x3
kernel_w = 3
kernel = [[1.289202, -1.471377, -0.238452], # 卷积核为3x3的单位矩阵(高斯分布)
[-0.562343, -0.019988, -0.441446],
[1.627381, 1.390266, 0.812486]]
stride = 1 # 设置步长这里假设是1
padding = 0 # 设置填充这里假设是0
ConvPara = {"kernel": kernel, # 返回ConvPara参数这里用一个字典来存储
"kernel_h": kernel_h,
"kernel_w": kernel_w,
"stride": stride,
"padding": padding}
return ConvPara
def output(self): # 输出方法
# 创建一个空列表
result = [self.ObjID, self.ObjType, self.ObjLable, self.ConvProc, self.SetConvPara, self.ParaString, self.ObjX,
self.ObjY]
return result
class Pool_Class(ModelObj): # 池化对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.MaxPoolProc = self.pool_proc # 基本操作函数
self.SetPollPara = self.setpool_para # 参数设置函数
def pool_proc(self, image, PoolPara):
pool_mode = PoolPara["pool_mode"]
pool_size = PoolPara["pool_size"]
stride = PoolPara["stride"]
c, h, w = image.shape # 获取输入特征图的高度和宽度
out_h = int((h - pool_size) / stride) + 1 # 计算输出特征图的高度
out_w = int((w - pool_size) / stride) + 1 # 计算输出特征图的宽度
out = np.zeros((c, out_h, out_w)) # 初始化输出特征图为全零数组
for k in range(c): # 对于输出的每一个位置上计算:
for i in range(out_h):
for j in range(out_w):
window = image[k, i * stride:i * stride + pool_size, j * stride:j * stride + pool_size]
if pool_mode == "max": # 最大池化
out[k][i][j] = np.max(window)
elif pool_mode == "avg": # 平均池化
out[k][i][j] = np.mean(window)
elif pool_mode == "min": # 最小池化
out[k][i][j] = np.min(window)
else: # 无效的池化类型
raise ValueError("Invalid pooling mode")
return out # 返回特征图。
# 定义设置池化参数的函数
def setpool_para(self):
pool_mode = "max" # 设置池大小和池类型这里假设是2x2最大池化
pool_size = 2
stride = 2 # 设置步长这里假设是2
PoolPara = {"pool_mode": pool_mode, "pool_size": pool_size, "stride": stride} # 返回PoolPara参数这里用一个字典来存储
return PoolPara # 返回PoolPara参数
def output(self): # 输出方法
# 创建一个空列表
result = [self.ObjID, self.ObjType, self.ObjLable, self.MaxPoolProc, self.SetPollPara, self.ParaString, self.ObjX,
self.ObjY]
return result
class FullConn_Class(ModelObj): # 全连接对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.FullConnProc = self.fullconn_proc # 基本操作函数
self.SetFullConnPara = self.setfullconn_para # 参数设置函数
def fullconn_proc(self, inputdata, FullConnPara):
weights = FullConnPara["weights"] # 从FullConnPara参数中获取权重矩阵
bias = FullConnPara["bias"] # 偏置向量
inputdata = inputdata.reshape(1, inputdata.shape[1] * inputdata.shape[2]) # 对输入进行展平处理,变换为单通道的一维数组格式
output = np.dot(inputdata, weights.T) + bias # 计算全连接层的线性变换inputdata与权重矩阵w进行乘法再加上偏置向量b
return output # 返回全连接计算后的数组
# 定义一个函数来设置全连接层的相关参数,这里可以根据实际情况修改或随机生成
def setfullconn_para(self, data, num_outputs):
# 获取池化后的图片数组的长度和宽度
c, height, width = data
num_outputs = num_outputs
weights = np.random.randn(num_outputs, height * width)
bias = np.random.randn(1, num_outputs)
# 返回FullConnPara参数这里用一个字典来存储
FullConnPara = {"weights": weights, "bias": bias, "num_outputs": num_outputs}
return FullConnPara
def output(self): # 输出方法
# 创建一个空列表
result = [self.ObjID, self.ObjType, self.ObjLable, self.FullConnProc, self.SetFullConnPara, self.ParaString, self.ObjX,
self.ObjY]
return result
class Nonline_Class(ModelObj): # 非线性对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.NonlinearProc = self.nonlinear_proc # 基本操作函数
self.SetNonLPara = self.setnonl_para # 参数设置函数
# 定义非线性函数
def nonlinear_proc(self, inputdata, NonLPara):
nonlinearmode = NonLPara["nonlinearmode"] # 从NonLPara参数中获取非线性函数类型
if nonlinearmode == "Sigmoid": # 判断nonlinearmode进行相应的计算
output = 1 / (1 + np.exp(-inputdata)) # Sigmoid函数将任何实数的输入映射到0和1之间的输出
elif nonlinearmode == "ReLU":
output = np.maximum(inputdata, 0) # ReLU函数将负数输入置为0而正数输入保持不变
elif nonlinearmode == "Tanh":
output = np.tanh(inputdata) # Tanh函数将任何实数的输入映射到-1和1之间的输出
else:
raise ValueError("Invalid nonlinear mode") # 非法的非线性类型,抛出异常
return output # 返回计算后的值
# 定义设置非线性参数的函数
def setnonl_para(self):
# 可以选择"Sigmoid", "ReLU" 或 "Tanh"
nonlinearmode = "ReLU" # 确定参数信息:非线性函数的类型
NonLPara = {"nonlinearmode": nonlinearmode} # 返回NonLPara参数这里用一个字典来存储
return NonLPara # 返回NonLPara参数
def output(self): # 输出方法
# 创建一个空列表
result = [self.ObjID, self.ObjType, self.ObjLable, self.NonlinearProc, self.SetNonLPara, self.ParaString, self.ObjX,
self.ObjY]
return result
class Label: # 标签
# 设置标签类别列表并将其转化为one-hot向量的形式
def setlabel_para(self, label_list):
num_classes = len(label_list)
identity_matrix = np.eye(num_classes)
label_dict = {label: identity_matrix[i] for i, label in enumerate(label_list)}
return label_dict
# 读取样本数据集,遍历每个样本,并将样本和对应的标签组成元组,返回标记好标签的样本列表
def label_proc(self, samples, labels, label_dict):
labeled_samples = [(sample, label_dict[label]) for sample, label in zip(samples, labels)]
return labeled_samples
def label_array(self, i):
# 读取标签数据
path_csv = 'train.csv'
df = pd.read_csv(path_csv, header=None, skiprows=range(0, i * 32), nrows=(i + 1) * 32 - i * 32)
# print(df)
# 将标签数据转化成数组
right_label = df.iloc[:, 0].tolist()
right_label = list(map(int, right_label))
right_label = [x for x in right_label]
# print(right_label)
return right_label
class Classifier_Class(ModelObj): # 分类对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.ClassifierProc = self.classifier_proc # 基本操作函数
self.SetClassifyPara = self.setclassify_para # 参数设置函数
def classifier_proc(self, inputdata, ClassifyPara):
def softmax(x): # 定义softmax函数
x -= np.max(x) # 减去最大值,防止数值溢出
return np.exp(x) / np.sum(np.exp(x)) # 计算指数和归一化
threshold = ClassifyPara["threshold"] # 从ClassifyPara参数中获取阈值
output = -1 # 初始化输出为-1
prob = softmax(inputdata) # 调用softmax函数得到概率分布向量
prob1 = prob[prob >= threshold] # 如果概率高于阈值,就将该类别加入输出结果
index = np.where(prob == max(prob1)) # 使用where()函数来返回等于概率最大值的元素的索引
output = index[1].item(0) + 1 # 使用item()方法来将索引转换为标准Python标量
return output # 返回分类标签
# 定义设置分类函数参数的函数
def setclassify_para(self):
threshold = 0.1 # 设定阈值,可以根据你的数据和任务来调整阈值
ClassifyPara = {"threshold": threshold} # 返回ClassifyPara参数这里用一个字典来存储
return ClassifyPara # 返回ClassifyPara参数
def output(self): # 输出方法
# 创建一个空列表
result = [self.ObjID, self.ObjType, self.ObjLable, self.ClassifierProc, self.SetClassifyPara, self.ParaString, self.ObjX,
self.ObjY]
return result
class Error_Class(ModelObj): # 误差计算对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.ErrorProc = self.error_proc # 基本操作函数
self.SetErrorPara = self.seterror_para # 参数设置函数
def error_proc(self, input, label, ErrorPara):
label_list, loss_type = ErrorPara # 读取标签列表和损失函数类型
one_hot_matrix = np.eye(len(label_list)) # 创建一个单位矩阵,大小为标签类别的个数
index = [x for x in label]
# print(label)
label_one_hot = np.take(one_hot_matrix, index, axis=0) # 从one-hot矩阵中取出对应的向量
# print(label_one_hot)
if loss_type == "CEE": # 确定损失函数类别实现不同的损失函数计算输入值与label之间的误差
# 使用交叉熵损失函数,公式为:-sum(label_one_hot * log(input)) / n
loss = -np.sum(label_one_hot * np.log(input)) / len(input)
elif loss_type == "MSE":
# 使用均方误差损失函数公式为sum((input - label_one_hot) ** 2) / n
loss = np.sum((input - label_one_hot) ** 2) / len(input)
elif loss_type == "MAE":
# 使用平均绝对误差损失函数公式为sum(abs(input - label_one_hot)) / n
loss = np.sum(np.abs(input - label_one_hot)) / len(input)
else:
raise ValueError("Invalid loss type") # 如果损失函数类型不在以上三种中,抛出异常
return loss # 返回误差值loss
# 定义设置误差参数的函数
def seterror_para(self):
label_list = [0, 1, 2, 3, 4, 5, 6] # 确定参数信息: 标签类别,损失函数类型
loss_type = "CEE" # 假设损失函数类型为交叉熵Cross Entropy ErrorCEE
ErrorPara = (label_list, loss_type) # 返回ErrorProc参数这里用一个元组来存储
return ErrorPara # 返回ErrorPara参数
def output(self): # 输出方法
# 创建一个空列表
result = [self.ObjID, self.ObjType, self.ObjLable, self.ErrorProc, self.SetErrorPara, self.ParaString, self.ObjX,
self.ObjY]
return result
class AjConv_Class(ModelObj): # 卷积调整对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.AjConvProc = self.ajconv_proc # 基本操作函数
self.SetAjConvPara = self.setajconv_para # 参数设置函数
def ajconv_proc(self, images, AjConvPara):
kernel_grad_list = []
bias_grad = 0 # 初始化偏置项梯度为零
for c in images:
# 计算卷积核和偏置项的梯度
kernel_grad = np.zeros_like(AjConvPara['kernel_info']) # 初始化卷积核梯度为零矩阵
for i in range(AjConvPara['loss'].shape[0]): # 遍历误差值矩阵的行
for j in range(AjConvPara['loss'].shape[1]): # 遍历误差值矩阵的列
# 将输入数据数组中对应的子矩阵旋转180度与误差值相乘累加到卷积核梯度上
kernel_grad += np.rot90(c[i:i + kernel_grad.shape[0], j:j + kernel_grad.shape[0]], 2) * AjConvPara['loss'][i, j]
# 将误差值累加到偏置项梯度上
bias_grad += AjConvPara['loss'][i, j]
kernel_grad_list.append(kernel_grad)
# 使用stack函数沿着第0个轴把一百个a数组堆叠起来
result = np.stack(kernel_grad_list, axis=0)
kernel_grad = np.sum(result, axis=0) / len(images) # 沿着第0个维度求和
# 更新卷积核和偏置项参数
kernel = AjConvPara['kernel_info'] - AjConvPara['learning_rate'] * kernel_grad # 卷积核参数减去学习率乘以卷积核梯度
return kernel # 返回更新后的卷积核
def setajconv_para(self, loss, ConvPara):
kernel = ConvPara['kernel'] # 卷积核信息
learning_rate = 0.01 # 学习率
loss = np.array([[loss]])
AjConvPara = {'kernel_info': kernel, 'learning_rate': learning_rate, 'loss': loss}
return AjConvPara
class AjFullconn_Class(ModelObj): # 全连接调整对象
def __init__(self, ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY):
super().__init__(ObjID, ObjType, ObjLable, ParaString, ObjX, ObjY)
self.AjFullconnProc = self.ajfullconn_proc # 基本操作函数
self.SetAjFCPara = self.setajfc_para # 参数设置函数
def ajfullconn_proc(self, AjFCPara):
# 根据激活函数的参数选择相应的函数和导数
# 计算权重矩阵和偏置向量的梯度,使用链式法则
gradient_weights = np.outer(AjFCPara['loss'], AjFCPara['learning_rate'])
# 更新权重矩阵和偏置向量
weight_matrix = AjFCPara['weights'] - gradient_weights
bias_vector = AjFCPara['bias'] - AjFCPara['learning_rate'] * AjFCPara['bias']
# 返回更新后的权重矩阵和偏置向量
return weight_matrix, bias_vector
def setajfc_para(self, loss, FullConnPara):
weights = FullConnPara["weights"]
bias = FullConnPara["bias"]
loss = np.array([loss])
AjFCPara = {
'weights': weights, # 全连接权重
'bias': bias, # 全连接偏置
'learning_rate': 0.01, # 学习率
'loss': loss # 误差值
}
return AjFCPara
def main():
DataPara = DataSet.SetDataPara() # setload_data()函数,获取加载数据集的参数
train_images, test_images = DataSet.LoadData(DataPara)
ConvPara = Conv.SetConvPara() # 调用SetConvPara()函数,获取卷积层参数
PoolPara = Pool.SetPollPara()
FullConnPara = FullConn.SetFullConnPara((1, 23, 23), 7)
NonLPara = Nonline.SetNonLPara()
ClassifyPara = Classifier.SetClassifyPara()
ErrorPara = Error.SetErrorPara()
LabelPara = Label()
# AjFCPara = AjFullconn.SetAjFCPara()
for i in range(len(train_images) // 32):
images = train_images[i * 32:(i + 1) * 32]
# 存储卷积处理后的图片的列表
conv_images = []
# 获取训练集的图片数据
for image in images:
# 获取矩阵的维度
dim = len(image.shape)
# 如果是二维矩阵,则转化为三维矩阵
if dim == 2:
image_h, image_w = image.shape
image = np.reshape(image, (1, image_h, image_w))
# 调用ConvProc()函数根据ConvPara参数完成卷积计算
output = Conv.ConvProc(image, ConvPara)
# 将卷积结果存储到列表
conv_images.append(output)
# 若为三维矩阵,则保持不变直接卷积处理
elif dim == 3:
output = Conv.ConvProc(image, ConvPara)
conv_images.append(output)
# 将卷积处理后的图片列表转换为数组形式,方便后续处理
conv_images = np.array(conv_images)
pool_images = [] # 存储池化处理后的图片的列表
for image in conv_images: # 获取卷积后的图片数据
output = Pool.MaxPoolProc(image, PoolPara)
pool_images.append(output) # 将池化结果存储到列表
pool_images = np.array(pool_images) # 将池化处理后的图片列表转换为数组形式,方便后续处理
# print(conv_images)
# print(pool_images[0].shape)
# 存储全连接处理后的图片的列表
fullconn_images = []
# 获取池化后的图片数据
for image in pool_images:
output = FullConn.FullConnProc(image, FullConnPara)
# 将全连接处理后的结果存储到列表
fullconn_images.append(output)
# 将全连接处理后的图片列表转换为数组形式,方便后续处理
fullconn_images = np.array(fullconn_images)
# print(fullconn_images)
# 存储非线性处理后的图片的列表
nonlinear_images = []
for image in fullconn_images: # 获取全连接处理后的图片数据
output = Nonline.NonlinearProc(image, NonLPara)
# 将非线性处理后的结果存储到列表
nonlinear_images.append(output)
# 将非线性处理后的图片列表转换为数组形式,方便后续处理
nonlinear_images = np.array(nonlinear_images)
# print(nonlinear_images)
classifier_images = [] # 存储分类处理后的图片的列表
prob_images = [] # 存储分类处理后的概率向量
def softmax(x): # 定义softmax函数
x -= np.max(x) # 减去最大值,防止数值溢出
return np.exp(x) / np.sum(np.exp(x)) # 计算指数和归一化
for image in nonlinear_images: # 获取非线性处理后的图片数据
prob = softmax(image) # 调用softmax函数得到概率分布向量
prob_images.append(prob) # 将概率向量结果存储到列表
output = Classifier.ClassifierProc(image, ClassifyPara) # 进行分类处理
classifier_images.append(output) # 将分类结果存储到列表
classifier_images = np.array(classifier_images) # 将分类的结果列表转换为数组形式,方便后续处理
print(classifier_images)
# print(setlabel_para())
label_dict = LabelPara.setlabel_para([0, 1, 2, 3, 4, 5, 6])
right_label = LabelPara.label_array(i)
labeled_samples = LabelPara.label_proc(images, right_label, label_dict)
print(right_label)
# 假设有以下输入值和真实标签值,输入值是一个概率矩阵,真实标签值是一个类别列表
prob_images = np.squeeze(prob_images)
# print(prob_images)
loss = Error.ErrorProc(prob_images, right_label, ErrorPara) # 计算误差值
print(loss)
AjConvPara = AjConv.SetAjConvPara(loss, ConvPara)
ConvPara['kernel'] = AjConv.AjConvProc(images, AjConvPara)
print(ConvPara['kernel'])
AjFCPara = AjFullconn.SetAjFCPara(loss, FullConnPara)
weight, bias = AjFullconn.AjFullconnProc(AjFCPara)
FullConnPara['weights'] = weight
FullConnPara['bias'] = bias
# print(weight, bias)
if __name__ == '__main__':
DataSet = Data_Class("DataSet1", 1, "数据集1", [], 120, 330)
Conv = Conv_Class("Conv1", 2, "卷积1", [], 250, 330)
Pool = Pool_Class("Pool1", 3, "最大池化1", [], 380, 330)
FullConn = FullConn_Class("FullConn1", 4, "全连接1", [], 510, 330)
Nonline = Nonline_Class("Nonline1", 5, "非线性函数1", [], 640, 330)
Classifier = Classifier_Class("Classifier1", 6, "分类1", [], 780, 330)
Error = Error_Class("Error1", 7, "误差计算1", [], 710, 124)
AjConv = AjConv_Class("AjConv1", 8, "卷积调整1", [], 250, 70)
AjFullconn = AjFullconn_Class("AjFullconn1", 9, "全连接调整1", [], 510, 120)
# AllModelObj = [DataSet, Conv, Pool, FullConn, Nonline, Classifier, Error, AjConv, AjFullconn]
main()