|
|
"""
|
|
|
分布式事务框架 - 支持多种补偿策略
|
|
|
|
|
|
此模块实现了一个完整的分布式事务框架,支持多种补偿策略,
|
|
|
包括即时补偿、定时补偿、手动补偿等,用于处理复杂的分布式事务场景。
|
|
|
"""
|
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
from enum import Enum, auto
|
|
|
from typing import Dict, List, Optional, Any, Callable, Tuple, Set
|
|
|
import uuid
|
|
|
import time
|
|
|
import threading
|
|
|
import logging
|
|
|
|
|
|
# Configure logging: INFO level with a timestamped record format, then create
# the logger used throughout this module.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("distributed_transaction")
|
|
|
|
|
|
|
|
|
class TransactionStatus(Enum):
    """Lifecycle states of a distributed transaction."""

    # Values are spelled out explicitly; they match what auto() assigns (1..10).
    CREATED = 1        # transaction has been created
    RUNNING = 2        # transaction is running
    COMMITTING = 3     # commit in progress
    COMMITTED = 4      # transaction committed
    ROLLING_BACK = 5   # rollback in progress
    ROLLED_BACK = 6    # transaction rolled back
    COMPENSATING = 7   # compensation in progress
    COMPENSATED = 8    # transaction compensated
    FAILED = 9         # transaction failed
    SUSPENDED = 10     # transaction suspended
|
|
|
|
|
|
|
|
|
class CompensationStrategy(Enum):
    """Available compensation strategies."""

    # Values are spelled out explicitly; they match what auto() assigns (1..4).
    IMMEDIATE = 1               # compensate right away
    SCHEDULED = 2               # compensate later, on a schedule
    MANUAL = 3                  # compensate by manual intervention
    RETRY_THEN_COMPENSATE = 4   # retry first, then compensate
|
|
|
|
|
|
|
|
|
class TransactionParticipant(ABC):
    """Abstract base class for a transaction participant.

    Represents one participant in a distributed transaction. Subclasses
    must implement all four phases below.
    """

    @abstractmethod
    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the prepare phase.

        Args:
            transaction_id: Transaction ID.
            context: Transaction context.

        Returns:
            Whether preparation succeeded.
        """

    @abstractmethod
    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the commit phase.

        Args:
            transaction_id: Transaction ID.
            context: Transaction context.

        Returns:
            Whether the commit succeeded.
        """

    @abstractmethod
    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the rollback phase.

        Args:
            transaction_id: Transaction ID.
            context: Transaction context.

        Returns:
            Whether the rollback succeeded.
        """

    @abstractmethod
    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the compensation operation.

        Args:
            transaction_id: Transaction ID.
            context: Transaction context.

        Returns:
            Whether the compensation succeeded.
        """
