You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
273 lines
11 KiB
273 lines
11 KiB
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
多视角分析引擎
|
|
整合技术分析、基本面分析和风险管理三个视角的分析结果
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Any
|
|
from dataclasses import dataclass
|
|
|
|
from utils import call_llm
|
|
from models.analysis_models import (
|
|
MultiPerspectiveAnalysis,
|
|
MarketAnalysis,
|
|
SectorAnalysis,
|
|
StockRecommendation,
|
|
RiskManagement,
|
|
TimeframeOutlook,
|
|
StructuredAnalysisResult
|
|
)
|
|
from prompts.localized_prompts import LocalizedPrompts, PromptOptimizer
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
|
|
class AgentConfig:
|
|
"""智能体配置"""
|
|
name: str
|
|
role: str
|
|
system_prompt: str
|
|
description: str
|
|
parameters: Dict[str, Any]
|
|
examples: List[Dict[str, str]]
|
|
tags: List[str]
|
|
language: str
|
|
|
|
class MultiPerspectiveEngine:
|
|
"""多视角分析引擎"""
|
|
|
|
def __init__(self, agents_dir: str = "prompts/agents"):
|
|
self.agents_dir = Path(agents_dir)
|
|
self.agents: Dict[str, AgentConfig] = {}
|
|
self._load_agents()
|
|
|
|
def _load_agents(self):
|
|
"""加载智能体配置"""
|
|
try:
|
|
for agent_file in self.agents_dir.glob("*.json"):
|
|
with open(agent_file, 'r', encoding='utf-8') as f:
|
|
config_data = json.load(f)
|
|
agent_config = AgentConfig(**config_data)
|
|
self.agents[agent_config.role] = agent_config
|
|
logger.info(f"已加载智能体: {agent_config.name} ({agent_config.role})")
|
|
except Exception as e:
|
|
logger.error(f"加载智能体配置失败: {e}")
|
|
|
|
def _build_agent_prompt(self, agent_role: str, market_data: Dict[str, Any]) -> str:
|
|
"""构建特定智能体的分析提示"""
|
|
if agent_role not in self.agents:
|
|
raise ValueError(f"未找到智能体: {agent_role}")
|
|
|
|
# 使用本土化提示词
|
|
if agent_role == 'technical_analyst':
|
|
base_prompt = LocalizedPrompts.get_technical_analysis_prompt(market_data)
|
|
elif agent_role == 'fundamental_analyst':
|
|
base_prompt = LocalizedPrompts.get_fundamental_analysis_prompt(market_data)
|
|
elif agent_role == 'risk_analyst':
|
|
base_prompt = LocalizedPrompts.get_risk_management_prompt(market_data)
|
|
else:
|
|
# 回退到原有方式
|
|
agent = self.agents[agent_role]
|
|
base_prompt = f"{agent.system_prompt}\n\n"
|
|
|
|
# 添加具体市场数据
|
|
data_section = self._format_market_data(market_data)
|
|
|
|
# 判断市场状况并优化提示词
|
|
market_condition = self._assess_market_condition(market_data)
|
|
optimized_prompt = PromptOptimizer.optimize_for_market_condition(base_prompt, market_condition)
|
|
|
|
# 添加风险提示
|
|
final_prompt = PromptOptimizer.add_risk_warning(optimized_prompt)
|
|
|
|
# 组合最终提示词
|
|
complete_prompt = f"{final_prompt}\n\n{data_section}\n\n请基于以上信息提供结构化的专业分析。"
|
|
|
|
return complete_prompt
|
|
|
|
def _format_market_data(self, market_data: Dict[str, Any]) -> str:
|
|
"""格式化市场数据"""
|
|
data_text = "### 📊 当前市场数据\n\n"
|
|
|
|
# 市场情绪数据
|
|
if 'market_sentiment' in market_data:
|
|
sentiment = market_data['market_sentiment']
|
|
data_text += "**市场情绪指标**\n"
|
|
data_text += f"- 市场情绪指数(MSCI): {sentiment.get('index', 'N/A')}\n"
|
|
data_text += f"- 5日动量趋势: {sentiment.get('momentum_5d', 'N/A')}%\n"
|
|
data_text += f"- 市场波动率: {sentiment.get('volatility', 'N/A')}%\n"
|
|
data_text += f"- 成交量比率: {sentiment.get('volume_ratio', 'N/A')}x\n\n"
|
|
|
|
# 宏观经济数据
|
|
if 'macro_economy' in market_data:
|
|
macro = market_data['macro_economy']
|
|
data_text += "**宏观经济环境**\n"
|
|
data_text += f"- 基准利率: {macro.get('interest_rate', 'N/A')}%\n"
|
|
data_text += f"- 通胀水平: {macro.get('inflation', 'N/A')}%\n"
|
|
data_text += f"- GDP增速: {macro.get('gdp_growth', 'N/A')}%\n"
|
|
data_text += f"- 货币强度: {macro.get('currency_strength', 'N/A')}/100\n"
|
|
data_text += f"- 市场流动性: {macro.get('market_liquidity', 'N/A')}/100\n\n"
|
|
|
|
# 新闻情感数据
|
|
if 'news_sentiment' in market_data:
|
|
news = market_data['news_sentiment']
|
|
data_text += "**新闻情感分析**\n"
|
|
data_text += f"- 整体情感倾向: {news.get('sentiment', 'N/A')}\n"
|
|
data_text += f"- 正面消息占比: {news.get('positive_ratio', 0):.1%}\n"
|
|
data_text += f"- 负面消息占比: {news.get('negative_ratio', 0):.1%}\n"
|
|
if news.get('keywords'):
|
|
data_text += f"- 热点关键词: {', '.join(news['keywords'][:5])}\n"
|
|
data_text += "\n"
|
|
|
|
# 行业数据
|
|
if 'industry_data' in market_data:
|
|
industries = market_data['industry_data']
|
|
data_text += "**行业相对强度指数(IRSI)排名**\n"
|
|
for i, industry in enumerate(industries[:5], 1):
|
|
irsi = industry.get('irsi', 0)
|
|
strength = "强势" if irsi > 60 else "中性" if irsi > 40 else "弱势"
|
|
data_text += f"{i}. {industry.get('name', 'N/A')}: {irsi:.1f} ({strength})\n"
|
|
data_text += "\n"
|
|
|
|
# 个股数据
|
|
if 'stock_data' in market_data:
|
|
stocks = market_data['stock_data']
|
|
data_text += "**优秀个股TOP5 (RTSI排名)**\n"
|
|
for i, stock in enumerate(stocks[:5], 1):
|
|
rtsi = stock.get('rtsi', 0)
|
|
rating = "强烈推荐" if rtsi > 80 else "推荐" if rtsi > 65 else "中性" if rtsi > 50 else "观望"
|
|
data_text += f"{i}. {stock.get('name', 'N/A')}({stock.get('code', 'N/A')}): RTSI {rtsi:.1f} ({rating})\n"
|
|
data_text += "\n"
|
|
|
|
# 历史数据
|
|
if 'historical_data' in market_data:
|
|
historical = market_data['historical_data']
|
|
if historical.get('msci_trend'):
|
|
msci_trend = historical['msci_trend']
|
|
if len(msci_trend) >= 2:
|
|
trend_change = msci_trend[-1] - msci_trend[0]
|
|
trend_desc = "上升" if trend_change > 0 else "下降" if trend_change < 0 else "平稳"
|
|
data_text += f"**历史趋势**: 近期MSCI趋势{trend_desc} ({trend_change:+.1f}点)\n\n"
|
|
|
|
return data_text
|
|
|
|
def _assess_market_condition(self, market_data: Dict[str, Any]) -> str:
|
|
"""评估市场状况"""
|
|
try:
|
|
# 基于MSCI指数判断市场状况
|
|
msci = 50 # 默认值
|
|
if 'market_sentiment' in market_data:
|
|
msci = market_data['market_sentiment'].get('index', 50)
|
|
|
|
if msci > 70:
|
|
return "牛市"
|
|
elif msci < 30:
|
|
return "熊市"
|
|
else:
|
|
return "震荡市"
|
|
|
|
except Exception as e:
|
|
logger.warning(f"评估市场状况失败: {e}")
|
|
return "震荡市"
|
|
|
|
def analyze_single_perspective(self, agent_role: str, market_data: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""单一视角分析"""
|
|
try:
|
|
prompt = self._build_agent_prompt(agent_role, market_data)
|
|
|
|
# 调用LLM进行分析
|
|
result = call_llm(
|
|
prompt=prompt,
|
|
pydantic_model=StructuredAnalysisResult,
|
|
max_retries=3
|
|
)
|
|
|
|
return {
|
|
'agent_role': agent_role,
|
|
'agent_name': self.agents[agent_role].name,
|
|
'analysis': result,
|
|
'timestamp': market_data.get('timestamp')
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"{agent_role}分析失败: {e}")
|
|
return {
|
|
'agent_role': agent_role,
|
|
'agent_name': self.agents.get(agent_role, {}).get('name', 'Unknown'),
|
|
'analysis': None,
|
|
'error': str(e),
|
|
'timestamp': market_data.get('timestamp')
|
|
}
|
|
|
|
def analyze_multi_perspective(self, market_data: Dict[str, Any]) -> MultiPerspectiveAnalysis:
|
|
"""多视角综合分析"""
|
|
try:
|
|
# 并行执行三个视角的分析
|
|
perspectives = ['technical_analyst', 'fundamental_analyst', 'risk_analyst']
|
|
|
|
results = {}
|
|
for perspective in perspectives:
|
|
if perspective in self.agents:
|
|
result = self.analyze_single_perspective(perspective, market_data)
|
|
results[perspective] = result
|
|
|
|
# 构建多视角分析结果
|
|
multi_analysis = MultiPerspectiveAnalysis(
|
|
technical_perspective=results.get('technical_analyst', {}).get('analysis'),
|
|
fundamental_perspective=results.get('fundamental_analyst', {}).get('analysis'),
|
|
risk_perspective=results.get('risk_analyst', {}).get('analysis'),
|
|
consensus_view=self._generate_consensus(results),
|
|
confidence_score=self._calculate_confidence(results)
|
|
)
|
|
|
|
return multi_analysis
|
|
|
|
except Exception as e:
|
|
logger.error(f"多视角分析失败: {e}")
|
|
raise
|
|
|
|
def _generate_consensus(self, results: Dict[str, Any]) -> str:
|
|
"""生成共识观点"""
|
|
try:
|
|
# 简单的共识生成逻辑
|
|
valid_results = [r for r in results.values() if r.get('analysis') is not None]
|
|
|
|
if not valid_results:
|
|
return "分析数据不足,无法形成共识观点"
|
|
|
|
# 这里可以实现更复杂的共识算法
|
|
# 目前简单返回一个基于多视角的综合判断
|
|
return f"基于{len(valid_results)}个分析视角的综合判断,建议谨慎操作,关注风险控制。"
|
|
|
|
except Exception as e:
|
|
logger.error(f"生成共识观点失败: {e}")
|
|
return "共识观点生成失败"
|
|
|
|
def _calculate_confidence(self, results: Dict[str, Any]) -> float:
|
|
"""计算置信度"""
|
|
try:
|
|
valid_results = [r for r in results.values() if r.get('analysis') is not None]
|
|
|
|
if not valid_results:
|
|
return 0.0
|
|
|
|
# 基于有效分析结果数量计算置信度
|
|
confidence = len(valid_results) / len(self.agents) * 0.8
|
|
|
|
return min(confidence, 1.0)
|
|
|
|
except Exception as e:
|
|
logger.error(f"计算置信度失败: {e}")
|
|
return 0.0
|
|
|
|
def get_available_agents(self) -> List[str]:
|
|
"""获取可用的智能体列表"""
|
|
return list(self.agents.keys())
|
|
|
|
def get_agent_info(self, agent_role: str) -> Optional[AgentConfig]:
|
|
"""获取智能体信息"""
|
|
return self.agents.get(agent_role) |