From d77c66f928ac943088b0861853542fa2a66c6e75 Mon Sep 17 00:00:00 2001 From: plzmcgpwb <3167850775@qq.com> Date: Wed, 18 Sep 2024 20:41:02 +0800 Subject: [PATCH] =?UTF-8?q?=E7=88=AC=E5=8F=96=E6=89=80=E6=9C=89=E5=BC=B9?= =?UTF-8?q?=E5=B9=95=E7=9A=84=E6=BA=90=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- paqu.py | 110 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 paqu.py diff --git a/paqu.py b/paqu.py new file mode 100644 index 0000000..d0fac0c --- /dev/null +++ b/paqu.py @@ -0,0 +1,110 @@ +import time +from typing import List +import requests +import re +from urllib import parse +from concurrent.futures import ThreadPoolExecutor, as_completed + +class BilibiliVideoSpider: + def __init__(self, session_cookie: str, user_agent: str): + self.session_cookie = session_cookie + self.user_agent = user_agent + + def search_videos(self, keyword: str, page: int, page_size: int) -> list: + headers = { + "Accept": "application/json, text/plain, */*", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8", + "Cache-Control": "no-cache", + "Cookie": self.session_cookie, + "Origin": "https://search.bilibili.com", + "Pragma": "no-cache", + "Referer": f"https://search.bilibili.com/all?keyword={parse.quote(keyword)}", + "User-Agent": self.user_agent, + } + + params = { + "search_type": "video", + "page": page, + "page_size": page_size, + "keyword": keyword, + } + + while True: + try: + response = requests.get("https://api.bilibili.com/x/web-interface/search/type", headers=headers, params=params).json() + if response.get('code') == 0: + return [item['id'] for item in response['data'].get('result', [])] + except Exception as error: + print(f"Error fetching search results: {error}") + time.sleep(1) + continue # Retry the request if it fails + + def retrieve_cid(self, aid: int) -> int: + headers = { + "Accept": "application/json, text/plain, */*", + "User-Agent": self.user_agent, + "Cookie": self.session_cookie, + } + + response = requests.get(f"https://api.bilibili.com/x/player/pagelist?aid={aid}&bvid=", headers=headers) + if response.status_code == 200: + data = response.json() + if data and 'data' in data and len(data['data']) > 0: + return data['data'][0]['cid'] + raise ValueError(f"No video found for aid {aid}.") + + def fetch_danmaku(self, aid: int) -> List[str]: + headers = { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8", + "Cookie": self.session_cookie, + "User-Agent": self.user_agent + } + + response = requests.get(f'https://api.bilibili.com/x/v1/dm/list.so?oid={aid}', headers=headers) + response.encoding = 'utf-8' + if response.status_code == 200: + return re.findall('(.+?)', response.text) + else: + print(f"Failed to fetch danmaku for aid {aid}") + return [] + +def fetch_bullet_screen(spider: BilibiliVideoSpider, aid: int) -> List[str]: + try: + print(f"Fetching bullet screen for video with aid {aid}...") + cid = spider.retrieve_cid(aid) + return spider.fetch_danmaku(cid) + except Exception as error: + print(f"Error fetching data for aid {aid}: {error}") + return [] + +def main(): + user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36 Edg/128.0.0.0" + session_cookie = "YOUR_COOKIE" # Replace with your actual cookie + + spider = BilibiliVideoSpider(session_cookie, user_agent) + keyword = "2024巴黎奥运会" + results_per_page = 30 + total_pages = 10 + all_danmaku = [] + + with ThreadPoolExecutor(max_workers=10) as executor: + futures = [] + for page in range(1, total_pages + 1): + print(f"Fetching search results for page {page}...") + aids = spider.search_videos(keyword, page, results_per_page) + for aid in aids: + futures.append(executor.submit(fetch_bullet_screen, spider, aid)) + + for future in as_completed(futures): + all_danmaku.extend(future.result()) + + print(f"Total bullet screens fetched: {len(all_danmaku)}") + + # 将弹幕数据保存到 "弹幕.txt" 文件 + with open("弹幕.txt", mode='w', encoding="utf-8") as file: # 以写入模式打开文件 + for danmaku in all_danmaku: # 遍历所有弹幕数据 + file.write(danmaku + '\n') # 将弹幕数据写入文件 + +if __name__ == "__main__": + main() \ No newline at end of file