You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
192 lines
6.1 KiB
192 lines
6.1 KiB
import torch
|
|
import torch.nn as nn
|
|
import torch.optim as optim
|
|
import numpy as np
|
|
import pandas as pd
|
|
from sklearn.preprocessing import StandardScaler
|
|
import joblib
|
|
import os
|
|
import sys
|
|
|
|
# 添加项目根目录到Python路径
|
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
class AutoEncoder(nn.Module):
    """Symmetric fully-connected autoencoder for anomaly detection.

    The reconstruction error of a sample against its autoencoder output
    serves as the anomaly score.

    Args:
        input_dim:   number of input features.
        hidden_dim1: width of the outer hidden layer.
        hidden_dim2: width of the inner (bottleneck) hidden layer.
    """

    def __init__(self, input_dim: int, hidden_dim1: int, hidden_dim2: int):
        super(AutoEncoder, self).__init__()
        # Encoder: input -> hidden1 -> hidden2
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim1),
            nn.ReLU(True),
            nn.Linear(hidden_dim1, hidden_dim2),
            nn.ReLU(True)
        )

        # Decoder: hidden2 -> hidden1 -> input.
        # BUG FIX: the original ended with nn.ReLU(True), which clamps the
        # reconstruction to >= 0.  The data pipeline in this file feeds
        # StandardScaler-standardized inputs (zero mean), so negative
        # feature values could never be reconstructed and the loss was
        # systematically biased.  The output layer is now linear.
        self.decoder = nn.Sequential(
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(True),
            nn.Linear(hidden_dim1, input_dim)
        )

    def forward(self, x):
        """Encode then decode ``x``; returns the reconstruction."""
        x = self.encoder(x)
        x = self.decoder(x)
        return x
|
class AdversarialAutoEncoder(nn.Module):
    """Adversarial autoencoder (AAE): autoencoder plus a latent discriminator.

    The discriminator is trained to distinguish encoder outputs from samples
    of a standard normal prior, pushing the latent distribution toward N(0, 1).

    Args:
        input_dim:   number of input features.
        hidden_dim1: width of the outer hidden layer.
        hidden_dim2: width of the inner hidden layer.
        latent_dim:  dimension of the latent code.
    """

    def __init__(self, input_dim: int, hidden_dim1: int, hidden_dim2: int,
                 latent_dim: int):
        super(AdversarialAutoEncoder, self).__init__()
        # Encoder: input -> latent.
        # BUG FIX: the original applied ReLU to the latent layer, clamping
        # codes to >= 0.  The adversarial prior is N(0, 1) (half of its mass
        # is negative), so the discriminator could separate prior samples
        # from codes trivially and the latent could never match the prior.
        # The latent output is now linear.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_dim1),
            nn.ReLU(True),
            nn.Linear(hidden_dim1, hidden_dim2),
            nn.ReLU(True),
            nn.Linear(hidden_dim2, latent_dim)
        )

        # Decoder: latent -> input.
        # BUG FIX: the original ended with Sigmoid "to keep outputs in 0-1",
        # but the training targets in this file are StandardScaler-standardized
        # (zero mean, unit variance) and fall outside (0, 1).  The output
        # layer is now linear so the reconstruction can match the targets.
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim2),
            nn.ReLU(True),
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(True),
            nn.Linear(hidden_dim1, input_dim)
        )

        # Discriminator: latent -> probability that the code came from the
        # N(0, 1) prior.  Sigmoid output matches the BCELoss used in training.
        self.discriminator = nn.Sequential(
            nn.Linear(latent_dim, hidden_dim2),
            nn.ReLU(True),
            nn.Linear(hidden_dim2, hidden_dim1),
            nn.ReLU(True),
            nn.Linear(hidden_dim1, 1),
            nn.Sigmoid()
        )

    def encode(self, x):
        """Map input ``x`` to its latent code."""
        return self.encoder(x)

    def decode(self, z):
        """Reconstruct an input from latent code ``z``."""
        return self.decoder(z)

    def discriminate(self, z):
        """Return the discriminator's probability that ``z`` is a prior sample."""
        return self.discriminator(z)

    def forward(self, x):
        """Return ``(reconstruction, latent_code)`` for input ``x``."""
        z = self.encode(x)
        recon_x = self.decode(z)
        return recon_x, z
