You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

611 lines
19 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
Saga模式 - 分布式事务协调器
此模块实现了Saga模式用于管理分布式事务。Saga模式将分布式事务分解为一系列本地事务
每个本地事务都有对应的补偿事务,当某个本地事务失败时,执行已完成事务的补偿操作。
"""
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Dict, List, Optional, Any, Callable
import time
import uuid
class SagaStatus(Enum):
"""Saga状态枚举"""
NOT_STARTED = auto()
RUNNING = auto()
COMPLETED = auto()
COMPENSATING = auto()
COMPENSATED = auto()
FAILED = auto()
class StepStatus(Enum):
"""Saga步骤状态枚举"""
PENDING = auto()
EXECUTING = auto()
COMPLETED = auto()
COMPENSATING = auto()
COMPENSATED = auto()
FAILED = auto()
class SagaStep:
"""
Saga步骤类
表示Saga中的一个步骤包含执行操作和补偿操作
"""
def __init__(self, name: str):
"""
初始化Saga步骤
Args:
name: 步骤名称
"""
self.name = name
self.status = StepStatus.PENDING
self.execution_error: Optional[str] = None
self.compensation_error: Optional[str] = None
self.execution_start_time: Optional[float] = None
self.execution_end_time: Optional[float] = None
self.compensation_start_time: Optional[float] = None
self.compensation_end_time: Optional[float] = None
self.result: Optional[Any] = None
def execute(self, execute_func: Callable, *args, **kwargs) -> Any:
"""
执行步骤
Args:
execute_func: 执行函数
*args: 位置参数
**kwargs: 关键字参数
Returns:
执行结果
Raises:
Exception: 如果执行失败
"""
self.status = StepStatus.EXECUTING
self.execution_start_time = time.time()
try:
self.result = execute_func(*args, **kwargs)
self.status = StepStatus.COMPLETED
self.execution_end_time = time.time()
return self.result
except Exception as e:
self.status = StepStatus.FAILED
self.execution_error = str(e)
self.execution_end_time = time.time()
raise
def compensate(self, compensate_func: Callable, *args, **kwargs) -> Any:
"""
执行补偿操作
Args:
compensate_func: 补偿函数
*args: 位置参数
**kwargs: 关键字参数
Returns:
补偿结果
"""
self.status = StepStatus.COMPENSATING
self.compensation_start_time = time.time()
try:
result = compensate_func(*args, **kwargs)
self.status = StepStatus.COMPENSATED
self.compensation_end_time = time.time()
return result
except Exception as e:
self.status = StepStatus.FAILED
self.compensation_error = str(e)
self.compensation_end_time = time.time()
raise
def to_dict(self) -> Dict[str, Any]:
"""转换为字典表示"""
return {
"name": self.name,
"status": self.status.name,
"execution_error": self.execution_error,
"compensation_error": self.compensation_error,
"execution_start_time": self.execution_start_time,
"execution_end_time": self.execution_end_time,
"compensation_start_time": self.compensation_start_time,
"compensation_end_time": self.compensation_end_time,
"result": self.result
}
class SagaDefinition:
"""
Saga定义类
定义Saga的步骤和每个步骤的执行、补偿函数
"""
def __init__(self, name: str):
"""
初始化Saga定义
Args:
name: Saga名称
"""
self.name = name
self._steps: List[Dict[str, Any]] = []
def add_step(self, name: str, execute_func: Callable, compensate_func: Callable) -> 'SagaDefinition':
"""
添加Saga步骤
Args:
name: 步骤名称
execute_func: 执行函数
compensate_func: 补偿函数
Returns:
SagaDefinition实例支持链式调用
"""
self._steps.append({
"name": name,
"execute_func": execute_func,
"compensate_func": compensate_func
})
return self
def get_steps(self) -> List[Dict[str, Any]]:
"""
获取所有步骤
Returns:
步骤列表
"""
return self._steps.copy()
class Saga:
"""
Saga实例类
表示一个正在执行的Saga实例
"""
def __init__(self, saga_id: str, saga_definition: SagaDefinition, context: Dict[str, Any]):
"""
初始化Saga实例
Args:
saga_id: Saga实例ID
saga_definition: Saga定义
context: Saga上下文数据
"""
self.saga_id = saga_id
self.saga_definition = saga_definition
self.context = context
self.status = SagaStatus.NOT_STARTED
self.created_at = time.time()
self.started_at: Optional[float] = None
self.completed_at: Optional[float] = None
self.error: Optional[str] = None
# 初始化步骤
self.steps: List[SagaStep] = []
for step_def in saga_definition.get_steps():
self.steps.append(SagaStep(step_def["name"]))
def to_dict(self) -> Dict[str, Any]:
"""转换为字典表示"""
return {
"saga_id": self.saga_id,
"saga_name": self.saga_definition.name,
"status": self.status.name,
"context": self.context,
"created_at": self.created_at,
"started_at": self.started_at,
"completed_at": self.completed_at,
"error": self.error,
"steps": [step.to_dict() for step in self.steps]
}
class SagaCoordinator:
"""
Saga协调器
负责协调Saga的执行和补偿
"""
def __init__(self):
"""初始化Saga协调器"""
self._active_sagas: Dict[str, Saga] = {}
self._completed_sagas: Dict[str, Saga] = {}
def create_saga(self, saga_definition: SagaDefinition, context: Optional[Dict[str, Any]] = None) -> Saga:
"""
创建Saga实例
Args:
saga_definition: Saga定义
context: Saga上下文数据
Returns:
创建的Saga实例
"""
saga_id = str(uuid.uuid4())
saga = Saga(saga_id, saga_definition, context or {})
self._active_sagas[saga_id] = saga
return saga
def execute_saga(self, saga: Saga) -> Saga:
"""
执行Saga
Args:
saga: 要执行的Saga实例
Returns:
执行后的Saga实例
"""
if saga.status != SagaStatus.NOT_STARTED:
raise ValueError(f"Saga already started with status: {saga.status.name}")
saga.status = SagaStatus.RUNNING
saga.started_at = time.time()
try:
# 执行每个步骤
step_defs = saga.saga_definition.get_steps()
for i, (step, step_def) in enumerate(zip(saga.steps, step_defs)):
try:
# 执行步骤
print(f"Executing step {i+1}/{len(step_defs)}: {step.name}")
result = step.execute(
step_def["execute_func"],
saga.context
)
# 更新上下文
saga.context[f"step_{i}_result"] = result
except Exception as e:
print(f"Step failed: {step.name}, starting compensation")
saga.error = f"Step {step.name} failed: {str(e)}"
# 开始补偿
saga.status = SagaStatus.COMPENSATING
self._compensate_saga(saga, i-1)
break
if saga.status == SagaStatus.RUNNING:
# 所有步骤执行成功
saga.status = SagaStatus.COMPLETED
saga.completed_at = time.time()
print(f"Saga {saga.saga_id} completed successfully")
except Exception as e:
saga.status = SagaStatus.FAILED
saga.completed_at = time.time()
if not saga.error:
saga.error = str(e)
print(f"Saga {saga.saga_id} failed with error: {saga.error}")
finally:
# 移动到已完成的Saga
if saga.status in [SagaStatus.COMPLETED, SagaStatus.COMPENSATED, SagaStatus.FAILED]:
self._active_sagas.pop(saga.saga_id, None)
self._completed_sagas[saga.saga_id] = saga
return saga
def _compensate_saga(self, saga: Saga, last_completed_step_index: int) -> None:
"""
补偿Saga
Args:
saga: Saga实例
last_completed_step_index: 最后完成的步骤索引
"""
step_defs = saga.saga_definition.get_steps()
# 从最后完成的步骤开始向前补偿
for i in range(last_completed_step_index, -1, -1):
step = saga.steps[i]
step_def = step_defs[i]
if step.status == StepStatus.COMPLETED:
try:
print(f"Compensating step {i+1}/{len(step_defs)}: {step.name}")
step.compensate(
step_def["compensate_func"],
saga.context
)
except Exception as e:
print(f"Compensation failed for step {step.name}: {str(e)}")
saga.error += f"; Compensation failed for step {step.name}: {str(e)}"
saga.status = SagaStatus.FAILED
return
# 所有补偿都成功
saga.status = SagaStatus.COMPENSATED
saga.completed_at = time.time()
print(f"Saga {saga.saga_id} compensated successfully")
def get_saga(self, saga_id: str) -> Optional[Saga]:
"""
获取Saga实例
Args:
saga_id: Saga实例ID
Returns:
Saga实例如果不存在则返回None
"""
return self._active_sagas.get(saga_id) or self._completed_sagas.get(saga_id)
def get_active_sagas(self) -> List[Saga]:
"""
获取所有活跃的Saga实例
Returns:
活跃Saga列表
"""
return list(self._active_sagas.values())
def get_completed_sagas(self) -> List[Saga]:
"""
获取所有已完成的Saga实例
Returns:
已完成Saga列表
"""
return list(self._completed_sagas.values())
# 示例实现一个电商订单处理的Saga
# 模拟服务
class InventoryService:
"""库存服务"""
def __init__(self):
self.inventory = {
"product-1": 10,
"product-2": 5
}
def reserve(self, product_id: str, quantity: int) -> Dict[str, Any]:
"""预留库存"""
if product_id not in self.inventory or self.inventory[product_id] < quantity:
raise ValueError(f"Insufficient inventory for product {product_id}")
self.inventory[product_id] -= quantity
reservation_id = f"res-{uuid.uuid4()}"
print(f"Reserved {quantity} of {product_id}, reservation ID: {reservation_id}")
return {
"reservation_id": reservation_id,
"product_id": product_id,
"quantity": quantity
}
def cancel_reservation(self, reservation: Dict[str, Any]) -> None:
"""取消库存预留"""
product_id = reservation["product_id"]
quantity = reservation["quantity"]
self.inventory[product_id] += quantity
print(f"Cancelled reservation {reservation['reservation_id']} for {quantity} of {product_id}")
class PaymentService:
"""支付服务"""
def __init__(self):
self.payments = {}
def process_payment(self, user_id: str, amount: float) -> Dict[str, Any]:
"""处理支付"""
if amount <= 0:
raise ValueError("Payment amount must be positive")
payment_id = f"pay-{uuid.uuid4()}"
self.payments[payment_id] = {
"user_id": user_id,
"amount": amount,
"status": "completed",
"created_at": time.time()
}
print(f"Processed payment {payment_id} of ${amount} for user {user_id}")
return {
"payment_id": payment_id,
"user_id": user_id,
"amount": amount
}
def refund_payment(self, payment: Dict[str, Any]) -> None:
"""退款"""
payment_id = payment["payment_id"]
if payment_id in self.payments:
self.payments[payment_id]["status"] = "refunded"
print(f"Refunded payment {payment_id} of ${payment['amount']} for user {payment['user_id']}")
class ShippingService:
"""物流服务"""
def __init__(self):
self.shipments = {}
def create_shipment(self, order_id: str, address: Dict[str, str]) -> Dict[str, Any]:
"""创建物流订单"""
shipment_id = f"ship-{uuid.uuid4()}"
self.shipments[shipment_id] = {
"order_id": order_id,
"address": address,
"status": "created",
"created_at": time.time()
}
print(f"Created shipment {shipment_id} for order {order_id}")
return {
"shipment_id": shipment_id,
"order_id": order_id
}
def cancel_shipment(self, shipment: Dict[str, Any]) -> None:
"""取消物流订单"""
shipment_id = shipment["shipment_id"]
if shipment_id in self.shipments:
self.shipments[shipment_id]["status"] = "cancelled"
print(f"Cancelled shipment {shipment_id} for order {shipment['order_id']}")
# 订单处理Saga
def create_order_saga_definition(inventory_service, payment_service, shipping_service) -> SagaDefinition:
"""创建订单处理的Saga定义"""
def reserve_inventory(context: Dict[str, Any]) -> Dict[str, Any]:
"""预留库存"""
order = context["order"]
return inventory_service.reserve(order["product_id"], order["quantity"])
def cancel_inventory_reservation(context: Dict[str, Any]) -> None:
"""取消库存预留"""
reservation = context.get("step_0_result")
if reservation:
inventory_service.cancel_reservation(reservation)
def process_payment(context: Dict[str, Any]) -> Dict[str, Any]:
"""处理支付"""
order = context["order"]
return payment_service.process_payment(order["user_id"], order["amount"])
def refund_payment(context: Dict[str, Any]) -> None:
"""退款"""
payment = context.get("step_1_result")
if payment:
payment_service.refund_payment(payment)
def create_shipment(context: Dict[str, Any]) -> Dict[str, Any]:
"""创建物流订单"""
order = context["order"]
return shipping_service.create_shipment(order["order_id"], order["shipping_address"])
def cancel_shipment(context: Dict[str, Any]) -> None:
"""取消物流订单"""
shipment = context.get("step_2_result")
if shipment:
shipping_service.cancel_shipment(shipment)
# 创建Saga定义
saga_def = SagaDefinition("OrderProcessingSaga")
saga_def.add_step("ReserveInventory", reserve_inventory, cancel_inventory_reservation)
saga_def.add_step("ProcessPayment", process_payment, refund_payment)
saga_def.add_step("CreateShipment", create_shipment, cancel_shipment)
return saga_def
# 示例使用
if __name__ == "__main__":
# 初始化服务
inventory_service = InventoryService()
payment_service = PaymentService()
shipping_service = ShippingService()
# 创建Saga定义
order_saga_def = create_order_saga_definition(
inventory_service,
payment_service,
shipping_service
)
# 创建Saga协调器
coordinator = SagaCoordinator()
# 创建成功的订单场景
print("\n=== Test Case 1: Successful Order Processing ===")
order_context = {
"order": {
"order_id": "order-1",
"user_id": "user-1",
"product_id": "product-1",
"quantity": 2,
"amount": 99.99,
"shipping_address": {
"street": "123 Main St",
"city": "Any City",
"zip": "12345"
}
}
}
saga1 = coordinator.create_saga(order_saga_def, order_context)
result1 = coordinator.execute_saga(saga1)
print(f"Saga 1 status: {result1.status.name}")
# 创建失败的订单场景(库存不足)
print("\n=== Test Case 2: Failed Order Processing (Insufficient Inventory) ===")
failed_order_context = {
"order": {
"order_id": "order-2",
"user_id": "user-1",
"product_id": "product-1",
"quantity": 100, # 库存不足
"amount": 4999.99,
"shipping_address": {
"street": "123 Main St",
"city": "Any City",
"zip": "12345"
}
}
}
saga2 = coordinator.create_saga(order_saga_def, failed_order_context)
result2 = coordinator.execute_saga(saga2)
print(f"Saga 2 status: {result2.status.name}")
# 创建失败的订单场景(支付失败)
print("\n=== Test Case 3: Failed Order Processing (Payment Failure) ===")
# 修改支付服务使其在特定金额下失败
original_process_payment = payment_service.process_payment
def failing_process_payment(user_id, amount):
if amount == 9999.99:
raise ValueError("Payment gateway rejected")
return original_process_payment(user_id, amount)
payment_service.process_payment = failing_process_payment
payment_failure_context = {
"order": {
"order_id": "order-3",
"user_id": "user-1",
"product_id": "product-2",
"quantity": 1,
"amount": 9999.99, # 触发支付失败
"shipping_address": {
"street": "123 Main St",
"city": "Any City",
"zip": "12345"
}
}
}
saga3 = coordinator.create_saga(order_saga_def, payment_failure_context)
result3 = coordinator.execute_saga(saga3)
print(f"Saga 3 status: {result3.status.name}")
# 显示Saga状态
print("\n=== Saga Execution Summary ===")
print(f"Completed Sagas: {len(coordinator.get_completed_sagas())}")
print(f"Active Sagas: {len(coordinator.get_active_sagas())}")