|
|
import argparse
import json
import logging
import random
import time
from pathlib import Path
from typing import Dict, List, Optional

import requests
from lxml import etree
from tqdm import tqdm
|
|
|
|
|
|
# Timestamp + level in every log line so long crawling runs are traceable.
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')

# Shared HTTP session: reuses connections and carries common headers
# across all requests in this script.
SESSION = requests.Session()

SESSION.headers.update({
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                  '(KHTML, like Gecko) Chrome/120.0 Safari/537.36'
})

# Browser-like request headers; lowers the chance of being blocked.
SESSION.headers.update({
    'Referer': 'https://www.bilibili.com',
    'Accept': 'application/json, text/javascript, */*; q=0.01'
})

# Bilibili web API endpoints: keyword search, video detail (yields the cid),
# and the raw danmaku (bullet-comment) XML feed keyed by cid.
SEARCH_URL = 'https://api.bilibili.com/x/web-interface/search/type'
VIEW_URL = 'https://api.bilibili.com/x/web-interface/view'
COMMENT_XML = 'https://comment.bilibili.com/{cid}.xml'
|
|
|
|
|
|
|
|
|
def search_videos(keyword: str, max_videos: int = 360) -> List[Dict]:
    """Fetch a list of videos for *keyword* via the Bilibili search API.

    Pages through the "comprehensive ranking" search results until
    *max_videos* items are collected or the API returns no more results.

    Args:
        keyword: search phrase.
        max_videos: upper bound on the number of videos returned.

    Returns:
        List of dicts with keys 'bvid' (BV-id, or numeric aid as a
        fallback) and 'title'.
    """
    videos: List[Dict] = []
    page = 1
    page_size = 20
    while len(videos) < max_videos:
        params = {
            'keyword': keyword,
            'search_type': 'video',
            'page': page,
            'page_size': page_size
        }
        # Simple retry with growing back-off to reduce the chance of
        # anti-scraping HTTP 412/403 responses.
        resp = None
        for attempt in range(3):
            try:
                resp = SESSION.get(SEARCH_URL, params=params, timeout=10)
                if resp.status_code == 200:
                    break
                logging.warning('搜索重试 %d 次,状态码=%s', attempt, resp.status_code)
            except Exception as e:
                logging.exception('搜索第 %d 次出现异常: %s', attempt, e)
            time.sleep(0.5 + attempt * 0.5)
        if not resp or resp.status_code != 200:
            logging.warning('搜索失败,状态码=%s', getattr(resp, 'status_code', None))
            break
        try:
            # BUGFIX: .json() itself raises on a non-JSON body (e.g. an
            # anti-scraping HTML page served with status 200), so it must
            # be parsed inside the try block rather than before it.
            data = resp.json()
            vlist = data.get('data', {}).get('result', [])
            if not vlist:
                break
            for item in vlist:
                title = item.get('title', '')
                # Prefer the BV-id; fall back to the numeric aid.
                bvid = item.get('bvid') or item.get('aid')
                if bvid:
                    videos.append({'bvid': bvid, 'title': title})
                if len(videos) >= max_videos:
                    break
        except Exception as e:
            logging.exception('搜索结果失败: %s', e)
            break
        page += 1
        time.sleep(0.5)
    logging.info('关键词「%s」共找到 %d 个视频,目标 %d', keyword, len(videos), max_videos)
    return videos
|
|
|
|
|
|
|
|
|
def get_cid(bvid: str) -> Optional[int]:
    """Resolve a video's cid (danmaku stream id) via the view API.

    Accepts either a 'BV…' string id or a numeric aid.  Retries up to
    3 times with growing back-off.

    Returns:
        The cid, or None when every attempt fails.
        (BUGFIX: annotation was ``int`` but None is returned on failure.)
    """
    # Choose the right query parameter for BV-ids vs. numeric aids.
    params = {'bvid': bvid} if isinstance(bvid, str) and bvid.startswith('BV') else {'aid': bvid}
    for attempt in range(3):
        try:
            resp = SESSION.get(VIEW_URL, params=params, timeout=10)
            if resp.status_code != 200:
                logging.warning('获取 CID 失败:%s,状态码=%s', bvid, resp.status_code)
                time.sleep(1 + attempt)
                continue
            data = resp.json()
            return data.get('data', {}).get('cid')
        except Exception as e:
            logging.exception('第 %d 次获取 CID 异常:%s,错误=%s', attempt, bvid, e)
            time.sleep(1 + attempt)
    return None
|
|
|
|
|
|
|
|
|
def fetch_danmaku(cid: int) -> List[str]:
    """Download the danmaku XML for *cid* and return the comment texts.

    Retries up to 3 times with a growing back-off; returns an empty
    list when every attempt fails.
    """
    url = COMMENT_XML.format(cid=cid)
    for attempt in range(3):
        try:
            resp = SESSION.get(url, timeout=10)
            if resp.status_code != 200:
                logging.warning('弹幕获取失败:cid=%s 状态码=%s', cid, resp.status_code)
                time.sleep(1 + attempt)
                continue
            xml = resp.content
            root = etree.fromstring(xml)
            # Each <d> element holds one bullet comment; its text may be None.
            texts = [d.text or '' for d in root.findall('d')]
            return texts
        except Exception as e:
            logging.exception('第%d次获取弹幕异常:cid=%s 错误=%s', attempt, cid, e)
            time.sleep(1 + attempt)
    return []
