You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

272 lines
7.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
事件溯源模式 - 事件存储系统
此模块实现了一个简单的事件存储系统,用于记录所有状态变更。
事件溯源模式通过存储所有导致状态变更的事件,而不是仅存储当前状态,
使得系统可以通过重放事件来重建任何时间点的状态。
"""
from datetime import datetime
import json
from typing import Dict, List, Optional, Any, Type
class DomainEvent:
"""
领域事件基类
所有领域事件都应该继承此类包含事件的基本属性如事件ID、时间戳等。
"""
def __init__(self, aggregate_id: str, event_type: str):
"""
初始化领域事件
Args:
aggregate_id: 聚合根ID
event_type: 事件类型
"""
self.aggregate_id = aggregate_id
self.event_type = event_type
self.timestamp = datetime.now().isoformat()
self.event_id = f"{event_type}-{aggregate_id}-{self.timestamp}"
self.data: Dict[str, Any] = {}
def to_dict(self) -> Dict[str, Any]:
"""将事件转换为字典格式"""
return {
"event_id": self.event_id,
"aggregate_id": self.aggregate_id,
"event_type": self.event_type,
"timestamp": self.timestamp,
"data": self.data
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'DomainEvent':
"""从字典创建事件实例"""
event = cls(data["aggregate_id"], data["event_type"])
event.event_id = data["event_id"]
event.timestamp = data["timestamp"]
event.data = data["data"]
return event
class EventStore:
"""
事件存储类
负责存储和检索领域事件
"""
def __init__(self):
"""初始化事件存储"""
self._events: Dict[str, List[DomainEvent]] = {} # 按聚合根ID存储事件
self._all_events: List[DomainEvent] = [] # 存储所有事件
def save_event(self, event: DomainEvent) -> None:
"""
保存事件到存储
Args:
event: 要保存的领域事件
"""
if event.aggregate_id not in self._events:
self._events[event.aggregate_id] = []
self._events[event.aggregate_id].append(event)
self._all_events.append(event)
print(f"Event saved: {event.event_type} for {event.aggregate_id}")
def get_events_for_aggregate(self, aggregate_id: str) -> List[DomainEvent]:
"""
获取指定聚合根的所有事件
Args:
aggregate_id: 聚合根ID
Returns:
指定聚合根的事件列表
"""
return self._events.get(aggregate_id, []).copy()
def get_all_events(self) -> List[DomainEvent]:
"""
获取所有事件
Returns:
所有事件的列表
"""
return self._all_events.copy()
def get_events_by_type(self, event_type: str) -> List[DomainEvent]:
"""
按事件类型获取事件
Args:
event_type: 事件类型
Returns:
指定类型的事件列表
"""
return [event for event in self._all_events if event.event_type == event_type]
def save_to_file(self, file_path: str) -> None:
"""
将事件存储保存到文件
Args:
file_path: 文件路径
"""
events_data = [event.to_dict() for event in self._all_events]
with open(file_path, 'w', encoding='utf-8') as f:
json.dump(events_data, f, indent=2, ensure_ascii=False)
def load_from_file(self, file_path: str) -> None:
"""
从文件加载事件存储
Args:
file_path: 文件路径
"""
try:
with open(file_path, 'r', encoding='utf-8') as f:
events_data = json.load(f)
self._events = {}
self._all_events = []
for event_data in events_data:
event = DomainEvent.from_dict(event_data)
self.save_event(event)
except FileNotFoundError:
print(f"File not found: {file_path}")
except json.JSONDecodeError:
print(f"Invalid JSON in file: {file_path}")
class AggregateRoot:
"""
聚合根基类
作为领域驱动设计中的聚合根,负责管理自身状态和生成事件
"""
def __init__(self, aggregate_id: str):
"""
初始化聚合根
Args:
aggregate_id: 聚合根ID
"""
self.aggregate_id = aggregate_id
self.uncommitted_events: List[DomainEvent] = []
def apply_event(self, event: DomainEvent) -> None:
"""
应用事件到聚合根状态
Args:
event: 要应用的事件
"""
# 具体聚合根需要实现此方法来更新状态
pass
def add_event(self, event: DomainEvent) -> None:
"""
添加未提交的事件
Args:
event: 要添加的事件
"""
self.apply_event(event)
self.uncommitted_events.append(event)
def commit_events(self, event_store: EventStore) -> None:
"""
提交所有未提交的事件到事件存储
Args:
event_store: 事件存储实例
"""
for event in self.uncommitted_events:
event_store.save_event(event)
self.uncommitted_events.clear()
# 示例:定义具体的领域事件和聚合根
class UserCreatedEvent(DomainEvent):
"""用户创建事件"""
def __init__(self, user_id: str, username: str, email: str):
super().__init__(user_id, "UserCreated")
self.data = {
"username": username,
"email": email
}
class UserUpdatedEvent(DomainEvent):
"""用户更新事件"""
def __init__(self, user_id: str, updates: Dict[str, Any]):
super().__init__(user_id, "UserUpdated")
self.data = updates
class User(AggregateRoot):
"""用户聚合根"""
def __init__(self, user_id: str):
super().__init__(user_id)
self.username: Optional[str] = None
self.email: Optional[str] = None
def create(self, username: str, email: str) -> None:
"""创建用户"""
event = UserCreatedEvent(self.aggregate_id, username, email)
self.add_event(event)
def update(self, updates: Dict[str, Any]) -> None:
"""更新用户信息"""
event = UserUpdatedEvent(self.aggregate_id, updates)
self.add_event(event)
def apply_event(self, event: DomainEvent) -> None:
"""应用事件更新状态"""
if event.event_type == "UserCreated":
self.username = event.data["username"]
self.email = event.data["email"]
elif event.event_type == "UserUpdated":
for key, value in event.data.items():
if hasattr(self, key):
setattr(self, key, value)
# 示例使用
if __name__ == "__main__":
# 创建事件存储
event_store = EventStore()
# 创建用户聚合根
user = User("user-1")
user.create("john_doe", "john@example.com")
# 提交事件
user.commit_events(event_store)
# 更新用户
user.update({"email": "john.doe@example.com"})
user.commit_events(event_store)
# 从事件重建状态
replayed_user = User("user-1")
events = event_store.get_events_for_aggregate("user-1")
for event in events:
replayed_user.apply_event(event)
print(f"Replayed user: {replayed_user.username}, {replayed_user.email}")
print(f"Total events: {len(event_store.get_all_events())}")