"""Autoencoder models and a training script for credit-data anomaly detection.

Defines a plain ``AutoEncoder``, an ``AdversarialAutoEncoder`` (encoder,
decoder and latent-space discriminator) and the routine that trains the
adversarial autoencoder on the numerical columns of the credit dataset.
"""
import os
import sys

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim

# NOTE: scikit-learn and joblib are only needed while training, so they are
# imported inside train_adversarial_autoencoder(). This keeps the model
# classes and the training loop importable with PyTorch alone.

# Add the project root directory to the Python path.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# Numerical columns of the credit dataset that are fed to the autoencoder.
NUMERICAL_FEATURES = [
    'age',
    'income',
    'employment_length',
    'loan_amount',
    'credit_score',
    'debt_to_income',
    'num_credit_lines',
]


class AutoEncoder(nn.Module):
    """Plain autoencoder used for anomaly detection.

    Encoder and decoder are two-layer MLPs with a ReLU after every linear
    layer, so reconstructions are always non-negative.

    Args:
        input_dim: Number of input features.
        hidden_dim1: Width of the outer hidden layer.
        hidden_dim2: Width of the bottleneck layer.
    """

    def __init__(self, input_dim, hidden_dim1, hidden_dim2):
        super().__init__()

        # Encoder: input_dim -> hidden_dim1 -> hidden_dim2
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim1),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim1, hidden_dim2),
            nn.ReLU(inplace=True),
        )

        # Decoder: hidden_dim2 -> hidden_dim1 -> input_dim
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim1, input_dim),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        """Return the reconstruction of ``x``."""
        return self.decoder(self.encoder(x))


class AdversarialAutoEncoder(nn.Module):
    """Adversarial autoencoder: encoder, decoder and latent discriminator.

    The encoder ends in a ReLU, so latent codes are non-negative. The decoder
    ends in a Sigmoid, so reconstructions lie in the 0-1 range and the inputs
    are expected to be scaled to that range. The discriminator maps a latent
    vector to a single probability.

    Args:
        input_dim: Number of input features.
        hidden_dim1: Width of the outer hidden layers.
        hidden_dim2: Width of the inner hidden layers.
        latent_dim: Size of the latent code.
    """

    def __init__(self, input_dim, hidden_dim1, hidden_dim2, latent_dim):
        super().__init__()

        # Encoder: input_dim -> hidden_dim1 -> hidden_dim2 -> latent_dim
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim1),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim1, hidden_dim2),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim2, latent_dim),
            nn.ReLU(inplace=True),
        )

        # Decoder: latent_dim -> hidden_dim2 -> hidden_dim1 -> input_dim
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim2),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim1, input_dim),
            nn.Sigmoid(),  # Sigmoid keeps the output between 0 and 1.
        )

        # Discriminator: latent_dim -> hidden_dim2 -> hidden_dim1 -> 1
        self.discriminator = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim2),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim1, 1),
            nn.Sigmoid(),
        )

    def encode(self, x):
        """Map inputs to latent codes."""
        return self.encoder(x)

    def decode(self, z):
        """Map latent codes back to input space."""
        return self.decoder(z)

    def discriminate(self, z):
        """Return the discriminator's probability for each latent vector."""
        return self.discriminator(z)

    def forward(self, x):
        """Return ``(reconstruction, latent_code)`` for ``x``."""
        z = self.encode(x)
        return self.decode(z), z


def sample_latent_prior(num_samples, latent_dim, device=None):
    """Draw ``num_samples`` latent vectors from the prior used for training.

    The encoder of ``AdversarialAutoEncoder`` ends in a ReLU and can only emit
    non-negative codes. A standard normal prior would put half of its mass on
    values the encoder can never produce, letting the discriminator tell real
    from fake by sign alone. The prior is therefore half-normal, ``|N(0, I)|``.

    Returns:
        A ``(num_samples, latent_dim)`` float tensor on ``device``.
    """
    return torch.randn(num_samples, latent_dim, device=device).abs()


