You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

160 lines
8.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import requests
import re
import json
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
class BilibiliSpider:
    """Crawl danmaku (bullet comments) from Bilibili videos found via keyword search.

    Workflow: search the web API for video ids, resolve each video's cid, then
    download the danmaku XML feeds concurrently with a thread pool.
    """

    def __init__(self, total_videos=300):
        """Initialize crawler state.

        Args:
            total_videos: target number of distinct videos to collect ids for.
        """
        self.danmukus = []          # all danmaku strings collected so far (shared across threads)
        self.lock = Lock()          # serializes appends to self.danmukus from worker threads
        self.total_videos = total_videos
        self.collected_ids = set()  # video aids already seen, for live de-duplication
        self._init_headers()

    def _init_headers(self):
        """Build the User-Agent and Cookie pools used to randomize request headers."""
        # Small pool of desktop-browser user agents to vary outgoing requests.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15"
        ]
        # Cookies simulate a logged-in session. NOTE(review): the value below is
        # a truncated placeholder — replace with a real cookie before crawling.
        self.cookies = [
            "buvid3=32880B13-092F-D559-CC16-EE1C7CB405D854566infoc; b_nut=1762263854; ..."  # your cookie
        ]

    def _get_headers(self):
        """Return a header dict with a randomly chosen UA (and Cookie, if any)."""
        headers = {
            "User-Agent": random.choice(self.user_agents),
            "Referer": "https://www.bilibili.com/",  # pretend we navigated from the homepage
            "Accept": "application/json, text/plain, */*"
        }
        if self.cookies:
            headers["Cookie"] = random.choice(self.cookies)
        return headers

    def get_video_ids(self, keyword, max_pages=1):
        """Search for *keyword* and return up to ``self.total_videos`` video aids.

        Each page gets 3 retries with increasing back-off. Paging stops as soon
        as the id target is reached.

        Args:
            keyword: search term passed to the Bilibili search API.
            max_pages: baseline page count; the loop may visit up to
                ``max_pages + 49`` pages, but the early break on the id target
                is the real stopping condition.

        Returns:
            list of video aids, capped at ``self.total_videos``.
        """
        # NOTE(review): the bound deliberately exceeds max_pages so the crawl
        # keeps paging until enough ids are collected — confirm this matches
        # the intended "strict page control".
        for page in range(1, max_pages + 50):
            if len(self.collected_ids) >= self.total_videos:
                print(f"已收集{len(self.collected_ids)}个ID提前停止")
                break
            for retry in range(3):
                try:
                    url = f"https://api.bilibili.com/x/web-interface/search/type?keyword={keyword}&search_type=video&page={page}"
                    resp = requests.get(
                        url,
                        headers=self._get_headers(),
                        timeout=12,             # longer timeout cuts useless timeout-retries
                        allow_redirects=False   # a redirect here means something is wrong
                    )
                    if resp.status_code != 200:
                        print(f"第{page}页状态码异常,重试{retry+1}")
                        time.sleep(2 + retry)   # back off slightly more each retry
                        continue
                    data = resp.json()          # fixed: use requests' built-in JSON decoding
                    if data.get("code") != 0 or not data["data"].get("result"):
                        print(f"第{page}页无数据,重试{retry+1}")
                        time.sleep(2 + retry)
                        continue
                    # Keep only aids we have not seen before (live de-dup).
                    new_ids = [item["aid"] for item in data["data"]["result"]
                               if item["aid"] not in self.collected_ids]
                    self.collected_ids.update(new_ids)
                    print(f"第{page}页成功,新增{len(new_ids)}个ID累计{len(self.collected_ids)}")
                    time.sleep(random.uniform(0.5, 1))  # polite delay between successful pages
                    break  # page done, stop retrying
                except Exception as e:
                    print(f"第{page}页失败({retry+1}次): {str(e)[:30]}")
                    time.sleep(2 + retry)
            else:
                # All 3 retries failed: skip this page with a short delay.
                print(f"第{page}页多次失败,跳过")
                time.sleep(1)
        return list(self.collected_ids)[:self.total_videos]

    def _fetch_danmuku(self, aid):
        """Fetch all danmaku for one video and append them to ``self.danmukus``.

        Args:
            aid: numeric video id.

        Returns:
            Number of danmaku fetched, or 0 on any failure (best-effort by design:
            one bad video must not abort the whole crawl).
        """
        try:
            # Step 1: resolve the video's cid (the id the danmaku feed is keyed on).
            cid_url = f"https://api.bilibili.com/x/web-interface/view?aid={aid}"
            resp = requests.get(cid_url, headers=self._get_headers(), timeout=10)
            cid = resp.json()["data"]["cid"]  # fixed: resp.json() over json.loads(resp.text)
            # Step 2: download the danmaku XML feed.
            danmuku_url = f"https://comment.bilibili.com/{cid}.xml"
            resp = requests.get(danmuku_url, headers=self._get_headers(), timeout=10)
            resp.encoding = "utf-8"  # force UTF-8 so Chinese text decodes correctly
            # Danmaku text is the body of each <d ...>...</d> element.
            danmukus = re.findall(r'<d[^>]*>(.*?)</d>', resp.text)
            with self.lock:  # appends from worker threads must be serialized
                self.danmukus.extend(danmukus)
            return len(danmukus)
        except Exception as e:
            print(f"视频{aid}失败: {str(e)[:30]}")
            return 0

    def crawl_danmukus(self, keywords=None):
        """Search each keyword, gather video ids, then crawl danmaku concurrently.

        Args:
            keywords: list of search terms; defaults to ["大语言模型"].

        Returns:
            list of all danmaku strings collected (also kept on ``self.danmukus``).
        """
        if keywords is None:  # fixed: avoid a mutable default argument
            keywords = ["大语言模型"]
        all_ids = []
        for keyword in keywords:
            print(f"\n搜索关键词: {keyword}")
            all_ids.extend(self.get_video_ids(keyword))
            if len(all_ids) >= self.total_videos:
                print(f"已获取{len(all_ids)}个ID停止搜索")
                break
        # De-duplicate across keywords and cap at the target count.
        video_ids = list(set(all_ids))[:self.total_videos]
        print(f"\n共获取{len(video_ids)}个视频ID开始爬取弹幕")
        if not video_ids:
            return []
        # Fixed worker count avoids the overhead of dynamic pool sizing.
        max_workers = min(20, len(video_ids))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(self._fetch_danmuku, aid): aid for aid in video_ids}
            for idx, future in enumerate(as_completed(futures), 1):
                count = future.result()
                print(f"进度: {idx}/{len(video_ids)},新增弹幕: {count}")
                if idx % 20 == 0:  # light throttle every 20 videos to dodge rate limits
                    time.sleep(random.uniform(0.5, 1))
        print(f"\n爬取完成,总弹幕数: {len(self.danmukus)}")
        return self.danmukus