|
|
@ -0,0 +1,247 @@
|
|
|
|
|
|
|
|
import asyncio
|
|
|
|
|
|
|
|
import csv
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
|
|
|
import time
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from bilibili_api import search, sync, video, Credential, ass
|
|
|
|
|
|
|
|
from typing import List, Dict
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ASS_DIRECTORY_PATH = '弹幕文件'
|
|
|
|
|
|
|
|
WORDCLOUD_DIRECTORY_PATH = '词云图'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def get_videos_info_by_keyword(keyword: str) -> List[str]:
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
根据搜索关键字获取按照时间排序的视频信息列表
|
|
|
|
|
|
|
|
:param keyword: 传入搜索关键字
|
|
|
|
|
|
|
|
:return: 返回关键字搜索结果列表
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
page_index = 1
|
|
|
|
|
|
|
|
info_list = []
|
|
|
|
|
|
|
|
while True:
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
rtn_dict = await search.search_by_type(keyword, search_type=search.SearchObjectType.VIDEO,
|
|
|
|
|
|
|
|
order_type=search.OrderVideo.TOTALRANK,
|
|
|
|
|
|
|
|
order_sort=0, page=page_index, page_size=30)
|
|
|
|
|
|
|
|
await asyncio.sleep(3) # 休眠3秒
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
# print('get_videos_info_by_keyword函数执行出现异常:', str(e))
|
|
|
|
|
|
|
|
await asyncio.sleep(8) # 休眠8秒
|
|
|
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if 'result' in rtn_dict.keys() and page_index < 11:
|
|
|
|
|
|
|
|
info_list.extend(rtn_dict['result'])
|
|
|
|
|
|
|
|
for info in rtn_dict['result']:
|
|
|
|
|
|
|
|
print(info) # 显示打印信息
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
bvid_list = []
|
|
|
|
|
|
|
|
for info in info_list:
|
|
|
|
|
|
|
|
bvid_list.append(info['bvid'])
|
|
|
|
|
|
|
|
return bvid_list
|
|
|
|
|
|
|
|
page_index += 1
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_directory(directory_name):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
检查目录是否存在,如果不存在则创建该目录。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
|
|
directory_name (str): 要检查的目录名称(包括路径,如果需要的话)。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
|
|
|
None
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
# 检查目录是否存在
|
|
|
|
|
|
|
|
if not os.path.exists(directory_name):
|
|
|
|
|
|
|
|
# 如果不存在,则创建目录
|
|
|
|
|
|
|
|
os.makedirs(directory_name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_single_ass_file(bvid: str, c: Credential, directory: str = '弹幕文件') -> bool:
|
|
|
|
|
|
|
|
v = video.Video(bvid, credential=c) # 初始化视频对象
|
|
|
|
|
|
|
|
full_file_path = directory + '\\' + bvid + '.ass'
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
create_directory(directory)
|
|
|
|
|
|
|
|
RETRY_COUNT = 3 # 尝试总数量
|
|
|
|
|
|
|
|
count = 1
|
|
|
|
|
|
|
|
while count <= RETRY_COUNT:
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
sync(ass.make_ass_file_danmakus_protobuf(
|
|
|
|
|
|
|
|
obj=v, # 生成弹幕文件的对象
|
|
|
|
|
|
|
|
page=0, # 哪一个分 P (从 0 开始)
|
|
|
|
|
|
|
|
out=full_file_path, # 输出文件地址
|
|
|
|
|
|
|
|
credential=c
|
|
|
|
|
|
|
|
))
|
|
|
|
|
|
|
|
time.sleep(10) # 休眠10秒
|
|
|
|
|
|
|
|
print(full_file_path)
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
print(f'出现异常:{bvid}', str(e))
|
|
|
|
|
|
|
|
time.sleep(15) # 出现异常,休眠15秒
|
|
|
|
|
|
|
|
if 'total' in str(e):
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
count += 1
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_ass_files(bvid_list: List[str], c: Credential, directory: str = '弹幕文件') -> bool:
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
传入搜索的bvid列表,生成ass弹幕数据
|
|
|
|
|
|
|
|
:param c: 凭证
|
|
|
|
|
|
|
|
:param directory: 存储弹幕文件的目录
|
|
|
|
|
|
|
|
:param bvid_list:包含bvid数据的列表
|
|
|
|
|
|
|
|
:return:
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
for bvid in bvid_list:
|
|
|
|
|
|
|
|
generate_single_ass_file(bvid, c, directory)
|
|
|
|
|
|
|
|
time.sleep(3) # 休眠3秒
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_file_path_list_in_directory(directory):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
获取指定目录下所有文件的名称,并返回这些名称的列表。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
参数:
|
|
|
|
|
|
|
|
directory (str): 要搜索的目录的路径。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
返回:
|
|
|
|
|
|
|
|
list: 包含该目录下所有文件名称的列表。
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
file_path_list = []
|
|
|
|
|
|
|
|
# 遍历目录中的所有项
|
|
|
|
|
|
|
|
for item in os.listdir(directory):
|
|
|
|
|
|
|
|
# 构建完整的文件路径
|
|
|
|
|
|
|
|
full_path = os.path.join(directory, item)
|
|
|
|
|
|
|
|
# 检查是否是一个文件(不是目录)
|
|
|
|
|
|
|
|
if os.path.isfile(full_path):
|
|
|
|
|
|
|
|
file_path_list.append(full_path) # 将文件名添加到列表中
|
|
|
|
|
|
|
|
return file_path_list
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_data_after_marker(file_path, marker=')}'):
|
|
|
|
|
|
|
|
results = []
|
|
|
|
|
|
|
|
with open(file_path, 'r', encoding='utf-8') as file:
|
|
|
|
|
|
|
|
for line in file:
|
|
|
|
|
|
|
|
if line.startswith('Dialogue:'):
|
|
|
|
|
|
|
|
# 标记(如')}')之后的所有内容
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
parts = line.split(',', maxsplit=8) # 假设文本是第九个字段(索引为8)
|
|
|
|
|
|
|
|
if len(parts) > 8:
|
|
|
|
|
|
|
|
# 去除文本字段中可能存在的样式或效果定义(如果有的话)
|
|
|
|
|
|
|
|
# 这通常涉及查找并去除大括号内的内容,但这里我们简化处理
|
|
|
|
|
|
|
|
text = parts[8].strip()
|
|
|
|
|
|
|
|
# 如果')}'确实在文本中,并且你想要它之后的内容
|
|
|
|
|
|
|
|
index = text.find(marker)
|
|
|
|
|
|
|
|
if index != -1:
|
|
|
|
|
|
|
|
# 提取')}'之后的所有内容
|
|
|
|
|
|
|
|
data_after_marker = text[index + len(marker):].strip()
|
|
|
|
|
|
|
|
results.append(data_after_marker)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
# 如果没有找到')}',我们可能想要整个文本字段
|
|
|
|
|
|
|
|
# 但这里我们假设只添加找到')}'之后内容的情况
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
# 没有足够的字段,可能不是一条完整的Dialogue行
|
|
|
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_all_ass_data_list(directory) -> List[str]:
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
传入目录路径,获取该路径下左右ass文件中的数据
|
|
|
|
|
|
|
|
:param directory: 目录
|
|
|
|
|
|
|
|
:return: 返回包含所有弹幕数据的列表
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
ass_data_list = []
|
|
|
|
|
|
|
|
file_path_list = get_file_path_list_in_directory(directory)
|
|
|
|
|
|
|
|
for file_path in file_path_list:
|
|
|
|
|
|
|
|
ass_data_list.extend(extract_data_after_marker(file_path))
|
|
|
|
|
|
|
|
return ass_data_list
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_csv_full_file(file_name: str) -> str:
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
获取要写入的csv文件路径
|
|
|
|
|
|
|
|
:param file_name: 文件名称
|
|
|
|
|
|
|
|
:return: 文件路径
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
return os.path.join(WORDCLOUD_DIRECTORY_PATH, file_name)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_to_csv(file_name: str, data: List) -> bool:
|
|
|
|
|
|
|
|
# 获取文件完整存储路径
|
|
|
|
|
|
|
|
file_full_path = get_csv_full_file(file_name)
|
|
|
|
|
|
|
|
data_list = [data]
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
with open(file_full_path, 'w', newline='', encoding='utf-8-sig') as csvfile:
|
|
|
|
|
|
|
|
csvwriter = csv.writer(csvfile)
|
|
|
|
|
|
|
|
csvwriter.writerows(data_list)
|
|
|
|
|
|
|
|
print(f'写入文件{file_full_path}成功')
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
print(f"写入失败:{e}")
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def filter_statements_by_keywords(statements) -> List[str]:
|
|
|
|
|
|
|
|
# 初始化一个空列表来存储包含关键词的语句
|
|
|
|
|
|
|
|
filtered_statements = []
|
|
|
|
|
|
|
|
keywords = ['8K超高清直播', '3D全息视频技术', 'AI平台全方位分析运动数据', 'AI为视障人士搭建无障碍设施',
|
|
|
|
|
|
|
|
'辅助现场管理', '专属于运动员的GPT', '助力数字收藏', 'ai音效']
|
|
|
|
|
|
|
|
# 遍历语句列表
|
|
|
|
|
|
|
|
for statement in statements:
|
|
|
|
|
|
|
|
# 遍历关键词列表
|
|
|
|
|
|
|
|
for keyword in keywords:
|
|
|
|
|
|
|
|
# 如果语句中包含关键词,则添加到结果列表中
|
|
|
|
|
|
|
|
# 注意:这里使用strip()是为了处理可能的前后空格
|
|
|
|
|
|
|
|
if keyword.strip() in statement:
|
|
|
|
|
|
|
|
filtered_statements.append(statement)
|
|
|
|
|
|
|
|
# 如果一个语句可能包含多个关键词,但只想添加一次,可以取消下面这行的注释
|
|
|
|
|
|
|
|
# 并将其放置在内部循环的末尾,但这会改变逻辑(即只考虑第一个匹配的关键词)
|
|
|
|
|
|
|
|
# break
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 返回包含关键词的语句列表
|
|
|
|
|
|
|
|
return filtered_statements
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def write_list_to_txt_file(file_path, data_list):
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
将列表中的元素换行写入到指定的文件中。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
:param file_path: 文件的路径(包括文件名)
|
|
|
|
|
|
|
|
:param data_list: 要写入文件的列表
|
|
|
|
|
|
|
|
:return: 写入成功返回True,写入失败返回False
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
# 使用'w'模式打开文件,如果文件不存在则创建,如果文件已存在则覆盖
|
|
|
|
|
|
|
|
with open(file_path, 'w', encoding='utf-8') as file:
|
|
|
|
|
|
|
|
for item in data_list:
|
|
|
|
|
|
|
|
# 写入列表元素并换行
|
|
|
|
|
|
|
|
file.write(f"{item}\n")
|
|
|
|
|
|
|
|
# 如果没有异常发生,则认为写入成功
|
|
|
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
|
|
|
# 如果发生异常,则认为写入失败,并打印错误信息
|
|
|
|
|
|
|
|
print(f"写入文件时发生错误: {e}")
|
|
|
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
|
|
|
credential = Credential(
|
|
|
|
|
|
|
|
sessdata="b6e6f337%2C1742184491%2Cd4926%2A91CjDUM9dIQqffWha-wOu21sO84l8qDzJdawk_L-u2C37uPmwhJBSTtcx8qLue45cs2W8SVnRyaXZGRU5hREQtWnVyczNWYUtxQmFIWm1DWUdZdUxZWmdkZUU4YXNyWU9ISDBkWERoRkEySE1pUGpubzBvZzBSUkdrNzN3Z3FfQW80QmtMaEllbG53IIEC",
|
|
|
|
|
|
|
|
bili_jct="9709211369ccff83737c4d1051b8c020", buvid3="0947A1E9-8546-982F-FAB4-641B8351905589064infoc",
|
|
|
|
|
|
|
|
dedeuserid="26985229")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 下面这个代码是用于根据关键词搜索,爬取各个视频弹幕,生成存储弹幕文件
|
|
|
|
|
|
|
|
# bvid_list = sync(get_videos_info_by_keyword('2024巴黎奥运会'))
|
|
|
|
|
|
|
|
# generate_ass_files(bvid_list, credential)
|
|
|
|
|
|
|
|
time.sleep(10)
|
|
|
|
|
|
|
|
# 下面这个代码用于搜索和AI应用技术相匹配的弹幕语句,并生成相应的csv文件至词云图目录
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
file_path_list = get_file_path_list_in_directory(ASS_DIRECTORY_PATH)
|
|
|
|
|
|
|
|
ass_data = get_all_ass_data_list(ASS_DIRECTORY_PATH)
|
|
|
|
|
|
|
|
rtn_list = filter_statements_by_keywords(ass_data)
|
|
|
|
|
|
|
|
write_to_csv('弹幕.csv', rtn_list) # 这个函数用于生成存储弹幕.csv
|
|
|
|
|
|
|
|
write_list_to_txt_file(os.path.join(WORDCLOUD_DIRECTORY_PATH, 'text.txt'), rtn_list) # 这个函数用于生成存储最后匹配弹幕的文本文件
|