|
|
|
|
|
|
|
|
|
class TransactionLog(ABC):
    """Abstract base class for transaction logs.

    Records the events that occur while a transaction executes.
    """

    @abstractmethod
    def log_transaction_start(self, transaction_id: str, context: Dict[str, Any]) -> None:
        """Record that a transaction has started."""

    @abstractmethod
    def log_transaction_status(self, transaction_id: str, status: TransactionStatus) -> None:
        """Record a transaction status change."""

    @abstractmethod
    def log_participant_prepare(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's prepare phase."""

    @abstractmethod
    def log_participant_commit(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's commit phase."""

    @abstractmethod
    def log_participant_rollback(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's rollback."""

    @abstractmethod
    def log_participant_compensation(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's compensation."""
|
|
|
|
|
|
|
|
|
class InMemoryTransactionLog(TransactionLog):
    """In-memory transaction log, intended for demonstration.

    Entries are kept in a dict keyed by transaction ID; each value is the
    list of entries for that transaction. Access is guarded by an RLock.
    """

    def __init__(self):
        self._logs: Dict[str, List[Dict[str, Any]]] = {}
        self._lock = threading.RLock()

    def _add_log(self, transaction_id: str, log_entry: Dict[str, Any]) -> None:
        """Append a timestamped copy of ``log_entry`` to the transaction's log."""
        with self._lock:
            entries = self._logs.setdefault(transaction_id, [])
            # Keys in log_entry come last, so they win over the timestamp key.
            entries.append({"timestamp": time.time(), **log_entry})

    def _record_participant_result(self, transaction_id: str, participant_id: str,
                                   success: bool, entry_type: str, phase: str) -> None:
        """Store one participant-phase result and emit the matching INFO line."""
        self._add_log(transaction_id, {
            "type": entry_type,
            "participant_id": participant_id,
            "success": success
        })
        outcome = 'succeeded' if success else 'failed'
        logger.info(f"Transaction {transaction_id}: Participant {participant_id} {phase} {outcome}")

    def log_transaction_start(self, transaction_id: str, context: Dict[str, Any]) -> None:
        """Record that a transaction has started, together with its context."""
        entry = {"type": "transaction_start", "context": context}
        self._add_log(transaction_id, entry)
        logger.info(f"Transaction {transaction_id} started")

    def log_transaction_status(self, transaction_id: str, status: TransactionStatus) -> None:
        """Record a transaction status change (stored by status name)."""
        status_name = status.name
        self._add_log(transaction_id, {"type": "transaction_status", "status": status_name})
        logger.info(f"Transaction {transaction_id} status changed to {status_name}")

    def log_participant_prepare(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's prepare phase."""
        self._record_participant_result(
            transaction_id, participant_id, success, "participant_prepare", "prepare"
        )

    def log_participant_commit(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's commit phase."""
        self._record_participant_result(
            transaction_id, participant_id, success, "participant_commit", "commit"
        )

    def log_participant_rollback(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's rollback."""
        self._record_participant_result(
            transaction_id, participant_id, success, "participant_rollback", "rollback"
        )

    def log_participant_compensation(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's compensation."""
        self._record_participant_result(
            transaction_id, participant_id, success, "participant_compensation", "compensation"
        )

    def get_transaction_logs(self, transaction_id: str) -> List[Dict[str, Any]]:
        """Return a shallow copy of all log entries for the transaction.

        An unknown transaction ID yields an empty list.
        """
        with self._lock:
            return list(self._logs.get(transaction_id, []))
|
|
|
|
|
|
|
|
|
class Compensator(ABC):
    """Abstract base class for compensators.

    Each subclass implements one compensation strategy.
    """

    @abstractmethod
    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Perform the compensation operation.

        Args:
            transaction: The distributed transaction instance.
            participant: The transaction participant.
            participant_id: ID of the participant.
            context: Compensation context.

        Returns:
            Whether the compensation succeeded.
        """
|
|
|
|
|
|
|
|
|
class ImmediateCompensator(Compensator):
    """Compensator that runs the compensation right away."""

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Call the participant's ``compensate`` synchronously.

        Returns:
            The participant's result, or False if the call raised.
        """
        logger.info(f"Performing immediate compensation for participant {participant_id}")
        try:
            outcome = participant.compensate(transaction.transaction_id, context)
        except Exception as e:
            # Failures are reported through the return value, not re-raised.
            logger.error(f"Exception during immediate compensation: {e!s}")
            return False
        return outcome
|
|
|
|
|
|
|
|
|
class ScheduledCompensator(Compensator):
    """Compensator that defers each compensation by a fixed delay.

    ``compensate`` only enqueues the request. A daemon thread started in
    ``__init__`` wakes up about once per second and runs every request whose
    delay has elapsed.
    """

    def __init__(self, delay_seconds: int = 60):
        """Initialise the compensator and start its scheduler thread.

        Args:
            delay_seconds: Seconds to wait before a scheduled compensation runs.
        """
        self.delay_seconds = delay_seconds
        # Each entry is (due_at, transaction, participant, participant_id, context),
        # where due_at is a time.monotonic() reading.
        self._scheduled_compensations: List[Tuple[float, 'DistributedTransaction', TransactionParticipant,
                                                  str, Dict[str, Any]]] = []
        self._lock = threading.RLock()

        # Start the scheduler thread.
        self._stop_event = threading.Event()
        self._scheduler_thread = threading.Thread(target=self._scheduler_loop)
        self._scheduler_thread.daemon = True
        self._scheduler_thread.start()

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Queue a compensation to run once ``delay_seconds`` have elapsed.

        Returns:
            Always True: the value reports that scheduling succeeded, not the
            outcome of the compensation itself.
        """
        logger.info(f"Scheduling compensation for participant {participant_id} after {self.delay_seconds}s")
        # Monotonic time keeps the delay immune to wall-clock adjustments.
        due_at = time.monotonic() + self.delay_seconds
        with self._lock:
            self._scheduled_compensations.append((due_at, transaction, participant, participant_id, context))
        return True  # scheduling succeeded

    def _scheduler_loop(self) -> None:
        """Process due compensations about once per second until shut down."""
        # Event.wait acts as an interruptible sleep: it returns True as soon as
        # shutdown() sets the event, so the thread exits without finishing a nap.
        while not self._stop_event.wait(1):
            self._process_scheduled_compensations()

    def _process_scheduled_compensations(self) -> None:
        """Run every queued compensation whose due time has passed."""
        now = time.monotonic()
        with self._lock:
            due = [item for item in self._scheduled_compensations if item[0] <= now]
            if not due:
                return
            self._scheduled_compensations = [
                item for item in self._scheduled_compensations if item[0] > now
            ]

        # Participants are called outside the lock so a slow one does not block
        # threads that are scheduling new compensations.
        for _, transaction, participant, participant_id, context in due:
            try:
                logger.info(f"Executing scheduled compensation for participant {participant_id}")
                success = participant.compensate(transaction.transaction_id, context)
                transaction.transaction_log.log_participant_compensation(
                    transaction.transaction_id, participant_id, success
                )
            except Exception as e:
                logger.error(f"Exception during scheduled compensation: {str(e)}")

    def shutdown(self) -> None:
        """Stop the scheduler thread, waiting up to two seconds for it to exit.

        Compensations still in the queue are not executed.
        """
        self._stop_event.set()
        if self._scheduler_thread.is_alive():
            self._scheduler_thread.join(timeout=2.0)
|
|
|
|
|
|
|
|
|
class RetryThenCompensateCompensator(Compensator):
    """Compensator that retries the commit first and compensates only if every retry fails."""

    def __init__(self, max_retries: int = 3, retry_interval: int = 5):
        """Initialise the compensator.

        Args:
            max_retries: Maximum number of commit retries.
            retry_interval: Seconds to wait between retries.
        """
        self.max_retries = max_retries
        self.retry_interval = retry_interval

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Retry the participant's commit, then fall back to compensation.

        Returns:
            True if a retry succeeded, otherwise the result of the
            participant's ``compensate`` (False if that call raised).
        """
        logger.info(f"Applying retry-then-compensate strategy for participant {participant_id}")

        # Retry the commit first.
        # NOTE(review): this re-invokes participant.commit(); confirm that the
        # participants used with this strategy tolerate repeated commits.
        for attempt in range(1, self.max_retries + 1):
            logger.info(f"Retry attempt {attempt}/{self.max_retries} for participant {participant_id}")
            try:
                if participant.commit(transaction.transaction_id, context):
                    logger.info(f"Retry succeeded on attempt {attempt}")
                    return True
            except Exception as e:
                logger.warning(f"Retry attempt {attempt} failed: {str(e)}")

            # Wait only between attempts: sleeping after the last one would
            # just postpone the compensation below.
            if attempt < self.max_retries:
                time.sleep(self.retry_interval)

        # All retries failed, so compensate.
        logger.info(f"All retries failed, performing compensation for participant {participant_id}")
        try:
            return participant.compensate(transaction.transaction_id, context)
        except Exception as e:
            logger.error(f"Exception during compensation after retries: {str(e)}")
            return False
|
|
|
|
|
|
|
|
|
class ManualCompensator(Compensator):
    """Compensator that only records that manual intervention is required."""

    def __init__(self):
        """Create an empty list of pending compensations and its lock."""
        self._pending_compensations: List[Dict[str, Any]] = []
        self._lock = threading.RLock()

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Record the request as pending manual compensation.

        Returns:
            Always True: the value reports that the request was recorded,
            not that any compensation ran.
        """
        logger.warning(f"Manual compensation required for participant {participant_id}")
        pending_entry = {
            "transaction_id": transaction.transaction_id,
            "participant_id": participant_id,
            "context": context,
            "participant": participant,
            "created_at": time.time()
        }
        with self._lock:
            self._pending_compensations.append(pending_entry)
        return True  # recorded successfully

    def get_pending_compensations(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the pending compensation entries."""
        with self._lock:
            return list(self._pending_compensations)

    def execute_pending_compensation(self, transaction_id: str, participant_id: str) -> bool:
        """Run the pending compensation matching the given transaction and participant.

        NOTE(review): the entry is removed before the attempt, so a failed
        manual compensation is not put back on the pending list.

        Returns:
            The participant's result; False if no matching entry exists or
            the participant raised.
        """
        with self._lock:
            # Index of the first matching entry, or None when there is none.
            match_index = next(
                (
                    idx
                    for idx, entry in enumerate(self._pending_compensations)
                    if entry["transaction_id"] == transaction_id
                    and entry["participant_id"] == participant_id
                ),
                None,
            )
            if match_index is None:
                logger.error(f"Pending compensation not found: {transaction_id}, {participant_id}")
                return False
            compensation_item = self._pending_compensations.pop(match_index)

        try:
            logger.info(f"Manually executing compensation for {participant_id} in transaction {transaction_id}")
            target = compensation_item["participant"]
            stored_context = compensation_item["context"]
            return target.compensate(transaction_id, stored_context)
        except Exception as e:
            logger.error(f"Exception during manual compensation: {e!s}")
            return False
|
|
|
|
|
|
|
|
|
class CompensatorFactory:
    """Factory that creates and caches one compensator per strategy."""

    # Cache shared by all callers: strategy -> compensator instance.
    _compensators: Dict[CompensationStrategy, Compensator] = {}

    @classmethod
    def get_compensator(cls, strategy: CompensationStrategy) -> Compensator:
        """Return the compensator for a strategy, creating it on first use.

        Args:
            strategy: The compensation strategy.

        Returns:
            The cached compensator instance for that strategy.

        Raises:
            ValueError: If the strategy is not one of the known strategies.
        """
        if strategy not in cls._compensators:
            if strategy == CompensationStrategy.IMMEDIATE:
                cls._compensators[strategy] = ImmediateCompensator()
            elif strategy == CompensationStrategy.SCHEDULED:
                cls._compensators[strategy] = ScheduledCompensator()
            elif strategy == CompensationStrategy.RETRY_THEN_COMPENSATE:
                cls._compensators[strategy] = RetryThenCompensateCompensator()
            elif strategy == CompensationStrategy.MANUAL:
                cls._compensators[strategy] = ManualCompensator()
            else:
                raise ValueError(f"Unknown compensation strategy: {strategy}")

        return cls._compensators[strategy]

    @classmethod
    def shutdown_all(cls) -> None:
        """Shut down every cached compensator that has a ``shutdown`` method.

        A compensator that was shut down is also evicted from the cache, so a
        later ``get_compensator`` call builds a fresh instance instead of
        returning one whose background thread has stopped. Compensators
        without ``shutdown`` stay cached.
        """
        # Iterate over a snapshot because entries are deleted inside the loop.
        for strategy, compensator in list(cls._compensators.items()):
            if hasattr(compensator, 'shutdown'):
                compensator.shutdown()
                del cls._compensators[strategy]
|
|
|
|
|
|
|
|
|
class DistributedTransaction:
    """A single distributed transaction instance.

    Drives its participants through a two-phase commit (prepare, then commit)
    and falls back to rollback or compensation when a phase fails.
    """

    def __init__(self, transaction_id: str, transaction_log: TransactionLog,
                 compensation_strategy: CompensationStrategy = CompensationStrategy.IMMEDIATE):
        """Initialise the transaction in the CREATED state.

        Args:
            transaction_id: Transaction ID.
            transaction_log: Log that receives status and participant events.
            compensation_strategy: Strategy used when compensation is needed.
        """
        self.transaction_id = transaction_id
        self.transaction_log = transaction_log
        self.compensation_strategy = compensation_strategy
        self.status = TransactionStatus.CREATED
        self.participants: Dict[str, TransactionParticipant] = {}
        self.prepared_participants: Set[str] = set()
        self.committed_participants: Set[str] = set()
        # Participants for which the compensator has already reported success;
        # they are skipped by later compensation passes.
        self.compensated_participants: Set[str] = set()
        self.context: Dict[str, Any] = {}
        self.created_at = time.time()
        self.started_at: Optional[float] = None
        self.completed_at: Optional[float] = None
        self._lock = threading.RLock()

    def add_participant(self, participant_id: str, participant: TransactionParticipant) -> None:
        """Register a participant; only allowed before the transaction begins.

        Args:
            participant_id: ID of the participant.
            participant: The participant instance.

        Raises:
            ValueError: If the transaction is no longer in the CREATED state.
        """
        with self._lock:
            if self.status != TransactionStatus.CREATED:
                raise ValueError(f"Cannot add participant to transaction in {self.status.name} status")

            self.participants[participant_id] = participant
            logger.info(f"Added participant {participant_id} to transaction {self.transaction_id}")

    def begin(self, context: Optional[Dict[str, Any]] = None) -> None:
        """Move the transaction from CREATED to RUNNING.

        Args:
            context: Optional values merged into the transaction context.

        Raises:
            ValueError: If the transaction is not in the CREATED state.
        """
        with self._lock:
            if self.status != TransactionStatus.CREATED:
                raise ValueError(f"Transaction already {self.status.name}")

            self.status = TransactionStatus.RUNNING
            self.started_at = time.time()
            if context:
                self.context.update(context)

            self.transaction_log.log_transaction_start(self.transaction_id, self.context)
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)

    def commit(self) -> bool:
        """Commit the transaction using two-phase commit.

        If any participant fails to prepare, the prepared participants are
        rolled back. If any participant fails to commit, the transaction is
        marked FAILED and the committed participants are compensated.

        Returns:
            True only if the transaction ends in the COMMITTED state.

        Raises:
            ValueError: If the transaction is not in the RUNNING state.
        """
        with self._lock:
            if self.status != TransactionStatus.RUNNING:
                raise ValueError(f"Cannot commit transaction in {self.status.name} status")

            self.status = TransactionStatus.COMMITTING
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)

        try:
            # Two-phase commit: prepare phase.
            prepare_success = self._prepare_all()

            if prepare_success:
                # Commit phase.
                commit_success = self._commit_all()

                with self._lock:
                    if commit_success:
                        self.status = TransactionStatus.COMMITTED
                    else:
                        self.status = TransactionStatus.FAILED

                if not commit_success:
                    # A failed commit leaves some participants committed;
                    # compensate them.
                    self._compensate_all()
            else:
                # Prepare failed: roll back directly.
                with self._lock:
                    self.status = TransactionStatus.ROLLING_BACK
                    self.transaction_log.log_transaction_status(self.transaction_id, self.status)

                rollback_success = self._rollback_all()

                with self._lock:
                    # Report ROLLED_BACK only when every rollback succeeded,
                    # matching what rollback() does.
                    if rollback_success:
                        self.status = TransactionStatus.ROLLED_BACK
                    else:
                        self.status = TransactionStatus.FAILED

            with self._lock:
                self.completed_at = time.time()
                self.transaction_log.log_transaction_status(self.transaction_id, self.status)

            return self.status == TransactionStatus.COMMITTED

        except Exception as e:
            logger.error(f"Exception during transaction commit: {str(e)}")

            with self._lock:
                self.status = TransactionStatus.FAILED
                self.completed_at = time.time()
                self.transaction_log.log_transaction_status(self.transaction_id, self.status)

            # Compensate whatever was committed before the exception.
            self._compensate_all()
            return False

    def rollback(self) -> bool:
        """Roll back the transaction.

        Returns:
            True only if the transaction ends in the ROLLED_BACK state.

        Raises:
            ValueError: If the transaction is neither RUNNING nor COMMITTING.
        """
        with self._lock:
            if self.status not in [TransactionStatus.RUNNING, TransactionStatus.COMMITTING]:
                raise ValueError(f"Cannot rollback transaction in {self.status.name} status")

            self.status = TransactionStatus.ROLLING_BACK
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)

        try:
            rollback_success = self._rollback_all()

            with self._lock:
                if rollback_success:
                    self.status = TransactionStatus.ROLLED_BACK
                else:
                    self.status = TransactionStatus.FAILED
                    # A failed rollback falls back to compensation.
                    self._compensate_all()

                self.completed_at = time.time()
                self.transaction_log.log_transaction_status(self.transaction_id, self.status)

            return self.status == TransactionStatus.ROLLED_BACK

        except Exception as e:
            logger.error(f"Exception during transaction rollback: {str(e)}")

            with self._lock:
                self.status = TransactionStatus.FAILED
                self.completed_at = time.time()
                self.transaction_log.log_transaction_status(self.transaction_id, self.status)

            return False

    def compensate(self) -> bool:
        """Compensate the committed participants of this transaction.

        Participants whose compensation already succeeded are skipped, so
        calling this after an automatic compensation does not repeat it.

        Returns:
            True only if the transaction ends in the COMPENSATED state.

        Raises:
            ValueError: If the transaction is neither FAILED nor COMMITTED.
        """
        with self._lock:
            if self.status not in [TransactionStatus.FAILED, TransactionStatus.COMMITTED]:
                raise ValueError(f"Cannot compensate transaction in {self.status.name} status")

            self.status = TransactionStatus.COMPENSATING
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)

        try:
            compensate_success = self._compensate_all()

            with self._lock:
                if compensate_success:
                    self.status = TransactionStatus.COMPENSATED
                else:
                    self.status = TransactionStatus.FAILED

                self.completed_at = time.time()
                self.transaction_log.log_transaction_status(self.transaction_id, self.status)

            return self.status == TransactionStatus.COMPENSATED

        except Exception as e:
            logger.error(f"Exception during transaction compensation: {str(e)}")

            with self._lock:
                self.status = TransactionStatus.FAILED
                self.completed_at = time.time()
                self.transaction_log.log_transaction_status(self.transaction_id, self.status)

            return False

    def _prepare_all(self) -> bool:
        """Prepare every participant; return True only if all succeeded.

        Every participant is asked to prepare even after an earlier failure.
        """
        all_prepared = True

        for participant_id, participant in self.participants.items():
            try:
                success = participant.prepare(self.transaction_id, self.context)
                self.transaction_log.log_participant_prepare(self.transaction_id, participant_id, success)

                if success:
                    self.prepared_participants.add(participant_id)
                else:
                    all_prepared = False
                    logger.error(f"Participant {participant_id} prepare failed")

            except Exception as e:
                all_prepared = False
                logger.error(f"Exception during participant {participant_id} prepare: {str(e)}")
                self.transaction_log.log_participant_prepare(self.transaction_id, participant_id, False)

        return all_prepared

    def _commit_all(self) -> bool:
        """Commit every prepared participant; return True only if all succeeded."""
        all_committed = True

        for participant_id, participant in self.participants.items():
            if participant_id in self.prepared_participants:
                try:
                    success = participant.commit(self.transaction_id, self.context)
                    self.transaction_log.log_participant_commit(self.transaction_id, participant_id, success)

                    if success:
                        self.committed_participants.add(participant_id)
                    else:
                        all_committed = False
                        logger.error(f"Participant {participant_id} commit failed")

                except Exception as e:
                    all_committed = False
                    logger.error(f"Exception during participant {participant_id} commit: {str(e)}")
                    self.transaction_log.log_participant_commit(self.transaction_id, participant_id, False)

        return all_committed

    def _rollback_all(self) -> bool:
        """Roll back every prepared participant; return True only if all succeeded."""
        all_rolled_back = True

        for participant_id, participant in self.participants.items():
            if participant_id in self.prepared_participants:
                try:
                    success = participant.rollback(self.transaction_id, self.context)
                    self.transaction_log.log_participant_rollback(self.transaction_id, participant_id, success)

                    if not success:
                        all_rolled_back = False
                        logger.error(f"Participant {participant_id} rollback failed")

                except Exception as e:
                    all_rolled_back = False
                    logger.error(f"Exception during participant {participant_id} rollback: {str(e)}")
                    self.transaction_log.log_participant_rollback(self.transaction_id, participant_id, False)

        return all_rolled_back

    def _compensate_all(self) -> bool:
        """Compensate committed participants that have not been compensated yet.

        Returns:
            True if the compensator reported success for every participant
            handled in this pass (also True when there was nothing to do).
        """
        all_compensated = True
        compensator = CompensatorFactory.get_compensator(self.compensation_strategy)

        for participant_id, participant in self.participants.items():
            if participant_id not in self.committed_participants:
                continue
            if participant_id in self.compensated_participants:
                # Already handled by an earlier pass; running it again would
                # compensate the same participant twice.
                continue

            try:
                success = compensator.compensate(self, participant, participant_id, self.context)

                # For scheduled and manual compensation the return value only
                # says the request was queued; the actual result is logged
                # later by whoever executes it.
                if self.compensation_strategy in [CompensationStrategy.IMMEDIATE,
                                                  CompensationStrategy.RETRY_THEN_COMPENSATE]:
                    self.transaction_log.log_participant_compensation(
                        self.transaction_id, participant_id, success
                    )

                if success:
                    self.compensated_participants.add(participant_id)
                else:
                    all_compensated = False
                    logger.error(f"Participant {participant_id} compensation failed")

            except Exception as e:
                all_compensated = False
                logger.error(f"Exception during participant {participant_id} compensation: {str(e)}")
                self.transaction_log.log_participant_compensation(
                    self.transaction_id, participant_id, False
                )

        return all_compensated

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary snapshot of the transaction's state."""
        return {
            "transaction_id": self.transaction_id,
            "status": self.status.name,
            "compensation_strategy": self.compensation_strategy.name,
            "participants": list(self.participants.keys()),
            "prepared_participants": list(self.prepared_participants),
            "committed_participants": list(self.committed_participants),
            "compensated_participants": list(self.compensated_participants),
            "context": self.context,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at
        }
|
|
|
|
|
|
|
|
|
class TransactionManager:
    """Creates distributed transactions and tracks them as active or completed."""

    def __init__(self, transaction_log: Optional[TransactionLog] = None):
        """Initialise the manager.

        Args:
            transaction_log: Log shared by all transactions created here;
                an in-memory log is used when None is given.
        """
        # Compare against None so a caller-supplied log that happens to be
        # falsy is still used.
        if transaction_log is None:
            transaction_log = InMemoryTransactionLog()
        self.transaction_log = transaction_log
        self.active_transactions: Dict[str, DistributedTransaction] = {}
        self.completed_transactions: Dict[str, DistributedTransaction] = {}
        self._lock = threading.RLock()

    def create_transaction(
        self,
        transaction_id: Optional[str] = None,
        compensation_strategy: CompensationStrategy = CompensationStrategy.IMMEDIATE,
    ) -> DistributedTransaction:
        """Create a distributed transaction and register it as active.

        Args:
            transaction_id: Transaction ID; a UUID4 string is generated when
                None is given.
            compensation_strategy: Compensation strategy for the transaction.

        Returns:
            The newly created transaction.
        """
        if transaction_id is None:
            transaction_id = str(uuid.uuid4())

        transaction = DistributedTransaction(transaction_id, self.transaction_log, compensation_strategy)

        with self._lock:
            self.active_transactions[transaction_id] = transaction

        return transaction

    def begin_transaction(self, transaction_id: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """Begin an active transaction.

        Args:
            transaction_id: Transaction ID.
            context: Optional transaction context.

        Returns:
            True if the transaction was started; False if it is not active
            or ``begin`` raised.
        """
        with self._lock:
            transaction = self.active_transactions.get(transaction_id)
            if not transaction:
                logger.error(f"Transaction {transaction_id} not found")
                return False

        try:
            transaction.begin(context)
            return True
        except Exception as e:
            logger.error(f"Failed to begin transaction {transaction_id}: {str(e)}")
            return False

    def commit_transaction(self, transaction_id: str) -> bool:
        """Commit an active transaction and archive it once it reaches a final state.

        Args:
            transaction_id: Transaction ID.

        Returns:
            True if the commit succeeded; False if it failed or the
            transaction is not active.

        Raises:
            ValueError: Propagated from ``DistributedTransaction.commit``
                when the transaction is not in the RUNNING state.
        """
        with self._lock:
            transaction = self.active_transactions.get(transaction_id)
            if not transaction:
                logger.error(f"Transaction {transaction_id} not found")
                return False

        success = transaction.commit()

        with self._lock:
            if transaction.status in [TransactionStatus.COMMITTED, TransactionStatus.ROLLED_BACK,
                                      TransactionStatus.COMPENSATED, TransactionStatus.FAILED]:
                self.active_transactions.pop(transaction_id, None)
                self.completed_transactions[transaction_id] = transaction

        return success

    def rollback_transaction(self, transaction_id: str) -> bool:
        """Roll back an active transaction and archive it once it reaches a final state.

        Args:
            transaction_id: Transaction ID.

        Returns:
            True if the rollback succeeded; False if it failed or the
            transaction is not active.

        Raises:
            ValueError: Propagated from ``DistributedTransaction.rollback``
                when the transaction is neither RUNNING nor COMMITTING.
        """
        with self._lock:
            transaction = self.active_transactions.get(transaction_id)
            if not transaction:
                logger.error(f"Transaction {transaction_id} not found")
                return False

        success = transaction.rollback()

        with self._lock:
            if transaction.status in [TransactionStatus.ROLLED_BACK, TransactionStatus.FAILED]:
                self.active_transactions.pop(transaction_id, None)
                self.completed_transactions[transaction_id] = transaction

        return success

    def compensate_transaction(self, transaction_id: str) -> bool:
        """Compensate an active or completed transaction.

        Args:
            transaction_id: Transaction ID.

        Returns:
            True if the compensation succeeded; False if it failed or the
            transaction is unknown.

        Raises:
            ValueError: Propagated from ``DistributedTransaction.compensate``
                when the transaction is neither FAILED nor COMMITTED.
        """
        with self._lock:
            transaction = self.active_transactions.get(transaction_id) or \
                self.completed_transactions.get(transaction_id)

            if not transaction:
                logger.error(f"Transaction {transaction_id} not found")
                return False

        success = transaction.compensate()

        with self._lock:
            if transaction.status in [TransactionStatus.COMPENSATED, TransactionStatus.FAILED]:
                self.active_transactions.pop(transaction_id, None)
                self.completed_transactions[transaction_id] = transaction

        return success

    def get_transaction(self, transaction_id: str) -> Optional[DistributedTransaction]:
        """Look up a transaction among the active, then the completed ones.

        Args:
            transaction_id: Transaction ID.

        Returns:
            The transaction, or None if it does not exist.
        """
        with self._lock:
            return self.active_transactions.get(transaction_id) or \
                self.completed_transactions.get(transaction_id)

    def get_active_transactions(self) -> List[DistributedTransaction]:
        """Return a list of all active transactions."""
        with self._lock:
            return list(self.active_transactions.values())

    def get_completed_transactions(self) -> List[DistributedTransaction]:
        """Return a list of all completed transactions."""
        with self._lock:
            return list(self.completed_transactions.values())

    def shutdown(self) -> None:
        """Shut the manager down by shutting down all cached compensators."""
        CompensatorFactory.shutdown_all()
|
|
|
|
|
|
|
|
|
# 示例:实现一个电商订单处理的分布式事务
|
|
|
|
|
|
class InventoryParticipant(TransactionParticipant):
    """Inventory service participant for the example order flow.

    Stock is deducted on commit and restored on compensation. Each
    reservation carries a ``committed`` flag so that a repeated commit or
    compensation does not change the stock twice.
    """

    def __init__(self):
        # Demo stock levels keyed by product ID.
        self.inventory = {
            "product-1": 10,
            "product-2": 5
        }
        # Reservations keyed by transaction ID.
        self.reservations: Dict[str, Dict[str, Any]] = {}

    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Reserve stock for the order found under ``context["order"]``.

        Nothing is deducted here; the reservation is only recorded.

        Returns:
            False if the product is missing or unknown, the quantity is not
            positive, or the stock is insufficient; True otherwise.
        """
        order = context.get("order", {})
        product_id = order.get("product_id")
        quantity = order.get("quantity", 0)

        if not product_id or product_id not in self.inventory or self.inventory[product_id] < quantity:
            logger.error(f"Insufficient inventory for {product_id}")
            return False

        # A negative quantity would increase the stock at commit time, and a
        # zero quantity reserves nothing, so both are rejected.
        if quantity <= 0:
            logger.error(f"Invalid quantity {quantity} for {product_id}")
            return False

        # Record the reservation; stock is untouched until commit.
        # A real system would likely add optimistic locking or a similar
        # concurrency control here.
        self.reservations[transaction_id] = {
            "product_id": product_id,
            "quantity": quantity,
            "committed": False
        }

        logger.info(f"Prepared inventory reservation for {product_id}, quantity: {quantity}")
        return True

    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Commit the reservation by deducting the stock.

        Returns:
            False if there is no reservation or the stock is no longer
            sufficient; True otherwise, including when the reservation was
            already committed.
        """
        reservation = self.reservations.get(transaction_id)
        if not reservation:
            logger.error(f"Reservation not found for transaction {transaction_id}")
            return False

        product_id = reservation["product_id"]
        quantity = reservation["quantity"]

        if reservation.get("committed"):
            # Already deducted: a repeated commit must not deduct again.
            logger.info(f"Inventory reservation already committed: {product_id}, quantity: {quantity}")
            return True

        # prepare() does not account for other reservations, so re-check the
        # stock to avoid driving it negative.
        if self.inventory[product_id] < quantity:
            logger.error(f"Insufficient inventory for {product_id}")
            return False

        # Deduct the stock.
        self.inventory[product_id] -= quantity
        reservation["committed"] = True

        logger.info(f"Committed inventory reservation: {product_id}, quantity: {quantity}")
        return True

    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Roll back by dropping the reservation record, if any.

        Returns:
            Always True.
        """
        if transaction_id in self.reservations:
            del self.reservations[transaction_id]
            logger.info(f"Rolled back inventory reservation for transaction {transaction_id}")
        return True

    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Compensate by restoring stock that a commit deducted.

        Returns:
            False if there is no reservation for the transaction; True
            otherwise, whether or not any stock had to be restored.
        """
        reservation = self.reservations.get(transaction_id)
        if not reservation:
            return False

        product_id = reservation["product_id"]
        quantity = reservation["quantity"]

        if not reservation.get("committed"):
            # Nothing was deducted (never committed, or already compensated),
            # so restoring here would inflate the stock.
            logger.info(f"No committed inventory to compensate for transaction {transaction_id}")
            return True

        # Restore the stock and clear the flag so a repeat call is a no-op.
        self.inventory[product_id] += quantity
        reservation["committed"] = False

        logger.info(f"Compensated inventory: {product_id}, quantity: {quantity}")
        return True
|
|
|
|
|
|
|
|
|
class PaymentParticipant(TransactionParticipant):
    """Payment-service participant that simulates charging and refunding.

    Payments are tracked in memory, keyed by transaction ID: ``prepare``
    records a pending payment, ``commit`` moves it to the completed set,
    ``rollback`` drops a pending payment, and ``compensate`` refunds a
    completed one.
    """

    def __init__(self):
        # Payments validated by prepare() and not yet committed.
        self.pending_payments: Dict[str, Dict[str, Any]] = {}
        # Payments finalized by commit(); compensate() looks refunds up here.
        self.completed_payments: Dict[str, Dict[str, Any]] = {}

    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Validate the order's payment data and record a pending payment.

        No money is charged at this stage.

        Args:
            transaction_id: ID of the transaction being prepared.
            context: Transaction context; ``context["order"]`` supplies
                ``user_id`` and ``amount``.

        Returns:
            True if a pending payment was recorded; False if ``user_id`` is
            missing/empty or ``amount`` is not a positive number.
        """
        order = context.get("order", {})
        user_id = order.get("user_id")
        amount = order.get("amount", 0)

        # A None or otherwise non-numeric amount must be rejected as invalid
        # input rather than crash the comparison with TypeError.
        try:
            amount_is_positive = amount > 0
        except TypeError:
            amount_is_positive = False

        if not user_id or not amount_is_positive:
            logger.error("Invalid payment information")
            return False

        # Record the validated payment; the charge itself happens in commit().
        self.pending_payments[transaction_id] = {
            "user_id": user_id,
            "amount": amount,
        }

        logger.info(f"Prepared payment for user {user_id}, amount: {amount}")
        return True

    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Finalize a pending payment (simulated charge).

        Args:
            transaction_id: ID of the transaction being committed.
            context: Transaction context (not read by this method).

        Returns:
            True if the payment was moved to the completed set; False if no
            pending payment exists for the transaction.
        """
        payment = self.pending_payments.get(transaction_id)
        if not payment:
            logger.error(f"Pending payment not found for transaction {transaction_id}")
            return False

        # Simulated payment processing: just mint an identifier.
        payment_id = f"pay-{uuid.uuid4()}"
        payment["payment_id"] = payment_id

        # Move the record from pending to completed.
        self.completed_payments[transaction_id] = payment
        del self.pending_payments[transaction_id]

        logger.info(f"Committed payment: {payment_id}")
        return True

    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Drop the pending payment for a transaction, if there is one.

        Args:
            transaction_id: ID of the transaction being rolled back.
            context: Transaction context (not read by this method).

        Returns:
            Always True, whether or not a pending payment existed.
        """
        if transaction_id in self.pending_payments:
            del self.pending_payments[transaction_id]
            logger.info(f"Rolled back pending payment for transaction {transaction_id}")
        return True

    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Refund a completed payment (simulated).

        The refund is issued at most once: its ID is stored on the payment
        record, and later calls return success without creating another
        refund. This keeps a retried compensation from refunding twice.

        Args:
            transaction_id: ID of the transaction being compensated.
            context: Transaction context (not read by this method).

        Returns:
            True if a completed payment exists (refunded now or earlier);
            False if there is no completed payment for the transaction.
        """
        payment = self.completed_payments.get(transaction_id)
        if not payment:
            return False

        if "refund_id" in payment:
            logger.info(f"Payment already refunded: {payment['refund_id']}")
            return True

        # Simulated refund; a real system would call the refund API here.
        refund_id = f"refund-{uuid.uuid4()}"
        payment["refund_id"] = refund_id

        logger.info(f"Compensated payment: refund {refund_id} for {payment['amount']}")
        return True
|
|
|
|
|
|
|
|
|
class ShippingParticipant(TransactionParticipant):
    """Shipping-service participant that simulates creating and cancelling shipments.

    Shipments are tracked in memory, keyed by transaction ID: ``prepare``
    records a pending shipment, ``commit`` confirms it, ``rollback`` drops a
    pending shipment, and ``compensate`` cancels a confirmed one.
    """

    def __init__(self):
        # Shipments validated by prepare() and not yet committed.
        self.pending_shipments: Dict[str, Dict[str, Any]] = {}
        # Shipments confirmed by commit(); compensate() cancels from here.
        self.confirmed_shipments: Dict[str, Dict[str, Any]] = {}

    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Validate the order's shipping data and record a pending shipment.

        Args:
            transaction_id: ID of the transaction being prepared.
            context: Transaction context; ``context["order"]`` supplies
                ``order_id`` and ``shipping_address``.

        Returns:
            True if a pending shipment was recorded; False if the order ID
            or the shipping address is missing/empty.
        """
        order = context.get("order", {})
        order_id = order.get("order_id")
        address = order.get("shipping_address")

        if not order_id or not address:
            logger.error("Invalid shipping information")
            return False

        self.pending_shipments[transaction_id] = {
            "order_id": order_id,
            "address": address,
        }

        logger.info(f"Prepared shipping for order {order_id}")
        return True

    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Confirm a pending shipment.

        Args:
            transaction_id: ID of the transaction being committed.
            context: Transaction context (not read by this method).

        Returns:
            True if the shipment was moved to the confirmed set; False if no
            pending shipment exists for the transaction.
        """
        shipment = self.pending_shipments.get(transaction_id)
        if not shipment:
            logger.error(f"Pending shipment not found for transaction {transaction_id}")
            return False

        # Simulated confirmation: just mint an identifier.
        shipment_id = f"ship-{uuid.uuid4()}"
        shipment["shipment_id"] = shipment_id

        # Move the record from pending to confirmed.
        self.confirmed_shipments[transaction_id] = shipment
        del self.pending_shipments[transaction_id]

        logger.info(f"Committed shipment: {shipment_id}")
        return True

    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Drop the pending shipment for a transaction, if there is one.

        Args:
            transaction_id: ID of the transaction being rolled back.
            context: Transaction context (not read by this method).

        Returns:
            Always True, whether or not a pending shipment existed.
        """
        if transaction_id in self.pending_shipments:
            del self.pending_shipments[transaction_id]
            logger.info(f"Rolled back pending shipment for transaction {transaction_id}")
        return True

    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Cancel a confirmed shipment (simulated).

        The cancellation is performed at most once: the shipment record is
        flagged as cancelled, and later calls return success without
        cancelling again. This keeps a retried compensation from issuing
        duplicate cancellations.

        Args:
            transaction_id: ID of the transaction being compensated.
            context: Transaction context (not read by this method).

        Returns:
            True if a confirmed shipment exists (cancelled now or earlier);
            False if there is no confirmed shipment for the transaction.
        """
        shipment = self.confirmed_shipments.get(transaction_id)
        if not shipment:
            return False

        if shipment.get("cancelled"):
            logger.info(f"Shipment already cancelled: {shipment['shipment_id']}")
            return True

        # Simulated cancellation; a real system would call the carrier's
        # cancel API here.
        shipment["cancelled"] = True

        logger.info(f"Compensated shipment: cancelled {shipment['shipment_id']}")
        return True
|
|
|
|
|
|
|
|
|
# Example usage: runs four demo scenarios against in-memory participants.
if __name__ == "__main__":
    print("===== 分布式事务框架示例 =====")

    # Create the transaction manager.
    transaction_manager = TransactionManager()

    # Everything after this point runs under try/finally so that the manager
    # is shut down even when one of the scenarios raises.
    try:
        # Create the service participants; the same instances are reused by
        # every scenario below.
        inventory_participant = InventoryParticipant()
        payment_participant = PaymentParticipant()
        shipping_participant = ShippingParticipant()

        # ---- Scenario 1: a transaction that is expected to succeed ----
        print("\n--- 测试场景1: 成功的事务 ---")

        tx1 = transaction_manager.create_transaction(
            transaction_id="tx-success",
            compensation_strategy=CompensationStrategy.IMMEDIATE
        )

        tx1.add_participant("inventory", inventory_participant)
        tx1.add_participant("payment", payment_participant)
        tx1.add_participant("shipping", shipping_participant)

        # Transaction context shared by scenarios 1, 3 and 4.
        order_context = {
            "order": {
                "order_id": "order-1",
                "user_id": "user-1",
                "product_id": "product-1",
                "quantity": 2,
                "amount": 199.98,
                "shipping_address": {
                    "street": "123 Main St",
                    "city": "Any City",
                    "zip": "12345"
                }
            }
        }

        tx1.begin(order_context)

        success = transaction_manager.commit_transaction("tx-success")
        print(f"Transaction result: {'Success' if success else 'Failed'}")
        print(f"Final transaction status: {tx1.status.name}")

        # ---- Scenario 2: a transaction that fails during payment ----
        print("\n--- 测试场景2: 失败的事务(支付失败) ---")

        # Temporarily replace the payment participant's prepare() with a
        # wrapper that fails for one specific amount.
        original_prepare = payment_participant.prepare

        def failing_prepare(transaction_id, context):
            order = context.get("order", {})
            if order.get("amount") == 9999.99:
                logger.error("Payment preparation failed for test purposes")
                return False
            return original_prepare(transaction_id, context)

        payment_participant.prepare = failing_prepare
        try:
            tx2 = transaction_manager.create_transaction(
                transaction_id="tx-fail",
                compensation_strategy=CompensationStrategy.IMMEDIATE
            )

            tx2.add_participant("inventory", inventory_participant)
            tx2.add_participant("payment", payment_participant)
            tx2.add_participant("shipping", shipping_participant)

            # Context whose amount triggers the injected payment failure.
            failed_order_context = {
                "order": {
                    "order_id": "order-2",
                    "user_id": "user-1",
                    "product_id": "product-2",
                    "quantity": 1,
                    "amount": 9999.99,  # triggers the payment failure
                    "shipping_address": {
                        "street": "123 Main St",
                        "city": "Any City",
                        "zip": "12345"
                    }
                }
            }

            tx2.begin(failed_order_context)

            # Commit; this one is expected to fail.
            success = transaction_manager.commit_transaction("tx-fail")
            print(f"Transaction result: {'Success' if success else 'Failed'}")
            print(f"Final transaction status: {tx2.status.name}")
        finally:
            # Restore the real prepare() no matter how the scenario ended,
            # so the later scenarios never run against the failing wrapper.
            payment_participant.prepare = original_prepare

        # ---- Scenario 3: retry-then-compensate strategy ----
        print("\n--- 测试场景3: 使用重试后补偿策略 ---")

        tx3 = transaction_manager.create_transaction(
            transaction_id="tx-retry-compensate",
            compensation_strategy=CompensationStrategy.RETRY_THEN_COMPENSATE
        )

        tx3.add_participant("inventory", inventory_participant)
        tx3.add_participant("payment", payment_participant)
        tx3.add_participant("shipping", shipping_participant)

        tx3.begin(order_context)
        success = transaction_manager.commit_transaction("tx-retry-compensate")
        print(f"Transaction result: {'Success' if success else 'Failed'}")
        print(f"Final transaction status: {tx3.status.name}")

        # ---- Scenario 4: manual compensation strategy ----
        print("\n--- 测试场景4: 手动补偿 ---")

        tx4 = transaction_manager.create_transaction(
            transaction_id="tx-manual",
            compensation_strategy=CompensationStrategy.MANUAL
        )

        tx4.add_participant("inventory", inventory_participant)
        tx4.add_participant("payment", payment_participant)
        tx4.add_participant("shipping", shipping_participant)

        tx4.begin(order_context)
        success = transaction_manager.commit_transaction("tx-manual")
        print(f"Transaction result: {'Success' if success else 'Failed'}")
        print(f"Final transaction status: {tx4.status.name}")

        # Manual-compensation step. In a real system this would be triggered
        # from an admin interface; here it only reports how many
        # compensations are pending and does not execute them.
        if tx4.status == TransactionStatus.COMMITTED:
            print("\n执行手动补偿...")
            manual_compensator = CompensatorFactory.get_compensator(CompensationStrategy.MANUAL)
            if isinstance(manual_compensator, ManualCompensator):
                pending = manual_compensator.get_pending_compensations()
                print(f"待手动补偿项目数量: {len(pending)}")

        # ---- Statistics ----
        print("\n--- 事务统计 ---")
        print(f"活跃事务数量: {len(transaction_manager.get_active_transactions())}")
        print(f"已完成事务数量: {len(transaction_manager.get_completed_transactions())}")
    finally:
        # Shut the transaction manager down on both the success and the
        # error path.
        transaction_manager.shutdown()

    print("\n分布式事务框架已关闭")