def fit_adversarial_autoencoder(model, data, latent_dim, num_epochs=100,
                                batch_size=64, lr=0.001,
                                adversarial_weight=0.1, log_every=10):
    """Train ``model`` in place on ``data`` with the adversarial objective.

    Each mini-batch gets one autoencoder step (reconstruction MSE plus
    ``adversarial_weight`` times the generator BCE) followed by one
    discriminator step (prior samples labelled real, encoder codes fake).

    Args:
        model: An ``AdversarialAutoEncoder`` on the same device as ``data``.
        data: 2-D float tensor of samples, scaled to the 0-1 range.
        latent_dim: Size of the model's latent code.
        num_epochs: Number of passes over ``data``.
        batch_size: Mini-batch size.
        lr: Adam learning rate for both optimizers.
        adversarial_weight: Weight of the generator loss in the autoencoder
            objective.
        log_every: Print the epoch losses every this many epochs; a falsy
            value disables printing.

    Returns:
        One ``(recon_loss, adversarial_loss, discriminator_loss)`` tuple of
        sample-weighted epoch means per epoch.

    Raises:
        ValueError: If ``data`` has no rows or ``batch_size`` is not positive.
    """
    num_samples = data.shape[0]
    if num_samples == 0:
        raise ValueError("cannot train on an empty dataset")
    if batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")

    device = data.device
    reconstruction_criterion = nn.MSELoss()
    adversarial_criterion = nn.BCELoss()
    autoencoder_optimizer = optim.Adam(
        list(model.encoder.parameters()) + list(model.decoder.parameters()),
        lr=lr,
    )
    discriminator_optimizer = optim.Adam(
        model.discriminator.parameters(), lr=lr
    )

    history = []
    model.train()
    for epoch in range(num_epochs):
        # Reshuffle every epoch so batches do not follow the file's row order.
        order = torch.randperm(num_samples, device=device)
        totals = [0.0, 0.0, 0.0]

        for start in range(0, num_samples, batch_size):
            batch = data[order[start:start + batch_size]]
            count = batch.size(0)
            real_labels = torch.ones(count, 1, device=device)
            fake_labels = torch.zeros(count, 1, device=device)

            # Autoencoder step: reconstruct well and fool the discriminator.
            autoencoder_optimizer.zero_grad()
            recon_batch, latent_batch = model(batch)
            recon_loss = reconstruction_criterion(recon_batch, batch)
            adversarial_loss = adversarial_criterion(
                model.discriminate(latent_batch), real_labels
            )
            (recon_loss + adversarial_weight * adversarial_loss).backward()
            autoencoder_optimizer.step()

            # Discriminator step. zero_grad() also clears the gradients the
            # autoencoder step left on the discriminator's parameters.
            discriminator_optimizer.zero_grad()
            real_latent = sample_latent_prior(count, latent_dim, device)
            disc_real_loss = adversarial_criterion(
                model.discriminate(real_latent), real_labels
            )
            # detach(): the encoder must not receive discriminator gradients.
            disc_fake_loss = adversarial_criterion(
                model.discriminate(latent_batch.detach()), fake_labels
            )
            discriminator_loss = disc_real_loss + disc_fake_loss
            discriminator_loss.backward()
            discriminator_optimizer.step()

            batch_losses = (recon_loss, adversarial_loss, discriminator_loss)
            for index, loss in enumerate(batch_losses):
                totals[index] += loss.item() * count

        epoch_means = tuple(total / num_samples for total in totals)
        history.append(epoch_means)

        if log_every and (epoch + 1) % log_every == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], '
                  f'Recon Loss: {epoch_means[0]:.4f}, '
                  f'Adversarial Loss: {epoch_means[1]:.4f}, '
                  f'Discriminator Loss: {epoch_means[2]:.4f}')

    return history


def train_adversarial_autoencoder(device=None,
                                  data_path='data/credit_data.csv',
                                  model_dir='models',
                                  num_epochs=100,
                                  batch_size=64):
    """Train the adversarial autoencoder on the credit data and save it.

    Reads ``NUMERICAL_FEATURES`` from the CSV at ``data_path``, scales them to
    the 0-1 range, trains an ``AdversarialAutoEncoder`` and writes
    ``adversarial_autoencoder.pth`` (state dict, stored as CPU tensors) and
    ``ae_scaler.pkl`` (fitted scaler) into ``model_dir``.

    Args:
        device: Device to train on (``torch.device`` or string). ``None``
            trains on the CPU.
        data_path: Path of the credit CSV file.
        model_dir: Output directory; created if it does not exist.
        num_epochs: Number of training epochs.
        batch_size: Mini-batch size.

    Returns:
        ``(model, scaler)``: the trained model (left on ``device``) and the
        fitted ``MinMaxScaler``.

    Raises:
        ValueError: If the selected columns contain missing values or the
            file has no rows.
    """
    # Training-only dependencies, imported lazily (see the module-level note).
    import joblib
    from sklearn.preprocessing import MinMaxScaler

    device = torch.device('cpu') if device is None else torch.device(device)

    # Load the data; only numerical features are used for the autoencoder.
    print("读取信贷数据...")
    df = pd.read_csv(data_path)
    X = df[NUMERICAL_FEATURES]
    if len(X) == 0:
        raise ValueError(f"no rows found in {data_path!r}")
    if X.isna().any().any():
        # Missing values would pass through the scaler and turn every loss
        # (and then every weight) into NaN.
        raise ValueError(
            "numerical features contain missing values; "
            "clean or impute them before training"
        )

    # The decoder ends in a Sigmoid, so targets must lie in the 0-1 range.
    # Standardised inputs are unbounded and about half negative, which the
    # decoder could never reconstruct.
    scaler = MinMaxScaler()
    X_scaled = scaler.fit_transform(X)
    X_tensor = torch.tensor(X_scaled, dtype=torch.float32, device=device)

    # Model hyper-parameters.
    input_dim = X_tensor.shape[1]
    hidden_dim1 = 64
    hidden_dim2 = 32
    latent_dim = 16

    model = AdversarialAutoEncoder(
        input_dim, hidden_dim1, hidden_dim2, latent_dim
    ).to(device)

    print("开始训练对抗自编码器...")
    fit_adversarial_autoencoder(
        model,
        X_tensor,
        latent_dim,
        num_epochs=num_epochs,
        batch_size=batch_size,
    )

    # Save the model and the scaler.
    print("保存对抗自编码器模型...")
    os.makedirs(model_dir, exist_ok=True)
    # Store CPU tensors so the checkpoint also loads on machines without a GPU.
    cpu_state = {
        name: tensor.detach().cpu()
        for name, tensor in model.state_dict().items()
    }
    torch.save(cpu_state, os.path.join(model_dir, 'adversarial_autoencoder.pth'))
    joblib.dump(scaler, os.path.join(model_dir, 'ae_scaler.pkl'))

    return model, scaler


if __name__ == "__main__":
    # Use the GPU when one is available.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"使用设备: {device}")

    model, scaler = train_adversarial_autoencoder(device=device)
    print("对抗自编码器训练完成!")