diff --git a/release/__pycache__/test_getthread.cpython-37.pyc b/release/__pycache__/test_getthread.cpython-37.pyc new file mode 100644 index 0000000..e53ded5 Binary files /dev/null and b/release/__pycache__/test_getthread.cpython-37.pyc differ diff --git a/release/crawl_1.py b/release/crawl_1.py new file mode 100644 index 0000000..8a80c37 --- /dev/null +++ b/release/crawl_1.py @@ -0,0 +1,77 @@ +import requests +import re +from lxml import etree +import os + +class BiliBiliDanMu: + def __init__(self, bv, filename): + # 自动处理 BV 号,确保没有重复的 "BV" 前缀 + if bv.startswith("BV"): + bv = bv[2:] + # 根据 bv 号构造要爬取的视频 URL 地址 + self.video_url = "https://bilibili.com/video/BV" + bv + self.filename = filename + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)\ + AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36 Edg/85.0.564.44" + } + + # 获取视频的 cid + def get_video_cid(self): + response = requests.get(self.video_url, headers=self.headers, timeout=10) + if response.status_code != 200: + print(f"请求失败,状态码: {response.status_code}") + return None + + html = response.content.decode() + print(f"HTML 内容(前500字符): {html[:500]}") # 打印部分 HTML 内容用于调试 + cid = re.findall(r'("cid":)([0-9]+)', html) + # 有的视频没有这个字段,我们跳过它 + if len(cid) == 0: + print("未找到 cid") + return None + else: + return cid[0][-1] + + # 获取请求弹幕 XML 文件返回的内容 + def get_content(self, xml_url): + response = requests.get(xml_url, headers=self.headers, timeout=10) + return response.content + + # 解析获取到的内容,得到包含视频所有弹幕的列表 + def extract_danmu(self, content_str): + html = etree.HTML(content_str) + danmu_list = html.xpath("//d/text()") + return danmu_list + + # 将弹幕逐行写入并保存为 txt 文件 + def save(self, save_items): + # 确保输出目录存在 + output_dir = os.path.dirname(self.filename) + os.makedirs(output_dir, exist_ok=True) # 自动创建目录 + + with open(self.filename, 'w', encoding='utf-8') as f: + lines = [] + for item in save_items: + lines.append(item + '\n') + f.writelines(lines) + print(f"弹幕已保存至 {self.filename}") + + # 爬虫的过程封装 + def crawl(self): + cid = self.get_video_cid() + # 跳过没有 cid 字段的视频 + if cid is not None: + xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml" + content_str = self.get_content(xml_url) + danmu_lst = self.extract_danmu(content_str) + self.save(danmu_lst) + else: + print("视频没有有效的 cid,跳过此视频") + +if __name__ == '__main__': + bv = input("请输入视频的 bv 号: ") + # 处理文件名,确保路径正确 + filename = 'E:/前端/软件工程/{}.txt'.format(str(bv)) + dm = BiliBiliDanMu(bv, filename) + dm.crawl() \ No newline at end of file diff --git a/release/crawl_getfor.py b/release/crawl_getfor.py new file mode 100644 index 0000000..7901efb --- /dev/null +++ b/release/crawl_getfor.py @@ -0,0 +1,169 @@ +# import cProfile +import requests +import re +from lxml import etree +import os +import time +import random +from collections import OrderedDict + +class BiliBiliDanMu: + def __init__(self, bv, filename): + if bv.startswith("BV"): + bv = bv[2:] + self.video_url = "https://bilibili.com/video/BV" + bv + self.filename = filename + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + } + + def get_video_cid(self): + retry_count = 3 + for attempt in range(retry_count): + try: + response = requests.get(self.video_url, headers=self.headers, timeout=10) + if response.status_code != 200: + print(f"请求失败,状态码: {response.status_code}") + continue + + html = response.content.decode() + cid = re.findall(r'("cid":)([0-9]+)', html) + if not cid: + print("未找到 cid") + continue + else: + return cid[0][-1] + except requests.exceptions.RequestException as e: + print(f"获取 cid 时出错: {e}") + print(f"第 {attempt + 1} 次重试获取 cid...") + time.sleep(2) + return None + + def get_content(self, xml_url): + try: + response = requests.get(xml_url, headers=self.headers, timeout=10) + if response.status_code == 200: + return response.content + else: + print(f"获取弹幕内容失败,状态码: {response.status_code}") + return None + except requests.exceptions.RequestException as e: + print(f"获取弹幕时出错: {e}") + return None + + def extract_danmu(self, content_str): + try: + html = etree.HTML(content_str) + danmu_list = html.xpath("//d/text()") + return danmu_list + except Exception as e: + print(f"解析弹幕时出错: {e}") + return [] + + def save(self, save_items): + output_dir = os.path.dirname(self.filename) + os.makedirs(output_dir, exist_ok=True) + + with open(self.filename, 'w', encoding='utf-8') as f: + lines = [item + '\n' for item in save_items] + f.writelines(lines) + print(f"弹幕已保存至 {self.filename}") + + def crawl(self): + cid = self.get_video_cid() + if cid is not None: + xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml" + content_str = self.get_content(xml_url) + if content_str: + danmu_lst = self.extract_danmu(content_str) + self.save(danmu_lst) + else: + print("视频没有有效的 cid,跳过此视频") + + +def search_videos(query, max_results=350): + search_url = "https://api.bilibili.com/x/web-interface/search/type" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + "cookie": "your cookie" # 替换为实际的 Cookie + } + + bv_list = [] + page = 1 + + while len(bv_list) < max_results: + params = { + 'keyword': query, + 'search_type': 'video', + 'order': 'totalrank', + 'page': page, + 'pagesize': 50 + } + + try: + response = requests.get(search_url, params=params, headers=headers, timeout=10) + if response.status_code == 200: + results = response.json() + if results['code'] == 0: + videos = results['data']['result'] + if not videos: + break + bv_list += [video['bvid'] for video in videos] + print(f"已抓取 {len(bv_list)} 个视频") + else: + print(f"搜索失败,错误代码: {results['code']},错误信息: {results.get('message', '无详细信息')}") + if '频繁' in results.get('message', ''): + print("限流,等待后重试") + time.sleep(random.uniform(5, 10)) + continue + break + else: + print(f"搜索请求失败,状态码: {response.status_code}") + break + except requests.exceptions.RequestException as e: + print(f"请求失败,错误: {e}") + time.sleep(random.uniform(2, 5)) + continue + + page += 1 + time.sleep(random.uniform(1, 3)) + + bv_list = list(OrderedDict.fromkeys(bv_list)) # 去重操作 + return bv_list[:max_results] + + +def download_danmu(index, bv, filename): + danmu_crawler = BiliBiliDanMu(bv, filename) + danmu_crawler.crawl() + +def getfor(): + for index, bv in enumerate(bv_list): + filename = f'{output_dir}第{index + 1}个视频_{bv}.txt' + print(f"正在抓取 BV号 {bv} 的弹幕...") + download_danmu(index, bv, filename) + print(f"BV号 {bv} 的弹幕抓取完成") + + +if __name__ == '__main__': + query = input("请输入搜索关键词: ") + bv_list = search_videos(query) + + # 限制爬取的最大视频数量为300 + bv_list = bv_list[:300] + + output_dir = 'E:/前端/软件工程/弹幕收集first/' + os.makedirs(output_dir, exist_ok=True) + + # 依次抓取每个视频的弹幕 + getfor() + # for index, bv in enumerate(bv_list): + # filename = f'{output_dir}第{index + 1}个视频_{bv}.txt' + # print(f"正在抓取 BV号 {bv} 的弹幕...") + # download_danmu(index, bv, filename) + # print(f"BV号 {bv} 的弹幕抓取完成") \ No newline at end of file diff --git a/release/crawl_getthread.py b/release/crawl_getthread.py new file mode 100644 index 0000000..067d180 --- /dev/null +++ b/release/crawl_getthread.py @@ -0,0 +1,175 @@ +import requests +import re +from lxml import etree +import os +import time +import random +from concurrent.futures import ThreadPoolExecutor, as_completed +from collections import OrderedDict + +class BiliBiliDanMu: + def __init__(self, bv, filename): + #处理输入的 BV 号,确保是正确格式 + if bv.startswith("BV"): + bv = bv[2:] + self.video_url = "https://bilibili.com/video/BV" + bv + self.filename = filename + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + } + + def get_video_cid(self): + #尝试最多 3 次获取视频的 cid + retry_count = 3 + for attempt in range(retry_count): + try: + response = requests.get(self.video_url, headers=self.headers, timeout=10) + if response.status_code != 200: + print(f"请求失败,状态码: {response.status_code}") + continue + + html = response.content.decode() + cid = re.findall(r'("cid":)([0-9]+)', html) + if not cid: + print("未找到 cid") + continue + else: + return cid[0][-1] + except requests.exceptions.RequestException as e: + print(f"获取 cid 时出错: {e}") + print(f"第 {attempt + 1} 次重试获取 cid...") + time.sleep(2) + return None + + def get_content(self, xml_url): + #获取弹幕 XML 文件的内容 + try: + response = requests.get(xml_url, headers=self.headers, timeout=10) + if response.status_code == 200: + return response.content + else: + print(f"获取弹幕内容失败,状态码: {response.status_code}") + return None + except requests.exceptions.RequestException as e: + print(f"获取弹幕时出错: {e}") + return None + + def extract_danmu(self, content_str): + #解析XML内容,提取弹幕 + try: + html = etree.HTML(content_str) + danmu_list = html.xpath("//d/text()") + return danmu_list + except Exception as e: + print(f"解析弹幕时出错: {e}") + return [] + + def save(self, save_items): + #保存弹幕到文件 + output_dir = os.path.dirname(self.filename) + os.makedirs(output_dir, exist_ok=True) + + with open(self.filename, 'w', encoding='utf-8') as f: + lines = [item + '\n' for item in save_items] + f.writelines(lines) + print(f"弹幕已保存至 {self.filename}") + + def crawl(self): + #执行爬取流程 + cid = self.get_video_cid() + if cid is not None: + xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml" + content_str = self.get_content(xml_url) + if content_str: + danmu_lst = self.extract_danmu(content_str) + self.save(danmu_lst) + else: + print("视频没有有效的 cid,跳过此视频") + +def search_videos(query, max_results=350): + #搜索视频,最多返回 max_results 个结果 + search_url = "https://api.bilibili.com/x/web-interface/search/type" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + "cookie": "your cookie" #Cookie 有就行,内容随意 + } + + bv_list = [] + page = 1 + + while len(bv_list) < max_results: + params = { + 'keyword': query, + 'search_type': 'video', + 'order': 'totalrank', + 'page': page, + 'pagesize': 50 + } + + try: + response = requests.get(search_url, params=params, headers=headers, timeout=10) + if response.status_code == 200: + results = response.json() + if results['code'] == 0: + videos = results['data']['result'] + if not videos: + break + bv_list += [video['bvid'] for video in videos] + print(f"已抓取 {len(bv_list)} 个视频") + else: + print(f"搜索失败,错误代码: {results['code']},错误信息: {results.get('message', '无详细信息')}") + if '频繁' in results.get('message', ''): + print("限流,等待后重试") + time.sleep(random.uniform(5, 10)) + continue + break + else: + print(f"搜索请求失败,状态码: {response.status_code}") + break + except requests.exceptions.RequestException as e: + print(f"请求失败,错误: {e}") + time.sleep(random.uniform(2, 5)) + continue + + page += 1 + time.sleep(random.uniform(1, 3)) #防止请求过于频繁被禁止 + + bv_list = list(OrderedDict.fromkeys(bv_list)) #去重操作 + return bv_list[:max_results] + +def download_danmu(index, bv, filename): + #下载指定BV号视频的弹幕 + danmu_crawler = BiliBiliDanMu(bv, filename) + danmu_crawler.crawl() + +def getthread(): + #使用线程池并发下载弹幕 + with ThreadPoolExecutor(max_workers=10) as executor: + future_to_bv = { + executor.submit(download_danmu, index, bv, f'{output_dir}第{index + 1}个视频_{bv}.txt'): bv for index, bv in enumerate(bv_list) + } + for future in as_completed(future_to_bv): + bv = future_to_bv[future] + try: + future.result() + print(f"BV号 {bv} 的弹幕抓取完成") + except Exception as exc: + print(f"BV号 {bv} 的弹幕抓取时出错: {exc}") + +if __name__ == '__main__': + query = input("请输入搜索关键词: ") + bv_list = search_videos(query) + + #限制爬取的最大视频数量为 300 + bv_list = bv_list[:300] + + output_dir = 'E:/前端/软件工程/弹幕收集bark/' + os.makedirs(output_dir, exist_ok=True) + + getthread() \ No newline at end of file diff --git a/release/test_bilibili_danmu.py b/release/test_bilibili_danmu.py new file mode 100644 index 0000000..c5610e8 --- /dev/null +++ b/release/test_bilibili_danmu.py @@ -0,0 +1,102 @@ +import unittest +from unittest.mock import patch, MagicMock +from crawl_getthread import BiliBiliDanMu, search_videos, download_danmu, getthread +import os + +class TestBiliBiliDanMu(unittest.TestCase): + """ + 测试 BiliBiliDanMu 类的单元测试类。 + """ + + @patch('crawl_getthread.requests.get') + def test_get_video_cid_success(self, mock_get): + """ + 测试根据视频 BV 号获取弹幕 CID 成功。 + """ + mock_get.return_value = MagicMock(status_code=200, text='{"cid":123456}') + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertEqual(danmu_crawler.get_video_cid(), "123456") + + @patch('crawl_getthread.requests.get') + def test_get_video_cid_failure(self, mock_get): + """ + 测试根据视频 BV 号获取弹幕 CID 失败。 + """ + mock_get.return_value = MagicMock(status_code=404) + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertIsNone(danmu_crawler.get_video_cid()) + + @patch('crawl_getthread.requests.get') + def test_get_content_success(self, mock_get): + """ + 测试根据弹幕 XML URL 获取弹幕内容成功。 + """ + mock_get.return_value = MagicMock(status_code=200, text='弹幕内容') + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertEqual(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml"), '弹幕内容') + + @patch('crawl_getthread.requests.get') + def test_get_content_failure(self, mock_get): + """ + 测试根据弹幕 XML URL 获取弹幕内容失败。 + """ + mock_get.return_value = MagicMock(status_code=404) + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertIsNone(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml")) + + def test_extract_danmu(self): + """ + 测试解析弹幕 XML 内容,提取弹幕文本。 + """ + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + content_str = '弹幕内容1弹幕内容2' + self.assertEqual(danmu_crawler.extract_danmu(content_str), ['弹幕内容1', '弹幕内容2']) + + def test_save(self): + """ + 测试保存弹幕到文件。 + """ + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + danmu_crawler.save(['弹幕内容1', '弹幕内容2']) + with open("test_output.txt", 'r', encoding='utf-8') as f: + self.assertEqual(f.readlines(), ['弹幕内容1\n', '弹幕内容2\n']) + os.remove("test_output.txt") # 清理测试文件 + + @patch('crawl_getthread.search_videos') + def test_search_videos_success(self, mock_search_videos): + """ + 测试根据关键词搜索视频并成功返回视频 ID 列表。 + """ + mock_search_videos.return_value = ['BV12345', 'BV67890'] + video_ids = search_videos("2024巴黎奥运会", max_results=2) + self.assertEqual(video_ids, ['BV12345', 'BV67890']) + + @patch('crawl_getthread.search_videos') + def test_search_videos_failure(self, mock_search_videos): + """ + 测试根据关键词搜索视频时请求失败。 + """ + mock_search_videos.return_value = [] + video_ids = search_videos("2024巴黎奥运会", max_results=2) + self.assertEqual(video_ids, []) + + @patch('crawl_getthread.download_danmu') + def test_download_danmu(self, mock_download): + """ + 测试下载指定 BV 号视频的弹幕。 + """ + mock_download.return_value = None + download_danmu(0, "BV12345", "test_output.txt") + mock_download.assert_called_with(0, "BV12345", "test_output.txt") + + @patch('crawl_getthread.getthread') + def test_getthread(self, mock_getthread): + """ + 测试使用线程池并发下载弹幕。 + """ + mock_getthread.return_value = None + getthread() + mock_getthread.assert_called_once() + +if __name__ == '__main__': + unittest.main() # 执行测试 \ No newline at end of file diff --git a/release/test_mywordcloud.py b/release/test_mywordcloud.py new file mode 100644 index 0000000..df104dc --- /dev/null +++ b/release/test_mywordcloud.py @@ -0,0 +1,84 @@ +import os +import jieba +from wordcloud import WordCloud +import matplotlib.pyplot as plt + +def generate_wordcloud(directory, output_file): + """ + 生成普通词云图。 + """ + text = "" + for filename in os.listdir(directory): + if filename.endswith('.txt'): + with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file: + text += file.read() + + words = jieba.cut(text) + + stop_words = set([ + "我", "你", "他", "她", "它", "是", "的", "了", "在", "吗", "啊", "吧", + "也", "有", "这", "那", "从", "为", "上", "下", "和", "与", "就", "不", + "中", "还", "要", "会", "能", "对", "着", "个", "把", "所以", "但", "也", + "所以", "从", "如", "她", "他", "它", "还", "也", "吗", "啊", "哦", "?", "!", ",", "。" + ]) + + filtered_words = [word for word in words if word.strip() and word not in stop_words] + + word_freq = {} + for word in filtered_words: + word_freq[word] = word_freq.get(word, 0) + 1 + + wordcloud = WordCloud(font_path='simsun.ttc', width=800, height=400, background_color='white').generate_from_frequencies(word_freq) + + plt.figure(figsize=(10, 5)) + plt.imshow(wordcloud, interpolation='bilinear') + plt.axis("off") + plt.savefig(output_file) + plt.close() + +def generate_trophy_wordcloud(directory, output_file): + """ + 生成奖杯词云图。 + """ + from wordcloud import WordCloud, STOPWORDS + import matplotlib.pyplot as plt + import numpy as np + import jieba.posseg as pseg + from collections import Counter + from PIL import Image + from matplotlib import colors + + text = "" + for filename in os.listdir(directory): + if filename.endswith('.txt'): + with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file: + text += file.read() + + words = pseg.cut(text) + report_words = [word for word, flag in words if (len(word) >= 2) and ('n' in flag)] + + result = Counter(report_words).most_common(300) + content = dict(result) + + stopwords = set(STOPWORDS) + stopwords.update(["我", "你", "他", "她", "它", "是", "的", "了", "在", "吗", "啊", "吧", + "也", "有", "这", "那", "从", "为", "上", "下", "和", "与", "就", "不", + "中", "还", "要", "会", "能", "对", "着", "个", "把", "所以", "但", "也", + "所以", "从", "如", "她", "他", "它", "还", "也", "吗", "啊", "哦", "?", "!", ",", "。"]) + + background = Image.open("E:/前端/奖杯4.png").convert('RGB') + mask = np.array(background) + + font_path = r"C:\Windows\Fonts\STLITI.TTF" + + max_font_size = 100 + min_font_size = 10 + color_list = ['#FF274B'] + colormap = colors.ListedColormap(color_list) + + wordcloud = WordCloud(scale=4, font_path=font_path, colormap=colormap, width=1600, height=900, background_color='white', stopwords=stopwords, mask=mask, max_font_size=max_font_size, min_font_size=min_font_size).generate_from_frequencies(content) + + plt.imshow(wordcloud, interpolation='bilinear') + plt.axis('off') + plt.savefig(output_file) + plt.close() \ No newline at end of file diff --git a/release/test_数据分析.py b/release/test_数据分析.py new file mode 100644 index 0000000..7f7b46c --- /dev/null +++ b/release/test_数据分析.py @@ -0,0 +1,61 @@ +import unittest +from unittest.mock import patch, mock_open +import os +from 数据分析 import read_danmu, filter_ai_related_danmu, count_danmu, save, print_top_n_danmu + +class TestAIDanmuAnalysis(unittest.TestCase): + """ + 测试 数据分析 模块的单元测试类。 + """ + + def setUp(self): + self.folder_path = 'test_data/弹幕收集按序' + self.output_file = 'test_output.xlsx' + + def test_read_danmu(self): + """ + 测试读取弹幕文件。 + """ + test_data = "弹幕1\n弹幕2\n" + with patch('builtins.open', new_callable=mock_open, read_data=test_data) as mock_file: + danmu_list = read_danmu(self.folder_path) + self.assertEqual(danmu_list, [('弹幕1', 'file1'), ('弹幕2', 'file1')]) + mock_file.assert_called_with(os.path.join(self.folder_path, 'file1.txt'), 'r', encoding='utf-8') + + def test_filter_ai_related_danmu(self): + """ + 测试筛选与 AI 相关的弹幕。 + """ + danmu_list = [('弹幕1', 'file1'), ('AI技术', 'file2')] + filtered_danmu = filter_ai_related_danmu(danmu_list, ["AI"]) + self.assertEqual(filtered_danmu, [('AI技术', 'file2')]) + + def test_count_danmu(self): + """ + 测试统计弹幕出现次数。 + """ + danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1')] + danmu_counter = count_danmu(danmu_list) + self.assertEqual(danmu_counter, {'弹幕1': 2, '弹幕2': 1}) + + def test_save(self): + """ + 测试将弹幕数据保存到 Excel 文件。 + """ + danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1')] + with patch('pandas.DataFrame.to_excel') as mock_to_excel: + save(danmu_list, self.output_file) + mock_to_excel.assert_called_with(self.output_file, index=False) + + def test_print_top_n_danmu(self): + """ + 测试打印数量排名前N的弹幕。 + """ + danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1'), ('弹幕3', 'file1')] + with patch('builtins.print') as mock_print: + print_top_n_danmu(danmu_list, top_n=2) + mock_print.assert_any_call('数量排名前 2 的弹幕:') + mock_print.assert_any_call('弹幕内容: 弹幕1, 出现次数: 2') + +if __name__ == '__main__': + unittest.main() # 执行测试 \ No newline at end of file