最终版的Crawler.py爬取b站弹幕代码

main
pc7si35ku 2 months ago
parent d862b97fb6
commit 4171d20e56

@ -0,0 +1,150 @@
import time # 导入时间模块,用于处理时间相关的功能
from typing import List # 从 typing 模块导入 List用于类型注解
import requests # 导入 requests 模块,用于发送 HTTP 请求
import json # 导入 json 模块,用于处理 JSON 数据
from urllib import parse # 从 urllib 模块导入 parse用于处理 URL 编码
import re # 导入正则表达式模块,用于字符串匹配和提取
from concurrent.futures import ThreadPoolExecutor, as_completed # 导入并发执行模块,用于实现多线程
import cProfile
class BilibiliSpider:
def __init__(self, cookie: str, user_agent: str):
# 初始化 BilibiliSpider 类,设置用户的 cookie 和 user_agent
self.cookie = cookie # 存储用户的 cookie以模拟登录状态
self.user_agent = user_agent # 存储用户代理信息,用于模拟浏览器请求
def get_search_result(self, keyword: str, page: int, page_size: int) -> list:
# 根据关键词、页码和页面大小从 Bilibili 搜索视频
headers = {
"Accept": "application/json, text/plain, */*", # 请求接受的内容类型
"Accept-Encoding": "gzip, deflate, br, zstd", # 支持的编码格式
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", # 请求的语言
"Cache-Control": "no-cache", # 不使用缓存
"Cookie": self.cookie, # 用户的 cookie
"Origin": "https://search.bilibili.com", # 请求来源
"Pragma": "no-cache", # 不使用缓存
"Priority": "u=1, i", # 请求优先级
"Referer": f"https://search.bilibili.com/all?keyword={parse.quote(keyword)}", # 请求的来源页面
"User-Agent": self.user_agent, # 模拟的用户代理
}
params = {
"search_type": "video", # 搜索类型为视频
"page": page, # 当前页码
"page_size": page_size, # 每页结果数量
"keyword": keyword, # 搜索关键词
}
while True: # 无限循环,直到成功获取数据为止
try:
# 发送 GET 请求获取搜索结果
url = "https://api.bilibili.com/x/web-interface/search/type"
response = requests.get(url, headers=headers, params=params).json() # 将返回的结果转换为 JSON 格式
if response['code'] == 0: # 检查响应码是否为 0表示成功
# 提取视频的 aid视频ID并返回
aids = [item['id'] for item in response['data']['result']]
return aids # 返回视频ID列表
except Exception as e:
# 捕获异常并打印错误信息等待1秒后重试
print(e)
time.sleep(1) # 等待1秒后重试
def get_cid(self, aid: int) -> int:
# 根据视频 aid 获取对应的 cid视频分段ID
headers = {
"Accept": "application/json, text/plain, */*", # 请求接受的内容类型
"User-Agent": self.user_agent, # 模拟的用户代理
"Cookie": self.cookie, # 用户的 cookie
}
# 向 Bilibili API 请求视频详细信息,获取 cid
response = requests.get(f"https://api.bilibili.com/x/player/pagelist?aid={aid}", headers=headers)
if response.status_code == 200: # 检查请求是否成功
# 解析返回的 JSON 数据
data = response.json()
if data and 'data' in data and len(data['data']) > 0: # 检查数据是否有效
return data['data'][0]['cid'] # 返回视频的第一个页面的 cid
else:
# 如果未找到视频,抛出异常
raise ValueError(f"No video found for aid {aid}.")
else:
# 如果请求失败,抛出异常
raise Exception(f"Failed to retrieve CID for aid {aid}. Status code: {response.status_code}")
def get_bullet_screen(self, aid: int) -> List:
# 根据视频 aid 获取弹幕数据,返回弹幕列表
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", # 请求接受的内容类型
"Cookie": self.cookie, # 用户的 cookie
"User-Agent": self.user_agent # 模拟的用户代理
}
# 请求弹幕数据(弹幕数据以 XML 格式返回)
url = f'https://comment.bilibili.com/{aid}.xml' # 构建弹幕数据的 URL
response = requests.get(url, headers=headers) # 发送 GET 请求获取弹幕数据
response.encoding = 'utf-8' # 设置响应编码为 UTF-8
html = response.text # 获取响应文本
# 使用正则表达式从 XML 中提取弹幕文本,并返回弹幕列表
return re.findall("<d p=.+?>(.+?)</d>", html) # 提取弹幕内容
def main():
# 主函数,爬取 Bilibili 视频弹幕
User_Agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"
cookies = "buvid3=0010B368-0E93-5612-1F55-B0AEFA2A788E68736infoc; b_nut=1722828468; _uuid=65A45AC6-10CC5-FD72-3AE5-EDD1D94C6B2A71555infoc; enable_web_push=DISABLE; home_feed_column=5; buvid4=D4A818B4-3DAF-E9E9-CCA6-2292209CA07D70717-024080503-n8yYBXNzLps6TrOphT3zww%3D%3D; header_theme_version=CLOSE; rpdid=|(J|)Rl|kRuk0J'u~kk)k)lJY; CURRENT_QUALITY=80; fingerprint=6d7a6d23f809895ad523f52c214cab31; buvid_fp_plain=undefined; b-user-id=06265419-2000-a180-a632-d8face940e87; CURRENT_BLACKGAP=0; is-2022-channel=1; buvid_fp=6d7a6d23f809895ad523f52c214cab31; bili_ticket_expires=1726804000; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjY4MDQwNjAsImlhdCI6MTcyNjU0NDgwMCwicGx0IjotMX0.GNq3G9bTe-3WQ8VAVc0sI4Qz9p3p7_dhcjMvGWeptHU; browser_resolution=1491-706; bp_t_offset_342699701=978044347712798720; CURRENT_FNVAL=4048; b_lsid=E10FCACF9_191FEFD995E; SESSDATA=a362140e%2C1742111957%2Cf7826%2A91CjBL7AZ1ewk0PKshkKehAyTa-FGUdcmmYfvGIZvLIvE3mrP0Lp1ZFo7Vp-Hg1cnTkFcSVkpPSlUwSkpFWWNodXFmNDdRNnFOdkdfTkpkbmpQNjlDUGkxLXpMRXZIMWpLUkVVSU1sNjM2clZVUmp1dEZDeDFmRTZJS0JObEstb1RVeV94ek91UktnIIEC; bili_jct=2627cc66d6b22d78edd09ea63d44b26e; DedeUserID=3546763816339525; DedeUserID__ckMd5=d856350aecedd530; sid=8ec23v36"
# 创建 BilibiliSpider 实例
Bili = BilibiliSpider(cookies, User_Agent)
keyword = "2024巴黎奥运会" # 设置搜索关键词
page_size = 30 # 每页30个结果
total_pages = 10 # 爬取10页总计300个视频
data_list = [] # 存储所有弹幕数据
# 使用线程池执行器来并发请求
with ThreadPoolExecutor(max_workers=10) as executor: # 创建一个线程池最多有10个工作线程
futures = [] # 存储所有提交的任务
for page in range(1, total_pages + 1): # 循环遍历每一页
print(f"Fetching search results for page {page}...") # 打印当前页码
aids = Bili.get_search_result(keyword, page, page_size) # 获取当前页的视频 aid 列表
for aid in aids: # 遍历每个视频 aid
# 提交任务到线程池,异步获取弹幕数据
futures.append(executor.submit(fetch_bullet_screen, Bili, aid))
# 等待所有任务完成并处理结果
for future in as_completed(futures): # 遍历已完成的任务
try:
bullet_screens = future.result() # 获取任务的结果
data_list.extend(bullet_screens) # 将弹幕数据添加到 data_list
print(f"Fetched {len(bullet_screens)} bullet screens.") # 打印获取的弹幕数量
except Exception as e:
print(f"An error occurred: {e}") # 处理任务中的异常
print(f"Total bullet screens fetched: {len(data_list)}") # 打印总共获取的弹幕数量
# 将弹幕数据保存到 "弹幕.txt" 文件
with open("弹幕.txt", mode='a', encoding="utf-8") as f: # 以附加模式打开文件
for data in data_list: # 遍历所有弹幕数据
f.write(data + '\n') # 将弹幕数据写入文件
def fetch_bullet_screen(Bili: BilibiliSpider, aid: int) -> List:
# 根据视频 aid 获取弹幕数据的辅助函数
try:
print(f"Fetching bullet screen for video with aid {aid}...") # 打印正在获取的 video aid
cid = Bili.get_cid(aid) # 获取视频的 cid
bullet_screens = Bili.get_bullet_screen(cid) # 获取弹幕数据
return bullet_screens # 返回弹幕数据
except Exception as e:
print(f"An error occurred while fetching data for aid {aid}: {e}") # 打印错误信息
return [] # 返回空列表以表示没有获取到数据
if __name__ == "__main__":
main() # 执行主函数
# if __name__ == "__main__":
# # 使用 cProfile 对 main 函数进行性能分析
# cProfile.run('main()','profile_results')
Loading…
Cancel
Save