You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

617 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import requests
from bs4 import BeautifulSoup
import json
import time
import re
import pandas as pd
import random
from urllib.parse import quote, urljoin
from datetime import datetime
import logging
# Configure module-wide logging: timestamped INFO-level messages to stderr.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class HuxiuSpider:
    """Crawler that collects LLM/AI-related articles from huxiu.com."""

    def __init__(self):
        # Base address that every relative article link is resolved against.
        self.base_url = "https://www.huxiu.com"
        # A single shared session so keep-alive and cookies persist across requests.
        self.session = requests.Session()
        # Browser-like headers to look less like an automated client.
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
def random_delay(self):
    """Sleep for a random 2-4 second interval to throttle request rate."""
    pause = random.uniform(2, 4)
    time.sleep(pause)
def get_with_retry(self, url, retries=3):
    """GET *url* with up to *retries* attempts and exponential backoff.

    Returns the response object on HTTP 200; returns None when every
    attempt fails or returns a non-200 status.
    """
    for attempt in range(retries):
        try:
            response = self.session.get(url, headers=self.headers, timeout=15)
            if response.status_code == 200:
                return response
            logger.warning(f"请求返回状态码 {response.status_code}: {url}")
        except Exception as e:
            logger.warning(f"请求失败 {url} (尝试 {attempt + 1}/{retries}): {e}")
            # Back off 1s, 2s, 4s... between failed attempts; no sleep after the last.
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
    return None
def get_articles_from_homepage(self):
    """Scrape the huxiu.com front page and return a list of article dicts.

    Tries several known card selectors; only the first selector that
    matches anything is processed (up to 20 cards).
    """
    try:
        logger.info("从虎嗅网首页获取文章...")
        response = self.get_with_retry(self.base_url)
        if not response:
            return []
        soup = BeautifulSoup(response.text, 'html.parser')
        # Candidate CSS selectors for article cards, most specific first.
        card_selectors = (
            '.article-item',
            '.mod-art',
            '.vertical-article',
            '.article-card',
            '.newsfeed-item',
        )
        articles = []
        for selector in card_selectors:
            items = soup.select(selector)
            if not items:
                continue
            logger.info(f"找到 {len(items)} 个文章项目,使用选择器: {selector}")
            for item in items[:20]:  # cap the number of cards processed
                parsed = self.parse_article_card(item)
                if parsed:
                    articles.append(parsed)
            break
        return articles
    except Exception as e:
        logger.error(f"从首页获取文章时出错: {e}")
        return []
def parse_article_card(self, item):
    """Extract title/link/summary/author/time from one homepage card.

    *item* is a BeautifulSoup element. Returns a dict, or None when the
    card contains no article link or parsing fails.
    """
    try:
        link_tag = item.find('a', href=re.compile(r'/article/\d+\.html'))
        if not link_tag:
            return None
        title = link_tag.get_text().strip()
        href = link_tag.get('href')
        if href and not href.startswith('http'):
            href = urljoin(self.base_url, href)
        # The summary may live in either a <p> or a <div>.
        brief = item.find('p', class_=re.compile(r'brief|summary|desc'))
        if brief is None:
            brief = item.find('div', class_=re.compile(r'brief|summary|desc'))
        author_tag = item.find(['span', 'a'], class_=re.compile(r'author|writer'))
        time_tag = item.find('span', class_=re.compile(r'time|date'))
        return {
            'title': title,
            'link': href,
            'summary': brief.get_text().strip() if brief else "",
            'author': author_tag.get_text().strip() if author_tag else "",
            'publish_time': time_tag.get_text().strip() if time_tag else "",
            'source': 'homepage'
        }
    except Exception as e:
        logger.error(f"解析文章卡片时出错: {e}")
        return None
def search_articles_direct(self, keywords):
    """Run the site's HTML search for each keyword and collect results.

    Returns a flat list of parsed article dicts across all keywords;
    failures on one keyword are logged and skipped.
    """
    all_articles = []
    for keyword in keywords:
        logger.info(f"直接搜索关键词: {keyword}")
        try:
            # Query huxiu's search page directly.
            search_url = f"https://www.huxiu.com/search.html?keyword={quote(keyword)}"
            response = self.get_with_retry(search_url)
            if not response:
                continue
            soup = BeautifulSoup(response.text, 'html.parser')
            hits = soup.find_all('div', class_=re.compile(r'article-item|search-result'))
            parsed = (self.parse_search_result(hit, keyword) for hit in hits)
            all_articles.extend(article for article in parsed if article)
            logger.info(f"关键词 '{keyword}' 找到 {len(hits)} 个结果")
            self.random_delay()
        except Exception as e:
            logger.error(f"搜索关键词 '{keyword}' 时出错: {e}")
            continue
    return all_articles