|
def train_adversarial_autoencoder():
    """Train an AdversarialAutoEncoder on the credit data set.

    Reads ``data/credit_data.csv``, standardizes the numeric columns, trains
    the AAE with a reconstruction loss plus an adversarial loss that pushes
    the latent distribution toward N(0, 1), and saves the model weights and
    the fitted scaler under ``models/``.

    Returns:
        tuple: ``(model, scaler)`` — the trained AdversarialAutoEncoder and
        the fitted StandardScaler (required to transform data at inference).
    """
    # Load the raw credit data.
    print("读取信贷数据...")
    df = pd.read_csv('data/credit_data.csv')

    # Only numeric features are used for autoencoder training.
    numerical_features = ['age', 'income', 'employment_length', 'loan_amount',
                          'credit_score', 'debt_to_income', 'num_credit_lines']
    X = df[numerical_features]

    # Standardize to zero mean / unit variance.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Convert to a float32 tensor for PyTorch.
    X_tensor = torch.FloatTensor(X_scaled)

    # Model hyperparameters.
    input_dim = X_tensor.shape[1]
    hidden_dim1 = 64
    hidden_dim2 = 32
    latent_dim = 16

    model = AdversarialAutoEncoder(input_dim, hidden_dim1, hidden_dim2, latent_dim)

    # Losses: MSE for reconstruction, BCE for the adversarial game.
    reconstruction_criterion = nn.MSELoss()
    adversarial_criterion = nn.BCELoss()

    # Separate optimizers so each phase of the adversarial game updates only
    # its own parameters (encoder+decoder vs. discriminator).
    autoencoder_optimizer = optim.Adam(
        list(model.encoder.parameters()) + list(model.decoder.parameters()),
        lr=0.001
    )
    discriminator_optimizer = optim.Adam(model.discriminator.parameters(), lr=0.001)

    num_epochs = 100
    batch_size = 64

    print("开始训练对抗自编码器...")
    model.train()
    for epoch in range(num_epochs):
        for i in range(0, len(X_tensor), batch_size):
            batch = X_tensor[i:i+batch_size]

            # ---- Phase 1: train encoder + decoder ----
            autoencoder_optimizer.zero_grad()

            recon_batch, latent_batch = model(batch)
            # Sized per batch: the last batch of an epoch may be smaller.
            real_labels = torch.ones(batch.size(0), 1)
            fake_labels = torch.zeros(batch.size(0), 1)

            # Reconstruction loss against the standardized input.
            recon_loss = reconstruction_criterion(recon_batch, batch)

            # Generator side of the game: the encoder wants the discriminator
            # to label its latent codes as "real" prior samples.
            disc_fake = model.discriminate(latent_batch)
            adversarial_loss = adversarial_criterion(disc_fake, real_labels)

            # 0.1 weights the adversarial term relative to reconstruction.
            autoencoder_loss = recon_loss + 0.1 * adversarial_loss
            autoencoder_loss.backward()
            autoencoder_optimizer.step()

            # ---- Phase 2: train discriminator ----
            discriminator_optimizer.zero_grad()

            # "Real" latent codes are drawn from the N(0, 1) prior.
            real_latent = torch.randn(batch.size(0), latent_dim)
            disc_real = model.discriminate(real_latent)
            disc_real_loss = adversarial_criterion(disc_real, real_labels)

            # detach() stops discriminator gradients from reaching the encoder.
            disc_fake = model.discriminate(latent_batch.detach())
            disc_fake_loss = adversarial_criterion(disc_fake, fake_labels)

            discriminator_loss = disc_real_loss + disc_fake_loss
            discriminator_loss.backward()
            discriminator_optimizer.step()

        if (epoch + 1) % 10 == 0:
            # Losses shown are from the last batch of the epoch.
            print(f'Epoch [{epoch+1}/{num_epochs}], '
                  f'Recon Loss: {recon_loss.item():.4f}, '
                  f'Adversarial Loss: {adversarial_loss.item():.4f}, '
                  f'Discriminator Loss: {discriminator_loss.item():.4f}')

    # BUG FIX: torch.save / joblib.dump raise FileNotFoundError when the
    # target directory is missing; create it before saving.
    os.makedirs('models', exist_ok=True)
    print("保存对抗自编码器模型...")
    torch.save(model.state_dict(), 'models/adversarial_autoencoder.pth')
    joblib.dump(scaler, 'models/ae_scaler.pkl')

    return model, scaler
|
if __name__ == "__main__":
    # Report which device is available for training.
    # NOTE(review): `device` is only printed, never applied — the model and
    # tensors in train_adversarial_autoencoder() stay on the CPU; confirm
    # whether GPU training was intended.
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    print(f"使用设备: {device}")

    model, scaler = train_adversarial_autoencoder()
    print("对抗自编码器训练完成!")