""" 最终一致性模式 - 数据同步系统 此模块实现了最终一致性模式,用于处理分布式系统中的数据同步问题。 最终一致性系统允许数据在一段时间内处于不一致状态,但保证最终会达到一致。 """ from abc import ABC, abstractmethod from enum import Enum, auto from typing import Dict, List, Optional, Any, Set, Callable import threading import time import uuid import queue import random class SyncStatus(Enum): """同步状态枚举""" PENDING = auto() # 待同步 SYNCHRONIZING = auto() # 正在同步 COMPLETED = auto() # 同步完成 FAILED = auto() # 同步失败 RETRYING = auto() # 重试中 class ReplicationMessage: """ 复制消息类 表示需要在系统节点间复制的数据变更消息 """ def __init__(self, operation: str, entity_id: str, data: Dict[str, Any], source_node: str): """ 初始化复制消息 Args: operation: 操作类型 (create, update, delete) entity_id: 实体ID data: 实体数据 source_node: 源节点ID """ self.message_id = str(uuid.uuid4()) self.operation = operation self.entity_id = entity_id self.data = data self.source_node = source_node self.timestamp = time.time() self.version = 1 self.destination_nodes: Set[str] = set() self.delivered_nodes: Set[str] = set() self.failed_nodes: Set[str] = set() def mark_delivered(self, node_id: str) -> None: """标记消息已送达指定节点""" self.delivered_nodes.add(node_id) if node_id in self.failed_nodes: self.failed_nodes.remove(node_id) def mark_failed(self, node_id: str) -> None: """标记消息送达指定节点失败""" self.failed_nodes.add(node_id) def is_delivered_to_all(self) -> bool: """检查消息是否已送达所有目标节点""" return self.destination_nodes.issubset(self.delivered_nodes) def to_dict(self) -> Dict[str, Any]: """转换为字典表示""" return { "message_id": self.message_id, "operation": self.operation, "entity_id": self.entity_id, "data": self.data, "source_node": self.source_node, "timestamp": self.timestamp, "version": self.version, "destination_nodes": list(self.destination_nodes), "delivered_nodes": list(self.delivered_nodes), "failed_nodes": list(self.failed_nodes) } class MessageQueue: """ 消息队列类 用于在节点间传递复制消息 """ def __init__(self, node_id: str): """ 初始化消息队列 Args: node_id: 节点ID """ self.node_id = node_id self._queue = queue.Queue() self._lock = threading.RLock() def enqueue(self, message: ReplicationMessage) -> None: """ 将消息加入队列 Args: message: 复制消息 """ with self._lock: self._queue.put(message) def dequeue(self, timeout: Optional[float] = None) -> Optional[ReplicationMessage]: """ 从队列中取出消息 Args: timeout: 超时时间 Returns: 消息,如果队列为空且超时则返回None """ try: return self._queue.get(timeout=timeout) except queue.Empty: return None def size(self) -> int: """ 获取队列大小 Returns: 队列中的消息数量 """ return self._queue.qsize() class StorageProvider(ABC): """ 存储提供者抽象基类 定义数据存储的接口 """ @abstractmethod def get(self, entity_id: str) -> Optional[Dict[str, Any]]: """获取实体""" pass @abstractmethod def put(self, entity_id: str, data: Dict[str, Any]) -> None: """存储实体""" pass @abstractmethod def delete(self, entity_id: str) -> bool: """删除实体""" pass @abstractmethod def get_all(self) -> Dict[str, Dict[str, Any]]: """获取所有实体""" pass class InMemoryStorage(StorageProvider): """ 内存存储实现 用于演示的内存存储 """ def __init__(self): """初始化内存存储""" self._data: Dict[str, Dict[str, Any]] = {} self._lock = threading.RLock() def get(self, entity_id: str) -> Optional[Dict[str, Any]]: """获取实体""" with self._lock: return self._data.get(entity_id) def put(self, entity_id: str, data: Dict[str, Any]) -> None: """存储实体""" with self._lock: self._data[entity_id] = data def delete(self, entity_id: str) -> bool: """删除实体""" with self._lock: if entity_id in self._data: del self._data[entity_id] return True return False def get_all(self) -> Dict[str, Dict[str, Any]]: """获取所有实体""" with self._lock: return self._data.copy() class Node: """ 系统节点类 表示分布式系统中的一个节点 """ def __init__(self, node_id: str, storage: StorageProvider): """ 初始化节点 Args: node_id: 节点ID storage: 存储提供者 """ self.node_id = node_id self.storage = storage self.message_queue = MessageQueue(node_id) self.received_messages: Dict[str, ReplicationMessage] = {} self.sync_status: Dict[str, SyncStatus] = {} self._lock = threading.RLock() # 启动消息处理器线程 self._stop_event = threading.Event() self._processor_thread = threading.Thread(target=self._process_messages) self._processor_thread.daemon = True self._processor_thread.start() def create_entity(self, entity_id: str, data: Dict[str, Any]) -> ReplicationMessage: """ 创建实体 Args: entity_id: 实体ID data: 实体数据 Returns: 复制消息 """ # 添加元数据 data = { **data, "_version": 1, "_created_at": time.time(), "_updated_at": time.time(), "_created_by": self.node_id } # 存储数据 self.storage.put(entity_id, data) # 创建复制消息 message = ReplicationMessage("create", entity_id, data, self.node_id) return message def update_entity(self, entity_id: str, data: Dict[str, Any]) -> Optional[ReplicationMessage]: """ 更新实体 Args: entity_id: 实体ID data: 更新的数据 Returns: 复制消息,如果实体不存在则返回None """ # 获取当前数据 current_data = self.storage.get(entity_id) if not current_data: return None # 合并数据并更新元数据 updated_data = { **current_data, **{k: v for k, v in data.items() if not k.startswith('_')}, "_version": current_data["_version"] + 1, "_updated_at": time.time() } # 存储更新后的数据 self.storage.put(entity_id, updated_data) # 创建复制消息 message = ReplicationMessage("update", entity_id, updated_data, self.node_id) return message def delete_entity(self, entity_id: str) -> Optional[ReplicationMessage]: """ 删除实体 Args: entity_id: 实体ID Returns: 复制消息,如果实体不存在则返回None """ # 获取当前数据 current_data = self.storage.get(entity_id) if not current_data: return None # 删除数据 self.storage.delete(entity_id) # 创建复制消息 message = ReplicationMessage("delete", entity_id, current_data, self.node_id) return message def receive_message(self, message: ReplicationMessage) -> bool: """ 接收复制消息 Args: message: 复制消息 Returns: 是否成功处理消息 """ with self._lock: # 检查是否已经处理过该消息 if message.message_id in self.received_messages: return True # 检查版本号,确保处理最新的消息 current_data = self.storage.get(message.entity_id) if current_data and "_version" in current_data: message_version = message.data.get("_version", 1) if message_version <= current_data["_version"]: print(f"Node {self.node_id}: Ignoring outdated message {message.message_id}") return True # 将消息加入队列 self.message_queue.enqueue(message) return True def _process_messages(self) -> None: """ 处理消息队列中的消息 """ while not self._stop_event.is_set(): try: message = self.message_queue.dequeue(timeout=1.0) if message: self._apply_message(message) except Exception as e: print(f"Node {self.node_id}: Error processing message: {str(e)}") def _apply_message(self, message: ReplicationMessage) -> None: """ 应用消息到本地存储 Args: message: 复制消息 """ try: with self._lock: print(f"Node {self.node_id}: Applying message {message.message_id} ({message.operation})") if message.operation == "create" or message.operation == "update": # 存储数据 self.storage.put(message.entity_id, message.data) elif message.operation == "delete": # 删除数据 self.storage.delete(message.entity_id) # 记录已接收的消息 self.received_messages[message.message_id] = message # 更新同步状态 self.sync_status[message.entity_id] = SyncStatus.COMPLETED return True except Exception as e: print(f"Node {self.node_id}: Failed to apply message {message.message_id}: {str(e)}") with self._lock: self.sync_status[message.entity_id] = SyncStatus.FAILED return False def get_entity(self, entity_id: str) -> Optional[Dict[str, Any]]: """ 获取实体 Args: entity_id: 实体ID Returns: 实体数据,如果不存在则返回None """ return self.storage.get(entity_id) def get_all_entities(self) -> Dict[str, Dict[str, Any]]: """ 获取所有实体 Returns: 所有实体数据 """ return self.storage.get_all() def shutdown(self) -> None: """关闭节点""" self._stop_event.set() if self._processor_thread.is_alive(): self._processor_thread.join(timeout=2.0) class ReplicationManager: """ 复制管理器 负责管理节点间的数据复制 """ def __init__(self): """初始化复制管理器""" self.nodes: Dict[str, Node] = {} self._lock = threading.RLock() # 启动复制监控线程 self._stop_event = threading.Event() self._monitor_thread = threading.Thread(target=self._monitor_replication) self._monitor_thread.daemon = True self._monitor_thread.start() def add_node(self, node: Node) -> None: """ 添加节点 Args: node: 要添加的节点 """ with self._lock: self.nodes[node.node_id] = node print(f"Added node {node.node_id}") def remove_node(self, node_id: str) -> bool: """ 移除节点 Args: node_id: 要移除的节点ID Returns: 是否移除成功 """ with self._lock: if node_id in self.nodes: node = self.nodes.pop(node_id) node.shutdown() print(f"Removed node {node_id}") return True return False def replicate_message(self, message: ReplicationMessage, exclude_node: Optional[str] = None) -> None: """ 复制消息到所有其他节点 Args: message: 要复制的消息 exclude_node: 要排除的节点ID """ with self._lock: for node_id, node in self.nodes.items(): if node_id != exclude_node and node_id != message.source_node: message.destination_nodes.add(node_id) # 模拟网络延迟和故障 if random.random() < 0.9: # 90%的概率成功发送 node.receive_message(message) else: print(f"Failed to send message {message.message_id} to node {node_id}") message.mark_failed(node_id) def _monitor_replication(self) -> None: """ 监控复制状态并处理失败的复制 """ while not self._stop_event.is_set(): try: # 这里可以实现失败消息的重试逻辑 # 为了简化,我们省略具体实现 time.sleep(5.0) except Exception as e: print(f"Replication manager error: {str(e)}") def shutdown(self) -> None: """关闭复制管理器""" self._stop_event.set() if self._monitor_thread.is_alive(): self._monitor_thread.join(timeout=2.0) # 关闭所有节点 for node in list(self.nodes.values()): node.shutdown() self.nodes.clear() class DataSyncSystem: """ 数据同步系统 整合节点和复制管理器,提供最终一致性的数据同步 """ def __init__(self): """初始化数据同步系统""" self.replication_manager = ReplicationManager() def create_node(self, node_id: str, storage: Optional[StorageProvider] = None) -> Node: """ 创建并添加节点 Args: node_id: 节点ID storage: 存储提供者,默认为内存存储 Returns: 创建的节点 """ if storage is None: storage = InMemoryStorage() node = Node(node_id, storage) self.replication_manager.add_node(node) return node def perform_write_operation(self, node_id: str, operation: str, entity_id: str, data: Dict[str, Any]) -> bool: """ 在指定节点执行写操作 Args: node_id: 节点ID operation: 操作类型 (create, update, delete) entity_id: 实体ID data: 实体数据 Returns: 是否操作成功 """ node = self.replication_manager.nodes.get(node_id) if not node: print(f"Node {node_id} not found") return False try: # 在本地执行操作 if operation == "create": message = node.create_entity(entity_id, data) elif operation == "update": message = node.update_entity(entity_id, data) elif operation == "delete": message = node.delete_entity(entity_id) else: print(f"Unknown operation: {operation}") return False if not message: print(f"Operation failed: {operation} on {entity_id}") return False # 复制到其他节点 self.replication_manager.replicate_message(message) return True except Exception as e: print(f"Error performing {operation}: {str(e)}") return False def get_entity_from_node(self, node_id: str, entity_id: str) -> Optional[Dict[str, Any]]: """ 从指定节点获取实体 Args: node_id: 节点ID entity_id: 实体ID Returns: 实体数据,如果不存在则返回None """ node = self.replication_manager.nodes.get(node_id) if not node: return None return node.get_entity(entity_id) def get_all_entities_from_node(self, node_id: str) -> Dict[str, Dict[str, Any]]: """ 从指定节点获取所有实体 Args: node_id: 节点ID Returns: 所有实体数据 """ node = self.replication_manager.nodes.get(node_id) if not node: return {} return node.get_all_entities() def shutdown(self) -> None: """关闭数据同步系统""" self.replication_manager.shutdown() # 示例使用 if __name__ == "__main__": # 创建数据同步系统 sync_system = DataSyncSystem() # 创建三个节点 node1 = sync_system.create_node("node-1") node2 = sync_system.create_node("node-2") node3 = sync_system.create_node("node-3") print("\n=== 测试场景 1: 基本数据复制 ===") # 在节点1创建实体 print("在节点1创建用户实体...") sync_system.perform_write_operation( "node-1", "create", "user-1", {"username": "john_doe", "email": "john@example.com"} ) # 等待复制完成 time.sleep(1) # 检查各节点数据 print("\n各节点数据状态:") for node_id in ["node-1", "node-2", "node-3"]: user = sync_system.get_entity_from_node(node_id, "user-1") status = "✓" if user else "✗" print(f"节点 {node_id}: {status} {user}") print("\n=== 测试场景 2: 更新数据 ===") # 在节点2更新实体 print("在节点2更新用户邮箱...") sync_system.perform_write_operation( "node-2", "update", "user-1", {"email": "john.doe@example.com"} ) # 等待复制完成 time.sleep(1) # 检查各节点更新后的数据 print("\n各节点更新后的数据状态:") for node_id in ["node-1", "node-2", "node-3"]: user = sync_system.get_entity_from_node(node_id, "user-1") print(f"节点 {node_id}: 邮箱={user['email']}, 版本={user['_version']}") print("\n=== 测试场景 3: 删除数据 ===") # 在节点3删除实体 print("在节点3删除用户实体...") sync_system.perform_write_operation( "node-3", "delete", "user-1", {} ) # 等待复制完成 time.sleep(1) # 检查各节点删除后的数据 print("\n各节点删除后的数据状态:") for node_id in ["node-1", "node-2", "node-3"]: user = sync_system.get_entity_from_node(node_id, "user-1") status = "已删除" if user is None else "仍存在" print(f"节点 {node_id}: {status}") print("\n=== 测试场景 4: 多实体操作 ===") # 创建多个实体 entities = [ ("product-1", {"name": "Laptop", "price": 999.99}), ("product-2", {"name": "Smartphone", "price": 499.99}), ("product-3", {"name": "Tablet", "price": 299.99}) ] for entity_id, data in entities: sync_system.perform_write_operation("node-1", "create", entity_id, data) # 等待复制 time.sleep(1) # 统计各节点的实体数量 print("\n各节点实体数量:") for node_id in ["node-1", "node-2", "node-3"]: entities = sync_system.get_all_entities_from_node(node_id) print(f"节点 {node_id}: {len(entities)} 个实体") # 关闭系统 sync_system.shutdown() print("\n数据同步系统已关闭")