def parse_search_result(self, item, keyword):
    """Parse one search hit into an article dict.

    Returns None when the hit has no article link or the title does not
    mention any AI/LLM-related term.
    """
    try:
        link_tag = item.find('a', href=re.compile(r'/article/\d+\.html'))
        if not link_tag:
            return None
        title = link_tag.get_text().strip()
        href = link_tag.get('href')
        if href and not href.startswith('http'):
            href = urljoin(self.base_url, href)
        # Keep only titles that mention an AI/LLM-related term.
        relevant_terms = ('大模型', 'LLM', '语言模型', 'GPT', 'ChatGPT', 'AI', '人工智能')
        if not any(term in title for term in relevant_terms):
            return None
        brief = item.find('p', class_=re.compile(r'brief|summary'))
        return {
            'title': title,
            'link': href,
            'summary': brief.get_text().strip() if brief else "",
            'keyword': keyword,
            'source': 'direct_search'
        }
    except Exception as e:
        logger.error(f"解析搜索结果时出错: {e}")
        return None
def get_article_content(self, article_url):
    """Download an article page and return its body text plus key points.

    Returns a dict with 'full_content', 'main_points', 'content_length'
    and 'source_method'; falls back to get_empty_content() on failure.
    """
    try:
        logger.info(f"获取文章内容: {article_url}")
        response = self.get_with_retry(article_url)
        if not response:
            return self.get_empty_content()
        soup = BeautifulSoup(response.text, 'html.parser')
        # Known containers for the article body, most specific first.
        body_selectors = (
            '.article-content-wrap',
            '.article-content',
            '.article-detail-content',
            '.article-main-content',
            '.content',
        )
        content = ""
        for selector in body_selectors:
            node = soup.select_one(selector)
            if not node:
                continue
            # Strip non-content elements before extracting the text.
            for junk in node(['script', 'style', 'nav', 'footer', 'aside']):
                junk.decompose()
            content = re.sub(r'\s+', ' ', node.get_text().strip())
            # Accept the first selector yielding a substantial body.
            if len(content) > 200:
                break
        if not content or len(content) < 200:
            # Fallback: concatenate every reasonably long paragraph on the page.
            texts = (p.get_text().strip() for p in soup.find_all('p'))
            content = ' '.join(t for t in texts if len(t) > 20)
            content = re.sub(r'\s+', ' ', content)
        return {
            'full_content': content,
            'main_points': self.extract_main_points(content),
            'content_length': len(content),
            'source_method': 'web'
        }
    except Exception as e:
        logger.error(f"获取文章内容时出错 {article_url}: {e}")
        return self.get_empty_content()
def get_empty_content(self):
    """Placeholder result returned whenever article retrieval fails."""
    return {
        'full_content': "无法获取内容",
        'main_points': "无法提取主要观点",
        'content_length': 0,
        'source_method': 'failed',
    }
def extract_main_points(self, content):
    """Heuristically pull the key sentences out of *content*.

    Sentences containing domain keywords are preferred; if fewer than 3
    match, the longest leading sentences pad the list. The result is at
    most 5 sentences joined with the Chinese full stop; when even that is
    too short, the first 300 characters of *content* are returned.
    """
    if not content or len(content) < 50:
        return "内容过短,无法提取主要观点"
    # Split on both Chinese and ASCII sentence terminators; drop fragments.
    sentences = re.split(r'[。!?.!?]', content)
    sentences = [s.strip() for s in sentences if len(s.strip()) > 15]
    # Collect sentences that mention any domain keyword (first match wins).
    key_sentences = []
    keyword_groups = [
        ['大模型', 'LLM', '语言模型'],
        ['GPT', 'ChatGPT', 'OpenAI'],
        ['人工智能', 'AI', '智能'],
        ['认为', '观点', '应该', '需要', '重要', '关键'],
        ['趋势', '发展', '未来', '前景']
    ]
    for sentence in sentences:
        for keywords in keyword_groups:
            if any(keyword in sentence for keyword in keywords):
                if sentence not in key_sentences:
                    key_sentences.append(sentence)
                break
    # Pad with long sentences when keyword matches are scarce.
    if len(key_sentences) < 3:
        key_sentences.extend([s for s in sentences if len(s) > 30][:5 - len(key_sentences)])
    # De-duplicate while preserving first-seen order.
    unique_sentences = list(dict.fromkeys(key_sentences))
    # BUG FIX: the sentences were joined with an empty separator (the
    # "。" characters were evidently stripped from the source), producing
    # unreadable run-on text; restore the sentence terminator.
    main_points = "。".join(unique_sentences[:5]) + "。"
    # If extraction produced almost nothing, return a raw excerpt instead.
    if len(main_points) < 50:
        main_points = content[:300] + "..." if len(content) > 300 else content
    return main_points
