|
|
"""
|
|
|
Saga模式 - 分布式事务协调器
|
|
|
|
|
|
此模块实现了Saga模式,用于管理分布式事务。Saga模式将分布式事务分解为一系列本地事务,
|
|
|
每个本地事务都有对应的补偿事务,当某个本地事务失败时,执行已完成事务的补偿操作。
|
|
|
"""
|
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
from enum import Enum, auto
|
|
|
from typing import Dict, List, Optional, Any, Callable
|
|
|
import time
|
|
|
import uuid
|
|
|
|
|
|
|
|
|
class SagaStatus(Enum):
    """Lifecycle states of a Saga instance.

    Values are assigned by ``auto()`` in declaration order, so the order of
    the members below must not be changed.
    """

    NOT_STARTED = auto()
    RUNNING = auto()
    COMPLETED = auto()
    COMPENSATING = auto()
    COMPENSATED = auto()
    FAILED = auto()
|
|
|
|
|
|
|
|
|
class StepStatus(Enum):
    """Lifecycle states of a single Saga step.

    Values are assigned by ``auto()`` in declaration order, so the order of
    the members below must not be changed.
    """

    PENDING = auto()
    EXECUTING = auto()
    COMPLETED = auto()
    COMPENSATING = auto()
    COMPENSATED = auto()
    FAILED = auto()
|
|
|
|
|
|
|
|
|
class SagaStep:
    """One step of a Saga: bookkeeping for a forward action and its compensation.

    The step does not store the functions it runs; they are passed to
    ``execute`` and ``compensate``. The step records its status, the
    ``time.time()`` timestamps of each phase, the forward result and the
    message of any exception raised.
    """

    def __init__(self, name: str):
        """Initialize a pending step.

        Args:
            name: Step name.
        """
        self.name = name
        self.status = StepStatus.PENDING
        self.execution_error: Optional[str] = None
        self.compensation_error: Optional[str] = None
        self.execution_start_time: Optional[float] = None
        self.execution_end_time: Optional[float] = None
        self.compensation_start_time: Optional[float] = None
        self.compensation_end_time: Optional[float] = None
        self.result: Optional[Any] = None

    def execute(self, execute_func: Callable, *args, **kwargs) -> Any:
        """Run the forward action and record its outcome.

        Args:
            execute_func: Function to call.
            *args: Positional arguments forwarded to ``execute_func``.
            **kwargs: Keyword arguments forwarded to ``execute_func``.

        Returns:
            Whatever ``execute_func`` returns (also stored in ``self.result``).

        Raises:
            Exception: Any exception raised by ``execute_func`` is re-raised
                after the step has been marked ``FAILED``.
        """
        self.status = StepStatus.EXECUTING
        # Drop the message of an earlier failed attempt so a successful
        # re-run does not report an error next to a COMPLETED status.
        self.execution_error = None
        self.execution_start_time = time.time()

        try:
            self.result = execute_func(*args, **kwargs)
        except Exception as e:
            self.status = StepStatus.FAILED
            self.execution_error = str(e)
            raise
        else:
            self.status = StepStatus.COMPLETED
            return self.result
        finally:
            # Stamp the end time on both the success and the failure path.
            self.execution_end_time = time.time()

    def compensate(self, compensate_func: Callable, *args, **kwargs) -> Any:
        """Run the compensating action and record its outcome.

        Args:
            compensate_func: Function to call.
            *args: Positional arguments forwarded to ``compensate_func``.
            **kwargs: Keyword arguments forwarded to ``compensate_func``.

        Returns:
            Whatever ``compensate_func`` returns.

        Raises:
            Exception: Any exception raised by ``compensate_func`` is
                re-raised after the step has been marked ``FAILED``.
        """
        self.status = StepStatus.COMPENSATING
        # Same reasoning as in execute(): do not keep a stale message.
        self.compensation_error = None
        self.compensation_start_time = time.time()

        try:
            result = compensate_func(*args, **kwargs)
        except Exception as e:
            self.status = StepStatus.FAILED
            self.compensation_error = str(e)
            raise
        else:
            self.status = StepStatus.COMPENSATED
            return result
        finally:
            self.compensation_end_time = time.time()

    def to_dict(self) -> Dict[str, Any]:
        """Return the step's state as a plain dict (status as its enum name)."""
        return {
            "name": self.name,
            "status": self.status.name,
            "execution_error": self.execution_error,
            "compensation_error": self.compensation_error,
            "execution_start_time": self.execution_start_time,
            "execution_end_time": self.execution_end_time,
            "compensation_start_time": self.compensation_start_time,
            "compensation_end_time": self.compensation_end_time,
            "result": self.result,
        }
