You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

280 lines
11 KiB

This file contains invisible Unicode characters!

This file contains invisible Unicode characters that may be processed differently from what appears below. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to reveal hidden characters.

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Bilibili 弹幕爬虫(速度与安全平衡版)
- 支持综合排序前300视频
- Cookie自动读取cookies.txt
- 智能延时 + 快速退避 + 长周期轻休息
- 自动防封、失败自动回退
"""
import requests
import time
import random
import re
import logging
import os
from fake_useragent import UserAgent
# Log to both crawler.log (UTF-8) and the console, with timestamped lines.
_log_handlers = [
    logging.FileHandler('crawler.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=_log_handlers,
)
class FastSafeBilibiliCrawler:
    """Crawl Bilibili danmu (bullet comments) for keyword-matched videos,
    balancing throughput against anti-bot throttling (rate limiting,
    jittered delays, exponential back-off, offline fallback data)."""

    def __init__(self, proxy_pool=None):
        """Set up the HTTP session, headers, optional proxy pool and pacing state.

        proxy_pool: optional list of proxies; when present, one is picked at
        random per request (see _get_proxy).
        """
        self.session = requests.Session()
        self.ua = UserAgent()
        self.proxy_pool = proxy_pool or []
        # Initialize request headers (random desktop UA to look like a browser).
        self.headers = {
            'User-Agent': self.ua.random,
            'Referer': 'https://www.bilibili.com/',
            'Accept-Language': 'zh-CN,zh;q=0.9'
        }
        # Automatically load cookies from cookies.txt, if present, so requests
        # run in logged-in mode (anonymous mode is more likely to be throttled).
        try:
            if os.path.exists("cookies.txt"):
                with open("cookies.txt", "r", encoding="utf-8") as f:
                    cookie_str = f.read().strip()
                if cookie_str:
                    self.headers["Cookie"] = cookie_str
                    logging.info("🍪 已加载本地 cookies.txt使用登录态模式")
            else:
                logging.info("🍪 未检测到 cookies.txt使用匿名模式可能限流")
        except Exception as e:
            logging.warning(f"加载Cookie失败: {e}")
        # Pacing / failure-tracking parameters (speed & safety balance).
        self.last_request_time = 0      # time.time() of the previous request
        self.min_interval = 0.8         # minimum seconds between any two requests
        self.failure_count = 0          # consecutive per-video failures
        self.total_failures = 0         # failures across the whole run
        self.max_consecutive_failures = 6
        self.max_total_failures_before_fallback = 40
        logging.info("🔧 爬虫初始化完成(高速安全模式)")

    def _get_proxy(self):
        # Pick a random proxy from the pool, or None when no pool was supplied.
        # NOTE(review): currently unused by the request calls below — confirm intent.
        return random.choice(self.proxy_pool) if self.proxy_pool else None

    def _rate_limit(self) -> None:
        """Basic rate control: ensure at least min_interval (plus jitter) between requests."""
        elapsed = time.time() - self.last_request_time
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed + random.uniform(0.1, 0.3))
        self.last_request_time = time.time()

    def _smart_delay(self, request_count) -> None:
        """Lightweight delay strategy (fast but safe): short random pause,
        with a longer rest roughly every 20 items."""
        delay = random.uniform(1.2, 3.2)
        # Every ~20 videos, take a longer break to look less bot-like.
        if request_count % 20 == 0 and request_count > 0:
            delay += random.uniform(8.0, 12.0)
        logging.info(f"⏱️ 等待 {delay:.2f} 秒后继续")
        time.sleep(delay)

    def _exponential_backoff(self, attempt) -> None:
        """Stepped back-off: sleep longer the more attempts have failed
        (3-6s, then 8-15s, then 20-30s)."""
        if attempt <= 1:
            s = random.uniform(3, 6)
        elif attempt == 2:
            s = random.uniform(8, 15)
        else:
            s = random.uniform(20, 30)
        logging.warning(f"退避中(第 {attempt} 次失败后),等待 {s:.1f}")
        time.sleep(s)

    # Main entry point
    def get_danmu_by_keyword(self, keyword, target_videos=300, max_retry_per_item=2):
        """Crawl danmu for up to target_videos videos matching keyword.

        Each video is retried up to max_retry_per_item extra times with back-off.
        Returns a list of danmu strings; falls back to offline sample data when
        too many requests fail or nothing real was fetched.
        """
        logging.info(f"🚀 启动爬取关键词: {keyword},目标视频数: {target_videos}")
        video_list = self._get_videos_mobile_api(keyword, count=target_videos)
        if not video_list:
            logging.warning("未能获取视频列表,使用备用视频列表")
            video_list = self._get_fallback_video_list()
        logging.info(f"📺 最终获取 {len(video_list)} 个视频,开始爬取弹幕...")
        all_danmus = []
        success_videos = 0
        for idx, (bvid, title) in enumerate(video_list, start=1):
            logging.info(f"🎯 [{idx}/{len(video_list)}] {title}")
            self._rate_limit()
            item_attempt = 0
            # Per-video retry loop: empty results and exceptions both count as failures.
            while item_attempt <= max_retry_per_item:
                try:
                    danmus = self._get_danmu_for_bvid(bvid)
                    if danmus:
                        all_danmus.extend(danmus)
                        success_videos += 1
                        self.failure_count = 0  # a success resets the consecutive counter
                        break
                    else:
                        item_attempt += 1
                        self.failure_count += 1
                        self.total_failures += 1
                        self._exponential_backoff(item_attempt)
                except Exception as e:
                    item_attempt += 1
                    self.failure_count += 1
                    self.total_failures += 1
                    logging.warning(f"获取弹幕异常: {e}")
                    self._exponential_backoff(item_attempt)
            # Too many failures in a row: rest ~5 minutes to avoid a ban.
            if self.failure_count >= self.max_consecutive_failures:
                logging.error(f"连续失败 {self.failure_count} 次,休息 5 分钟")
                time.sleep(random.uniform(300, 400))
                self.failure_count = 0
            # Too many failures overall: give up and return offline fallback data.
            if self.total_failures >= self.max_total_failures_before_fallback:
                logging.error("失败过多,切换回退数据")
                return self._get_realistic_fallback_data()
            self._smart_delay(idx)
        if not all_danmus:
            logging.info("未获取到真实弹幕,使用回退样本")
            return self._get_realistic_fallback_data()
        logging.info(f"🎉 爬取完成: 成功 {success_videos} 个视频, 共 {len(all_danmus)} 条弹幕")
        return all_danmus

    # ---------------- Video search ----------------
    def _get_videos_mobile_api(self, keyword, count=300):
        """Page through the web search API, collecting up to count (bvid, title) pairs."""
        all_videos = []
        page_size = 50
        max_pages = (count + page_size - 1) // page_size  # ceiling division
        logging.info(f"尝试分页获取视频,总目标 {count}")
        for page in range(1, max_pages + 1):
            try:
                self._rate_limit()
                params = {
                    "search_type": "video",
                    "keyword": keyword,
                    "page": page,
                    "page_size": page_size,
                    "order": "totalrank"
                }
                resp = self.session.get(
                    "https://api.bilibili.com/x/web-interface/search/type",
                    params=params,
                    headers=self.headers,
                    timeout=10,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    if data.get("code") == 0:
                        results = data["data"].get("result", [])
                        for v in results:
                            bvid = v.get("bvid")
                            # Strip HTML markup (e.g. <em> highlights) from the title.
                            title = re.sub(r"<.*?>", "", v.get("title", ""))
                            if bvid:
                                all_videos.append((bvid, title))
                        logging.info(f"{page} 页成功获取 {len(results)} 个视频")
                    else:
                        logging.warning(f"API异常: {data.get('message')}")
                else:
                    logging.warning(f"状态码异常: {resp.status_code}")
                    # Anti-crawler status codes: back off before the next page.
                    if resp.status_code in (412, 403, 429):
                        self._exponential_backoff(page)
                self._smart_delay(page)
            except Exception as e:
                logging.warning(f"{page} 页获取异常: {e}")
                time.sleep(random.uniform(3, 6))
            if len(all_videos) >= count:
                break
        logging.info(f"✅ 最终获取 {len(all_videos)} 个视频(目标 {count}")
        return all_videos[:count]

    # ---------------- Danmu for a single video ----------------
    def _get_danmu_for_bvid(self, bvid):
        """Fetch one video's danmu, automatically choosing a usable endpoint (XML / JSON)."""
        try:
            self._rate_limit()
            # Resolve the video's cid (the danmu pool id) from its bvid first.
            view_url = f"https://api.bilibili.com/x/web-interface/view?bvid={bvid}"
            resp = self.session.get(view_url, headers=self.headers, timeout=10)
            if resp.status_code != 200:
                logging.warning(f"⚠️ 获取视频信息失败: {resp.status_code}")
                return []
            data = resp.json()
            if data.get("code") != 0:
                return []
            cid = data["data"].get("cid")
            if not cid:
                return []
            # --- 1. Try the legacy XML endpoint ---
            xml_url = f"https://api.bilibili.com/x/v1/dm/list.so?oid={cid}"
            dm_resp = self.session.get(xml_url, headers=self.headers, timeout=10)
            if dm_resp.status_code == 200 and "<d p=" in dm_resp.text:
                danmus = re.findall(r'<d p=".*?">(.*?)</d>', dm_resp.text)
                if danmus:
                    return [d.strip() for d in danmus if d.strip()]
                else:
                    logging.info(f"⚠️ XML 弹幕为空,尝试 JSON 接口...")
            # --- 2. Try the newer segmented endpoint ---
            json_url = f"https://api.bilibili.com/x/v2/dm/web/seg.so?type=1&oid={cid}&segment_index=1"
            json_headers = dict(self.headers)
            json_headers.update({
                "Referer": f"https://www.bilibili.com/video/{bvid}",
                "Origin": "https://www.bilibili.com",
                "Accept": "*/*",
                "Accept-Encoding": "identity"
            })
            dm_json_resp = self.session.get(json_url, headers=json_headers, timeout=10)
            if dm_json_resp.status_code == 200:
                text = dm_json_resp.text
                # Scrape runs of Chinese / common visible characters out of the raw
                # response body and treat them as danmu. NOTE(review): this endpoint
                # presumably returns binary protobuf, so this is a lossy heuristic
                # that may include non-danmu fragments — confirm before relying on it.
                danmus = re.findall(r'[\u4e00-\u9fa5A-Za-z0-9、“”"\'!?.]+', text)
                if danmus:
                    logging.info(f"✅ 使用 JSON 接口成功,获得 {len(danmus)} 条弹幕")
                    return danmus
                else:
                    logging.warning(f"⚠️ JSON 弹幕为空BV: {bvid}")
            logging.warning(f"❌ 弹幕接口均失败BV: {bvid}")
            return []
        except Exception as e:
            logging.debug(f"_get_danmu_for_bvid异常: {e}")
            return []

    # ---------------- Fallback data ----------------
    def _get_fallback_video_list(self):
        # Hard-coded (bvid, title) pairs used when the search API yields nothing.
        return [
            ("BV1px421y7Bp", "大语言模型从入门到实战"),
            ("BV1Gu41137cE", "LLM应用开发完整指南"),
            ("BV1Rq421q7c6", "大模型核心技术解析"),
            ("BV1Cg4y1L7Qp", "ChatGPT原理与实现"),
            ("BV1th4y1B7M2", "AI大模型行业应用案例")
        ]

    def _get_realistic_fallback_data(self):
        # Offline sample danmu (2000 random picks) returned when live crawling fails.
        logging.info("使用离线回退样本")
        samples = [
            "大语言模型太强了", "AI客服真方便", "机器翻译越来越准",
            "代码生成效率提升明显", "内容创作灵感满满", "教育辅导有进步",
            "医疗咨询很专业", "创意设计很有创意", "这个工具很实用", "LLM真香"
        ]
        return [random.choice(samples) for _ in range(2000)]
# Backwards-compatible alias for callers using the old class name.
BilibiliCrawler = FastSafeBilibiliCrawler

if __name__ == "__main__":
    # Smoke run: crawl a small batch for the keyword "大模型" and log the count.
    crawler = FastSafeBilibiliCrawler()
    danmus = crawler.get_danmu_by_keyword("大模型", target_videos=60)
    logging.info(f"测试完成,弹幕数量: {len(danmus)}")