def analyze_article_themes(self, content, title):
    """Classify an article into broad themes via keyword matching.

    Both *content* and *title* are checked case-insensitively against
    each theme's keyword list. Always returns a non-empty list: when no
    keyword matches, a generic bucket is chosen by content length.
    """
    content_lower = content.lower()
    title_lower = title.lower()
    # NOTE: duplicate keywords present in the original lists ('投资',
    # '融资' in 投资融资; '开源' in 开源生态) were removed — membership
    # tests are unaffected.
    theme_keywords = {
        '技术发展': ['技术', '算法', '架构', '训练', '参数', '模型结构', 'transformer', '神经网络'],
        '商业应用': ['商业', '应用', '落地', '场景', '企业', '客户', '产品', '服务', '商业化'],
        '投资融资': ['投资', '融资', '资本', '基金', '估值', '市值'],
        '市场竞争': ['竞争', '市场', '对手', '领先', '优势', '市场份额', 'bat', '微软', '谷歌'],
        '政策监管': ['政策', '监管', '法规', '合规', '政府', '立法', '安全', '伦理'],
        '挑战风险': ['挑战', '风险', '问题', '困难', '局限', '不足', '缺陷', '担忧'],
        '未来趋势': ['趋势', '未来', '预测', '展望', '方向', '发展', '前景', '机会'],
        '开源生态': ['开源', '社区', '生态', '开放', '贡献', '协作'],
        '行业影响': ['行业', '影响', '变革', '革命', '颠覆', '创新', '改变']
    }
    themes = [
        theme for theme, keywords in theme_keywords.items()
        if any(kw in content_lower or kw in title_lower for kw in keywords)
    ]
    # No keyword hit: infer a generic bucket from the article length.
    if not themes:
        themes.append('综合讨论' if len(content) > 500 else '简要报道')
    return themes
def extract_key_data(self, content):
    """Pull headline numbers (model counts, funding, growth, ...) from text.

    Returns at most five unique "label:value" strings in order of first
    appearance in *content*.
    """
    # (regex, label) pairs for figures worth surfacing in the report.
    patterns = [
        (r'(\d+(?:\.\d+)?)[亿万]*(?:个|款|家|项|种)(?:大模型|模型)', '模型数量'),
        (r'参数[^\d]*(\d+(?:\.\d+)?)[亿万]*', '参数规模'),
        (r'投资[^\d]*(\d+(?:\.\d+)?)[亿万]*(?:元|美元)', '投资金额'),
        (r'市场[^\d]*(\d+(?:\.\d+)?)[亿万]*(?:元|美元)', '市场规模'),
        (r'增长[^\d]*(\d+(?:\.\d+)?)%', '增长率'),
        (r'(\d+(?:\.\d+)?)[亿万]*参数', '参数量'),
        (r'准确[^\d]*(\d+(?:\.\d+)?)%', '准确率')
    ]
    key_data = []
    for pattern, label in patterns:
        for match in re.findall(pattern, content):
            key_data.append(f"{label}:{match}")
    # BUG FIX: list(set(...)) produced a run-to-run non-deterministic
    # ordering; dict.fromkeys de-duplicates while preserving insertion order.
    return list(dict.fromkeys(key_data))[:5]
