代码
main
LiRen-qiu 7 months ago
parent c5b07ecb07
commit e7f1010ba2

@ -0,0 +1,197 @@
import numpy as np
from pymoo.algorithms.moo.moead import MOEAD
from pymoo.visualization.pcp import PCP
from pymoo.util.ref_dirs import get_reference_directions
from pymoo.optimize import minimize
from pymoo.core.problem import Problem
from pymoo.operators.sampling.lhs import LHS
from pymoo.operators.crossover.sbx import SBX
from pymoo.operators.mutation.pm import PM
from pymoo.termination import get_termination
from sklearn.gaussian_process import GaussianProcessRegressor
# 动态模型参数
alpha = 0.05 # 冰川吸引力系数
beta = 0.12 # 税率抑制系数
eta = 0.01 # 碳排放冰川消退系数
theta = 0.005 # 自然消退速率
lambda_ = 0.2 # 游客碳排放系数
mu = 0.01 # 基础设施碳排放系数
zeta = 0.3 # 社区投资增益系数
epsilon = 0.05 # 游客拥挤影响系数
kappa = 0.15 # 治安压力系数
G0 = 81285 # 初始冰川面积km²
C_max = 10000 # 最大碳排放(吨/年)
class SustainableTourismProblem(Problem):
def __init__(self):
super().__init__(
n_var=4, # Tax, Env_Invest, Comm_Invest, V_max
n_obj=3, # 最大化收益,最小化环境压力,最小化社会压力
n_constr=6,
xl=np.array([0.01, 0.0, 0.0, 15]),
xu=np.array([0.15, 20.0, 15.0, 30])
)
def _evaluate(self, X, out, *args, **kwargs):
F = [] # 目标函数值
G = [] # 约束违反值
for params in X:
Tax, Env_Invest, Comm_Invest, V_max = params
T = 50
# 模拟20年
V = np.zeros(T)
G_t = np.zeros(T)
C = np.zeros(T)
R = np.zeros(T)
S = np.zeros(T)
# 初始条件
V[0] = 20
G_t[0] = G0
R[0] = 70
# 动态模拟
for t in range(T - 1):
# 更新游客数量受V_max限制
growth_factor = alpha * (G_t[t] / G0) - beta * Tax
V[t + 1] = V[t] * (1 + growth_factor)
V[t + 1] = min(V[t + 1], V_max)
# 更新冰川面积和碳排放
C[t] = lambda_ * V[t] - mu * Env_Invest # 环保投资直接减少碳排放
G_t[t + 1] = G_t[t] - eta * C[t] - theta * G_t[t]
# 更新居民满意度
R[t + 1] = R[t] + zeta * Comm_Invest - epsilon * V[t]
R[t + 1] = max(min(R[t + 1], 100), 0)
# 更新治安事件率
S[t + 1] = kappa * (V[t] / V_max)
# 计算目标函数
total_income = np.sum(V * 0.234) # 假设人均消费$200
environmental_pressure = 0.5 * np.sum(C)
social_pressure = 0.3 * np.sum(100 - R) + 0.2 * np.sum(S)
# 计算约束违反
g1 = 80000 - np.min(G_t) # G(t) >=80000
g2 = np.sum(C) - C_max # C_total <= C_max
g3 = np.max(V) - V_max # V(t) <= V_max
g4 = 60 - np.min(R) # R(t) >=60
g5 = np.max(S) - 0.1 # S(t) <=0.1
g6 = 0.1 * total_income - (Env_Invest + Comm_Invest) # 总投资≥10%收入
constraints = [g1, g2, g3, g4, g5, g6]
F.append([-total_income, social_pressure, environmental_pressure])
G.append(constraints)
out["F"] = np.array(F)
out["G"] = np.array(G)
# MOEA/D-EGO算法
def moead_ego(problem, n_init=50, n_iter=200):
ref_dirs = get_reference_directions("das-dennis", 3, n_partitions=18)
# 初始样本生成
X_init = LHS().do(problem, n_init).get("X")
F_init, G_init = problem.evaluate(X_init, return_values_of=["F", "G"])
# 代理模型
class SurrogateModel:
def __init__(self):
self.models = {
'f1': GaussianProcessRegressor(),
'f2': GaussianProcessRegressor(),
'f3': GaussianProcessRegressor(),
'g1': GaussianProcessRegressor(),
'g2': GaussianProcessRegressor(),
'g3': GaussianProcessRegressor(),
'g4': GaussianProcessRegressor(),
'g5': GaussianProcessRegressor(),
'g6': GaussianProcessRegressor()
}
def fit(self, X, F, G):
self.models['f1'].fit(X, F[:, 0])
self.models['f2'].fit(X, F[:, 1])
self.models['f3'].fit(X, F[:, 2])
for i in range(6):
self.models[f'g{i + 1}'].fit(X, G[:, i])
def predict(self, X):
f1_pred = self.models['f1'].predict(X)
f2_pred = self.models['f2'].predict(X)
f3_pred = self.models['f3'].predict(X)
g_pred = np.column_stack([self.models[f'g{i + 1}'].predict(X) for i in range(6)])
return f1_pred, f2_pred, f3_pred, g_pred
surrogate = SurrogateModel()
surrogate.fit(X_init, F_init, G_init)
# MOEA/D配置
algorithm = MOEAD(
ref_dirs=ref_dirs,
n_neighbors=15,
decomposition="tchebi",
prob_neighbor_mating=0.9,
sampling=LHS(),
crossover=SBX(prob=0.9, eta=15),
mutation=PM(eta=20),
)
for _ in range(n_iter):
# 生成候选解并预测
X_candidate = LHS().do(problem, 100).get("X")
f1_pred, f2_pred,f3_pred, g_pred = surrogate.predict(X_candidate)
# 计算期望改进EI
improvement = (f1_pred.min() - f1_pred) + (f2_pred.min() - f2_pred)+(f3_pred.min() - f3_pred)
best_idx = np.argmax(improvement)
X_new = X_candidate[best_idx].reshape(1, -1)
# 真实评估并更新模型
F_new, G_new = problem.evaluate(X_new, return_values_of=["F", "G"])
X_init = np.vstack([X_init, X_new])
F_init = np.vstack([F_init, F_new])
G_init = np.vstack([G_init, G_new])
surrogate.fit(X_init, F_init, G_init)
return X_init, F_init, G_init
# 运行优化并可视化
problem = SustainableTourismProblem()
X_opt, F_opt, G_opt = moead_ego(problem)
import matplotlib.pyplot as plt
plt.figure(figsize=(10, 6))
plt.scatter(-F_opt[:, 0], F_opt[:, 1], c='blue', alpha=0.5)
plt.xlabel("Total Income ($M)")
plt.ylabel("Environmental & Social Pressure")
plt.title("MOEA/D-EGO Pareto Front")
plt.grid(True)
plt.show()
# 提取可行解
feasible_mask = np.all(G_opt <= 0, axis=1)
if np.any(feasible_mask):
best_idx = np.argmin(F_opt[feasible_mask][:, 1])
best_params = X_opt[feasible_mask][best_idx]
best_objectives = F_opt[feasible_mask][best_idx]
else:
best_idx = np.argmin(F_opt[:, 1]) # 如果没有可行解,选择最小化环境压力的解
best_params = X_opt[best_idx]
best_objectives = F_opt[best_idx]
print(
f"最优参数:\n税率={best_params[0]:.2f}\n环保投资={best_params[1]:.1f}百万美元\n社会投资={best_params[2]:.1f}百万美元\n游客容量上限={best_params[3]:.1f}千人/日")
print(f"\n最优解的目标函数值:")
print(f"经济效益(总收益):{-best_objectives[0]:.2f}百万美元") # 注意:收益是负值,需要取反
print(f"环境压力:{best_objectives[1]:.2f}")
print(f"社会压力:{best_objectives[2]:.2f}")
Loading…
Cancel
Save