import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn.functional as F
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
losses = []

# Load the training set: column 0 is the label, the remaining 784 columns are the 28x28 pixels.
data = pd.read_csv("./mchar_train/train.csv")
data = np.array(data)
labels = data[:, 0]
data = data[:, 1:]
data = data.reshape(42000, 28, 28)
data = torch.from_numpy(data).float().to(device)
labels = torch.from_numpy(labels).long().to(device)


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, out_planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_planes)
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes)
        self.relu = nn.ReLU(inplace=False)
        # Projection shortcut when the spatial size or channel count changes.
        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * out_planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * out_planes,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * out_planes)
            )

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))  # the original skipped conv2 here
        out = out + self.shortcut(x)
        out = F.relu(out)
        return out


class ResNet(nn.Module):
    # num_classes = 10: ten-way classification
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64
        self.conv1 = nn.Conv2d(1, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        # Four residual stages
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512 * block.expansion, num_classes)
        # Activation: relu(x) = max(0, x)
        self.relu = nn.ReLU(inplace=False)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Average-pool the final 4x4 feature map down to 1x1
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


model = ResNet(BasicBlock, [2, 2, 2, 2])
model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Add a channel dimension: (N, 28, 28) -> (N, 1, 28, 28)
data = torch.unsqueeze(data, 1)
dataset = TensorDataset(data, labels)
data_loader = DataLoader(dataset, batch_size=400, shuffle=True)

epochs = 10
loss = 0
for epoch in tqdm(range(epochs), desc='Training Progress', leave=False):
    for inputs, targets in data_loader:
        outputs = model(inputs)
        # Compute the loss, then backpropagate and update the weights.
        loss = criterion(outputs, targets)
        losses.append(loss.item())
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item():.4f}')

# Save the per-iteration loss curve and plot it.
np.savetxt('loss.csv', losses, delimiter=',')
y = np.array(losses)
x = np.arange(1, len(y) + 1)
plt.xlabel("iteration")
plt.ylabel("loss")
plt.plot(x, y)
plt.show()

torch.save(model, 'model_name.pth')
# Inference: reload the saved model and predict on the test set.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = torch.load('./model_name.pth', map_location=torch.device(device))
model.eval()  # switch to evaluation mode

data = pd.read_csv("./mchar_train/test.csv")
data = np.array(data)
data = torch.from_numpy(data).float().to(device)
data = data.reshape(28000, 28, 28)
data = torch.unsqueeze(data, 1)

# Run the model on each test sample and record the predicted class.
ans = np.zeros(28000, dtype=np.int64)
for i in tqdm(range(len(data)), desc='Predicting Progress', leave=False):
    d = torch.unsqueeze(data[i], 0)
    with torch.no_grad():
        output = model(d)
        predicted_class = torch.argmax(output, dim=1).item()
        ans[i] = predicted_class

# Write the predicted class ids as integers.
np.savetxt('sample.csv', ans, fmt='%d', delimiter=",")