|
|
|
|
|
|
|
|
|
class SagaDefinition:
    """Definition of a Saga: an ordered list of named steps.

    Each step is stored as a dict with the keys ``"name"``,
    ``"execute_func"`` and ``"compensate_func"``.
    """

    def __init__(self, name: str):
        """Initialize an empty Saga definition.

        Args:
            name: Saga name.
        """
        self.name = name
        self._steps: List[Dict[str, Any]] = []

    def add_step(
        self,
        name: str,
        execute_func: Callable,
        compensate_func: Callable,
    ) -> 'SagaDefinition':
        """Append a step to the definition.

        Args:
            name: Step name.
            execute_func: Forward action of the step.
            compensate_func: Action that undoes ``execute_func``.

        Returns:
            This ``SagaDefinition``, so calls can be chained.
        """
        self._steps.append({
            "name": name,
            "execute_func": execute_func,
            "compensate_func": compensate_func,
        })
        return self

    def get_steps(self) -> List[Dict[str, Any]]:
        """Return the steps in the order they were added.

        Returns:
            A new list of new dicts; mutating either the list or the dicts
            does not affect this definition.
        """
        # Copy each dict as well as the list: a list-only copy would still
        # let callers rewrite a stored step through the shared dict objects.
        return [dict(step) for step in self._steps]
|
|
|
|
|
|
|
|
|
class Saga:
    """A single Saga instance: a definition bound to an id, context and state."""

    def __init__(self, saga_id: str, saga_definition: SagaDefinition, context: Dict[str, Any]):
        """Initialize a Saga instance in the ``NOT_STARTED`` state.

        Args:
            saga_id: Saga instance id.
            saga_definition: Definition this instance is created from.
            context: Saga context data; stored by reference, not copied.
        """
        self.saga_id = saga_id
        self.saga_definition = saga_definition
        self.context = context
        self.status = SagaStatus.NOT_STARTED
        self.created_at = time.time()
        self.started_at: Optional[float] = None
        self.completed_at: Optional[float] = None
        self.error: Optional[str] = None

        # One SagaStep per step of the definition as it is at construction time.
        self.steps: List[SagaStep] = [
            SagaStep(step_def["name"])
            for step_def in saga_definition.get_steps()
        ]

    def to_dict(self) -> Dict[str, Any]:
        """Return the Saga's state as a plain dict.

        The status is given as its enum name, the steps as a list of
        ``SagaStep.to_dict()`` results, and ``"context"`` is the live context
        object (not a copy).
        """
        return {
            "saga_id": self.saga_id,
            "saga_name": self.saga_definition.name,
            "status": self.status.name,
            "context": self.context,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at,
            "error": self.error,
            "steps": [step.to_dict() for step in self.steps],
        }
