"""Bilibili danmaku (bullet-comment) scraper.

Searches videos by keyword via the Bilibili web search API, resolves each
video's cid, downloads the raw danmaku XML and saves the comment text lines,
with resumable progress tracking between runs.
"""
import argparse
import json
import logging
import random
import time
from pathlib import Path
from typing import Dict, List, Optional

import requests
from lxml import etree
from tqdm import tqdm

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(levelname)s %(message)s')

SESSION = requests.Session()
# Browser-like headers lower the chance of being blocked (HTTP 412/403).
SESSION.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/120.0 Safari/537.36',
    'Referer': 'https://www.bilibili.com',
    'Accept': 'application/json, text/javascript, */*; q=0.01',
})

SEARCH_URL = 'https://api.bilibili.com/x/web-interface/search/type'
VIEW_URL = 'https://api.bilibili.com/x/web-interface/view'
COMMENT_XML = 'https://comment.bilibili.com/{cid}.xml'


def search_videos(keyword: str, max_videos: int = 360) -> List[Dict]:
    """Fetch a video list for *keyword* via the Bilibili search API.

    Pages through results (default comprehensive ordering) until
    *max_videos* items are collected or the API returns no more.

    Returns a list of dicts, each with 'bvid' (BV id, or numeric aid as a
    fallback) and 'title'.
    """
    videos: List[Dict] = []
    page = 1
    page_size = 20
    while len(videos) < max_videos:
        params = {
            'keyword': keyword,
            'search_type': 'video',
            'page': page,
            'page_size': page_size,
        }
        # Simple retry with backoff jitter to reduce 412/403 rejections.
        resp = None
        for attempt in range(3):
            try:
                resp = SESSION.get(SEARCH_URL, params=params, timeout=10)
                if resp.status_code == 200:
                    break
                logging.warning('搜索重试 %d 次,状态码=%s', attempt, resp.status_code)
            except Exception as e:
                logging.exception('搜索第 %d 次出现异常: %s', attempt, e)
            time.sleep(0.5 + attempt * 0.5)
        if not resp or resp.status_code != 200:
            logging.warning('搜索失败,状态码=%s', getattr(resp, 'status_code', None))
            break
        # Parse inside try: anti-crawl pages can return non-JSON with HTTP 200.
        try:
            data = resp.json()
            vlist = data.get('data', {}).get('result', [])
            if not vlist:
                break
            for item in vlist:
                title = item.get('title', '')
                # Prefer BV id; fall back to the numeric aid when absent.
                bvid = item.get('bvid') or item.get('aid')
                if bvid:
                    videos.append({'bvid': bvid, 'title': title})
                if len(videos) >= max_videos:
                    break
        except Exception as e:
            logging.exception('搜索结果失败: %s', e)
            break
        page += 1
        time.sleep(0.5)
    logging.info('关键词「%s」共找到 %d 个视频,目标 %d', keyword, len(videos), max_videos)
    return videos


def get_cid(bvid: str) -> Optional[int]:
    """Resolve a video's cid (comment id) via the view API.

    Accepts either a 'BV...' string or a numeric aid. Returns the cid, or
    None after three failed attempts.
    """
    params = ({'bvid': bvid}
              if isinstance(bvid, str) and bvid.startswith('BV')
              else {'aid': bvid})
    for attempt in range(3):
        try:
            resp = SESSION.get(VIEW_URL, params=params, timeout=10)
            if resp.status_code != 200:
                logging.warning('获取 CID 失败:%s,状态码=%s', bvid, resp.status_code)
                time.sleep(1 + attempt)
                continue
            data = resp.json()
            return data.get('data', {}).get('cid')
        except Exception as e:
            logging.exception('第 %d 次获取 CID 异常:%s,错误=%s', attempt, bvid, e)
            time.sleep(1 + attempt)
    return None


def fetch_danmaku(cid: int) -> List[str]:
    """Download the danmaku XML for *cid* and return the comment texts.

    Each <d> element's text becomes one list entry (empty string when the
    element has no text). Returns [] after three failed attempts.
    """
    url = COMMENT_XML.format(cid=cid)
    for attempt in range(3):
        try:
            resp = SESSION.get(url, timeout=10)
            if resp.status_code != 200:
                logging.warning('弹幕获取失败:cid=%s 状态码=%s', cid, resp.status_code)
                time.sleep(1 + attempt)
                continue
            root = etree.fromstring(resp.content)
            return [d.text or '' for d in root.findall('d')]
        except Exception as e:
            logging.exception('第%d次获取弹幕异常:cid=%s 错误=%s', attempt, cid, e)
            time.sleep(1 + attempt)
    return []


