|
|
import requests
|
|
|
import re
|
|
|
import json
|
|
|
import time
|
|
|
import random
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
from threading import Lock
|
|
|
|
|
|
|
|
|
class BilibiliSpider:
    """Concurrent crawler for Bilibili danmaku (bullet comments).

    Workflow: search the Bilibili web API for video ids by keyword, then
    fetch each video's danmaku XML concurrently with a thread pool and
    accumulate the comment strings in ``self.danmukus``.
    """

    # Pre-compiled pattern extracting danmaku text from the XML payload.
    # Hoisted to class level so it is compiled once, not once per video.
    _DANMAKU_RE = re.compile(r'<d[^>]*>(.*?)</d>')

    def __init__(self, total_videos=300):
        """
        :param total_videos: target number of distinct video ids to collect
        """
        self.danmukus = []          # all danmaku strings collected so far
        self.lock = Lock()          # guards self.danmukus under concurrent appends
        self.total_videos = total_videos
        self.collected_ids = set()  # video ids already seen (live de-duplication)
        self._init_headers()        # populate user-agent / cookie pools

    def _init_headers(self):
        """Initialize the user-agent and cookie pools used to build request headers."""
        # User agents rotated per request to imitate different browsers.
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15"
        ]
        # Cookies imitating a logged-in session (replace with a real cookie).
        self.cookies = [
            "buvid3=32880B13-092F-D559-CC16-EE1C7CB405D854566infoc; b_nut=1762263854; ..."  # your cookie
        ]

    def _get_headers(self):
        """Build request headers with a randomly chosen user agent (and cookie, if any)."""
        headers = {
            "User-Agent": random.choice(self.user_agents),   # rotate browser identity
            "Referer": "https://www.bilibili.com/",          # pretend we navigated from the homepage
            "Accept": "application/json, text/plain, */*"    # accepted response formats
        }
        # Attach a random cookie when the pool is non-empty.
        if self.cookies:
            headers["Cookie"] = random.choice(self.cookies)
        return headers

    def get_video_ids(self, keyword, max_pages=1):
        """Search Bilibili for *keyword* and collect video ids (aid).

        Stops as soon as ``self.total_videos`` distinct ids have been
        collected across all calls.

        NOTE: the page loop intentionally scans up to ``max_pages + 49``
        pages as a safety margin, because a single page rarely yields enough
        new ids to reach the target; the ``total_videos`` early-exit check
        below is the effective bound, not ``max_pages``.

        :param keyword: search keyword
        :param max_pages: nominal page budget (see note above)
        :return: up to ``self.total_videos`` ids collected so far (all keywords)
        """
        video_ids = []
        for page in range(1, max_pages + 50):
            # Early exit once enough ids have been collected (core optimization).
            if len(self.collected_ids) >= self.total_videos:
                print(f"已收集{len(self.collected_ids)}个ID,提前停止")
                break

            # Up to 3 attempts per page.
            for retry in range(3):
                try:
                    url = f"https://api.bilibili.com/x/web-interface/search/type?keyword={keyword}&search_type=video&page={page}"
                    resp = requests.get(
                        url,
                        headers=self._get_headers(),
                        timeout=12,             # slightly long timeout reduces spurious retries
                        allow_redirects=False   # do not follow redirects
                    )

                    # Retry on any non-200 status.
                    if resp.status_code != 200:
                        print(f"第{page}页状态码异常,重试{retry+1}次")
                        time.sleep(2 + retry)   # back off a bit more on each retry
                        continue

                    data = resp.json()
                    # The API may return "data": null on failure, so guard the
                    # nested lookup with ``or {}`` instead of indexing blindly.
                    result = (data.get("data") or {}).get("result")
                    if data.get("code") != 0 or not result:
                        print(f"第{page}页无数据,重试{retry+1}次")
                        time.sleep(2 + retry)
                        continue

                    # Keep only ids we have not seen before (live de-duplication).
                    new_ids = [item["aid"] for item in result if item["aid"] not in self.collected_ids]
                    self.collected_ids.update(new_ids)
                    video_ids.extend(new_ids)
                    print(f"第{page}页成功,新增{len(new_ids)}个ID,累计{len(self.collected_ids)}个")

                    # Short randomized delay after a success to stay under rate limits.
                    time.sleep(random.uniform(0.5, 1))
                    break  # success: stop retrying this page

                except Exception as e:
                    print(f"第{page}页失败({retry+1}次): {str(e)[:30]}")
                    time.sleep(2 + retry)
            else:
                # for/else: all 3 attempts failed — skip this page.
                print(f"第{page}页多次失败,跳过")
                time.sleep(1)  # only a short delay for failed pages

        # Return at most the target number of ids collected so far.
        return list(self.collected_ids)[:self.total_videos]

    def _fetch_danmuku(self, aid):
        """Fetch all danmaku for one video (thread-pool worker).

        :param aid: the video's aid
        :return: number of danmaku strings fetched, or 0 on any failure
        """
        try:
            # Resolve the video's cid — the id its danmaku file is keyed on.
            cid_url = f"https://api.bilibili.com/x/web-interface/view?aid={aid}"
            resp = requests.get(cid_url, headers=self._get_headers(), timeout=10)
            cid = resp.json()["data"]["cid"]

            # Danmaku are served as an XML document keyed by cid.
            danmuku_url = f"https://comment.bilibili.com/{cid}.xml"
            resp = requests.get(danmuku_url, headers=self._get_headers(), timeout=10)
            resp.encoding = "utf-8"  # force UTF-8 to avoid mojibake

            danmukus = self._DANMAKU_RE.findall(resp.text)
            # Serialize appends: the thread pool calls this concurrently.
            with self.lock:
                self.danmukus.extend(danmukus)
            return len(danmukus)
        except Exception as e:
            print(f"视频{aid}失败: {str(e)[:30]}")
            return 0  # treat any failure as zero danmaku

    def crawl_danmukus(self, keywords=None):
        """Search by *keywords* and concurrently crawl danmaku for the results.

        :param keywords: list of search keywords; defaults to ["大语言模型"].
            The default is built per call (``None`` sentinel) to avoid the
            shared-mutable-default pitfall.
        :return: list of all danmaku strings collected
        """
        if keywords is None:
            keywords = ["大语言模型"]

        all_ids = []
        # Search each keyword until enough ids are gathered.
        for keyword in keywords:
            print(f"\n搜索关键词: {keyword}")
            ids = self.get_video_ids(keyword)
            all_ids.extend(ids)
            if len(all_ids) >= self.total_videos:
                print(f"已获取{len(all_ids)}个ID,停止搜索")
                break

        # De-duplicate and cap at the target count.
        video_ids = list(set(all_ids))[:self.total_videos]
        print(f"\n共获取{len(video_ids)}个视频ID,开始爬取弹幕")
        if not video_ids:
            return []  # nothing to crawl

        # Fixed pool size (capped at 20) avoids dynamic-tuning overhead.
        max_workers = min(20, len(video_ids))
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Map each future back to its video id.
            futures = {executor.submit(self._fetch_danmuku, aid): aid for aid in video_ids}
            for idx, future in enumerate(as_completed(futures), 1):
                count = future.result()  # danmaku count for this video
                print(f"进度: {idx}/{len(video_ids)},新增弹幕: {count}条")
                # Light throttling every 20 completions to dodge anti-crawl.
                if idx % 20 == 0:
                    time.sleep(random.uniform(0.5, 1))

        print(f"\n爬取完成,总弹幕数: {len(self.danmukus)}")
        return self.danmukus  # all danmaku collected across videos