"""Crawler that collects large-language-model related articles from huxiu.com.

Article links are gathered from the home page, from the site search page and,
when both come back empty, from a few channel/tag pages. Each article is then
downloaded, reduced to key sentences, themes and figures, and the result is
written to an Excel workbook plus a plain-text report in the working directory.
"""

import json
import logging
import random
import re
import time
from datetime import datetime
from urllib.parse import quote, urljoin

import pandas as pd
import requests
from bs4 import BeautifulSoup

# Configure logging once for the whole script.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


def _keyword_pattern(keywords):
    """Compile a case-insensitive matcher for a collection of topic keywords.

    ASCII keywords (``AI``, ``GPT`` ...) must not be embedded in a longer
    Latin word, so ``AI`` no longer matches "Baidu" or "Taiwan"; an optional
    plural ``s`` is tolerated ("LLMs"). Non-ASCII keywords are matched as
    plain substrings because Chinese text has no word separators.
    """
    alternatives = []
    for keyword in keywords:
        escaped = re.escape(keyword)
        if all(ord(ch) < 128 for ch in keyword):
            alternatives.append(rf'(?<![A-Za-z]){escaped}s?(?![A-Za-z])')
        else:
            alternatives.append(escaped)
    return re.compile('|'.join(alternatives), re.IGNORECASE)


def _truncate(text, limit):
    """Return *text* cut to *limit* characters, adding "..." only if it was cut."""
    return text if len(text) <= limit else text[:limit] + "..."


