You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
fbxm/transaction_framework.py

1334 lines
48 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
分布式事务框架 - 支持多种补偿策略
此模块实现了一个完整的分布式事务框架,支持多种补偿策略,
包括即时补偿、定时补偿、手动补偿等,用于处理复杂的分布式事务场景。
"""
from abc import ABC, abstractmethod
from enum import Enum, auto
from typing import Dict, List, Optional, Any, Callable, Tuple, Set
import uuid
import time
import threading
import logging
# Configure root logging once at import time and expose the module logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger("distributed_transaction")
class TransactionStatus(Enum):
    """Lifecycle states of a distributed transaction.

    The numeric values are the ones ``auto()`` would generate (1..10 in
    declaration order).
    """

    # Transaction has been created.
    CREATED = 1
    # Transaction is running.
    RUNNING = 2
    # Transaction is in the middle of committing.
    COMMITTING = 3
    # Transaction has been committed.
    COMMITTED = 4
    # Transaction is in the middle of rolling back.
    ROLLING_BACK = 5
    # Transaction has been rolled back.
    ROLLED_BACK = 6
    # Transaction is in the middle of compensating.
    COMPENSATING = 7
    # Transaction has been compensated.
    COMPENSATED = 8
    # Transaction failed.
    FAILED = 9
    # Transaction is suspended.
    SUSPENDED = 10
class CompensationStrategy(Enum):
    """Available strategies for compensating a transaction.

    The numeric values are the ones ``auto()`` would generate (1..4 in
    declaration order).
    """

    # Compensate immediately.
    IMMEDIATE = 1
    # Compensate later on a schedule.
    SCHEDULED = 2
    # Compensation is triggered manually.
    MANUAL = 3
    # Retry first, then compensate.
    RETRY_THEN_COMPENSATE = 4
class TransactionParticipant(ABC):
    """Abstract base class for one participant of a distributed transaction.

    Concrete participants implement the four phase hooks below; each hook
    receives the transaction ID and the transaction context and reports
    success as a boolean.
    """

    @abstractmethod
    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the prepare phase.

        Args:
            transaction_id: ID of the transaction.
            context: Transaction context.

        Returns:
            Whether preparation succeeded.
        """
        ...

    @abstractmethod
    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the commit phase.

        Args:
            transaction_id: ID of the transaction.
            context: Transaction context.

        Returns:
            Whether the commit succeeded.
        """
        ...

    @abstractmethod
    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the rollback phase.

        Args:
            transaction_id: ID of the transaction.
            context: Transaction context.

        Returns:
            Whether the rollback succeeded.
        """
        ...

    @abstractmethod
    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Run the compensation operation.

        Args:
            transaction_id: ID of the transaction.
            context: Transaction context.

        Returns:
            Whether the compensation succeeded.
        """
        ...
class TransactionLog(ABC):
    """Abstract base class for a transaction log.

    Implementations record the events that occur while a transaction runs.
    """

    @abstractmethod
    def log_transaction_start(self, transaction_id: str, context: Dict[str, Any]) -> None:
        """Record that the transaction started."""
        ...

    @abstractmethod
    def log_transaction_status(self, transaction_id: str, status: TransactionStatus) -> None:
        """Record a change of the transaction status."""
        ...

    @abstractmethod
    def log_participant_prepare(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's prepare phase."""
        ...

    @abstractmethod
    def log_participant_commit(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's commit phase."""
        ...

    @abstractmethod
    def log_participant_rollback(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's rollback phase."""
        ...

    @abstractmethod
    def log_participant_compensation(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's compensation."""
        ...
class InMemoryTransactionLog(TransactionLog):
    """Transaction log that keeps all entries in process memory.

    Meant for demonstration. Entries are grouped by transaction ID and each
    one is stamped with ``time.time()`` when it is added.
    """

    def __init__(self):
        self._lock = threading.RLock()
        self._logs: Dict[str, List[Dict[str, Any]]] = {}

    def _add_log(self, transaction_id: str, log_entry: Dict[str, Any]) -> None:
        """Append a timestamped copy of ``log_entry`` to the transaction's list."""
        with self._lock:
            record: Dict[str, Any] = {"timestamp": time.time()}
            record.update(log_entry)
            self._logs.setdefault(transaction_id, []).append(record)

    def _log_participant_event(self, transaction_id: str, participant_id: str,
                               phase: str, success: bool) -> None:
        """Store and announce the outcome of one participant phase."""
        self._add_log(transaction_id, {
            "type": f"participant_{phase}",
            "participant_id": participant_id,
            "success": success,
        })
        outcome = 'succeeded' if success else 'failed'
        logger.info(f"Transaction {transaction_id}: Participant {participant_id} {phase} {outcome}")

    def log_transaction_start(self, transaction_id: str, context: Dict[str, Any]) -> None:
        """Record that the transaction started, together with its context."""
        entry = {"type": "transaction_start", "context": context}
        self._add_log(transaction_id, entry)
        logger.info(f"Transaction {transaction_id} started")

    def log_transaction_status(self, transaction_id: str, status: TransactionStatus) -> None:
        """Record a change of the transaction status (stored by name)."""
        entry = {"type": "transaction_status", "status": status.name}
        self._add_log(transaction_id, entry)
        logger.info(f"Transaction {transaction_id} status changed to {status.name}")

    def log_participant_prepare(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's prepare phase."""
        self._log_participant_event(transaction_id, participant_id, "prepare", success)

    def log_participant_commit(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's commit phase."""
        self._log_participant_event(transaction_id, participant_id, "commit", success)

    def log_participant_rollback(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's rollback phase."""
        self._log_participant_event(transaction_id, participant_id, "rollback", success)

    def log_participant_compensation(self, transaction_id: str, participant_id: str, success: bool) -> None:
        """Record the outcome of a participant's compensation."""
        self._log_participant_event(transaction_id, participant_id, "compensation", success)

    def get_transaction_logs(self, transaction_id: str) -> List[Dict[str, Any]]:
        """Return a shallow copy of all entries recorded for the transaction."""
        with self._lock:
            return list(self._logs.get(transaction_id, []))
class Compensator(ABC):
    """Abstract base class for compensators.

    Each subclass implements one compensation strategy.
    """

    @abstractmethod
    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Carry out the compensation for one participant.

        Args:
            transaction: The distributed transaction instance.
            participant: The participant to compensate.
            participant_id: ID of that participant.
            context: Compensation context.

        Returns:
            Whether the compensation succeeded.
        """
        ...
class ImmediateCompensator(Compensator):
    """Compensator that runs the participant's compensation right away."""

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Call ``participant.compensate`` synchronously.

        Returns:
            The participant's result, or False if it raised an exception.
        """
        logger.info(f"Performing immediate compensation for participant {participant_id}")
        try:
            outcome = participant.compensate(transaction.transaction_id, context)
        except Exception as exc:
            logger.error(f"Exception during immediate compensation: {exc}")
            return False
        return outcome
class ScheduledCompensator(Compensator):
    """Compensator that defers compensation to a background scheduler thread.

    ``compensate`` only queues the work; a daemon thread started in
    ``__init__`` executes each queued item once ``delay_seconds`` have elapsed
    since it was queued and writes the result to the transaction's log.
    """

    # Seconds between two scans of the queue by the scheduler thread.
    _POLL_INTERVAL_SECONDS = 1.0

    def __init__(self, delay_seconds: int = 60):
        """Create the compensator and start its scheduler thread.

        Args:
            delay_seconds: Seconds to wait before a queued compensation runs.
        """
        self.delay_seconds = delay_seconds
        # Each entry: (due time on the monotonic clock, transaction,
        # participant, participant ID, context).
        self._scheduled_compensations: List[Tuple[float, 'DistributedTransaction',
                                                  TransactionParticipant,
                                                  str, Dict[str, Any]]] = []
        self._lock = threading.RLock()
        # Start the scheduler thread.
        self._stop_event = threading.Event()
        self._scheduler_thread = threading.Thread(target=self._scheduler_loop)
        self._scheduler_thread.daemon = True
        self._scheduler_thread.start()

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Queue a compensation to run after ``delay_seconds``.

        Returns:
            True, meaning the compensation was scheduled (not that it ran).
        """
        logger.info(f"Scheduling compensation for participant {participant_id} after {self.delay_seconds}s")
        # Monotonic time is immune to wall-clock adjustments.
        due_at = time.monotonic() + self.delay_seconds
        with self._lock:
            self._scheduled_compensations.append(
                (due_at, transaction, participant, participant_id, context)
            )
        return True  # scheduled successfully

    def _scheduler_loop(self) -> None:
        """Scan the queue periodically until shutdown is requested."""
        # Event.wait returns True as soon as the stop event is set, so shutdown
        # does not have to wait for a full sleep interval.
        while not self._stop_event.wait(self._POLL_INTERVAL_SECONDS):
            self._process_scheduled_compensations()

    def _process_scheduled_compensations(self) -> None:
        """Execute every queued compensation whose delay has elapsed."""
        now = time.monotonic()
        with self._lock:
            due = [entry for entry in self._scheduled_compensations if entry[0] <= now]
            if not due:
                return
            self._scheduled_compensations = [
                entry for entry in self._scheduled_compensations if entry[0] > now
            ]
        # Run participants outside the lock so compensate() callers never block
        # behind a slow participant.
        for _, transaction, participant, participant_id, context in due:
            try:
                logger.info(f"Executing scheduled compensation for participant {participant_id}")
                success = participant.compensate(transaction.transaction_id, context)
            except Exception as e:
                logger.error(f"Exception during scheduled compensation: {str(e)}")
                success = False
            try:
                transaction.transaction_log.log_participant_compensation(
                    transaction.transaction_id, participant_id, success
                )
            except Exception as e:
                # A failing log must not kill the scheduler thread.
                logger.error(f"Failed to record scheduled compensation result: {str(e)}")

    def shutdown(self) -> None:
        """Stop the scheduler thread, waiting up to two seconds for it to exit."""
        self._stop_event.set()
        if self._scheduler_thread.is_alive():
            self._scheduler_thread.join(timeout=2.0)
class RetryThenCompensateCompensator(Compensator):
    """Compensator that retries the commit first and compensates only if all retries fail."""

    def __init__(self, max_retries: int = 3, retry_interval: int = 5):
        """Create the compensator.

        Args:
            max_retries: Maximum number of commit retries.
            retry_interval: Seconds to wait between two retries.
        """
        self.max_retries = max_retries
        self.retry_interval = retry_interval

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Retry ``participant.commit``; fall back to ``participant.compensate``.

        Returns:
            True if a retry succeeded or the compensation succeeded; False if
            the compensation failed or raised.
        """
        # NOTE(review): DistributedTransaction._compensate_all only hands over
        # participants that already committed; confirm that calling commit on
        # them again is intended.
        logger.info(f"Applying retry-then-compensate strategy for participant {participant_id}")
        # Retry the commit first.
        for attempt in range(1, self.max_retries + 1):
            logger.info(f"Retry attempt {attempt}/{self.max_retries} for participant {participant_id}")
            try:
                if participant.commit(transaction.transaction_id, context):
                    logger.info(f"Retry succeeded on attempt {attempt}")
                    return True
            except Exception as e:
                logger.warning(f"Retry attempt {attempt} failed: {str(e)}")
            # Wait only between attempts; sleeping after the final one would
            # just delay the compensation.
            if attempt < self.max_retries:
                time.sleep(self.retry_interval)
        # All retries failed: compensate.
        logger.info(f"All retries failed, performing compensation for participant {participant_id}")
        try:
            return participant.compensate(transaction.transaction_id, context)
        except Exception as e:
            logger.error(f"Exception during compensation after retries: {str(e)}")
            return False
class ManualCompensator(Compensator):
    """Compensator that only records work for an operator to trigger later."""

    def __init__(self):
        """Create the compensator with an empty pending list."""
        self._pending_compensations: List[Dict[str, Any]] = []
        self._lock = threading.RLock()

    def compensate(self, transaction: 'DistributedTransaction', participant: TransactionParticipant,
                   participant_id: str, context: Dict[str, Any]) -> bool:
        """Record that the participant needs manual compensation.

        Returns:
            True, meaning the item was recorded (not that compensation ran).
        """
        logger.warning(f"Manual compensation required for participant {participant_id}")
        with self._lock:
            self._pending_compensations.append({
                "transaction_id": transaction.transaction_id,
                "participant_id": participant_id,
                "context": context,
                "participant": participant,
                "created_at": time.time()
            })
        return True  # recorded successfully

    def get_pending_compensations(self) -> List[Dict[str, Any]]:
        """Return a shallow copy of the pending manual compensations."""
        with self._lock:
            return self._pending_compensations.copy()

    def execute_pending_compensation(self, transaction_id: str, participant_id: str) -> bool:
        """Execute one pending compensation.

        The item is taken off the pending list while it runs. If the
        participant reports failure or raises, the item is put back so the
        work is not lost and can be retried.

        Args:
            transaction_id: ID of the transaction the item belongs to.
            participant_id: ID of the participant to compensate.

        Returns:
            True if the participant's compensation succeeded; False if the
            item was not found, the compensation failed, or it raised.
        """
        with self._lock:
            for i, item in enumerate(self._pending_compensations):
                if item["transaction_id"] == transaction_id and item["participant_id"] == participant_id:
                    compensation_item = self._pending_compensations.pop(i)
                    break
            else:
                logger.error(f"Pending compensation not found: {transaction_id}, {participant_id}")
                return False
        # Run the participant outside the lock so other callers are not blocked.
        try:
            logger.info(f"Manually executing compensation for {participant_id} in transaction {transaction_id}")
            participant = compensation_item["participant"]
            context = compensation_item["context"]
            success = bool(participant.compensate(transaction_id, context))
        except Exception as e:
            logger.error(f"Exception during manual compensation: {str(e)}")
            success = False
        if not success:
            # Keep the failed item pending instead of silently dropping it.
            with self._lock:
                self._pending_compensations.append(compensation_item)
        return success
class CompensatorFactory:
    """Factory that creates and caches one compensator per strategy."""

    # Cache of compensators created so far, keyed by strategy.
    _compensators: Dict[CompensationStrategy, Compensator] = {}
    # Guards the cache so two threads cannot both create (and start the
    # scheduler thread of) the same compensator.
    _lock = threading.Lock()
    # Strategy -> zero-argument constructor.
    _compensator_types: Dict[CompensationStrategy, Callable[[], Compensator]] = {
        CompensationStrategy.IMMEDIATE: ImmediateCompensator,
        CompensationStrategy.SCHEDULED: ScheduledCompensator,
        CompensationStrategy.RETRY_THEN_COMPENSATE: RetryThenCompensateCompensator,
        CompensationStrategy.MANUAL: ManualCompensator,
    }

    @classmethod
    def get_compensator(cls, strategy: CompensationStrategy) -> Compensator:
        """Return the compensator for ``strategy``, creating it on first use.

        Args:
            strategy: The compensation strategy.

        Returns:
            The cached compensator instance for that strategy.

        Raises:
            ValueError: If the strategy is not known.
        """
        with cls._lock:
            compensator = cls._compensators.get(strategy)
            if compensator is None:
                constructor = cls._compensator_types.get(strategy)
                if constructor is None:
                    raise ValueError(f"Unknown compensation strategy: {strategy}")
                compensator = constructor()
                cls._compensators[strategy] = compensator
            return compensator

    @classmethod
    def shutdown_all(cls) -> None:
        """Shut down every cached compensator that has a ``shutdown`` method.

        Compensators that were shut down are evicted from the cache so the
        next ``get_compensator`` call builds a fresh, working instance.
        Compensators without ``shutdown`` stay cached.
        """
        with cls._lock:
            stoppable = {
                strategy: compensator
                for strategy, compensator in cls._compensators.items()
                if hasattr(compensator, 'shutdown')
            }
            for strategy in stoppable:
                del cls._compensators[strategy]
        # Shut down outside the lock; shutdown may block while joining a thread.
        for compensator in stoppable.values():
            compensator.shutdown()
class DistributedTransaction:
    """One distributed transaction coordinated with a two-phase commit.

    ``commit`` prepares every participant and then commits them. If the
    prepare phase fails the prepared participants are rolled back; if the
    commit phase fails the participants that did commit are compensated using
    the configured compensation strategy.
    """

    def __init__(self, transaction_id: str, transaction_log: TransactionLog,
                 compensation_strategy: CompensationStrategy = CompensationStrategy.IMMEDIATE):
        """Create the transaction in CREATED status.

        Args:
            transaction_id: ID of the transaction.
            transaction_log: Log that receives the transaction's events.
            compensation_strategy: Strategy used when compensation is needed.
        """
        self.transaction_id = transaction_id
        self.transaction_log = transaction_log
        self.compensation_strategy = compensation_strategy
        self.status = TransactionStatus.CREATED
        self.participants: Dict[str, TransactionParticipant] = {}
        self.prepared_participants: Set[str] = set()
        self.committed_participants: Set[str] = set()
        # Participants for which the compensator already reported success
        # (executed, or handed off for SCHEDULED / MANUAL). They are skipped by
        # later compensation passes so nobody is compensated twice.
        self.compensated_participants: Set[str] = set()
        self.context: Dict[str, Any] = {}
        self.created_at = time.time()
        self.started_at: Optional[float] = None
        self.completed_at: Optional[float] = None
        self._lock = threading.RLock()

    def add_participant(self, participant_id: str, participant: TransactionParticipant) -> None:
        """Register a participant.

        Args:
            participant_id: ID of the participant.
            participant: The participant instance.

        Raises:
            ValueError: If the transaction is no longer in CREATED status.
        """
        with self._lock:
            if self.status != TransactionStatus.CREATED:
                raise ValueError(f"Cannot add participant to transaction in {self.status.name} status")
            self.participants[participant_id] = participant
            logger.info(f"Added participant {participant_id} to transaction {self.transaction_id}")

    def begin(self, context: Optional[Dict[str, Any]] = None) -> None:
        """Start the transaction.

        Args:
            context: Optional values merged into the transaction context.

        Raises:
            ValueError: If the transaction is not in CREATED status.
        """
        with self._lock:
            if self.status != TransactionStatus.CREATED:
                raise ValueError(f"Transaction already {self.status.name}")
            self.status = TransactionStatus.RUNNING
            self.started_at = time.time()
            if context:
                self.context.update(context)
            self.transaction_log.log_transaction_start(self.transaction_id, self.context)
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)

    def _transition(self, status: TransactionStatus, *, completed: bool = False) -> None:
        """Set and log a new status; stamp ``completed_at`` when ``completed``."""
        with self._lock:
            self.status = status
            if completed:
                self.completed_at = time.time()
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)

    def commit(self) -> bool:
        """Commit the transaction with a two-phase commit.

        Returns:
            True only if the transaction ended in COMMITTED status.

        Raises:
            ValueError: If the transaction is not in RUNNING status.
        """
        with self._lock:
            if self.status != TransactionStatus.RUNNING:
                raise ValueError(f"Cannot commit transaction in {self.status.name} status")
            self.status = TransactionStatus.COMMITTING
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)
        try:
            # Two-phase commit: prepare phase.
            if self._prepare_all():
                # Commit phase.
                if self._commit_all():
                    self._transition(TransactionStatus.COMMITTED, completed=True)
                else:
                    with self._lock:
                        self.status = TransactionStatus.FAILED
                    # Commit failed: compensate the participants that committed.
                    # NOTE(review): participants that prepared but did not
                    # commit are not rolled back here; confirm whether they
                    # should be.
                    self._compensate_all()
                    self._transition(TransactionStatus.FAILED, completed=True)
            else:
                # Prepare failed: roll back directly.
                self._transition(TransactionStatus.ROLLING_BACK)
                self._rollback_all()
                self._transition(TransactionStatus.ROLLED_BACK, completed=True)
            return self.status == TransactionStatus.COMMITTED
        except Exception as e:
            logger.error(f"Exception during transaction commit: {str(e)}")
            self._transition(TransactionStatus.FAILED, completed=True)
            # Compensate on error, but never let a compensation error escape
            # the handler: the caller is promised a boolean result.
            try:
                self._compensate_all()
            except Exception as compensation_error:
                logger.error(f"Exception during transaction compensation: {str(compensation_error)}")
            return False

    def rollback(self) -> bool:
        """Roll the transaction back.

        Returns:
            True only if the transaction ended in ROLLED_BACK status.

        Raises:
            ValueError: If the transaction is not RUNNING or COMMITTING.
        """
        with self._lock:
            if self.status not in (TransactionStatus.RUNNING, TransactionStatus.COMMITTING):
                raise ValueError(f"Cannot rollback transaction in {self.status.name} status")
            self.status = TransactionStatus.ROLLING_BACK
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)
        try:
            if self._rollback_all():
                self._transition(TransactionStatus.ROLLED_BACK, completed=True)
            else:
                with self._lock:
                    self.status = TransactionStatus.FAILED
                # Rollback failed: fall back to compensation.
                self._compensate_all()
                self._transition(TransactionStatus.FAILED, completed=True)
            return self.status == TransactionStatus.ROLLED_BACK
        except Exception as e:
            logger.error(f"Exception during transaction rollback: {str(e)}")
            self._transition(TransactionStatus.FAILED, completed=True)
            return False

    def compensate(self) -> bool:
        """Compensate the transaction.

        Participants already compensated by an earlier pass are skipped.

        Returns:
            True only if the transaction ended in COMPENSATED status.

        Raises:
            ValueError: If the transaction is not FAILED or COMMITTED.
        """
        with self._lock:
            if self.status not in (TransactionStatus.FAILED, TransactionStatus.COMMITTED):
                raise ValueError(f"Cannot compensate transaction in {self.status.name} status")
            self.status = TransactionStatus.COMPENSATING
            self.transaction_log.log_transaction_status(self.transaction_id, self.status)
        try:
            if self._compensate_all():
                self._transition(TransactionStatus.COMPENSATED, completed=True)
            else:
                self._transition(TransactionStatus.FAILED, completed=True)
            return self.status == TransactionStatus.COMPENSATED
        except Exception as e:
            logger.error(f"Exception during transaction compensation: {str(e)}")
            self._transition(TransactionStatus.FAILED, completed=True)
            return False

    def _prepare_all(self) -> bool:
        """Prepare every participant; return True only if all succeeded."""
        all_prepared = True
        for participant_id, participant in self.participants.items():
            try:
                success = participant.prepare(self.transaction_id, self.context)
                self.transaction_log.log_participant_prepare(self.transaction_id, participant_id, success)
                if success:
                    self.prepared_participants.add(participant_id)
                else:
                    all_prepared = False
                    logger.error(f"Participant {participant_id} prepare failed")
            except Exception as e:
                all_prepared = False
                logger.error(f"Exception during participant {participant_id} prepare: {str(e)}")
                self.transaction_log.log_participant_prepare(self.transaction_id, participant_id, False)
        return all_prepared

    def _commit_all(self) -> bool:
        """Commit every prepared participant; return True only if all succeeded."""
        all_committed = True
        for participant_id, participant in self.participants.items():
            if participant_id not in self.prepared_participants:
                continue
            try:
                success = participant.commit(self.transaction_id, self.context)
                self.transaction_log.log_participant_commit(self.transaction_id, participant_id, success)
                if success:
                    self.committed_participants.add(participant_id)
                else:
                    all_committed = False
                    logger.error(f"Participant {participant_id} commit failed")
            except Exception as e:
                all_committed = False
                logger.error(f"Exception during participant {participant_id} commit: {str(e)}")
                self.transaction_log.log_participant_commit(self.transaction_id, participant_id, False)
        return all_committed

    def _rollback_all(self) -> bool:
        """Roll back every prepared participant; return True only if all succeeded."""
        all_rolled_back = True
        for participant_id, participant in self.participants.items():
            if participant_id not in self.prepared_participants:
                continue
            try:
                success = participant.rollback(self.transaction_id, self.context)
                self.transaction_log.log_participant_rollback(self.transaction_id, participant_id, success)
                if not success:
                    all_rolled_back = False
                    logger.error(f"Participant {participant_id} rollback failed")
            except Exception as e:
                all_rolled_back = False
                logger.error(f"Exception during participant {participant_id} rollback: {str(e)}")
                self.transaction_log.log_participant_rollback(self.transaction_id, participant_id, False)
        return all_rolled_back

    def _compensate_all(self) -> bool:
        """Compensate every committed participant not yet compensated.

        Returns:
            True only if the compensator reported success for every
            participant handled in this pass.
        """
        # Hold the (re-entrant) transaction lock for the whole pass so two
        # passes can never hand the same participant to the compensator twice.
        with self._lock:
            all_compensated = True
            compensator = CompensatorFactory.get_compensator(self.compensation_strategy)
            # For scheduled and manual compensation the compensator only
            # reports that the work was queued; the real result is logged
            # later, so only the synchronous strategies are logged here.
            logs_result_here = self.compensation_strategy in (
                CompensationStrategy.IMMEDIATE,
                CompensationStrategy.RETRY_THEN_COMPENSATE,
            )
            for participant_id, participant in self.participants.items():
                if participant_id not in self.committed_participants:
                    continue
                if participant_id in self.compensated_participants:
                    continue
                try:
                    success = compensator.compensate(self, participant, participant_id, self.context)
                    if logs_result_here:
                        self.transaction_log.log_participant_compensation(
                            self.transaction_id, participant_id, success
                        )
                    if success:
                        self.compensated_participants.add(participant_id)
                    else:
                        all_compensated = False
                        logger.error(f"Participant {participant_id} compensation failed")
                except Exception as e:
                    all_compensated = False
                    logger.error(f"Exception during participant {participant_id} compensation: {str(e)}")
                    self.transaction_log.log_participant_compensation(
                        self.transaction_id, participant_id, False
                    )
            return all_compensated

    def to_dict(self) -> Dict[str, Any]:
        """Return a dictionary representation of the transaction."""
        return {
            "transaction_id": self.transaction_id,
            "status": self.status.name,
            "compensation_strategy": self.compensation_strategy.name,
            "participants": list(self.participants.keys()),
            "prepared_participants": list(self.prepared_participants),
            "committed_participants": list(self.committed_participants),
            "compensated_participants": list(self.compensated_participants),
            "context": self.context,
            "created_at": self.created_at,
            "started_at": self.started_at,
            "completed_at": self.completed_at
        }
class TransactionManager:
    """Creates distributed transactions and tracks them as active or completed."""

    def __init__(self, transaction_log: Optional[TransactionLog] = None):
        """Create the manager.

        Args:
            transaction_log: Log shared by all transactions; defaults to an
                in-memory log.
        """
        self.transaction_log = transaction_log or InMemoryTransactionLog()
        self.active_transactions: Dict[str, DistributedTransaction] = {}
        self.completed_transactions: Dict[str, DistributedTransaction] = {}
        self._lock = threading.RLock()

    def create_transaction(self, transaction_id: Optional[str] = None,
                           compensation_strategy: CompensationStrategy = CompensationStrategy.IMMEDIATE
                           ) -> DistributedTransaction:
        """Create a distributed transaction and register it as active.

        Args:
            transaction_id: ID of the transaction; generated when omitted.
            compensation_strategy: Compensation strategy for the transaction.

        Returns:
            The new transaction instance.

        Raises:
            ValueError: If an active transaction already uses that ID.
        """
        if transaction_id is None:
            transaction_id = str(uuid.uuid4())
        transaction = DistributedTransaction(transaction_id, self.transaction_log, compensation_strategy)
        with self._lock:
            # Replacing an in-flight transaction would orphan it silently.
            if transaction_id in self.active_transactions:
                raise ValueError(f"Transaction {transaction_id} already exists")
            self.active_transactions[transaction_id] = transaction
        return transaction

    def _find_transaction(self, transaction_id: str,
                          include_completed: bool = False) -> Optional[DistributedTransaction]:
        """Look up a transaction, logging an error when it is unknown."""
        with self._lock:
            transaction = self.active_transactions.get(transaction_id)
            if transaction is None and include_completed:
                transaction = self.completed_transactions.get(transaction_id)
        if transaction is None:
            logger.error(f"Transaction {transaction_id} not found")
        return transaction

    def _archive_if_finished(self, transaction_id: str, transaction: DistributedTransaction,
                             finished_statuses: Tuple[TransactionStatus, ...]) -> None:
        """Move the transaction to the completed map once it reached a final status."""
        with self._lock:
            if transaction.status in finished_statuses:
                self.active_transactions.pop(transaction_id, None)
                self.completed_transactions[transaction_id] = transaction

    def begin_transaction(self, transaction_id: str, context: Optional[Dict[str, Any]] = None) -> bool:
        """Start an active transaction.

        Args:
            transaction_id: ID of the transaction.
            context: Optional transaction context.

        Returns:
            Whether the transaction was started.
        """
        transaction = self._find_transaction(transaction_id)
        if transaction is None:
            return False
        try:
            transaction.begin(context)
            return True
        except Exception as e:
            logger.error(f"Failed to begin transaction {transaction_id}: {str(e)}")
            return False

    def commit_transaction(self, transaction_id: str) -> bool:
        """Commit an active transaction.

        Args:
            transaction_id: ID of the transaction.

        Returns:
            Whether the commit succeeded; False also when the transaction is
            unknown or not in a state that allows committing.
        """
        transaction = self._find_transaction(transaction_id)
        if transaction is None:
            return False
        try:
            success = transaction.commit()
        except ValueError as e:
            # Invalid state: report it the same way begin_transaction does.
            logger.error(f"Failed to commit transaction {transaction_id}: {str(e)}")
            return False
        self._archive_if_finished(transaction_id, transaction, (
            TransactionStatus.COMMITTED, TransactionStatus.ROLLED_BACK,
            TransactionStatus.COMPENSATED, TransactionStatus.FAILED,
        ))
        return success

    def rollback_transaction(self, transaction_id: str) -> bool:
        """Roll back an active transaction.

        Args:
            transaction_id: ID of the transaction.

        Returns:
            Whether the rollback succeeded; False also when the transaction is
            unknown or not in a state that allows rolling back.
        """
        transaction = self._find_transaction(transaction_id)
        if transaction is None:
            return False
        try:
            success = transaction.rollback()
        except ValueError as e:
            logger.error(f"Failed to rollback transaction {transaction_id}: {str(e)}")
            return False
        self._archive_if_finished(transaction_id, transaction, (
            TransactionStatus.ROLLED_BACK, TransactionStatus.FAILED,
        ))
        return success

    def compensate_transaction(self, transaction_id: str) -> bool:
        """Compensate an active or completed transaction.

        Args:
            transaction_id: ID of the transaction.

        Returns:
            Whether the compensation succeeded; False also when the
            transaction is unknown or not in a state that allows compensation.
        """
        transaction = self._find_transaction(transaction_id, include_completed=True)
        if transaction is None:
            return False
        try:
            success = transaction.compensate()
        except ValueError as e:
            logger.error(f"Failed to compensate transaction {transaction_id}: {str(e)}")
            return False
        self._archive_if_finished(transaction_id, transaction, (
            TransactionStatus.COMPENSATED, TransactionStatus.FAILED,
        ))
        return success

    def get_transaction(self, transaction_id: str) -> Optional[DistributedTransaction]:
        """Return the transaction with the given ID, or None if it is unknown.

        Args:
            transaction_id: ID of the transaction.
        """
        with self._lock:
            return self.active_transactions.get(transaction_id) or \
                self.completed_transactions.get(transaction_id)

    def get_active_transactions(self) -> List[DistributedTransaction]:
        """Return a list of all active transactions."""
        with self._lock:
            return list(self.active_transactions.values())

    def get_completed_transactions(self) -> List[DistributedTransaction]:
        """Return a list of all completed transactions."""
        with self._lock:
            return list(self.completed_transactions.values())

    def shutdown(self) -> None:
        """Shut the manager down by shutting down all cached compensators."""
        CompensatorFactory.shutdown_all()
# Example: a distributed transaction for e-commerce order processing.
class InventoryParticipant(TransactionParticipant):
    """Inventory service participant (in-memory demo implementation)."""

    def __init__(self):
        self.inventory = {
            "product-1": 10,
            "product-2": 5
        }
        # Per transaction: product, quantity, and whether the reservation has
        # been committed and/or compensated.
        self.reservations: Dict[str, Dict[str, Any]] = {}

    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Reserve stock for the order found in ``context["order"]``.

        Returns:
            False if the product is unknown, the quantity is not positive, or
            stock is insufficient; True once the reservation is recorded.
        """
        order = context.get("order", {})
        product_id = order.get("product_id")
        quantity = order.get("quantity", 0)
        if not product_id or product_id not in self.inventory:
            logger.error(f"Insufficient inventory for {product_id}")
            return False
        # A zero or negative quantity must never be reserved: committing a
        # negative quantity would increase the stock.
        if quantity <= 0:
            logger.error(f"Invalid quantity {quantity} for {product_id}")
            return False
        if self.inventory[product_id] < quantity:
            logger.error(f"Insufficient inventory for {product_id}")
            return False
        # Record the reservation. Stock is not deducted during prepare; a real
        # system would likely add optimistic locking or similar here.
        self.reservations[transaction_id] = {
            "product_id": product_id,
            "quantity": quantity,
            "committed": False,
            "compensated": False
        }
        logger.info(f"Prepared inventory reservation for {product_id}, quantity: {quantity}")
        return True

    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Commit the reservation by deducting the stock.

        Returns:
            False if there is no open reservation for the transaction (none,
            already committed, or already compensated) or stock has become
            insufficient; True after the stock was deducted.
        """
        reservation = self.reservations.get(transaction_id)
        if not reservation:
            logger.error(f"Reservation not found for transaction {transaction_id}")
            return False
        # Deduct at most once per reservation, mirroring the other demo
        # participants, which also reject a second commit.
        if reservation["committed"] or reservation["compensated"]:
            logger.error(f"Reservation already processed for transaction {transaction_id}")
            return False
        product_id = reservation["product_id"]
        quantity = reservation["quantity"]
        # Stock may have been taken by another transaction since prepare.
        if self.inventory[product_id] < quantity:
            logger.error(f"Insufficient inventory for {product_id}")
            return False
        # Deduct the stock.
        self.inventory[product_id] -= quantity
        reservation["committed"] = True
        logger.info(f"Committed inventory reservation: {product_id}, quantity: {quantity}")
        return True

    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Roll back by dropping the reservation record, if any."""
        if transaction_id in self.reservations:
            del self.reservations[transaction_id]
            logger.info(f"Rolled back inventory reservation for transaction {transaction_id}")
        return True

    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Compensate by returning deducted stock.

        Stock is restored only if the reservation was committed, and only
        once; repeated calls are no-ops that return True.

        Returns:
            False if no reservation exists for the transaction, else True.
        """
        reservation = self.reservations.get(transaction_id)
        if not reservation:
            return False
        if reservation["compensated"]:
            return True
        if reservation["committed"]:
            product_id = reservation["product_id"]
            quantity = reservation["quantity"]
            # Restore the stock.
            self.inventory[product_id] += quantity
            reservation["committed"] = False
            logger.info(f"Compensated inventory: {product_id}, quantity: {quantity}")
        reservation["compensated"] = True
        return True
class PaymentParticipant(TransactionParticipant):
    """Payment service participant (in-memory demo implementation)."""

    def __init__(self):
        self.pending_payments: Dict[str, Dict[str, Any]] = {}
        self.completed_payments: Dict[str, Dict[str, Any]] = {}

    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Validate the payment data in ``context["order"]`` and record it as pending.

        Returns:
            False if the user ID is missing or the amount is not positive.
        """
        order = context.get("order", {})
        user_id = order.get("user_id")
        amount = order.get("amount", 0)
        if not user_id or amount <= 0:
            logger.error("Invalid payment information")
            return False
        # Validate only; no money is charged during prepare.
        self.pending_payments[transaction_id] = {
            "user_id": user_id,
            "amount": amount
        }
        logger.info(f"Prepared payment for user {user_id}, amount: {amount}")
        return True

    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Commit the pending payment (simulated charge).

        Returns:
            False if no pending payment exists for the transaction.
        """
        payment = self.pending_payments.get(transaction_id)
        if not payment:
            logger.error(f"Pending payment not found for transaction {transaction_id}")
            return False
        # Simulate payment processing.
        payment_id = f"pay-{uuid.uuid4()}"
        payment["payment_id"] = payment_id
        # Move the record to the completed payments.
        self.completed_payments[transaction_id] = payment
        del self.pending_payments[transaction_id]
        logger.info(f"Committed payment: {payment_id}")
        return True

    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Roll back by dropping the pending payment record, if any."""
        if transaction_id in self.pending_payments:
            del self.pending_payments[transaction_id]
            logger.info(f"Rolled back pending payment for transaction {transaction_id}")
        return True

    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Compensate a completed payment with a (simulated) refund.

        The refund ID is stored on the payment record so a repeated call does
        not refund the same payment again.

        Returns:
            False if no completed payment exists for the transaction, else True.
        """
        payment = self.completed_payments.get(transaction_id)
        if not payment:
            return False
        if "refund_id" in payment:
            # Already refunded: nothing more to do.
            return True
        # Simulate the refund; a real system would call the refund API here.
        refund_id = f"refund-{uuid.uuid4()}"
        payment["refund_id"] = refund_id
        logger.info(f"Compensated payment: refund {refund_id} for {payment['amount']}")
        return True
class ShippingParticipant(TransactionParticipant):
    """Shipping service participant (in-memory demo implementation)."""

    def __init__(self):
        self.pending_shipments: Dict[str, Dict[str, Any]] = {}
        self.confirmed_shipments: Dict[str, Dict[str, Any]] = {}

    def prepare(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Validate the shipping data in ``context["order"]`` and record it as pending.

        Returns:
            False if the order ID or the shipping address is missing.
        """
        order = context.get("order", {})
        order_id = order.get("order_id")
        address = order.get("shipping_address")
        if not order_id or not address:
            logger.error("Invalid shipping information")
            return False
        # Record the validated shipping information.
        self.pending_shipments[transaction_id] = {
            "order_id": order_id,
            "address": address
        }
        logger.info(f"Prepared shipping for order {order_id}")
        return True

    def commit(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Confirm the pending shipment.

        Returns:
            False if no pending shipment exists for the transaction.
        """
        shipment = self.pending_shipments.get(transaction_id)
        if not shipment:
            logger.error(f"Pending shipment not found for transaction {transaction_id}")
            return False
        # Confirm the shipment.
        shipment_id = f"ship-{uuid.uuid4()}"
        shipment["shipment_id"] = shipment_id
        # Move the record to the confirmed shipments.
        self.confirmed_shipments[transaction_id] = shipment
        del self.pending_shipments[transaction_id]
        logger.info(f"Committed shipment: {shipment_id}")
        return True

    def rollback(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Roll back by dropping the pending shipment record, if any."""
        if transaction_id in self.pending_shipments:
            del self.pending_shipments[transaction_id]
            logger.info(f"Rolled back pending shipment for transaction {transaction_id}")
        return True

    def compensate(self, transaction_id: str, context: Dict[str, Any]) -> bool:
        """Compensate a confirmed shipment by (simulated) cancellation.

        The record is flagged as cancelled so a repeated call does not cancel
        the same shipment again.

        Returns:
            False if no confirmed shipment exists for the transaction, else True.
        """
        shipment = self.confirmed_shipments.get(transaction_id)
        if not shipment:
            return False
        if shipment.get("cancelled"):
            # Already cancelled: nothing more to do.
            return True
        # Simulate the cancellation; a real system would call the shipping
        # cancellation API here.
        shipment["cancelled"] = True
        logger.info(f"Compensated shipment: cancelled {shipment['shipment_id']}")
        return True
# Example usage
def main() -> None:
    """Run the demo scenarios against in-memory participants.

    Covers a successful transaction, a transaction whose prepare phase fails,
    a transaction using the retry-then-compensate strategy, and a committed
    transaction that is compensated manually afterwards.
    """
    print("===== 分布式事务框架示例 =====")
    # Create the transaction manager.
    transaction_manager = TransactionManager()
    try:
        # Create the service participants shared by all scenarios.
        inventory_participant = InventoryParticipant()
        payment_participant = PaymentParticipant()
        shipping_participant = ShippingParticipant()

        def new_transaction(transaction_id, strategy):
            """Create a transaction and attach the three demo participants."""
            transaction = transaction_manager.create_transaction(
                transaction_id=transaction_id,
                compensation_strategy=strategy
            )
            transaction.add_participant("inventory", inventory_participant)
            transaction.add_participant("payment", payment_participant)
            transaction.add_participant("shipping", shipping_participant)
            return transaction

        def report(transaction, success):
            """Print the outcome of a commit attempt."""
            print(f"Transaction result: {'Success' if success else 'Failed'}")
            print(f"Final transaction status: {transaction.status.name}")

        shipping_address = {
            "street": "123 Main St",
            "city": "Any City",
            "zip": "12345"
        }

        # Scenario 1: a successful transaction.
        print("\n--- 测试场景1: 成功的事务 ---")
        tx1 = new_transaction("tx-success", CompensationStrategy.IMMEDIATE)
        order_context = {
            "order": {
                "order_id": "order-1",
                "user_id": "user-1",
                "product_id": "product-1",
                "quantity": 2,
                "amount": 199.98,
                "shipping_address": dict(shipping_address)
            }
        }
        tx1.begin(order_context)
        success = transaction_manager.commit_transaction("tx-success")
        report(tx1, success)

        # Scenario 2: a failing transaction (payment preparation fails).
        print("\n--- 测试场景2: 失败的事务(支付失败) ---")
        # Patch the payment participant so it fails for one specific amount.
        original_prepare = payment_participant.prepare

        def failing_prepare(transaction_id, context):
            order = context.get("order", {})
            if order.get("amount") == 9999.99:
                logger.error("Payment preparation failed for test purposes")
                return False
            return original_prepare(transaction_id, context)

        payment_participant.prepare = failing_prepare
        try:
            tx2 = new_transaction("tx-fail", CompensationStrategy.IMMEDIATE)
            failed_order_context = {
                "order": {
                    "order_id": "order-2",
                    "user_id": "user-1",
                    "product_id": "product-2",
                    "quantity": 1,
                    "amount": 9999.99,  # triggers the payment failure
                    "shipping_address": dict(shipping_address)
                }
            }
            tx2.begin(failed_order_context)
            # Commit the transaction (expected to fail).
            success = transaction_manager.commit_transaction("tx-fail")
            report(tx2, success)
        finally:
            # Always restore the real prepare method for the next scenarios.
            payment_participant.prepare = original_prepare

        # Scenario 3: a different compensation strategy.
        print("\n--- 测试场景3: 使用重试后补偿策略 ---")
        tx3 = new_transaction("tx-retry-compensate", CompensationStrategy.RETRY_THEN_COMPENSATE)
        tx3.begin(order_context)
        success = transaction_manager.commit_transaction("tx-retry-compensate")
        report(tx3, success)

        # Scenario 4: manual compensation.
        print("\n--- 测试场景4: 手动补偿 ---")
        tx4 = new_transaction("tx-manual", CompensationStrategy.MANUAL)
        tx4.begin(order_context)
        success = transaction_manager.commit_transaction("tx-manual")
        report(tx4, success)
        # Try the manual compensation.
        if tx4.status == TransactionStatus.COMMITTED:
            print("\n执行手动补偿...")
            # Request compensation first; with the MANUAL strategy this only
            # queues one pending item per committed participant. In a real
            # system an operator UI would trigger the steps below.
            transaction_manager.compensate_transaction("tx-manual")
            manual_compensator = CompensatorFactory.get_compensator(CompensationStrategy.MANUAL)
            if isinstance(manual_compensator, ManualCompensator):
                # Fetch the pending items and execute them.
                pending = manual_compensator.get_pending_compensations()
                print(f"待手动补偿项目数量: {len(pending)}")
                for item in pending:
                    done = manual_compensator.execute_pending_compensation(
                        item["transaction_id"], item["participant_id"]
                    )
                    print(f"Manual compensation for {item['participant_id']}: "
                          f"{'Success' if done else 'Failed'}")

        # Statistics.
        print("\n--- 事务统计 ---")
        print(f"活跃事务数量: {len(transaction_manager.get_active_transactions())}")
        print(f"已完成事务数量: {len(transaction_manager.get_completed_transactions())}")
    finally:
        # Shut the manager down even if a scenario raised, so the scheduler
        # thread is always stopped.
        transaction_manager.shutdown()
    print("\n分布式事务框架已关闭")


if __name__ == "__main__":
    main()