|
|
import argparse
|
|
|
import logging
|
|
|
import time
|
|
|
import pandas as pd
|
|
|
import jieba
|
|
|
import generate
|
|
|
import dmk
|
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
import os
|
|
|
# Silence jieba's startup/loading chatter: only ERROR and above get through.
jieba_logger = logging.getLogger("jieba")
jieba_logger.setLevel(logging.ERROR)
|
|
|
|
|
|
# Fetch danmakus for every video concurrently: downloads are I/O-bound,
# so a small thread pool overlaps the network waits.
def fetch_all_danmakus(all_danmakus, video_ids, max_workers=5):
    """Download danmakus for each bvid and extend *all_danmakus* in place.

    Args:
        all_danmakus: list that receives the fetched danmakus (mutated).
        video_ids: iterable of Bilibili bvid strings.
        max_workers: thread-pool size (default 5, the original hard-coded
            value, kept for backward compatibility).

    Raises:
        Whatever dmk.get_danmaku raises — future.result() re-raises worker
        exceptions in the caller's thread.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(dmk.get_danmaku, bvid) for bvid in video_ids]
        # as_completed yields in completion order, so the ordering of
        # all_danmakus is nondeterministic across runs.
        for future in as_completed(futures):
            all_danmakus.extend(future.result())
|
|
|
|
|
|
# Crawler class: collects danmakus (bullet comments) for a search keyword
# and renders the most frequent ones as a word cloud.
class DanmakuCrawler:
    def __init__(self, keyword, max_result, top_num, image_style, width, height,
                 font_path="C:\\Windows\\Fonts\\msyh.ttc"):
        """Configure the crawler.

        Args:
            keyword: search keyword; also used to load the keyword filter list
                via dmk.load_keywords and to name the output Excel file.
            max_result: maximum number of videos to scan for danmakus.
            top_num: how many of the most frequent danmakus to keep.
            image_style: word-cloud style name passed through to generate.
            width: word-cloud width in pixels.
            height: word-cloud height in pixels.
            font_path: TrueType font used for rendering; defaults to the
                original hard-coded Windows Microsoft YaHei path so existing
                callers are unaffected, but can now be overridden on
                non-Windows systems.
        """
        self.keyword = keyword
        self.max_result = max_result
        self.top_num = top_num
        self.image_style = image_style
        self.width = width
        self.height = height
        self.font_path = font_path
        self.keywords = dmk.load_keywords(self.keyword)
        self.stopwords = dmk.load_stopwords('stopwords.txt')

    def get_all_danmakus(self):
        """Fetch, filter and persist danmakus for the configured keyword.

        Returns the filtered danmaku list, or an empty list when nothing
        could be retrieved. Side effect: writes the top_num most frequent
        danmakus to 'top_danmakus_<keyword>.xlsx'.
        """
        video_ids = dmk.get_video_ids(self.keyword, self.max_result)  # all matching bvids
        if not video_ids:
            print("No video ids retrieved; check the network and API responses.")
            return []
        all_danmakus = []
        fetch_all_danmakus(all_danmakus, video_ids)  # concurrent download
        # Keep only danmakus containing at least one of the loaded keywords.
        all_danmakus = dmk.filter_danmakus(all_danmakus, self.keywords)
        if not all_danmakus:
            # Bug fix: the original fell through here and still wrote an
            # empty Excel file; bail out early instead. (A debug print of
            # the full danmaku list was also removed.)
            print("No danmakus retrieved; unable to generate word cloud.")
            return []
        danmaku_df = pd.DataFrame(all_danmakus, columns=['danmaku'])
        top_danmakus = danmaku_df['danmaku'].value_counts().head(self.top_num)
        top_danmakus.to_excel(f'top_danmakus_{self.keyword}.xlsx')
        return all_danmakus

    def generate_word_cloud(self, danmakus):
        """Render a word cloud image from the top_num most frequent danmakus.

        Args:
            danmakus: list of danmaku strings (as returned by
                get_all_danmakus).
        """
        danmaku_df = pd.DataFrame(danmakus, columns=['danmaku'])
        top_danmakus = danmaku_df['danmaku'].value_counts().head(self.top_num)
        danmaku_frequency = top_danmakus.to_dict()
        keys = ' '.join(danmaku_frequency.keys())
        # Segment the Chinese text with jieba so the cloud shows words,
        # not whole sentences.
        resulting_string = ' '.join(jieba.cut(keys))
        # Drop stopwords so filler words don't dominate the image.
        resulting_string = dmk.remove_stopwords(resulting_string, self.stopwords)
        generate.generate_wordcloud(resulting_string, self.font_path,
                                    self.width, self.height, self.image_style)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command-line interface; every option falls back to a sensible default.
    parser = argparse.ArgumentParser()
    parser.add_argument('--keyword', default='2024巴黎奥运会')    # search keyword
    parser.add_argument('--max_result', type=int, default=300)   # max videos to scan
    parser.add_argument('--top_num', type=int, default=8)        # top-N danmakus to keep
    parser.add_argument('--image_style', default='family')       # word-cloud style
    parser.add_argument('--width', type=int, default=300)        # word-cloud width (px)
    parser.add_argument('--height', type=int, default=300)       # word-cloud height (px)
    args = parser.parse_args()

    # Build the crawler, collect the danmakus, and only render a cloud
    # when something was actually retrieved.
    crawler = DanmakuCrawler(
        keyword=args.keyword,
        max_result=args.max_result,
        top_num=args.top_num,
        image_style=args.image_style,
        width=args.width,
        height=args.height,
    )
    danmakus = crawler.get_all_danmakus()
    if danmakus:
        crawler.generate_word_cloud(danmakus)
|