|
|
|
|
|
|
|
|
|
class SagaCoordinator:
    """Coordinates the execution and compensation of Sagas.

    Sagas created through ``create_saga`` are tracked as active; once
    ``execute_saga`` leaves a Saga in ``COMPLETED``, ``COMPENSATED`` or
    ``FAILED`` it is moved to the completed collection.
    """

    def __init__(self):
        """Initialize a coordinator that tracks no Sagas yet."""
        self._active_sagas: Dict[str, Saga] = {}
        self._completed_sagas: Dict[str, Saga] = {}

    def create_saga(
        self,
        saga_definition: SagaDefinition,
        context: Optional[Dict[str, Any]] = None,
    ) -> Saga:
        """Create a Saga instance and register it as active.

        Args:
            saga_definition: Definition to instantiate.
            context: Saga context data. The dict is used as-is (step results
                are written into it); a fresh dict is created when omitted.

        Returns:
            The newly created Saga, identified by a random UUID4 string.
        """
        saga_id = str(uuid.uuid4())
        # Test against None rather than truthiness: a caller-supplied empty
        # dict must be used too, otherwise the caller never sees the results
        # that execute_saga() writes into the context.
        saga = Saga(saga_id, saga_definition, context if context is not None else {})
        self._active_sagas[saga_id] = saga
        return saga

    def execute_saga(self, saga: Saga) -> Saga:
        """Run every step of a Saga, compensating if one of them fails.

        Each step's execute function is called with ``saga.context``; its
        return value is stored in the context under ``"step_<i>_result"``
        (``i`` is the zero-based step index). On the first failing step the
        previously completed steps are compensated in reverse order and no
        further steps are run.

        Args:
            saga: Saga instance to execute.

        Returns:
            The same Saga instance, with status ``COMPLETED``,
            ``COMPENSATED`` or ``FAILED``.

        Raises:
            ValueError: If the Saga is not in the ``NOT_STARTED`` state.
        """
        if saga.status != SagaStatus.NOT_STARTED:
            raise ValueError(f"Saga already started with status: {saga.status.name}")

        saga.status = SagaStatus.RUNNING
        saga.started_at = time.time()

        try:
            step_defs = saga.saga_definition.get_steps()

            for i, (step, step_def) in enumerate(zip(saga.steps, step_defs)):
                try:
                    print(f"Executing step {i+1}/{len(step_defs)}: {step.name}")
                    result = step.execute(step_def["execute_func"], saga.context)

                    # Expose the result to later steps and to compensations.
                    saga.context[f"step_{i}_result"] = result
                except Exception as e:
                    print(f"Step failed: {step.name}, starting compensation")
                    saga.error = f"Step {step.name} failed: {str(e)}"

                    # The failing step itself did not complete, so undo
                    # everything up to and including the step before it.
                    saga.status = SagaStatus.COMPENSATING
                    self._compensate_saga(saga, i - 1)
                    break

            if saga.status == SagaStatus.RUNNING:
                # No step failed.
                saga.status = SagaStatus.COMPLETED
                saga.completed_at = time.time()
                print(f"Saga {saga.saga_id} completed successfully")

        except Exception as e:
            # Anything that escaped the per-step handling above.
            saga.status = SagaStatus.FAILED
            saga.completed_at = time.time()
            if not saga.error:
                saga.error = str(e)
            print(f"Saga {saga.saga_id} failed with error: {saga.error}")
        finally:
            # Move the Saga to the completed collection once it is terminal.
            if saga.status in (SagaStatus.COMPLETED, SagaStatus.COMPENSATED, SagaStatus.FAILED):
                self._active_sagas.pop(saga.saga_id, None)
                self._completed_sagas[saga.saga_id] = saga

        return saga

    def _compensate_saga(self, saga: Saga, last_completed_step_index: int) -> None:
        """Compensate completed steps in reverse order.

        Walks from ``last_completed_step_index`` down to 0 and calls the
        compensate function (with ``saga.context``) of every step whose status
        is ``COMPLETED``; other steps are skipped. On success the Saga becomes
        ``COMPENSATED``. If a compensation raises, the Saga becomes ``FAILED``,
        the message is appended to ``saga.error`` and the walk stops, leaving
        earlier steps uncompensated.

        Args:
            saga: Saga instance to compensate.
            last_completed_step_index: Index of the last completed step; a
                negative value means there is nothing to compensate.
        """
        step_defs = saga.saga_definition.get_steps()

        for i in range(last_completed_step_index, -1, -1):
            step = saga.steps[i]
            step_def = step_defs[i]

            if step.status != StepStatus.COMPLETED:
                continue

            try:
                print(f"Compensating step {i+1}/{len(step_defs)}: {step.name}")
                step.compensate(step_def["compensate_func"], saga.context)
            except Exception as e:
                print(f"Compensation failed for step {step.name}: {str(e)}")
                failure = f"Compensation failed for step {step.name}: {str(e)}"
                # saga.error may still be None when no step error was
                # recorded; appending with += would raise TypeError then.
                saga.error = f"{saga.error}; {failure}" if saga.error else failure
                saga.status = SagaStatus.FAILED
                # FAILED is a terminal state, so it needs a completion time
                # just like COMPLETED and COMPENSATED.
                saga.completed_at = time.time()
                return

        # Every required compensation succeeded.
        saga.status = SagaStatus.COMPENSATED
        saga.completed_at = time.time()
        print(f"Saga {saga.saga_id} compensated successfully")

    def get_saga(self, saga_id: str) -> Optional[Saga]:
        """Look up a Saga by id among active, then completed, Sagas.

        Args:
            saga_id: Saga instance id.

        Returns:
            The Saga instance, or ``None`` if the id is unknown.
        """
        saga = self._active_sagas.get(saga_id)
        if saga is None:
            saga = self._completed_sagas.get(saga_id)
        return saga

    def get_active_sagas(self) -> List[Saga]:
        """Return a list of all Sagas that are still tracked as active."""
        return list(self._active_sagas.values())

    def get_completed_sagas(self) -> List[Saga]:
        """Return a list of all Sagas that reached a terminal state."""
        return list(self._completed_sagas.values())