def save_raw_texts(texts: List[str], out_file: Path) -> None:
    """Write *texts* to *out_file*, one entry per line (UTF-8).

    Embedded newlines are flattened to spaces so each danmaku stays on a
    single line. Parent directories are created as needed.
    """
    out_file.parent.mkdir(parents=True, exist_ok=True)
    with out_file.open('w', encoding='utf-8') as f:
        for t in texts:
            f.write(t.replace('\n', ' ') + '\n')


def load_progress(progress_path: Path) -> Dict:
    """Load the resume-progress JSON; return {'done': []} when missing/corrupt."""
    if not progress_path.exists():
        return {'done': []}
    try:
        with progress_path.open('r', encoding='utf-8') as f:
            return json.load(f)
    except Exception:
        # A corrupt progress file should not abort the crawl.
        return {'done': []}


def save_progress(progress_path: Path, data: Dict) -> None:
    """Persist the resume-progress dict as pretty-printed UTF-8 JSON."""
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    with progress_path.open('w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


def main(keywords: List[str], max_videos_per_kw: int, out_dir: Path,
         delay: float = 0.5, progress_file: str = '进度.json') -> None:
    """Crawl danmaku for every keyword and save raw text files.

    Args:
        keywords: search keywords to process in order.
        max_videos_per_kw: video cap per keyword.
        out_dir: output directory (per-video files under '原始/', plus one
            aggregate file per keyword and the progress JSON).
        delay: base sleep between videos; a random 0–0.3 s jitter is added.
        progress_file: file name of the resume-progress JSON inside out_dir.
    """
    out_dir = Path(out_dir)
    progress_path = out_dir / progress_file
    progress = load_progress(progress_path)
    done = set(progress.get('done', []))
    all_texts: List[str] = []
    for kw in keywords:
        logging.info('开始处理关键词:%s', kw)
        videos = search_videos(kw, max_videos_per_kw)
        # BUGFIX: collect per-keyword texts separately so each keyword's
        # aggregate file no longer duplicates earlier keywords' danmaku.
        kw_texts: List[str] = []
        for v in tqdm(videos, desc=f'关键词 {kw} 的视频'):
            bvid = v.get('bvid')
            if not bvid:
                continue
            if bvid in done:
                logging.debug('跳过已处理视频:%s', bvid)
                continue
            try:
                cid = get_cid(bvid)
                if not cid:
                    continue
                texts = fetch_danmaku(cid)
                vid_dir = out_dir / '原始'
                vid_dir.mkdir(parents=True, exist_ok=True)
                save_raw_texts(texts, vid_dir / f'{bvid}_{cid}.txt')
                kw_texts.extend(texts)
                done.add(bvid)
                # Checkpoint progress periodically so a crash loses little work.
                if len(done) % 20 == 0:
                    save_progress(progress_path, {'done': list(done)})
                # Random jitter on top of the base delay to look less bot-like.
                time.sleep(delay + random.random() * 0.3)
            except Exception:
                logging.exception('处理视频失败:%s', bvid)
        # Aggregate raw texts for this keyword only.
        save_raw_texts(kw_texts, out_dir / f'原始弹幕_{kw}.txt')
        all_texts.extend(kw_texts)
    # Final progress write.
    save_progress(progress_path, {'done': list(done)})
    if not all_texts:
        logging.warning('未采集到任何弹幕——请检查网络、接口变更或反爬(HTTP 412/403)。')
        return
    logging.info('抓取完成:共处理 %d 个视频,采集弹幕 %d 行。', len(done), len(all_texts))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='B站弹幕抓取器')
    parser.add_argument('-k', '--keywords', nargs='+', default=['大语言模型'],
                        help='关键词')
    parser.add_argument('-n', '--max-per-kw', type=int, default=120,
                        help='每个关键词抓取的视频数')
    parser.add_argument('-o', '--out', default='输出', help='输出目录')
    parser.add_argument('--delay', type=float, default=0.5, help='请求间基础延迟')
    parser.add_argument('--progress-file', default='进度.json',
                        help='断点续爬进度文件名(默认 进度.json)')
    parser.add_argument('--cookie', default=None, help='Cookie 字符串')
    parser.add_argument('--proxy', default=None,
                        help='代理URL(例如 http://127.0.0.1:7890 )')
    args = parser.parse_args()
    # Apply optional cookie and proxy to the shared session.
    if args.cookie:
        SESSION.headers.update({'Cookie': args.cookie})
    if args.proxy:
        SESSION.proxies.update({'http': args.proxy, 'https': args.proxy})
    # BUGFIX: forward --delay and --progress-file (previously parsed but dropped).
    main(args.keywords, args.max_per_kw, Path(args.out),
         delay=args.delay, progress_file=args.progress_file)