2024巴黎奥运会爬虫相关代码

main
李玲 2 months ago
parent 99daa2fd83
commit f76c2954be

@ -0,0 +1,77 @@
import requests
import re
from lxml import etree
import os
class BiliBiliDanMu:
    """Crawl the danmu (bullet comments) of a single bilibili video.

    The crawler resolves the video's ``cid`` from its page, downloads the
    danmu XML from comment.bilibili.com, extracts the comment text and
    writes it to a UTF-8 text file, one comment per line.
    """

    def __init__(self, bv, filename):
        # Normalize the BV id so the URL never gets a duplicated "BV" prefix.
        if bv.startswith("BV"):
            bv = bv[2:]
        # Video page URL built from the BV id.
        self.video_url = "https://bilibili.com/video/BV" + bv
        self.filename = filename
        # NOTE(review): the original UA string used a backslash line
        # continuation inside the literal; it is normalized to one line here.
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36 Edg/85.0.564.44"
        }

    def get_video_cid(self):
        """Fetch the video page and extract its cid.

        Returns:
            The cid digits as a string, or None when the request fails or
            the page contains no ``"cid":`` field.
        """
        response = requests.get(self.video_url, headers=self.headers, timeout=10)
        if response.status_code != 200:
            print(f"请求失败,状态码: {response.status_code}")
            return None
        html = response.content.decode()
        print(f"HTML 内容(前500字符): {html[:500]}")  # debug aid: first 500 chars
        # findall yields ('"cid":', digits) tuples; we want the digits group.
        cid = re.findall(r'("cid":)([0-9]+)', html)
        # Some videos carry no cid field; report and skip them.
        if len(cid) == 0:
            print("未找到 cid")
            return None
        return cid[0][-1]

    def get_content(self, xml_url):
        """Return the raw bytes of the danmu XML document at *xml_url*."""
        response = requests.get(xml_url, headers=self.headers, timeout=10)
        return response.content

    def extract_danmu(self, content_str):
        """Parse the XML bytes and return the text of every <d> element."""
        html = etree.HTML(content_str)
        return html.xpath("//d/text()")

    def save(self, save_items):
        """Write the danmu in *save_items* to self.filename, one per line."""
        output_dir = os.path.dirname(self.filename)
        # BUGFIX: os.makedirs("") raises FileNotFoundError when the filename
        # has no directory component -- only create the directory if present.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(self.filename, 'w', encoding='utf-8') as f:
            f.writelines(item + '\n' for item in save_items)
        print(f"弹幕已保存至 {self.filename}")

    def crawl(self):
        """Full pipeline: resolve cid, fetch XML, parse, save to disk."""
        cid = self.get_video_cid()
        # Skip videos that expose no cid field.
        if cid is not None:
            xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml"
            content_str = self.get_content(xml_url)
            danmu_lst = self.extract_danmu(content_str)
            self.save(danmu_lst)
        else:
            print("视频没有有效的 cid跳过此视频")
if __name__ == '__main__':
    # Ask the user for a BV id, then crawl that video's danmu into a
    # fixed local directory.
    bv = input("请输入视频的 bv 号: ")
    filename = 'E:/前端/软件工程/{}.txt'.format(str(bv))
    BiliBiliDanMu(bv, filename).crawl()