|
|
|
|
|
|
|
|
|
# Example: an e-commerce order-processing Saga

# Mock services
|
|
|
|
|
|
class InventoryService:
    """In-memory inventory service used by the example."""

    def __init__(self):
        """Start with a fixed stock of two products."""
        self.inventory = {
            "product-1": 10,
            "product-2": 5,
        }

    def reserve(self, product_id: str, quantity: int) -> Dict[str, Any]:
        """Reserve stock by subtracting ``quantity`` from the product's count.

        Args:
            product_id: Product to reserve.
            quantity: Number of units to reserve; must be positive.

        Returns:
            A dict with ``"reservation_id"``, ``"product_id"`` and
            ``"quantity"``.

        Raises:
            ValueError: If ``quantity`` is not positive, the product is
                unknown, or fewer than ``quantity`` units are in stock.
        """
        # A zero reservation is meaningless and a negative one would
        # silently *increase* the stock, so reject both up front.
        if quantity <= 0:
            raise ValueError("Reservation quantity must be positive")
        if product_id not in self.inventory or self.inventory[product_id] < quantity:
            raise ValueError(f"Insufficient inventory for product {product_id}")

        self.inventory[product_id] -= quantity
        reservation_id = f"res-{uuid.uuid4()}"
        print(f"Reserved {quantity} of {product_id}, reservation ID: {reservation_id}")
        return {
            "reservation_id": reservation_id,
            "product_id": product_id,
            "quantity": quantity,
        }

    def cancel_reservation(self, reservation: Dict[str, Any]) -> None:
        """Cancel a reservation by adding its quantity back to the stock.

        Args:
            reservation: A dict as returned by ``reserve``.
        """
        product_id = reservation["product_id"]
        quantity = reservation["quantity"]
        self.inventory[product_id] += quantity
        print(f"Cancelled reservation {reservation['reservation_id']} for {quantity} of {product_id}")
