You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

175 lines
6.4 KiB

import requests
import re
from lxml import etree
import os
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from collections import OrderedDict
class BiliBiliDanMu:
    """Download the danmaku (bullet comments) of one Bilibili video.

    The workflow, driven by :meth:`crawl`, is: fetch the video page, pull the
    ``cid`` out of its HTML, fetch ``<cid>.xml`` from the comment endpoint,
    extract the text of every ``<d>`` element and write one comment per line
    to ``filename``.
    """

    def __init__(self, bv, filename):
        """Store the video page URL, the output path and the HTTP headers.

        Args:
            bv: BV id of the video, with or without the leading ``"BV"``.
            filename: Path of the text file the comments are written to.
        """
        # Normalise the id so the URL always carries exactly one "BV" prefix.
        if bv.startswith("BV"):
            bv = bv[2:]
        self.video_url = "https://bilibili.com/video/BV" + bv
        self.filename = filename
        self.headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
            "Referer": "https://www.bilibili.com/",
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }

    def get_video_cid(self):
        """Fetch the video page and return the first ``cid`` found in it.

        Up to three attempts are made. A non-200 status, a page without a
        ``"cid":<digits>`` fragment, or a ``requests`` error each count as a
        failed attempt; the method pauses two seconds before the next one.

        Returns:
            The cid as a string of digits, or ``None`` if every attempt failed.
        """
        retry_count = 3
        for attempt in range(retry_count):
            try:
                response = requests.get(self.video_url, headers=self.headers, timeout=10)
                if response.status_code != 200:
                    print(f"请求失败,状态码: {response.status_code}")
                else:
                    html = response.content.decode()
                    match = re.search(r'("cid":)([0-9]+)', html)
                    if match:
                        # Group 2 holds the digits that follow `"cid":`.
                        return match.group(2)
                    print("未找到 cid")
            except requests.exceptions.RequestException as e:
                print(f"获取 cid 时出错: {e}")
            # Back off before every retry (whatever the failure was), but do
            # not waste two seconds after the final attempt.
            if attempt + 1 < retry_count:
                print(f"{attempt + 1} 次重试获取 cid...")
                time.sleep(2)
        return None

    def get_content(self, xml_url):
        """Download the danmaku XML document.

        Args:
            xml_url: URL of the XML file to fetch.

        Returns:
            The raw response body as ``bytes``, or ``None`` on a non-200
            status or a ``requests`` error.
        """
        try:
            response = requests.get(xml_url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                return response.content
            print(f"获取弹幕内容失败,状态码: {response.status_code}")
            return None
        except requests.exceptions.RequestException as e:
            print(f"获取弹幕时出错: {e}")
            return None

    def extract_danmu(self, content_str):
        """Return the text of every ``<d>`` element in the downloaded document.

        Args:
            content_str: Document body as returned by :meth:`get_content`.

        Returns:
            A list of comment strings; an empty list if parsing fails.
        """
        try:
            html = etree.HTML(content_str)
            danmu_list = html.xpath("//d/text()")
            return danmu_list
        except Exception as e:
            # Best effort: a document that cannot be parsed yields no comments.
            print(f"解析弹幕时出错: {e}")
            return []

    def save(self, save_items):
        """Write the comments to ``self.filename``, one per line (UTF-8).

        The parent directory is created when the path has one.

        Args:
            save_items: Iterable of comment strings.
        """
        output_dir = os.path.dirname(self.filename)
        # dirname is "" for a bare file name, and os.makedirs("") raises
        # FileNotFoundError, so only create a directory when there is one.
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)
        with open(self.filename, 'w', encoding='utf-8') as f:
            f.writelines(item + '\n' for item in save_items)
        print(f"弹幕已保存至 {self.filename}")

    def crawl(self):
        """Run the whole pipeline: resolve cid, download, parse and save."""
        cid = self.get_video_cid()
        if cid is None:
            print("视频没有有效的 cid跳过此视频")
            return
        xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml"
        content_str = self.get_content(xml_url)
        # get_content already reported the failure when it returns None.
        if content_str:
            danmu_lst = self.extract_danmu(content_str)
            self.save(danmu_lst)
