|
|
"""
|
|
|
最终一致性模式 - 数据同步系统
|
|
|
|
|
|
此模块实现了最终一致性模式,用于处理分布式系统中的数据同步问题。
|
|
|
最终一致性系统允许数据在一段时间内处于不一致状态,但保证最终会达到一致。
|
|
|
"""
|
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
from enum import Enum, auto
|
|
|
from typing import Dict, List, Optional, Any, Set, Callable
|
|
|
import threading
|
|
|
import time
|
|
|
import uuid
|
|
|
import queue
|
|
|
import random
|
|
|
|
|
|
|
|
|
class SyncStatus(Enum):
|
|
|
"""同步状态枚举"""
|
|
|
PENDING = auto() # 待同步
|
|
|
SYNCHRONIZING = auto() # 正在同步
|
|
|
COMPLETED = auto() # 同步完成
|
|
|
FAILED = auto() # 同步失败
|
|
|
RETRYING = auto() # 重试中
|
|
|
|
|
|
|
|
|
class ReplicationMessage:
|
|
|
"""
|
|
|
复制消息类
|
|
|
|
|
|
表示需要在系统节点间复制的数据变更消息
|
|
|
"""
|
|
|
|
|
|
def __init__(self, operation: str, entity_id: str, data: Dict[str, Any], source_node: str):
|
|
|
"""
|
|
|
初始化复制消息
|
|
|
|
|
|
Args:
|
|
|
operation: 操作类型 (create, update, delete)
|
|
|
entity_id: 实体ID
|
|
|
data: 实体数据
|
|
|
source_node: 源节点ID
|
|
|
"""
|
|
|
self.message_id = str(uuid.uuid4())
|
|
|
self.operation = operation
|
|
|
self.entity_id = entity_id
|
|
|
self.data = data
|
|
|
self.source_node = source_node
|
|
|
self.timestamp = time.time()
|
|
|
self.version = 1
|
|
|
self.destination_nodes: Set[str] = set()
|
|
|
self.delivered_nodes: Set[str] = set()
|
|
|
self.failed_nodes: Set[str] = set()
|
|
|
|
|
|
def mark_delivered(self, node_id: str) -> None:
|
|
|
"""标记消息已送达指定节点"""
|
|
|
self.delivered_nodes.add(node_id)
|
|
|
if node_id in self.failed_nodes:
|
|
|
self.failed_nodes.remove(node_id)
|
|
|
|
|
|
def mark_failed(self, node_id: str) -> None:
|
|
|
"""标记消息送达指定节点失败"""
|
|
|
self.failed_nodes.add(node_id)
|
|
|
|
|
|
def is_delivered_to_all(self) -> bool:
|
|
|
"""检查消息是否已送达所有目标节点"""
|
|
|
return self.destination_nodes.issubset(self.delivered_nodes)
|
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
|
"""转换为字典表示"""
|
|
|
return {
|
|
|
"message_id": self.message_id,
|
|
|
"operation": self.operation,
|
|
|
"entity_id": self.entity_id,
|
|
|
"data": self.data,
|
|
|
"source_node": self.source_node,
|
|
|
"timestamp": self.timestamp,
|
|
|
"version": self.version,
|
|
|
"destination_nodes": list(self.destination_nodes),
|
|
|
"delivered_nodes": list(self.delivered_nodes),
|
|
|
"failed_nodes": list(self.failed_nodes)
|
|
|
}
|
|
|
|
|
|
|
|
|
class MessageQueue:
|
|
|
"""
|
|
|
消息队列类
|
|
|
|
|
|
用于在节点间传递复制消息
|
|
|
"""
|
|
|
|
|
|
def __init__(self, node_id: str):
|
|
|
"""
|
|
|
初始化消息队列
|
|
|
|
|
|
Args:
|
|
|
node_id: 节点ID
|
|
|
"""
|
|
|
self.node_id = node_id
|
|
|
self._queue = queue.Queue()
|
|
|
self._lock = threading.RLock()
|
|
|
|
|
|
def enqueue(self, message: ReplicationMessage) -> None:
|
|
|
"""
|
|
|
将消息加入队列
|
|
|
|
|
|
Args:
|
|
|
message: 复制消息
|
|
|
"""
|
|
|
with self._lock:
|
|
|
self._queue.put(message)
|
|
|
|
|
|
def dequeue(self, timeout: Optional[float] = None) -> Optional[ReplicationMessage]:
|
|
|
"""
|
|
|
从队列中取出消息
|
|
|
|
|
|
Args:
|
|
|
timeout: 超时时间
|
|
|
|
|
|
Returns:
|
|
|
消息,如果队列为空且超时则返回None
|
|
|
"""
|
|
|
try:
|
|
|
return self._queue.get(timeout=timeout)
|
|
|
except queue.Empty:
|
|
|
return None
|
|
|
|
|
|
def size(self) -> int:
|
|
|
"""
|
|
|
获取队列大小
|
|
|
|
|
|
Returns:
|
|
|
队列中的消息数量
|
|
|
"""
|
|
|
return self._queue.qsize()
|
|
|
|
|
|
|
|
|
class StorageProvider(ABC):
|
|
|
"""
|
|
|
存储提供者抽象基类
|
|
|
|
|
|
定义数据存储的接口
|
|
|
"""
|
|
|
|
|
|
@abstractmethod
|
|
|
def get(self, entity_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""获取实体"""
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def put(self, entity_id: str, data: Dict[str, Any]) -> None:
|
|
|
"""存储实体"""
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def delete(self, entity_id: str) -> bool:
|
|
|
"""删除实体"""
|
|
|
pass
|
|
|
|
|
|
@abstractmethod
|
|
|
def get_all(self) -> Dict[str, Dict[str, Any]]:
|
|
|
"""获取所有实体"""
|
|
|
pass
|
|
|
|
|
|
|
|
|
class InMemoryStorage(StorageProvider):
|
|
|
"""
|
|
|
内存存储实现
|
|
|
|
|
|
用于演示的内存存储
|
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
|
"""初始化内存存储"""
|
|
|
self._data: Dict[str, Dict[str, Any]] = {}
|
|
|
self._lock = threading.RLock()
|
|
|
|
|
|
def get(self, entity_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""获取实体"""
|
|
|
with self._lock:
|
|
|
return self._data.get(entity_id)
|
|
|
|
|
|
def put(self, entity_id: str, data: Dict[str, Any]) -> None:
|
|
|
"""存储实体"""
|
|
|
with self._lock:
|
|
|
self._data[entity_id] = data
|
|
|
|
|
|
def delete(self, entity_id: str) -> bool:
|
|
|
"""删除实体"""
|
|
|
with self._lock:
|
|
|
if entity_id in self._data:
|
|
|
del self._data[entity_id]
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
def get_all(self) -> Dict[str, Dict[str, Any]]:
|
|
|
"""获取所有实体"""
|
|
|
with self._lock:
|
|
|
return self._data.copy()
|
|
|
|
|
|
|
|
|
class Node:
|
|
|
"""
|
|
|
系统节点类
|
|
|
|
|
|
表示分布式系统中的一个节点
|
|
|
"""
|
|
|
|
|
|
def __init__(self, node_id: str, storage: StorageProvider):
|
|
|
"""
|
|
|
初始化节点
|
|
|
|
|
|
Args:
|
|
|
node_id: 节点ID
|
|
|
storage: 存储提供者
|
|
|
"""
|
|
|
self.node_id = node_id
|
|
|
self.storage = storage
|
|
|
self.message_queue = MessageQueue(node_id)
|
|
|
self.received_messages: Dict[str, ReplicationMessage] = {}
|
|
|
self.sync_status: Dict[str, SyncStatus] = {}
|
|
|
self._lock = threading.RLock()
|
|
|
|
|
|
# 启动消息处理器线程
|
|
|
self._stop_event = threading.Event()
|
|
|
self._processor_thread = threading.Thread(target=self._process_messages)
|
|
|
self._processor_thread.daemon = True
|
|
|
self._processor_thread.start()
|
|
|
|
|
|
def create_entity(self, entity_id: str, data: Dict[str, Any]) -> ReplicationMessage:
|
|
|
"""
|
|
|
创建实体
|
|
|
|
|
|
Args:
|
|
|
entity_id: 实体ID
|
|
|
data: 实体数据
|
|
|
|
|
|
Returns:
|
|
|
复制消息
|
|
|
"""
|
|
|
# 添加元数据
|
|
|
data = {
|
|
|
**data,
|
|
|
"_version": 1,
|
|
|
"_created_at": time.time(),
|
|
|
"_updated_at": time.time(),
|
|
|
"_created_by": self.node_id
|
|
|
}
|
|
|
|
|
|
# 存储数据
|
|
|
self.storage.put(entity_id, data)
|
|
|
|
|
|
# 创建复制消息
|
|
|
message = ReplicationMessage("create", entity_id, data, self.node_id)
|
|
|
|
|
|
return message
|
|
|
|
|
|
def update_entity(self, entity_id: str, data: Dict[str, Any]) -> Optional[ReplicationMessage]:
|
|
|
"""
|
|
|
更新实体
|
|
|
|
|
|
Args:
|
|
|
entity_id: 实体ID
|
|
|
data: 更新的数据
|
|
|
|
|
|
Returns:
|
|
|
复制消息,如果实体不存在则返回None
|
|
|
"""
|
|
|
# 获取当前数据
|
|
|
current_data = self.storage.get(entity_id)
|
|
|
if not current_data:
|
|
|
return None
|
|
|
|
|
|
# 合并数据并更新元数据
|
|
|
updated_data = {
|
|
|
**current_data,
|
|
|
**{k: v for k, v in data.items() if not k.startswith('_')},
|
|
|
"_version": current_data["_version"] + 1,
|
|
|
"_updated_at": time.time()
|
|
|
}
|
|
|
|
|
|
# 存储更新后的数据
|
|
|
self.storage.put(entity_id, updated_data)
|
|
|
|
|
|
# 创建复制消息
|
|
|
message = ReplicationMessage("update", entity_id, updated_data, self.node_id)
|
|
|
|
|
|
return message
|
|
|
|
|
|
def delete_entity(self, entity_id: str) -> Optional[ReplicationMessage]:
|
|
|
"""
|
|
|
删除实体
|
|
|
|
|
|
Args:
|
|
|
entity_id: 实体ID
|
|
|
|
|
|
Returns:
|
|
|
复制消息,如果实体不存在则返回None
|
|
|
"""
|
|
|
# 获取当前数据
|
|
|
current_data = self.storage.get(entity_id)
|
|
|
if not current_data:
|
|
|
return None
|
|
|
|
|
|
# 删除数据
|
|
|
self.storage.delete(entity_id)
|
|
|
|
|
|
# 创建复制消息
|
|
|
message = ReplicationMessage("delete", entity_id, current_data, self.node_id)
|
|
|
|
|
|
return message
|
|
|
|
|
|
def receive_message(self, message: ReplicationMessage) -> bool:
|
|
|
"""
|
|
|
接收复制消息
|
|
|
|
|
|
Args:
|
|
|
message: 复制消息
|
|
|
|
|
|
Returns:
|
|
|
是否成功处理消息
|
|
|
"""
|
|
|
with self._lock:
|
|
|
# 检查是否已经处理过该消息
|
|
|
if message.message_id in self.received_messages:
|
|
|
return True
|
|
|
|
|
|
# 检查版本号,确保处理最新的消息
|
|
|
current_data = self.storage.get(message.entity_id)
|
|
|
if current_data and "_version" in current_data:
|
|
|
message_version = message.data.get("_version", 1)
|
|
|
if message_version <= current_data["_version"]:
|
|
|
print(f"Node {self.node_id}: Ignoring outdated message {message.message_id}")
|
|
|
return True
|
|
|
|
|
|
# 将消息加入队列
|
|
|
self.message_queue.enqueue(message)
|
|
|
return True
|
|
|
|
|
|
def _process_messages(self) -> None:
|
|
|
"""
|
|
|
处理消息队列中的消息
|
|
|
"""
|
|
|
while not self._stop_event.is_set():
|
|
|
try:
|
|
|
message = self.message_queue.dequeue(timeout=1.0)
|
|
|
if message:
|
|
|
self._apply_message(message)
|
|
|
except Exception as e:
|
|
|
print(f"Node {self.node_id}: Error processing message: {str(e)}")
|
|
|
|
|
|
def _apply_message(self, message: ReplicationMessage) -> None:
|
|
|
"""
|
|
|
应用消息到本地存储
|
|
|
|
|
|
Args:
|
|
|
message: 复制消息
|
|
|
"""
|
|
|
try:
|
|
|
with self._lock:
|
|
|
print(f"Node {self.node_id}: Applying message {message.message_id} ({message.operation})")
|
|
|
|
|
|
if message.operation == "create" or message.operation == "update":
|
|
|
# 存储数据
|
|
|
self.storage.put(message.entity_id, message.data)
|
|
|
elif message.operation == "delete":
|
|
|
# 删除数据
|
|
|
self.storage.delete(message.entity_id)
|
|
|
|
|
|
# 记录已接收的消息
|
|
|
self.received_messages[message.message_id] = message
|
|
|
|
|
|
# 更新同步状态
|
|
|
self.sync_status[message.entity_id] = SyncStatus.COMPLETED
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"Node {self.node_id}: Failed to apply message {message.message_id}: {str(e)}")
|
|
|
with self._lock:
|
|
|
self.sync_status[message.entity_id] = SyncStatus.FAILED
|
|
|
return False
|
|
|
|
|
|
def get_entity(self, entity_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""
|
|
|
获取实体
|
|
|
|
|
|
Args:
|
|
|
entity_id: 实体ID
|
|
|
|
|
|
Returns:
|
|
|
实体数据,如果不存在则返回None
|
|
|
"""
|
|
|
return self.storage.get(entity_id)
|
|
|
|
|
|
def get_all_entities(self) -> Dict[str, Dict[str, Any]]:
|
|
|
"""
|
|
|
获取所有实体
|
|
|
|
|
|
Returns:
|
|
|
所有实体数据
|
|
|
"""
|
|
|
return self.storage.get_all()
|
|
|
|
|
|
def shutdown(self) -> None:
|
|
|
"""关闭节点"""
|
|
|
self._stop_event.set()
|
|
|
if self._processor_thread.is_alive():
|
|
|
self._processor_thread.join(timeout=2.0)
|
|
|
|
|
|
|
|
|
class ReplicationManager:
|
|
|
"""
|
|
|
复制管理器
|
|
|
|
|
|
负责管理节点间的数据复制
|
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
|
"""初始化复制管理器"""
|
|
|
self.nodes: Dict[str, Node] = {}
|
|
|
self._lock = threading.RLock()
|
|
|
|
|
|
# 启动复制监控线程
|
|
|
self._stop_event = threading.Event()
|
|
|
self._monitor_thread = threading.Thread(target=self._monitor_replication)
|
|
|
self._monitor_thread.daemon = True
|
|
|
self._monitor_thread.start()
|
|
|
|
|
|
def add_node(self, node: Node) -> None:
|
|
|
"""
|
|
|
添加节点
|
|
|
|
|
|
Args:
|
|
|
node: 要添加的节点
|
|
|
"""
|
|
|
with self._lock:
|
|
|
self.nodes[node.node_id] = node
|
|
|
print(f"Added node {node.node_id}")
|
|
|
|
|
|
def remove_node(self, node_id: str) -> bool:
|
|
|
"""
|
|
|
移除节点
|
|
|
|
|
|
Args:
|
|
|
node_id: 要移除的节点ID
|
|
|
|
|
|
Returns:
|
|
|
是否移除成功
|
|
|
"""
|
|
|
with self._lock:
|
|
|
if node_id in self.nodes:
|
|
|
node = self.nodes.pop(node_id)
|
|
|
node.shutdown()
|
|
|
print(f"Removed node {node_id}")
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
def replicate_message(self, message: ReplicationMessage, exclude_node: Optional[str] = None) -> None:
|
|
|
"""
|
|
|
复制消息到所有其他节点
|
|
|
|
|
|
Args:
|
|
|
message: 要复制的消息
|
|
|
exclude_node: 要排除的节点ID
|
|
|
"""
|
|
|
with self._lock:
|
|
|
for node_id, node in self.nodes.items():
|
|
|
if node_id != exclude_node and node_id != message.source_node:
|
|
|
message.destination_nodes.add(node_id)
|
|
|
# 模拟网络延迟和故障
|
|
|
if random.random() < 0.9: # 90%的概率成功发送
|
|
|
node.receive_message(message)
|
|
|
else:
|
|
|
print(f"Failed to send message {message.message_id} to node {node_id}")
|
|
|
message.mark_failed(node_id)
|
|
|
|
|
|
def _monitor_replication(self) -> None:
|
|
|
"""
|
|
|
监控复制状态并处理失败的复制
|
|
|
"""
|
|
|
while not self._stop_event.is_set():
|
|
|
try:
|
|
|
# 这里可以实现失败消息的重试逻辑
|
|
|
# 为了简化,我们省略具体实现
|
|
|
time.sleep(5.0)
|
|
|
except Exception as e:
|
|
|
print(f"Replication manager error: {str(e)}")
|
|
|
|
|
|
def shutdown(self) -> None:
|
|
|
"""关闭复制管理器"""
|
|
|
self._stop_event.set()
|
|
|
if self._monitor_thread.is_alive():
|
|
|
self._monitor_thread.join(timeout=2.0)
|
|
|
|
|
|
# 关闭所有节点
|
|
|
for node in list(self.nodes.values()):
|
|
|
node.shutdown()
|
|
|
self.nodes.clear()
|
|
|
|
|
|
|
|
|
class DataSyncSystem:
|
|
|
"""
|
|
|
数据同步系统
|
|
|
|
|
|
整合节点和复制管理器,提供最终一致性的数据同步
|
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
|
"""初始化数据同步系统"""
|
|
|
self.replication_manager = ReplicationManager()
|
|
|
|
|
|
def create_node(self, node_id: str, storage: Optional[StorageProvider] = None) -> Node:
|
|
|
"""
|
|
|
创建并添加节点
|
|
|
|
|
|
Args:
|
|
|
node_id: 节点ID
|
|
|
storage: 存储提供者,默认为内存存储
|
|
|
|
|
|
Returns:
|
|
|
创建的节点
|
|
|
"""
|
|
|
if storage is None:
|
|
|
storage = InMemoryStorage()
|
|
|
|
|
|
node = Node(node_id, storage)
|
|
|
self.replication_manager.add_node(node)
|
|
|
return node
|
|
|
|
|
|
def perform_write_operation(self, node_id: str, operation: str, entity_id: str, data: Dict[str, Any]) -> bool:
|
|
|
"""
|
|
|
在指定节点执行写操作
|
|
|
|
|
|
Args:
|
|
|
node_id: 节点ID
|
|
|
operation: 操作类型 (create, update, delete)
|
|
|
entity_id: 实体ID
|
|
|
data: 实体数据
|
|
|
|
|
|
Returns:
|
|
|
是否操作成功
|
|
|
"""
|
|
|
node = self.replication_manager.nodes.get(node_id)
|
|
|
if not node:
|
|
|
print(f"Node {node_id} not found")
|
|
|
return False
|
|
|
|
|
|
try:
|
|
|
# 在本地执行操作
|
|
|
if operation == "create":
|
|
|
message = node.create_entity(entity_id, data)
|
|
|
elif operation == "update":
|
|
|
message = node.update_entity(entity_id, data)
|
|
|
elif operation == "delete":
|
|
|
message = node.delete_entity(entity_id)
|
|
|
else:
|
|
|
print(f"Unknown operation: {operation}")
|
|
|
return False
|
|
|
|
|
|
if not message:
|
|
|
print(f"Operation failed: {operation} on {entity_id}")
|
|
|
return False
|
|
|
|
|
|
# 复制到其他节点
|
|
|
self.replication_manager.replicate_message(message)
|
|
|
|
|
|
return True
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"Error performing {operation}: {str(e)}")
|
|
|
return False
|
|
|
|
|
|
def get_entity_from_node(self, node_id: str, entity_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""
|
|
|
从指定节点获取实体
|
|
|
|
|
|
Args:
|
|
|
node_id: 节点ID
|
|
|
entity_id: 实体ID
|
|
|
|
|
|
Returns:
|
|
|
实体数据,如果不存在则返回None
|
|
|
"""
|
|
|
node = self.replication_manager.nodes.get(node_id)
|
|
|
if not node:
|
|
|
return None
|
|
|
|
|
|
return node.get_entity(entity_id)
|
|
|
|
|
|
def get_all_entities_from_node(self, node_id: str) -> Dict[str, Dict[str, Any]]:
|
|
|
"""
|
|
|
从指定节点获取所有实体
|
|
|
|
|
|
Args:
|
|
|
node_id: 节点ID
|
|
|
|
|
|
Returns:
|
|
|
所有实体数据
|
|
|
"""
|
|
|
node = self.replication_manager.nodes.get(node_id)
|
|
|
if not node:
|
|
|
return {}
|
|
|
|
|
|
return node.get_all_entities()
|
|
|
|
|
|
def shutdown(self) -> None:
|
|
|
"""关闭数据同步系统"""
|
|
|
self.replication_manager.shutdown()
|
|
|
|
|
|
|
|
|
# 示例使用
|
|
|
if __name__ == "__main__":
|
|
|
# 创建数据同步系统
|
|
|
sync_system = DataSyncSystem()
|
|
|
|
|
|
# 创建三个节点
|
|
|
node1 = sync_system.create_node("node-1")
|
|
|
node2 = sync_system.create_node("node-2")
|
|
|
node3 = sync_system.create_node("node-3")
|
|
|
|
|
|
print("\n=== 测试场景 1: 基本数据复制 ===")
|
|
|
|
|
|
# 在节点1创建实体
|
|
|
print("在节点1创建用户实体...")
|
|
|
sync_system.perform_write_operation(
|
|
|
"node-1",
|
|
|
"create",
|
|
|
"user-1",
|
|
|
{"username": "john_doe", "email": "john@example.com"}
|
|
|
)
|
|
|
|
|
|
# 等待复制完成
|
|
|
time.sleep(1)
|
|
|
|
|
|
# 检查各节点数据
|
|
|
print("\n各节点数据状态:")
|
|
|
for node_id in ["node-1", "node-2", "node-3"]:
|
|
|
user = sync_system.get_entity_from_node(node_id, "user-1")
|
|
|
status = "✓" if user else "✗"
|
|
|
print(f"节点 {node_id}: {status} {user}")
|
|
|
|
|
|
print("\n=== 测试场景 2: 更新数据 ===")
|
|
|
|
|
|
# 在节点2更新实体
|
|
|
print("在节点2更新用户邮箱...")
|
|
|
sync_system.perform_write_operation(
|
|
|
"node-2",
|
|
|
"update",
|
|
|
"user-1",
|
|
|
{"email": "john.doe@example.com"}
|
|
|
)
|
|
|
|
|
|
# 等待复制完成
|
|
|
time.sleep(1)
|
|
|
|
|
|
# 检查各节点更新后的数据
|
|
|
print("\n各节点更新后的数据状态:")
|
|
|
for node_id in ["node-1", "node-2", "node-3"]:
|
|
|
user = sync_system.get_entity_from_node(node_id, "user-1")
|
|
|
print(f"节点 {node_id}: 邮箱={user['email']}, 版本={user['_version']}")
|
|
|
|
|
|
print("\n=== 测试场景 3: 删除数据 ===")
|
|
|
|
|
|
# 在节点3删除实体
|
|
|
print("在节点3删除用户实体...")
|
|
|
sync_system.perform_write_operation(
|
|
|
"node-3",
|
|
|
"delete",
|
|
|
"user-1",
|
|
|
{}
|
|
|
)
|
|
|
|
|
|
# 等待复制完成
|
|
|
time.sleep(1)
|
|
|
|
|
|
# 检查各节点删除后的数据
|
|
|
print("\n各节点删除后的数据状态:")
|
|
|
for node_id in ["node-1", "node-2", "node-3"]:
|
|
|
user = sync_system.get_entity_from_node(node_id, "user-1")
|
|
|
status = "已删除" if user is None else "仍存在"
|
|
|
print(f"节点 {node_id}: {status}")
|
|
|
|
|
|
print("\n=== 测试场景 4: 多实体操作 ===")
|
|
|
|
|
|
# 创建多个实体
|
|
|
entities = [
|
|
|
("product-1", {"name": "Laptop", "price": 999.99}),
|
|
|
("product-2", {"name": "Smartphone", "price": 499.99}),
|
|
|
("product-3", {"name": "Tablet", "price": 299.99})
|
|
|
]
|
|
|
|
|
|
for entity_id, data in entities:
|
|
|
sync_system.perform_write_operation("node-1", "create", entity_id, data)
|
|
|
|
|
|
# 等待复制
|
|
|
time.sleep(1)
|
|
|
|
|
|
# 统计各节点的实体数量
|
|
|
print("\n各节点实体数量:")
|
|
|
for node_id in ["node-1", "node-2", "node-3"]:
|
|
|
entities = sync_system.get_all_entities_from_node(node_id)
|
|
|
print(f"节点 {node_id}: {len(entities)} 个实体")
|
|
|
|
|
|
# 关闭系统
|
|
|
sync_system.shutdown()
|
|
|
print("\n数据同步系统已关闭") |