|
|
|
|
|
|
|
|
|
class PaymentService:
    """In-memory payment service used by the example."""

    def __init__(self):
        """Start with no recorded payments."""
        # Maps payment id -> payment record dict.
        self.payments = {}

    def process_payment(self, user_id: str, amount: float) -> Dict[str, Any]:
        """Record a completed payment.

        Args:
            user_id: Paying user.
            amount: Amount to charge; must be positive.

        Returns:
            A dict with ``"payment_id"``, ``"user_id"`` and ``"amount"``.

        Raises:
            ValueError: If ``amount`` is not positive.
        """
        if amount <= 0:
            raise ValueError("Payment amount must be positive")

        payment_id = f"pay-{uuid.uuid4()}"
        self.payments[payment_id] = {
            "user_id": user_id,
            "amount": amount,
            "status": "completed",
            "created_at": time.time(),
        }
        print(f"Processed payment {payment_id} of ${amount} for user {user_id}")
        return {
            "payment_id": payment_id,
            "user_id": user_id,
            "amount": amount,
        }

    def refund_payment(self, payment: Dict[str, Any]) -> None:
        """Mark a recorded payment as refunded.

        Unknown payment ids are ignored.

        Args:
            payment: A dict as returned by ``process_payment``.
        """
        payment_id = payment["payment_id"]
        if payment_id in self.payments:
            self.payments[payment_id]["status"] = "refunded"
            print(f"Refunded payment {payment_id} of ${payment['amount']} for user {payment['user_id']}")
|
|
|
|
|
|
|
|
|
class ShippingService:
    """In-memory shipping service used by the example."""

    def __init__(self):
        """Start with no recorded shipments."""
        # Maps shipment id -> shipment record dict.
        self.shipments = {}

    def create_shipment(self, order_id: str, address: Dict[str, str]) -> Dict[str, Any]:
        """Record a new shipment for an order.

        Args:
            order_id: Order the shipment belongs to.
            address: Shipping address; stored by reference.

        Returns:
            A dict with ``"shipment_id"`` and ``"order_id"``.
        """
        shipment_id = f"ship-{uuid.uuid4()}"
        self.shipments[shipment_id] = {
            "order_id": order_id,
            "address": address,
            "status": "created",
            "created_at": time.time(),
        }
        print(f"Created shipment {shipment_id} for order {order_id}")
        return {
            "shipment_id": shipment_id,
            "order_id": order_id,
        }

    def cancel_shipment(self, shipment: Dict[str, Any]) -> None:
        """Mark a recorded shipment as cancelled.

        Unknown shipment ids are ignored.

        Args:
            shipment: A dict as returned by ``create_shipment``.
        """
        shipment_id = shipment["shipment_id"]
        if shipment_id in self.shipments:
            self.shipments[shipment_id]["status"] = "cancelled"
            print(f"Cancelled shipment {shipment_id} for order {shipment['order_id']}")
|
|
|
|
|
|
|
|
|
# Order-processing Saga
|
|
|
|
|
|
def create_order_saga_definition(inventory_service, payment_service, shipping_service) -> SagaDefinition:
    """Build the Saga definition for order processing.

    The definition has three steps, in this order: ``ReserveInventory``,
    ``ProcessPayment`` and ``CreateShipment``. Every step function takes the
    Saga context dict. Forward steps read ``context["order"]``; compensations
    read the forward step's result from ``context["step_<i>_result"]`` and do
    nothing when that key is missing or falsy.

    Args:
        inventory_service: Object providing ``reserve`` and
            ``cancel_reservation``.
        payment_service: Object providing ``process_payment`` and
            ``refund_payment``.
        shipping_service: Object providing ``create_shipment`` and
            ``cancel_shipment``.

    Returns:
        The ``OrderProcessingSaga`` definition.
    """

    def reserve_inventory(context: Dict[str, Any]) -> Dict[str, Any]:
        """Reserve stock for the order."""
        order = context["order"]
        return inventory_service.reserve(order["product_id"], order["quantity"])

    def cancel_inventory_reservation(context: Dict[str, Any]) -> None:
        """Cancel the stock reservation made by step 0."""
        reservation = context.get("step_0_result")
        if reservation:
            inventory_service.cancel_reservation(reservation)

    def process_payment(context: Dict[str, Any]) -> Dict[str, Any]:
        """Charge the order amount to the user."""
        order = context["order"]
        return payment_service.process_payment(order["user_id"], order["amount"])

    def refund_payment(context: Dict[str, Any]) -> None:
        """Refund the payment made by step 1."""
        payment = context.get("step_1_result")
        if payment:
            payment_service.refund_payment(payment)

    def create_shipment(context: Dict[str, Any]) -> Dict[str, Any]:
        """Create the shipment for the order."""
        order = context["order"]
        return shipping_service.create_shipment(order["order_id"], order["shipping_address"])

    def cancel_shipment(context: Dict[str, Any]) -> None:
        """Cancel the shipment created by step 2."""
        shipment = context.get("step_2_result")
        if shipment:
            shipping_service.cancel_shipment(shipment)

    # The step order matters: each compensation looks up its forward result
    # by the step's zero-based index.
    return (
        SagaDefinition("OrderProcessingSaga")
        .add_step("ReserveInventory", reserve_inventory, cancel_inventory_reservation)
        .add_step("ProcessPayment", process_payment, refund_payment)
        .add_step("CreateShipment", create_shipment, cancel_shipment)
    )
