Naming changes

main
李玲 2 months ago
parent f76c2954be
commit 45497433a4

@@ -1,77 +0,0 @@
import requests
import re
from lxml import etree
import os


class BiliBiliDanMu:
    def __init__(self, bv, filename):
        # Normalize the BV id so the "BV" prefix is not duplicated
        if bv.startswith("BV"):
            bv = bv[2:]
        # Build the video URL to crawl from the BV id
        self.video_url = "https://bilibili.com/video/BV" + bv
        self.filename = filename
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36 Edg/85.0.564.44"
        }

    # Fetch the video's cid
    def get_video_cid(self):
        response = requests.get(self.video_url, headers=self.headers, timeout=10)
        if response.status_code != 200:
            print(f"Request failed, status code: {response.status_code}")
            return None
        html = response.content.decode()
        print(f"HTML content (first 500 chars): {html[:500]}")  # print part of the HTML for debugging
        cid = re.findall(r'("cid":)([0-9]+)', html)
        # Some videos do not carry this field; skip them
        if len(cid) == 0:
            print("cid not found")
            return None
        else:
            return cid[0][-1]

    # Request the danmaku XML file and return the response body
    def get_content(self, xml_url):
        response = requests.get(xml_url, headers=self.headers, timeout=10)
        return response.content

    # Parse the fetched content into a list of all danmaku of the video
    def extract_danmu(self, content_str):
        html = etree.HTML(content_str)
        danmu_list = html.xpath("//d/text()")
        return danmu_list

    # Write the danmaku line by line into a txt file
    def save(self, save_items):
        # Make sure the output directory exists
        output_dir = os.path.dirname(self.filename)
        if output_dir:  # os.makedirs('') raises when the filename has no directory part
            os.makedirs(output_dir, exist_ok=True)  # create the directory automatically
        with open(self.filename, 'w', encoding='utf-8') as f:
            lines = []
            for item in save_items:
                lines.append(item + '\n')
            f.writelines(lines)
        print(f"Danmaku saved to {self.filename}")

    # The whole crawl pipeline
    def crawl(self):
        cid = self.get_video_cid()
        # Skip videos without a cid field
        if cid is not None:
            xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml"
            content_str = self.get_content(xml_url)
            danmu_lst = self.extract_danmu(content_str)
            self.save(danmu_lst)
        else:
            print("The video has no valid cid, skipping it")


if __name__ == '__main__':
    bv = input("Please enter the video's BV id: ")
    # Build the output path for this video
    filename = 'E:/前端/软件工程/{}.txt'.format(str(bv))
    dm = BiliBiliDanMu(bv, filename)
    dm.crawl()
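
The cid above is scraped out of the page HTML with a regular expression, which breaks whenever the markup changes. Below is a minimal sketch of an alternative, assuming Bilibili's public web-interface view endpoint returns the cid in data.cid (the endpoint and response layout are assumptions from common public usage, not something this commit relies on):

# Sketch (assumption): ask the JSON view API for the cid instead of regexing the HTML page.
# The endpoint URL and the data.cid field are assumed, not verified in this commit.
import requests

def get_cid_via_api(bv):
    # bv is the full id including the "BV" prefix
    url = "https://api.bilibili.com/x/web-interface/view"
    headers = {"User-Agent": "Mozilla/5.0", "Referer": "https://www.bilibili.com/"}
    resp = requests.get(url, params={"bvid": bv}, headers=headers, timeout=10)
    if resp.status_code != 200:
        return None
    payload = resp.json()
    if payload.get("code") != 0:
        return None
    return str(payload["data"]["cid"])  # cid of the video's first page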

@@ -1,102 +0,0 @@
import unittest
from unittest.mock import patch, MagicMock
import crawl_getthread
from crawl_getthread import BiliBiliDanMu, search_videos, download_danmu, getthread
import os


class TestBiliBiliDanMu(unittest.TestCase):
    """
    Unit tests for the BiliBiliDanMu class
    """
    @patch('crawl_getthread.requests.get')
    def test_get_video_cid_success(self, mock_get):
        """
        Fetching the cid for a video's BV id succeeds
        """
        # get_video_cid reads response.content, so the mock has to provide bytes content
        mock_get.return_value = MagicMock(status_code=200, content=b'{"cid":123456}')
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertEqual(danmu_crawler.get_video_cid(), "123456")

    @patch('crawl_getthread.requests.get')
    def test_get_video_cid_failure(self, mock_get):
        """
        Fetching the cid for a video's BV id fails
        """
        mock_get.return_value = MagicMock(status_code=404)
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertIsNone(danmu_crawler.get_video_cid())

    @patch('crawl_getthread.requests.get')
    def test_get_content_success(self, mock_get):
        """
        Fetching the danmaku XML body from its URL succeeds
        """
        # get_content returns response.content, so the mock sets content (bytes) rather than text
        xml_bytes = '<d>弹幕内容</d>'.encode('utf-8')
        mock_get.return_value = MagicMock(status_code=200, content=xml_bytes)
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertEqual(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml"), xml_bytes)

    @patch('crawl_getthread.requests.get')
    def test_get_content_failure(self, mock_get):
        """
        Fetching the danmaku XML body from its URL fails
        """
        mock_get.return_value = MagicMock(status_code=404)
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        self.assertIsNone(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml"))

    def test_extract_danmu(self):
        """
        Parsing the danmaku XML extracts the danmaku text
        """
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        content_str = '<i><d p="0,1,25,16777215,0,0,0,0">弹幕内容1</d><d p="0,1,25,16777215,0,0,0,0">弹幕内容2</d></i>'
        self.assertEqual(danmu_crawler.extract_danmu(content_str), ['弹幕内容1', '弹幕内容2'])

    def test_save(self):
        """
        Saving danmaku to a file writes one line per danmaku
        """
        danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt")
        danmu_crawler.save(['弹幕内容1', '弹幕内容2'])
        with open("test_output.txt", 'r', encoding='utf-8') as f:
            self.assertEqual(f.readlines(), ['弹幕内容1\n', '弹幕内容2\n'])
        os.remove("test_output.txt")  # clean up the test file

    @patch('crawl_getthread.search_videos')
    def test_search_videos_success(self, mock_search_videos):
        """
        Searching videos by keyword returns the list of BV ids
        """
        # Call through the module so the patched attribute is the one actually used
        mock_search_videos.return_value = ['BV12345', 'BV67890']
        video_ids = crawl_getthread.search_videos("2024巴黎奥运会", max_results=2)
        self.assertEqual(video_ids, ['BV12345', 'BV67890'])

    @patch('crawl_getthread.search_videos')
    def test_search_videos_failure(self, mock_search_videos):
        """
        Searching videos by keyword fails and returns an empty list
        """
        mock_search_videos.return_value = []
        video_ids = crawl_getthread.search_videos("2024巴黎奥运会", max_results=2)
        self.assertEqual(video_ids, [])

    @patch('crawl_getthread.download_danmu')
    def test_download_danmu(self, mock_download):
        """
        Downloading the danmaku of a given BV id
        """
        mock_download.return_value = None
        crawl_getthread.download_danmu(0, "BV12345", "test_output.txt")
        mock_download.assert_called_with(0, "BV12345", "test_output.txt")

    @patch('crawl_getthread.getthread')
    def test_getthread(self, mock_getthread):
        """
        Downloading danmaku concurrently with a thread pool
        """
        mock_getthread.return_value = None
        crawl_getthread.getthread()
        mock_getthread.assert_called_once()


if __name__ == '__main__':
    unittest.main()  # run the tests
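
test_save above writes and then deletes a file in the current working directory. A small sketch of the same check using a temporary directory, so cleanup happens automatically in tearDown; the class and test names here are illustrative, not part of the project:

# Sketch: isolate the file written by save() in a temporary directory (names are illustrative).
import os
import tempfile
import unittest

from crawl_getthread import BiliBiliDanMu


class TestSaveIsolated(unittest.TestCase):
    def setUp(self):
        self.tmpdir = tempfile.TemporaryDirectory()

    def tearDown(self):
        self.tmpdir.cleanup()  # removes the directory and everything written into it

    def test_save_writes_one_line_per_danmu(self):
        path = os.path.join(self.tmpdir.name, "out.txt")
        crawler = BiliBiliDanMu("BV12345", path)
        crawler.save(['弹幕内容1', '弹幕内容2'])
        with open(path, encoding='utf-8') as f:
            self.assertEqual(f.readlines(), ['弹幕内容1\n', '弹幕内容2\n'])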

@@ -1,169 +0,0 @@
# import cProfile
import requests
import re
from lxml import etree
import os
import time
import random
from collections import OrderedDict


class BiliBiliDanMu:
    def __init__(self, bv, filename):
        # Normalize the BV id so the "BV" prefix is not duplicated
        if bv.startswith("BV"):
            bv = bv[2:]
        self.video_url = "https://bilibili.com/video/BV" + bv
        self.filename = filename
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
            "Referer": "https://www.bilibili.com/",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    # Fetch the video's cid, retrying up to three times
    def get_video_cid(self):
        retry_count = 3
        for attempt in range(retry_count):
            try:
                response = requests.get(self.video_url, headers=self.headers, timeout=10)
                if response.status_code != 200:
                    print(f"Request failed, status code: {response.status_code}")
                    continue
                html = response.content.decode()
                cid = re.findall(r'("cid":)([0-9]+)', html)
                if not cid:
                    print("cid not found")
                    continue
                else:
                    return cid[0][-1]
            except requests.exceptions.RequestException as e:
                print(f"Error while fetching cid: {e}")
            print(f"Retry {attempt + 1}: fetching cid again...")
            time.sleep(2)
        return None

    # Request the danmaku XML file and return the response body
    def get_content(self, xml_url):
        try:
            response = requests.get(xml_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.content
            else:
                print(f"Failed to fetch danmaku content, status code: {response.status_code}")
                return None
        except requests.exceptions.RequestException as e:
            print(f"Error while fetching danmaku: {e}")
            return None

    # Parse the fetched content into a list of all danmaku of the video
    def extract_danmu(self, content_str):
        try:
            html = etree.HTML(content_str)
            danmu_list = html.xpath("//d/text()")
            return danmu_list
        except Exception as e:
            print(f"Error while parsing danmaku: {e}")
            return []

    # Write the danmaku line by line into a txt file
    def save(self, save_items):
        output_dir = os.path.dirname(self.filename)
        if output_dir:  # os.makedirs('') raises when the filename has no directory part
            os.makedirs(output_dir, exist_ok=True)
        with open(self.filename, 'w', encoding='utf-8') as f:
            lines = [item + '\n' for item in save_items]
            f.writelines(lines)
        print(f"Danmaku saved to {self.filename}")

    # The whole crawl pipeline
    def crawl(self):
        cid = self.get_video_cid()
        if cid is not None:
            xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml"
            content_str = self.get_content(xml_url)
            if content_str:
                danmu_lst = self.extract_danmu(content_str)
                self.save(danmu_lst)
        else:
            print("The video has no valid cid, skipping it")


def search_videos(query, max_results=350):
    search_url = "https://api.bilibili.com/x/web-interface/search/type"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
        "Referer": "https://www.bilibili.com/",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "cookie": "your cookie"  # replace with a real Cookie
    }
    bv_list = []
    page = 1
    while len(bv_list) < max_results:
        params = {
            'keyword': query,
            'search_type': 'video',
            'order': 'totalrank',
            'page': page,
            'pagesize': 50
        }
        try:
            response = requests.get(search_url, params=params, headers=headers, timeout=10)
            if response.status_code == 200:
                results = response.json()
                if results['code'] == 0:
                    videos = results['data']['result']
                    if not videos:
                        break
                    bv_list += [video['bvid'] for video in videos]
                    print(f"Collected {len(bv_list)} videos so far")
                else:
                    print(f"Search failed, error code: {results['code']}, message: {results.get('message', 'no details')}")
                    if '频繁' in results.get('message', ''):  # the API reports rate limiting in its message
                        print("Rate limited, waiting before retrying")
                        time.sleep(random.uniform(5, 10))
                        continue
                    break
            else:
                print(f"Search request failed, status code: {response.status_code}")
                break
        except requests.exceptions.RequestException as e:
            print(f"Request failed, error: {e}")
            time.sleep(random.uniform(2, 5))
            continue
        page += 1
        time.sleep(random.uniform(1, 3))
    bv_list = list(OrderedDict.fromkeys(bv_list))  # de-duplicate while keeping order
    return bv_list[:max_results]


def download_danmu(index, bv, filename):
    danmu_crawler = BiliBiliDanMu(bv, filename)
    danmu_crawler.crawl()


def getfor():
    # Crawl the danmaku of each collected video one by one
    for index, bv in enumerate(bv_list):
        filename = f'{output_dir}第{index + 1}个视频_{bv}.txt'
        print(f"Crawling danmaku for {bv}...")
        download_danmu(index, bv, filename)
        print(f"Finished crawling danmaku for {bv}")


if __name__ == '__main__':
    query = input("Please enter a search keyword: ")
    bv_list = search_videos(query)
    # Cap the number of videos crawled at 300
    bv_list = bv_list[:300]
    output_dir = 'E:/前端/软件工程/弹幕收集first/'
    os.makedirs(output_dir, exist_ok=True)
    # Crawl every video's danmaku in sequence
    getfor()
    # for index, bv in enumerate(bv_list):
    #     filename = f'{output_dir}第{index + 1}个视频_{bv}.txt'
    #     print(f"Crawling danmaku for {bv}...")
    #     download_danmu(index, bv, filename)
    #     print(f"Finished crawling danmaku for {bv}")