You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

192 lines
6.1 KiB

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
import joblib
import os
import sys
# Make the project root importable regardless of the current working
# directory (two levels up from this file).
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class AutoEncoder(nn.Module):
    """Vanilla autoencoder for anomaly detection on tabular features.

    Anomalies are scored downstream by reconstruction error between the
    input and the decoder output.

    Args:
        input_dim: Number of input features.
        hidden_dim1: Width of the outer hidden layer.
        hidden_dim2: Width of the bottleneck layer.
    """

    def __init__(self, input_dim, hidden_dim1, hidden_dim2):
        super(AutoEncoder, self).__init__()
        # Encoder: input -> hidden1 -> bottleneck
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim1),
            nn.ReLU(True),
            nn.Linear(hidden_dim1, hidden_dim2),
            nn.ReLU(True)
        )
        # Decoder: bottleneck -> hidden1 -> input.
        # FIX: the original ended with nn.ReLU(True), which clamps the
        # reconstruction to non-negative values. The training data in this
        # file is standardized with StandardScaler (zero mean), so roughly
        # half the target values are negative and could never be
        # reconstructed. The output layer is therefore left linear.
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(True),
            nn.Linear(hidden_dim1, input_dim)
        )

    def forward(self, x):
        """Encode then decode ``x``; returns the reconstruction."""
        x = self.encoder(x)
        x = self.decoder(x)
        return x
class AdversarialAutoEncoder(nn.Module):
    """Adversarial autoencoder (AAE).

    The encoder maps inputs to a latent code, the decoder reconstructs the
    input from that code (Sigmoid output, values in [0, 1]), and the
    discriminator emits a score in (0, 1) used by the training loop as the
    probability that a code is "real" (drawn from the prior) rather than
    produced by the encoder.
    """

    def __init__(self, input_dim, hidden_dim1, hidden_dim2, latent_dim):
        super(AdversarialAutoEncoder, self).__init__()

        def _mlp(sizes, output_activation):
            # Build Linear + ReLU pairs along `sizes`, then swap the final
            # ReLU for the requested output activation.
            layers = []
            for fan_in, fan_out in zip(sizes[:-1], sizes[1:]):
                layers.append(nn.Linear(fan_in, fan_out))
                layers.append(nn.ReLU(True))
            layers[-1] = output_activation
            return nn.Sequential(*layers)

        # Encoder: input -> h1 -> h2 -> latent, ReLU throughout.
        self.encoder = _mlp(
            [input_dim, hidden_dim1, hidden_dim2, latent_dim], nn.ReLU(True)
        )
        # Decoder mirrors the encoder; Sigmoid keeps outputs in [0, 1].
        self.decoder = _mlp(
            [latent_dim, hidden_dim2, hidden_dim1, input_dim], nn.Sigmoid()
        )
        # Discriminator: latent code -> scalar score in (0, 1).
        self.discriminator = _mlp(
            [latent_dim, hidden_dim2, hidden_dim1, 1], nn.Sigmoid()
        )

    def encode(self, x):
        """Map inputs to latent codes."""
        return self.encoder(x)

    def decode(self, z):
        """Reconstruct inputs (values in [0, 1]) from latent codes."""
        return self.decoder(z)

    def discriminate(self, z):
        """Score latent codes; higher means "looks like a prior sample"."""
        return self.discriminator(z)

    def forward(self, x):
        """Return ``(reconstruction, latent_code)`` for ``x``."""
        code = self.encode(x)
        return self.decode(code), code
