You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

707 lines
21 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
最终一致性模式 - 数据同步系统
此模块实现了最终一致性模式,用于处理分布式系统中的数据同步问题。
最终一致性系统允许数据在一段时间内处于不一致状态,但保证最终会达到一致。
"""
import copy
import queue
import random
import threading
import time
import uuid
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Any, Callable, Dict, List, Optional, Set
class SyncStatus(Enum):
    """Replication state of an entity on a node.

    Values are written out explicitly; they are identical to what ``auto()``
    assigns (1 through 5 in declaration order).
    """

    PENDING = 1  # waiting to be synchronized
    SYNCHRONIZING = 2  # synchronization in progress
    COMPLETED = 3  # synchronization finished
    FAILED = 4  # synchronization failed
    RETRYING = 5  # being retried
class ReplicationMessage:
    """A single data-change event to be copied from one node to the others.

    Besides the change itself, the message tracks which nodes it is addressed
    to and which of them have received it or failed to receive it.
    """

    def __init__(self, operation: str, entity_id: str, data: Dict[str, Any], source_node: str):
        """Build a new replication message.

        Args:
            operation: Kind of change ("create", "update" or "delete").
            entity_id: Identifier of the affected entity.
            data: Entity payload carried by the message (kept by reference).
            source_node: ID of the node on which the change originated.
        """
        # Identity and creation time are fixed when the message is built.
        self.message_id = str(uuid.uuid4())
        self.operation = operation
        self.entity_id = entity_id
        self.data = data
        self.source_node = source_node
        self.timestamp = time.time()
        self.version = 1
        # Delivery bookkeeping, filled in by whoever routes the message.
        self.destination_nodes: Set[str] = set()
        self.delivered_nodes: Set[str] = set()
        self.failed_nodes: Set[str] = set()

    def mark_delivered(self, node_id: str) -> None:
        """Record a successful delivery to *node_id*, clearing any earlier failure."""
        self.failed_nodes.discard(node_id)
        self.delivered_nodes.add(node_id)

    def mark_failed(self, node_id: str) -> None:
        """Record that delivery to *node_id* failed."""
        self.failed_nodes.add(node_id)

    def is_delivered_to_all(self) -> bool:
        """Return True when every destination node has been marked delivered.

        This is vacuously True while ``destination_nodes`` is empty.
        """
        return self.destination_nodes <= self.delivered_nodes

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the message; node sets become lists."""
        snapshot: Dict[str, Any] = {}
        for attr in ("message_id", "operation", "entity_id", "data",
                     "source_node", "timestamp", "version"):
            snapshot[attr] = getattr(self, attr)
        for attr in ("destination_nodes", "delivered_nodes", "failed_nodes"):
            snapshot[attr] = list(getattr(self, attr))
        return snapshot
class MessageQueue:
    """FIFO of replication messages belonging to a single node."""

    def __init__(self, node_id: str):
        """Create an empty queue.

        Args:
            node_id: ID of the node that owns this queue.
        """
        self.node_id = node_id
        self._queue = queue.Queue()
        self._lock = threading.RLock()

    def enqueue(self, message: ReplicationMessage) -> None:
        """Append *message* to the tail of the queue."""
        with self._lock:
            self._queue.put(message)

    def dequeue(self, timeout: Optional[float] = None) -> Optional[ReplicationMessage]:
        """Pop the oldest message.

        Args:
            timeout: Seconds to wait for a message; ``None`` waits indefinitely.

        Returns:
            The message, or None if nothing arrived before the timeout.
        """
        try:
            item = self._queue.get(timeout=timeout)
        except queue.Empty:
            item = None
        return item

    def size(self) -> int:
        """Return the approximate number of queued messages (``Queue.qsize``)."""
        count = self._queue.qsize()
        return count
class StorageProvider(ABC):
    """Interface every node storage backend must implement."""

    @abstractmethod
    def get(self, entity_id: str) -> Optional[Dict[str, Any]]:
        """Return the entity stored under *entity_id*, or None."""
        ...

    @abstractmethod
    def put(self, entity_id: str, data: Dict[str, Any]) -> None:
        """Store *data* under *entity_id*."""
        ...

    @abstractmethod
    def delete(self, entity_id: str) -> bool:
        """Remove *entity_id*; return whether it existed."""
        ...

    @abstractmethod
    def get_all(self) -> Dict[str, Dict[str, Any]]:
        """Return every stored entity keyed by ID."""
        ...
