import argparse
import json
import logging
import random
import time
from pathlib import Path
from typing import Dict, List, Optional

import requests
from lxml import etree
from tqdm import tqdm
logging.basicConfig(level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s')
SESSION = requests.Session()
SESSION.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
'(KHTML, like Gecko) Chrome/120.0 Safari/537.36'
})
# 请求头,可以降低被拦截概率
SESSION.headers.update({
'Referer': 'https://www.bilibili.com',
'Accept': 'application/json, text/javascript, */*; q=0.01'
})
SEARCH_URL = 'https://api.bilibili.com/x/web-interface/search/type'
VIEW_URL = 'https://api.bilibili.com/x/web-interface/view'
COMMENT_XML = 'https://comment.bilibili.com/{cid}.xml'
def search_videos(keyword: str, max_videos: int = 360) -> List[Dict]:
"""通过 B 站搜索 API 获取视频列表(综合排序)
返回的视频项包含 'bvid' 'title'
"""
videos = []
page = 1
page_size = 20
while len(videos) < max_videos:
params = {
'keyword': keyword,
'search_type': 'video',
'page': page,
'page_size': page_size
}
# 简单重试与抖动,降低 412/403 风险
resp = None
for attempt in range(3):
try:
resp = SESSION.get(SEARCH_URL, params=params, timeout=10)
if resp.status_code == 200:
break
logging.warning('搜索重试 %d 次,状态码=%s', attempt, resp.status_code)
except Exception as e:
logging.exception('搜索第 %d 次出现异常: %s', attempt, e)
time.sleep(0.5 + attempt * 0.5)
if not resp or resp.status_code != 200:
logging.warning('搜索失败,状态码=%s', getattr(resp, 'status_code', None))
break
data = resp.json()
try:
vlist = data.get('data', {}).get('result', [])
if not vlist:
break
for item in vlist:
title = item.get('title', '')
bvid = item.get('bvid') or item.get('aid')
if bvid:
videos.append({'bvid': bvid, 'title': title})
if len(videos) >= max_videos:
break
except Exception as e:
logging.exception('搜索结果失败: %s', e)
break
page += 1
time.sleep(0.5)
logging.info('关键词「%s」共找到 %d 个视频,目标 %d', keyword, len(videos), max_videos)
return videos
def get_cid(bvid: str) -> int:
#通过接口获取
params = {'bvid': bvid} if isinstance(bvid, str) and bvid.startswith('BV') else {'aid': bvid}
for attempt in range(3):
try:
resp = SESSION.get(VIEW_URL, params=params, timeout=10)
if resp.status_code != 200:
logging.warning('获取 CID 失败:%s,状态码=%s', bvid, resp.status_code)
time.sleep(1 + attempt)
continue
data = resp.json()
return data.get('data', {}).get('cid')
except Exception as e:
logging.exception('%d 次获取 CID 异常:%s,错误=%s', attempt, bvid, e)
time.sleep(1 + attempt)
return None
def fetch_danmaku(cid: int) -> List[str]:
#请求弹幕并解析出文本列表
url = COMMENT_XML.format(cid=cid)
for attempt in range(3):
try:
resp = SESSION.get(url, timeout=10)
if resp.status_code != 200:
logging.warning('弹幕获取失败cid=%s 状态码=%s', cid, resp.status_code)
time.sleep(1 + attempt)
continue
xml = resp.content
root = etree.fromstring(xml)
texts = [d.text or '' for d in root.findall('d')]
return texts
except Exception as e:
logging.exception('%d次获取弹幕异常cid=%s 错误=%s', attempt, cid, e)
time.sleep(1 + attempt)
return []
def save_raw_texts(texts: List[str], out_file: Path):
out_file.parent.mkdir(parents=True, exist_ok=True)
with out_file.open('w', encoding='utf-8') as f:
for t in texts:
f.write(t.replace('\n', ' ') + '\n')
def load_progress(progress_path: Path) -> Dict:
if not progress_path.exists():
return {'done': []}
try:
with progress_path.open('r', encoding='utf-8') as f:
return json.load(f)
except Exception:
return {'done': []}
def save_progress(progress_path: Path, data: Dict):
progress_path.parent.mkdir(parents=True, exist_ok=True)
with progress_path.open('w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def main(keywords: List[str], max_videos_per_kw: int, out_dir: Path, delay: float = 0.5, progress_file: str = '进度.json'):
out_dir = Path(out_dir)
progress_path = out_dir / progress_file
progress = load_progress(progress_path)
done = set(progress.get('done', []))
all_texts = []
for kw in keywords:
logging.info('开始处理关键词:%s', kw)
videos = search_videos(kw, max_videos_per_kw)
for v in tqdm(videos, desc=f'关键词 {kw} 的视频'):
bvid = v.get('bvid')
if not bvid:
continue
if bvid in done:
logging.debug('跳过已处理视频:%s', bvid)
continue
try:
cid = get_cid(bvid)
if not cid:
continue
texts = fetch_danmaku(cid)
vid_dir = out_dir / '原始'
vid_dir.mkdir(parents=True, exist_ok=True)
save_raw_texts(texts, vid_dir / f'{bvid}_{cid}.txt')
all_texts.extend(texts)
done.add(bvid)
#周期写入进度
if len(done) % 20 == 0:
save_progress(progress_path, {'done': list(done)})
# 随机延迟
time.sleep(delay + random.random() * 0.3)
except Exception:
logging.exception('处理视频失败:%s', bvid)
#保存聚合的原始文本
save_raw_texts(all_texts, out_dir / f'原始弹幕_{kw}.txt')
# 最终写入进度展示
save_progress(progress_path, {'done': list(done)})
#无数据时提示
if not all_texts:
logging.warning('未采集到任何弹幕——请检查网络、接口变更或反爬HTTP 412/403')
return
#最后
logging.info('抓取完成:共处理 %d 个视频,采集弹幕 %d 行。', len(done), len(all_texts))
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='B站弹幕抓取器')
parser.add_argument('-k', '--keywords', nargs='+', default=['大语言模型'], help='关键词')
parser.add_argument('-n', '--max-per-kw', type=int, default=120, help='每个关键词抓取的视频数')
parser.add_argument('-o', '--out', default='输出', help='输出目录')
parser.add_argument('--delay', type=float, default=0.5, help='请求间基础延迟')
parser.add_argument('--progress-file', default='进度.json', help='断点续爬进度文件名(默认 进度.json')
parser.add_argument('--cookie', default=None, help='Cookie 字符串')
parser.add_argument('--proxy', default=None, help='代理URL例如 http://127.0.0.1:7890 ')
args = parser.parse_args()
# apply optional cookie and proxy
if args.cookie:
SESSION.headers.update({'Cookie': args.cookie})
if args.proxy:
SESSION.proxies.update({'http': args.proxy, 'https': args.proxy})
main(args.keywords, args.max_per_kw, Path(args.out))