@ -0,0 +1,169 @@
# import cProfile
import requests
import re
from lxml import etree
import os
import time
import random
from collections import OrderedDict
class BiliBiliDanMu:
    """Crawl the danmu (bullet comments) of a single bilibili video.

    Resolves the video's ``cid`` (with retries), downloads the danmu XML
    from comment.bilibili.com, extracts the comment text and saves it to
    a UTF-8 text file, one comment per line.
    """

    def __init__(self, bv, filename):
        # Normalize the BV id to avoid a duplicated "BV" prefix in the URL.
        if bv.startswith("BV"):
            bv = bv[2:]
        self.video_url = "https://bilibili.com/video/BV" + bv
        self.filename = filename
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
            "Referer": "https://www.bilibili.com/",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def get_video_cid(self):
        """Try up to 3 times to fetch the page and extract the cid.

        Returns:
            The cid digits as a string, or None once every retry failed.
        """
        retry_count = 3
        for attempt in range(retry_count):
            try:
                response = requests.get(self.video_url, headers=self.headers, timeout=10)
                if response.status_code == 200:
                    html = response.content.decode()
                    # findall yields ('"cid":', digits) tuples; keep digits.
                    cid = re.findall(r'("cid":)([0-9]+)', html)
                    if cid:
                        return cid[0][-1]
                    print("未找到 cid")
                else:
                    print(f"请求失败,状态码: {response.status_code}")
            except requests.exceptions.RequestException as e:
                print(f"获取 cid 时出错: {e}")
            # BUGFIX: the original 'continue' paths skipped the back-off
            # sleep and retried immediately; now every failure backs off.
            print(f"第 {attempt + 1} 次重试获取 cid...")
            time.sleep(2)
        return None

    def get_content(self, xml_url):
        """Return the raw bytes of the danmu XML, or None on failure."""
        try:
            response = requests.get(xml_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.content
            print(f"获取弹幕内容失败,状态码: {response.status_code}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"获取弹幕时出错: {e}")
            return None

    def extract_danmu(self, content_str):
        """Parse the XML bytes and return the text of every <d> element."""
        try:
            html = etree.HTML(content_str)
            return html.xpath("//d/text()")
        except Exception as e:
            print(f"解析弹幕时出错: {e}")
            return []

    def save(self, save_items):
        """Write the danmu in *save_items* to self.filename, one per line."""
        output_dir = os.path.dirname(self.filename)
        # BUGFIX: os.makedirs("") raises FileNotFoundError when the filename
        # has no directory component -- only create the directory if present.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(self.filename, 'w', encoding='utf-8') as f:
            f.writelines(item + '\n' for item in save_items)
        print(f"弹幕已保存至 {self.filename}")

    def crawl(self):
        """Full pipeline: resolve cid, fetch XML, parse, save to disk."""
        cid = self.get_video_cid()
        if cid is not None:
            xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml"
            content_str = self.get_content(xml_url)
            if content_str:
                danmu_lst = self.extract_danmu(content_str)
                self.save(danmu_lst)
        else:
            print("视频没有有效的 cid跳过此视频")
def search_videos(query, max_results=350):
    """Search bilibili for videos matching *query*.

    Pages through the web search API, collecting BV ids until
    *max_results* are gathered or results run out. Duplicates are removed
    while preserving first-seen order.

    NOTE(review): the "cookie" header is a placeholder; without a valid
    cookie the API may reject or rate-limit requests -- confirm before use.
    """
    search_url = "https://api.bilibili.com/x/web-interface/search/type"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
        "Referer": "https://www.bilibili.com/",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "cookie": "your cookie"  # replace with a real cookie
    }
    bv_list = []
    page = 1
    while len(bv_list) < max_results:
        params = {
            'keyword': query,
            'search_type': 'video',
            'order': 'totalrank',
            'page': page,
            'pagesize': 50
        }
        try:
            response = requests.get(search_url, params=params, headers=headers, timeout=10)
            if response.status_code == 200:
                results = response.json()
                if results['code'] == 0:
                    videos = results['data']['result']
                    if not videos:
                        # No more results: stop paging.
                        break
                    bv_list += [video['bvid'] for video in videos]
                    print(f"已抓取 {len(bv_list)} 个视频")
                else:
                    print(f"搜索失败,错误代码: {results['code']},错误信息: {results.get('message', '无详细信息')}")
                    if '频繁' in results.get('message', ''):
                        # Rate limited: back off, then retry the same page.
                        print("限流,等待后重试")
                        time.sleep(random.uniform(5, 10))
                        continue
                    break
            else:
                print(f"搜索请求失败,状态码: {response.status_code}")
                break
        except requests.exceptions.RequestException as e:
            # Network error: wait briefly and retry the same page.
            # NOTE(review): these retries are unbounded -- a persistently
            # failing request loops forever.
            print(f"请求失败,错误: {e}")
            time.sleep(random.uniform(2, 5))
            continue
        page += 1
        time.sleep(random.uniform(1, 3))  # throttle between pages
    bv_list = list(OrderedDict.fromkeys(bv_list))  # de-duplicate, keep order
    return bv_list[:max_results]
def download_danmu(index, bv, filename):
    """Crawl the danmu of the video identified by *bv* into *filename*."""
    BiliBiliDanMu(bv, filename).crawl()
def getfor(videos=None, out_dir=None):
    """Sequentially download the danmu of each video in *videos*.

    Args:
        videos: iterable of BV ids; defaults to the module-level
            ``bv_list`` global for backward compatibility with the
            original no-argument call.
        out_dir: output directory prefix; defaults to the module-level
            ``output_dir`` global.
    """
    if videos is None:
        videos = bv_list
    if out_dir is None:
        out_dir = output_dir
    for index, bv in enumerate(videos):
        filename = f'{out_dir}第{index + 1}个视频_{bv}.txt'
        print(f"正在抓取 BV号 {bv} 的弹幕...")
        download_danmu(index, bv, filename)
        print(f"BV号 {bv} 的弹幕抓取完成")
if __name__ == '__main__':
    query = input("请输入搜索关键词: ")
    bv_list = search_videos(query)
    # Cap the crawl at 300 videos.
    bv_list = bv_list[:300]
    output_dir = 'E:/前端/软件工程/弹幕收集first/'
    os.makedirs(output_dir, exist_ok=True)
    # Crawl each video's danmu sequentially. (A commented-out duplicate
    # of getfor()'s body was dead code and has been removed.)
    getfor()

@ -0,0 +1,175 @@
import requests
import re
from lxml import etree
import os
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import OrderedDict
class BiliBiliDanMu:
    """Crawl the danmu (bullet comments) of a single bilibili video.

    Resolves the video's ``cid`` (with retries), downloads the danmu XML
    from comment.bilibili.com, extracts the comment text and saves it to
    a UTF-8 text file, one comment per line.
    """

    def __init__(self, bv, filename):
        # Normalize the BV id to avoid a duplicated "BV" prefix in the URL.
        if bv.startswith("BV"):
            bv = bv[2:]
        self.video_url = "https://bilibili.com/video/BV" + bv
        self.filename = filename
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
            "Referer": "https://www.bilibili.com/",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def get_video_cid(self):
        """Try up to 3 times to fetch the page and extract the cid.

        Returns:
            The cid digits as a string, or None once every retry failed.
        """
        retry_count = 3
        for attempt in range(retry_count):
            try:
                response = requests.get(self.video_url, headers=self.headers, timeout=10)
                if response.status_code == 200:
                    html = response.content.decode()
                    # findall yields ('"cid":', digits) tuples; keep digits.
                    cid = re.findall(r'("cid":)([0-9]+)', html)
                    if cid:
                        return cid[0][-1]
                    print("未找到 cid")
                else:
                    print(f"请求失败,状态码: {response.status_code}")
            except requests.exceptions.RequestException as e:
                print(f"获取 cid 时出错: {e}")
            # BUGFIX: the original 'continue' paths skipped the back-off
            # sleep and retried immediately; now every failure backs off.
            print(f"第 {attempt + 1} 次重试获取 cid...")
            time.sleep(2)
        return None

    def get_content(self, xml_url):
        """Return the raw bytes of the danmu XML, or None on failure."""
        try:
            response = requests.get(xml_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.content
            print(f"获取弹幕内容失败,状态码: {response.status_code}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"获取弹幕时出错: {e}")
            return None

    def extract_danmu(self, content_str):
        """Parse the XML bytes and return the text of every <d> element."""
        try:
            html = etree.HTML(content_str)
            return html.xpath("//d/text()")
        except Exception as e:
            print(f"解析弹幕时出错: {e}")
            return []

    def save(self, save_items):
        """Write the danmu in *save_items* to self.filename, one per line."""
        output_dir = os.path.dirname(self.filename)
        # BUGFIX: os.makedirs("") raises FileNotFoundError when the filename
        # has no directory component -- only create the directory if present.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(self.filename, 'w', encoding='utf-8') as f:
            f.writelines(item + '\n' for item in save_items)
        print(f"弹幕已保存至 {self.filename}")

    def crawl(self):
        """Full pipeline: resolve cid, fetch XML, parse, save to disk."""
        cid = self.get_video_cid()
        if cid is not None:
            xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml"
            content_str = self.get_content(xml_url)
            if content_str:
                danmu_lst = self.extract_danmu(content_str)
                self.save(danmu_lst)
        else:
            print("视频没有有效的 cid跳过此视频")
def search_videos(query, max_results=350):
    """Search bilibili for videos matching *query*.

    Pages through the web search API, collecting BV ids until
    *max_results* are gathered or results run out. Duplicates are removed
    while preserving first-seen order.

    NOTE(review): the "cookie" header is a placeholder; without a valid
    cookie the API may reject or rate-limit requests -- confirm before use.
    """
    search_url = "https://api.bilibili.com/x/web-interface/search/type"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
        "Referer": "https://www.bilibili.com/",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "cookie": "your cookie"  # any cookie value seems accepted -- verify
    }
    bv_list = []
    page = 1
    while len(bv_list) < max_results:
        params = {
            'keyword': query,
            'search_type': 'video',
            'order': 'totalrank',
            'page': page,
            'pagesize': 50
        }
        try:
            response = requests.get(search_url, params=params, headers=headers, timeout=10)
            if response.status_code == 200:
                results = response.json()
                if results['code'] == 0:
                    videos = results['data']['result']
                    if not videos:
                        # No more results: stop paging.
                        break
                    bv_list += [video['bvid'] for video in videos]
                    print(f"已抓取 {len(bv_list)} 个视频")
                else:
                    print(f"搜索失败,错误代码: {results['code']},错误信息: {results.get('message', '无详细信息')}")
                    if '频繁' in results.get('message', ''):
                        # Rate limited: back off, then retry the same page.
                        print("限流,等待后重试")
                        time.sleep(random.uniform(5, 10))
                        continue
                    break
            else:
                print(f"搜索请求失败,状态码: {response.status_code}")
                break
        except requests.exceptions.RequestException as e:
            # Network error: wait briefly and retry the same page.
            # NOTE(review): these retries are unbounded -- a persistently
            # failing request loops forever.
            print(f"请求失败,错误: {e}")
            time.sleep(random.uniform(2, 5))
            continue
        page += 1
        time.sleep(random.uniform(1, 3))  # throttle to avoid being blocked
    bv_list = list(OrderedDict.fromkeys(bv_list))  # de-duplicate, keep order
    return bv_list[:max_results]
def download_danmu(index, bv, filename):
    """Crawl the danmu of the video identified by *bv* into *filename*."""
    BiliBiliDanMu(bv, filename).crawl()
def getthread(videos=None, out_dir=None):
    """Download danmu concurrently with a thread pool (10 workers).

    Args:
        videos: iterable of BV ids; defaults to the module-level
            ``bv_list`` global for backward compatibility with the
            original no-argument call.
        out_dir: output directory prefix; defaults to the module-level
            ``output_dir`` global.
    """
    if videos is None:
        videos = bv_list
    if out_dir is None:
        out_dir = output_dir
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Map each submitted future back to its BV id for reporting.
        future_to_bv = {
            executor.submit(download_danmu, index, bv, f'{out_dir}第{index + 1}个视频_{bv}.txt'): bv
            for index, bv in enumerate(videos)
        }
        for future in as_completed(future_to_bv):
            bv = future_to_bv[future]
            try:
                future.result()
                print(f"BV号 {bv} 的弹幕抓取完成")
            except Exception as exc:
                # A failed task must not kill the whole batch.
                print(f"BV号 {bv} 的弹幕抓取时出错: {exc}")
if __name__ == '__main__':
    query = input("请输入搜索关键词: ")
    # Cap the crawl at 300 videos.
    bv_list = search_videos(query)[:300]
    output_dir = 'E:/前端/软件工程/弹幕收集bark/'
    os.makedirs(output_dir, exist_ok=True)
    getthread()

@ -0,0 +1,102 @@
import unittest
from unittest.mock import patch, MagicMock
from crawl_getthread import BiliBiliDanMu, search_videos, download_danmu, getthread
import os
class TestBiliBiliDanMu(unittest.TestCase):
    """Unit tests for the crawl_getthread module.

    NOTE(review): the original tests mocked ``response.text`` although the
    code under test reads ``response.content`` (bytes), and several tests
    patched the very function they then called directly -- which left the
    real, network-touching function running. Both defects are fixed below.
    """

    @patch('crawl_getthread.requests.get')
    def test_get_video_cid_success(self, mock_get):
        """cid is extracted from the page body (bytes, as the code reads it)."""
        mock_get.return_value = MagicMock(status_code=200, content=b'{"cid":123456}')
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertEqual(danmu_crawler.get_video_cid(), "123456")

    @patch('crawl_getthread.time.sleep')  # skip the retry back-off delay
    @patch('crawl_getthread.requests.get')
    def test_get_video_cid_failure(self, mock_get, mock_sleep):
        """A non-200 response yields None once retries are exhausted."""
        mock_get.return_value = MagicMock(status_code=404)
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertIsNone(danmu_crawler.get_video_cid())

    @patch('crawl_getthread.requests.get')
    def test_get_content_success(self, mock_get):
        """get_content returns the raw response bytes."""
        mock_get.return_value = MagicMock(status_code=200, content=b'<d>danmu</d>')
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertEqual(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml"), b'<d>danmu</d>')

    @patch('crawl_getthread.requests.get')
    def test_get_content_failure(self, mock_get):
        """A non-200 response makes get_content return None."""
        mock_get.return_value = MagicMock(status_code=404)
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertIsNone(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml"))

    def test_extract_danmu(self):
        """All <d> element texts are extracted from the danmu XML."""
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        content_str = '<i><d p="0,1,25,16777215,0,0,0,0">弹幕内容1</d><d p="0,1,25,16777215,0,0,0,0">弹幕内容2</d></i>'
        self.assertEqual(danmu_crawler.extract_danmu(content_str), ['弹幕内容1', '弹幕内容2'])

    def test_save(self):
        """Danmu lines are written one per line, UTF-8 encoded."""
        # Use a path WITH a directory component: save() passes
        # os.path.dirname(filename) to os.makedirs, which raises on "".
        path = os.path.join("test_save_dir", "test_output.txt")
        danmu_crawler = BiliBiliDanMu("BV12345", path)
        danmu_crawler.save(['弹幕内容1', '弹幕内容2'])
        with open(path, 'r', encoding='utf-8') as f:
            self.assertEqual(f.readlines(), ['弹幕内容1\n', '弹幕内容2\n'])
        os.remove(path)  # clean up the test artifacts
        os.rmdir("test_save_dir")

    @patch('crawl_getthread.time.sleep')  # skip inter-page throttling
    @patch('crawl_getthread.requests.get')
    def test_search_videos_success(self, mock_get, mock_sleep):
        """search_videos collects BV ids from the (mocked) API payload."""
        mock_get.return_value = MagicMock(
            status_code=200,
            json=MagicMock(return_value={
                'code': 0,
                'data': {'result': [{'bvid': 'BV12345'}, {'bvid': 'BV67890'}]},
            }),
        )
        video_ids = search_videos("2024巴黎奥运会", max_results=2)
        self.assertEqual(video_ids, ['BV12345', 'BV67890'])

    @patch('crawl_getthread.requests.get')
    def test_search_videos_failure(self, mock_get):
        """A failing search request yields an empty list."""
        mock_get.return_value = MagicMock(status_code=500)
        video_ids = search_videos("2024巴黎奥运会", max_results=2)
        self.assertEqual(video_ids, [])

    @patch('crawl_getthread.BiliBiliDanMu')
    def test_download_danmu(self, mock_crawler_cls):
        """download_danmu builds a crawler for the BV id and runs it."""
        download_danmu(0, "BV12345", "test_output.txt")
        mock_crawler_cls.assert_called_once_with("BV12345", "test_output.txt")
        mock_crawler_cls.return_value.crawl.assert_called_once()

    @patch('crawl_getthread.download_danmu')
    def test_getthread(self, mock_download):
        """getthread submits one download task per BV id.

        getthread reads the module-level ``bv_list``/``output_dir`` globals,
        which only exist after the __main__ block ran -- so they are patched
        into the module here (create=True).
        """
        with patch('crawl_getthread.bv_list', ['BV12345'], create=True), \
             patch('crawl_getthread.output_dir', 'out/', create=True):
            getthread()
        mock_download.assert_called_once()
if __name__ == '__main__':
    unittest.main()  # run the test suite

@ -0,0 +1,84 @@
import os
import jieba
from wordcloud import WordCloud
import matplotlib.pyplot as plt
def generate_wordcloud(directory, output_file):
    """Build a word-cloud image from every .txt danmu file in *directory*.

    Reads all text, segments it with jieba, drops stop words, counts word
    frequencies and renders the cloud to *output_file* via matplotlib.
    """
    from collections import Counter  # stdlib; local import matches file style

    # Concatenate all .txt contents; join avoids quadratic `text +=`.
    parts = []
    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
                parts.append(file.read())
    text = "".join(parts)
    words = jieba.cut(text)
    # NOTE(review): these stop-word entries appear garbled/empty in this
    # copy of the source (the original Chinese stop words were apparently
    # lost in transfer); as written the set effectively contains only ""
    # and "所以", and "" is already removed by the strip() filter below.
    # Restore the real stop-word list before relying on this.
    stop_words = set([
        "", "", "", "", "", "", "", "", "", "", "", "",
        "", "", "", "", "", "", "", "", "", "", "", "",
        "", "", "", "", "", "", "", "", "", "所以", "", "",
        "所以", "", "", "", "", "", "", "", "", "", "", "", "", "", ""
    ])
    filtered_words = [word for word in words if word.strip() and word not in stop_words]
    # Counter replaces the original manual dict-increment loop.
    word_freq = Counter(filtered_words)
    wordcloud = WordCloud(font_path='simsun.ttc', width=800, height=400,
                          background_color='white').generate_from_frequencies(word_freq)
    plt.figure(figsize=(10, 5))
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis("off")
    plt.savefig(output_file)
    plt.close()
def generate_trophy_wordcloud(directory, output_file,
                              mask_image="E:/前端/奖杯4.png",
                              font_path=r"C:\Windows\Fonts\STLITI.TTF"):
    """Render a trophy-shaped word cloud from the .txt files in *directory*.

    Args:
        directory: folder containing the danmu .txt files.
        output_file: path of the image to write.
        mask_image: shape mask image; keeps the original hard-coded local
            path as default so existing two-argument calls still work.
        font_path: TTF font used for rendering; same backward-compatible
            default as above.
    """
    from wordcloud import WordCloud, STOPWORDS
    import matplotlib.pyplot as plt
    import numpy as np
    import jieba.posseg as pseg
    from collections import Counter
    from PIL import Image
    from matplotlib import colors

    # Concatenate all .txt contents; join avoids quadratic `text +=`.
    parts = []
    for filename in os.listdir(directory):
        if filename.endswith('.txt'):
            with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file:
                parts.append(file.read())
    text = "".join(parts)
    # Keep nouns ('n' POS flag) of length >= 2, take the 300 most common.
    words = pseg.cut(text)
    report_words = [word for word, flag in words if (len(word) >= 2) and ('n' in flag)]
    content = dict(Counter(report_words).most_common(300))
    # NOTE(review): the custom stop-word entries appear garbled/empty in
    # this copy -- restore the original Chinese words before relying on them.
    stopwords = set(STOPWORDS)
    stopwords.update(["", "", "", "", "", "", "", "", "", "", "", "",
                      "", "", "", "", "", "", "", "", "", "", "", "",
                      "", "", "", "", "", "", "", "", "", "所以", "", "",
                      "所以", "", "", "", "", "", "", "", "", "", "", "", "", "", ""])
    background = Image.open(mask_image).convert('RGB')
    mask = np.array(background)
    max_font_size = 100
    min_font_size = 10
    colormap = colors.ListedColormap(['#FF274B'])  # single brand-red color
    wordcloud = WordCloud(scale=4, font_path=font_path, colormap=colormap,
                          width=1600, height=900, background_color='white',
                          stopwords=stopwords, mask=mask,
                          max_font_size=max_font_size,
                          min_font_size=min_font_size).generate_from_frequencies(content)
    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    plt.savefig(output_file)
    plt.close()

@ -0,0 +1,61 @@
import unittest
from unittest.mock import patch, mock_open
import os
from 数据分析 import read_danmu, filter_ai_related_danmu, count_danmu, save, print_top_n_danmu
class TestAIDanmuAnalysis(unittest.TestCase):
    """
    Unit tests for the 数据分析 (data-analysis) module.
    """
    def setUp(self):
        # Shared fixture paths; no files are created by setUp itself.
        self.folder_path = 'test_data/弹幕收集按序'
        self.output_file = 'test_output.xlsx'
    def test_read_danmu(self):
        """
        Reading danmu files returns (line, source-file-stem) tuples.

        NOTE(review): only builtins.open is mocked; if read_danmu lists the
        directory (e.g. via os.listdir) that call is NOT mocked, so this
        test presumably depends on a real 'file1.txt' being found under
        self.folder_path -- confirm against the read_danmu implementation.
        """
        test_data = "弹幕1\n弹幕2\n"
        with patch('builtins.open', new_callable=mock_open, read_data=test_data) as mock_file:
            danmu_list = read_danmu(self.folder_path)
            self.assertEqual(danmu_list, [('弹幕1', 'file1'), ('弹幕2', 'file1')])
            mock_file.assert_called_with(os.path.join(self.folder_path, 'file1.txt'), 'r', encoding='utf-8')
    def test_filter_ai_related_danmu(self):
        """
        Filtering keeps only danmu that contain one of the AI keywords.
        """
        danmu_list = [('弹幕1', 'file1'), ('AI技术', 'file2')]
        filtered_danmu = filter_ai_related_danmu(danmu_list, ["AI"])
        self.assertEqual(filtered_danmu, [('AI技术', 'file2')])
    def test_count_danmu(self):
        """
        Counting aggregates identical danmu text across source files.
        """
        danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1')]
        danmu_counter = count_danmu(danmu_list)
        self.assertEqual(danmu_counter, {'弹幕1': 2, '弹幕2': 1})
    def test_save(self):
        """
        Saving writes the danmu data to Excel via DataFrame.to_excel.
        """
        danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1')]
        with patch('pandas.DataFrame.to_excel') as mock_to_excel:
            save(danmu_list, self.output_file)
            mock_to_excel.assert_called_with(self.output_file, index=False)
    def test_print_top_n_danmu(self):
        """
        Printing the top-N danmu emits a header line and one line per entry.
        """
        danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1'), ('弹幕3', 'file1')]
        with patch('builtins.print') as mock_print:
            print_top_n_danmu(danmu_list, top_n=2)
            mock_print.assert_any_call('数量排名前 2 的弹幕:')
            mock_print.assert_any_call('弹幕内容: 弹幕1, 出现次数: 2')
if __name__ == '__main__':
    unittest.main()  # run the test suite
Loading…
Cancel
Save