/** * 微服务治理框架 * 包含服务发现、负载均衡和熔断降级功能 */ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public class MicroServiceGovernance { // 服务注册中心组件 private final ServiceRegistry serviceRegistry; // 负载均衡器映射 private final Map loadBalancers; // 断路器映射 private final Map circuitBreakers; // 故障服务列表 private final Map faultyServices = new ConcurrentHashMap<>(); // 服务调用统计 private final Map callStats = new ConcurrentHashMap<>(); // 单例实例 private static volatile MicroServiceGovernance instance; private MicroServiceGovernance() { this.serviceRegistry = new ServiceRegistry(); this.loadBalancers = new ConcurrentHashMap<>(); this.circuitBreakers = new ConcurrentHashMap<>(); System.out.println("微服务治理框架初始化完成"); } // 获取单例实例 public static MicroServiceGovernance getInstance() { if (instance == null) { synchronized (MicroServiceGovernance.class) { if (instance == null) { instance = new MicroServiceGovernance(); } } } return instance; } // 服务注册 public void registerService(String serviceName, String host, int port) { serviceRegistry.register(serviceName, host, port); // 为新服务初始化负载均衡器 getLoadBalancer(serviceName); // 为新服务初始化断路器 getCircuitBreaker(serviceName); } // 服务注销 public void deregisterService(String serviceName, String host, int port) { serviceRegistry.deregister(serviceName, host, port); } // 获取负载均衡器 private LoadBalancer getLoadBalancer(String serviceName) { return loadBalancers.computeIfAbsent(serviceName, name -> { LoadBalancer balancer = new LoadBalancer(name); System.out.println("为服务 " + name + " 创建负载均衡器"); return balancer; }); } // 获取断路器 private CircuitBreaker getCircuitBreaker(String serviceName) { return circuitBreakers.computeIfAbsent(serviceName, name -> { CircuitBreaker breaker = new CircuitBreaker(name, 5, 10000, 2); System.out.println("为服务 " + name + " 创建断路器"); return breaker; }); } // 调用服务(适配测试类的方法签名) public String callService(String serviceName, String method, String path) { // 创建一个简单的参数映射 Map params = new HashMap<>(); params.put("path", path); params.put("timestamp", String.valueOf(System.currentTimeMillis())); return callServiceInternal(serviceName, method, params); } // 服务调用(集成了服务发现、负载均衡和熔断降级) public ServiceCallResult callService(String serviceName, String path, Map params) { // 获取断路器 CircuitBreaker circuitBreaker = getCircuitBreaker(serviceName); // 检查断路器状态 if (!circuitBreaker.allowRequest()) { System.out.println("服务调用被断路器拒绝: " + serviceName + " (状态: " + circuitBreaker.getState() + ")"); return new ServiceCallResult(false, "服务暂时不可用,请稍后再试", null); } try { // 获取服务实例列表 List instances = serviceRegistry.discover(serviceName); if (instances.isEmpty()) { circuitBreaker.recordFailure(); return new ServiceCallResult(false, "未找到可用的服务实例: " + serviceName, null); } // 负载均衡选择实例 LoadBalancer loadBalancer = getLoadBalancer(serviceName); ServiceRegistry.ServiceInstance instance = loadBalancer.choose(instances); if (instance == null) { circuitBreaker.recordFailure(); return new ServiceCallResult(false, "负载均衡失败,未选择到服务实例", null); } // 模拟服务调用 System.out.println("调用服务: " + serviceName + " 实例: " + instance + " 路径: " + path); // 模拟随机失败(用于测试熔断) boolean success = simulateServiceCall(instance, serviceName); // 更新调用统计 updateCallStats(serviceName, instance, success); if (success) { circuitBreaker.recordSuccess(); Map response = new HashMap<>(); response.put("instance", instance.toString()); response.put("path", path); response.put("timestamp", System.currentTimeMillis()); return new ServiceCallResult(true, "调用成功", response); } else { circuitBreaker.recordFailure(); return new ServiceCallResult(false, "服务调用失败", null); } } catch (Exception e) { circuitBreaker.recordFailure(); return new ServiceCallResult(false, "服务调用异常: " + e.getMessage(), null); } } // 内部调用服务方法 public String callServiceInternal(String serviceName, String method, Map params) { // 1. 通过服务注册中心发现服务 List instances = serviceRegistry.discover(serviceName); if (instances.isEmpty()) { return "服务不可用: " + serviceName; } // 2. 通过负载均衡器选择实例 LoadBalancer loadBalancer = getLoadBalancer(serviceName); ServiceRegistry.ServiceInstance instance = loadBalancer.choose(instances); if (instance == null) { return "无法选择服务实例: " + serviceName; } // 3. 通过断路器调用服务 String key = serviceName + ":" + instance.getHost() + ":" + instance.getPort(); CircuitBreaker circuitBreaker = circuitBreakers.computeIfAbsent(key, name -> new CircuitBreaker(name, 5, 10000, 2)); if (!circuitBreaker.allowRequest()) { return "服务调用被断路器拒绝: " + serviceName; } try { // 模拟服务调用 String result = simulateServiceCallInternal(instance, method, params); circuitBreaker.recordSuccess(); // 更新调用统计 updateCallStats(serviceName, instance, true); return result; } catch (Exception e) { circuitBreaker.recordFailure(); // 更新调用统计 updateCallStats(serviceName, instance, false); return "服务调用失败: " + e.getMessage(); } } // 模拟服务调用(内部方法) private String simulateServiceCallInternal(ServiceRegistry.ServiceInstance instance, String method, Map params) { // 检查故障服务列表 String faultKey = instance.getHost() + ":" + instance.getPort(); if (faultyServices.containsKey(faultKey) && faultyServices.get(faultKey)) { throw new RuntimeException("模拟服务故障: " + instance); } // 模拟处理延迟 try { Thread.sleep(10 + (int)(Math.random() * 50)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } // 生成调用结果 String path = params.getOrDefault("path", ""); return method + " " + instance.getHost() + ":" + instance.getPort() + path + " - 调用成功"; } // 模拟服务调用(随机失败以测试熔断) private boolean simulateServiceCall(ServiceRegistry.ServiceInstance instance, String serviceName) { // 检查故障服务列表 String faultKey = instance.getHost() + ":" + instance.getPort(); if (faultyServices.containsKey(faultKey) && faultyServices.get(faultKey)) { return false; } // 90%的成功率,用于测试 return Math.random() < 0.9; } // 更新调用统计 private void updateCallStats(String serviceName, ServiceRegistry.ServiceInstance instance, boolean success) { CallStats stats = callStats.computeIfAbsent(serviceName, k -> new CallStats()); stats.recordCall(success); // 更新实例统计 String instanceKey = instance.getHost() + ":" + instance.getPort(); stats.instanceStats.computeIfAbsent(instanceKey, k -> new ServiceInstanceStats()).recordCall(success); } // 列出所有服务 public List listServices() { List services = new ArrayList<>(); // 从服务注册中心获取所有服务 serviceRegistry.getAllServices().forEach(service -> { List instances = serviceRegistry.discover(service); for (ServiceRegistry.ServiceInstance instance : instances) { services.add(service + " - " + instance.getHost() + ":" + instance.getPort()); } }); return services; } // 模拟服务故障 public void simulateServiceFailure(String serviceName, String host, int port, boolean isFaulty) { String key = host + ":" + port; faultyServices.put(key, isFaulty); System.out.println("服务故障状态更新: " + serviceName + "[" + host + ":" + port + "] = " + (isFaulty ? "故障" : "正常")); } // 打印治理统计信息 public void printGovernanceStats() { System.out.println("===== 微服务治理统计 ====="); // 打印服务调用统计 callStats.forEach((service, stats) -> { System.out.println("\n服务: " + service); System.out.println(" 总调用次数: " + stats.getTotalCalls()); System.out.println(" 成功调用: " + stats.getSuccessCalls()); System.out.println(" 失败调用: " + stats.getFailureCalls()); System.out.println(" 成功率: " + String.format("%.2f%%", stats.getTotalCalls() > 0 ? (double) stats.getSuccessCalls() / stats.getTotalCalls() * 100 : 0)); // 打印实例统计 System.out.println(" 实例统计:"); stats.getInstanceStats().forEach((instance, instanceStats) -> { System.out.println(" " + instance + ": 调用 " + instanceStats.getTotalCalls() + " 次, 成功率 " + String.format("%.2f%%", instanceStats.getSuccessRate())); }); }); // 打印断路器状态 System.out.println("\n断路器状态:"); circuitBreakers.forEach((key, breaker) -> { System.out.println(" " + key + ": " + breaker.getState() + ", 失败计数: " + breaker.getFailureCount()); }); System.out.println("========================"); } // 获取服务治理统计信息 public Map getGovernanceStats() { Map stats = new HashMap<>(); // 服务统计 Map serviceStats = new HashMap<>(); for (String service : serviceRegistry.getAllServices()) { int instanceCount = serviceRegistry.discover(service).size(); serviceStats.put(service, instanceCount); } stats.put("services", serviceStats); // 断路器统计 Map breakerStats = new HashMap<>(); for (Map.Entry entry : circuitBreakers.entrySet()) { CircuitBreaker breaker = entry.getValue(); breakerStats.put(entry.getKey(), breaker.getState() + " (失败: " + breaker.getFailureCount() + ")"); } stats.put("circuitBreakers", breakerStats); // 添加调用统计 stats.put("callStats", callStats); return stats; } // 服务调用统计 private static class CallStats { private final AtomicInteger totalCalls = new AtomicInteger(0); private final AtomicInteger successCalls = new AtomicInteger(0); private final AtomicInteger failureCalls = new AtomicInteger(0); private final Map instanceStats = new ConcurrentHashMap<>(); // Getters public int getTotalCalls() { return totalCalls.get(); } public int getSuccessCalls() { return successCalls.get(); } public int getFailureCalls() { return failureCalls.get(); } public Map getInstanceStats() { return instanceStats; } // 更新统计 public void recordCall(boolean success) { totalCalls.incrementAndGet(); if (success) { successCalls.incrementAndGet(); } else { failureCalls.incrementAndGet(); } } } // 服务实例调用统计 private static class ServiceInstanceStats { private final AtomicInteger totalCalls = new AtomicInteger(0); private final AtomicInteger successCalls = new AtomicInteger(0); public void recordCall(boolean success) { totalCalls.incrementAndGet(); if (success) { successCalls.incrementAndGet(); } } public int getTotalCalls() { return totalCalls.get(); } public int getSuccessCalls() { return successCalls.get(); } public double getSuccessRate() { int total = getTotalCalls(); return total > 0 ? (double) getSuccessCalls() / total * 100 : 0; } } // 服务注册中心内部类 public static class ServiceRegistry { private final Map> services = new ConcurrentHashMap<>(); public void register(String serviceName, String host, int port) { services.computeIfAbsent(serviceName, k -> new ArrayList<>()).add(new ServiceInstance(host, port)); System.out.println("服务注册: " + serviceName + " at " + host + ":" + port); } public void deregister(String serviceName, String host, int port) { List instances = services.get(serviceName); if (instances != null) { instances.removeIf(instance -> instance.getHost().equals(host) && instance.getPort() == port); if (instances.isEmpty()) { services.remove(serviceName); } } } public List discover(String serviceName) { List instances = services.get(serviceName); return instances != null ? new ArrayList<>(instances) : Collections.emptyList(); } public Set getAllServices() { return new HashSet<>(services.keySet()); } public static class ServiceInstance { private final String host; private final int port; public ServiceInstance(String host, int port) { this.host = host; this.port = port; } public String getHost() { return host; } public int getPort() { return port; } @Override public String toString() { return host + ":" + port; } } } // 负载均衡器内部类 public static class LoadBalancer { private final String name; private final AtomicInteger counter = new AtomicInteger(0); public LoadBalancer(String name) { this.name = name; } // 轮询负载均衡 public ServiceRegistry.ServiceInstance choose(List instances) { if (instances == null || instances.isEmpty()) { return null; } // 轮询选择实例 int index = Math.abs(counter.getAndIncrement() % instances.size()); return instances.get(index); } } // 断路器内部类 public static class CircuitBreaker { public enum State { CLOSED, OPEN, HALF_OPEN } private final String name; private final int failureThreshold; private final long resetTimeoutMs; private final int successThreshold; private volatile State state = State.CLOSED; private final AtomicInteger failureCount = new AtomicInteger(0); private final AtomicInteger successCount = new AtomicInteger(0); private volatile long lastFailureTime = 0; public CircuitBreaker(String name, int failureThreshold, long resetTimeoutMs, int successThreshold) { this.name = name; this.failureThreshold = failureThreshold; this.resetTimeoutMs = resetTimeoutMs; this.successThreshold = successThreshold; } public boolean allowRequest() { if (state == State.CLOSED) { return true; } if (state == State.OPEN) { if (System.currentTimeMillis() - lastFailureTime > resetTimeoutMs) { state = State.HALF_OPEN; System.out.println("断路器状态切换: " + name + " 从 OPEN 到 HALF_OPEN"); return true; } return false; } return state == State.HALF_OPEN; } public void recordSuccess() { if (state == State.CLOSED) { failureCount.set(0); } else if (state == State.HALF_OPEN) { if (successCount.incrementAndGet() >= successThreshold) { reset(); System.out.println("断路器状态切换: " + name + " 从 HALF_OPEN 到 CLOSED"); } } } public void recordFailure() { if (state == State.CLOSED) { if (failureCount.incrementAndGet() >= failureThreshold) { state = State.OPEN; lastFailureTime = System.currentTimeMillis(); System.out.println("断路器状态切换: " + name + " 从 CLOSED 到 OPEN"); } } else if (state == State.HALF_OPEN) { state = State.OPEN; lastFailureTime = System.currentTimeMillis(); successCount.set(0); System.out.println("断路器状态切换: " + name + " 从 HALF_OPEN 到 OPEN"); } } private void reset() { state = State.CLOSED; failureCount.set(0); successCount.set(0); } public State getState() { return state; } public int getFailureCount() { return failureCount.get(); } public String getName() { return name; } } // 服务调用结果 public static class ServiceCallResult { private final boolean success; private final String message; private final Map data; public ServiceCallResult(boolean success, String message, Map data) { this.success = success; this.message = message; this.data = data; } public boolean isSuccess() { return success; } public String getMessage() { return message; } public Map getData() { return data; } } }