ADD file via upload

main
pghfovyix 3 months ago
parent bdb2812c81
commit e31618fb85

@ -0,0 +1,113 @@
"""
工具编排器核心模块
采用策略模式支持动态工具加载
"""
import importlib
from abc import ABC, abstractmethod
from typing import Dict, List, Any
import logging
logger = logging.getLogger(__name__)
class ToolInterface(ABC):
"""工具接口,所有工具必须实现此接口"""
@abstractmethod
def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
pass
@property
@abstractmethod
def name(self) -> str:
pass
@property
@abstractmethod
def version(self) -> str:
pass
class ToolOrchestrator:
"""重构后的工具编排器"""
def __init__(self, config_path: str = None):
self.tools: Dict[str, ToolInterface] = {}
self.config = self._load_config(config_path)
self.logger = logging.getLogger(self.__class__.__name__)
def _load_config(self, config_path: str) -> Dict:
"""加载配置文件"""
import yaml
import os
if config_path is None:
config_path = os.path.join(
os.path.dirname(__file__),
'../../config/tools.yaml'
)
try:
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
except FileNotFoundError:
self.logger.warning(f"配置文件未找到: {config_path}")
return {"tools": []}
def register_tool(self, tool: ToolInterface):
"""注册工具"""
if tool.name in self.tools:
self.logger.warning(f"工具 {tool.name} 已存在,将被覆盖")
self.tools[tool.name] = tool
self.logger.info(f"注册工具: {tool.name} v{tool.version}")
def load_tools_from_config(self):
"""从配置文件动态加载工具"""
if "tools" not in self.config:
return
for tool_config in self.config["tools"]:
try:
module_name = tool_config["module"]
class_name = tool_config["class"]
# 动态导入模块
module = importlib.import_module(module_name)
tool_class = getattr(module, class_name)
# 实例化工具
tool_instance = tool_class()
self.register_tool(tool_instance)
except (ImportError, AttributeError, KeyError) as e:
self.logger.error(f"加载工具失败 {tool_config}: {str(e)}")
def execute_workflow(self, workflow: List[Dict]) -> List[Dict]:
"""执行工作流"""
results = []
for step in workflow:
tool_name = step.get("tool")
params = step.get("params", {})
if tool_name not in self.tools:
self.logger.error(f"工具未找到: {tool_name}")
continue
try:
tool = self.tools[tool_name]
self.logger.info(f"执行工具: {tool_name}")
result = tool.execute(params)
results.append({
"tool": tool_name,
"status": "success",
"result": result
})
except Exception as e:
self.logger.error(f"工具执行失败 {tool_name}: {str(e)}")
results.append({
"tool": tool_name,
"status": "failed",
"error": str(e)
})
return results
Loading…
Cancel
Save