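"""Bilibili danmaku (bullet-comment) crawler.

Searches the knowledge zone for a set of keywords, fetches each result's
metadata and danmaku via the web API (with retries and randomized delays),
processes videos concurrently in a small thread pool, and saves everything,
plus run statistics, to an Excel workbook.
"""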

import requests
import pandas as pd
import time
import random
import re
import logging
from urllib.parse import quote
from datetime import datetime
import os
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
# Logging configuration: log to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('bilibili_crawler.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class OptimizedBilibiliCrawler:
    def __init__(self, max_workers=3):
        self.session = requests.Session()
        # Use realistic browser headers
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',
            'Referer': 'https://www.bilibili.com/',
            'Accept': 'application/json, text/plain, */*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Origin': 'https://www.bilibili.com',
            'Connection': 'keep-alive',
        })
        # Thread-pool size for concurrent video processing
        self.max_workers = max_workers
        # Run statistics
        self.stats = {
            'total_videos': 0,
            'successful_videos': 0,
            'failed_videos': 0,
            'total_danmu': 0,
            'start_time': None,
            'end_time': None
        }
        # Queue for failed requests (reserved; not used yet)
        self.retry_queue = []
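    def prime_cookies(self):
        """Optional helper, not called anywhere by default. Bilibili's web
        API sometimes answers HTTP 412 to clients without session cookies,
        so one warm-up GET against the homepage to pick up cookies may
        reduce 412s. This is an assumption about the anti-crawler
        behaviour, not something the original script relied on."""
        try:
            self.session.get('https://www.bilibili.com/', timeout=8)
        except requests.RequestException:
            pass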
    def search_videos_by_keyword(self, keyword, pages=6):
        """Search videos by keyword."""
        videos = []
        for page in range(1, pages + 1):
            try:
                # Web search API
                url = "https://api.bilibili.com/x/web-interface/search/type"
                params = {
                    'search_type': 'video',
                    'keyword': keyword,
                    'page': page,
                    'tids': 36,        # Knowledge zone
                    'order': 'click',  # Sort by view count
                }
                response = self.session.get(url, params=params, timeout=10)
                if response.status_code == 200:
                    data = response.json()
                    if data['code'] == 0:
                        results = data['data'].get('result', [])
                        for item in results:
                            videos.append({
                                'bvid': item['bvid'],
                                # Search results wrap matched terms in <em> tags; strip them
                                'title': re.sub(r'<[^>]+>', '', item['title']),
                                'keyword': keyword
                            })
                        logger.info(f"Keyword '{keyword}', page {page}: got {len(results)} videos")
                    else:
                        logger.warning(f"Search API returned an error: {data['message']}")
                else:
                    logger.warning(f"Search request failed: {response.status_code}")
                # Short delay between search requests
                time.sleep(random.uniform(1, 2))
            except Exception as e:
                logger.error(f"Error while searching videos: {e}")
                continue
        return videos
    def get_video_info(self, bvid, max_retries=2):
        """Fetch video metadata (with retries)."""
        for attempt in range(max_retries):
            try:
                url = f"https://api.bilibili.com/x/web-interface/view?bvid={bvid}"
                response = self.session.get(url, timeout=8)
                if response.status_code == 200:
                    data = response.json()
                    if data['code'] == 0:
                        info = data['data']
                        return {
                            'cid': info['cid'],
                            'title': info['title'],
                            'owner': info['owner']['name'],
                            'view': info['stat']['view'],
                            'danmaku': info['stat']['danmaku'],
                            'pubdate': info['pubdate']
                        }
                elif response.status_code == 412:  # Anti-crawler response
                    wait_time = random.uniform(5, 10)
                    logger.info(f"Got HTTP 412, waiting {wait_time:.2f}s")
                    time.sleep(wait_time)
                    continue
            except Exception as e:
                logger.warning(f"Error fetching video info {bvid} (attempt {attempt+1}/{max_retries}): {e}")
            # Wait before retrying
            if attempt < max_retries - 1:
                time.sleep(random.uniform(2, 4))
        return None
    def get_danmu_data(self, cid, max_retries=2):
        """Fetch danmaku data (with retries)."""
        for attempt in range(max_retries):
            try:
                # Classic XML danmaku endpoint (each danmaku is a <d> element)
                url = f"https://api.bilibili.com/x/v1/dm/list.so?oid={cid}"
                response = self.session.get(url, timeout=10)
                if response.status_code == 200:
                    response.encoding = 'utf-8'
                    danmus = re.findall(r'<d p=".*?">(.*?)</d>', response.text)
                    return danmus
                elif response.status_code == 412:
                    wait_time = random.uniform(5, 10)
                    logger.info(f"Danmaku request got HTTP 412, waiting {wait_time:.2f}s")
                    time.sleep(wait_time)
                    continue
            except Exception as e:
                logger.warning(f"Error fetching danmaku for CID {cid} (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                time.sleep(random.uniform(2, 3))
        return []
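    # Note: list.so returns only a capped subset of a video's danmaku. The
    # newer segmented endpoint (api.bilibili.com/x/v2/dm/web/seg.so) returns
    # more, but replies in protobuf and needs extra parsing, which is
    # presumably why this script sticks with the simpler XML endpoint.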
    def process_single_video(self, video_info):
        """Fetch the danmaku for a single video."""
        bvid = video_info['bvid']
        keyword = video_info['keyword']
        logger.info(f"Processing video: {bvid}")
        # Fetch video metadata first
        video_detail = self.get_video_info(bvid)
        if not video_detail:
            logger.warning(f"Could not fetch video info: {bvid}")
            return []
        # Short delay before fetching the danmaku
        time.sleep(random.uniform(1, 2))
        danmus = self.get_danmu_data(video_detail['cid'])
        result_data = []
        for danmu in danmus:
            result_data.append({
                'keyword': keyword,
                'bvid': bvid,
                'title': video_detail['title'],
                'uploader': video_detail['owner'],
                'views': video_detail['view'],
                'danmaku_count': video_detail['danmaku'],
                'pubdate': datetime.fromtimestamp(video_detail['pubdate']).strftime('%Y-%m-%d %H:%M:%S'),
                'danmu': danmu
            })
        logger.info(f"Video {bvid}: got {len(danmus)} danmaku")
        return result_data
    def crawl_keyword_concurrent(self, keyword, video_count=120):
        """Concurrently crawl the danmaku of one keyword's videos."""
        logger.info(f"Starting concurrent crawl for keyword: {keyword}")
        # Search for candidate videos first
        search_videos = self.search_videos_by_keyword(keyword, pages=6)
        if not search_videos:
            logger.warning(f"No videos found for keyword '{keyword}'")
            return []
        # If the search returned fewer videos than requested, repeat the list
        all_videos = []
        while len(all_videos) < video_count:
            all_videos.extend(search_videos)
        all_videos = all_videos[:video_count]
        all_data = []
        successful_count = 0
        # Process videos concurrently with a thread pool
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all tasks
            future_to_video = {
                executor.submit(self.process_single_video, video): video
                for video in all_videos
            }
            # Collect results as tasks complete (stats are updated here in
            # the main thread, so no locking is needed)
            for future in as_completed(future_to_video):
                video = future_to_video[future]
                try:
                    video_data = future.result()
                    if video_data:
                        all_data.extend(video_data)
                        successful_count += 1
                        self.stats['successful_videos'] += 1
                        self.stats['total_danmu'] += len(video_data)
                    else:
                        self.stats['failed_videos'] += 1
                    self.stats['total_videos'] += 1
                except Exception as e:
                    logger.error(f"Error while processing video {video['bvid']}: {e}")
                    self.stats['failed_videos'] += 1
                    self.stats['total_videos'] += 1
        logger.info(f"Keyword '{keyword}' done: {successful_count} videos succeeded, {len(all_data)} danmaku collected")
        return all_data
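    # Because the search list is repeated to reach video_count, the same bvid
    # can be processed several times and its danmaku duplicated. If that is
    # unwanted, one cleanup option (a sketch, assuming the column names used
    # in process_single_video) is:
    #     danmu_df = danmu_df.drop_duplicates(subset=['bvid', 'danmu'])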
    def crawl_all_keywords(self, keywords):
        """Crawl every keyword in the list."""
        self.stats['start_time'] = datetime.now()
        logger.info(f"Starting crawl for all keywords: {keywords}")
        all_data = []
        for keyword in keywords:
            keyword_data = self.crawl_keyword_concurrent(keyword, video_count=120)
            all_data.extend(keyword_data)
            # Short break between keywords
            if keyword != keywords[-1]:  # Not the last keyword
                wait_time = random.uniform(5, 8)
                logger.info(f"Finished keyword '{keyword}', waiting {wait_time:.2f}s before continuing...")
                time.sleep(wait_time)
        self.stats['end_time'] = datetime.now()
        return pd.DataFrame(all_data)
    def save_to_excel(self, df, filename):
        """Save the data to an Excel workbook."""
        try:
            if df.empty:
                logger.warning("No data to save")
                return False
            # Make sure the target directory exists
            os.makedirs(os.path.dirname(filename) if os.path.dirname(filename) else '.', exist_ok=True)
            # Write the data
            with pd.ExcelWriter(filename, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name='danmu_data', index=False)
                # Add a worksheet with run statistics
                stats_df = self.create_stats_dataframe()
                stats_df.to_excel(writer, sheet_name='stats', index=False)
            logger.info(f"Data saved to {filename}")
            return True
        except Exception as e:
            logger.error(f"Error while saving the Excel file: {e}")
            return False
    def create_stats_dataframe(self):
        """Build a DataFrame with run statistics."""
        duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds() if self.stats['end_time'] else 0
        success_rate = (self.stats['successful_videos'] / self.stats['total_videos'] * 100) if self.stats['total_videos'] > 0 else 0
        avg_danmu = (self.stats['total_danmu'] / self.stats['successful_videos']) if self.stats['successful_videos'] > 0 else 0
        stats_data = {
            'metric': [
                'start time', 'end time', 'total duration (s)',
                'total videos', 'successful videos', 'failed videos',
                'success rate', 'total danmu', 'average danmu per video'
            ],
            'value': [
                self.stats['start_time'].strftime('%Y-%m-%d %H:%M:%S') if self.stats['start_time'] else 'N/A',
                self.stats['end_time'].strftime('%Y-%m-%d %H:%M:%S') if self.stats['end_time'] else 'N/A',
                f"{duration:.2f}",
                self.stats['total_videos'],
                self.stats['successful_videos'],
                self.stats['failed_videos'],
                f"{success_rate:.2f}%",
                self.stats['total_danmu'],
                f"{avg_danmu:.2f}"
            ]
        }
        return pd.DataFrame(stats_data)
    def print_summary(self):
        """Print a crawl summary."""
        duration = (self.stats['end_time'] - self.stats['start_time']).total_seconds() if self.stats['end_time'] else 0
        success_rate = (self.stats['successful_videos'] / self.stats['total_videos'] * 100) if self.stats['total_videos'] > 0 else 0
        print("\n" + "=" * 60)
        print("Crawl summary")
        print("=" * 60)
        print(f"Start time: {self.stats['start_time']}")
        print(f"End time: {self.stats['end_time']}")
        print(f"Total duration: {duration/60:.2f} minutes")
        print(f"Total videos: {self.stats['total_videos']}")
        print(f"Successful videos: {self.stats['successful_videos']}")
        print(f"Failed videos: {self.stats['failed_videos']}")
        print(f"Success rate: {success_rate:.2f}%")
        print(f"Total danmu: {self.stats['total_danmu']}")
        print("=" * 60)
def main():
    # Search keywords, kept in Chinese because they are the actual search
    # terms ("large language model", "large model", "LLM")
    keywords = ["大语言模型", "大模型", "LLM"]
    # Initialize the crawler; max_workers is tunable (3-5 recommended)
    crawler = OptimizedBilibiliCrawler(max_workers=4)
    # Crawl the data
    logger.info("Starting bilibili danmaku crawl...")
    logger.info(f"Target: 120 videos per keyword, {len(keywords) * 120} videos in total")
    try:
        danmu_df = crawler.crawl_all_keywords(keywords)
        # Save the data
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        output_file = f"bilibili_danmu_{timestamp}.xlsx"
        if crawler.save_to_excel(danmu_df, output_file):
            # Print the summary
            crawler.print_summary()
            # Per-keyword statistics
            if not danmu_df.empty:
                print("\nPer-keyword statistics:")
                for keyword in keywords:
                    keyword_count = len(danmu_df[danmu_df['keyword'] == keyword])
                    print(f"Keyword '{keyword}': {keyword_count} danmu")
                # Count distinct videos
                unique_videos = danmu_df['bvid'].nunique()
                print(f"Number of videos successfully crawled: {unique_videos}")
    except Exception as e:
        logger.error(f"Fatal error during the crawl: {e}")
        print(f"Program failed: {e}")

if __name__ == "__main__":
    main()