|
|
|
|
|
|
|
|
|
def save_raw_texts(texts: List[str], out_file: Path):
    """Write one danmaku text per line to *out_file* (UTF-8).

    Embedded newlines inside each text are flattened to spaces so every
    record occupies exactly one output line.  Parent directories are
    created as needed.
    """
    out_file.parent.mkdir(parents=True, exist_ok=True)
    flattened = (item.replace('\n', ' ') + '\n' for item in texts)
    with out_file.open('w', encoding='utf-8') as handle:
        handle.writelines(flattened)
|
|
|
|
|
|
|
|
|
def load_progress(progress_path: Path) -> Dict:
    """Read the resume-progress JSON file.

    Returns ``{'done': []}`` when the file is missing or unreadable, so
    callers can always treat the result as valid progress state.
    """
    fallback = {'done': []}
    if not progress_path.exists():
        return fallback
    try:
        return json.loads(progress_path.read_text(encoding='utf-8'))
    except Exception:
        # Best-effort: a corrupt progress file simply restarts the crawl.
        return fallback
|
|
|
|
|
|
|
|
|
def save_progress(progress_path: Path, data: Dict):
    """Persist progress *data* as pretty-printed UTF-8 JSON."""
    progress_path.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(data, ensure_ascii=False, indent=2)
    with progress_path.open('w', encoding='utf-8') as handle:
        handle.write(serialized)
|
|
|
|
|
|
|
|
|
def main(keywords: List[str], max_videos_per_kw: int, out_dir: Path, delay: float = 0.5, progress_file: str = '进度.json'):
    """Crawl danmaku for every keyword and persist raw texts + progress.

    For each keyword, videos are searched, each video's cid resolved and
    its danmaku downloaded.  Per-video raw files go to <out>/原始/, a
    per-keyword aggregate to <out>/原始弹幕_<kw>.txt, and processed
    bvids are checkpointed in *progress_file* for resumable crawling.

    Args:
        keywords: search keywords to process in order.
        max_videos_per_kw: video cap per keyword.
        out_dir: output directory (created on demand).
        delay: base sleep between videos; a random 0–0.3 s jitter is added.
        progress_file: name of the resume-progress JSON inside out_dir.
    """
    out_dir = Path(out_dir)
    progress_path = out_dir / progress_file
    progress = load_progress(progress_path)
    done = set(progress.get('done', []))
    all_texts = []
    for kw in keywords:
        logging.info('开始处理关键词:%s', kw)
        # BUGFIX: collect this keyword's texts separately.  Previously the
        # per-keyword aggregate file was written from the cross-keyword
        # accumulator, so later keyword files duplicated earlier
        # keywords' danmaku.
        kw_texts = []
        videos = search_videos(kw, max_videos_per_kw)
        for v in tqdm(videos, desc=f'关键词 {kw} 的视频'):
            bvid = v.get('bvid')
            if not bvid:
                continue
            if bvid in done:
                logging.debug('跳过已处理视频:%s', bvid)
                continue
            try:
                cid = get_cid(bvid)
                if not cid:
                    continue
                texts = fetch_danmaku(cid)
                vid_dir = out_dir / '原始'
                vid_dir.mkdir(parents=True, exist_ok=True)
                save_raw_texts(texts, vid_dir / f'{bvid}_{cid}.txt')
                kw_texts.extend(texts)
                all_texts.extend(texts)
                done.add(bvid)
                # Checkpoint progress periodically so a crash loses little.
                if len(done) % 20 == 0:
                    save_progress(progress_path, {'done': list(done)})
                # Randomized delay to look less like a bot.
                time.sleep(delay + random.random() * 0.3)
            except Exception:
                logging.exception('处理视频失败:%s', bvid)
        # Aggregate raw danmaku texts for this keyword only.
        save_raw_texts(kw_texts, out_dir / f'原始弹幕_{kw}.txt')

    # Final progress write.
    save_progress(progress_path, {'done': list(done)})

    if not all_texts:
        logging.warning('未采集到任何弹幕——请检查网络、接口变更或反爬(HTTP 412/403)。')
        return

    logging.info('抓取完成:共处理 %d 个视频,采集弹幕 %d 行。', len(done), len(all_texts))
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='B站弹幕抓取器')
    parser.add_argument('-k', '--keywords', nargs='+', default=['大语言模型'], help='关键词')
    parser.add_argument('-n', '--max-per-kw', type=int, default=120, help='每个关键词抓取的视频数')
    parser.add_argument('-o', '--out', default='输出', help='输出目录')
    parser.add_argument('--delay', type=float, default=0.5, help='请求间基础延迟')
    parser.add_argument('--progress-file', default='进度.json', help='断点续爬进度文件名(默认 进度.json)')
    parser.add_argument('--cookie', default=None, help='Cookie 字符串')
    parser.add_argument('--proxy', default=None, help='代理URL(例如 http://127.0.0.1:7890 )')
    args = parser.parse_args()
    # Apply optional cookie and proxy to the shared session.
    if args.cookie:
        SESSION.headers.update({'Cookie': args.cookie})
    if args.proxy:
        SESSION.proxies.update({'http': args.proxy, 'https': args.proxy})
    # BUGFIX: --delay and --progress-file were parsed but never forwarded
    # to main(), so those flags were silently ignored.
    main(args.keywords, args.max_per_kw, Path(args.out),
         delay=args.delay, progress_file=args.progress_file)
|
|
|
|
|
|
|