You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

274 lines
9.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import sys
import io
import time
import random
import os
import warnings
import concurrent.futures
import cProfile
import pstats
from collections import Counter
from bs4 import XMLParsedAsHTMLWarning
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
import requests
import jieba
import pandas as pd
import matplotlib.pyplot as plt
from wordcloud import WordCloud, STOPWORDS
from bs4 import BeautifulSoup
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# --- Configuration and session setup ---
# Hard-coded Bilibili authentication cookies attached to every API request.
# NOTE(review): SESSDATA / bili_jct are live session credentials with an
# expiry (see bili_ticket_expires) — they will stop working and should be
# loaded from an environment variable or config file rather than committed.
LATEST_COOKIES = {
    "_uuid": "19863C108-86AA-749A-B6109-467D313F610DA89057infoc",
    "b_lsid": "72611EAE_19A869A3F71",
    "b_nut": "1750739388",
    "bili_jct": "1f059040f95b75b7a38762995189f81e",
    "bili_ticket": "eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3NjM0NTE0NTAsImlhdCI6MTc2MzE5MjE5MCwicGx0IjotMX0.FO8oRZK6ZtwGkHZc-NC48kJbBxCo7tEeSvq5YxZpuAg",
    "bili_ticket_expires": "1763451390",
    "buvid3": "8E7992AB-88DC-C3F3-9906-629AB81924B588319infoc",
    "buvid4": "790AE183-11E7-FFDE-8FD5-0C57D524B62589075-025062412-LJ/KMBh5dbu+dWI+zTfBuQ%3D%3D",
    "DedeUserID": "341882542",
    "DedeUserID__ckMd5": "40c6b5fa2d4c8265",
    "SESSDATA": "9eaeb25f%2C1766291503%2Cd8726%2A62CjBfu020dj3EhPNw4mTBS-Qwc8c-SoJ0TdXVevi93oilOeDV2Z1SkDPQ3tXd05-oBwkSVnpBMDlBaFZ3cmdkSFdtTGhaNHJ5a1o4NWlaaG9nUlZpSlJOTmFObTBQMVlqWFhqZE1Melp6QWJpN1BEX1YzMTN2NlhaTDJrbld2aGs0ei14OXhYd2l3IIEC",
    "PVID": "1"
}
def create_session():
    """Build a requests.Session with a retry-enabled adapter mounted.

    Retries up to 2 times with exponential backoff on common transient
    HTTP errors (429 and 5xx).
    """
    retry_policy = Retry(
        total=2,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    http_adapter = HTTPAdapter(max_retries=retry_policy)
    sess = requests.Session()
    for scheme in ("http://", "https://"):
        sess.mount(scheme, http_adapter)
    return sess


# Single shared session reused by every request in this module.
global_session = create_session()
# --- 1. Crawling module ---
def fetch_danmakus(aid):
    """Download all danmaku (bullet-comment) texts for one video ``aid``.

    First resolves the video's cid via the web-interface view API, then
    fetches the danmaku XML and extracts every non-empty ``<d>`` element.
    Returns [] on any lookup or network failure (best-effort crawler).
    """
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36",
        "Referer": "https://www.bilibili.com/"
    }
    try:
        view_url = f"https://api.bilibili.com/x/web-interface/view?aid={aid}"
        view_resp = global_session.get(
            view_url,
            headers=request_headers,
            cookies=LATEST_COOKIES,
            timeout=8
        )
        cid = view_resp.json().get("data", {}).get("cid")
        if not cid:
            return []
        xml_resp = global_session.get(
            f"https://comment.bilibili.com/{cid}.xml",
            headers=request_headers,
            cookies=LATEST_COOKIES,
            timeout=8
        )
        xml_resp.encoding = "utf-8"
        soup = BeautifulSoup(xml_resp.text, "lxml-xml")
        stripped = (d.text.strip() for d in soup.find_all("d"))
        return [t for t in stripped if t]
    except Exception as e:
        # Best-effort: report a short error and let the caller continue.
        print(f"aid={aid} 爬取失败:{str(e)[:20]}")
        return []