class HuxiuSpider:
    """Collect, download and analyse LLM-related articles from huxiu.com."""

    # Keywords that make a title relevant (home page and search results).
    RELEVANT_KEYWORDS = ('大模型', 'LLM', '语言模型', 'GPT', 'ChatGPT', 'AI', '人工智能')
    # Narrower list for the generic channel pages used by the fallback crawl.
    FALLBACK_KEYWORDS = ('大模型', 'LLM', '语言模型', 'GPT', 'ChatGPT')
    # Queries sent to the site search page.
    SEARCH_KEYWORDS = ('大模型', 'LLM', '语言模型', 'GPT', 'ChatGPT')

    _RELEVANT_PATTERN = _keyword_pattern(RELEVANT_KEYWORDS)
    _FALLBACK_PATTERN = _keyword_pattern(FALLBACK_KEYWORDS)
    _ARTICLE_HREF = re.compile(r'/article/\d+\.html')
    _WHITESPACE = re.compile(r'\s+')
    # Split on Chinese/ASCII sentence enders; an ASCII dot followed by a digit
    # is part of a number ("3.5") and must not end the sentence.
    _SENTENCE_SPLIT = re.compile(r'[。!?!?]|\.(?!\d)')

    # CSS selectors tried in order to locate article cards on the home page.
    CARD_SELECTORS = (
        '.article-item',
        '.mod-art',
        '.vertical-article',
        '.article-card',
        '.newsfeed-item',
    )

    # CSS selectors tried in order to locate the article body.
    CONTENT_SELECTORS = (
        '.article-content-wrap',
        '.article-content',
        '.article-detail-content',
        '.article-main-content',
        '.content',
    )

    # Pages scanned when neither the home page nor the search yields anything.
    FALLBACK_URLS = (
        "https://www.huxiu.com/channel/107.html",  # technology channel
        "https://www.huxiu.com/channel/101.html",  # business channel
        "https://www.huxiu.com/tag/267.html",  # AI tag
        "https://www.huxiu.com/tag/人工智能.html",
    )

    # Keywords whose presence marks a sentence as a "main point" candidate.
    MAIN_POINT_KEYWORDS = (
        '大模型', 'LLM', '语言模型',
        'GPT', 'ChatGPT', 'OpenAI',
        '人工智能', 'AI', '智能',
        '认为', '观点', '应该', '需要', '重要', '关键',
        '趋势', '发展', '未来', '前景',
    )

    # Theme -> lowercase keywords that indicate it.
    THEME_KEYWORDS = {
        '技术发展': ['技术', '算法', '架构', '训练', '参数', '模型结构', 'transformer', '神经网络'],
        '商业应用': ['商业', '应用', '落地', '场景', '企业', '客户', '产品', '服务', '商业化'],
        '投资融资': ['投资', '融资', '资本', '基金', '估值', '市值'],
        '市场竞争': ['竞争', '市场', '对手', '领先', '优势', '市场份额', 'bat', '微软', '谷歌'],
        '政策监管': ['政策', '监管', '法规', '合规', '政府', '立法', '安全', '伦理'],
        '挑战风险': ['挑战', '风险', '问题', '困难', '局限', '不足', '缺陷', '担忧'],
        '未来趋势': ['趋势', '未来', '预测', '展望', '方向', '发展', '前景', '机会'],
        '开源生态': ['开源', '社区', '生态', '开放', '贡献', '协作'],
        '行业影响': ['行业', '影响', '变革', '革命', '颠覆', '创新', '改变'],
    }

    # (pattern, label) pairs for numeric facts. The unit (亿/万/%/currency) is
    # inside the capture group so "1750亿" is not reported as "1750", and the
    # gap between keyword and number is bounded so a keyword is never paired
    # with a number from a later sentence.
    KEY_DATA_PATTERNS = (
        (re.compile(r'(\d+(?:\.\d+)?[亿万]*)(?:个|款|家|项|种)(?:大模型|模型)'), '模型数量'),
        (re.compile(r'参数[^\d。]{0,10}(\d+(?:\.\d+)?[亿万]*)'), '参数规模'),
        (re.compile(r'投资[^\d。]{0,10}(\d+(?:\.\d+)?[亿万]*(?:元|美元))'), '投资金额'),
        (re.compile(r'市场[^\d。]{0,10}(\d+(?:\.\d+)?[亿万]*(?:元|美元))'), '市场规模'),
        (re.compile(r'增长[^\d。]{0,10}(\d+(?:\.\d+)?%)'), '增长率'),
        (re.compile(r'(\d+(?:\.\d+)?[亿万]*)参数'), '参数量'),
        (re.compile(r'准确[^\d。]{0,10}(\d+(?:\.\d+)?%)'), '准确率'),
    )

    # Preferred column order of the exported workbook.
    COLUMN_ORDER = (
        '标题', '主要观点', '文章主题', '关键数据', '作者',
        '发布时间', '来源', '内容长度', '获取方式', '文章链接', '爬取时间',
    )

    def __init__(self):
        """Create the HTTP session and the browser-like request headers."""
        self.session = requests.Session()
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            # "br" is deliberately not advertised: requests only decodes
            # brotli when an optional package is installed, and an undecoded
            # body would reach the HTML parser as binary garbage.
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        self.base_url = "https://www.huxiu.com"

    def random_delay(self):
        """Sleep 2-4 seconds so consecutive requests are not sent in a burst."""
        time.sleep(random.uniform(2, 4))

    def get_with_retry(self, url, retries=3):
        """GET *url*, retrying with exponential back-off.

        Args:
            url: Absolute URL to fetch.
            retries: Maximum number of attempts.

        Returns:
            The ``requests.Response`` of the first attempt answered with
            HTTP 200, or ``None`` when every attempt failed.
        """
        for attempt in range(retries):
            try:
                response = self.session.get(url, headers=self.headers, timeout=15)
            except requests.RequestException as e:
                logger.warning(f"请求失败 {url} (尝试 {attempt + 1}/{retries}): {e}")
            else:
                if response.status_code == 200:
                    # Without a declared charset requests assumes ISO-8859-1
                    # for text/*, which garbles Chinese pages; detect instead.
                    content_type = response.headers.get('Content-Type', '')
                    if 'charset' not in content_type.lower():
                        response.encoding = response.apparent_encoding
                    return response
                logger.warning(f"请求返回状态码 {response.status_code}: {url}")

            if attempt < retries - 1:
                time.sleep(2 ** attempt)
        return None

    def _is_relevant(self, title, pattern=None):
        """Return True when *title* mentions one of the topic keywords."""
        return bool((pattern or self._RELEVANT_PATTERN).search(title or ''))

    def _find_title_link(self, item):
        """Return ``(title, absolute_url)`` of the first titled article link.

        Cards usually contain two anchors to the same article (cover image and
        headline); anchors without text are skipped so the headline wins.
        Returns ``None`` when the card has no usable article link.
        """
        for anchor in item.find_all('a', href=self._ARTICLE_HREF):
            title = anchor.get_text().strip()
            if title:
                return title, urljoin(self.base_url, anchor.get('href'))
        return None

    @staticmethod
    def _text_of(element):
        """Return the stripped text of *element*, or "" when it is missing."""
        return element.get_text().strip() if element else ""

    def get_articles_from_homepage(self):
        """Return article summaries parsed from the home page.

        Only the first selector of ``CARD_SELECTORS`` that matches anything is
        used, and at most 20 of its cards are parsed. Returns ``[]`` on any
        failure.
        """
        try:
            logger.info("从虎嗅网首页获取文章...")
            response = self.get_with_retry(self.base_url)
            if not response:
                return []

            soup = BeautifulSoup(response.text, 'html.parser')
            for selector in self.CARD_SELECTORS:
                items = soup.select(selector)
                if not items:
                    continue
                logger.info(f"找到 {len(items)} 个文章项目,使用选择器: {selector}")
                parsed = (self.parse_article_card(item) for item in items[:20])
                return [article for article in parsed if article]
            return []

        except Exception as e:
            logger.error(f"从首页获取文章时出错: {e}")
            return []

    def parse_article_card(self, item):
        """Turn one home-page card element into an article dict.

        Returns:
            A dict with ``title``, ``link``, ``summary``, ``author``,
            ``publish_time`` and ``source`` keys, or ``None`` when the card
            has no titled article link or parsing fails.
        """
        try:
            found = self._find_title_link(item)
            if not found:
                return None
            title, link = found

            summary_pattern = re.compile(r'brief|summary|desc')
            summary_elem = (item.find('p', class_=summary_pattern)
                            or item.find('div', class_=summary_pattern))
            author_elem = item.find(['span', 'a'], class_=re.compile(r'author|writer'))
            time_elem = item.find('span', class_=re.compile(r'time|date'))

            return {
                'title': title,
                'link': link,
                'summary': self._text_of(summary_elem),
                'author': self._text_of(author_elem),
                'publish_time': self._text_of(time_elem),
                'source': 'homepage'
            }

        except Exception as e:
            logger.error(f"解析文章卡片时出错: {e}")
            return None

    def search_articles_direct(self, keywords):
        """Query the site search page once per keyword.

        Args:
            keywords: Iterable of search terms.

        Returns:
            List of article dicts (possibly containing duplicates across
            keywords; the caller de-duplicates).
        """
        all_articles = []

        for keyword in keywords:
            logger.info(f"直接搜索关键词: {keyword}")
            try:
                search_url = f"https://www.huxiu.com/search.html?keyword={quote(keyword)}"
                response = self.get_with_retry(search_url)
                if not response:
                    continue

                soup = BeautifulSoup(response.text, 'html.parser')
                search_results = soup.find_all('div', class_=re.compile(r'article-item|search-result'))

                parsed = (self.parse_search_result(item, keyword) for item in search_results)
                all_articles.extend(article for article in parsed if article)

                logger.info(f"关键词 '{keyword}' 找到 {len(search_results)} 个结果")
                self.random_delay()

            except Exception as e:
                logger.error(f"搜索关键词 '{keyword}' 时出错: {e}")
                continue

        return all_articles

    def parse_search_result(self, item, keyword):
        """Turn one search-result element into an article dict.

        Args:
            item: The result element.
            keyword: The query that produced it (stored for traceability).

        Returns:
            A dict with ``title``, ``link``, ``summary``, ``keyword`` and
            ``source`` keys, or ``None`` when the result has no titled link,
            its title is off-topic, or parsing fails.
        """
        try:
            found = self._find_title_link(item)
            if not found:
                return None
            title, link = found

            # The site search is fuzzy; keep only titles that are on topic.
            if not self._is_relevant(title):
                return None

            summary_elem = item.find('p', class_=re.compile(r'brief|summary'))

            return {
                'title': title,
                'link': link,
                'summary': self._text_of(summary_elem),
                'keyword': keyword,
                'source': 'direct_search'
            }

        except Exception as e:
            logger.error(f"解析搜索结果时出错: {e}")
            return None

    def get_article_content(self, article_url):
        """Download an article and extract its text and main points.

        Returns:
            A dict with ``full_content``, ``main_points``, ``content_length``
            and ``source_method``; the placeholder from ``get_empty_content``
            when the page cannot be fetched or parsed.
        """
        try:
            logger.info(f"获取文章内容: {article_url}")
            response = self.get_with_retry(article_url)
            if not response:
                return self.get_empty_content()

            soup = BeautifulSoup(response.text, 'html.parser')

            content = ""
            for selector in self.CONTENT_SELECTORS:
                container = soup.select_one(selector)
                if container is None:
                    continue
                # Drop non-article markup before reading the text.
                for tag in container(['script', 'style', 'nav', 'footer', 'aside']):
                    tag.decompose()

                candidate = self._WHITESPACE.sub(' ', container.get_text().strip())
                if len(candidate) > len(content):
                    content = candidate
                if len(content) > 200:
                    break

            if len(content) < 200:
                # No usable container: fall back to every sizeable paragraph,
                # but never replace the text we have with something shorter.
                paragraphs = (p.get_text().strip() for p in soup.find_all('p'))
                joined = ' '.join(text for text in paragraphs if len(text) > 20)
                joined = self._WHITESPACE.sub(' ', joined)
                if len(joined) > len(content):
                    content = joined

            return {
                'full_content': content,
                'main_points': self.extract_main_points(content),
                'content_length': len(content),
                'source_method': 'web'
            }

        except Exception as e:
            logger.error(f"获取文章内容时出错 {article_url}: {e}")
            return self.get_empty_content()

    def get_empty_content(self):
        """Return the placeholder content dict used when a download fails."""
        return {
            'full_content': "无法获取内容",
            'main_points': "无法提取主要观点",
            'content_length': 0,
            'source_method': 'failed'
        }

    def extract_main_points(self, content):
        """Pick up to five representative sentences from *content*.

        Sentences mentioning a topic or opinion keyword come first; when fewer
        than three are found, long sentences fill the list up. If the result
        is shorter than 50 characters the first 300 characters of the text
        are returned instead.
        """
        if not content or len(content) < 50:
            return "内容过短,无法提取主要观点"

        sentences = [part.strip() for part in self._SENTENCE_SPLIT.split(content)]
        sentences = [s for s in sentences if len(s) > 15]

        key_sentences = [
            s for s in sentences
            if any(keyword in s for keyword in self.MAIN_POINT_KEYWORDS)
        ]
        key_sentences = list(dict.fromkeys(key_sentences))

        if len(key_sentences) < 3:
            fillers = [s for s in sentences if len(s) > 30]
            key_sentences.extend(fillers[:5 - len(key_sentences)])

        unique_sentences = list(dict.fromkeys(key_sentences))[:5]
        main_points = "。".join(unique_sentences) + "。" if unique_sentences else ""

        if len(main_points) < 50:
            main_points = content[:300] + "..." if len(content) > 300 else content

        return main_points

    def analyze_article_themes(self, content, title):
        """Return the themes whose keywords occur in the title or the content.

        Matching is case-insensitive. When nothing matches, the article is
        labelled '综合讨论' (content longer than 500 characters) or '简要报道'.
        """
        haystack = f"{title}\n{content}".lower()

        themes = [
            theme for theme, keywords in self.THEME_KEYWORDS.items()
            if any(keyword in haystack for keyword in keywords)
        ]

        if not themes:
            themes.append('综合讨论' if len(content) > 500 else '简要报道')

        return themes

    def extract_key_data(self, content):
        """Return up to five "label:value" figures found in *content*.

        Values keep their unit (e.g. "参数规模:1750亿"); duplicates are dropped
        while preserving the order in which figures were found, so the result
        is deterministic.
        """
        key_data = [
            f"{label}:{match}"
            for pattern, label in self.KEY_DATA_PATTERNS
            for match in pattern.findall(content)
        ]
        return list(dict.fromkeys(key_data))[:5]

    def _attach_details(self, articles, progress_template):
        """Download every article with a link and merge the content into it.

        Args:
            articles: Article dicts (mutated in place).
            progress_template: ``str.format`` template receiving ``index``,
                ``total`` and ``title`` for the progress log line.

        Returns:
            The articles that had a link, now carrying the content fields.
        """
        detailed = []
        total = len(articles)
        for index, article in enumerate(articles, 1):
            logger.info(progress_template.format(
                index=index, total=total, title=article['title'][:30]))

            if article.get('link'):
                article.update(self.get_article_content(article['link']))
                detailed.append(article)

            # Be polite between downloads; no need to wait after the last one.
            if index < total:
                self.random_delay()
        return detailed

    def run_comprehensive_crawl(self):
        """Run the full pipeline: collect, download, analyse and save.

        Returns:
            The list of insight dicts (see ``generate_insights``); the result
            of ``run_fallback_crawl`` when the primary sources are empty.
        """
        logger.info("开始综合爬取虎嗅网大模型相关文章...")

        all_articles = []

        # Source 1: home page, filtered by title relevance.
        logger.info("方法1: 从首页获取文章")
        relevant_articles = [
            article for article in self.get_articles_from_homepage()
            if self._is_relevant(article.get('title', ''))
        ]
        all_articles.extend(relevant_articles)
        logger.info(f"首页找到 {len(relevant_articles)} 篇相关文章")

        # Source 2: site search.
        logger.info("方法2: 直接搜索关键词")
        search_articles = self.search_articles_direct(list(self.SEARCH_KEYWORDS))
        all_articles.extend(search_articles)
        logger.info(f"搜索找到 {len(search_articles)} 篇文章")

        # De-duplicate by link as well as by title: the same article can be
        # listed with slightly different headline text in different places.
        unique_articles = []
        seen_titles = set()
        seen_links = set()
        for article in all_articles:
            title = article.get('title', '')
            link = article.get('link')
            if not title or title in seen_titles or (link and link in seen_links):
                continue
            seen_titles.add(title)
            if link:
                seen_links.add(link)
            unique_articles.append(article)

        logger.info(f"去重后剩余 {len(unique_articles)} 篇唯一文章")

        if not unique_articles:
            logger.warning("没有找到任何文章,尝试备选方案...")
            return self.run_fallback_crawl()

        logger.info("开始获取文章详细内容...")
        detailed_articles = self._attach_details(
            unique_articles, "处理第 {index}/{total} 篇: {title}...")

        logger.info("分析文章关键观点...")
        insights = self.generate_insights(detailed_articles)

        self.save_results(insights)

        return insights

    def run_fallback_crawl(self):
        """Scan channel/tag pages for relevant articles and process them.

        At most ten distinct, on-topic article links are taken per page.

        Returns:
            The list of insight dicts, or ``[]`` when nothing was found.
        """
        logger.info("启动备选爬取方案...")

        all_articles = []
        seen_links = set()
        for url in self.FALLBACK_URLS:
            try:
                logger.info(f"尝试访问: {url}")
                response = self.get_with_retry(url)
                if not response:
                    continue

                soup = BeautifulSoup(response.text, 'html.parser')

                taken = 0
                for link in soup.find_all('a', href=self._ARTICLE_HREF):
                    if taken >= 10:
                        break
                    title = link.get_text().strip()
                    article_url = urljoin(self.base_url, link.get('href'))
                    # Image anchors have no text, and the same article is
                    # usually linked several times on a page.
                    if not title or article_url in seen_links:
                        continue
                    if not self._is_relevant(title, self._FALLBACK_PATTERN):
                        continue
                    seen_links.add(article_url)
                    all_articles.append({
                        'title': title,
                        'link': article_url,
                        'source': 'fallback'
                    })
                    taken += 1

                self.random_delay()

            except Exception as e:
                logger.error(f"访问 {url} 时出错: {e}")
                continue

        if not all_articles:
            return []

        logger.info(f"备选方案找到 {len(all_articles)} 篇文章")
        detailed_articles = self._attach_details(
            all_articles, "处理备选文章 {index}/{total}: {title}...")

        insights = self.generate_insights(detailed_articles)
        self.save_results(insights)
        return insights

    def generate_insights(self, articles):
        """Build one report row per article with more than 100 characters.

        Args:
            articles: Article dicts carrying the fields added by
                ``get_article_content``.

        Returns:
            List of dicts keyed by the (Chinese) report column names.
        """
        insights = []

        for article in articles:
            if article.get('content_length', 0) <= 100:
                continue

            content = article.get('full_content', '')
            title = article.get('title', '')

            themes = self.analyze_article_themes(content, title)
            key_data = self.extract_key_data(content)

            insights.append({
                '标题': title,
                # ``or``: the parsers store "" when a field is absent, which
                # a plain ``.get`` default would not replace.
                '发布时间': article.get('publish_time') or '未知',
                '作者': article.get('author') or '未知',
                '来源': article.get('source') or '未知',
                '主要观点': article.get('main_points', ''),
                '文章主题': '、'.join(themes),
                '关键数据': '、'.join(key_data),
                '内容长度': article.get('content_length', 0),
                '文章链接': article.get('link', ''),
                '获取方式': article.get('source_method') or '未知',
                '爬取时间': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            })

        return insights

    def save_results(self, insights):
        """Write *insights* to a timestamped Excel file and a text report.

        The text report is written even when the Excel export fails (for
        example because the workbook engine is missing).
        """
        if not insights:
            logger.warning("没有找到有效数据可保存")
            return

        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"虎嗅网大模型观点分析_{timestamp}.xlsx"

        try:
            df = pd.DataFrame(insights)

            # Known columns first, in report order; anything else afterwards.
            known = [col for col in self.COLUMN_ORDER if col in df.columns]
            extra = [col for col in df.columns if col not in self.COLUMN_ORDER]
            df = df[known + extra]

            with pd.ExcelWriter(filename, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='大模型观点', index=False)

                # Size each column to its longest cell, capped at 50.
                worksheet = writer.sheets['大模型观点']
                for column in worksheet.columns:
                    widest = max(
                        (len(str(cell.value)) for cell in column if cell.value is not None),
                        default=0,
                    )
                    letter = column[0].column_letter
                    worksheet.column_dimensions[letter].width = min(widest + 2, 50)

            logger.info(f"数据已保存到: {filename}")

        except Exception as e:
            logger.error(f"保存结果时出错: {e}")

        self.generate_report(insights, timestamp)

    def generate_report(self, insights, timestamp):
        """Write a plain-text summary of *insights*.

        Args:
            insights: Rows produced by ``generate_insights``.
            timestamp: Suffix used in the report file name.
        """
        report_filename = f"虎嗅网大模型分析报告_{timestamp}.txt"

        total_articles = len(insights)
        if total_articles == 0:
            return

        avg_content_length = sum(insight.get('内容长度', 0) for insight in insights) / total_articles

        # Count how many articles carry each theme.
        theme_count = {}
        for insight in insights:
            joined = insight.get('文章主题', '')
            for theme in (joined.split('、') if joined else []):
                theme_count[theme] = theme_count.get(theme, 0) + 1

        parts = [f"""虎嗅网大模型相关文章分析报告
生成时间: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}
============================================

基本统计:
- 总文章数: {total_articles} 篇
- 平均内容长度: {avg_content_length:.0f} 字符

主题分布:
"""]
        for theme, count in sorted(theme_count.items(), key=lambda x: x[1], reverse=True):
            percentage = (count / total_articles) * 100
            parts.append(f"- {theme}: {count} 篇 ({percentage:.1f}%)\n")

        parts.append("\n代表性观点摘要 (前5篇):\n")
        for i, insight in enumerate(insights[:5], 1):
            parts.append(f"\n{i}. {insight.get('标题', '')}\n")
            parts.append(f"   主题: {insight.get('文章主题', '')}\n")
            parts.append(f"   关键数据: {insight.get('关键数据', '')}\n")
            parts.append(f"   主要观点: {_truncate(insight.get('主要观点', ''), 150)}\n")

        try:
            with open(report_filename, 'w', encoding='utf-8') as f:
                f.write("".join(parts))
            logger.info(f"分析报告已保存到: {report_filename}")
        except OSError as e:
            logger.error(f"保存报告时出错: {e}")


def main():
    """Run the crawler and log a short summary of the result."""
    logger.info("启动虎嗅网大模型文章爬虫...")

    spider = HuxiuSpider()

    try:
        insights = spider.run_comprehensive_crawl()

        if insights:
            logger.info(f"\n爬取完成! 成功分析 {len(insights)} 篇文章")
            logger.info("前3篇文章标题:")
            for i, insight in enumerate(insights[:3], 1):
                logger.info(f"{i}. {insight['标题']}")
                logger.info(f"   主题: {insight['文章主题']}")
                logger.info(f"   观点摘要: {_truncate(insight['主要观点'], 100)}")
        else:
            logger.warning("没有找到相关文章")
            logger.warning("建议手动访问虎嗅网确认当前可用的文章列表")

    except Exception as e:
        # Top-level boundary: record the message together with the traceback.
        logger.exception(f"爬虫运行出错: {e}")


if __name__ == "__main__":
    main()