class InMemoryStorage(StorageProvider):
    """Thread-safe, dict-backed storage used for demonstrations.

    Records are deep-copied when stored and when returned. The payload handed
    to ``put`` (for example a replication message's data, which may be passed
    to several storages) is therefore never aliased by the stored record, and
    callers cannot mutate stored state through a value returned by ``get`` or
    ``get_all``.
    """

    def __init__(self):
        """Create an empty store."""
        self._data: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.RLock()

    def get(self, entity_id: str) -> Optional[Dict[str, Any]]:
        """Return a private copy of the entity, or None if it does not exist."""
        with self._lock:
            record = self._data.get(entity_id)
            return None if record is None else copy.deepcopy(record)

    def put(self, entity_id: str, data: Dict[str, Any]) -> None:
        """Store a private copy of *data* under *entity_id*, replacing any existing record."""
        # Copy outside the lock; only the dict assignment needs to be atomic.
        snapshot = copy.deepcopy(data)
        with self._lock:
            self._data[entity_id] = snapshot

    def delete(self, entity_id: str) -> bool:
        """Remove *entity_id*.

        Returns:
            True if a record was removed, False if none existed.
        """
        with self._lock:
            if entity_id in self._data:
                del self._data[entity_id]
                return True
            return False

    def get_all(self) -> Dict[str, Dict[str, Any]]:
        """Return a deep copy of every stored record keyed by entity ID."""
        with self._lock:
            return copy.deepcopy(self._data)
