From f76c2954be57bef50bb7131238ad8c21f2cee458 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9D=8E=E7=8E=B2?= <1971708054@qq.com> Date: Wed, 18 Sep 2024 20:51:04 +0800 Subject: [PATCH] =?UTF-8?q?2024=E5=B7=B4=E9=BB=8E=E5=A5=A5=E8=BF=90?= =?UTF-8?q?=E4=BC=9A=E7=88=AC=E8=99=AB=E7=9B=B8=E5=85=B3=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../__pycache__/test_getthread.cpython-37.pyc | Bin 0 -> 5961 bytes release/crawl_1.py | 77 ++++++++ release/crawl_getfor.py | 169 +++++++++++++++++ release/crawl_getthread.py | 175 ++++++++++++++++++ release/test_bilibili_danmu.py | 102 ++++++++++ release/test_mywordcloud.py | 84 +++++++++ release/test_数据分析.py | 61 ++++++ 7 files changed, 668 insertions(+) create mode 100644 release/__pycache__/test_getthread.cpython-37.pyc create mode 100644 release/crawl_1.py create mode 100644 release/crawl_getfor.py create mode 100644 release/crawl_getthread.py create mode 100644 release/test_bilibili_danmu.py create mode 100644 release/test_mywordcloud.py create mode 100644 release/test_数据分析.py diff --git a/release/__pycache__/test_getthread.cpython-37.pyc b/release/__pycache__/test_getthread.cpython-37.pyc new file mode 100644 index 0000000000000000000000000000000000000000..e53ded50065b8dd4c42bdff7307793e21648d358 GIT binary patch literal 5961 zcmdT|Nq8K^74CJWXSI-xjai!|!~^z>3C4g}0>%Ws0K&lzb{dmOYpNuT)H5SjcU#hA z#1|tkn8d+sAqj?0VhITWoB+WP61e8eDTkc;8b-@0CvxDD{I7auEE$q}M(XOS>RMm@ z@4f%kj$AIS;FtS+ad_}TMfnF6x<3P%JMj1)B4G+sJ;jONs-vQ;d0I_(bTzIs97DD- z9aFv&P6BV;OV%vMs->KiY-f1sTE@w!$~_7*S>mw560@3iwSaI?! z%W`h$v*@5g&BU&~b1cSPH$IBVH?&N^<4ozK^?ezx|6rYd~R ztb+N}-Adv7WvpHqEU1ym1HpMgi~9GB3huI9jfQvk6t9FqLqu8EFIO7136BSyp>gIx z!MNaTXSEVkXoT)hL*@=V{;f#3LXvPatVP3Wb*3`|ZzJ9tB#~qm$_bWYX}psx!?Ji= zEQgilquh?FSEXmCTYn&|=opcH^fvL_g~uO2(o*NJnR!*LZ($;)(6(n(rnl4$N=qrL zEe)y0z7u~D&o+b#Cg_H70G&It%=FCV(E8kBCh{>I|< z+wD#F-Mi<3`#0NOb)4Jx@XC0jVBa+=8Z}gx7;8`4At*`H3iTXQjd?g65j|z2$zlJ`%bkykJCWKX66hPgaA`C?&;E4uuz4 z!&Q&h-5QUQqkst)enF26pL@fRHgr&AA~9}GvH085`Ke zXrFm~@z+1z|G+f$uy={Z7_S8T1^2k?^Zfz$gYpOu0tvz5#B^kp%hh@{D3>!B%%@=$ zEv;tNw3^T~mHwLR2_y7KoD{nqDk?R`FNueGOThdc8K%ytQ-2KroLP0J^5`cs+VpUs zjsYfPIvxWc(og6!dP^TOnK7=5ok4<`RC-Z)Nt;zwWipRaa?YApW{e;;mu88UF|W;- zEz=)E+hp9Po^6@T+NiM9Mu2UvQr~7Mm?txq9Z!q5RAnZiD$RbhPqz}?k@u3lz?a7o z`;_k~b#0%5@0pgyvKy2a6_$HRr?G?FT%P6Ukvov59)R+i!DZ5nWwKZWP2}*3l0mh` z8)2~g7nDL+FlFiVnT6j!-hS=(OYgt_&p-WS@yQPsespMY?$ChU?AvrjrOK`tC~SIU z>z11zy|xh27z?M~TsZzo`-$J#_$WxE%m2nNTu+1j^i2DypV0?<;e`*{$Nsqd+%E>~ zkSyckTW{IR)H>XN#9nx7e)-smrPI%!?NTfj1)WS`O=Jq7jz^Z@2SNbAk90sl(2>R4 z$N-5!NEV3k$ea*WLU5%~4+zQ$#$~-GfXHE}rR#Z7nokj|p&owJ_fS0S-QB9lAhATI z?{Pj+NQyiraxwx_PGgF&9ym!i2zYHGpe6Ye7=%a0Xi)Pa9ea&bF5w*~=b>|y!+OiH zilD&-sPeBuqG*Pi2SV2Yt$nH{aciiSYG{3^v+?BB=0&T~yK1Bg87BIXNa3~ofON=D}!a6_*7X=WS1n`wD z0Yd#Ty$eDKvBaQQhq*iG66Yf?2nXTSNaB-i|LLRlN6)v99%;Yx(O02N!eM386~HN2 zf*>j`qJ*#~E~eyCN-m+~3QDd-QZVG9Maij}M@C7|0z`olvJfjyCPVV7BtK3ey(^${fyMUM1Pj;T({I(b$dS3Uo;KUP#xF z(uGNB8C~g);2a=b<1u5EboIezx>RB%vgDfzv7p{~b>Zccu}oJA$`k|)#~`2Lm{Suj zndlzArLYQSETT+j8)ZKb;_K8a&D?q|EPGYde3i&@m|wV+pe);wANNT^NN8#N8G!Pa zVY{l&Dv_noRl=4bL!h0Lc9w*O5&%lo7isaIFD9jCqr6vg6G*kQRb9;uT}GYp2e+$3x3uol(&NY6Wv zBuTw?1@ z8V#LLnpdyJmhT?q#aq2b#r6E#k?D2`*P?kxf6l!7*{3hH-~Rd1vG?qP6=@AWGUO>I zSyhnpm3}s|YVJ5k4Zb!ecs)u_3arbk*15kD1WuafEYp(xC=Guw5#n^KB2tGVGj4s3 z+8ET~TeJuA=ZcnB^IAWSeVy9u|MG5nvLqEDCnV2%7cwnnPJxeom2!+72!^LB!LcPv0!NWr1pTkXw9{&g(zDO7E;7^ z=yc2FC|xes8Z7iEpDmXUgsvBV5oE#Sa?f4q4*GC*Au>nqNhFYv<&uYqmE^1qc+yr{ zep&uL{B%!c$?Nv4+%xzBvZN6q>hkAP=8Qf9>&2`Y^Q1bp4>Zb{#i5QXCslE<1+oJ= zV-l4|%f$5GB(c(3iI&cCEPuk7Nw$(u^EL2OU$l~Q2H;_}EY=5_bRO#7M7|&Swczln zR?2^T7sf{bbUqA0iVVSxM7d11KAsnpXhTei>Vv4gARY;|8(S7E@`dc8c@4$^D@4F^ zai{K*d2Ku){)P6JN-~w0O9s|lie1KX^8h30oxMw`lTWm1tzF}?i@fKzC520mWyaIk z?><=gG%WnpXnT1p-EA3kN4$bv)*Z1!?9z{=yH=mbA$DW-T3hVOxb*>huM%X5H9@j7A~?eHE-&Z@Z7?(L}iA)cuwPd-ch zJ*1-j*@th&#xNv{w{Y^*;`^`k7#xWG(rJWmugXv%qF2SBoDX{Y-ui$p*n3FMf0=QbIb4P)lN!jzM8_Oe=}1i?1QULAeO)Yfdt( zSBD#-c9wBQhU81MHblUP+W{B9kcmu0EMhv!)ZD3Ze9At&tyDQlOye+Y=wM8@?xUHA z8c4z#CA&Qm@mYN}9b0vr`_b7azk=AJ6aQ+er5lJT`d5@=erq94&VsN}X}xB{0a?gt zGX6-Y>*3G))peSMZ+TTbf-!q{EJTQUj+`hy1Vjn3z92e(cH_GD@rS3Ic7L(0q(9Ng0DaIGy$GbuW`cW-$5n7f1 zGLitE3B*x->dE8EF=ZI{J-A%50yRK@E(xPDMC?KKDhD>hx+lQgpM+hfxafjjo1~16q#x;6@=kPgVDmoo+q;64O37HASV?sx zHeg!DaaGbY0kjQb)r3(0CL~p2om=;7GcuTX0=$WsQcDYHfmk{NXF?8a390^4V%~T* zW~N3+VH&b;ZUs_NSTV;(4nBm7KH#Mk1M9k@qJ%>Or+)D zw2bcfh|~TO<=Dum%}WqhbuMYnQrTWhsqH%+Za;O#mKOy!m{%u?ktV18&QBJeI2lr) zckZW^VNO!e)}j=zhhV%eJlgkA76b+HASE&?+=5)eaI(W;5DH!n8s(vbP9iSJokcos z17b6(iZb=1okgkM7G6M6Qh!CGi=`~o!GfAlXxOKU3N;2XtKn{~T<+eYEPe89`^f7Z zGkN5_<>%g6I(=fmu5Jcp3W+D)Jy41-zDg@EzDgZ2Jomx!i${lCFtjo0Bkvd<$gGU@#EmG6t0>t+U-}W&RYD<%0LNSPrLd%MA_RS&hkG@;{}5!eVjU$F zj}bt75_cgXg+(+V_ELqEE`dYPwdx98?NQ0dx;18{x6|E@yyPHeo5d3|T*&UB^mfa% M^0G$H(iMpIAJL~~ssI20 literal 0 HcmV?d00001 diff --git a/release/crawl_1.py b/release/crawl_1.py new file mode 100644 index 0000000..8a80c37 --- /dev/null +++ b/release/crawl_1.py @@ -0,0 +1,77 @@ +import requests +import re +from lxml import etree +import os + +class BiliBiliDanMu: + def __init__(self, bv, filename): + # 自动处理 BV 号,确保没有重复的 "BV" 前缀 + if bv.startswith("BV"): + bv = bv[2:] + # 根据 bv 号构造要爬取的视频 URL 地址 + self.video_url = "https://bilibili.com/video/BV" + bv + self.filename = filename + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)\ + AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.83 Safari/537.36 Edg/85.0.564.44" + } + + # 获取视频的 cid + def get_video_cid(self): + response = requests.get(self.video_url, headers=self.headers, timeout=10) + if response.status_code != 200: + print(f"请求失败,状态码: {response.status_code}") + return None + + html = response.content.decode() + print(f"HTML 内容(前500字符): {html[:500]}") # 打印部分 HTML 内容用于调试 + cid = re.findall(r'("cid":)([0-9]+)', html) + # 有的视频没有这个字段,我们跳过它 + if len(cid) == 0: + print("未找到 cid") + return None + else: + return cid[0][-1] + + # 获取请求弹幕 XML 文件返回的内容 + def get_content(self, xml_url): + response = requests.get(xml_url, headers=self.headers, timeout=10) + return response.content + + # 解析获取到的内容,得到包含视频所有弹幕的列表 + def extract_danmu(self, content_str): + html = etree.HTML(content_str) + danmu_list = html.xpath("//d/text()") + return danmu_list + + # 将弹幕逐行写入并保存为 txt 文件 + def save(self, save_items): + # 确保输出目录存在 + output_dir = os.path.dirname(self.filename) + os.makedirs(output_dir, exist_ok=True) # 自动创建目录 + + with open(self.filename, 'w', encoding='utf-8') as f: + lines = [] + for item in save_items: + lines.append(item + '\n') + f.writelines(lines) + print(f"弹幕已保存至 {self.filename}") + + # 爬虫的过程封装 + def crawl(self): + cid = self.get_video_cid() + # 跳过没有 cid 字段的视频 + if cid is not None: + xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml" + content_str = self.get_content(xml_url) + danmu_lst = self.extract_danmu(content_str) + self.save(danmu_lst) + else: + print("视频没有有效的 cid,跳过此视频") + +if __name__ == '__main__': + bv = input("请输入视频的 bv 号: ") + # 处理文件名,确保路径正确 + filename = 'E:/前端/软件工程/{}.txt'.format(str(bv)) + dm = BiliBiliDanMu(bv, filename) + dm.crawl() \ No newline at end of file diff --git a/release/crawl_getfor.py b/release/crawl_getfor.py new file mode 100644 index 0000000..7901efb --- /dev/null +++ b/release/crawl_getfor.py @@ -0,0 +1,169 @@ +# import cProfile +import requests +import re +from lxml import etree +import os +import time +import random +from collections import OrderedDict + +class BiliBiliDanMu: + def __init__(self, bv, filename): + if bv.startswith("BV"): + bv = bv[2:] + self.video_url = "https://bilibili.com/video/BV" + bv + self.filename = filename + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + } + + def get_video_cid(self): + retry_count = 3 + for attempt in range(retry_count): + try: + response = requests.get(self.video_url, headers=self.headers, timeout=10) + if response.status_code != 200: + print(f"请求失败,状态码: {response.status_code}") + continue + + html = response.content.decode() + cid = re.findall(r'("cid":)([0-9]+)', html) + if not cid: + print("未找到 cid") + continue + else: + return cid[0][-1] + except requests.exceptions.RequestException as e: + print(f"获取 cid 时出错: {e}") + print(f"第 {attempt + 1} 次重试获取 cid...") + time.sleep(2) + return None + + def get_content(self, xml_url): + try: + response = requests.get(xml_url, headers=self.headers, timeout=10) + if response.status_code == 200: + return response.content + else: + print(f"获取弹幕内容失败,状态码: {response.status_code}") + return None + except requests.exceptions.RequestException as e: + print(f"获取弹幕时出错: {e}") + return None + + def extract_danmu(self, content_str): + try: + html = etree.HTML(content_str) + danmu_list = html.xpath("//d/text()") + return danmu_list + except Exception as e: + print(f"解析弹幕时出错: {e}") + return [] + + def save(self, save_items): + output_dir = os.path.dirname(self.filename) + os.makedirs(output_dir, exist_ok=True) + + with open(self.filename, 'w', encoding='utf-8') as f: + lines = [item + '\n' for item in save_items] + f.writelines(lines) + print(f"弹幕已保存至 {self.filename}") + + def crawl(self): + cid = self.get_video_cid() + if cid is not None: + xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml" + content_str = self.get_content(xml_url) + if content_str: + danmu_lst = self.extract_danmu(content_str) + self.save(danmu_lst) + else: + print("视频没有有效的 cid,跳过此视频") + + +def search_videos(query, max_results=350): + search_url = "https://api.bilibili.com/x/web-interface/search/type" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + "cookie": "your cookie" # 替换为实际的 Cookie + } + + bv_list = [] + page = 1 + + while len(bv_list) < max_results: + params = { + 'keyword': query, + 'search_type': 'video', + 'order': 'totalrank', + 'page': page, + 'pagesize': 50 + } + + try: + response = requests.get(search_url, params=params, headers=headers, timeout=10) + if response.status_code == 200: + results = response.json() + if results['code'] == 0: + videos = results['data']['result'] + if not videos: + break + bv_list += [video['bvid'] for video in videos] + print(f"已抓取 {len(bv_list)} 个视频") + else: + print(f"搜索失败,错误代码: {results['code']},错误信息: {results.get('message', '无详细信息')}") + if '频繁' in results.get('message', ''): + print("限流,等待后重试") + time.sleep(random.uniform(5, 10)) + continue + break + else: + print(f"搜索请求失败,状态码: {response.status_code}") + break + except requests.exceptions.RequestException as e: + print(f"请求失败,错误: {e}") + time.sleep(random.uniform(2, 5)) + continue + + page += 1 + time.sleep(random.uniform(1, 3)) + + bv_list = list(OrderedDict.fromkeys(bv_list)) # 去重操作 + return bv_list[:max_results] + + +def download_danmu(index, bv, filename): + danmu_crawler = BiliBiliDanMu(bv, filename) + danmu_crawler.crawl() + +def getfor(): + for index, bv in enumerate(bv_list): + filename = f'{output_dir}第{index + 1}个视频_{bv}.txt' + print(f"正在抓取 BV号 {bv} 的弹幕...") + download_danmu(index, bv, filename) + print(f"BV号 {bv} 的弹幕抓取完成") + + +if __name__ == '__main__': + query = input("请输入搜索关键词: ") + bv_list = search_videos(query) + + # 限制爬取的最大视频数量为300 + bv_list = bv_list[:300] + + output_dir = 'E:/前端/软件工程/弹幕收集first/' + os.makedirs(output_dir, exist_ok=True) + + # 依次抓取每个视频的弹幕 + getfor() + # for index, bv in enumerate(bv_list): + # filename = f'{output_dir}第{index + 1}个视频_{bv}.txt' + # print(f"正在抓取 BV号 {bv} 的弹幕...") + # download_danmu(index, bv, filename) + # print(f"BV号 {bv} 的弹幕抓取完成") \ No newline at end of file diff --git a/release/crawl_getthread.py b/release/crawl_getthread.py new file mode 100644 index 0000000..067d180 --- /dev/null +++ b/release/crawl_getthread.py @@ -0,0 +1,175 @@ +import requests +import re +from lxml import etree +import os +import time +import random +from concurrent.futures import ThreadPoolExecutor, as_completed +from collections import OrderedDict + +class BiliBiliDanMu: + def __init__(self, bv, filename): + #处理输入的 BV 号,确保是正确格式 + if bv.startswith("BV"): + bv = bv[2:] + self.video_url = "https://bilibili.com/video/BV" + bv + self.filename = filename + self.headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + } + + def get_video_cid(self): + #尝试最多 3 次获取视频的 cid + retry_count = 3 + for attempt in range(retry_count): + try: + response = requests.get(self.video_url, headers=self.headers, timeout=10) + if response.status_code != 200: + print(f"请求失败,状态码: {response.status_code}") + continue + + html = response.content.decode() + cid = re.findall(r'("cid":)([0-9]+)', html) + if not cid: + print("未找到 cid") + continue + else: + return cid[0][-1] + except requests.exceptions.RequestException as e: + print(f"获取 cid 时出错: {e}") + print(f"第 {attempt + 1} 次重试获取 cid...") + time.sleep(2) + return None + + def get_content(self, xml_url): + #获取弹幕 XML 文件的内容 + try: + response = requests.get(xml_url, headers=self.headers, timeout=10) + if response.status_code == 200: + return response.content + else: + print(f"获取弹幕内容失败,状态码: {response.status_code}") + return None + except requests.exceptions.RequestException as e: + print(f"获取弹幕时出错: {e}") + return None + + def extract_danmu(self, content_str): + #解析XML内容,提取弹幕 + try: + html = etree.HTML(content_str) + danmu_list = html.xpath("//d/text()") + return danmu_list + except Exception as e: + print(f"解析弹幕时出错: {e}") + return [] + + def save(self, save_items): + #保存弹幕到文件 + output_dir = os.path.dirname(self.filename) + os.makedirs(output_dir, exist_ok=True) + + with open(self.filename, 'w', encoding='utf-8') as f: + lines = [item + '\n' for item in save_items] + f.writelines(lines) + print(f"弹幕已保存至 {self.filename}") + + def crawl(self): + #执行爬取流程 + cid = self.get_video_cid() + if cid is not None: + xml_url = "http://comment.bilibili.com/" + str(cid) + ".xml" + content_str = self.get_content(xml_url) + if content_str: + danmu_lst = self.extract_danmu(content_str) + self.save(danmu_lst) + else: + print("视频没有有效的 cid,跳过此视频") + +def search_videos(query, max_results=350): + #搜索视频,最多返回 max_results 个结果 + search_url = "https://api.bilibili.com/x/web-interface/search/type" + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36", + "Referer": "https://www.bilibili.com/", + "Accept": "application/json, text/plain, */*", + "Accept-Language": "zh-CN,zh;q=0.9", + "cookie": "your cookie" #Cookie 有就行,内容随意 + } + + bv_list = [] + page = 1 + + while len(bv_list) < max_results: + params = { + 'keyword': query, + 'search_type': 'video', + 'order': 'totalrank', + 'page': page, + 'pagesize': 50 + } + + try: + response = requests.get(search_url, params=params, headers=headers, timeout=10) + if response.status_code == 200: + results = response.json() + if results['code'] == 0: + videos = results['data']['result'] + if not videos: + break + bv_list += [video['bvid'] for video in videos] + print(f"已抓取 {len(bv_list)} 个视频") + else: + print(f"搜索失败,错误代码: {results['code']},错误信息: {results.get('message', '无详细信息')}") + if '频繁' in results.get('message', ''): + print("限流,等待后重试") + time.sleep(random.uniform(5, 10)) + continue + break + else: + print(f"搜索请求失败,状态码: {response.status_code}") + break + except requests.exceptions.RequestException as e: + print(f"请求失败,错误: {e}") + time.sleep(random.uniform(2, 5)) + continue + + page += 1 + time.sleep(random.uniform(1, 3)) #防止请求过于频繁被禁止 + + bv_list = list(OrderedDict.fromkeys(bv_list)) #去重操作 + return bv_list[:max_results] + +def download_danmu(index, bv, filename): + #下载指定BV号视频的弹幕 + danmu_crawler = BiliBiliDanMu(bv, filename) + danmu_crawler.crawl() + +def getthread(): + #使用线程池并发下载弹幕 + with ThreadPoolExecutor(max_workers=10) as executor: + future_to_bv = { + executor.submit(download_danmu, index, bv, f'{output_dir}第{index + 1}个视频_{bv}.txt'): bv for index, bv in enumerate(bv_list) + } + for future in as_completed(future_to_bv): + bv = future_to_bv[future] + try: + future.result() + print(f"BV号 {bv} 的弹幕抓取完成") + except Exception as exc: + print(f"BV号 {bv} 的弹幕抓取时出错: {exc}") + +if __name__ == '__main__': + query = input("请输入搜索关键词: ") + bv_list = search_videos(query) + + #限制爬取的最大视频数量为 300 + bv_list = bv_list[:300] + + output_dir = 'E:/前端/软件工程/弹幕收集bark/' + os.makedirs(output_dir, exist_ok=True) + + getthread() \ No newline at end of file diff --git a/release/test_bilibili_danmu.py b/release/test_bilibili_danmu.py new file mode 100644 index 0000000..c5610e8 --- /dev/null +++ b/release/test_bilibili_danmu.py @@ -0,0 +1,102 @@ +import unittest +from unittest.mock import patch, MagicMock +from crawl_getthread import BiliBiliDanMu, search_videos, download_danmu, getthread +import os + +class TestBiliBiliDanMu(unittest.TestCase): + """ + 测试 BiliBiliDanMu 类的单元测试类。 + """ + + @patch('crawl_getthread.requests.get') + def test_get_video_cid_success(self, mock_get): + """ + 测试根据视频 BV 号获取弹幕 CID 成功。 + """ + mock_get.return_value = MagicMock(status_code=200, text='{"cid":123456}') + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertEqual(danmu_crawler.get_video_cid(), "123456") + + @patch('crawl_getthread.requests.get') + def test_get_video_cid_failure(self, mock_get): + """ + 测试根据视频 BV 号获取弹幕 CID 失败。 + """ + mock_get.return_value = MagicMock(status_code=404) + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertIsNone(danmu_crawler.get_video_cid()) + + @patch('crawl_getthread.requests.get') + def test_get_content_success(self, mock_get): + """ + 测试根据弹幕 XML URL 获取弹幕内容成功。 + """ + mock_get.return_value = MagicMock(status_code=200, text='弹幕内容') + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertEqual(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml"), '弹幕内容') + + @patch('crawl_getthread.requests.get') + def test_get_content_failure(self, mock_get): + """ + 测试根据弹幕 XML URL 获取弹幕内容失败。 + """ + mock_get.return_value = MagicMock(status_code=404) + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + self.assertIsNone(danmu_crawler.get_content("http://comment.bilibili.com/123456.xml")) + + def test_extract_danmu(self): + """ + 测试解析弹幕 XML 内容,提取弹幕文本。 + """ + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + content_str = '弹幕内容1弹幕内容2' + self.assertEqual(danmu_crawler.extract_danmu(content_str), ['弹幕内容1', '弹幕内容2']) + + def test_save(self): + """ + 测试保存弹幕到文件。 + """ + danmu_crawler = BiliBiliDanMu("BV12345", "test_output.txt") + danmu_crawler.save(['弹幕内容1', '弹幕内容2']) + with open("test_output.txt", 'r', encoding='utf-8') as f: + self.assertEqual(f.readlines(), ['弹幕内容1\n', '弹幕内容2\n']) + os.remove("test_output.txt") # 清理测试文件 + + @patch('crawl_getthread.search_videos') + def test_search_videos_success(self, mock_search_videos): + """ + 测试根据关键词搜索视频并成功返回视频 ID 列表。 + """ + mock_search_videos.return_value = ['BV12345', 'BV67890'] + video_ids = search_videos("2024巴黎奥运会", max_results=2) + self.assertEqual(video_ids, ['BV12345', 'BV67890']) + + @patch('crawl_getthread.search_videos') + def test_search_videos_failure(self, mock_search_videos): + """ + 测试根据关键词搜索视频时请求失败。 + """ + mock_search_videos.return_value = [] + video_ids = search_videos("2024巴黎奥运会", max_results=2) + self.assertEqual(video_ids, []) + + @patch('crawl_getthread.download_danmu') + def test_download_danmu(self, mock_download): + """ + 测试下载指定 BV 号视频的弹幕。 + """ + mock_download.return_value = None + download_danmu(0, "BV12345", "test_output.txt") + mock_download.assert_called_with(0, "BV12345", "test_output.txt") + + @patch('crawl_getthread.getthread') + def test_getthread(self, mock_getthread): + """ + 测试使用线程池并发下载弹幕。 + """ + mock_getthread.return_value = None + getthread() + mock_getthread.assert_called_once() + +if __name__ == '__main__': + unittest.main() # 执行测试 \ No newline at end of file diff --git a/release/test_mywordcloud.py b/release/test_mywordcloud.py new file mode 100644 index 0000000..df104dc --- /dev/null +++ b/release/test_mywordcloud.py @@ -0,0 +1,84 @@ +import os +import jieba +from wordcloud import WordCloud +import matplotlib.pyplot as plt + +def generate_wordcloud(directory, output_file): + """ + 生成普通词云图。 + """ + text = "" + for filename in os.listdir(directory): + if filename.endswith('.txt'): + with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file: + text += file.read() + + words = jieba.cut(text) + + stop_words = set([ + "我", "你", "他", "她", "它", "是", "的", "了", "在", "吗", "啊", "吧", + "也", "有", "这", "那", "从", "为", "上", "下", "和", "与", "就", "不", + "中", "还", "要", "会", "能", "对", "着", "个", "把", "所以", "但", "也", + "所以", "从", "如", "她", "他", "它", "还", "也", "吗", "啊", "哦", "?", "!", ",", "。" + ]) + + filtered_words = [word for word in words if word.strip() and word not in stop_words] + + word_freq = {} + for word in filtered_words: + word_freq[word] = word_freq.get(word, 0) + 1 + + wordcloud = WordCloud(font_path='simsun.ttc', width=800, height=400, background_color='white').generate_from_frequencies(word_freq) + + plt.figure(figsize=(10, 5)) + plt.imshow(wordcloud, interpolation='bilinear') + plt.axis("off") + plt.savefig(output_file) + plt.close() + +def generate_trophy_wordcloud(directory, output_file): + """ + 生成奖杯词云图。 + """ + from wordcloud import WordCloud, STOPWORDS + import matplotlib.pyplot as plt + import numpy as np + import jieba.posseg as pseg + from collections import Counter + from PIL import Image + from matplotlib import colors + + text = "" + for filename in os.listdir(directory): + if filename.endswith('.txt'): + with open(os.path.join(directory, filename), 'r', encoding='utf-8') as file: + text += file.read() + + words = pseg.cut(text) + report_words = [word for word, flag in words if (len(word) >= 2) and ('n' in flag)] + + result = Counter(report_words).most_common(300) + content = dict(result) + + stopwords = set(STOPWORDS) + stopwords.update(["我", "你", "他", "她", "它", "是", "的", "了", "在", "吗", "啊", "吧", + "也", "有", "这", "那", "从", "为", "上", "下", "和", "与", "就", "不", + "中", "还", "要", "会", "能", "对", "着", "个", "把", "所以", "但", "也", + "所以", "从", "如", "她", "他", "它", "还", "也", "吗", "啊", "哦", "?", "!", ",", "。"]) + + background = Image.open("E:/前端/奖杯4.png").convert('RGB') + mask = np.array(background) + + font_path = r"C:\Windows\Fonts\STLITI.TTF" + + max_font_size = 100 + min_font_size = 10 + color_list = ['#FF274B'] + colormap = colors.ListedColormap(color_list) + + wordcloud = WordCloud(scale=4, font_path=font_path, colormap=colormap, width=1600, height=900, background_color='white', stopwords=stopwords, mask=mask, max_font_size=max_font_size, min_font_size=min_font_size).generate_from_frequencies(content) + + plt.imshow(wordcloud, interpolation='bilinear') + plt.axis('off') + plt.savefig(output_file) + plt.close() \ No newline at end of file diff --git a/release/test_数据分析.py b/release/test_数据分析.py new file mode 100644 index 0000000..7f7b46c --- /dev/null +++ b/release/test_数据分析.py @@ -0,0 +1,61 @@ +import unittest +from unittest.mock import patch, mock_open +import os +from 数据分析 import read_danmu, filter_ai_related_danmu, count_danmu, save, print_top_n_danmu + +class TestAIDanmuAnalysis(unittest.TestCase): + """ + 测试 数据分析 模块的单元测试类。 + """ + + def setUp(self): + self.folder_path = 'test_data/弹幕收集按序' + self.output_file = 'test_output.xlsx' + + def test_read_danmu(self): + """ + 测试读取弹幕文件。 + """ + test_data = "弹幕1\n弹幕2\n" + with patch('builtins.open', new_callable=mock_open, read_data=test_data) as mock_file: + danmu_list = read_danmu(self.folder_path) + self.assertEqual(danmu_list, [('弹幕1', 'file1'), ('弹幕2', 'file1')]) + mock_file.assert_called_with(os.path.join(self.folder_path, 'file1.txt'), 'r', encoding='utf-8') + + def test_filter_ai_related_danmu(self): + """ + 测试筛选与 AI 相关的弹幕。 + """ + danmu_list = [('弹幕1', 'file1'), ('AI技术', 'file2')] + filtered_danmu = filter_ai_related_danmu(danmu_list, ["AI"]) + self.assertEqual(filtered_danmu, [('AI技术', 'file2')]) + + def test_count_danmu(self): + """ + 测试统计弹幕出现次数。 + """ + danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1')] + danmu_counter = count_danmu(danmu_list) + self.assertEqual(danmu_counter, {'弹幕1': 2, '弹幕2': 1}) + + def test_save(self): + """ + 测试将弹幕数据保存到 Excel 文件。 + """ + danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1')] + with patch('pandas.DataFrame.to_excel') as mock_to_excel: + save(danmu_list, self.output_file) + mock_to_excel.assert_called_with(self.output_file, index=False) + + def test_print_top_n_danmu(self): + """ + 测试打印数量排名前N的弹幕。 + """ + danmu_list = [('弹幕1', 'file1'), ('弹幕1', 'file2'), ('弹幕2', 'file1'), ('弹幕3', 'file1')] + with patch('builtins.print') as mock_print: + print_top_n_danmu(danmu_list, top_n=2) + mock_print.assert_any_call('数量排名前 2 的弹幕:') + mock_print.assert_any_call('弹幕内容: 弹幕1, 出现次数: 2') + +if __name__ == '__main__': + unittest.main() # 执行测试 \ No newline at end of file