|
|
|
|
|
|
|
|
|
# Example usage
|
|
|
if __name__ == "__main__":
    # Set up the mock services.
    inventory_service = InventoryService()
    payment_service = PaymentService()
    shipping_service = ShippingService()

    # Build the Saga definition.
    order_saga_def = create_order_saga_definition(
        inventory_service,
        payment_service,
        shipping_service,
    )

    # Create the Saga coordinator.
    coordinator = SagaCoordinator()

    def _order_context(order_id, product_id, quantity, amount):
        """Build a fresh Saga context for one demo order of user-1."""
        return {
            "order": {
                "order_id": order_id,
                "user_id": "user-1",
                "product_id": product_id,
                "quantity": quantity,
                "amount": amount,
                "shipping_address": {
                    "street": "123 Main St",
                    "city": "Any City",
                    "zip": "12345",
                },
            }
        }

    # Scenario 1: an order that succeeds.
    print("\n=== Test Case 1: Successful Order Processing ===")
    order_context = _order_context("order-1", "product-1", 2, 99.99)

    saga1 = coordinator.create_saga(order_saga_def, order_context)
    result1 = coordinator.execute_saga(saga1)
    print(f"Saga 1 status: {result1.status.name}")

    # Scenario 2: an order that fails because stock is insufficient.
    print("\n=== Test Case 2: Failed Order Processing (Insufficient Inventory) ===")
    # 100 units exceeds the available stock.
    failed_order_context = _order_context("order-2", "product-1", 100, 4999.99)

    saga2 = coordinator.create_saga(order_saga_def, failed_order_context)
    result2 = coordinator.execute_saga(saga2)
    print(f"Saga 2 status: {result2.status.name}")

    # Scenario 3: an order that fails at the payment step.
    print("\n=== Test Case 3: Failed Order Processing (Payment Failure) ===")

    # Patch the payment service so that one specific amount is rejected.
    original_process_payment = payment_service.process_payment

    def failing_process_payment(user_id, amount):
        if amount == 9999.99:
            raise ValueError("Payment gateway rejected")
        return original_process_payment(user_id, amount)

    payment_service.process_payment = failing_process_payment

    # The amount 9999.99 triggers the payment failure.
    payment_failure_context = _order_context("order-3", "product-2", 1, 9999.99)

    saga3 = coordinator.create_saga(order_saga_def, payment_failure_context)
    result3 = coordinator.execute_saga(saga3)
    print(f"Saga 3 status: {result3.status.name}")

    # Print a summary of the coordinator's state.
    print("\n=== Saga Execution Summary ===")
    print(f"Completed Sagas: {len(coordinator.get_completed_sagas())}")
    print(f"Active Sagas: {len(coordinator.get_active_sagas())}")