def train_adversarial_autoencoder():
    """Train the adversarial autoencoder on the credit data set.

    Loads ``data/credit_data.csv``, standardizes the numeric feature
    columns, then alternates two updates per mini-batch:

      1. autoencoder step — reconstruction MSE plus a 0.1-weighted
         adversarial term that pushes encoder codes to fool the
         discriminator;
      2. discriminator step — distinguish N(0, 1) samples ("real") from
         encoder codes ("fake").

    Side effects: writes ``models/adversarial_autoencoder.pth`` and
    ``models/ae_scaler.pkl``.

    Returns:
        tuple: ``(model, scaler)`` — the trained ``AdversarialAutoEncoder``
        and the fitted ``StandardScaler``.
    """
    # Load the raw data.
    print("读取信贷数据...")
    df = pd.read_csv('data/credit_data.csv')
    # Only numeric features feed the autoencoder.
    numerical_features = ['age', 'income', 'employment_length', 'loan_amount',
                          'credit_score', 'debt_to_income', 'num_credit_lines']
    X = df[numerical_features]
    # Standardize to zero mean / unit variance.
    # NOTE(review): the decoder ends in Sigmoid (outputs in [0, 1]) while
    # standardized targets can be negative — reconstruction of negative
    # values is impossible as written; confirm this is intended.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    X_tensor = torch.FloatTensor(X_scaled)
    # Model hyperparameters.
    input_dim = X_tensor.shape[1]
    hidden_dim1 = 64
    hidden_dim2 = 32
    latent_dim = 16
    model = AdversarialAutoEncoder(input_dim, hidden_dim1, hidden_dim2, latent_dim)
    # Losses and optimizers. The two optimizers deliberately cover disjoint
    # parameter sets so each phase updates only its own network(s).
    reconstruction_criterion = nn.MSELoss()
    adversarial_criterion = nn.BCELoss()
    autoencoder_optimizer = optim.Adam(
        list(model.encoder.parameters()) + list(model.decoder.parameters()),
        lr=0.001
    )
    discriminator_optimizer = optim.Adam(model.discriminator.parameters(), lr=0.001)
    num_epochs = 100
    batch_size = 64
    print("开始训练对抗自编码器...")
    for epoch in range(num_epochs):
        for i in range(0, len(X_tensor), batch_size):
            batch = X_tensor[i:i + batch_size]
            # --- autoencoder (generator) step ---
            autoencoder_optimizer.zero_grad()
            recon_batch, latent_batch = model(batch)
            real_labels = torch.ones(batch.size(0), 1)
            fake_labels = torch.zeros(batch.size(0), 1)
            recon_loss = reconstruction_criterion(recon_batch, batch)
            # Generator-style loss: the encoder wants its codes scored as real.
            disc_fake = model.discriminate(latent_batch)
            adversarial_loss = adversarial_criterion(disc_fake, real_labels)
            autoencoder_loss = recon_loss + 0.1 * adversarial_loss
            autoencoder_loss.backward()
            autoencoder_optimizer.step()
            # --- discriminator step ---
            discriminator_optimizer.zero_grad()
            # "Real" codes are samples from the standard-normal prior.
            real_latent = torch.randn(batch.size(0), latent_dim)
            disc_real = model.discriminate(real_latent)
            disc_real_loss = adversarial_criterion(disc_real, real_labels)
            # "Fake" codes come from the encoder; detach so this step does
            # not backpropagate into the encoder.
            disc_fake = model.discriminate(latent_batch.detach())
            disc_fake_loss = adversarial_criterion(disc_fake, fake_labels)
            discriminator_loss = disc_real_loss + disc_fake_loss
            discriminator_loss.backward()
            discriminator_optimizer.step()
        # Progress report every 10 epochs (last batch's losses).
        if (epoch + 1) % 10 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], '
                  f'Recon Loss: {recon_loss.item():.4f}, '
                  f'Adversarial Loss: {adversarial_loss.item():.4f}, '
                  f'Discriminator Loss: {discriminator_loss.item():.4f}')
    # Persist the model weights and the fitted scaler.
    print("保存对抗自编码器模型...")
    # FIX: ensure the output directory exists; torch.save/joblib.dump raise
    # FileNotFoundError on a fresh checkout otherwise.
    os.makedirs('models', exist_ok=True)
    torch.save(model.state_dict(), 'models/adversarial_autoencoder.pth')
    joblib.dump(scaler, 'models/ae_scaler.pkl')
    return model, scaler
if __name__ == "__main__":
# 检查是否有可用的GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")
model, scaler = train_adversarial_autoencoder()
print("对抗自编码器训练完成!")