def get_top_videos_aids(keyword, max_videos=120):
    """Search Bilibili for *keyword* and collect up to *max_videos* video aids.

    Pages through the web search API (at most 5 pages of 30 results,
    ordered by "totalrank") and returns the aids as strings, preserving
    the API's ranking order.
    """
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36"}
    aids = []
    seen = set()  # BUG FIX: the original deduplicated with list(set(aids)),
    # which shuffled the ranking nondeterministically before the final
    # aids[:max_videos] slice; dedupe in rank order instead.
    page = 1
    page_size = 30
    while len(aids) < max_videos and page <= 5:
        params = {
            "keyword": keyword,
            "page": page,
            "page_size": page_size,
            "search_type": "video",
            "order": "totalrank"
        }
        try:
            # Polite randomized delay to reduce the chance of rate-limiting.
            time.sleep(1.5 + random.random())
            resp = global_session.get(
                "https://api.bilibili.com/x/web-interface/search/type",
                params=params,
                headers=headers,
                cookies=LATEST_COOKIES,
                timeout=10
            )
            data = resp.json()
            if data.get("code") != 0:
                break
            video_list = data.get("data", {}).get("result", [])
            if not video_list:
                break
            for video in video_list:
                aid = video.get("aid")
                if aid and str(aid) not in seen:
                    seen.add(str(aid))
                    aids.append(str(aid))
            print(f"关键词[{keyword}]页{page}累计AID{len(aids)}/{max_videos}")
            page += 1
        except Exception:
            # Best-effort: skip a failed page and try the next one.
            page += 1
            continue
    return aids[:max_videos]
# --- 2. Filtering and saving ---
def filter_spam_danmakus(all_danmakus):
    """Remove "spam" danmakus that repeat unusually often.

    A danmaku text is spam when it occurs at least max(5, 0.5% of total)
    times. Returns the surviving danmakus in their original order; []
    for empty input.
    """
    if not all_danmakus:
        return []
    total = len(all_danmakus)
    counter = Counter(all_danmakus)
    # The original wrote `... if total > 0 else 5`, but the empty case has
    # already returned above, so that conditional was dead code.
    min_count = max(5, int(total * 0.005))
    spam_content = {content for content, count in counter.items() if count >= min_count}
    filtered = [d for d in all_danmakus if d not in spam_content]
    print(f"\n已过滤{len(all_danmakus) - len(filtered)}条刷屏弹幕(剩余{len(filtered)}条)")
    return filtered
def save_all_danmakus(all_danmakus):
    """Write every collected danmaku to all_danmakus.txt, one per line.

    Prints a notice and returns without writing when the list is empty;
    any write failure is reported instead of raised (best-effort).
    """
    if not all_danmakus:
        print("无弹幕可保存")
        return
    try:
        with open("all_danmakus.txt", "w", encoding="utf-8") as f:
            f.write("\n".join(all_danmakus))
        # BUG FIX: the original success message had an unmatched closing
        # parenthesis ("...txt{n}条)"); restore the opening "(共".
        print(f"全部弹幕列表已保存至all_danmakus.txt(共{len(all_danmakus)}条)")
    except Exception as e:
        print(f"保存全部弹幕失败:{e}")
