import requests import re import json import time import random from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Lock class BilibiliSpider: def __init__(self, total_videos=300): self.danmukus = [] # 存储爬取到的所有弹幕 self.lock = Lock() # 线程锁,用于多线程操作时保证数据安全 self.total_videos = total_videos # 计划爬取的视频总数 self.collected_ids = set() # 用于存储已收集的视频ID,实现实时去重 self._init_headers() # 初始化请求头信息 def _init_headers(self): """初始化用户代理和Cookie列表,用于构建请求头""" # 用户代理列表,模拟不同浏览器请求 self.user_agents = [ "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_1) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15" ] # Cookie列表,模拟用户登录状态(需替换为实际Cookie) self.cookies = [ "buvid3=32880B13-092F-D559-CC16-EE1C7CB405D854566infoc; b_nut=1762263854; ..." # 你的Cookie ] def _get_headers(self): """动态生成请求头,随机选择用户代理和Cookie""" headers = { "User-Agent": random.choice(self.user_agents), # 随机选择一个用户代理 "Referer": "https://www.bilibili.com/", # 模拟从B站主页跳转 "Accept": "application/json, text/plain, */*" # 声明可接受的响应格式 } # 如果有Cookie,则随机选择一个添加到请求头 if self.cookies: headers["Cookie"] = random.choice(self.cookies) return headers def get_video_ids(self, keyword, max_pages=1): """ 根据关键词搜索并获取视频ID列表 ###########优化点:严格控制页面数量,减少无效请求 """ video_ids = [] # 遍历指定页数的搜索结果 for page in range(1, max_pages + 50): # 一旦收集到足够数量的视频ID,立即停止(核心优化) if len(self.collected_ids) >= self.total_videos: print(f"已收集{len(self.collected_ids)}个ID,提前停止") break # 对每个页面请求保留3次重试机会 for retry in range(3): try: # 构建搜索请求URL url = f"https://api.bilibili.com/x/web-interface/search/type?keyword={keyword}&search_type=video&page={page}" resp = requests.get( url, headers=self._get_headers(), timeout=12, # 适当延长超时时间以减少无效重试 allow_redirects=False # 禁止自动重定向 ) # 检查响应状态码,非200则重试 if resp.status_code != 200: print(f"第{page}页状态码异常,重试{retry+1}次") time.sleep(2 + retry) # 失败延迟随重试次数递增(幅度减小) continue # 解析响应数据 data = json.loads(resp.text) # 检查接口返回状态和数据是否存在 if data.get("code") != 0 or not data["data"].get("result"): print(f"第{page}页无数据,重试{retry+1}次") time.sleep(2 + retry) continue # 提取视频ID并去重(过滤已收集的ID) new_ids = [item["aid"] for item in data["data"]["result"] if item["aid"] not in self.collected_ids] self.collected_ids.update(new_ids) # 将新ID添加到去重集合 video_ids.extend(new_ids) # 将新ID添加到列表 print(f"第{page}页成功,新增{len(new_ids)}个ID,累计{len(self.collected_ids)}个") # 成功请求后延迟0.5-1秒(核心提速点) time.sleep(random.uniform(0.5, 1)) break # 成功获取数据,跳出重试循环 except Exception as e: print(f"第{page}页失败({retry+1}次): {str(e)[:30]}") # 截取部分错误信息 time.sleep(2 + retry) else: # 3次重试均失败,跳过当前页面 print(f"第{page}页多次失败,跳过") time.sleep(1) # 失败页面仅短延迟 # 返回不超过目标数量的视频ID return list(self.collected_ids)[:self.total_videos] def _fetch_danmuku(self, aid): """ 爬取单个视频的弹幕 优化点:减少超时导致的重试,降低单视频处理时间 """ try: # 获取视频的cid(弹幕关联的ID),超时设为10秒(平衡速度和稳定性) cid_url = f"https://api.bilibili.com/x/web-interface/view?aid={aid}" resp = requests.get(cid_url, headers=self._get_headers(), timeout=10) cid = json.loads(resp.text)["data"]["cid"] # 获取弹幕数据(B站弹幕以XML格式存储) danmuku_url = f"https://comment.bilibili.com/{cid}.xml" resp = requests.get(danmuku_url, headers=self._get_headers(), timeout=10) resp.encoding = "utf-8" # 指定编码为UTF-8,避免中文乱码 # 提取弹幕内容(简化正则表达式,提高匹配效率) danmukus = re.findall(r']*>(.*?)', resp.text) # 使用线程锁保证多线程下数据添加的安全性 with self.lock: self.danmukus.extend(danmukus) return len(danmukus) # 返回当前视频获取到的弹幕数量 except Exception as e: print(f"视频{aid}失败: {str(e)[:30]}") # 打印错误信息(截取部分) return 0 # 失败时返回0 def crawl_danmukus(self, keywords=["大语言模型"]): all_ids = [] # 遍历所有关键词,搜索相关视频ID for keyword in keywords: print(f"\n搜索关键词: {keyword}") ids = self.get_video_ids(keyword) all_ids.extend(ids) # 若已获取足够数量的ID,停止搜索 if len(all_ids) >= self.total_videos: print(f"已获取{len(all_ids)}个ID,停止搜索") break # 对视频ID去重,并截取不超过目标数量的ID video_ids = list(set(all_ids))[:self.total_videos] print(f"\n共获取{len(video_ids)}个视频ID,开始爬取弹幕") if not video_ids: return [] # 若无视频ID,返回空列表 # 固定线程数(15-20,避免动态调整的开销) max_workers = min(20, len(video_ids)) # 使用线程池并发爬取弹幕 with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有任务,构建任务与视频ID的映射 futures = {executor.submit(self._fetch_danmuku, aid): aid for aid in video_ids} for idx, future in enumerate(as_completed(futures), 1): count = future.result() # 获取当前任务爬取到的弹幕数量 print(f"进度: {idx}/{len(video_ids)},新增弹幕: {count}条") # 每完成20个任务后轻量延迟(避免反爬,取消批量延迟) if idx % 20 == 0: time.sleep(random.uniform(0.5, 1)) print(f"\n爬取完成,总弹幕数: {len(self.danmukus)}") return self.danmukus # 返回所有爬取到的弹幕