def search_videos(query, max_results=350, max_retries=5):
    """Search Bilibili for videos and return their BV ids.

    Pages of 50 results are requested from the web search endpoint, ordered
    by ``totalrank``, until at least ``max_results`` ids have been collected,
    a page comes back empty, or the search fails.

    Args:
        query: Search keyword.
        max_results: Maximum number of ids to return.
        max_retries: Maximum number of consecutive retries (network errors,
            undecodable bodies or rate-limit replies) before giving up and
            returning what has been collected so far.

    Returns:
        A list of unique BV ids in first-seen order, at most ``max_results``
        long. It can be shorter, because duplicates are removed only after
        collection stops.
    """
    search_url = "https://api.bilibili.com/x/web-interface/search/type"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36",
        "Referer": "https://www.bilibili.com/",
        "Accept": "application/json, text/plain, */*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "cookie": "your cookie"  # Original author's note: any cookie value will do, it only has to be present
    }
    bv_list = []
    page = 1
    failures = 0  # consecutive failed attempts; reset after every good page
    while len(bv_list) < max_results:
        params = {
            'keyword': query,
            'search_type': 'video',
            'order': 'totalrank',
            'page': page,
            'pagesize': 50
        }
        try:
            response = requests.get(search_url, params=params, headers=headers, timeout=10)
            if response.status_code != 200:
                print(f"搜索请求失败,状态码: {response.status_code}")
                break
            # A body that is not JSON surfaces as ValueError (or a subclass).
            results = response.json()
        except (requests.exceptions.RequestException, ValueError) as e:
            print(f"请求失败,错误: {e}")
            failures += 1
            if failures > max_retries:
                # Give up instead of looping forever on a dead connection.
                break
            time.sleep(random.uniform(2, 5))
            continue

        code = results.get('code')
        if code != 0:
            print(f"搜索失败,错误代码: {code},错误信息: {results.get('message', '无详细信息')}")
            failures += 1
            # Retry only when the message mentions "频繁" (rate limiting),
            # and only a bounded number of times.
            if '频繁' in (results.get('message') or '') and failures <= max_retries:
                print("限流,等待后重试")
                time.sleep(random.uniform(5, 10))
                continue
            break

        failures = 0
        # 'data' or 'result' may be absent; treat that like an empty page.
        videos = (results.get('data') or {}).get('result')
        if not videos:
            break
        bv_list += [video['bvid'] for video in videos if video.get('bvid')]
        print(f"已抓取 {len(bv_list)} 个视频")
        page += 1
        time.sleep(random.uniform(1, 3))  # throttle so the requests are not blocked

    bv_list = list(OrderedDict.fromkeys(bv_list))  # de-duplicate, keep order
    return bv_list[:max_results]
def download_danmu(index, bv, filename):
    """Crawl the danmaku of the video ``bv`` into ``filename``.

    ``index`` is accepted for the caller's convenience and is not used here.
    """
    BiliBiliDanMu(bv, filename).crawl()
def getthread():
    """Download the danmaku of every id in the global ``bv_list`` concurrently.

    One ``download_danmu`` task per video is submitted to a pool of ten
    worker threads; each output file is placed under the global
    ``output_dir``. A line is printed for each task as it finishes, reporting
    either completion or the exception the task raised.
    """
    with ThreadPoolExecutor(max_workers=10) as pool:
        # Map each pending future back to the id it is working on.
        pending = {}
        for position, video_id in enumerate(bv_list):
            target = f'{output_dir}{position + 1}个视频_{video_id}.txt'
            task = pool.submit(download_danmu, position, video_id, target)
            pending[task] = video_id

        for finished in as_completed(pending):
            video_id = pending[finished]
            try:
                finished.result()
                print(f"BV号 {video_id} 的弹幕抓取完成")
            except Exception as exc:
                print(f"BV号 {video_id} 的弹幕抓取时出错: {exc}")
if __name__ == '__main__':
    # Script entry point: ask for a keyword, search, then download everything.
    # `bv_list` and `output_dir` are module globals because getthread() reads them.
    output_dir = 'E:/前端/软件工程/弹幕收集bark/'
    query = input("请输入搜索关键词: ")
    # Crawl at most 300 of the videos the search returned.
    bv_list = search_videos(query)[:300]
    os.makedirs(output_dir, exist_ok=True)
    getthread()