You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

169 lines
6.0 KiB

# import cProfile
import requests
import re
from lxml import etree
import os
import time
import random
from collections import OrderedDict
class BiliBiliDanMu:
    """Download the danmaku (bullet comments) of one Bilibili video.

    A crawl resolves the video's ``cid`` from the video page HTML, fetches the
    comment XML published for that ``cid`` and writes the text of every ``<d>``
    element to ``filename``, one comment per line.
    """

    def __init__(self, bv, filename):
        """Remember the target video and the output path.

        Args:
            bv: Video id, with or without the leading ``"BV"`` prefix.
            filename: Path of the text file the comments are written to.
        """
        # Normalise so the URL below never ends up with a doubled "BV" prefix.
        if bv.startswith("BV"):
            bv = bv[2:]
        self.video_url = "https://bilibili.com/video/BV" + bv
        self.filename = filename
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
            "Referer": "https://www.bilibili.com/",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def get_video_cid(self):
        """Fetch the video page and extract the first ``"cid":<digits>`` value.

        Up to three attempts are made. Every failed attempt (network error,
        non-200 status, or no cid in the page) is followed by a 2 second pause
        before the next one, so the server is not hit back-to-back.

        Returns:
            The cid as a string of digits, or ``None`` if all attempts failed.
        """
        retry_count = 3
        for attempt in range(retry_count):
            try:
                response = requests.get(self.video_url, headers=self.headers, timeout=10)
            except requests.exceptions.RequestException as e:
                print(f"获取 cid 时出错: {e}")
            else:
                if response.status_code != 200:
                    print(f"请求失败,状态码: {response.status_code}")
                else:
                    # errors="replace" keeps a stray invalid byte in the page
                    # from raising UnicodeDecodeError and aborting the crawl.
                    html = response.content.decode(errors="replace")
                    match = re.search(r'"cid":([0-9]+)', html)
                    if match:
                        return match.group(1)
                    print("未找到 cid")
            # Only announce and wait when another attempt will actually follow.
            if attempt + 1 < retry_count:
                print(f"{attempt + 1} 次重试获取 cid...")
                time.sleep(2)
        return None

    def get_content(self, xml_url):
        """Download ``xml_url`` and return the raw response body.

        Returns:
            The response bytes on HTTP 200, otherwise ``None`` (non-200 status
            or a ``requests`` exception; both are reported on stdout).
        """
        try:
            response = requests.get(xml_url, headers=self.headers, timeout=10)
        except requests.exceptions.RequestException as e:
            print(f"获取弹幕时出错: {e}")
            return None
        if response.status_code == 200:
            return response.content
        print(f"获取弹幕内容失败,状态码: {response.status_code}")
        return None

    def extract_danmu(self, content_str):
        """Return the text of every ``<d>`` element found in ``content_str``.

        The document is parsed with ``etree.HTML``. Any parsing failure is
        reported on stdout and yields an empty list instead of raising.
        """
        try:
            html = etree.HTML(content_str)
            danmu_list = html.xpath("//d/text()")
            return danmu_list
        except Exception as e:
            print(f"解析弹幕时出错: {e}")
            return []

    def save(self, save_items):
        """Write ``save_items`` to ``self.filename``, one item per line (UTF-8).

        Missing parent directories are created. An existing file is
        overwritten.
        """
        output_dir = os.path.dirname(self.filename)
        # dirname() is "" for a bare file name, and os.makedirs("") raises
        # FileNotFoundError, so only create a directory when there is one.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(self.filename, 'w', encoding='utf-8') as f:
            f.writelines(item + '\n' for item in save_items)
        print(f"弹幕已保存至 {self.filename}")

    def crawl(self):
        """Run the full pipeline: resolve cid, download XML, extract, save.

        Nothing is saved when the cid cannot be resolved or the comment XML
        cannot be downloaded.
        """
        cid = self.get_video_cid()
        if cid is None:
            print("视频没有有效的 cid跳过此视频")
            return
        xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml"
        content_str = self.get_content(xml_url)
        if content_str:
            danmu_lst = self.extract_danmu(content_str)
            self.save(danmu_lst)