def run_comprehensive_crawl(self):
    """Full pipeline: collect articles, fetch bodies, analyse, and save.

    Combines front-page and search results, de-duplicates by title,
    downloads each article's body, then writes the Excel/report output.
    Falls back to run_fallback_crawl() when nothing was found.
    """
    logger.info("开始综合爬取虎嗅网大模型相关文章...")
    all_articles = []

    # Source 1: the front page, filtered down to AI/LLM-related titles.
    logger.info("方法1: 从首页获取文章")
    ai_terms = ('大模型', 'llm', '语言模型', 'gpt', 'chatgpt', 'ai', '人工智能')
    relevant_articles = [
        art for art in self.get_articles_from_homepage()
        if any(term in art.get('title', '').lower() for term in ai_terms)
    ]
    all_articles.extend(relevant_articles)
    logger.info(f"首页找到 {len(relevant_articles)} 篇相关文章")

    # Source 2: the site search.
    logger.info("方法2: 直接搜索关键词")
    search_articles = self.search_articles_direct(['大模型', 'LLM', '语言模型', 'GPT', 'ChatGPT'])
    all_articles.extend(search_articles)
    logger.info(f"搜索找到 {len(search_articles)} 篇文章")

    # De-duplicate by title, keeping the first occurrence of each.
    by_title = {}
    for art in all_articles:
        title = art.get('title', '')
        if title and title not in by_title:
            by_title[title] = art
    unique_articles = list(by_title.values())
    logger.info(f"去重后剩余 {len(unique_articles)} 篇唯一文章")

    if not unique_articles:
        logger.warning("没有找到任何文章,尝试备选方案...")
        return self.run_fallback_crawl()

    # Fetch the full body for every unique article.
    logger.info("开始获取文章详细内容...")
    detailed_articles = []
    for idx, art in enumerate(unique_articles):
        logger.info(f"处理第 {idx+1}/{len(unique_articles)} 篇: {art['title'][:30]}...")
        if art.get('link'):
            art.update(self.get_article_content(art['link']))
        detailed_articles.append(art)
        self.random_delay()

    logger.info("分析文章关键观点...")
    insights = self.generate_insights(detailed_articles)
    self.save_results(insights)
    return insights
def run_fallback_crawl(self):
    """Fallback path: scan channel/tag listing pages for AI article links.

    Used when the primary crawl finds nothing. Returns the generated
    insights list, or [] when the fallback also comes up empty.
    """
    logger.info("启动备选爬取方案...")
    candidate_pages = [
        "https://www.huxiu.com/channel/107.html",  # tech channel
        "https://www.huxiu.com/channel/101.html",  # business channel
        "https://www.huxiu.com/tag/267.html",  # AI tag
        "https://www.huxiu.com/tag/人工智能.html"
    ]
    ai_terms = ('大模型', 'llm', '语言模型', 'gpt')
    all_articles = []
    for page_url in candidate_pages:
        try:
            logger.info(f"尝试访问: {page_url}")
            response = self.get_with_retry(page_url)
            if response and response.status_code == 200:
                soup = BeautifulSoup(response.text, 'html.parser')
                anchors = soup.find_all('a', href=re.compile(r'/article/\d+\.html'))
                for anchor in anchors[:10]:  # cap links per page
                    title = anchor.get_text().strip()
                    if any(term in title.lower() for term in ai_terms):
                        all_articles.append({
                            'title': title,
                            'link': urljoin(self.base_url, anchor.get('href')),
                            'source': 'fallback'
                        })
            self.random_delay()
        except Exception as e:
            logger.error(f"访问 {page_url} 时出错: {e}")
            continue
    if not all_articles:
        return []
    logger.info(f"备选方案找到 {len(all_articles)} 篇文章")
    # Download body text for each discovered article.
    detailed_articles = []
    for idx, art in enumerate(all_articles):
        logger.info(f"处理备选文章 {idx+1}/{len(all_articles)}: {art['title'][:30]}...")
        art.update(self.get_article_content(art['link']))
        detailed_articles.append(art)
        self.random_delay()
    insights = self.generate_insights(detailed_articles)
    self.save_results(insights)
    return insights
