""" 分布式事务框架 - 支持多种补偿策略 此模块实现了一个完整的分布式事务框架,支持多种补偿策略, 包括即时补偿、定时补偿、手动补偿等,用于处理复杂的分布式事务场景。 """ from abc import ABC, abstractmethod from enum import Enum, auto from typing import Dict, List, Optional, Any, Callable, Tuple, Set import uuid import time import threading import logging # 配置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("distributed_transaction") class TransactionStatus(Enum): """ 事务状态枚举 """ CREATED = auto() # 事务已创建 RUNNING = auto() # 事务运行中 COMMITTING = auto() # 事务提交中 COMMITTED = auto() # 事务已提交 ROLLING_BACK = auto() # 事务回滚中 ROLLED_BACK = auto() # 事务已回滚 COMPENSATING = auto() # 事务补偿中 COMPENSATED = auto() # 事务已补偿 FAILED = auto() # 事务失败 SUSPENDED = auto() # 事务挂起 class CompensationStrategy(Enum): """ 补偿策略枚举 """ IMMEDIATE = auto() # 立即补偿 SCHEDULED = auto() # 定时补偿 MANUAL = auto() # 手动补偿 RETRY_THEN_COMPENSATE = auto() # 重试后补偿 class TransactionParticipant(ABC): """ 事务参与者抽象基类 表示分布式事务中的一个参与者 """ @abstractmethod def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool: """ 准备阶段 Args: transaction_id: 事务ID context: 事务上下文 Returns: 是否准备成功 """ pass @abstractmethod def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool: """ 提交阶段 Args: transaction_id: 事务ID context: 事务上下文 Returns: 是否提交成功 """ pass @abstractmethod def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool: """ 回滚阶段 Args: transaction_id: 事务ID context: 事务上下文 Returns: 是否回滚成功 """ pass @abstractmethod def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool: """ 补偿操作 Args: transaction_id: 事务ID context: 事务上下文 Returns: 是否补偿成功 """ pass class TransactionLog(ABC): """ 事务日志抽象基类 用于记录事务执行过程中的各种事件 """ @abstractmethod def log_transaction_start(self, transaction_id: str, context: Dict[str, Any]) -> None: """记录事务开始""" pass @abstractmethod def log_transaction_status(self, transaction_id: str, status: TransactionStatus) -> None: """记录事务状态变更""" pass @abstractmethod def log_participant_prepare(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者准备结果""" pass @abstractmethod def log_participant_commit(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者提交结果""" pass @abstractmethod def log_participant_rollback(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者回滚结果""" pass @abstractmethod def log_participant_compensation(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者补偿结果""" pass class InMemoryTransactionLog(TransactionLog): """ 内存事务日志实现 用于演示的内存事务日志 """ def __init__(self): self._logs: Dict[str, List[Dict[str, Any]]] = {} self._lock = threading.RLock() def _add_log(self, transaction_id: str, log_entry: Dict[str, Any]) -> None: """添加日志条目""" with self._lock: if transaction_id not in self._logs: self._logs[transaction_id] = [] self._logs[transaction_id].append({ "timestamp": time.time(), **log_entry }) def log_transaction_start(self, transaction_id: str, context: Dict[str, Any]) -> None: """记录事务开始""" self._add_log(transaction_id, { "type": "transaction_start", "context": context }) logger.info(f"Transaction {transaction_id} started") def log_transaction_status(self, transaction_id: str, status: TransactionStatus) -> None: """记录事务状态变更""" self._add_log(transaction_id, { "type": "transaction_status", "status": status.name }) logger.info(f"Transaction {transaction_id} status changed to {status.name}") def log_participant_prepare(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者准备结果""" self._add_log(transaction_id, { "type": "participant_prepare", "participant_id": participant_id, "success": success }) logger.info(f"Transaction {transaction_id}: Participant {participant_id} prepare {'succeeded' if success else 'failed'}") def log_participant_commit(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者提交结果""" self._add_log(transaction_id, { "type": "participant_commit", "participant_id": participant_id, "success": success }) logger.info(f"Transaction {transaction_id}: Participant {participant_id} commit {'succeeded' if success else 'failed'}") def log_participant_rollback(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者回滚结果""" self._add_log(transaction_id, { "type": "participant_rollback", "participant_id": participant_id, "success": success }) logger.info(f"Transaction {transaction_id}: Participant {participant_id} rollback {'succeeded' if success else 'failed'}") def log_participant_compensation(self, transaction_id: str, participant_id: str, success: bool) -> None: """记录参与者补偿结果""" self._add_log(transaction_id, { "type": "participant_compensation", "participant_id": participant_id, "success": success }) logger.info(f"Transaction {transaction_id}: Participant {participant_id} compensation {'succeeded' if success else 'failed'}") def get_transaction_logs(self, transaction_id: str) -> List[Dict[str, Any]]: """获取事务的所有日志""" with self._lock: return self._logs.get(transaction_id, []).copy() class Compensator(ABC): """ 补偿器抽象基类 定义不同补偿策略的实现 """ @abstractmethod def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant, participant_id: str, context: Dict[str, Any]) -> bool: """ 执行补偿操作 Args: transaction: 分布式事务实例 participant: 事务参与者 participant_id: 参与者ID context: 补偿上下文 Returns: 是否补偿成功 """ pass class ImmediateCompensator(Compensator): """ 立即补偿器 立即执行补偿操作 """ def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant, participant_id: str, context: Dict[str, Any]) -> bool: """执行立即补偿""" logger.info(f"Performing immediate compensation for participant {participant_id}") try: return participant.compensate(transaction.transaction_id, context) except Exception as e: logger.error(f"Exception during immediate compensation: {str(e)}") return False class ScheduledCompensator(Compensator): """ 定时补偿器 延迟执行补偿操作 """ def __init__(self, delay_seconds: int = 60): """ 初始化定时补偿器 Args: delay_seconds: 延迟执行的秒数 """ self.delay_seconds = delay_seconds self._scheduled_compensations: List[Tuple['DistributedTransaction', TransactionParticipant, str, Dict[str, Any]]] = [] self._lock = threading.RLock() # 启动调度线程 self._stop_event = threading.Event() self._scheduler_thread = threading.Thread(target=self._scheduler_loop) self._scheduler_thread.daemon = True self._scheduler_thread.start() def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant, participant_id: str, context: Dict[str, Any]) -> bool: """调度补偿操作""" logger.info(f"Scheduling compensation for participant {participant_id} after {self.delay_seconds}s") with self._lock: self._scheduled_compensations.append((transaction, participant, participant_id, context)) return True # 调度成功 def _scheduler_loop(self) -> None: """调度循环""" while not self._stop_event.is_set(): time.sleep(1) self._process_scheduled_compensations() def _process_scheduled_compensations(self) -> None: """处理定时的补偿操作""" with self._lock: if not self._scheduled_compensations: return # 这里简化处理,实际应该记录调度时间 transaction, participant, participant_id, context = self._scheduled_compensations.pop(0) try: logger.info(f"Executing scheduled compensation for participant {participant_id}") success = participant.compensate(transaction.transaction_id, context) transaction.transaction_log.log_participant_compensation( transaction.transaction_id, participant_id, success ) except Exception as e: logger.error(f"Exception during scheduled compensation: {str(e)}") def shutdown(self) -> None: """关闭补偿器""" self._stop_event.set() if self._scheduler_thread.is_alive(): self._scheduler_thread.join(timeout=2.0) class RetryThenCompensateCompensator(Compensator): """ 重试后补偿器 先尝试重试操作,失败后再进行补偿 """ def __init__(self, max_retries: int = 3, retry_interval: int = 5): """ 初始化重试后补偿器 Args: max_retries: 最大重试次数 retry_interval: 重试间隔(秒) """ self.max_retries = max_retries self.retry_interval = retry_interval def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant, participant_id: str, context: Dict[str, Any]) -> bool: """执行重试后补偿策略""" logger.info(f"Applying retry-then-compensate strategy for participant {participant_id}") # 先尝试重试提交 for attempt in range(1, self.max_retries + 1): logger.info(f"Retry attempt {attempt}/{self.max_retries} for participant {participant_id}") try: if participant.commit(transaction.transaction_id, context): logger.info(f"Retry succeeded on attempt {attempt}") return True except Exception as e: logger.warning(f"Retry attempt {attempt} failed: {str(e)}") time.sleep(self.retry_interval) # 重试失败后执行补偿 logger.info(f"All retries failed, performing compensation for participant {participant_id}") try: return participant.compensate(transaction.transaction_id, context) except Exception as e: logger.error(f"Exception during compensation after retries: {str(e)}") return False class ManualCompensator(Compensator): """ 手动补偿器 标记需要手动介入进行补偿 """ def __init__(self): """初始化手动补偿器""" self._pending_compensations: List[Dict[str, Any]] = [] self._lock = threading.RLock() def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant, participant_id: str, context: Dict[str, Any]) -> bool: """标记为手动补偿""" logger.warning(f"Manual compensation required for participant {participant_id}") with self._lock: self._pending_compensations.append({ "transaction_id": transaction.transaction_id, "participant_id": participant_id, "context": context, "participant": participant, "created_at": time.time() }) return True # 标记成功 def get_pending_compensations(self) -> List[Dict[str, Any]]: """获取待手动补偿的项目""" with self._lock: return self._pending_compensations.copy() def execute_pending_compensation(self, transaction_id: str, participant_id: str) -> bool: """执行待处理的补偿""" with self._lock: for i, item in enumerate(self._pending_compensations): if item["transaction_id"] == transaction_id and item["participant_id"] == participant_id: compensation_item = self._pending_compensations.pop(i) break else: logger.error(f"Pending compensation not found: {transaction_id}, {participant_id}") return False try: logger.info(f"Manually executing compensation for {participant_id} in transaction {transaction_id}") participant = compensation_item["participant"] context = compensation_item["context"] return participant.compensate(transaction_id, context) except Exception as e: logger.error(f"Exception during manual compensation: {str(e)}") return False class CompensatorFactory: """ 补偿器工厂类 根据策略创建对应的补偿器 """ _compensators: Dict[CompensationStrategy, Compensator] = {} @classmethod def get_compensator(cls, strategy: CompensationStrategy) -> Compensator: """ 获取指定策略的补偿器 Args: strategy: 补偿策略 Returns: 补偿器实例 """ if strategy not in cls._compensators: if strategy == CompensationStrategy.IMMEDIATE: cls._compensators[strategy] = ImmediateCompensator() elif strategy == CompensationStrategy.SCHEDULED: cls._compensators[strategy] = ScheduledCompensator() elif strategy == CompensationStrategy.RETRY_THEN_COMPENSATE: cls._compensators[strategy] = RetryThenCompensateCompensator() elif strategy == CompensationStrategy.MANUAL: cls._compensators[strategy] = ManualCompensator() else: raise ValueError(f"Unknown compensation strategy: {strategy}") return cls._compensators[strategy] @classmethod def shutdown_all(cls) -> None: """关闭所有补偿器""" for compensator in cls._compensators.values(): if hasattr(compensator, 'shutdown'): compensator.shutdown() class DistributedTransaction: """ 分布式事务类 表示一个分布式事务实例 """ def __init__(self, transaction_id: str, transaction_log: TransactionLog, compensation_strategy: CompensationStrategy = CompensationStrategy.IMMEDIATE): """ 初始化分布式事务 Args: transaction_id: 事务ID transaction_log: 事务日志 compensation_strategy: 补偿策略 """ self.transaction_id = transaction_id self.transaction_log = transaction_log self.compensation_strategy = compensation_strategy self.status = TransactionStatus.CREATED self.participants: Dict[str, TransactionParticipant] = {} self.prepared_participants: Set[str] = set() self.committed_participants: Set[str] = set() self.context: Dict[str, Any] = {} self.created_at = time.time() self.started_at: Optional[float] = None self.completed_at: Optional[float] = None self._lock = threading.RLock() def add_participant(self, participant_id: str, participant: TransactionParticipant) -> None: """ 添加事务参与者 Args: participant_id: 参与者ID participant: 参与者实例 """ with self._lock: if self.status != TransactionStatus.CREATED: raise ValueError(f"Cannot add participant to transaction in {self.status.name} status") self.participants[participant_id] = participant logger.info(f"Added participant {participant_id} to transaction {self.transaction_id}") def begin(self, context: Optional[Dict[str, Any]] = None) -> None: """ 开始事务 Args: context: 事务上下文 """ with self._lock: if self.status != TransactionStatus.CREATED: raise ValueError(f"Transaction already {self.status.name}") self.status = TransactionStatus.RUNNING self.started_at = time.time() if context: self.context.update(context) self.transaction_log.log_transaction_start(self.transaction_id, self.context) self.transaction_log.log_transaction_status(self.transaction_id, self.status) def commit(self) -> bool: """ 提交事务 Returns: 是否提交成功 """ with self._lock: if self.status != TransactionStatus.RUNNING: raise ValueError(f"Cannot commit transaction in {self.status.name} status") self.status = TransactionStatus.COMMITTING self.transaction_log.log_transaction_status(self.transaction_id, self.status) try: # 两阶段提交:准备阶段 prepare_success = self._prepare_all() if prepare_success: # 提交阶段 commit_success = self._commit_all() with self._lock: if commit_success: self.status = TransactionStatus.COMMITTED else: self.status = TransactionStatus.FAILED # 提交失败时需要进行补偿 self._compensate_all() else: # 准备失败,直接回滚 with self._lock: self.status = TransactionStatus.ROLLING_BACK self.transaction_log.log_transaction_status(self.transaction_id, self.status) self._rollback_all() with self._lock: self.status = TransactionStatus.ROLLED_BACK with self._lock: self.completed_at = time.time() self.transaction_log.log_transaction_status(self.transaction_id, self.status) return self.status == TransactionStatus.COMMITTED except Exception as e: logger.error(f"Exception during transaction commit: {str(e)}") with self._lock: self.status = TransactionStatus.FAILED self.completed_at = time.time() self.transaction_log.log_transaction_status(self.transaction_id, self.status) # 异常时进行补偿 self._compensate_all() return False def rollback(self) -> bool: """ 回滚事务 Returns: 是否回滚成功 """ with self._lock: if self.status not in [TransactionStatus.RUNNING, TransactionStatus.COMMITTING]: raise ValueError(f"Cannot rollback transaction in {self.status.name} status") self.status = TransactionStatus.ROLLING_BACK self.transaction_log.log_transaction_status(self.transaction_id, self.status) try: rollback_success = self._rollback_all() with self._lock: if rollback_success: self.status = TransactionStatus.ROLLED_BACK else: self.status = TransactionStatus.FAILED # 回滚失败时需要进行补偿 self._compensate_all() self.completed_at = time.time() self.transaction_log.log_transaction_status(self.transaction_id, self.status) return self.status == TransactionStatus.ROLLED_BACK except Exception as e: logger.error(f"Exception during transaction rollback: {str(e)}") with self._lock: self.status = TransactionStatus.FAILED self.completed_at = time.time() self.transaction_log.log_transaction_status(self.transaction_id, self.status) return False def compensate(self) -> bool: """ 补偿事务 Returns: 是否补偿成功 """ with self._lock: if self.status not in [TransactionStatus.FAILED, TransactionStatus.COMMITTED]: raise ValueError(f"Cannot compensate transaction in {self.status.name} status") self.status = TransactionStatus.COMPENSATING self.transaction_log.log_transaction_status(self.transaction_id, self.status) try: compensate_success = self._compensate_all() with self._lock: if compensate_success: self.status = TransactionStatus.COMPENSATED else: self.status = TransactionStatus.FAILED self.completed_at = time.time() self.transaction_log.log_transaction_status(self.transaction_id, self.status) return self.status == TransactionStatus.COMPENSATED except Exception as e: logger.error(f"Exception during transaction compensation: {str(e)}") with self._lock: self.status = TransactionStatus.FAILED self.completed_at = time.time() self.transaction_log.log_transaction_status(self.transaction_id, self.status) return False def _prepare_all(self) -> bool: """准备所有参与者""" all_prepared = True for participant_id, participant in self.participants.items(): try: success = participant.prepare(self.transaction_id, self.context) self.transaction_log.log_participant_prepare(self.transaction_id, participant_id, success) if success: self.prepared_participants.add(participant_id) else: all_prepared = False logger.error(f"Participant {participant_id} prepare failed") except Exception as e: all_prepared = False logger.error(f"Exception during participant {participant_id} prepare: {str(e)}") self.transaction_log.log_participant_prepare(self.transaction_id, participant_id, False) return all_prepared def _commit_all(self) -> bool: """提交所有参与者""" all_committed = True for participant_id, participant in self.participants.items(): if participant_id in self.prepared_participants: try: success = participant.commit(self.transaction_id, self.context) self.transaction_log.log_participant_commit(self.transaction_id, participant_id, success) if success: self.committed_participants.add(participant_id) else: all_committed = False logger.error(f"Participant {participant_id} commit failed") except Exception as e: all_committed = False logger.error(f"Exception during participant {participant_id} commit: {str(e)}") self.transaction_log.log_participant_commit(self.transaction_id, participant_id, False) return all_committed def _rollback_all(self) -> bool: """回滚所有参与者""" all_rolled_back = True for participant_id, participant in self.participants.items(): if participant_id in self.prepared_participants: try: success = participant.rollback(self.transaction_id, self.context) self.transaction_log.log_participant_rollback(self.transaction_id, participant_id, success) if not success: all_rolled_back = False logger.error(f"Participant {participant_id} rollback failed") except Exception as e: all_rolled_back = False logger.error(f"Exception during participant {participant_id} rollback: {str(e)}") self.transaction_log.log_participant_rollback(self.transaction_id, participant_id, False) return all_rolled_back def _compensate_all(self) -> bool: """补偿所有已提交的参与者""" all_compensated = True compensator = CompensatorFactory.get_compensator(self.compensation_strategy) for participant_id, participant in self.participants.items(): if participant_id in self.committed_participants: try: success = compensator.compensate(self, participant, participant_id, self.context) # 对于定时补偿和手动补偿,这里只记录调度成功 # 实际的补偿结果会在后续处理中记录 if self.compensation_strategy in [CompensationStrategy.IMMEDIATE, CompensationStrategy.RETRY_THEN_COMPENSATE]: self.transaction_log.log_participant_compensation( self.transaction_id, participant_id, success ) if not success: all_compensated = False logger.error(f"Participant {participant_id} compensation failed") except Exception as e: all_compensated = False logger.error(f"Exception during participant {participant_id} compensation: {str(e)}") self.transaction_log.log_participant_compensation( self.transaction_id, participant_id, False ) return all_compensated def to_dict(self) -> Dict[str, Any]: """转换为字典表示""" return { "transaction_id": self.transaction_id, "status": self.status.name, "compensation_strategy": self.compensation_strategy.name, "participants": list(self.participants.keys()), "prepared_participants": list(self.prepared_participants), "committed_participants": list(self.committed_participants), "context": self.context, "created_at": self.created_at, "started_at": self.started_at, "completed_at": self.completed_at } class TransactionManager: """ 事务管理器 负责创建和管理分布式事务 """ def __init__(self, transaction_log: Optional[TransactionLog] = None): """ 初始化事务管理器 Args: transaction_log: 事务日志,默认为内存日志 """ self.transaction_log = transaction_log or InMemoryTransactionLog() self.active_transactions: Dict[str, DistributedTransaction] = {} self.completed_transactions: Dict[str, DistributedTransaction] = {} self._lock = threading.RLock() def create_transaction(self, transaction_id: Optional[str] = None, compensation_strategy: CompensationStrategy = CompensationStrategy.IMMEDIATE) -> DistributedTransaction: """ 创建分布式事务 Args: transaction_id: 事务ID,不提供则自动生成 compensation_strategy: 补偿策略 Returns: 创建的事务实例 """ if transaction_id is None: transaction_id = str(uuid.uuid4()) transaction = DistributedTransaction(transaction_id, self.transaction_log, compensation_strategy) with self._lock: self.active_transactions[transaction_id] = transaction return transaction def begin_transaction(self, transaction_id: str, context: Optional[Dict[str, Any]] = None) -> bool: """ 开始事务 Args: transaction_id: 事务ID context: 事务上下文 Returns: 是否成功开始 """ with self._lock: transaction = self.active_transactions.get(transaction_id) if not transaction: logger.error(f"Transaction {transaction_id} not found") return False try: transaction.begin(context) return True except Exception as e: logger.error(f"Failed to begin transaction {transaction_id}: {str(e)}") return False def commit_transaction(self, transaction_id: str) -> bool: """ 提交事务 Args: transaction_id: 事务ID Returns: 是否成功提交 """ with self._lock: transaction = self.active_transactions.get(transaction_id) if not transaction: logger.error(f"Transaction {transaction_id} not found") return False success = transaction.commit() with self._lock: if transaction.status in [TransactionStatus.COMMITTED, TransactionStatus.ROLLED_BACK, TransactionStatus.COMPENSATED, TransactionStatus.FAILED]: self.active_transactions.pop(transaction_id, None) self.completed_transactions[transaction_id] = transaction return success def rollback_transaction(self, transaction_id: str) -> bool: """ 回滚事务 Args: transaction_id: 事务ID Returns: 是否成功回滚 """ with self._lock: transaction = self.active_transactions.get(transaction_id) if not transaction: logger.error(f"Transaction {transaction_id} not found") return False success = transaction.rollback() with self._lock: if transaction.status in [TransactionStatus.ROLLED_BACK, TransactionStatus.FAILED]: self.active_transactions.pop(transaction_id, None) self.completed_transactions[transaction_id] = transaction return success def compensate_transaction(self, transaction_id: str) -> bool: """ 补偿事务 Args: transaction_id: 事务ID Returns: 是否成功补偿 """ with self._lock: transaction = self.active_transactions.get(transaction_id) or \ self.completed_transactions.get(transaction_id) if not transaction: logger.error(f"Transaction {transaction_id} not found") return False success = transaction.compensate() with self._lock: if transaction.status in [TransactionStatus.COMPENSATED, TransactionStatus.FAILED]: if transaction_id in self.active_transactions: self.active_transactions.pop(transaction_id) self.completed_transactions[transaction_id] = transaction return success def get_transaction(self, transaction_id: str) -> Optional[DistributedTransaction]: """ 获取事务 Args: transaction_id: 事务ID Returns: 事务实例,如果不存在则返回None """ with self._lock: return self.active_transactions.get(transaction_id) or \ self.completed_transactions.get(transaction_id) def get_active_transactions(self) -> List[DistributedTransaction]: """ 获取所有活跃事务 Returns: 活跃事务列表 """ with self._lock: return list(self.active_transactions.values()) def get_completed_transactions(self) -> List[DistributedTransaction]: """ 获取所有已完成事务 Returns: 已完成事务列表 """ with self._lock: return list(self.completed_transactions.values()) def shutdown(self) -> None: """关闭事务管理器""" CompensatorFactory.shutdown_all() # 示例:实现一个电商订单处理的分布式事务 class InventoryParticipant(TransactionParticipant): """库存服务参与者""" def __init__(self): self.inventory = { "product-1": 10, "product-2": 5 } self.reservations: Dict[str, Dict[str, Any]] = {} def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool: """准备预留库存""" order = context.get("order", {}) product_id = order.get("product_id") quantity = order.get("quantity", 0) if not product_id or product_id not in self.inventory or self.inventory[product_id] < quantity: logger.error(f"Insufficient inventory for {product_id}") return False # 预留库存 self.reservations[transaction_id] = { "product_id": product_id, "quantity": quantity } # 预扣库存(准备阶段不实际扣减,只做预留标记) # 在实际系统中,这里可能会有乐观锁或其他并发控制机制 logger.info(f"Prepared inventory reservation for {product_id}, quantity: {quantity}") return True def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool: """提交库存预留(实际扣减库存)""" reservation = self.reservations.get(transaction_id) if not reservation: logger.error(f"Reservation not found for transaction {transaction_id}") return False product_id = reservation["product_id"] quantity = reservation["quantity"] # 实际扣减库存 self.inventory[product_id] -= quantity logger.info(f"Committed inventory reservation: {product_id}, quantity: {quantity}") return True def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool: """回滚库存预留""" # 移除预留记录即可 if transaction_id in self.reservations: del self.reservations[transaction_id] logger.info(f"Rolled back inventory reservation for transaction {transaction_id}") return True def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool: """补偿库存(恢复库存)""" reservation = self.reservations.get(transaction_id) if reservation: product_id = reservation["product_id"] quantity = reservation["quantity"] # 恢复库存 self.inventory[product_id] += quantity logger.info(f"Compensated inventory: {product_id}, quantity: {quantity}") return True return False class PaymentParticipant(TransactionParticipant): """支付服务参与者""" def __init__(self): self.pending_payments: Dict[str, Dict[str, Any]] = {} self.completed_payments: Dict[str, Dict[str, Any]] = {} def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool: """准备支付""" order = context.get("order", {}) user_id = order.get("user_id") amount = order.get("amount", 0) if not user_id or amount <= 0: logger.error(f"Invalid payment information") return False # 验证支付信息,但不实际扣款 self.pending_payments[transaction_id] = { "user_id": user_id, "amount": amount } logger.info(f"Prepared payment for user {user_id}, amount: {amount}") return True def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool: """提交支付(实际扣款)""" payment = self.pending_payments.get(transaction_id) if not payment: logger.error(f"Pending payment not found for transaction {transaction_id}") return False # 模拟支付处理 payment_id = f"pay-{uuid.uuid4()}" payment["payment_id"] = payment_id # 移到已完成支付 self.completed_payments[transaction_id] = payment del self.pending_payments[transaction_id] logger.info(f"Committed payment: {payment_id}") return True def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool: """回滚支付""" # 移除待处理支付记录 if transaction_id in self.pending_payments: del self.pending_payments[transaction_id] logger.info(f"Rolled back pending payment for transaction {transaction_id}") return True def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool: """补偿支付(退款)""" payment = self.completed_payments.get(transaction_id) if payment: # 模拟退款处理 refund_id = f"refund-{uuid.uuid4()}" logger.info(f"Compensated payment: refund {refund_id} for {payment['amount']}") # 在实际系统中,这里应该调用退款API return True return False class ShippingParticipant(TransactionParticipant): """物流服务参与者""" def __init__(self): self.pending_shipments: Dict[str, Dict[str, Any]] = {} self.confirmed_shipments: Dict[str, Dict[str, Any]] = {} def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool: """准备物流订单""" order = context.get("order", {}) order_id = order.get("order_id") address = order.get("shipping_address") if not order_id or not address: logger.error(f"Invalid shipping information") return False # 验证物流信息 self.pending_shipments[transaction_id] = { "order_id": order_id, "address": address } logger.info(f"Prepared shipping for order {order_id}") return True def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool: """提交物流订单""" shipment = self.pending_shipments.get(transaction_id) if not shipment: logger.error(f"Pending shipment not found for transaction {transaction_id}") return False # 确认物流订单 shipment_id = f"ship-{uuid.uuid4()}" shipment["shipment_id"] = shipment_id # 移到已确认物流 self.confirmed_shipments[transaction_id] = shipment del self.pending_shipments[transaction_id] logger.info(f"Committed shipment: {shipment_id}") return True def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool: """回滚物流订单""" # 移除待处理物流记录 if transaction_id in self.pending_shipments: del self.pending_shipments[transaction_id] logger.info(f"Rolled back pending shipment for transaction {transaction_id}") return True def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool: """补偿物流(取消物流)""" shipment = self.confirmed_shipments.get(transaction_id) if shipment: # 模拟取消物流 logger.info(f"Compensated shipment: cancelled {shipment['shipment_id']}") # 在实际系统中,这里应该调用取消物流API return True return False # 示例使用 if __name__ == "__main__": print("===== 分布式事务框架示例 =====") # 创建事务管理器 transaction_manager = TransactionManager() # 创建服务参与者 inventory_participant = InventoryParticipant() payment_participant = PaymentParticipant() shipping_participant = ShippingParticipant() # 测试场景1:成功的事务 print("\n--- 测试场景1: 成功的事务 ---") # 创建事务 tx1 = transaction_manager.create_transaction( transaction_id="tx-success", compensation_strategy=CompensationStrategy.IMMEDIATE ) # 添加参与者 tx1.add_participant("inventory", inventory_participant) tx1.add_participant("payment", payment_participant) tx1.add_participant("shipping", shipping_participant) # 定义事务上下文 order_context = { "order": { "order_id": "order-1", "user_id": "user-1", "product_id": "product-1", "quantity": 2, "amount": 199.98, "shipping_address": { "street": "123 Main St", "city": "Any City", "zip": "12345" } } } # 开始事务 tx1.begin(order_context) # 提交事务 success = transaction_manager.commit_transaction("tx-success") print(f"Transaction result: {'Success' if success else 'Failed'}") print(f"Final transaction status: {tx1.status.name}") # 测试场景2:失败的事务(支付失败) print("\n--- 测试场景2: 失败的事务(支付失败) ---") # 修改支付参与者使其在特定金额下失败 original_prepare = payment_participant.prepare def failing_prepare(transaction_id, context): order = context.get("order", {}) if order.get("amount") == 9999.99: logger.error("Payment preparation failed for test purposes") return False return original_prepare(transaction_id, context) payment_participant.prepare = failing_prepare # 创建事务 tx2 = transaction_manager.create_transaction( transaction_id="tx-fail", compensation_strategy=CompensationStrategy.IMMEDIATE ) # 添加参与者 tx2.add_participant("inventory", inventory_participant) tx2.add_participant("payment", payment_participant) tx2.add_participant("shipping", shipping_participant) # 定义失败的事务上下文 failed_order_context = { "order": { "order_id": "order-2", "user_id": "user-1", "product_id": "product-2", "quantity": 1, "amount": 9999.99, # 触发支付失败 "shipping_address": { "street": "123 Main St", "city": "Any City", "zip": "12345" } } } # 开始事务 tx2.begin(failed_order_context) # 提交事务(应该失败) success = transaction_manager.commit_transaction("tx-fail") print(f"Transaction result: {'Success' if success else 'Failed'}") print(f"Final transaction status: {tx2.status.name}") # 测试场景3:使用不同的补偿策略 print("\n--- 测试场景3: 使用重试后补偿策略 ---") # 恢复支付参与者 payment_participant.prepare = original_prepare # 创建使用重试后补偿策略的事务 tx3 = transaction_manager.create_transaction( transaction_id="tx-retry-compensate", compensation_strategy=CompensationStrategy.RETRY_THEN_COMPENSATE ) # 添加参与者 tx3.add_participant("inventory", inventory_participant) tx3.add_participant("payment", payment_participant) tx3.add_participant("shipping", shipping_participant) # 开始并提交事务 tx3.begin(order_context) success = transaction_manager.commit_transaction("tx-retry-compensate") print(f"Transaction result: {'Success' if success else 'Failed'}") print(f"Final transaction status: {tx3.status.name}") # 测试场景4:手动补偿 print("\n--- 测试场景4: 手动补偿 ---") # 创建使用手动补偿策略的事务 tx4 = transaction_manager.create_transaction( transaction_id="tx-manual", compensation_strategy=CompensationStrategy.MANUAL ) # 添加参与者 tx4.add_participant("inventory", inventory_participant) tx4.add_participant("payment", payment_participant) tx4.add_participant("shipping", shipping_participant) # 开始并提交事务 tx4.begin(order_context) success = transaction_manager.commit_transaction("tx-manual") print(f"Transaction result: {'Success' if success else 'Failed'}") print(f"Final transaction status: {tx4.status.name}") # 尝试手动补偿 if tx4.status == TransactionStatus.COMMITTED: print("\n执行手动补偿...") # 在实际系统中,这里应该通过管理界面触发 manual_compensator = CompensatorFactory.get_compensator(CompensationStrategy.MANUAL) if isinstance(manual_compensator, ManualCompensator): # 获取待补偿项目并执行 pending = manual_compensator.get_pending_compensations() print(f"待手动补偿项目数量: {len(pending)}") # 统计信息 print("\n--- 事务统计 ---") print(f"活跃事务数量: {len(transaction_manager.get_active_transactions())}") print(f"已完成事务数量: {len(transaction_manager.get_completed_transactions())}") # 关闭事务管理器 transaction_manager.shutdown() print("\n分布式事务框架已关闭")