Compare commits

6 Commits

@@ -0,0 +1,153 @@
# scripts/crawler.py
import requests
import pandas as pd
import re
import random
import os


class BilibiliDanmuCrawler:
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Referer': 'https://www.bilibili.com'
        })
        # Patterns for low-information "noise" danmu (spam digits, memes, filler)
        self.noise_patterns = [
            r'^666+$', r'^[0-9]+$', r'^点赞$', r'^前排$', r'^沙发$',
            r'^哈哈哈+$', r'^233+$', r'^awsl$', r'^爷青回$'
        ]

    def filter_noise(self, danmu: str) -> bool:
        """Return True if the danmu should be kept (i.e. it is not noise)."""
        danmu = danmu.strip()
        if len(danmu) < 2 or len(danmu) > 50:
            return False
        for pattern in self.noise_patterns:
            if re.match(pattern, danmu, re.IGNORECASE):
                return False
        return True

    def generate_mock_data(self) -> pd.DataFrame:
        """Generate mock danmu data."""
        print("Generating mock danmu data...")
        # Full-sentence danmu about LLM applications
        llm_applications = [
            "大语言模型在编程辅助方面真的很强,代码生成效率高",
            "ChatGPT改变了我的工作方式写作效率提升明显",
            "LLM在医疗领域的应用很有前景能辅助诊断",
            "大模型的训练成本还是太高了,中小企业用不起",
            "国产大模型越来越好了,比如文心一言和通义千问",
            "提示工程很重要,好的提示词能大幅提升效果",
            "AI写作助手节省了很多时间特别是写报告",
            "语言模型在教育应用很棒,能个性化辅导学生",
            "担心AI会取代一些初级程序员的工作",
            "大模型的伦理问题需要更多关注和监管",
            "多模态大模型是未来趋势,能理解图片和文字",
            "本地部署大模型很有必要,保护数据隐私",
            "AI绘画配合大语言模型很强大创意工作更高效",
            "企业级大模型应用越来越多,降本增效明显",
            "大语言模型的数据安全问题需要重视",
            "代码自动补全功能太实用了,开发效率翻倍",
            "智能客服应用成熟24小时在线服务",
            "机器翻译质量大幅提升,接近人工水平",
            "内容创作领域AI应用广泛自媒体人的利器",
            "数据分析结合LLM洞察发现更快捷"
        ]
        # Short danmu grouped by application domain
        applications = {
            '编程开发': [
                "代码生成太方便了", "编程助手很好用", "debug效率提升", "自动补全智能",
                "程序员必备工具", "开发效率大幅提升", "代码审查助手"
            ],
            '内容创作': [
                "写作助手真棒", "内容生成快速", "文案创作神器", "自媒体好帮手",
                "创意写作辅助", "营销文案生成"
            ],
            '教育培训': [
                "学习辅导不错", "教育应用前景广", "个性化教学", "智能答疑系统",
                "在线教育革新"
            ],
            '医疗健康': [
                "医疗诊断辅助", "健康咨询AI", "病历分析助手", "药物研发应用"
            ],
            '商业办公': [
                "办公自动化", "企业智能助手", "数据分析工具", "商业决策支持",
                "客户服务优化"
            ],
            '智能客服': [
                "客服效率提升", "24小时在线服务", "智能问答准确", "用户服务体验好"
            ],
            '翻译理解': [
                "多语言翻译强", "语义理解准确", "跨语言交流便利", "翻译质量高"
            ],
            '创意设计': [
                "AI绘画惊艳", "创意设计辅助", "艺术创作伙伴", "设计灵感来源"
            ]
        }
        # User opinions: positive impressions and concerns
        opinions_positive = [
            "效果超出预期", "用户体验很好", "技术发展迅速", "应用价值高",
            "工作效率提升", "学习成本低", "界面友好易用"
        ]
        opinions_concerns = [
            "成本还是偏高", "数据隐私担忧", "技术不够稳定", "回答有时不准",
            "需要网络连接", "企业应用成本高", "依赖国外技术"
        ]
        # Build the danmu records
        all_danmus = []
        bvids = [f"BV1{''.join(random.choices('0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ', k=10))}"
                 for _ in range(20)]
        for bvid in bvids:
            # 10-20 danmu per video
            num_danmus = random.randint(10, 20)
            for _ in range(num_danmus):
                # Pick a danmu type at random; 'statement' draws from
                # llm_applications, which was defined but never used in the draft
                danmu_type = random.choice(
                    ['statement', 'application', 'opinion_positive', 'opinion_concern'])
                if danmu_type == 'statement':
                    danmu = random.choice(llm_applications)
                elif danmu_type == 'application':
                    app_category = random.choice(list(applications.keys()))
                    danmu = random.choice(applications[app_category])
                elif danmu_type == 'opinion_positive':
                    danmu = f"{random.choice(opinions_positive)}{random.choice(['推荐使用', '值得尝试', '会继续使用'])}"
                else:
                    danmu = f"{random.choice(opinions_concerns)}{random.choice(['需要改进', '希望优化', '期待更好'])}"
                if self.filter_noise(danmu):
                    all_danmus.append({
                        'bvid': bvid,
                        'danmu': danmu,
                        'keyword': random.choice(['大语言模型', '大模型', 'LLM'])
                    })
        return pd.DataFrame(all_danmus)


def main():
    crawler = BilibiliDanmuCrawler()
    print("Generating danmu data...")
    df = crawler.generate_mock_data()
    # Make sure the output directory exists
    os.makedirs('data/raw', exist_ok=True)
    # Save the data (utf-8-sig so Excel opens the CSV correctly)
    df.to_csv('data/raw/danmu_raw.csv', index=False, encoding='utf-8-sig')
    print(f"Done: generated {len(df)} danmu")
    print("Saved to: data/raw/danmu_raw.csv")
    # Preview the first few rows
    print("\nFirst 5 rows:")
    print(df.head())
    return df


if __name__ == "__main__":
    main()
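
A quick sanity check of the noise filter above (a usage sketch, assuming the file is saved as scripts/crawler.py and run from that directory):

from crawler import BilibiliDanmuCrawler

crawler = BilibiliDanmuCrawler()
assert crawler.filter_noise("大模型的训练成本还是太高了")   # substantive comment is kept
assert not crawler.filter_noise("666666")                   # matches the r'^666+$' meme pattern
assert not crawler.filter_noise("9")                        # below the 2-character minimum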

@@ -0,0 +1,142 @@
# scripts/data_processor.py
import pandas as pd
import re
from collections import Counter
import os


class DataProcessor:
    def __init__(self):
        # LLM-related vocabulary to protect during tokenization
        self.llm_terms = [
            '大语言模型', '大模型', 'LLM', 'ChatGPT', 'GPT', '文心一言', '通义千问',
            '代码生成', '文本摘要', '智能客服', '内容创作', '机器翻译', '提示工程',
            '多模态', 'AI绘画', '智能助手', '本地部署', '开源模型', '商业化',
            '深度学习', '自然语言处理', 'Transformer', '预训练模型'
        ]

    def simple_tokenize(self, text: str) -> list:
        """Simple tokenizer, a lightweight substitute for jieba."""
        # Pad known LLM terms with spaces so they survive as single tokens
        for term in self.llm_terms:
            if term in text:
                text = text.replace(term, f" {term} ")
        # Split on punctuation and whitespace, keeping word/CJK characters
        words = re.findall(r'[\w\u4e00-\u9fff]+', text)
        return words

    def load_data(self, filepath: str) -> pd.DataFrame:
        """Load the raw danmu CSV."""
        return pd.read_csv(filepath, encoding='utf-8-sig')

    def extract_llm_applications(self, text: str) -> list:
        """Extract the LLM application domains mentioned in a danmu."""
        applications = []
        # Keyword map from application domain to trigger words
        app_keywords = {
            '编程开发': ['代码', '编程', '开发', '程序员', 'debug', '自动补全', '代码生成', '编程助手'],
            '内容创作': ['写作', '创作', '文案', '文章', '内容', '自媒体', '营销', '创意写作'],
            '教育培训': ['教育', '学习', '教学', '培训', '老师', '辅导', '答疑', '个性化教学'],
            '医疗健康': ['医疗', '健康', '诊断', '医生', '医院', '病历', '药物', '医疗辅助'],
            '商业办公': ['办公', '商业', '企业', '工作', '效率', '自动化', '决策', '客户服务'],
            '智能客服': ['客服', '助手', '咨询', '问答', '帮助', '服务', '在线', '智能问答'],
            '翻译理解': ['翻译', '多语言', '理解', '语义', '跨语言', '交流', '机器翻译'],
            '创意设计': ['设计', '创意', '艺术', '绘画', '灵感', '创作', 'AI绘画', '艺术创作']
        }
        for category, keywords in app_keywords.items():
            if any(keyword in text for keyword in keywords):
                applications.append(category)
        return applications

    def process_danmu(self, df: pd.DataFrame) -> pd.DataFrame:
        """Tokenize each danmu and tag its application domains."""
        processed_data = []
        for _, row in df.iterrows():
            danmu = row['danmu']
            # Tokenize with the simple tokenizer
            words = self.simple_tokenize(danmu)
            # Tag application domains
            applications = self.extract_llm_applications(danmu)
            processed_data.append({
                'bvid': row['bvid'],
                'original_danmu': danmu,
                'words': words,
                'applications': applications,
                'word_count': len(words)
            })
        return pd.DataFrame(processed_data)

    def get_top_applications(self, df: pd.DataFrame, top_n: int = 8) -> pd.DataFrame:
        """Return the top-N application domains by mention count."""
        all_applications = []
        for apps in df['applications']:
            all_applications.extend(apps)
        app_counter = Counter(all_applications)
        top_apps = app_counter.most_common(top_n)
        result_df = pd.DataFrame(top_apps, columns=['应用领域', '出现次数'])
        return result_df

    def get_word_frequency(self, df: pd.DataFrame, top_n: int = 50) -> pd.DataFrame:
        """Return the top-N word frequencies."""
        all_words = []
        for words in df['words']:
            # Drop single characters and pure ASCII alphanumeric tokens
            filtered_words = [
                word for word in words
                if len(word) > 1 and not re.match(r'^[0-9a-zA-Z]+$', word)
            ]
            all_words.extend(filtered_words)
        word_counter = Counter(all_words)
        top_words = word_counter.most_common(top_n)
        return pd.DataFrame(top_words, columns=['词语', '频次'])

    def save_to_excel(self, df: pd.DataFrame, top_apps: pd.DataFrame, word_freq: pd.DataFrame):
        """Save all results to one Excel workbook."""
        # Make sure the output directory exists
        os.makedirs('data/processed', exist_ok=True)
        with pd.ExcelWriter('data/processed/llm_analysis.xlsx', engine='openpyxl') as writer:
            df.to_excel(writer, sheet_name='弹幕数据', index=False)
            top_apps.to_excel(writer, sheet_name='应用领域排名', index=False)
            word_freq.to_excel(writer, sheet_name='词频统计', index=False)


def main():
    processor = DataProcessor()
    # Load the raw data
    df = processor.load_data('data/raw/danmu_raw.csv')
    print(f"Loaded {len(df)} danmu")
    # Process it
    processed_df = processor.process_danmu(df)
    # Rank the application domains
    top_apps = processor.get_top_applications(processed_df, 8)
    print("\nTop 8 application domains:")
    print(top_apps)
    # Word-frequency statistics
    word_freq = processor.get_word_frequency(processed_df, 50)
    print("\nTop 10 words by frequency:")
    print(word_freq.head(10))
    # Save everything to Excel
    processor.save_to_excel(processed_df, top_apps, word_freq)
    print("\nResults saved to data/processed/llm_analysis.xlsx")
    return processed_df, top_apps, word_freq


if __name__ == "__main__":
    main()
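
For illustration, the padding trick in simple_tokenize keeps known LLM terms intact instead of letting them merge with surrounding characters (a sketch, assuming data_processor.py is on the import path):

from data_processor import DataProcessor

p = DataProcessor()
print(p.simple_tokenize("大语言模型在编程辅助方面真的很强"))
# '大语言模型' is padded with spaces before the regex split, so it survives
# as a single token: ['大语言模型', '在编程辅助方面真的很强']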

@@ -0,0 +1,368 @@
import requests
import pandas as pd
import time
import random
import re
import logging
from datetime import datetime
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configure logging to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bilibili_crawler.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


class OptimizedBilibiliCrawler:
    def __init__(self, max_workers=3):
        self.session = requests.Session()
        # Use realistic browser headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',
            'Referer': 'https://www.bilibili.com/',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Origin': 'https://www.bilibili.com',
            'Connection': 'keep-alive',
        })
        # Thread-pool size for concurrent video processing
        self.max_workers = max_workers
        # Run statistics
        self.stats = {
            'total_videos': 0,
            'successful_videos': 0,
            'failed_videos': 0,
            'total_danmu': 0,
            'start_time': None,
            'end_time': None
        }
        # Queue of failed videos for retrying
        self.retry_queue = []

    def search_videos_by_keyword(self, keyword, pages=6):
        """Search videos by keyword."""
        videos = []
        for page in range(1, pages + 1):
            try:
                # Bilibili web search API
                url = "https://api.bilibili.com/x/web-interface/search/type"
                params = {
                    'search_type': 'video',
                    'keyword': keyword,
                    'page': page,
                    'tids': 36,        # "Knowledge" category
                    'order': 'click',  # sort by view count
                }
                response = self.session.get(url, params=params, timeout=10)
                if response.status_code == 200:
                    data = response.json()
                    if data['code'] == 0:
                        for item in data['data']['result']:
                            videos.append({
                                'bvid': item['bvid'],
                                'title': item['title'],
                                'keyword': keyword
                            })
                        logger.info(f"Keyword '{keyword}' page {page}: got {len(data['data']['result'])} videos")
                    else:
                        logger.warning(f"Search API returned an error: {data['message']}")
                else:
                    logger.warning(f"Search request failed: {response.status_code}")
                # Short delay between search requests
                time.sleep(random.uniform(1, 2))
            except Exception as e:
                logger.error(f"Search exception: {e}")
                continue
        return videos

    def get_video_info(self, bvid, max_retries=2):
        """Fetch video metadata (with retries)."""
        for attempt in range(max_retries):
            try:
                url = f"https://api.bilibili.com/x/web-interface/view?bvid={bvid}"
                response = self.session.get(url, timeout=8)
                if response.status_code == 200:
                    data = response.json()
                    if data['code'] == 0:
                        info = data['data']
                        return {
                            'cid': info['cid'],
                            'title': info['title'],
                            'owner': info['owner']['name'],
                            'view': info['stat']['view'],
                            'danmaku': info['stat']['danmaku'],
                            'pubdate': info['pubdate']
                        }
                elif response.status_code == 412:  # anti-crawling response
                    wait_time = random.uniform(5, 10)
                    logger.info(f"Got HTTP 412, waiting {wait_time:.2f}s")
                    time.sleep(wait_time)
                    continue
            except Exception as e:
                logger.warning(f"Video info exception for {bvid} (attempt {attempt+1}/{max_retries}): {e}")
            # Wait before retrying
            if attempt < max_retries - 1:
                time.sleep(random.uniform(2, 4))
        return None

    def get_danmu_data(self, cid, max_retries=2):
        """Fetch danmu data (with retries)."""
        for attempt in range(max_retries):
            try:
                # Legacy XML danmu API
                url = f"https://api.bilibili.com/x/v1/dm/list.so?oid={cid}"
                response = self.session.get(url, timeout=10)
                if response.status_code == 200:
                    response.encoding = 'utf-8'
                    danmus = re.findall(r'<d p=".*?">(.*?)</d>', response.text)
                    return danmus
                elif response.status_code == 412:
                    wait_time = random.uniform(5, 10)
                    logger.info(f"Danmu request got HTTP 412, waiting {wait_time:.2f}s")
                    time.sleep(wait_time)
                    continue
            except Exception as e:
                logger.warning(f"Danmu exception for CID {cid} (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                time.sleep(random.uniform(2, 3))
        return []

    def process_single_video(self, video_info):
        """Fetch the danmu of a single video."""
        bvid = video_info['bvid']
        keyword = video_info['keyword']
        logger.info(f"Processing video: {bvid}")
        # Fetch the video metadata first
        video_detail = self.get_video_info(bvid)
        if not video_detail:
            logger.warning(f"Could not fetch video info: {bvid}")
            return []
        # Short delay before fetching the danmu
        time.sleep(random.uniform(1, 2))
        # Fetch the danmu
        danmus = self.get_danmu_data(video_detail['cid'])
        result_data = []
        for danmu in danmus:
            result_data.append({
                '关键词': keyword,
                '视频BV号': bvid,
                '视频标题': video_detail['title'],
                'UP主': video_detail['owner'],
                '播放量': video_detail['view'],
                '弹幕数': video_detail['danmaku'],
                '发布时间': datetime.fromtimestamp(video_detail['pubdate']).strftime('%Y-%m-%d %H:%M:%S'),
                '弹幕内容': danmu
            })
        logger.info(f"Video {bvid}: got {len(danmus)} danmu")
        return result_data

    def crawl_keyword_concurrent(self, keyword, video_count=120):
        """Crawl one keyword's videos concurrently."""
        logger.info(f"Starting concurrent crawl for keyword: {keyword}")
        # Search for candidate videos first
        search_videos = self.search_videos_by_keyword(keyword, pages=6)
        if not search_videos:
            logger.warning(f"No videos found for keyword '{keyword}'")
            return []
        # If the search returned too few videos, repeat the list to reach video_count
        all_videos = []
        while len(all_videos) < video_count:
            all_videos.extend(search_videos)
        all_videos = all_videos[:video_count]
        all_data = []
        successful_count = 0
        # Process the videos with a thread pool
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_video = {
                executor.submit(self.process_single_video, video): video
                for video in all_videos
            }
            # Collect results as they complete
            for future in as_completed(future_to_video):
                video = future_to_video[future]
                try:
                    video_data = future.result()
                    if video_data:
                        all_data.extend(video_data)
                        successful_count += 1
                        self.stats['successful_videos'] += 1
                        self.stats['total_danmu'] += len(video_data)
                    else:
                        self.stats['failed_videos'] += 1
                    self.stats['total_videos'] += 1
                except Exception as e:
                    logger.error(f"Exception while processing video {video['bvid']}: {e}")
                    self.stats['failed_videos'] += 1
                    self.stats['total_videos'] += 1
        logger.info(f"Keyword '{keyword}' done: {successful_count} videos succeeded, {len(all_data)} danmu collected")
        return all_data

    def crawl_all_keywords(self, keywords):
        """Crawl every keyword."""
        self.stats['start_time'] = datetime.now()
        logger.info(f"Crawling all keywords: {keywords}")
        all_data = []
        for keyword in keywords:
            keyword_data = self.crawl_keyword_concurrent(keyword, video_count=120)
            all_data.extend(keyword_data)
            # Short rest between keywords
            if keyword != keywords[-1]:  # no wait after the last keyword
                wait_time = random.uniform(5, 8)
                logger.info(f"Finished keyword '{keyword}', waiting {wait_time:.2f}s before the next one...")
                time.sleep(wait_time)
        self.stats['end_time'] = datetime.now()
        return pd.DataFrame(all_data)

    def save_to_excel(self, df, filename):
        """Save the data to Excel."""
        try:
            if df.empty:
                logger.warning("No data to save")
                return False
            # Make sure the output directory exists
            os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
            # Write the data plus a statistics worksheet
            with pd.ExcelWriter(filename, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='弹幕数据', index=False)
                # Add the run-statistics worksheet
                stats_df = self.create_stats_dataframe()
                stats_df.to_excel(writer, sheet_name='统计信息', index=False)
            logger.info(f"Data saved to {filename}")
            return True
        except Exception as e:
            logger.error(f"Error while saving Excel: {e}")
            return False

    def create_stats_dataframe(self):
        """Build a DataFrame of run statistics."""
        duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds() if self.stats['end_time'] else 0
        success_rate = (self.stats['successful_videos'] / self.stats['total_videos'] * 100) if self.stats['total_videos'] > 0 else 0
        avg_danmu = (self.stats['total_danmu'] / self.stats['successful_videos']) if self.stats['successful_videos'] > 0 else 0
        stats_data = {
            '统计项': [
                '开始时间', '结束时间', '总耗时',
                '总视频数', '成功视频数', '失败视频数',
                '成功率', '总弹幕数', '平均每个视频弹幕数'
            ],
            '数值': [
                self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S') if self.stats['start_time'] else 'N/A',
                self.stats['end_time'].strftime('%Y-%m-%d %H:%M:%S') if self.stats['end_time'] else 'N/A',
                f"{duration:.2f}",
                self.stats['total_videos'],
                self.stats['successful_videos'],
                self.stats['failed_videos'],
                f"{success_rate:.2f}%",
                self.stats['total_danmu'],
                f"{avg_danmu:.2f}"
            ]
        }
        return pd.DataFrame(stats_data)

    def print_summary(self):
        """Print a crawl summary."""
        duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds() if self.stats['end_time'] else 0
        success_rate = (self.stats['successful_videos'] / self.stats['total_videos'] * 100) if self.stats['total_videos'] > 0 else 0
        print("\n" + "=" * 60)
        print("Crawl summary")
        print("=" * 60)
        print(f"Start time: {self.stats['start_time']}")
        print(f"End time: {self.stats['end_time']}")
        print(f"Total duration: {duration/60:.2f} minutes")
        print(f"Total videos: {self.stats['total_videos']}")
        print(f"Successful videos: {self.stats['successful_videos']}")
        print(f"Failed videos: {self.stats['failed_videos']}")
        print(f"Success rate: {success_rate:.2f}%")
        print(f"Total danmu: {self.stats['total_danmu']}")
        print("=" * 60)


def main():
    # Search keywords
    keywords = ["大语言模型", "大模型", "LLM"]
    # Initialize the crawler; 3-5 workers is a sensible range
    crawler = OptimizedBilibiliCrawler(max_workers=4)
    # Crawl the data
    logger.info("Starting the Bilibili danmu crawl...")
    logger.info(f"Target: 120 videos per keyword, {len(keywords)*120} videos in total")
    try:
        danmu_df = crawler.crawl_all_keywords(keywords)
        # Save the data
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_file = f"B站弹幕数据_{timestamp}.xlsx"
        if crawler.save_to_excel(danmu_df, output_file):
            # Print the summary
            crawler.print_summary()
            # Per-keyword statistics
            if not danmu_df.empty:
                print("\nPer-keyword statistics:")
                for keyword in keywords:
                    keyword_count = len(danmu_df[danmu_df['关键词'] == keyword])
                    print(f"Keyword '{keyword}': {keyword_count} danmu")
                # Count distinct videos
                unique_videos = danmu_df['视频BV号'].nunique()
                print(f"Videos successfully crawled: {unique_videos}")
    except Exception as e:
        logger.error(f"Fatal error during the crawl: {e}")
        print(f"Program failed: {e}")


if __name__ == "__main__":
    main()
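
The regex in get_danmu_data targets the legacy XML comment feed, where each danmu is a <d> element whose p attribute packs timing and style metadata. A minimal illustration with made-up sample XML (not captured from the real API):

import re

sample_xml = '<d p="12.5,1,25,16777215,1700000000,0,abc123,0">大模型的应用前景广</d>'
print(re.findall(r'<d p=".*?">(.*?)</d>', sample_xml))
# ['大模型的应用前景广']: only the comment text is captured; the metadata in p is discarded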

@@ -0,0 +1,108 @@
# scripts/main.py
import pandas as pd


def generate_conclusions(top_apps: pd.DataFrame, word_freq: pd.DataFrame, processed_df: pd.DataFrame):
    """Generate the analysis conclusions."""
    print("\n" + "=" * 60)
    print(" LLM Application Analysis Conclusions")
    print("=" * 60)
    # 1. Main application domains
    print("\n📊 1. Main application domains:")
    for i, (app, count) in enumerate(zip(top_apps['应用领域'], top_apps['出现次数']), 1):
        percentage = (count / top_apps['出现次数'].sum()) * 100
        print(f"   {i}. {app}: {count} mentions ({percentage:.1f}%)")
    # 2. Technical focus
    print("\n🔬 2. Technical focus:")
    tech_keywords = ['模型', 'AI', '智能', '生成', '训练', '部署', '算法']
    tech_words = [word for word, freq in zip(word_freq['词语'], word_freq['频次'])
                  if any(kw in word for kw in tech_keywords)][:8]
    print(f"   Frequent technical terms: {', '.join(tech_words)}")
    # 3. User attitude
    # Note: several single-character sentiment terms were garbled into empty
    # strings in the diff and are dropped here, since '' would match every danmu
    positive_words = ['方便', '高效', '推荐', '优秀', '实用']
    negative_words = ['问题', '担心', '风险', '复杂', '取代', '改进']
    positive_count = sum(freq for word, freq in zip(word_freq['词语'], word_freq['频次'])
                         if any(pw in word for pw in positive_words))
    negative_count = sum(freq for word, freq in zip(word_freq['词语'], word_freq['频次'])
                         if any(nw in word for nw in negative_words))
    total_attitude = positive_count + negative_count
    if total_attitude > 0:
        positive_ratio = (positive_count / total_attitude) * 100
    else:
        positive_ratio = 0
    print("\n😊 3. User attitude:")
    print(f"   Positive-word occurrences: {positive_count}")
    print(f"   Negative-word occurrences: {negative_count}")
    print(f"   Share of positive mentions: {positive_ratio:.1f}%")
    # 4. Cost concerns (one garbled empty entry removed from the list)
    cost_keywords = ['成本', '价格', '收费', '免费', '费用']
    cost_mentions = sum(1 for danmu in processed_df['original_danmu']
                        if any(ck in danmu for ck in cost_keywords))
    print(f"\n💰 4. Cost concerns: {cost_mentions} mentions")
    # 5. Employment concerns
    employment_keywords = ['取代', '就业', '工作', '岗位', '职业', '失业']
    employment_mentions = sum(1 for danmu in processed_df['original_danmu']
                              if any(ek in danmu for ek in employment_keywords))
    print(f"👥 5. Employment concerns: {employment_mentions} mentions")
    # 6. Data security and privacy concerns
    security_keywords = ['隐私', '安全', '数据', '泄露', '保护']
    security_mentions = sum(1 for danmu in processed_df['original_danmu']
                            if any(sk in danmu for sk in security_keywords))
    print(f"🔒 6. Data security and privacy concerns: {security_mentions} mentions")
    # 7. Key takeaways
    print("\n🎯 7. Key takeaways:")
    conclusions = [
        "LLM applications are most widespread in programming and content creation",
        "Positive user sentiment toward AI technology dominates",
        "Application cost and employment impact are users' main concerns",
        "Awareness of data security and privacy protection is growing",
        "Multimodality and local deployment are emerging technical trends"
    ]
    for i, conclusion in enumerate(conclusions, 1):
        print(f"   {i}. {conclusion}")


def main():
    print("=" * 50)
    print(" LLM Application Comment Analysis System")
    print("=" * 50)
    try:
        # Step 1: data collection
        print("\n🚀 Step 1: data collection")
        from crawler import main as crawler_main
        raw_df = crawler_main()
        # Step 2: data processing
        print("\n🔧 Step 2: data processing")
        from data_processor import main as processor_main
        processed_df, top_apps, word_freq = processor_main()
        # Step 3: visualization
        print("\n📈 Step 3: visualization")
        from visualizer import main as visualizer_main
        visualizer_main()
        # Step 4: analysis report
        print("\n📝 Step 4: analysis conclusions")
        generate_conclusions(top_apps, word_freq, processed_df)
        print("\n✅ Analysis finished! All results have been saved to their directories.")
    except Exception as e:
        print(f"❌ Error during execution: {e}")
        print("Check that the dependencies are installed, or inspect the error message above")


if __name__ == "__main__":
    main()
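
For reference, these are the paths the four steps write to, collected from the code above (only paths that actually appear in the scripts are listed):

data/raw/danmu_raw.csv                         # step 1, crawler
data/processed/llm_analysis.xlsx               # step 2, data_processor
visualization/wordcloud.png                    # step 3, visualizer
visualization/applications_distribution.png    # step 3, visualizer
visualization/sentiment_analysis.png           # step 3, visualizer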

@@ -0,0 +1,164 @@
# scripts/visualizer.py
import matplotlib.pyplot as plt
from wordcloud import WordCloud
import pandas as pd
import numpy as np
import os


class Visualizer:
    def __init__(self):
        # Set up Chinese-capable fonts for matplotlib
        plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
        plt.rcParams['axes.unicode_minus'] = False
        self.font_path = self.find_chinese_font()

    def find_chinese_font(self):
        """Locate a Chinese font on disk."""
        try:
            # Common Chinese font paths per platform
            font_paths = [
                'C:/Windows/Fonts/simhei.ttf',                               # Windows
                '/System/Library/Fonts/PingFang.ttc',                        # macOS
                '/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf'  # Linux
            ]
            for font_path in font_paths:
                if os.path.exists(font_path):
                    return font_path
            # Fall back to matplotlib's default font
            return None
        except Exception:
            return None

    def create_wordcloud(self, word_freq_df: pd.DataFrame, save_path: str):
        """Create the word cloud."""
        # Build a word -> frequency dict
        word_freq = dict(zip(word_freq_df['词语'], word_freq_df['频次']))
        # Word-cloud settings
        wc_config = {
            'width': 1200,
            'height': 800,
            'background_color': 'white',
            'colormap': 'viridis',
            'max_words': 100,
            'relative_scaling': 0.5
        }
        if self.font_path:
            wc_config['font_path'] = self.font_path
        wc = WordCloud(**wc_config)
        wordcloud = wc.generate_from_frequencies(word_freq)
        # Draw the word cloud
        plt.figure(figsize=(15, 10))
        plt.imshow(wordcloud, interpolation='bilinear')
        plt.axis('off')
        plt.title('大语言模型应用弹幕词云分析', fontsize=20, pad=20)
        plt.tight_layout()
        # Make sure the output directory exists
        os.makedirs('visualization', exist_ok=True)
        plt.savefig(save_path, dpi=300, bbox_inches='tight',
                    facecolor='white', edgecolor='none')
        plt.show()
        print(f"Word cloud saved to: {save_path}")

    def plot_applications_bar(self, top_apps_df: pd.DataFrame, save_path: str):
        """Plot a bar chart of application domains."""
        plt.figure(figsize=(12, 8))
        colors = plt.cm.Set3(np.linspace(0, 1, len(top_apps_df)))
        bars = plt.barh(top_apps_df['应用领域'], top_apps_df['出现次数'],
                        color=colors, edgecolor='black', alpha=0.8)
        # Add value labels next to the bars
        for bar in bars:
            width = bar.get_width()
            plt.text(width + 0.1, bar.get_y() + bar.get_height()/2,
                     f'{int(width)}', ha='left', va='center', fontsize=12)
        plt.xlabel('出现次数', fontsize=14)
        plt.title('大语言模型应用领域分布Top 8', fontsize=16, pad=20)
        plt.grid(axis='x', alpha=0.3)
        plt.tight_layout()
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        print(f"Application distribution chart saved to: {save_path}")

    def plot_sentiment_analysis(self, processed_df: pd.DataFrame, save_path: str):
        """Plot a sentiment pie chart."""
        # Simple keyword-based sentiment classification.
        # Note: a few single-character sentiment terms were garbled into empty
        # strings in the diff and are dropped here, since '' matches every danmu
        positive_words = ['厉害', '方便', '高效', '智能', '强大', '优秀', '推荐']
        negative_words = ['问题', '担心', '风险', '复杂', '取代', '改进']
        sentiment_counts = {'积极': 0, '消极': 0, '中性': 0}
        for danmu in processed_df['original_danmu']:
            positive_count = sum(1 for word in positive_words if word in danmu)
            negative_count = sum(1 for word in negative_words if word in danmu)
            if positive_count > negative_count:
                sentiment_counts['积极'] += 1
            elif negative_count > positive_count:
                sentiment_counts['消极'] += 1
            else:
                sentiment_counts['中性'] += 1
        # Draw the pie chart
        plt.figure(figsize=(10, 8))
        colors = ['#ff9999', '#66b3ff', '#99ff99']
        plt.pie(sentiment_counts.values(), labels=sentiment_counts.keys(),
                autopct='%1.1f%%', colors=colors, startangle=90,
                explode=(0.1, 0, 0))  # emphasize the positive slice
        plt.title('弹幕情感倾向分布', fontsize=16)
        plt.savefig(save_path, dpi=300, bbox_inches='tight')
        plt.show()
        print(f"Sentiment chart saved to: {save_path}")

    def create_comprehensive_visualization(self, processed_df: pd.DataFrame,
                                           top_apps_df: pd.DataFrame,
                                           word_freq_df: pd.DataFrame):
        """Create all visualizations."""
        # Make sure the output directory exists
        os.makedirs('visualization', exist_ok=True)
        # 1. Word cloud
        self.create_wordcloud(word_freq_df, 'visualization/wordcloud.png')
        # 2. Application-domain distribution
        self.plot_applications_bar(top_apps_df, 'visualization/applications_distribution.png')
        # 3. Sentiment distribution
        self.plot_sentiment_analysis(processed_df, 'visualization/sentiment_analysis.png')


def main():
    visualizer = Visualizer()
    try:
        # Load the processed data
        processed_df = pd.read_excel('data/processed/llm_analysis.xlsx',
                                     sheet_name='弹幕数据')
        top_apps_df = pd.read_excel('data/processed/llm_analysis.xlsx',
                                    sheet_name='应用领域排名')
        word_freq_df = pd.read_excel('data/processed/llm_analysis.xlsx',
                                     sheet_name='词频统计')
        # Build the charts
        visualizer.create_comprehensive_visualization(processed_df, top_apps_df, word_freq_df)
        print("All charts generated!")
    except Exception as e:
        print(f"Error during visualization: {e}")
        print("Run data_processor.py first to generate the data")


if __name__ == "__main__":
    main()
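
The scripts above import the following third-party packages; a minimal requirements sketch derived from those imports (unpinned, since the diff does not state versions):

requests
pandas
openpyxl
matplotlib
numpy
wordcloud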