|
|
"""
|
|
|
事件溯源模式 - 事件存储系统
|
|
|
|
|
|
此模块实现了一个简单的事件存储系统,用于记录所有状态变更。
|
|
|
事件溯源模式通过存储所有导致状态变更的事件,而不是仅存储当前状态,
|
|
|
使得系统可以通过重放事件来重建任何时间点的状态。
|
|
|
"""
|
|
|
|
|
|
from datetime import datetime
|
|
|
import json
|
|
|
from typing import Dict, List, Optional, Any, Type
|
|
|
|
|
|
|
|
|
class DomainEvent:
|
|
|
"""
|
|
|
领域事件基类
|
|
|
|
|
|
所有领域事件都应该继承此类,包含事件的基本属性如事件ID、时间戳等。
|
|
|
"""
|
|
|
|
|
|
def __init__(self, aggregate_id: str, event_type: str):
|
|
|
"""
|
|
|
初始化领域事件
|
|
|
|
|
|
Args:
|
|
|
aggregate_id: 聚合根ID
|
|
|
event_type: 事件类型
|
|
|
"""
|
|
|
self.aggregate_id = aggregate_id
|
|
|
self.event_type = event_type
|
|
|
self.timestamp = datetime.now().isoformat()
|
|
|
self.event_id = f"{event_type}-{aggregate_id}-{self.timestamp}"
|
|
|
self.data: Dict[str, Any] = {}
|
|
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
|
"""将事件转换为字典格式"""
|
|
|
return {
|
|
|
"event_id": self.event_id,
|
|
|
"aggregate_id": self.aggregate_id,
|
|
|
"event_type": self.event_type,
|
|
|
"timestamp": self.timestamp,
|
|
|
"data": self.data
|
|
|
}
|
|
|
|
|
|
@classmethod
|
|
|
def from_dict(cls, data: Dict[str, Any]) -> 'DomainEvent':
|
|
|
"""从字典创建事件实例"""
|
|
|
event = cls(data["aggregate_id"], data["event_type"])
|
|
|
event.event_id = data["event_id"]
|
|
|
event.timestamp = data["timestamp"]
|
|
|
event.data = data["data"]
|
|
|
return event
|
|
|
|
|
|
|
|
|
class EventStore:
|
|
|
"""
|
|
|
事件存储类
|
|
|
|
|
|
负责存储和检索领域事件
|
|
|
"""
|
|
|
|
|
|
def __init__(self):
|
|
|
"""初始化事件存储"""
|
|
|
self._events: Dict[str, List[DomainEvent]] = {} # 按聚合根ID存储事件
|
|
|
self._all_events: List[DomainEvent] = [] # 存储所有事件
|
|
|
|
|
|
def save_event(self, event: DomainEvent) -> None:
|
|
|
"""
|
|
|
保存事件到存储
|
|
|
|
|
|
Args:
|
|
|
event: 要保存的领域事件
|
|
|
"""
|
|
|
if event.aggregate_id not in self._events:
|
|
|
self._events[event.aggregate_id] = []
|
|
|
|
|
|
self._events[event.aggregate_id].append(event)
|
|
|
self._all_events.append(event)
|
|
|
print(f"Event saved: {event.event_type} for {event.aggregate_id}")
|
|
|
|
|
|
def get_events_for_aggregate(self, aggregate_id: str) -> List[DomainEvent]:
|
|
|
"""
|
|
|
获取指定聚合根的所有事件
|
|
|
|
|
|
Args:
|
|
|
aggregate_id: 聚合根ID
|
|
|
|
|
|
Returns:
|
|
|
指定聚合根的事件列表
|
|
|
"""
|
|
|
return self._events.get(aggregate_id, []).copy()
|
|
|
|
|
|
def get_all_events(self) -> List[DomainEvent]:
|
|
|
"""
|
|
|
获取所有事件
|
|
|
|
|
|
Returns:
|
|
|
所有事件的列表
|
|
|
"""
|
|
|
return self._all_events.copy()
|
|
|
|
|
|
def get_events_by_type(self, event_type: str) -> List[DomainEvent]:
|
|
|
"""
|
|
|
按事件类型获取事件
|
|
|
|
|
|
Args:
|
|
|
event_type: 事件类型
|
|
|
|
|
|
Returns:
|
|
|
指定类型的事件列表
|
|
|
"""
|
|
|
return [event for event in self._all_events if event.event_type == event_type]
|
|
|
|
|
|
def save_to_file(self, file_path: str) -> None:
|
|
|
"""
|
|
|
将事件存储保存到文件
|
|
|
|
|
|
Args:
|
|
|
file_path: 文件路径
|
|
|
"""
|
|
|
events_data = [event.to_dict() for event in self._all_events]
|
|
|
with open(file_path, 'w', encoding='utf-8') as f:
|
|
|
json.dump(events_data, f, indent=2, ensure_ascii=False)
|
|
|
|
|
|
def load_from_file(self, file_path: str) -> None:
|
|
|
"""
|
|
|
从文件加载事件存储
|
|
|
|
|
|
Args:
|
|
|
file_path: 文件路径
|
|
|
"""
|
|
|
try:
|
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
|
events_data = json.load(f)
|
|
|
|
|
|
self._events = {}
|
|
|
self._all_events = []
|
|
|
|
|
|
for event_data in events_data:
|
|
|
event = DomainEvent.from_dict(event_data)
|
|
|
self.save_event(event)
|
|
|
|
|
|
except FileNotFoundError:
|
|
|
print(f"File not found: {file_path}")
|
|
|
except json.JSONDecodeError:
|
|
|
print(f"Invalid JSON in file: {file_path}")
|
|
|
|
|
|
|
|
|
class AggregateRoot:
|
|
|
"""
|
|
|
聚合根基类
|
|
|
|
|
|
作为领域驱动设计中的聚合根,负责管理自身状态和生成事件
|
|
|
"""
|
|
|
|
|
|
def __init__(self, aggregate_id: str):
|
|
|
"""
|
|
|
初始化聚合根
|
|
|
|
|
|
Args:
|
|
|
aggregate_id: 聚合根ID
|
|
|
"""
|
|
|
self.aggregate_id = aggregate_id
|
|
|
self.uncommitted_events: List[DomainEvent] = []
|
|
|
|
|
|
def apply_event(self, event: DomainEvent) -> None:
|
|
|
"""
|
|
|
应用事件到聚合根状态
|
|
|
|
|
|
Args:
|
|
|
event: 要应用的事件
|
|
|
"""
|
|
|
# 具体聚合根需要实现此方法来更新状态
|
|
|
pass
|
|
|
|
|
|
def add_event(self, event: DomainEvent) -> None:
|
|
|
"""
|
|
|
添加未提交的事件
|
|
|
|
|
|
Args:
|
|
|
event: 要添加的事件
|
|
|
"""
|
|
|
self.apply_event(event)
|
|
|
self.uncommitted_events.append(event)
|
|
|
|
|
|
def commit_events(self, event_store: EventStore) -> None:
|
|
|
"""
|
|
|
提交所有未提交的事件到事件存储
|
|
|
|
|
|
Args:
|
|
|
event_store: 事件存储实例
|
|
|
"""
|
|
|
for event in self.uncommitted_events:
|
|
|
event_store.save_event(event)
|
|
|
self.uncommitted_events.clear()
|
|
|
|
|
|
|
|
|
# 示例:定义具体的领域事件和聚合根
|
|
|
|
|
|
class UserCreatedEvent(DomainEvent):
|
|
|
"""用户创建事件"""
|
|
|
|
|
|
def __init__(self, user_id: str, username: str, email: str):
|
|
|
super().__init__(user_id, "UserCreated")
|
|
|
self.data = {
|
|
|
"username": username,
|
|
|
"email": email
|
|
|
}
|
|
|
|
|
|
|
|
|
class UserUpdatedEvent(DomainEvent):
|
|
|
"""用户更新事件"""
|
|
|
|
|
|
def __init__(self, user_id: str, updates: Dict[str, Any]):
|
|
|
super().__init__(user_id, "UserUpdated")
|
|
|
self.data = updates
|
|
|
|
|
|
|
|
|
class User(AggregateRoot):
|
|
|
"""用户聚合根"""
|
|
|
|
|
|
def __init__(self, user_id: str):
|
|
|
super().__init__(user_id)
|
|
|
self.username: Optional[str] = None
|
|
|
self.email: Optional[str] = None
|
|
|
|
|
|
def create(self, username: str, email: str) -> None:
|
|
|
"""创建用户"""
|
|
|
event = UserCreatedEvent(self.aggregate_id, username, email)
|
|
|
self.add_event(event)
|
|
|
|
|
|
def update(self, updates: Dict[str, Any]) -> None:
|
|
|
"""更新用户信息"""
|
|
|
event = UserUpdatedEvent(self.aggregate_id, updates)
|
|
|
self.add_event(event)
|
|
|
|
|
|
def apply_event(self, event: DomainEvent) -> None:
|
|
|
"""应用事件更新状态"""
|
|
|
if event.event_type == "UserCreated":
|
|
|
self.username = event.data["username"]
|
|
|
self.email = event.data["email"]
|
|
|
elif event.event_type == "UserUpdated":
|
|
|
for key, value in event.data.items():
|
|
|
if hasattr(self, key):
|
|
|
setattr(self, key, value)
|
|
|
|
|
|
|
|
|
# 示例使用
|
|
|
if __name__ == "__main__":
|
|
|
# 创建事件存储
|
|
|
event_store = EventStore()
|
|
|
|
|
|
# 创建用户聚合根
|
|
|
user = User("user-1")
|
|
|
user.create("john_doe", "john@example.com")
|
|
|
|
|
|
# 提交事件
|
|
|
user.commit_events(event_store)
|
|
|
|
|
|
# 更新用户
|
|
|
user.update({"email": "john.doe@example.com"})
|
|
|
user.commit_events(event_store)
|
|
|
|
|
|
# 从事件重建状态
|
|
|
replayed_user = User("user-1")
|
|
|
events = event_store.get_events_for_aggregate("user-1")
|
|
|
|
|
|
for event in events:
|
|
|
replayed_user.apply_event(event)
|
|
|
|
|
|
print(f"Replayed user: {replayed_user.username}, {replayed_user.email}")
|
|
|
print(f"Total events: {len(event_store.get_all_events())}") |