def search_videos(query, max_results=350, cookie="your cookie", max_retries=5):
    """Search Bilibili for videos matching ``query`` and return their BV ids.

    Result pages (50 entries each, ordered by ``totalrank``) are requested
    one after another until ``max_results`` distinct ids have been collected,
    a page comes back empty, or the search fails.

    Args:
        query: Search keyword.
        max_results: Maximum number of distinct BV ids to return.
        cookie: Value sent in the ``cookie`` request header. The default is a
            placeholder; pass a real cookie string to use one.
        max_retries: Number of consecutive failed attempts (request errors or
            rate-limit responses) tolerated before the search gives up.

    Returns:
        A list of unique BV id strings in the order they were first seen,
        at most ``max_results`` long.
    """
    search_url = "https://api.bilibili.com/x/web-interface/search/type"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
        "Referer": "https://www.bilibili.com/",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "cookie": cookie,
    }
    bv_list = []
    seen = set()
    page = 1
    failures = 0  # consecutive failed attempts for the current page
    while len(bv_list) < max_results:
        params = {
            'keyword': query,
            'search_type': 'video',
            'order': 'totalrank',
            'page': page,
            'pagesize': 50
        }
        try:
            response = requests.get(search_url, params=params, headers=headers, timeout=10)
            if response.status_code != 200:
                print(f"搜索请求失败,状态码: {response.status_code}")
                break
            results = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            # ValueError covers a body that is not valid JSON.
            print(f"请求失败,错误: {e}")
            failures += 1
            if failures >= max_retries:
                # Without this cap a dead connection would retry forever.
                print("重试次数过多,停止搜索")
                break
            time.sleep(random.uniform(2, 5))
            continue

        code = results.get('code')
        if code != 0:
            print(f"搜索失败,错误代码: {code},错误信息: {results.get('message', '无详细信息')}")
            if '频繁' in str(results.get('message') or ''):
                failures += 1
                if failures >= max_retries:
                    print("重试次数过多,停止搜索")
                    break
                print("限流,等待后重试")
                time.sleep(random.uniform(5, 10))
                continue
            break

        failures = 0
        # "data" or "result" may be absent or null; treat that as an empty page.
        videos = (results.get('data') or {}).get('result')
        if not videos:
            break
        # De-duplicate while collecting so the loop condition counts distinct
        # ids and repeated entries cannot make the result fall short.
        for video in videos:
            bvid = video.get('bvid')
            if bvid and bvid not in seen:
                seen.add(bvid)
                bv_list.append(bvid)
        print(f"已抓取 {len(bv_list)} 个视频")
        page += 1
        time.sleep(random.uniform(1, 3))
    return bv_list[:max_results]
def download_danmu(index, bv, filename):
    """Crawl the danmaku of video ``bv`` and store it in ``filename``.

    ``index`` is accepted as part of the signature but is not used here.
    """
    BiliBiliDanMu(bv, filename).crawl()
def getfor():
    """Download the danmaku of every video in the module-level ``bv_list``.

    Reads the globals ``bv_list`` and ``output_dir``. Each video is written
    to a file in ``output_dir`` named from its 1-based position and BV id.
    Progress is printed before and after each download.
    """
    for index, bv in enumerate(bv_list):
        target_path = f'{output_dir}{index + 1}个视频_{bv}.txt'
        print(f"正在抓取 BV号 {bv} 的弹幕...")
        download_danmu(index, bv, target_path)
        print(f"BV号 {bv} 的弹幕抓取完成")
if __name__ == '__main__':
    # Script entry point: ask for a keyword, search for matching videos and
    # download the danmaku of each result into output_dir.
    query = input("请输入搜索关键词: ")
    # Keep at most 300 of the ids returned by the search.
    bv_list = search_videos(query)[:300]
    output_dir = 'E:/前端/软件工程/弹幕收集first/'
    os.makedirs(output_dir, exist_ok=True)
    # getfor() reads the module-level bv_list and output_dir assigned above
    # and crawls the videos one after another.
    getfor()