# --- 3. Word cloud and statistics ---
def generate_ai_wordcloud(filtered_danmakus):
    """Render a word-cloud PNG from the filtered danmaku texts.

    Segments the Chinese text with jieba, picks the first CJK font found
    on disk, and saves ai_tech_wordcloud.png; on failure it falls back to
    a simplified cloud (ai_tech_wordcloud_simple.png). Returns None.
    """
    if not filtered_danmakus:
        print("词云生成失败:无有效弹幕数据")
        return
    text = " ".join(filtered_danmakus)
    if len(text) < 100:
        print("警告:有效文本过短,词云可能不完整")
    try:
        # Register domain terms so jieba keeps them as single tokens.
        jieba.add_word("大模型")
        jieba.add_word("生成式AI")
        cut_text = " ".join(jieba.cut(text, HMM=False))
    except Exception as e:
        # Fall back to unsegmented text if jieba fails.
        print(f"分词失败,使用原始文本:{e}")
        cut_text = text
    # NOTE(review): this set literal contains only empty strings — the
    # original Chinese stopwords appear to have been lost to character
    # mangling (cf. the file's "ambiguous Unicode" warning). Restore them;
    # as written it only adds "" to STOPWORDS.
    stopwords = STOPWORDS.union({"", "", "", "", "", "", ""})
    # Candidate CJK font paths: Windows (simhei/msyh), macOS (PingFang),
    # Linux (Noto Sans CJK), tried in order.
    font_candidates = [
        "C:/Windows/Fonts/simhei.ttf",
        "C:/Windows/Fonts/msyh.ttc",
        "/System/Library/Fonts/PingFang.ttc",
        "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"
    ]
    font_path = None
    for candidate in font_candidates:
        if os.path.exists(candidate):
            font_path = candidate
            break
    if not font_path:
        # WordCloud will fall back to its bundled (non-CJK) font.
        font_path = None
        print("警告:未找到中文字体,可能显示乱码")
    try:
        wc = WordCloud(
            font_path=font_path,
            width=1200,
            height=800,
            background_color="white",
            max_words=150,
            stopwords=stopwords,
            prefer_horizontal=0.9,
            collocations=False  # avoid duplicated bigram phrases in the cloud
        ).generate(cut_text)
        plt.figure(figsize=(12, 8), dpi=200)
        plt.imshow(wc, interpolation="bilinear")
        plt.axis("off")
        plt.tight_layout(pad=0)
        plt.savefig("ai_tech_wordcloud.png", dpi=300, bbox_inches="tight")
        print("词云图已成功生成ai_tech_wordcloud.png")
        plt.close()
    except Exception as e:
        print(f"词云生成失败:{e}")
        # Last-resort fallback: default-settings cloud, no custom font.
        try:
            wc = WordCloud(background_color="white").generate(cut_text)
            wc.to_file("ai_tech_wordcloud_simple.png")
            print("已生成简化版词云ai_tech_wordcloud_simple.png")
        except:  # NOTE(review): bare except — should be narrowed to Exception
            print("无法生成词云,请检查文本数据")
def stat_ai_danmakus(filtered_danmakus):
    """Report the 8 most frequent danmakus mentioning AI-application keywords.

    Prints the top-8 list and exports it to ai_danmakus_top8.xlsx; prints
    a notice and returns early when nothing matches.
    """
    ai_keywords = {
        "生成", "对话", "编程", "模型", "训练", "微调", "医疗", "教育",
        "金融", "推理", "部署", "参数", "API", "多模态", "效率"
    }
    matched = []
    for danmaku in filtered_danmakus:
        if any(kw in danmaku for kw in ai_keywords):
            matched.append(danmaku)
    if not matched:
        print("无有效AI技术应用弹幕")
        return
    top8 = Counter(matched).most_common(8)
    print("\n===== AI技术应用弹幕前8名 =====")
    for i, (content, count) in enumerate(top8, 1):
        print(f"{i}. {content} → 出现{count}")
    report = pd.DataFrame(top8, columns=["弹幕内容", "出现次数"])
    report.to_excel("ai_danmakus_top8.xlsx", index=False)
# --- Main entry point ---
def main():
    """Crawl danmakus for each search keyword, then save, filter, and analyze."""
    keywords = ["大语言模型", "大模型", "LLM"]
    all_danmakus = []
    for keyword in keywords:
        print(f"\n===== 爬取关键词:{keyword} =====")
        aids = get_top_videos_aids(keyword, max_videos=120)
        if not aids:
            print(f"{keyword}视频,跳过")
            continue
        # Fetch danmakus for all aids concurrently (I/O-bound work).
        with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
            for danmakus in executor.map(fetch_danmakus, aids):
                all_danmakus.extend(danmakus)
        print(f"{keyword}处理完成,累计弹幕:{len(all_danmakus)}")
    save_all_danmakus(all_danmakus)
    filtered = filter_spam_danmakus(all_danmakus)
    generate_ai_wordcloud(filtered)
    stat_ai_danmakus(filtered)
# --- Profiling entry point ---
# Runs the whole crawl/analysis under cProfile and saves the stats both to
# the console (top 20 by total time) and to performance_profile.prof.
if __name__ == "__main__":
    profiler = cProfile.Profile()
    profiler.enable()
    main()
    profiler.disable()
    print("\n===== 性能分析结果 =====")
    stats = pstats.Stats(profiler)
    # pstats.SortKey was added in Python 3.7; fall back to the string key
    # on older interpreters.
    try:
        stats.sort_stats(pstats.SortKey.TOTAL)
    except AttributeError:
        stats.sort_stats('tottime')
    stats.print_stats(20)
    stats.dump_stats("performance_profile.prof")
    print("\n完整性能分析报告已保存至performance_profile.prof")
    print("可使用 'snakeviz performance_profile.prof' 可视化分析结果")