def generate_insights(self, articles):
"""生成洞察数据"""
insights = []
for article in articles:
if article.get('content_length', 0) > 100:
content = article.get('full_content', '')
title = article.get('title', '')
themes = self.analyze_article_themes(content, title)
key_data = self.extract_key_data(content)
insight = {
'标题': title,
'发布时间': article.get('publish_time', '未知'),
'作者': article.get('author', '未知'),
'来源': article.get('source', '未知'),
'主要观点': article.get('main_points', ''),
'文章主题': ''.join(themes),
'关键数据': ''.join(key_data),
'内容长度': article.get('content_length', 0),
'文章链接': article.get('link', ''),
'获取方式': article.get('source_method', '未知'),
'爬取时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
}
insights.append(insight)
return insights
def save_results(self, insights):
    """Write the insight rows to a timestamped Excel workbook.

    Also triggers generate_report() for the plain-text summary. No-op
    (with a warning) when *insights* is empty; any save error is logged
    rather than raised.
    """
    if not insights:
        logger.warning("没有找到有效数据可保存")
        return
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"虎嗅网大模型观点分析_{timestamp}.xlsx"
    try:
        df = pd.DataFrame(insights)
        # Put the important columns first; keep any extras at the end.
        column_order = [
            '标题', '主要观点', '文章主题', '关键数据', '作者',
            '发布时间', '来源', '内容长度', '获取方式', '文章链接', '爬取时间'
        ]
        ordered = [col for col in column_order if col in df.columns]
        extras = [col for col in df.columns if col not in column_order]
        df = df[ordered + extras]
        with pd.ExcelWriter(filename, engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='大模型观点', index=False)
            # Widen each column to its longest cell value (capped at 50).
            worksheet = writer.sheets['大模型观点']
            for column in worksheet.columns:
                column_letter = column[0].column_letter
                max_length = 0
                for cell in column:
                    try:
                        cell_len = len(str(cell.value))
                    except Exception:  # narrowed from bare except: unstringifiable cell
                        continue
                    if cell_len > max_length:
                        max_length = cell_len
                worksheet.column_dimensions[column_letter].width = min(max_length + 2, 50)
        # BUG FIX: the success message logged a literal placeholder instead
        # of the actual output path.
        logger.info(f"数据已保存到: {filename}")
        self.generate_report(insights, timestamp)
    except Exception as e:
        logger.error(f"保存结果时出错: {e}")
def generate_report(self, insights, timestamp):
    """Write a plain-text analysis report alongside the Excel output.

    Args:
        insights: row dicts as produced by generate_insights().
        timestamp: string embedded in the report file name.
    Returns silently when *insights* is empty; write errors are logged.
    """
    report_filename = f"虎嗅网大模型分析报告_{timestamp}.txt"
    total_articles = len(insights)
    if total_articles == 0:
        return
    avg_content_length = sum(insight['内容长度'] for insight in insights) / total_articles
    # Tally theme frequencies across all articles.
    # BUG FIX: the themes were split with an empty separator, and
    # str.split('') raises ValueError ("empty separator") on every call;
    # split on the Chinese enumeration comma used to join the themes.
    theme_count = {}
    for insight in insights:
        themes = insight['文章主题'].split('、') if insight['文章主题'] else []
        for theme in themes:
            theme_count[theme] = theme_count.get(theme, 0) + 1
    report = f"""虎嗅网大模型相关文章分析报告
生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
============================================
基本统计:
- 总文章数: {total_articles}
- 平均内容长度: {avg_content_length:.0f} 字符
主题分布:
"""
    # Themes listed by descending frequency.
    for theme, count in sorted(theme_count.items(), key=lambda x: x[1], reverse=True):
        percentage = (count / total_articles) * 100
        report += f"- {theme}: {count} 篇 ({percentage:.1f}%)\n"
    report += f"\n代表性观点摘要 (前5篇):\n"
    for i, insight in enumerate(insights[:5]):
        report += f"\n{i+1}. {insight['标题']}\n"
        report += f" 主题: {insight['文章主题']}\n"
        report += f" 关键数据: {insight['关键数据']}\n"
        report += f" 主要观点: {insight['主要观点'][:150]}...\n"
    try:
        with open(report_filename, 'w', encoding='utf-8') as f:
            f.write(report)
        logger.info(f"分析报告已保存到: {report_filename}")
    except Exception as e:
        logger.error(f"保存报告时出错: {e}")
def main():
    """Entry point: run the spider and log a short summary of the results."""
    logger.info("启动虎嗅网大模型文章爬虫...")
    spider = HuxiuSpider()
    try:
        insights = spider.run_comprehensive_crawl()
        if not insights:
            logger.warning("没有找到相关文章")
            logger.warning("建议手动访问虎嗅网确认当前可用的文章列表")
        else:
            logger.info(f"\n爬取完成! 成功分析 {len(insights)} 篇文章")
            logger.info("前3篇文章标题:")
            for i, insight in enumerate(insights[:3]):
                logger.info(f"{i+1}. {insight['标题']}")
                logger.info(f" 主题: {insight['文章主题']}")
                logger.info(f" 观点摘要: {insight['主要观点'][:100]}...")
    except Exception as e:
        logger.error(f"爬虫运行出错: {e}")
        import traceback
        logger.error(traceback.format_exc())


if __name__ == "__main__":
    main()