You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
496 lines
20 KiB
496 lines
20 KiB
/**
 * Microservice governance framework.
 * Provides service discovery, load balancing, and circuit breaking with degradation.
 */
|
|
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
|
public class MicroServiceGovernance {
|
|
// 服务注册中心组件
|
|
private final ServiceRegistry serviceRegistry;
|
|
// 负载均衡器映射
|
|
private final Map<String, LoadBalancer> loadBalancers;
|
|
// 断路器映射
|
|
private final Map<String, CircuitBreaker> circuitBreakers;
|
|
// 故障服务列表
|
|
private final Map<String, Boolean> faultyServices = new ConcurrentHashMap<>();
|
|
// 服务调用统计
|
|
private final Map<String, CallStats> callStats = new ConcurrentHashMap<>();
|
|
// 单例实例
|
|
private static volatile MicroServiceGovernance instance;
|
|
|
|
private MicroServiceGovernance() {
|
|
this.serviceRegistry = new ServiceRegistry();
|
|
this.loadBalancers = new ConcurrentHashMap<>();
|
|
this.circuitBreakers = new ConcurrentHashMap<>();
|
|
System.out.println("微服务治理框架初始化完成");
|
|
}
|
|
|
|
// 获取单例实例
|
|
public static MicroServiceGovernance getInstance() {
|
|
if (instance == null) {
|
|
synchronized (MicroServiceGovernance.class) {
|
|
if (instance == null) {
|
|
instance = new MicroServiceGovernance();
|
|
}
|
|
}
|
|
}
|
|
return instance;
|
|
}
|
|
|
|
// 服务注册
|
|
public void registerService(String serviceName, String host, int port) {
|
|
serviceRegistry.register(serviceName, host, port);
|
|
// 为新服务初始化负载均衡器
|
|
getLoadBalancer(serviceName);
|
|
// 为新服务初始化断路器
|
|
getCircuitBreaker(serviceName);
|
|
}
|
|
|
|
// 服务注销
|
|
public void deregisterService(String serviceName, String host, int port) {
|
|
serviceRegistry.deregister(serviceName, host, port);
|
|
}
|
|
|
|
// 获取负载均衡器
|
|
private LoadBalancer getLoadBalancer(String serviceName) {
|
|
return loadBalancers.computeIfAbsent(serviceName, name -> {
|
|
LoadBalancer balancer = new LoadBalancer(name);
|
|
System.out.println("为服务 " + name + " 创建负载均衡器");
|
|
return balancer;
|
|
});
|
|
}
|
|
|
|
// 获取断路器
|
|
private CircuitBreaker getCircuitBreaker(String serviceName) {
|
|
return circuitBreakers.computeIfAbsent(serviceName, name -> {
|
|
CircuitBreaker breaker = new CircuitBreaker(name, 5, 10000, 2);
|
|
System.out.println("为服务 " + name + " 创建断路器");
|
|
return breaker;
|
|
});
|
|
}
|
|
|
|
// 调用服务(适配测试类的方法签名)
|
|
public String callService(String serviceName, String method, String path) {
|
|
// 创建一个简单的参数映射
|
|
Map<String, String> params = new HashMap<>();
|
|
params.put("path", path);
|
|
params.put("timestamp", String.valueOf(System.currentTimeMillis()));
|
|
|
|
return callServiceInternal(serviceName, method, params);
|
|
}
|
|
|
|
// 服务调用(集成了服务发现、负载均衡和熔断降级)
|
|
public ServiceCallResult callService(String serviceName, String path, Map<String, String> params) {
|
|
// 获取断路器
|
|
CircuitBreaker circuitBreaker = getCircuitBreaker(serviceName);
|
|
|
|
// 检查断路器状态
|
|
if (!circuitBreaker.allowRequest()) {
|
|
System.out.println("服务调用被断路器拒绝: " + serviceName + " (状态: " + circuitBreaker.getState() + ")");
|
|
return new ServiceCallResult(false, "服务暂时不可用,请稍后再试", null);
|
|
}
|
|
|
|
try {
|
|
// 获取服务实例列表
|
|
List<ServiceRegistry.ServiceInstance> instances = serviceRegistry.discover(serviceName);
|
|
if (instances.isEmpty()) {
|
|
circuitBreaker.recordFailure();
|
|
return new ServiceCallResult(false, "未找到可用的服务实例: " + serviceName, null);
|
|
}
|
|
|
|
// 负载均衡选择实例
|
|
LoadBalancer loadBalancer = getLoadBalancer(serviceName);
|
|
ServiceRegistry.ServiceInstance instance = loadBalancer.choose(instances);
|
|
|
|
if (instance == null) {
|
|
circuitBreaker.recordFailure();
|
|
return new ServiceCallResult(false, "负载均衡失败,未选择到服务实例", null);
|
|
}
|
|
|
|
// 模拟服务调用
|
|
System.out.println("调用服务: " + serviceName + " 实例: " + instance + " 路径: " + path);
|
|
|
|
// 模拟随机失败(用于测试熔断)
|
|
boolean success = simulateServiceCall(instance, serviceName);
|
|
|
|
// 更新调用统计
|
|
updateCallStats(serviceName, instance, success);
|
|
|
|
if (success) {
|
|
circuitBreaker.recordSuccess();
|
|
Map<String, Object> response = new HashMap<>();
|
|
response.put("instance", instance.toString());
|
|
response.put("path", path);
|
|
response.put("timestamp", System.currentTimeMillis());
|
|
return new ServiceCallResult(true, "调用成功", response);
|
|
} else {
|
|
circuitBreaker.recordFailure();
|
|
return new ServiceCallResult(false, "服务调用失败", null);
|
|
}
|
|
} catch (Exception e) {
|
|
circuitBreaker.recordFailure();
|
|
return new ServiceCallResult(false, "服务调用异常: " + e.getMessage(), null);
|
|
}
|
|
}
|
|
|
|
// 内部调用服务方法
|
|
public String callServiceInternal(String serviceName, String method, Map<String, String> params) {
|
|
// 1. 通过服务注册中心发现服务
|
|
List<ServiceRegistry.ServiceInstance> instances = serviceRegistry.discover(serviceName);
|
|
if (instances.isEmpty()) {
|
|
return "服务不可用: " + serviceName;
|
|
}
|
|
|
|
// 2. 通过负载均衡器选择实例
|
|
LoadBalancer loadBalancer = getLoadBalancer(serviceName);
|
|
ServiceRegistry.ServiceInstance instance = loadBalancer.choose(instances);
|
|
if (instance == null) {
|
|
return "无法选择服务实例: " + serviceName;
|
|
}
|
|
|
|
// 3. 通过断路器调用服务
|
|
String key = serviceName + ":" + instance.getHost() + ":" + instance.getPort();
|
|
CircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent(key, name -> new CircuitBreaker(name, 5, 10000, 2));
|
|
|
|
if (!circuitBreaker.allowRequest()) {
|
|
return "服务调用被断路器拒绝: " + serviceName;
|
|
}
|
|
|
|
try {
|
|
// 模拟服务调用
|
|
String result = simulateServiceCallInternal(instance, method, params);
|
|
circuitBreaker.recordSuccess();
|
|
|
|
// 更新调用统计
|
|
updateCallStats(serviceName, instance, true);
|
|
|
|
return result;
|
|
} catch (Exception e) {
|
|
circuitBreaker.recordFailure();
|
|
|
|
// 更新调用统计
|
|
updateCallStats(serviceName, instance, false);
|
|
|
|
return "服务调用失败: " + e.getMessage();
|
|
}
|
|
}
|
|
|
|
// 模拟服务调用(内部方法)
|
|
private String simulateServiceCallInternal(ServiceRegistry.ServiceInstance instance, String method, Map<String, String> params) {
|
|
// 检查故障服务列表
|
|
String faultKey = instance.getHost() + ":" + instance.getPort();
|
|
if (faultyServices.containsKey(faultKey) && faultyServices.get(faultKey)) {
|
|
throw new RuntimeException("模拟服务故障: " + instance);
|
|
}
|
|
|
|
// 模拟处理延迟
|
|
try {
|
|
Thread.sleep(10 + (int)(Math.random() * 50));
|
|
} catch (InterruptedException e) {
|
|
Thread.currentThread().interrupt();
|
|
}
|
|
|
|
// 生成调用结果
|
|
String path = params.getOrDefault("path", "");
|
|
return method + " " + instance.getHost() + ":" + instance.getPort() + path + " - 调用成功";
|
|
}
|
|
|
|
// 模拟服务调用(随机失败以测试熔断)
|
|
private boolean simulateServiceCall(ServiceRegistry.ServiceInstance instance, String serviceName) {
|
|
// 检查故障服务列表
|
|
String faultKey = instance.getHost() + ":" + instance.getPort();
|
|
if (faultyServices.containsKey(faultKey) && faultyServices.get(faultKey)) {
|
|
return false;
|
|
}
|
|
// 90%的成功率,用于测试
|
|
return Math.random() < 0.9;
|
|
}
|
|
|
|
// 更新调用统计
|
|
private void updateCallStats(String serviceName, ServiceRegistry.ServiceInstance instance, boolean success) {
|
|
CallStats stats = callStats.computeIfAbsent(serviceName, k -> new CallStats());
|
|
stats.recordCall(success);
|
|
|
|
// 更新实例统计
|
|
String instanceKey = instance.getHost() + ":" + instance.getPort();
|
|
stats.instanceStats.computeIfAbsent(instanceKey, k -> new ServiceInstanceStats()).recordCall(success);
|
|
}
|
|
|
|
// 列出所有服务
|
|
public List<String> listServices() {
|
|
List<String> services = new ArrayList<>();
|
|
// 从服务注册中心获取所有服务
|
|
serviceRegistry.getAllServices().forEach(service -> {
|
|
List<ServiceRegistry.ServiceInstance> instances = serviceRegistry.discover(service);
|
|
for (ServiceRegistry.ServiceInstance instance : instances) {
|
|
services.add(service + " - " + instance.getHost() + ":" + instance.getPort());
|
|
}
|
|
});
|
|
return services;
|
|
}
|
|
|
|
// 模拟服务故障
|
|
public void simulateServiceFailure(String serviceName, String host, int port, boolean isFaulty) {
|
|
String key = host + ":" + port;
|
|
faultyServices.put(key, isFaulty);
|
|
System.out.println("服务故障状态更新: " + serviceName + "[" + host + ":" + port + "] = " + (isFaulty ? "故障" : "正常"));
|
|
}
|
|
|
|
// 打印治理统计信息
|
|
public void printGovernanceStats() {
|
|
System.out.println("===== 微服务治理统计 =====");
|
|
|
|
// 打印服务调用统计
|
|
callStats.forEach((service, stats) -> {
|
|
System.out.println("\n服务: " + service);
|
|
System.out.println(" 总调用次数: " + stats.getTotalCalls());
|
|
System.out.println(" 成功调用: " + stats.getSuccessCalls());
|
|
System.out.println(" 失败调用: " + stats.getFailureCalls());
|
|
System.out.println(" 成功率: " + String.format("%.2f%%",
|
|
stats.getTotalCalls() > 0 ? (double) stats.getSuccessCalls() / stats.getTotalCalls() * 100 : 0));
|
|
|
|
// 打印实例统计
|
|
System.out.println(" 实例统计:");
|
|
stats.getInstanceStats().forEach((instance, instanceStats) -> {
|
|
System.out.println(" " + instance + ": 调用 " + instanceStats.getTotalCalls() + " 次, 成功率 " +
|
|
String.format("%.2f%%", instanceStats.getSuccessRate()));
|
|
});
|
|
});
|
|
|
|
// 打印断路器状态
|
|
System.out.println("\n断路器状态:");
|
|
circuitBreakers.forEach((key, breaker) -> {
|
|
System.out.println(" " + key + ": " + breaker.getState() + ", 失败计数: " + breaker.getFailureCount());
|
|
});
|
|
|
|
System.out.println("========================");
|
|
}
|
|
|
|
// 获取服务治理统计信息
|
|
public Map<String, Object> getGovernanceStats() {
|
|
Map<String, Object> stats = new HashMap<>();
|
|
|
|
// 服务统计
|
|
Map<String, Integer> serviceStats = new HashMap<>();
|
|
for (String service : serviceRegistry.getAllServices()) {
|
|
int instanceCount = serviceRegistry.discover(service).size();
|
|
serviceStats.put(service, instanceCount);
|
|
}
|
|
stats.put("services", serviceStats);
|
|
|
|
// 断路器统计
|
|
Map<String, String> breakerStats = new HashMap<>();
|
|
for (Map.Entry<String, CircuitBreaker> entry : circuitBreakers.entrySet()) {
|
|
CircuitBreaker breaker = entry.getValue();
|
|
breakerStats.put(entry.getKey(), breaker.getState() + " (失败: " + breaker.getFailureCount() + ")");
|
|
}
|
|
stats.put("circuitBreakers", breakerStats);
|
|
|
|
// 添加调用统计
|
|
stats.put("callStats", callStats);
|
|
|
|
return stats;
|
|
}
|
|
|
|
// 服务调用统计
|
|
private static class CallStats {
|
|
private final AtomicInteger totalCalls = new AtomicInteger(0);
|
|
private final AtomicInteger successCalls = new AtomicInteger(0);
|
|
private final AtomicInteger failureCalls = new AtomicInteger(0);
|
|
private final Map<String, ServiceInstanceStats> instanceStats = new ConcurrentHashMap<>();
|
|
|
|
// Getters
|
|
public int getTotalCalls() { return totalCalls.get(); }
|
|
public int getSuccessCalls() { return successCalls.get(); }
|
|
public int getFailureCalls() { return failureCalls.get(); }
|
|
public Map<String, ServiceInstanceStats> getInstanceStats() { return instanceStats; }
|
|
|
|
// 更新统计
|
|
public void recordCall(boolean success) {
|
|
totalCalls.incrementAndGet();
|
|
if (success) {
|
|
successCalls.incrementAndGet();
|
|
} else {
|
|
failureCalls.incrementAndGet();
|
|
}
|
|
}
|
|
}
|
|
|
|
// 服务实例调用统计
|
|
private static class ServiceInstanceStats {
|
|
private final AtomicInteger totalCalls = new AtomicInteger(0);
|
|
private final AtomicInteger successCalls = new AtomicInteger(0);
|
|
|
|
public void recordCall(boolean success) {
|
|
totalCalls.incrementAndGet();
|
|
if (success) {
|
|
successCalls.incrementAndGet();
|
|
}
|
|
}
|
|
|
|
public int getTotalCalls() { return totalCalls.get(); }
|
|
public int getSuccessCalls() { return successCalls.get(); }
|
|
public double getSuccessRate() {
|
|
int total = getTotalCalls();
|
|
return total > 0 ? (double) getSuccessCalls() / total * 100 : 0;
|
|
}
|
|
}
|
|
|
|
// 服务注册中心内部类
|
|
public static class ServiceRegistry {
|
|
private final Map<String, List<ServiceInstance>> services = new ConcurrentHashMap<>();
|
|
|
|
public void register(String serviceName, String host, int port) {
|
|
services.computeIfAbsent(serviceName, k -> new ArrayList<>()).add(new ServiceInstance(host, port));
|
|
System.out.println("服务注册: " + serviceName + " at " + host + ":" + port);
|
|
}
|
|
|
|
public void deregister(String serviceName, String host, int port) {
|
|
List<ServiceInstance> instances = services.get(serviceName);
|
|
if (instances != null) {
|
|
instances.removeIf(instance -> instance.getHost().equals(host) && instance.getPort() == port);
|
|
if (instances.isEmpty()) {
|
|
services.remove(serviceName);
|
|
}
|
|
}
|
|
}
|
|
|
|
public List<ServiceInstance> discover(String serviceName) {
|
|
List<ServiceInstance> instances = services.get(serviceName);
|
|
return instances != null ? new ArrayList<>(instances) : Collections.emptyList();
|
|
}
|
|
|
|
public Set<String> getAllServices() {
|
|
return new HashSet<>(services.keySet());
|
|
}
|
|
|
|
public static class ServiceInstance {
|
|
private final String host;
|
|
private final int port;
|
|
|
|
public ServiceInstance(String host, int port) {
|
|
this.host = host;
|
|
this.port = port;
|
|
}
|
|
|
|
public String getHost() { return host; }
|
|
public int getPort() { return port; }
|
|
|
|
@Override
|
|
public String toString() { return host + ":" + port; }
|
|
}
|
|
}
|
|
|
|
// 负载均衡器内部类
|
|
public static class LoadBalancer {
|
|
private final String name;
|
|
private final AtomicInteger counter = new AtomicInteger(0);
|
|
|
|
public LoadBalancer(String name) {
|
|
this.name = name;
|
|
}
|
|
|
|
// 轮询负载均衡
|
|
public ServiceRegistry.ServiceInstance choose(List<ServiceRegistry.ServiceInstance> instances) {
|
|
if (instances == null || instances.isEmpty()) {
|
|
return null;
|
|
}
|
|
|
|
// 轮询选择实例
|
|
int index = Math.abs(counter.getAndIncrement() % instances.size());
|
|
return instances.get(index);
|
|
}
|
|
}
|
|
|
|
// 断路器内部类
|
|
public static class CircuitBreaker {
|
|
public enum State { CLOSED, OPEN, HALF_OPEN }
|
|
|
|
private final String name;
|
|
private final int failureThreshold;
|
|
private final long resetTimeoutMs;
|
|
private final int successThreshold;
|
|
|
|
private volatile State state = State.CLOSED;
|
|
private final AtomicInteger failureCount = new AtomicInteger(0);
|
|
private final AtomicInteger successCount = new AtomicInteger(0);
|
|
private volatile long lastFailureTime = 0;
|
|
|
|
public CircuitBreaker(String name, int failureThreshold, long resetTimeoutMs, int successThreshold) {
|
|
this.name = name;
|
|
this.failureThreshold = failureThreshold;
|
|
this.resetTimeoutMs = resetTimeoutMs;
|
|
this.successThreshold = successThreshold;
|
|
}
|
|
|
|
public boolean allowRequest() {
|
|
if (state == State.CLOSED) {
|
|
return true;
|
|
}
|
|
if (state == State.OPEN) {
|
|
if (System.currentTimeMillis() - lastFailureTime > resetTimeoutMs) {
|
|
state = State.HALF_OPEN;
|
|
System.out.println("断路器状态切换: " + name + " 从 OPEN 到 HALF_OPEN");
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
return state == State.HALF_OPEN;
|
|
}
|
|
|
|
public void recordSuccess() {
|
|
if (state == State.CLOSED) {
|
|
failureCount.set(0);
|
|
} else if (state == State.HALF_OPEN) {
|
|
if (successCount.incrementAndGet() >= successThreshold) {
|
|
reset();
|
|
System.out.println("断路器状态切换: " + name + " 从 HALF_OPEN 到 CLOSED");
|
|
}
|
|
}
|
|
}
|
|
|
|
public void recordFailure() {
|
|
if (state == State.CLOSED) {
|
|
if (failureCount.incrementAndGet() >= failureThreshold) {
|
|
state = State.OPEN;
|
|
lastFailureTime = System.currentTimeMillis();
|
|
System.out.println("断路器状态切换: " + name + " 从 CLOSED 到 OPEN");
|
|
}
|
|
} else if (state == State.HALF_OPEN) {
|
|
state = State.OPEN;
|
|
lastFailureTime = System.currentTimeMillis();
|
|
successCount.set(0);
|
|
System.out.println("断路器状态切换: " + name + " 从 HALF_OPEN 到 OPEN");
|
|
}
|
|
}
|
|
|
|
private void reset() {
|
|
state = State.CLOSED;
|
|
failureCount.set(0);
|
|
successCount.set(0);
|
|
}
|
|
|
|
public State getState() { return state; }
|
|
public int getFailureCount() { return failureCount.get(); }
|
|
public String getName() { return name; }
|
|
}
|
|
|
|
// 服务调用结果
|
|
public static class ServiceCallResult {
|
|
private final boolean success;
|
|
private final String message;
|
|
private final Map<String, Object> data;
|
|
|
|
public ServiceCallResult(boolean success, String message, Map<String, Object> data) {
|
|
this.success = success;
|
|
this.message = message;
|
|
this.data = data;
|
|
}
|
|
|
|
public boolean isSuccess() { return success; }
|
|
public String getMessage() { return message; }
|
|
public Map<String, Object> getData() { return data; }
|
|
}
|
|
} |