class Node:
    """A node of the distributed system.

    Each node owns a storage backend and a daemon thread that drains its
    message queue, applying replication messages from other nodes in
    arrival order.
    """

    def __init__(self, node_id: str, storage: StorageProvider):
        """Create the node and start its message-processing thread.

        Args:
            node_id: Unique ID of this node.
            storage: Backend used to persist entities locally.
        """
        self.node_id = node_id
        self.storage = storage
        self.message_queue = MessageQueue(node_id)
        # message_id -> message for every message already applied or discarded as stale.
        self.received_messages: Dict[str, ReplicationMessage] = {}
        # Last replication outcome per entity ID.
        self.sync_status: Dict[str, SyncStatus] = {}
        self._lock = threading.RLock()
        # Background worker that drains message_queue until shutdown() is called.
        self._stop_event = threading.Event()
        self._processor_thread = threading.Thread(target=self._process_messages)
        self._processor_thread.daemon = True
        self._processor_thread.start()

    def create_entity(self, entity_id: str, data: Dict[str, Any]) -> ReplicationMessage:
        """Create (or overwrite) an entity locally.

        Metadata ``_version`` (=1), ``_created_at``, ``_updated_at`` and
        ``_created_by`` is added, overriding caller-supplied values for those keys.

        Args:
            entity_id: Entity ID.
            data: Entity fields.

        Returns:
            The "create" message to replicate to other nodes.
        """
        now = time.time()  # one timestamp so created/updated match exactly
        record = {
            **data,
            "_version": 1,
            "_created_at": now,
            "_updated_at": now,
            "_created_by": self.node_id,
        }
        self.storage.put(entity_id, record)
        return ReplicationMessage("create", entity_id, record, self.node_id)

    def update_entity(self, entity_id: str, data: Dict[str, Any]) -> Optional[ReplicationMessage]:
        """Merge *data* into an existing entity and bump its version.

        Keys starting with ``_`` in *data* are ignored so callers cannot
        overwrite metadata.

        Args:
            entity_id: Entity ID.
            data: Fields to update.

        Returns:
            The "update" message to replicate, or None if the entity does not exist.
        """
        current_data = self.storage.get(entity_id)
        if current_data is None:
            return None
        updated_data = {
            **current_data,
            **{k: v for k, v in data.items() if not k.startswith('_')},
            # Unversioned records count as version 1 (same default _is_stale uses).
            "_version": current_data.get("_version", 1) + 1,
            "_updated_at": time.time(),
        }
        self.storage.put(entity_id, updated_data)
        return ReplicationMessage("update", entity_id, updated_data, self.node_id)

    def delete_entity(self, entity_id: str) -> Optional[ReplicationMessage]:
        """Delete an entity locally.

        Args:
            entity_id: Entity ID.

        Returns:
            The "delete" message (carrying the last snapshot so receivers can
            compare versions), or None if the entity does not exist.
        """
        current_data = self.storage.get(entity_id)
        if current_data is None:
            return None
        self.storage.delete(entity_id)
        return ReplicationMessage("delete", entity_id, current_data, self.node_id)

    def _is_stale(self, message: ReplicationMessage) -> bool:
        """Return True if the local replica is already newer than *message*.

        Create/update messages carry the post-write version and are stale when
        it is not greater than the local version. A delete message carries the
        snapshot the deleting node held, which normally has the *same* version
        as other replicas, so it is stale only if the local version is strictly
        newer (a later write happened here).
        """
        current_data = self.storage.get(message.entity_id)
        if current_data is None or "_version" not in current_data:
            return False
        message_version = message.data.get("_version", 1)
        local_version = current_data["_version"]
        if message.operation == "delete":
            return message_version < local_version
        return message_version <= local_version

    def receive_message(self, message: ReplicationMessage) -> bool:
        """Accept a replication message and queue it for asynchronous application.

        Messages already handled, or older than the local replica, are dropped.

        Args:
            message: Replication message.

        Returns:
            True (the message was accepted or deliberately ignored).
        """
        with self._lock:
            if message.message_id in self.received_messages:
                return True
            if self._is_stale(message):
                print(f"Node {self.node_id}: Ignoring outdated message {message.message_id}")
                return True
            self.message_queue.enqueue(message)
            return True

    def _process_messages(self) -> None:
        """Worker loop: apply queued messages until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                # Timeout lets the loop observe the stop event periodically.
                message = self.message_queue.dequeue(timeout=1.0)
                if message:
                    self._apply_message(message)
            except Exception as e:
                print(f"Node {self.node_id}: Error processing message: {str(e)}")

    def _apply_message(self, message: ReplicationMessage) -> bool:
        """Apply *message* to local storage.

        The staleness check is repeated here because another message for the
        same entity may have been applied after this one was queued.

        Returns:
            True if the message was applied or skipped as stale, False on error.
        """
        try:
            with self._lock:
                if self._is_stale(message):
                    print(f"Node {self.node_id}: Ignoring outdated message {message.message_id}")
                    self.received_messages[message.message_id] = message
                    return True
                print(f"Node {self.node_id}: Applying message {message.message_id} ({message.operation})")
                if message.operation in ("create", "update"):
                    self.storage.put(message.entity_id, message.data)
                elif message.operation == "delete":
                    self.storage.delete(message.entity_id)
                self.received_messages[message.message_id] = message
                self.sync_status[message.entity_id] = SyncStatus.COMPLETED
                return True
        except Exception as e:
            print(f"Node {self.node_id}: Failed to apply message {message.message_id}: {str(e)}")
            with self._lock:
                self.sync_status[message.entity_id] = SyncStatus.FAILED
            return False

    def get_entity(self, entity_id: str) -> Optional[Dict[str, Any]]:
        """Return the entity from local storage, or None if absent."""
        return self.storage.get(entity_id)

    def get_all_entities(self) -> Dict[str, Dict[str, Any]]:
        """Return all entities in local storage keyed by ID."""
        return self.storage.get_all()

    def shutdown(self) -> None:
        """Stop the worker thread, waiting up to 2 seconds for it to exit."""
        self._stop_event.set()
        if self._processor_thread.is_alive():
            self._processor_thread.join(timeout=2.0)
class ReplicationManager:
    """Fans replication messages out to nodes and retries failed deliveries.

    Delivery is simulated: each send attempt succeeds with probability
    ``delivery_success_rate``. A message with at least one failed destination
    is kept as pending. A background thread re-sends it every
    ``retry_interval`` seconds until every destination that is still
    registered has received it.
    """

    def __init__(self, *, retry_interval: float = 5.0, delivery_success_rate: float = 0.9):
        """Create the manager and start the retry/monitor thread.

        Args:
            retry_interval: Seconds between retry passes over failed deliveries.
            delivery_success_rate: Probability in [0, 1] that a simulated send
                succeeds (1.0 never fails, 0.0 always fails).

        Raises:
            ValueError: If an argument is out of range.
        """
        if retry_interval <= 0:
            raise ValueError("retry_interval must be positive")
        if not 0.0 <= delivery_success_rate <= 1.0:
            raise ValueError("delivery_success_rate must be between 0 and 1")
        self.nodes: Dict[str, Node] = {}
        self._lock = threading.RLock()
        self._retry_interval = retry_interval
        self._delivery_success_rate = delivery_success_rate
        # message_id -> message that still has undelivered destinations.
        self._pending: Dict[str, ReplicationMessage] = {}
        # Background thread that retries pending messages until shutdown().
        self._stop_event = threading.Event()
        self._monitor_thread = threading.Thread(target=self._monitor_replication)
        self._monitor_thread.daemon = True
        self._monitor_thread.start()

    def add_node(self, node: Node) -> None:
        """Register *node*, replacing any node with the same ID."""
        with self._lock:
            self.nodes[node.node_id] = node
        print(f"Added node {node.node_id}")

    def remove_node(self, node_id: str) -> bool:
        """Unregister and shut down the node with *node_id*.

        Returns:
            True if the node existed and was removed, False otherwise.
        """
        with self._lock:
            node = self.nodes.pop(node_id, None)
        if node is None:
            return False
        # Shut down outside the lock: joining the node's worker can take seconds.
        node.shutdown()
        print(f"Removed node {node_id}")
        return True

    def replicate_message(self, message: ReplicationMessage, exclude_node: Optional[str] = None) -> None:
        """Send *message* to every node except its source and *exclude_node*.

        Destinations are recorded on the message. Each attempt marks the node
        delivered or failed. Messages with failures are queued for retry.

        Args:
            message: Message to replicate.
            exclude_node: Optional node ID to skip in addition to the source.
        """
        with self._lock:
            for node_id, node in self.nodes.items():
                if node_id == exclude_node or node_id == message.source_node:
                    continue
                message.destination_nodes.add(node_id)
                self._deliver(message, node_id, node)
            if message.failed_nodes:
                self._pending[message.message_id] = message

    def _deliver(self, message: ReplicationMessage, node_id: str, node: Node) -> bool:
        """Make one simulated send of *message* to *node* and record the outcome."""
        if random.random() < self._delivery_success_rate and node.receive_message(message):
            message.mark_delivered(node_id)
            return True
        print(f"Failed to send message {message.message_id} to node {node_id}")
        message.mark_failed(node_id)
        return False

    def _retry_failed(self) -> None:
        """Re-send each pending message to the destinations that have not received it."""
        with self._lock:
            for message_id, message in list(self._pending.items()):
                for node_id in list(message.failed_nodes):
                    node = self.nodes.get(node_id)
                    if node is None:
                        # Destination was removed; stop trying to reach it.
                        message.failed_nodes.discard(node_id)
                        message.destination_nodes.discard(node_id)
                        continue
                    self._deliver(message, node_id, node)
                if not message.failed_nodes:
                    del self._pending[message_id]

    def _monitor_replication(self) -> None:
        """Periodically retry failed deliveries until shutdown."""
        # Event.wait returns True as soon as shutdown() sets the event, so
        # shutting down never has to wait out a full retry interval.
        while not self._stop_event.wait(self._retry_interval):
            try:
                self._retry_failed()
            except Exception as e:
                print(f"Replication manager error: {str(e)}")

    def shutdown(self) -> None:
        """Stop the monitor thread, then shut down and unregister every node."""
        self._stop_event.set()
        if self._monitor_thread.is_alive():
            self._monitor_thread.join(timeout=2.0)
        with self._lock:
            nodes = list(self.nodes.values())
            self.nodes.clear()
            self._pending.clear()
        for node in nodes:
            node.shutdown()
class DataSyncSystem:
    """Facade that wires nodes to a ReplicationManager.

    Writes are performed on one node and then pushed to the other nodes,
    giving eventually consistent reads across nodes.
    """

    def __init__(self):
        """Create the system with a fresh replication manager."""
        self.replication_manager = ReplicationManager()

    def _lookup_node(self, node_id: str) -> Optional[Node]:
        """Return the registered node with *node_id*, or None."""
        return self.replication_manager.nodes.get(node_id)

    def create_node(self, node_id: str, storage: Optional[StorageProvider] = None) -> Node:
        """Create a node, register it with the replication manager and return it.

        Args:
            node_id: ID of the new node.
            storage: Storage backend; an ``InMemoryStorage`` is used when omitted.
        """
        backend = InMemoryStorage() if storage is None else storage
        new_node = Node(node_id, backend)
        self.replication_manager.add_node(new_node)
        return new_node

    def perform_write_operation(self, node_id: str, operation: str, entity_id: str, data: Dict[str, Any]) -> bool:
        """Run a write on *node_id* and replicate the resulting message.

        Args:
            node_id: Node on which to perform the write.
            operation: "create", "update" or "delete".
            entity_id: Target entity ID.
            data: Entity fields (ignored for "delete").

        Returns:
            True if the local write succeeded and replication was triggered,
            False otherwise (unknown node or operation, missing entity, or error).
        """
        target = self._lookup_node(node_id)
        if not target:
            print(f"Node {node_id} not found")
            return False
        dispatch = (
            ("create", lambda: target.create_entity(entity_id, data)),
            ("update", lambda: target.update_entity(entity_id, data)),
            ("delete", lambda: target.delete_entity(entity_id)),
        )
        try:
            for op_name, run_write in dispatch:
                if operation == op_name:
                    outcome = run_write()
                    break
            else:
                print(f"Unknown operation: {operation}")
                return False
            if not outcome:
                print(f"Operation failed: {operation} on {entity_id}")
                return False
            self.replication_manager.replicate_message(outcome)
            return True
        except Exception as exc:
            print(f"Error performing {operation}: {str(exc)}")
            return False

    def get_entity_from_node(self, node_id: str, entity_id: str) -> Optional[Dict[str, Any]]:
        """Read one entity from a node; None if the node or the entity is missing."""
        target = self._lookup_node(node_id)
        return target.get_entity(entity_id) if target else None

    def get_all_entities_from_node(self, node_id: str) -> Dict[str, Dict[str, Any]]:
        """Read every entity from a node; an empty dict if the node is missing."""
        target = self._lookup_node(node_id)
        return target.get_all_entities() if target else {}

    def shutdown(self) -> None:
        """Shut down the replication manager and all of its nodes."""
        self.replication_manager.shutdown()
# Example usage
if __name__ == "__main__":
    # Build the data-sync system with three in-memory nodes.
    sync_system = DataSyncSystem()
    node_ids = ["node-1", "node-2", "node-3"]
    try:
        node1 = sync_system.create_node("node-1")
        node2 = sync_system.create_node("node-2")
        node3 = sync_system.create_node("node-3")

        print("\n=== 测试场景 1: 基本数据复制 ===")
        # Create an entity on node 1.
        print("在节点1创建用户实体...")
        sync_system.perform_write_operation(
            "node-1",
            "create",
            "user-1",
            {"username": "john_doe", "email": "john@example.com"},
        )
        # Give replicas time to apply the queued message.
        time.sleep(1)
        print("\n各节点数据状态:")
        for node_id in node_ids:
            user = sync_system.get_entity_from_node(node_id, "user-1")
            # Explicit labels so present and missing replicas are distinguishable.
            status = "已同步" if user else "未同步"
            print(f"节点 {node_id}: {status} {user}")

        print("\n=== 测试场景 2: 更新数据 ===")
        # Update the entity on node 2.
        print("在节点2更新用户邮箱...")
        sync_system.perform_write_operation(
            "node-2",
            "update",
            "user-1",
            {"email": "john.doe@example.com"},
        )
        time.sleep(1)
        print("\n各节点更新后的数据状态:")
        for node_id in node_ids:
            user = sync_system.get_entity_from_node(node_id, "user-1")
            if user is None:
                # Sends are simulated as lossy; this replica may not have caught up yet.
                print(f"节点 {node_id}: 未同步")
            else:
                print(f"节点 {node_id}: 邮箱={user['email']}, 版本={user['_version']}")

        print("\n=== 测试场景 3: 删除数据 ===")
        # Delete the entity on node 3.
        print("在节点3删除用户实体...")
        sync_system.perform_write_operation(
            "node-3",
            "delete",
            "user-1",
            {},
        )
        time.sleep(1)
        print("\n各节点删除后的数据状态:")
        for node_id in node_ids:
            user = sync_system.get_entity_from_node(node_id, "user-1")
            status = "已删除" if user is None else "仍存在"
            print(f"节点 {node_id}: {status}")

        print("\n=== 测试场景 4: 多实体操作 ===")
        # Create several entities on node 1.
        products = [
            ("product-1", {"name": "Laptop", "price": 999.99}),
            ("product-2", {"name": "Smartphone", "price": 499.99}),
            ("product-3", {"name": "Tablet", "price": 299.99}),
        ]
        for entity_id, data in products:
            sync_system.perform_write_operation("node-1", "create", entity_id, data)
        time.sleep(1)
        # Count the entities held by each node.
        print("\n各节点实体数量:")
        for node_id in node_ids:
            node_entities = sync_system.get_all_entities_from_node(node_id)
            print(f"节点 {node_id}: {len(node_entities)} 个实体")
    finally:
        # Always stop background threads, even if a scenario raised.
        sync_system.shutdown()
    print("\n数据同步系统已关闭")