"""Crawl Bilibili danmaku (bullet-screen comments) for videos matching a keyword.

Reconstructed from git patch 4171d20e ("final version of Crawler.py, Bilibili
danmaku crawler"), which added this module as ``Crawler.py``.

Workflow:
    1. Search Bilibili for a keyword and collect the video ids (aid).
    2. Resolve each aid to the cid of its first page.
    3. Download the danmaku XML for that cid and extract the comment texts.
    4. Append every comment, one per line, to ``弹幕.txt``.

The session cookie is read from the ``BILIBILI_COOKIE`` environment variable;
credentials must never be committed to source control.
"""
import cProfile  # noqa: F401  (used by the optional profiling entry point below)
import html
import json  # noqa: F401  (kept from the original import list)
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List
from urllib import parse

import requests

# Seconds to wait for any single HTTP request before giving up, so a stalled
# connection cannot block a worker thread forever.
REQUEST_TIMEOUT = 10.0

# One danmaku entry in the XML document looks like: <d p="meta,...">text</d>
_DANMAKU_PATTERN = re.compile(r'<d p="[^"]*">(.+?)</d>')


class BilibiliSpider:
    """Thin client for the Bilibili endpoints needed to collect danmaku."""

    def __init__(self, cookie: str, user_agent: str):
        """Store the credentials sent with every request.

        Args:
            cookie: Raw ``Cookie`` header value of a Bilibili session.
            user_agent: ``User-Agent`` header value to present to the server.
        """
        self.cookie = cookie
        self.user_agent = user_agent

    def get_search_result(
        self,
        keyword: str,
        page: int,
        page_size: int,
        max_retries: int = 5,
        retry_delay: float = 1.0,
    ) -> list:
        """Return the video ids (aid) of one page of search results.

        Args:
            keyword: Search term.
            page: 1-based page number.
            page_size: Number of results requested per page.
            max_retries: How many attempts to make before giving up.
            retry_delay: Seconds to sleep between failed attempts.

        Returns:
            The ``id`` field of every result on the page; an empty list when
            the search succeeded but returned no results.

        Raises:
            RuntimeError: If every attempt failed (network error, invalid
                JSON, or a non-zero API status code).
        """
        # "Accept-Encoding" is intentionally not set: requests advertises only
        # the encodings it can actually decode in this environment.
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Cache-Control": "no-cache",
            "Cookie": self.cookie,
            "Origin": "https://search.bilibili.com",
            "Pragma": "no-cache",
            "Priority": "u=1, i",
            "Referer": f"https://search.bilibili.com/all?keyword={parse.quote(keyword)}",
            "User-Agent": self.user_agent,
        }
        params = {
            "search_type": "video",
            "page": page,
            "page_size": page_size,
            "keyword": keyword,
        }
        url = "https://api.bilibili.com/x/web-interface/search/type"

        last_error = None
        for attempt in range(1, max_retries + 1):
            try:
                payload = requests.get(
                    url, headers=headers, params=params, timeout=REQUEST_TIMEOUT
                ).json()
                if payload["code"] == 0:
                    # "result" is absent/None when nothing matched.
                    results = (payload.get("data") or {}).get("result") or []
                    return [item["id"] for item in results]
                last_error = (
                    f"search API returned code {payload['code']}: "
                    f"{payload.get('message')}"
                )
            except (requests.RequestException, ValueError, KeyError, TypeError) as e:
                last_error = e
            print(last_error)
            # Back off after every failure (including non-zero API codes) so
            # the API is never hit in a tight loop.
            if attempt < max_retries:
                time.sleep(retry_delay)

        raise RuntimeError(
            f"Search for {keyword!r} page {page} failed after "
            f"{max_retries} attempts: {last_error}"
        )

    def get_cid(self, aid: int) -> int:
        """Return the cid of the first page of the video identified by ``aid``.

        Raises:
            ValueError: If the API response contains no pages for ``aid``.
            RuntimeError: If the HTTP request did not return status 200.
        """
        headers = {
            "Accept": "application/json, text/plain, */*",
            "User-Agent": self.user_agent,
            "Cookie": self.cookie,
        }
        response = requests.get(
            f"https://api.bilibili.com/x/player/pagelist?aid={aid}",
            headers=headers,
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            raise RuntimeError(
                f"Failed to retrieve CID for aid {aid}. Status code: {response.status_code}"
            )

        data = response.json()
        # "data" can be missing or null on API-level errors; treat both as
        # "not found" instead of failing with a TypeError on len(None).
        pages = data.get("data") if isinstance(data, dict) else None
        if not pages:
            raise ValueError(f"No video found for aid {aid}.")
        return pages[0]["cid"]

    def get_bullet_screen(self, aid: int) -> List:
        """Download and return the danmaku texts for one video page.

        Args:
            aid: Identifier placed in the danmaku URL. NOTE: despite the name,
                the caller in this module passes the *cid* obtained from
                ``get_cid``; the parameter name is kept for compatibility.

        Returns:
            A list of danmaku strings with XML entities unescaped.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        headers = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8",
            "Cookie": self.cookie,
            "User-Agent": self.user_agent,
        }
        url = f"https://comment.bilibili.com/{aid}.xml"
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        # Fail loudly on error pages instead of silently parsing them to [].
        response.raise_for_status()
        response.encoding = "utf-8"
        xml_text = response.text
        # Match only the text inside <d ...>...</d> elements; a bare "(.+?)"
        # would return every single character of the document.
        return [html.unescape(text) for text in _DANMAKU_PATTERN.findall(xml_text)]


def fetch_bullet_screen(Bili: BilibiliSpider, aid: int) -> List:
    """Fetch the danmaku of one video; never raises.

    Args:
        Bili: Spider used to perform the requests.
        aid: Video id as returned by the search API.

    Returns:
        The danmaku texts, or an empty list if anything went wrong (the error
        is printed so one bad video does not abort the whole crawl).
    """
    try:
        print(f"Fetching bullet screen for video with aid {aid}...")
        cid = Bili.get_cid(aid)
        return Bili.get_bullet_screen(cid)
    except Exception as e:  # best-effort boundary for a worker task
        print(f"An error occurred while fetching data for aid {aid}: {e}")
        return []


def main():
    """Crawl danmaku for the configured keyword and append them to ``弹幕.txt``."""
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.71 Safari/537.36"

    # The session cookie is a credential: take it from the environment rather
    # than embedding it in the source file.
    cookie = os.environ.get("BILIBILI_COOKIE")
    if not cookie:
        raise SystemExit(
            "Set the BILIBILI_COOKIE environment variable to the Cookie header "
            "of a logged-in Bilibili session."
        )

    spider = BilibiliSpider(cookie, user_agent)

    keyword = "2024巴黎奥运会"
    page_size = 30  # results per page
    total_pages = 10  # 10 pages * 30 results = up to 300 videos
    data_list = []  # every danmaku collected so far

    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = []
        for page in range(1, total_pages + 1):
            print(f"Fetching search results for page {page}...")
            try:
                aids = spider.get_search_result(keyword, page, page_size)
            except RuntimeError as e:
                # One failing page should not abort the remaining pages.
                print(f"An error occurred: {e}")
                continue
            futures.extend(
                executor.submit(fetch_bullet_screen, spider, aid) for aid in aids
            )

        # Collect results in completion order.
        for future in as_completed(futures):
            try:
                bullet_screens = future.result()
                data_list.extend(bullet_screens)
                print(f"Fetched {len(bullet_screens)} bullet screens.")
            except Exception as e:
                print(f"An error occurred: {e}")

    print(f"Total bullet screens fetched: {len(data_list)}")

    # Append mode: repeated runs accumulate into the same file.
    with open("弹幕.txt", mode='a', encoding="utf-8") as f:
        f.writelines(f"{data}\n" for data in data_list)


if __name__ == "__main__":
    main()
    # To profile instead of running directly, replace the call above with:
    #     cProfile.run('main()', 'profile_results')