first commit

main
he 2 months ago
commit d50d8508fc

@ -0,0 +1,211 @@
import sys
import asyncio
# 如果是在 Windows 平台上运行,则设置事件循环策略为 SelectorEventLoopPolicy
# 这是为了避免在 Windows 上运行 asyncio 时可能出现的问题
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import collections # 用于词频统计
import json # 用于处理 JSON 数据
import aiohttp # 用于异步 HTTP 请求
import asyncio # 用于异步操作
import re # 正则表达式模块,用于解析弹幕
import openpyxl # 用于处理 Excel 文件
import pandas as pd # 用于数据处理
import cProfile # 用于性能分析
# 创建性能分析器实例,并开始性能分析
profile = cProfile.Profile()
profile.enable()
# 定义开始和结束日期,用于生成日期范围(虽然在代码中未使用此变量)
startdate = '2023-01-10'
enddate = '2024-09-15'
# 生成日期列表,格式为 'YYYY-MM-DD'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')]
# 定义 Excel 文件名,用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建一个新的 Excel 工作簿和工作表,并添加标题行 '弹幕'
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义 B 站弹幕 API 的基础 URL其中 {number} 是占位符,用于填充视频的 cid 号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头,包含 cookie 和 user-agent用于伪装请求
headers = {
'cookie': "buvid3=D65868DE-AFD5-34A4-1714-A1C0F783C5DC27124infoc; b_nut=1725930527; _uuid=FF569C27-D2C6-10814-36A8-48AA8141364924857infoc; CURRENT_FNVAL=4048; buvid_fp=2ba89565eab107e1e14c7982fc1ef9ea; buvid4=FAB9A58B-B8F5-8DAF-2AC4-4E874D3D1F0E28371-024091001-a%2FA7nVxQVETBwJOeuHlVsQ%3D%3D; rpdid=|(u))kkYu|lu0J'u~klmJ|lkm; DedeUserID=1917958039; DedeUserID__ckMd5=eaa26b970b7e3104; header_theme_version=CLOSE; enable_web_push=DISABLE; home_feed_column=5; browser_resolution=1536-730; bp_t_offset_1917958039=976131738646347776; SESSDATA=388559ba%2C1742109747%2Cdc4ae%2A91CjByrR8jH29CX_2ZktYkcWo9nu9b6csyqZX7Z52SZr2CCIHMMi3VsnapfsZ3vuXyPv4SVlhpOC1KYWpoNVExc0RPSFFCVG0zTGxwUDZNdWs5NDU4ZktBYmkzRUlFSHlDbVF2cVk4RHhDWG5BTFRxVl9oM25JdUJxWDhTWnE2dENQQ3FERUNqSER3IIEC; bili_jct=1e53039d135ff42e0131bfed9b577c34; sid=69pzvo9r; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjY4MTY5NTMsImlhdCI6MTcyNjU1NzY5MywicGx0IjotMX0.-gfoOKe3UugjUrDKiGu2ggTujyv2qI_7XZ_usWbEMvI; bili_ticket_expires=1726816893; b_lsid=AEC784B8_19203376015",
'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
# 全局缓存,用于存储 bvid 和 cid避免重复请求
bvid_cache = {}
cid_cache = {}
# 异步函数:获取 bvid带缓存功能
async def get_bvid(session, page, index):
# 如果已经在缓存中,则直接返回缓存的 bvid
if (page, index) in bvid_cache:
return bvid_cache[(page, index)]
# 构造 API 请求的 URL查询指定页码和关键字的视频
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page}&page_size=50&keyword=2028%E5%B9%B4%E6%B4%9B%E6%9D%89%E7%9F%B6%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_data = await response.json()
# 提取第 index 个视频的 bvid
bvid = json_data['data']['result'][index]['bvid']
# 将 bvid 存入缓存
bvid_cache[(page, index)] = bvid
return bvid
except (KeyError, IndexError, json.JSONDecodeError) as e:
# 如果出现异常,打印错误信息和响应内容,返回 None
print(f"获取 bvid 时出错: {e}")
print(f"响应状态码: {response.status}")
text = await response.text()
print(f"响应内容: {text}")
return None
# 异步函数:获取 cid带缓存功能
async def get_cid(session, bvid):
# 如果 bvid 已经在缓存中,则直接返回缓存的 cid
if bvid in cid_cache:
return cid_cache[bvid]
# 构造 API 请求的 URL查询指定 bvid 的视频信息
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_dict = await response.json()
# 提取第一个视频的 cid
cid = json_dict['data'][0]['cid']
# 将 cid 存入缓存
cid_cache[bvid] = cid
return cid
except (KeyError, IndexError, json.JSONDecodeError):
# 如果出现异常,返回 None
return None
# 异步函数:获取并保存某个视频的弹幕
async def fetch_and_save_bulletchat(session, cid):
# 使用 cid 构造弹幕 API 的 URL
url = tempApi.replace("{number}", str(cid))
try:
# 发送异步 GET 请求
async with session.get(url) as response:
# 获取响应的文本内容XML 格式)
response_text = await response.text()
# 使用正则表达式提取所有弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response_text)
# 如果有弹幕数据,返回列表,否则返回空列表
return data if data else []
except:
# 如果出现异常,返回空列表
return []
# 异步函数:处理并发任务,收集所有弹幕数据
async def fetch_all_bulletchats(session):
all_bulletchats = [] # 用于存储所有的弹幕数据
tasks = [] # 用于存储所有的异步任务
total_requests = 6 * 50 # 总共请求 6 页,每页 50 个视频,共 300 个视频
for i in range(total_requests):
page_number = i // 50 + 1 # 计算当前请求的页码
index = i % 50 # 计算当前页内的索引
# 创建异步任务,获取每个视频的弹幕数据
tasks.append(asyncio.ensure_future(fetch_bulletchat_data(session, page_number, index)))
# 使用 asyncio.as_completed 来迭代已完成的任务
for task in asyncio.as_completed(tasks):
bulletchat_data = await task
if bulletchat_data:
# 将获取的弹幕数据添加到总列表中
all_bulletchats.extend(bulletchat_data)
return all_bulletchats # 返回所有的弹幕数据
# 异步函数:获取单个视频的弹幕数据
async def fetch_bulletchat_data(session, page_number, index):
# 获取视频的 bvid
bvid = await get_bvid(session, page_number, index)
if bvid:
# 获取视频的 cid
cid = await get_cid(session, bvid)
if cid:
# 获取并返回视频的弹幕数据
return await fetch_and_save_bulletchat(session, cid)
return [] # 如果获取失败,返回空列表
# 函数:保存弹幕数据到文本文件和 Excel 文件
def save_to_file(bulletchats):
# 以追加模式打开文本文件,编码为 utf-8
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
# 将每条弹幕写入文本文件,并换行
file_txt.write(index + '\n')
# 将弹幕写入 Excel 表格
total_sheet.append([index])
# 保存 Excel 文件
total_workbook.save(file_xlsx)
# 函数:计算弹幕频次,并保存到 Excel 文件
def calculate_frequency():
try:
# 读取 Excel 文件中的弹幕数据
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
# 将字符串按照空格分割为单词列表
words = text.split()
# 使用 collections.Counter 统计词频
word_counts = collections.Counter(words)
# 将词频按照出现次数从高到低排序
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
# 创建一个新的 Excel 工作簿和工作表,并添加标题行
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次'])
# 将排序后的词频数据写入 Excel 表格
for word, count in sorted_word_counts:
sheet.append([word, count])
# 保存统计结果到新的 Excel 文件
workbook.save('我的统计弹幕出现次数.xlsx')
except Exception as e:
# 如果出现异常,打印错误信息
print(f"计算频次时出错: {e}")
# 异步主函数,负责执行整个流程
async def main():
# 创建一个异步的 HTTP 会话,使用指定的请求头
async with aiohttp.ClientSession(headers=headers) as session:
# 异步获取所有弹幕数据
bulletchats = await fetch_all_bulletchats(session)
# 保存弹幕数据到文件
save_to_file(bulletchats)
# 计算弹幕频次并保存结果
calculate_frequency()
# 输出流程结束信息
print("Finished")
# 启动异步任务
if __name__ == '__main__':
asyncio.run(main())
# 停止性能分析
profile.disable()
# 将性能分析数据保存到文件中
profile.dump_stats('./youhua.prof')

@ -0,0 +1,37 @@
import re
from collections import Counter
import pandas as pd
# 读取txt文件
with open('提取.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 定义AI应用的关键词
ai_categories = {
'AI生成/合成': [r'AI生成', r'AI合成', r'AI图', r'一眼AI', r'AI'],
'AI视频': [r'AI视频', r'AI合成的视频'],
'AI修复': [r'AI修复', r'AI超分', r'超分辨率'],
'AI音效/配音': [r'AI音效', r'AI配音'],
'AI训练': [r'AI训练']
}
# 统计AI应用类别的出现次数
category_count = Counter()
# 遍历每一行弹幕匹配AI应用的关键词
for line in lines:
for category, keywords in ai_categories.items():
if any(re.search(keyword, line, re.IGNORECASE) for keyword in keywords):
category_count[category] += 1
# 将统计结果转换为pandas DataFrame
df = pd.DataFrame(list(category_count.items()), columns=['AI应用类别', '数量'])
# 按数量从大到小排序
df_sorted = df.sort_values(by='数量', ascending=False)
# 保存结果到Excel文件
df_sorted.to_excel('AI应用统计结果.xlsx', index=False)
# 输出提示
print("AI应用统计结果已保存为 'AI应用统计结果.xlsx',数据已按数量从大到小排序。")

@ -0,0 +1,16 @@
import re
# 读取txt文件
with open('我的全部弹幕.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 使用正则表达式提取包含“ai”或“AI”的弹幕去掉单词边界限制
ai_danmu = [line.strip() for line in lines if re.search(r'[Aa][Ii]', line)]
# 保存结果到新文件
with open('提取.txt', 'w', encoding='utf-8') as output_file:
for danmu in ai_danmu:
output_file.write(danmu + '\n')
# 输出提取的弹幕
print(f"提取了 {len(ai_danmu)} 条弹幕,其中包含 'ai''AI' 关键字。")

@ -0,0 +1,41 @@
import os
from os import path
from wordcloud import WordCloud
import jieba
from imageio.v3 import imread
def Bulletchat_Wordcloud():
try:
#获取当前文件路径
d = path.dirname(__file__) if "__file__" in locals() else os.getcwd()
text = open(path.join(d,r'2028.txt'), 'rb').read()
# 设置模板图
img_mask = imread(r'm1.png')
# 对弹幕进行精确模式分词
text_list = jieba.lcut(text,cut_all=False)
text_str = ' '.join(text_list)
# print(text_str)
# 设置中文字体
font_path = r'C:\Users\15653\msyh.ttc'
# 停止词设置
stopwords = set('')
stopwords.update(['','','','','','','什么','所以','','','','','','','',''])
wc = WordCloud(
font_path=font_path,
#max_words=500, # 最多词个数
#min_font_size=15,
#max_font_size=100,
mask=img_mask,
background_color='white',
stopwords=stopwords,
colormap='copper'
)
wc.generate(text_str)
wc.to_file(r'outm1.png')
except Exception as e:
print(f"词云图生成异常: {e}")
if __name__ == '__main__':
Bulletchat_Wordcloud()

@ -0,0 +1,97 @@
from PIL import Image, ImageDraw, ImageFont
import random
import imageio
import os
import re
# 参数设置
WIDTH = 800 # 图片宽度
HEIGHT = 600 # 图片高度
FONT_SIZE = 20 # 字体大小
COL_WIDTH = FONT_SIZE
NUM_COLS = WIDTH // COL_WIDTH
MAX_TRAIL_LENGTH = 15 # 代码雨最大长度
FRAMES_PER_SUBTITLE = 10 # 每条字幕的帧数
OUTPUT_GIF = 'code_rain.gif' # 输出 GIF 文件名
def read_subtitles(filename):
"""读取字幕文件"""
with open(filename, 'r', encoding='utf-8') as f:
subtitles = f.readlines()
subtitles = [line.strip() for line in subtitles if line.strip()]
return subtitles
def extract_characters(subtitles):
"""从字幕中提取所有出现的字符"""
chars_set = set()
for line in subtitles:
chars_set.update(line)
return list(chars_set)
def main():
# 读取字幕
subtitles = read_subtitles(r'提取.txt')
# 提取字符集
CHARS = extract_characters(subtitles)
# 添加额外的字符(可选)
CHARS.extend(list('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789@#$%^&*()+-'))
# 加载字体(请确保字体文件存在)
font_path = r'msyh.ttc' # 替换为您的字体文件完整路径,确保支持中文
# 检查字体文件是否存在
if not os.path.exists(font_path):
print(f"未找到字体文件 '{font_path}'。请提供正确的字体路径。")
return
try:
font = ImageFont.truetype(font_path, FONT_SIZE)
except OSError:
print(f"无法打开字体文件 '{font_path}'。请检查文件是否损坏或权限设置。")
return
# 初始化列的位置
column_positions = [random.randint(-HEIGHT, 0) for _ in range(NUM_COLS)]
frames = []
for subtitle in subtitles:
for _ in range(FRAMES_PER_SUBTITLE):
# 创建新图像
img = Image.new('RGB', (WIDTH, HEIGHT), color='black')
draw = ImageDraw.Draw(img)
# 绘制代码雨
for i in range(NUM_COLS):
x = i * COL_WIDTH
y = column_positions[i]
for j in range(MAX_TRAIL_LENGTH):
char = random.choice(CHARS)
y_pos = y - j * FONT_SIZE
if y_pos < 0 or y_pos > HEIGHT:
continue
green_value = int(255 / MAX_TRAIL_LENGTH * (MAX_TRAIL_LENGTH - j))
draw.text((x, y_pos), char, font=font, fill=(0, green_value, 0))
# 更新列的位置
column_positions[i] = (y + FONT_SIZE) % (HEIGHT + FONT_SIZE * MAX_TRAIL_LENGTH)
# 绘制字幕
bbox = draw.textbbox((0, 0), subtitle, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
text_x = (WIDTH - text_width) // 2
text_y = HEIGHT - text_height - 10 # 距离底部 10 像素
draw.text((text_x, text_y), subtitle, font=font, fill=(255, 255, 255))
# 添加帧
frames.append(img)
# 保存为 GIF
frames[0].save(OUTPUT_GIF, save_all=True, append_images=frames[1:], optimize=False, duration=100, loop=0)
print(f"GIF 已保存为 '{OUTPUT_GIF}'")
if __name__ == '__main__':
main()

@ -0,0 +1,160 @@
import collections # 用于词频统计
import json # 用于处理JSON数据
import requests # 用于发送HTTP请求
import re # 正则表达式模块,用于解析弹幕
import time # 用于时间相关操作
import openpyxl # 用于处理Excel文件
import pandas as pd # 用于数据处理
from concurrent.futures import ThreadPoolExecutor, as_completed # 用于并发操作
import cProfile
profile = cProfile.Profile()
profile.enable()
# 定义开始和结束日期,用于生成日期范围
startdate = '20240710'
enddate = '20240910'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')] # 生成日期列表
# 定义Excel文件名用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建Excel工作簿和工作表并添加标题行
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义B站弹幕API的基础URL{number}是占位符用于填充视频的cid号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头包含cookie和user-agent用于伪装请求
headers = {
'cookie':"buvid3=D65868DE-AFD5-34A4-1714-A1C0F783C5DC27124infoc; b_nut=1725930527; _uuid=FF569C27-D2C6-10814-36A8-48AA8141364924857infoc; CURRENT_FNVAL=4048; buvid_fp=2ba89565eab107e1e14c7982fc1ef9ea; buvid4=FAB9A58B-B8F5-8DAF-2AC4-4E874D3D1F0E28371-024091001-a%2FA7nVxQVETBwJOeuHlVsQ%3D%3D; rpdid=|(u))kkYu|lu0J'u~klmJ|lkm; SESSDATA=e8f35e7e%2C1741482645%2Cb3572%2A91CjC7hBYEVq-d38AwweerB9sclbgqT78LR6aribbsaBRVlJ0BoUjCMidR-nm82eDlo70SVlVibjl1UnQ0Y0NzSFFCb21DRGNNSXp4YnRSbFdzMXo0NjR4QkM0TlBKejUweW1TbDJkT0g3Z2Z6bTdmQVJzdmpvVHZmR1JWOEhtbnFGZmpuQUt6WXZnIIEC; bili_jct=7d37b038ea7714a0c41ec3d26603737b; DedeUserID=1917958039; DedeUserID__ckMd5=eaa26b970b7e3104; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjYxOTIwNTMsImlhdCI6MTcyNTkzMjc5MywicGx0IjotMX0.82V6_w7kGoSvzDy9rT-9DpsL7U_BrB24GefbBM0Vvb8; bili_ticket_expires=1726191993; header_theme_version=CLOSE; enable_web_push=DISABLE; home_feed_column=5; browser_resolution=1536-730; b_lsid=953CBCA8_191E441EE95; bp_t_offset_1917958039=976131738646347776; sid=hl295qcj",
'user-agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
# 定义函数获取搜索结果中的bvid视频的唯一标识符
def get_bvid(page_number, number):
# 构造搜索API的URLpage_number是页码number是该页中的视频编号
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page_number}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
response = requests.get(url=url, headers=headers) # 发送请求
try:
# 解析返回的JSON数据提取视频的bvid
json_data = json.loads(response.text)
print(json_data)
bvid = json_data['data']['result'][number]['bvid']
print(f"获取到bvid: {bvid}")
return bvid # 返回bvid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException) as e:
print(f"获取bvid时出错: {e}")
# 捕获错误并返回None防止程序崩溃
return None
# 定义函数根据bvid获取视频的cid弹幕对应的唯一标识符
def get_cid(bvid):
try:
# 通过bvid构造获取cid的API请求URL
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
response = requests.get(url, headers=headers) # 发送请求
if response.status_code != 200:
# 如果请求状态码不是200返回None
return None
# 解析返回的JSON数据提取cid
json_dict = json.loads(response.text)
return json_dict['data'][0]['cid'] # 返回cid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException):
return None # 捕获错误并返回None
# 定义函数,获取并保存某个视频的弹幕
def fetch_and_save_bulletchat(cid):
# 用cid替换API中的占位符
url = tempApi.replace("{number}", str(cid))
try:
# 发送请求获取弹幕数据
response = requests.get(url, headers=headers)
response.encoding = response.apparent_encoding # 设置编码
# 使用正则表达式解析弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response.text)
if data:
return data # 返回弹幕列表
except requests.RequestException:
return [] # 如果请求失败,返回空列表
# 定义函数批量获取bvid和cid并创建并发任务
def put_api():
tasks = []
# 使用ThreadPoolExecutor创建线程池用于并发请求
with ThreadPoolExecutor(max_workers=10) as executor:
# 控制页码范围1到5页每页50个视频
for i in range(1, 7):
for j in range(50):
bvid = get_bvid(i, j) # 获取bvid
if bvid:
cid = get_cid(bvid) # 获取cid
if cid:
# 提交弹幕抓取任务到线程池
tasks.append(executor.submit(fetch_and_save_bulletchat, cid))
return tasks # 返回任务列表
# 定义函数,处理并发任务,收集所有弹幕数据
def get_data(tasks):
all_bulletchats = []
# 遍历所有完成的任务,获取结果
for task in as_completed(tasks):
bulletchat_data = task.result()
if bulletchat_data:
all_bulletchats.extend(bulletchat_data) # 将弹幕数据加入总列表
return all_bulletchats # 返回所有弹幕数据
# 定义函数将弹幕数据保存到文件和Excel中
def save_to_file(bulletchats):
# 打开文本文件,将弹幕逐行写入
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
file_txt.write(index + '\n')
total_sheet.append([index]) # 将弹幕写入Excel表格
total_workbook.save(file_xlsx) # 保存Excel文件
# 定义函数计算弹幕频次并保存到Excel
def calculate_frequency():
try:
# 读取弹幕Excel文件
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
words = text.split() # 将弹幕分割为单词
word_counts = collections.Counter(words) # 统计单词频次
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True) # 按频次排序
# 创建新的Excel工作簿用于保存频次统计结果
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次']) # 添加标题行
# 将排序后的词频结果写入Excel
for word, count in sorted_word_counts:
sheet.append([word, count])
workbook.save('我的统计弹幕出现次数.xlsx') # 保存频次统计的Excel文件
except Exception as e:
print(f"计算频次时出错: {e}")
# 主函数,负责执行整个流程
def main():
tasks = put_api() # 获取bvid和cid并创建并发任务
bulletchats = get_data(tasks) # 获取所有弹幕数据
save_to_file(bulletchats) # 保存弹幕数据到文件和Excel
calculate_frequency() # 计算弹幕频次
print("Finished") # 输出流程结束信息
# 如果此脚本被直接运行则调用main函数
if __name__ == '__main__':
main()
profile.disable()
# Save the profiling data to a file
profile.dump_stats('./output.prof')

@ -0,0 +1,149 @@
import collections # 用于词频统计
import json # 用于处理JSON数据
import requests # 用于发送HTTP请求
import re # 正则表达式模块,用于解析弹幕
import time # 用于时间相关操作
import openpyxl # 用于处理Excel文件
import pandas as pd # 用于数据处理
from concurrent.futures import ThreadPoolExecutor, as_completed # 用于并发操作
# 定义开始和结束日期,用于生成日期范围
startdate = '20240710'
enddate = '20240910'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')] # 生成日期列表
# 定义Excel文件名用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建Excel工作簿和工作表并添加标题行
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义B站弹幕API的基础URL{number}是占位符用于填充视频的cid号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头包含cookie和user-agent用于伪装请求
headers = {
'cookie':"buvid3=D65868DE-AFD5-34A4-1714-A1C0F783C5DC27124infoc; b_nut=1725930527; _uuid=FF569C27-D2C6-10814-36A8-48AA8141364924857infoc; CURRENT_FNVAL=4048; buvid_fp=2ba89565eab107e1e14c7982fc1ef9ea; buvid4=FAB9A58B-B8F5-8DAF-2AC4-4E874D3D1F0E28371-024091001-a%2FA7nVxQVETBwJOeuHlVsQ%3D%3D; rpdid=|(u))kkYu|lu0J'u~klmJ|lkm; SESSDATA=e8f35e7e%2C1741482645%2Cb3572%2A91CjC7hBYEVq-d38AwweerB9sclbgqT78LR6aribbsaBRVlJ0BoUjCMidR-nm82eDlo70SVlVibjl1UnQ0Y0NzSFFCb21DRGNNSXp4YnRSbFdzMXo0NjR4QkM0TlBKejUweW1TbDJkT0g3Z2Z6bTdmQVJzdmpvVHZmR1JWOEhtbnFGZmpuQUt6WXZnIIEC; bili_jct=7d37b038ea7714a0c41ec3d26603737b; DedeUserID=1917958039; DedeUserID__ckMd5=eaa26b970b7e3104; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjYxOTIwNTMsImlhdCI6MTcyNTkzMjc5MywicGx0IjotMX0.82V6_w7kGoSvzDy9rT-9DpsL7U_BrB24GefbBM0Vvb8; bili_ticket_expires=1726191993; header_theme_version=CLOSE; enable_web_push=DISABLE; home_feed_column=5; browser_resolution=1536-730; b_lsid=953CBCA8_191E441EE95; bp_t_offset_1917958039=976131738646347776; sid=hl295qcj",
'user-agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
# 定义函数获取搜索结果中的bvid视频的唯一标识符
def get_bvid(page_number, number):
# 构造搜索API的URLpage_number是页码number是该页中的视频编号
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page_number}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
response = requests.get(url=url, headers=headers) # 发送请求
try:
# 解析返回的JSON数据提取视频的bvid
json_data = json.loads(response.text)
print(json_data)
bvid = json_data['data']['result'][number]['bvid']
print(f"获取到bvid: {bvid}")
return bvid # 返回bvid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException) as e:
print(f"获取bvid时出错: {e}")
# 捕获错误并返回None防止程序崩溃
return None
# 定义函数根据bvid获取视频的cid弹幕对应的唯一标识符
def get_cid(bvid):
try:
# 通过bvid构造获取cid的API请求URL
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
response = requests.get(url, headers=headers) # 发送请求
if response.status_code != 200:
# 如果请求状态码不是200返回None
return None
# 解析返回的JSON数据提取cid
json_dict = json.loads(response.text)
return json_dict['data'][0]['cid'] # 返回cid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException):
return None # 捕获错误并返回None
# 定义函数,获取并保存某个视频的弹幕
def fetch_and_save_bulletchat(cid):
# 用cid替换API中的占位符
url = tempApi.replace("{number}", str(cid))
try:
# 发送请求获取弹幕数据
response = requests.get(url, headers=headers)
response.encoding = response.apparent_encoding # 设置编码
# 使用正则表达式解析弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response.text)
if data:
return data # 返回弹幕列表
except requests.RequestException:
return [] # 如果请求失败,返回空列表
# 定义函数批量获取bvid和cid并创建并发任务
def put_api():
tasks = []
# 使用ThreadPoolExecutor创建线程池用于并发请求
with ThreadPoolExecutor(max_workers=10) as executor:
# 控制页码范围1到5页每页50个视频
for i in range(1, 7):
for j in range(50):
bvid = get_bvid(i, j) # 获取bvid
if bvid:
cid = get_cid(bvid) # 获取cid
if cid:
# 提交弹幕抓取任务到线程池
tasks.append(executor.submit(fetch_and_save_bulletchat, cid))
return tasks # 返回任务列表
# 定义函数,处理并发任务,收集所有弹幕数据
def get_data(tasks):
all_bulletchats = []
# 遍历所有完成的任务,获取结果
for task in as_completed(tasks):
bulletchat_data = task.result()
if bulletchat_data:
all_bulletchats.extend(bulletchat_data) # 将弹幕数据加入总列表
return all_bulletchats # 返回所有弹幕数据
# 定义函数将弹幕数据保存到文件和Excel中
def save_to_file(bulletchats):
# 打开文本文件,将弹幕逐行写入
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
file_txt.write(index + '\n')
total_sheet.append([index]) # 将弹幕写入Excel表格
total_workbook.save(file_xlsx) # 保存Excel文件
# 定义函数计算弹幕频次并保存到Excel
def calculate_frequency():
try:
# 读取弹幕Excel文件
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
words = text.split() # 将弹幕分割为单词
word_counts = collections.Counter(words) # 统计单词频次
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True) # 按频次排序
# 创建新的Excel工作簿用于保存频次统计结果
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次']) # 添加标题行
# 将排序后的词频结果写入Excel
for word, count in sorted_word_counts:
sheet.append([word, count])
workbook.save('我的统计弹幕出现次数.xlsx') # 保存频次统计的Excel文件
except Exception as e:
print(f"计算频次时出错: {e}")
# 主函数,负责执行整个流程
def main():
tasks = put_api() # 获取bvid和cid并创建并发任务
bulletchats = get_data(tasks) # 获取所有弹幕数据
save_to_file(bulletchats) # 保存弹幕数据到文件和Excel
calculate_frequency() # 计算弹幕频次
print("Finished") # 输出流程结束信息
# 如果此脚本被直接运行则调用main函数
if __name__ == '__main__':
main()

@ -0,0 +1,211 @@
import sys
import asyncio
# 如果是在 Windows 平台上运行,则设置事件循环策略为 SelectorEventLoopPolicy
# 这是为了避免在 Windows 上运行 asyncio 时可能出现的问题
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import collections # 用于词频统计
import json # 用于处理 JSON 数据
import aiohttp # 用于异步 HTTP 请求
import asyncio # 用于异步操作
import re # 正则表达式模块,用于解析弹幕
import openpyxl # 用于处理 Excel 文件
import pandas as pd # 用于数据处理
import cProfile # 用于性能分析
# 创建性能分析器实例,并开始性能分析
profile = cProfile.Profile()
profile.enable()
# 定义开始和结束日期,用于生成日期范围(虽然在代码中未使用此变量)
startdate = '2024-07-10'
enddate = '2024-09-10'
# 生成日期列表,格式为 'YYYY-MM-DD'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')]
# 定义 Excel 文件名,用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建一个新的 Excel 工作簿和工作表,并添加标题行 '弹幕'
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义 B 站弹幕 API 的基础 URL其中 {number} 是占位符,用于填充视频的 cid 号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头,包含 cookie 和 user-agent用于伪装请求
headers = {
'cookie': "您的 B 站 Cookie 值",
'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64)..."
}
# 全局缓存,用于存储 bvid 和 cid避免重复请求
bvid_cache = {}
cid_cache = {}
# 异步函数:获取 bvid带缓存功能
async def get_bvid(session, page, index):
# 如果已经在缓存中,则直接返回缓存的 bvid
if (page, index) in bvid_cache:
return bvid_cache[(page, index)]
# 构造 API 请求的 URL查询指定页码和关键字的视频
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_data = await response.json()
# 提取第 index 个视频的 bvid
bvid = json_data['data']['result'][index]['bvid']
# 将 bvid 存入缓存
bvid_cache[(page, index)] = bvid
return bvid
except (KeyError, IndexError, json.JSONDecodeError) as e:
# 如果出现异常,打印错误信息和响应内容,返回 None
print(f"获取 bvid 时出错: {e}")
print(f"响应状态码: {response.status}")
text = await response.text()
print(f"响应内容: {text}")
return None
# 异步函数:获取 cid带缓存功能
async def get_cid(session, bvid):
# 如果 bvid 已经在缓存中,则直接返回缓存的 cid
if bvid in cid_cache:
return cid_cache[bvid]
# 构造 API 请求的 URL查询指定 bvid 的视频信息
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_dict = await response.json()
# 提取第一个视频的 cid
cid = json_dict['data'][0]['cid']
# 将 cid 存入缓存
cid_cache[bvid] = cid
return cid
except (KeyError, IndexError, json.JSONDecodeError):
# 如果出现异常,返回 None
return None
# 异步函数:获取并保存某个视频的弹幕
async def fetch_and_save_bulletchat(session, cid):
# 使用 cid 构造弹幕 API 的 URL
url = tempApi.replace("{number}", str(cid))
try:
# 发送异步 GET 请求
async with session.get(url) as response:
# 获取响应的文本内容XML 格式)
response_text = await response.text()
# 使用正则表达式提取所有弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response_text)
# 如果有弹幕数据,返回列表,否则返回空列表
return data if data else []
except:
# 如果出现异常,返回空列表
return []
# 异步函数:处理并发任务,收集所有弹幕数据
async def fetch_all_bulletchats(session):
all_bulletchats = [] # 用于存储所有的弹幕数据
tasks = [] # 用于存储所有的异步任务
total_requests = 6 * 50 # 总共请求 6 页,每页 50 个视频,共 300 个视频
for i in range(total_requests):
page_number = i // 50 + 1 # 计算当前请求的页码
index = i % 50 # 计算当前页内的索引
# 创建异步任务,获取每个视频的弹幕数据
tasks.append(asyncio.ensure_future(fetch_bulletchat_data(session, page_number, index)))
# 使用 asyncio.as_completed 来迭代已完成的任务
for task in asyncio.as_completed(tasks):
bulletchat_data = await task
if bulletchat_data:
# 将获取的弹幕数据添加到总列表中
all_bulletchats.extend(bulletchat_data)
return all_bulletchats # 返回所有的弹幕数据
# 异步函数:获取单个视频的弹幕数据
async def fetch_bulletchat_data(session, page_number, index):
# 获取视频的 bvid
bvid = await get_bvid(session, page_number, index)
if bvid:
# 获取视频的 cid
cid = await get_cid(session, bvid)
if cid:
# 获取并返回视频的弹幕数据
return await fetch_and_save_bulletchat(session, cid)
return [] # 如果获取失败,返回空列表
# 函数:保存弹幕数据到文本文件和 Excel 文件
def save_to_file(bulletchats):
# 以追加模式打开文本文件,编码为 utf-8
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
# 将每条弹幕写入文本文件,并换行
file_txt.write(index + '\n')
# 将弹幕写入 Excel 表格
total_sheet.append([index])
# 保存 Excel 文件
total_workbook.save(file_xlsx)
# 函数:计算弹幕频次,并保存到 Excel 文件
def calculate_frequency():
try:
# 读取 Excel 文件中的弹幕数据
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
# 将字符串按照空格分割为单词列表
words = text.split()
# 使用 collections.Counter 统计词频
word_counts = collections.Counter(words)
# 将词频按照出现次数从高到低排序
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
# 创建一个新的 Excel 工作簿和工作表,并添加标题行
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次'])
# 将排序后的词频数据写入 Excel 表格
for word, count in sorted_word_counts:
sheet.append([word, count])
# 保存统计结果到新的 Excel 文件
workbook.save('我的统计弹幕出现次数.xlsx')
except Exception as e:
# 如果出现异常,打印错误信息
print(f"计算频次时出错: {e}")
# 异步主函数,负责执行整个流程
async def main():
# 创建一个异步的 HTTP 会话,使用指定的请求头
async with aiohttp.ClientSession(headers=headers) as session:
# 异步获取所有弹幕数据
bulletchats = await fetch_all_bulletchats(session)
# 保存弹幕数据到文件
save_to_file(bulletchats)
# 计算弹幕频次并保存结果
calculate_frequency()
# 输出流程结束信息
print("Finished")
# 启动异步任务
if __name__ == '__main__':
asyncio.run(main())
# 停止性能分析
profile.disable()
# 将性能分析数据保存到文件中
profile.dump_stats('./youhua.prof')

Binary file not shown.

@ -0,0 +1,62 @@
import requests
import json
import certifi
# 请在此处替换为您的 News API 密钥
API_KEY = '04c1848491d143f9a9af8a64655167e8'
# 定义查询参数
query = '2024 Paris Olympics AND AI technology'
language = 'en'
page_size = 100
# 定义请求 URL 和参数
url = 'https://newsapi.org/v2/everything'
params = {
'q': query,
'language': language,
'pageSize': page_size,
'apiKey': API_KEY
}
def fetch_articles():
all_articles = []
page = 1
while True:
params['page'] = page
try:
response = requests.get(url, params=params, proxies={"http": None, "https": None})
response.raise_for_status() # 检查HTTP错误
data = response.json()
if data['status'] != 'ok':
print(f"获取文章时出错:{data.get('message')}")
break
articles = data.get('articles', [])
if not articles:
break
all_articles.extend(articles)
print(f"已获取第 {page} 页,共 {len(all_articles)} 篇文章")
page += 1
if len(all_articles) >= 100:
break
except requests.exceptions.RequestException as e:
print(f"请求出现异常:{e}")
break
return all_articles
def save_articles(articles, filename='articles.json'):
with open(filename, 'w', encoding='utf-8') as f:
json.dump(articles, f, ensure_ascii=False, indent=4)
print(f"已将 {len(articles)} 篇文章保存到 {filename}")
if __name__ == '__main__':
articles = fetch_articles()
if articles:
save_articles(articles)

Binary file not shown.

After

Width:  |  Height:  |  Size: 644 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 158 KiB

File diff suppressed because it is too large Load Diff

@ -0,0 +1,5 @@
AI应用类别统计
AI生成/合成: 72 次
AI音效/配音: 3 次
AI视频: 3 次
AI修复: 3 次

Binary file not shown.

After

Width:  |  Height:  |  Size: 155 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 MiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 74 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 351 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 91 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 276 KiB

@ -0,0 +1,59 @@
ai就别上桌了吧
就是AI不用怀疑
一眼Ai还造谣
这是AI不是真人啊别信
这是AI
什么ai
好了逐渐AI……敦煌还是适合跳舞体育着实不大行
一眼Ai
都是ai
别拿ai跑人家真人呗 有点下头
Ai
这是AI?
AI 合成的,别当真
ai啊……
AI味太浓
ai的痕迹太明显了
这ai
AI吧
一眼ai
ai
一眼AI
ai?
这不是AI吧
一看AI
AI?
这是AI的吧
AI
这难道不是AI
AI
一眼Ai
AI图就不要误导了
ai背后奥运五环都画错了
这是AI可惜了以为是实拍
下次比赛就按照这个ai设计的装扮应该没问题
跟AI配音较什么劲。。。
ai音效
ai还能帮运动员训练
一眼ai生成
像ai
aircraft!
ai视频
Usain Bolt
这个shuai啊
ai视频
这配音不如AI
不要什么都ai这是ai的爹
一股ai味儿
不知道为什么感觉有一股AI味儿。。。
不要什么都ai这是ai的爹
一股ai味儿
不知道为什么感觉有一股AI味儿。。。
我最开始以为这人ai合成的
ai视频
ai音效
ai
真的很烦现在这些用AI超分的假4K珍藏不懂珍藏点什么油画一样
画面怪怪的AI修复过吗
应该AI修复过的4K
我最开始以为这人ai合成的

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.0 KiB

@ -0,0 +1,521 @@
一、PSP表格
(2.1)在开始实现程序之前在附录提供的PSP表格记录下你估计将在程序的各个模块的开发上耗费的时间。3'
(2.2)在你实现完程序之后在附录提供的PSP表格记录下你在程序的各个模块上实际花费的时间。3'
| **PSP2.1** | **Personal Software Process Stages** | **预估耗时(分钟)** | **实际耗时(分钟)** |
| :-------------------------------------- | --------------------------------------- | -------------------- | -------------------- |
| Planning | 计划 | 30 | 40 |
| · Estimate | · 估计这个任务需要多少时间 | 30 | 40 |
| Development | 开发 | 600 | 800 |
| · Analysis | · 需求分析 (包括学习新技术) | 90 | 120 |
| · Design Spec | · 生成设计文档 | 30 | 40 |
| · Design Review | · 设计复审 | 15 | 20 |
| · Coding Standard | · 代码规范 (为目前的开发制定合适的规范) | 30 | 30 |
| · Design | · 具体设计 | 60 | 80 |
| · Coding | · 具体编码 | 240 | 300 |
| · Code Review | · 代码复审 | 60 | 120 |
| · Test | · 测试(自我测试,修改代码,提交修改) | 75 | 90 |
| Reporting | 报告 | 80 | 100 |
| · Test Repor | · 测试报告 | 20 | 40 |
| · Size Measurement | · 计算工作量 | 30 | 30 |
| · Postmortem & Process Improvement Plan | · 事后总结, 并提出过程改进计划 | 30 | 30 |
| | · 合计 | 710 | 940 |
二、任务要求的实现
(3.1)项目设计与技术栈。从阅读完题目到完成作业这一次的任务被你拆分成了几个环节你分别通过什么渠道、使用什么方式方法完成了各个环节列出你完成本次任务所使用的技术栈。5'
- 计划
我首先对任务进行了整体规划,按照 PSPPersonal Software Process表格的思路将任务分解为以下几个环节
1. **需求分析**:理解任务需求,明确需要从指定日期范围内的 B 站视频中获取弹幕数据,并进行频次统计。
2. **技术调研**:查找相关的 B 站 API以及学习如何使用 Python 的相关库来实现功能。
3. **编码实现**:编写代码实现各个功能模块,包括获取 bvid、cid抓取弹幕数据存储和统计。
4. **测试调试**:对代码进行测试,解决出现的异常和错误,确保程序稳定运行,同时对代码进行性能上的优化改进。
5. **结果输出**:将最终的弹幕数据和统计结果保存到 Excel 文件中,并绘制词云图等展示结果。
- 开发
- 样本验证
通过一个视频的弹幕爬虫实践,验证计划步骤的可行性。
- **获取 bvid 和 cid**
通过对 B 站网页和 API 的分析,了解了如何使用搜索接口获取视频列表,并从中提取每个视频的 `bvid`,使用关键字和分页参数,构造 B 站的搜索 API 请求 URL。发送请求后解析返回的 JSON 数据结构,从中提取每个视频的 `bvid`。使用获取到的 `bvid`,构造另一个 API 请求,获取视频的 `cid`,这是获取弹幕所需的参数。
主要技术栈浏览器开发者模式的使用浏览器json文件分析Python正则表达式Python文档写入
- **编程实现获取弹幕**
在获取到视频的 `cid` 后,使用弹幕接口来获取弹幕数据,并对数据进行处理和存储:构造弹幕 API 的请求 URL发送请求获取弹幕的 XML 数据,使用正则表达式从 XML 数据中提取弹幕文本内容,使用 `ThreadPoolExecutor` 创建线程池,提升数据抓取的效率,将获取的弹幕数据保存到文本文件和 Excel 文件中,方便后续的分析和处理,
主要技术栈:`requests`、`re`、`openpyxl`、`pandas`、`collections``concurrent.futures.ThreadPoolExecutor`
- 弹幕出现频次统计
读取保存的弹幕数据,使用 `collections.Counter` 对弹幕进行词频统计,并将结果保存到新的 Excel 文件中。通过Python字典和sorted的函数降序排列弹幕文本以及出现的频次通过openpyxl库的函数写入Excel中并输出出现频次前8的弹幕
主要技术栈Python读文档Python字典Python写入ExcelPython sorted( )函数
- 词云图绘制
通过wordcloudjiebaimageio等库的函数完成对所有弹幕内容的文本分割图片识别词云图设置及生成。
主要技术栈wordcloudjiebaimageio库相关函数的使用Adobe Photoshop的使用
- 性能分析与改进
- 采用异常抛出处理
- 利用pycharm的插件[SonarLint](https://sonarsource.atlassian.net/browse/RSPEC-6246?jql=project %3D RSPEC AND resolution %3D Unresolved ORDER BY updated DESC)进行Code Quality Analysis参照[官方检测规则及修复示例](https://sonarsource.atlassian.net/browse/RSPEC-6246?jql=project %3D RSPEC AND resolution %3D Unresolved ORDER BY updated DESC)消除了所有警告,如图:
![img](https://img-community.csdnimg.cn/images/4b6c3786b81b4d339989154ec7a618a9.png)
- 利用python自带的unittest进行单元测试顺利通过单元测试
![img](https://img-community.csdnimg.cn/images/4a99bf1d0c5c418691dd9b01c5d266fd.png)
- 使用cProfile对各函数执行次数和执行时间在此基础上对脚本性能进行改进
- 利用减少API调用频率**HTTP连接重用**异步IO**缓存处理**等优化代码
(3.2)爬虫与数据处理。说明业务逻辑简述代码的设计过程例如可介绍有几个类几个函数他们之间的关系并对关键的函数或算法进行说明。20'
### 业务逻辑图
![弹幕爬取及数据分析](C:\Users\15653\Desktop\102101535\102201525\result\巴黎奥运会\弹幕爬取及数据分析.png)
- 先从b站API中获取视频的bvid号。先加上headers头部信息伪装来源减少被拦截的可能性然后通过HTTP的get方法获取json文件再用python内置函数将json文件转换成字典格式。其数据结构可以从html文件中看出由此可以得到视频的bvid号。
其中用到request.get()函数和json.loads()函数json.loads()函数可以把requests.get()函数得到的json文件转为字典格式
```python
# 定义函数获取搜索结果中的bvid视频的唯一标识符
def get_bvid(page_number, number):
# 构造搜索API的URLpage_number是页码number是该页中的视频编号
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page_number}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
response = requests.get(url=url, headers=headers) # 发送请求
try:
# 解析返回的JSON数据提取视频的bvid
json_data = json.loads(response.text)
print(json_data)
bvid = json_data['data']['result'][number]['bvid']
print(f"获取到bvid: {bvid}")
return bvid # 返回bvid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException) as e:
print(f"获取bvid时出错: {e}")
# 捕获错误并返回None防止程序崩溃
return None
```
- 得到视频的bvid后在根据b站的另一个API获取指定视频的cid号同样利用get方法得到json文件后转成字典再其根据数据结构得到cid号
其中同样用到request.get()函数和json.loads()函数json.loads()函数可以把requests.get()函数得到的json文件转为字典格式
```python
# 定义函数根据bvid获取视频的cid弹幕对应的唯一标识符
def get_cid(bvid):
try:
# 通过bvid构造获取cid的API请求URL
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
response = requests.get(url, headers=headers) # 发送请求
if response.status_code != 200:
# 如果请求状态码不是200返回None
return None
# 解析返回的JSON数据提取cid
json_dict = json.loads(response.text)
return json_dict['data'][0]['cid'] # 返回cid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException):
return None # 捕获错误并返回None
```
- 有了视频的cid号之后就可以进行视频弹幕的爬取了利用字符串拼接得到300个视频的弹幕地址使用ThreadPoolExecutor创建线程池用于并发请求并使用as_completed先加快处理速度
其中用到了ThreadPoolExecutoras_completed等函数
```python
# 定义函数批量获取bvid和cid并创建并发任务
def put_api():
tasks = []
# 使用ThreadPoolExecutor创建线程池用于并发请求
with ThreadPoolExecutor(max_workers=10) as executor:
# 控制页码范围1到5页每页50个视频
for i in range(1, 7):
for j in range(50):
bvid = get_bvid(i, j) # 获取bvid
if bvid:
cid = get_cid(bvid) # 获取cid
if cid:
# 提交弹幕抓取任务到线程池
tasks.append(executor.submit(fetch_and_save_bulletchat, cid))
return tasks # 返回任务列表
# 定义函数,处理并发任务,收集所有弹幕数据
def get_data(tasks):
all_bulletchats = []
# 遍历所有完成的任务,获取结果
for task in as_completed(tasks):
bulletchat_data = task.result()
if bulletchat_data:
all_bulletchats.extend(bulletchat_data) # 将弹幕数据加入总列表
return all_bulletchats # 返回所有弹幕数据
```
- 爬取弹幕完成后就可以利用Excel中的数据进行词频统计了这里关键在于利用collections中的Counter方法统计出列表中各个弹幕的出现次数将其存储在字典当中再利用sorted方法以value进行排序即得到出现次数前20的弹幕将其输出到控制台和Excel中
```python
# 定义函数计算弹幕频次并保存到Excel
def calculate_frequency():
try:
# 读取弹幕Excel文件
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
words = text.split() # 将弹幕分割为单词
word_counts = collections.Counter(words) # 统计单词频次
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True) # 按频次排序
# 创建新的Excel工作簿用于保存频次统计结果
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次']) # 添加标题行
# 将排序后的词频结果写入Excel
for word, count in sorted_word_counts:
sheet.append([word, count])
workbook.save('我的统计弹幕出现次数.xlsx') # 保存频次统计的Excel文件
except Exception as e:
print(f"计算频次时出错: {e}")
```
- 词云图和饼图绘制饼图由上一步得到的Excel表格按公式生成
词云图主要使用了jieba库分词imageio库图像识别stopwords集合的停止词设置WordCloud()的词云图设置
```python
import os
from os import path
from wordcloud import WordCloud
import jieba
from imageio.v3 import imread
def Bulletchat_Wordcloud():
try:
#获取当前文件路径
d = path.dirname(__file__) if "__file__" in locals() else os.getcwd()
text = open(path.join(d,r'提取.txt'), 'rb').read()
# 设置模板图
img_mask = imread(r'Paris.png')
# 对弹幕进行精确模式分词
text_list = jieba.lcut(text,cut_all=False)
text_str = ' '.join(text_list)
# print(text_str)
# 设置中文字体
font_path = 'msyh.ttc'
# 停止词设置
stopwords = set('')
stopwords.update(['的','和','又','了','都','是','什么','所以','这','呢','吧','吗','个','呀','嘛','哈'])
wc = WordCloud(
font_path=font_path,
#max_words=500, # 最多词个数
#min_font_size=15,
#max_font_size=100,
mask=img_mask,
background_color='white',
stopwords=stopwords,
colormap='copper'
)
wc.generate(text_str)
wc.to_file(r'outParis.png')
except Exception as e:
print(f"词云图生成异常: {e}")
if __name__ == '__main__':
Bulletchat_Wordcloud()
```
- 用cProfile库分析程序中步骤执行的时间和次数并对性能加以改进
```python
import cProfile # 用于性能分析
# 创建性能分析器实例,并开始性能分析
profile = cProfile.Profile()
profile.enable()
# 停止性能分析
profile.disable()
# 将性能分析数据保存到文件中
profile.dump_stats('./youhua.prof')
```
(3.3)数据统计接口部分的性能改进。记录在数据统计接口的性能上所花费的时间描述你改进的思路并展示一张性能分析图例如可通过VS /JProfiler的性能分析工具自动生成并展示你程序中消耗最大的函数。6'
### 1. **优化前的性能数据具体分析**
- **`get_bvid` 函数**
- 耗时 **295秒**
- 其中API请求相关函数如 `api.py:62(get)``api.py:14(request)` 各自花费 **445秒**
- 网络请求的实际发送 (`sessions.py:673(send)`) 花费 **443秒**,这说明几乎所有的时间都耗费在网络请求上。
- **`get_cid` 函数**
- 耗时 **165秒**,相比 `get_bvid` 要少一些,原因可能是 `cid` 数据更简单或者部分请求被缓存了。
- 其他网络IO的函数`connectionpool.py:609(connect)``ssl_wrap_socket` 等花费的时间分别为 **282秒****268秒**,进一步证明网络请求是整个程序的性能瓶颈。
- **总体时间**
- 主程序 (`my_cprofile.py:145(main)`) 耗时 **467秒**,其中 `put_api` 操作耗时 **461秒**,几乎占据了程序的整个执行时间。
- 因为网络IO开销极大导致程序在大量时间里处于等待网络响应的状态。
![image1](C:\Users\15653\Desktop\102101535\102201525\result\巴黎奥运会\image1.png)
由此可见瓶颈主要集中在以下几个函数调用中:
1. **requests.get** 调用花费了相当长的时间特别是在发送HTTP请求的部分 (`connectionpool.py`, `connection.py`, `ssl.py`)。
2. 由于涉及大量网络请求API请求的频率和总数是性能的主要瓶颈其中耗时最多的函数为get_bvid(), 其次为get_cid()
我考虑下述4种优化
### 1. **减少API调用频率**
- **并发请求的优化**:虽然我已经使用了 `ThreadPoolExecutor` 进行并发处理,但在每个 `get_bvid()``get_cid()` 请求之间,可能有一些等待时间。可以尝试通过减少重复请求以及批量化 API 请求来优化。
```python
# 异步函数:处理并发任务,收集所有弹幕数据
async def fetch_all_bulletchats(session):
all_bulletchats = [] # 用于存储所有的弹幕数据
tasks = [] # 用于存储所有的异步任务
total_requests = 6 * 50 # 总共请求 6 页,每页 50 个视频,共 300 个视频
for i in range(total_requests):
page_number = i // 50 + 1 # 计算当前请求的页码
index = i % 50 # 计算当前页内的索引
# 创建异步任务,获取每个视频的弹幕数据
tasks.append(asyncio.ensure_future(fetch_bulletchat_data(session, page_number, index)))
# 使用 asyncio.as_completed 来迭代已完成的任务
for task in asyncio.as_completed(tasks):
bulletchat_data = await task
if bulletchat_data:
# 将获取的弹幕数据添加到总列表中
all_bulletchats.extend(bulletchat_data)
return all_bulletchats # 返回所有的弹幕数据
```
### 2. **HTTP连接重用**
- **优化 HTTP 连接**:我通过 `requests.Session()` 来重用 HTTP 连接。现在每次调用 `requests.get()` 都会建立新的连接,使用 `Session()` 可以减少连接的建立和关闭时间。
```python
# 异步函数:获取并保存某个视频的弹幕
async def fetch_and_save_bulletchat(session, cid):
# 使用 cid 构造弹幕 API 的 URL
url = tempApi.replace("{number}", str(cid))
try:
# 发送异步 GET 请求
async with session.get(url) as response:
# 获取响应的文本内容XML 格式)
response_text = await response.text()
# 使用正则表达式提取所有弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response_text)
# 如果有弹幕数据,返回列表,否则返回空列表
return data if data else []
except:
# 如果出现异常,返回空列表
return []
```
### 3. **异步IO**
- **异步请求**:使用 `asyncio``aiohttp` 替换 `requests` 模块。`aiohttp` 可以实现真正的异步 I/O在大量网络请求中将会显著提升性能。
```python
# 异步函数:获取 bvid带缓存功能
async def get_bvid(session, page, index):
# 如果已经在缓存中,则直接返回缓存的 bvid
if (page, index) in bvid_cache:
return bvid_cache[(page, index)]
# 构造 API 请求的 URL查询指定页码和关键字的视频
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_data = await response.json()
# 提取第 index 个视频的 bvid
bvid = json_data['data']['result'][index]['bvid']
# 将 bvid 存入缓存
bvid_cache[(page, index)] = bvid
return bvid
except (KeyError, IndexError, json.JSONDecodeError) as e:
# 如果出现异常,打印错误信息和响应内容,返回 None
print(f"获取 bvid 时出错: {e}")
print(f"响应状态码: {response.status}")
text = await response.text()
print(f"响应内容: {text}")
return None
# 异步函数:获取 cid带缓存功能
async def get_cid(session, bvid):
# 如果 bvid 已经在缓存中,则直接返回缓存的 cid
if bvid in cid_cache:
return cid_cache[bvid]
# 构造 API 请求的 URL查询指定 bvid 的视频信息
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_dict = await response.json()
# 提取第一个视频的 cid
cid = json_dict['data'][0]['cid']
# 将 cid 存入缓存
cid_cache[bvid] = cid
return cid
except (KeyError, IndexError, json.JSONDecodeError):
# 如果出现异常,返回 None
return None
```
### 4. **缓存处理**
- **本地缓存数据**:我在获取 bvid 和 cid 的数据时引入缓存机制,避免重复请求,将数据写入本地,在下一次请求时复用缓存数据。
```python
# 全局缓存,用于存储 bvid 和 cid避免重复请求
bvid_cache = {}
cid_cache = {}
```
### 2. **优化后的性能数据具体分析**
- **`youhua.py:146(main)` 函数**
- 优化后的主程序耗时 **4.07秒**,相比优化前的 **467秒**,这是一个巨大的性能提升,缩短了将近 **463秒**表明通过缓存和异步IO机制API调用被显著减少。
- **`calculate_frequency` 函数**
- 耗时 **3.02秒**该部分为数据处理的核心部分相比优化前计算的时间比API调用要占比更大表明大部分时间花费在了有意义的计算任务上。
- **文件读取**
- `read_excel``parse` 各自花费 **2.11秒****2.10秒**,这些是文件读取和解析的时间。相比于优化前将近 **300秒**`get_bvid` 时间这些文件IO操作的耗时较短且合理。
- **网络等待时间减少**
- `selectors.py:319(select)``selectors.py:313(_select)` 分别只花费 **0.996秒****0.981秒**。相比优化前网络IO的时间开销显著减少这说明网络请求已经大幅减少。
![image2](C:\Users\15653\Desktop\102101535\102201525\result\巴黎奥运会\image2.png)
由此可见**优化对性能产生显著影响**
(3.4)数据结论的可靠性。介绍结论的内容以及通过什么数据以及何种判断方式得出此结论6'
### 结论:
1. **生成/合成技术主导讨论**AI生成和合成技术如AI生成图像或内容是观众关注的焦点占据了绝大多数讨论。这可能是因为此类技术能够直接影响用户在观看奥运会内容时的视觉体验如AI生成的体育场景、运动员的虚拟形象等。
2. **其他AI应用较少关注**音效、配音、视频修复等方面的AI技术相对没有引起广泛讨论可能是因为它们在奥运会视频内容的应用场景不如生成/合成技术显著。
### 数据:
![c7529e636eb8d378937b350a8b44a9b9_720](C:\Users\15653\Documents\Tencent Files\1565319159\nt_qq\nt_data\Pic\2024-09\Thumb\c7529e636eb8d378937b350a8b44a9b9_720.jpg)
### 判断依据:
- **弹幕数量**通过弹幕出现次数来判断哪些AI技术最受关注。生成/合成类弹幕数量显著高于其他类别72次 vs. 3次这一数值差异清晰地展示了不同类别的关注度。
- **弹幕内容**一些弹幕提到了AI生成内容的逼真性以及人们对AI生成画面是否影响真实观看体验的讨论。这种定性分析表明观众对生成/合成技术的情感反应更为强烈。
(3.5)数据可视化界面的展示。在博客中介绍数据可视化界面的组件和设计的思路。15'
饼图直接通过Excel绘制
![image-20240918111524329](C:\Users\15653\AppData\Roaming\Typora\typora-user-images\image-20240918111524329.png)
下图是通过WordCloud库生成的词云图GUASS原图是什么~
![outm1](C:\Users\15653\Desktop\102101535\102201525\result\巴黎奥运会\outm1.png)
**组件**:程序主要由以下组件构成:使用了 `os``os.path` 模块来处理文件路径,确保程序可以正确定位输入和输出文件;`wordcloud` 库用于生成词云图;`jieba` 库用于中文分词,能够精确地将中文文本切分成词语;`imageio.v3` 的 `imread` 函数用于读取作为词云形状的模板图片;程序还指定了字体路径(如 `msyh.ttc`)来支持中文字符显示,并设置了停止词以过滤常见的、不具备分析价值的词汇。输入的文本数据和模板图像分别从指定的文件路径读取,最终生成的词云图保存到指定的输出路径。
**设计思路**:程序的设计目的是从包含弹幕或其他文本的文件中生成一个定制化的词云图。首先,读取文本文件和模板图片,然后使用 `jieba` 库对中文文本进行精确模式分词,得到词语列表。将词语列表拼接成适合词云生成的字符串后,利用 `WordCloud` 对象设置词云的各项参数,如字体、最大词数、模板形状、背景颜色、停止词和配色方案等。通过调用 `generate` 方法生成词云,并使用 `to_file` 方法将结果保存为图片文件。程序还包含异常处理机制,捕获并输出在生成词云过程中可能出现的错误,确保程序的稳健性和用户的知情权。
接下来展示我做的代码雨,爬虫要有始有终,既然爬了弹幕,就要以弹幕的形式呈现结果。于是我就选择了这个代码雨程序,迫于时间有限没能做的很美观。
![code_rain](C:\Users\15653\Desktop\102101535\102201525\result\巴黎奥运会\code_rain.gif)
**组件**:程序主要使用了 Python 的标准库和第三方库,包括 `random` 模块用于生成随机数,`Pillow` 库中的 `Image`、`ImageDraw`、`ImageFont` 模块用于创建和绘制图像,`imageio` 库用于将生成的图像帧合成为 GIF 动画。此外,程序需要一个支持中文的字体文件(如 `msyh.ttc`)和包含中英文字幕的文本文件 `subtitles.txt` 作为资源。
**设计思路**:程序首先从字幕文件中读取字幕内容,提取所有出现的字符来构建代码雨的字符集,确保代码雨中包含字幕中的中英文字符。然后,初始化图像尺寸、字体大小、代码雨列数等参数,随机设置每一列代码雨的起始位置。在每一帧中,绘制代码雨效果,字符从上至下随机下落,颜色由亮变暗模拟雨滴的视觉效果,同时在图像底部显示当前的字幕。通过循环更新代码雨的位置,生成连续的帧,最后将所有帧合成为一个动画 GIF呈现出动态的代码雨与字幕同步显示的效果。
(3.6)附加题展示。
爬取世界主流媒体的观点,预测事件走向。
原先是想爬取YouTube相关视频弹幕以此获取世界主流媒体看法后来遇到了种种困难时间也不太充裕了所有就在国内的新闻媒体上爬取了一些国外媒体的相关文字报道经过许多无效文字的筛选结果如下所示
英国广播公司BBC:顶尖运动员面临着骇人听闻的网络辱骂。今年,巴黎奥运会正试图保护他们免受这种辱骂,**人工智能算法正在从社交媒体用户发布的有关奥运会的海量内容中筛选出唯一的使命:消除网络辱骂。**
路透社Reuters:基于人工智能的攻击也有可能扰乱奥运会“无论是售票、欺诈还是操纵直播”。国际拳击协会AIBA使用人工智能技术审查贝尔格莱德世界拳赛的裁判和裁判员,麦克拉伦将其描述为“消除比赛操纵的一大历史性进步”,并表示该技术可以作为其他奥林匹克运动项目使用裁判和裁判员的蓝图。
华盛顿邮报The Washington Post:你相信吗?人工智能会模仿声音录制奥运会片段
法国世界报Le Monde[DJ 芭芭拉·布奇 (Barbara Butch) 在参加奥运会开幕式后提出网络骚扰投诉需要通过技术手段如AI来监控和防范。
中国日报:随着中国国家跳水队在巴黎奥运会上继续淘金,其人工智能训练系统正在体育创新领域引起轰动。传统的视频录制无法捕捉快速序列,并且后续的数据分析既耗时又不及时。基于百度类似 ChatGPT 的产品和大型语言模型 Ernie Bot该人工智能系统通过提供清晰、准确、全面的见解推进数据量化和分析潜水来解决这些挑战。
**观点分析**
各大媒体对2024年巴黎奥运会上人工智能的应用提出了不同的看法。BBC强调了顶尖运动员受到网络辱骂的威胁奥运会正利用人工智能算法筛选并消除这些不良内容保护运动员的心理健康。路透社警示了人工智能可能被用于攻击奥运会的风险如售票欺诈和直播操纵。AIBA通过AI技术审查裁判提升比赛的公平性被视为消除比赛操纵的历史性进步。华盛顿邮报提到人工智能可以模仿声音录制奥运片段暗示了AI在媒体内容生成方面的应用和潜在问题。法国世界报关注到参与者因网络骚扰提出投诉进一步凸显了网络安全的重要性。中国日报则报道了中国跳水队利用AI训练系统取得优异成绩展示了AI在体育训练和数据分析中的创新应用。
**事件走向**
人工智能预计将在2024年巴黎奥运会上发挥关键作用带来机遇与挑战并存的局面。一方面AI技术将广泛应用于赛事管理、运动员保护和训练提升推动奥运会的顺利和高水平进行。另一方面需要警惕AI可能被滥用于网络攻击、欺诈和虚假信息传播。为了最大化地发挥AI的正面作用奥运会组织者和各相关方可能会加强对AI技术的投入与监管通过国际合作制定相关规范确保赛事的公平、公正和安全。
自主发挥:爬取有趣的数据进行分析、制作数据可视化大屏等,有创意有乐趣即可。
我爬取了b站关键词为“2028年洛杉矶奥运会”的前300个视频中的弹幕并做了词云分析和数据可视化如下图
![outm2](C:\Users\15653\Desktop\102101535\102201525\result\2028洛杉矶奥运会\outm2.png)
![image-20240918115359939](C:\Users\15653\AppData\Roaming\Typora\typora-user-images\image-20240918115359939.png)
## **有趣的发现**
- **对经典元素的致敬**:如 **“工业革命”**、**“拳王阿里”**、**“慕尼黑惨案”**,观众对节目中历史元素的呈现反应热烈。
- **游戏与现实的交融****“洛圣都”**、**“GTA”** 的出现,体现了年轻观众将游戏文化与现实场景联系的思维方式。
- **网络流行语的运用****“yyds”**、**“要来力”** 等,展现了弹幕文化的活力和创意。
- ### **1. 高频词语解读**
- 唯一真神166 次)
- 可能指代某个在开幕式或宣传片中表现突出的元素或人物,引发观众的热议和赞叹。
- 工业革命66 次)
- 可能与开幕式中涉及工业革命主题的表演相关,激起了观众的共鸣。
- 要来力63 次)
- 这是 **“Eureka”**(尤里卡)的音译或谐音,可能在节目中出现,引发观众模仿发送。
- 洛圣都63 次)
- **“洛圣都”** 是游戏《侠盗猎车手 V》GTA V中的虚构城市原型是洛杉矶。观众可能因场景或氛围联想到游戏。
- 全体起立62 次)
- 表达对某个表演或人物的尊敬和赞美。
### **2. 情感倾向分析**
- **正面情感**
- **“太美了”**、**“震撼”**、**“好看”**、**“漂亮”**、**“帅”**、**“经典”** 等,体现了观众对节目内容的喜爱和赞赏。
- **致敬与怀念**
- **“拳王阿里”**、**“慕尼黑惨案”**、**“致敬”**,可能节目中有致敬历史人物或事件的环节。
- **期待与兴奋**
- **“超级期待”**、**“历史最佳”**、**“梦幻五环”**,观众对奥运会的期待和对表演的高度评价。
三、心得体会
(4.1)在这儿写下你完成本次作业的心得体会,当然,如果你还有想表达的东西但在上面两个板块没有体现,也可以写在这儿~10'
- 在本次作业真正入手实践之间在会运用爬虫技术爬取百度上的图片并不会甚至没听说过可以爬取b站弹幕。在动手实践时也是遇到了许多困难在爬虫代码的实现上所费时不多更多的是在爬取过多次被封后的等待时间以及在性能分析那块费时许久。git的使用也是摸索了好久遭遇种种错误之后终于调通了。
- 总的来说这次作业让我学习到了许多之前没了解过的领域如Code Quality Analysis、多线程并发执行、异步爬虫等等都是之前从未探索过的领域。同时更加熟悉了python的文件读取操作、matplotlib库进行绘图以及爬虫技术的拓展。最重要的是锻炼了独立面对一个繁琐项目的能力能够在有限的时间中完成各项任务也加强了面对异常情况的查阅资料处理能力对我来说这是一次启发性的项目体验能够让我在今后的学习生活中更好的独立完成任务。
- 在最后的多线程优化环节折腾了一天多的时间又是在deadline前爆赶在以后面对比较繁琐的任务时应该提前做好规划分成小块逐步完成。下次一定

@ -0,0 +1,44 @@
# Part·6 附录
## 6.1 PSP表格
PSP是卡耐基梅隆大学CMU的专家们针对软件工程师所提出的一套模型Personal Software Process (PSP 个人开发流程,或称个体软件过程)。
| **PSP2.1** | **Personal Software Process Stages** | **预估耗时(分钟)** | **实际耗时(分钟)** |
| :-------------------------------------- | --------------------------------------- | -------------------- | -------------------- |
| Planning | 计划 | | |
| · Estimate | · 估计这个任务需要多少时间 | | |
| Development | 开发 | | |
| · Analysis | · 需求分析 (包括学习新技术) | | |
| · Design Spec | · 生成设计文档 | | |
| · Design Review | · 设计复审 | | |
| · Coding Standard | · 代码规范 (为目前的开发制定合适的规范) | | |
| · Design | · 具体设计 | | |
| · Coding | · 具体编码 | | |
| · Code Review | · 代码复审 | | |
| · Test | · 测试(自我测试,修改代码,提交修改) | | |
| Reporting | 报告 | | |
| · Test Repor | · 测试报告 | | |
| · Size Measurement | · 计算工作量 | | |
| · Postmortem & Process Improvement Plan | · 事后总结, 并提出过程改进计划 | | |
| | · 合计 | | |
一个功能完备的程序不是一蹴而就的。可将一个大任务划分为可操作的小任务,同时最好按照任务难度或紧急程度指定各个任务的完成次序。因此,在动手开发之前,要先估计将在程序各模块开发所需耗费的时间,以及完成整个项目所需的时间,将这个[估计值]记录下来写成PSP 的形式。
PSP的目的是记录工程师如何实现需求的效率和我们使用项目管理工具如微软的Project Professional或者禅道等进行项目进度规划类似。
有关PSP的更多内容请自行阅读[邹欣老师的博客:现代软件工程讲义 2 工程师的能力评估和发展](http://www.cnblogs.com/xinz/archive/2011/10/22/2220872.html)
## 6.2 https://code.educoder.net/projects
请阅读邹欣老师的博客源代码管理了解源代码管理的10个实践问题。
本次作业要求使用https://code.educoder.net/projects进行源代码管理**代码有进展即签入https://code.educoder.net/projects**。签入记录不合理的项目会被助教抽查询问项目细节。
对代码签入的具体要求如下根据需求划分功能后每做完一个功能编译成功后应至少commit一次。本例中至少应区分基本功能和扩展功能即分别针对基本功能、扩展功能编译成功后总共至少应commit两次。具体的功能划分请自行定义并在撰写博客时体现出来遵循自己对需求的功能划分来提交代码即可。
对Commit不是很熟悉的话请阅读[阮一峰的博客Commit message 和 Change log 编写指南](http://www.ruanyifeng.com/blog/2016/01/commit_message_change_log.html),了解更多细节。
## 6.3 单元测试
请根据自己以往积累的测试经验,在编码完成之后,提交产品之前,设计测试用例,并编写单元测试,对自己的项目进行测试。
首先至少应采用白盒测试用例设计方法来设计测试用例其他测试方法不限。其次要设计至少10个测试用例确保你的程序能够正确处理各种情况。最后结合测试评估的要求对自己的测试设计进行评价这些测试用例能满足该程序测试的要求吗
另一个重要的措施是要把单元测试自动化,这样每个人都能很容易地运行它,并且可以使单元测试每天都运行。每个人都可以随时在自己的机器上运行。团队一般是在每日构建中运行单元测试的,这样每个单元测试的错误就能及时被发现并得到修改。
推荐阅读[邹欣老师的博客:现代软件工程讲义 2 开发技术 - 单元测试 & 回归测试](http://www.cnblogs.com/xinz/archive/2011/11/20/2255830.html)

@ -0,0 +1,211 @@
import sys
import asyncio
# 如果是在 Windows 平台上运行,则设置事件循环策略为 SelectorEventLoopPolicy
# 这是为了避免在 Windows 上运行 asyncio 时可能出现的问题
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import collections # 用于词频统计
import json # 用于处理 JSON 数据
import aiohttp # 用于异步 HTTP 请求
import asyncio # 用于异步操作
import re # 正则表达式模块,用于解析弹幕
import openpyxl # 用于处理 Excel 文件
import pandas as pd # 用于数据处理
import cProfile # 用于性能分析
# 创建性能分析器实例,并开始性能分析
profile = cProfile.Profile()
profile.enable()
# 定义开始和结束日期,用于生成日期范围(虽然在代码中未使用此变量)
startdate = '2023-01-10'
enddate = '2024-09-15'
# 生成日期列表,格式为 'YYYY-MM-DD'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')]
# 定义 Excel 文件名,用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建一个新的 Excel 工作簿和工作表,并添加标题行 '弹幕'
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义 B 站弹幕 API 的基础 URL其中 {number} 是占位符,用于填充视频的 cid 号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头,包含 cookie 和 user-agent用于伪装请求
headers = {
'cookie': "buvid3=D65868DE-AFD5-34A4-1714-A1C0F783C5DC27124infoc; b_nut=1725930527; _uuid=FF569C27-D2C6-10814-36A8-48AA8141364924857infoc; CURRENT_FNVAL=4048; buvid_fp=2ba89565eab107e1e14c7982fc1ef9ea; buvid4=FAB9A58B-B8F5-8DAF-2AC4-4E874D3D1F0E28371-024091001-a%2FA7nVxQVETBwJOeuHlVsQ%3D%3D; rpdid=|(u))kkYu|lu0J'u~klmJ|lkm; DedeUserID=1917958039; DedeUserID__ckMd5=eaa26b970b7e3104; header_theme_version=CLOSE; enable_web_push=DISABLE; home_feed_column=5; browser_resolution=1536-730; bp_t_offset_1917958039=976131738646347776; SESSDATA=388559ba%2C1742109747%2Cdc4ae%2A91CjByrR8jH29CX_2ZktYkcWo9nu9b6csyqZX7Z52SZr2CCIHMMi3VsnapfsZ3vuXyPv4SVlhpOC1KYWpoNVExc0RPSFFCVG0zTGxwUDZNdWs5NDU4ZktBYmkzRUlFSHlDbVF2cVk4RHhDWG5BTFRxVl9oM25JdUJxWDhTWnE2dENQQ3FERUNqSER3IIEC; bili_jct=1e53039d135ff42e0131bfed9b577c34; sid=69pzvo9r; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjY4MTY5NTMsImlhdCI6MTcyNjU1NzY5MywicGx0IjotMX0.-gfoOKe3UugjUrDKiGu2ggTujyv2qI_7XZ_usWbEMvI; bili_ticket_expires=1726816893; b_lsid=AEC784B8_19203376015",
'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
# 全局缓存,用于存储 bvid 和 cid避免重复请求
bvid_cache = {}
cid_cache = {}
# 异步函数:获取 bvid带缓存功能
async def get_bvid(session, page, index):
# 如果已经在缓存中,则直接返回缓存的 bvid
if (page, index) in bvid_cache:
return bvid_cache[(page, index)]
# 构造 API 请求的 URL查询指定页码和关键字的视频
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page}&page_size=50&keyword=2028%E5%B9%B4%E6%B4%9B%E6%9D%89%E7%9F%B6%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_data = await response.json()
# 提取第 index 个视频的 bvid
bvid = json_data['data']['result'][index]['bvid']
# 将 bvid 存入缓存
bvid_cache[(page, index)] = bvid
return bvid
except (KeyError, IndexError, json.JSONDecodeError) as e:
# 如果出现异常,打印错误信息和响应内容,返回 None
print(f"获取 bvid 时出错: {e}")
print(f"响应状态码: {response.status}")
text = await response.text()
print(f"响应内容: {text}")
return None
# 异步函数:获取 cid带缓存功能
async def get_cid(session, bvid):
# 如果 bvid 已经在缓存中,则直接返回缓存的 cid
if bvid in cid_cache:
return cid_cache[bvid]
# 构造 API 请求的 URL查询指定 bvid 的视频信息
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_dict = await response.json()
# 提取第一个视频的 cid
cid = json_dict['data'][0]['cid']
# 将 cid 存入缓存
cid_cache[bvid] = cid
return cid
except (KeyError, IndexError, json.JSONDecodeError):
# 如果出现异常,返回 None
return None
# 异步函数:获取并保存某个视频的弹幕
async def fetch_and_save_bulletchat(session, cid):
# 使用 cid 构造弹幕 API 的 URL
url = tempApi.replace("{number}", str(cid))
try:
# 发送异步 GET 请求
async with session.get(url) as response:
# 获取响应的文本内容XML 格式)
response_text = await response.text()
# 使用正则表达式提取所有弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response_text)
# 如果有弹幕数据,返回列表,否则返回空列表
return data if data else []
except:
# 如果出现异常,返回空列表
return []
# 异步函数:处理并发任务,收集所有弹幕数据
async def fetch_all_bulletchats(session):
all_bulletchats = [] # 用于存储所有的弹幕数据
tasks = [] # 用于存储所有的异步任务
total_requests = 6 * 50 # 总共请求 6 页,每页 50 个视频,共 300 个视频
for i in range(total_requests):
page_number = i // 50 + 1 # 计算当前请求的页码
index = i % 50 # 计算当前页内的索引
# 创建异步任务,获取每个视频的弹幕数据
tasks.append(asyncio.ensure_future(fetch_bulletchat_data(session, page_number, index)))
# 使用 asyncio.as_completed 来迭代已完成的任务
for task in asyncio.as_completed(tasks):
bulletchat_data = await task
if bulletchat_data:
# 将获取的弹幕数据添加到总列表中
all_bulletchats.extend(bulletchat_data)
return all_bulletchats # 返回所有的弹幕数据
# 异步函数:获取单个视频的弹幕数据
async def fetch_bulletchat_data(session, page_number, index):
# 获取视频的 bvid
bvid = await get_bvid(session, page_number, index)
if bvid:
# 获取视频的 cid
cid = await get_cid(session, bvid)
if cid:
# 获取并返回视频的弹幕数据
return await fetch_and_save_bulletchat(session, cid)
return [] # 如果获取失败,返回空列表
# 函数:保存弹幕数据到文本文件和 Excel 文件
def save_to_file(bulletchats):
# 以追加模式打开文本文件,编码为 utf-8
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
# 将每条弹幕写入文本文件,并换行
file_txt.write(index + '\n')
# 将弹幕写入 Excel 表格
total_sheet.append([index])
# 保存 Excel 文件
total_workbook.save(file_xlsx)
# 函数:计算弹幕频次,并保存到 Excel 文件
def calculate_frequency():
try:
# 读取 Excel 文件中的弹幕数据
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
# 将字符串按照空格分割为单词列表
words = text.split()
# 使用 collections.Counter 统计词频
word_counts = collections.Counter(words)
# 将词频按照出现次数从高到低排序
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
# 创建一个新的 Excel 工作簿和工作表,并添加标题行
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次'])
# 将排序后的词频数据写入 Excel 表格
for word, count in sorted_word_counts:
sheet.append([word, count])
# 保存统计结果到新的 Excel 文件
workbook.save('我的统计弹幕出现次数.xlsx')
except Exception as e:
# 如果出现异常,打印错误信息
print(f"计算频次时出错: {e}")
# 异步主函数,负责执行整个流程
async def main():
# 创建一个异步的 HTTP 会话,使用指定的请求头
async with aiohttp.ClientSession(headers=headers) as session:
# 异步获取所有弹幕数据
bulletchats = await fetch_all_bulletchats(session)
# 保存弹幕数据到文件
save_to_file(bulletchats)
# 计算弹幕频次并保存结果
calculate_frequency()
# 输出流程结束信息
print("Finished")
# 启动异步任务
if __name__ == '__main__':
asyncio.run(main())
# 停止性能分析
profile.disable()
# 将性能分析数据保存到文件中
profile.dump_stats('./youhua.prof')

@ -0,0 +1,37 @@
import re
from collections import Counter
import pandas as pd
# 读取txt文件
with open('提取.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 定义AI应用的关键词
ai_categories = {
'AI生成/合成': [r'AI生成', r'AI合成', r'AI图', r'一眼AI', r'AI'],
'AI视频': [r'AI视频', r'AI合成的视频'],
'AI修复': [r'AI修复', r'AI超分', r'超分辨率'],
'AI音效/配音': [r'AI音效', r'AI配音'],
'AI训练': [r'AI训练']
}
# 统计AI应用类别的出现次数
category_count = Counter()
# 遍历每一行弹幕匹配AI应用的关键词
for line in lines:
for category, keywords in ai_categories.items():
if any(re.search(keyword, line, re.IGNORECASE) for keyword in keywords):
category_count[category] += 1
# 将统计结果转换为pandas DataFrame
df = pd.DataFrame(list(category_count.items()), columns=['AI应用类别', '数量'])
# 按数量从大到小排序
df_sorted = df.sort_values(by='数量', ascending=False)
# 保存结果到Excel文件
df_sorted.to_excel('AI应用统计结果.xlsx', index=False)
# 输出提示
print("AI应用统计结果已保存为 'AI应用统计结果.xlsx',数据已按数量从大到小排序。")

@ -0,0 +1,16 @@
import re
# 读取txt文件
with open('我的全部弹幕.txt', 'r', encoding='utf-8') as file:
lines = file.readlines()
# 使用正则表达式提取包含“ai”或“AI”的弹幕去掉单词边界限制
ai_danmu = [line.strip() for line in lines if re.search(r'[Aa][Ii]', line)]
# 保存结果到新文件
with open('提取.txt', 'w', encoding='utf-8') as output_file:
for danmu in ai_danmu:
output_file.write(danmu + '\n')
# 输出提取的弹幕
print(f"提取了 {len(ai_danmu)} 条弹幕,其中包含 'ai''AI' 关键字。")

@ -0,0 +1,41 @@
import os
from os import path
from wordcloud import WordCloud
import jieba
from imageio.v3 import imread
def Bulletchat_Wordcloud():
try:
#获取当前文件路径
d = path.dirname(__file__) if "__file__" in locals() else os.getcwd()
text = open(path.join(d,r'2028.txt'), 'rb').read()
# 设置模板图
img_mask = imread(r'm1.png')
# 对弹幕进行精确模式分词
text_list = jieba.lcut(text,cut_all=False)
text_str = ' '.join(text_list)
# print(text_str)
# 设置中文字体
font_path = r'C:\Users\15653\msyh.ttc'
# 停止词设置
stopwords = set('')
stopwords.update(['','','','','','','什么','所以','','','','','','','',''])
wc = WordCloud(
font_path=font_path,
#max_words=500, # 最多词个数
#min_font_size=15,
#max_font_size=100,
mask=img_mask,
background_color='white',
stopwords=stopwords,
colormap='copper'
)
wc.generate(text_str)
wc.to_file(r'outm1.png')
except Exception as e:
print(f"词云图生成异常: {e}")
if __name__ == '__main__':
Bulletchat_Wordcloud()

@ -0,0 +1,97 @@
from PIL import Image, ImageDraw, ImageFont
import random
import imageio
import os
import re
# 参数设置
WIDTH = 800 # 图片宽度
HEIGHT = 600 # 图片高度
FONT_SIZE = 20 # 字体大小
COL_WIDTH = FONT_SIZE
NUM_COLS = WIDTH // COL_WIDTH
MAX_TRAIL_LENGTH = 15 # 代码雨最大长度
FRAMES_PER_SUBTITLE = 10 # 每条字幕的帧数
OUTPUT_GIF = 'code_rain.gif' # 输出 GIF 文件名
def read_subtitles(filename):
"""读取字幕文件"""
with open(filename, 'r', encoding='utf-8') as f:
subtitles = f.readlines()
subtitles = [line.strip() for line in subtitles if line.strip()]
return subtitles
def extract_characters(subtitles):
"""从字幕中提取所有出现的字符"""
chars_set = set()
for line in subtitles:
chars_set.update(line)
return list(chars_set)
def main():
# 读取字幕
subtitles = read_subtitles(r'提取.txt')
# 提取字符集
CHARS = extract_characters(subtitles)
# 添加额外的字符(可选)
CHARS.extend(list('ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789@#$%^&*()+-'))
# 加载字体(请确保字体文件存在)
font_path = r'msyh.ttc' # 替换为您的字体文件完整路径,确保支持中文
# 检查字体文件是否存在
if not os.path.exists(font_path):
print(f"未找到字体文件 '{font_path}'。请提供正确的字体路径。")
return
try:
font = ImageFont.truetype(font_path, FONT_SIZE)
except OSError:
print(f"无法打开字体文件 '{font_path}'。请检查文件是否损坏或权限设置。")
return
# 初始化列的位置
column_positions = [random.randint(-HEIGHT, 0) for _ in range(NUM_COLS)]
frames = []
for subtitle in subtitles:
for _ in range(FRAMES_PER_SUBTITLE):
# 创建新图像
img = Image.new('RGB', (WIDTH, HEIGHT), color='black')
draw = ImageDraw.Draw(img)
# 绘制代码雨
for i in range(NUM_COLS):
x = i * COL_WIDTH
y = column_positions[i]
for j in range(MAX_TRAIL_LENGTH):
char = random.choice(CHARS)
y_pos = y - j * FONT_SIZE
if y_pos < 0 or y_pos > HEIGHT:
continue
green_value = int(255 / MAX_TRAIL_LENGTH * (MAX_TRAIL_LENGTH - j))
draw.text((x, y_pos), char, font=font, fill=(0, green_value, 0))
# 更新列的位置
column_positions[i] = (y + FONT_SIZE) % (HEIGHT + FONT_SIZE * MAX_TRAIL_LENGTH)
# 绘制字幕
bbox = draw.textbbox((0, 0), subtitle, font=font)
text_width = bbox[2] - bbox[0]
text_height = bbox[3] - bbox[1]
text_x = (WIDTH - text_width) // 2
text_y = HEIGHT - text_height - 10 # 距离底部 10 像素
draw.text((text_x, text_y), subtitle, font=font, fill=(255, 255, 255))
# 添加帧
frames.append(img)
# 保存为 GIF
frames[0].save(OUTPUT_GIF, save_all=True, append_images=frames[1:], optimize=False, duration=100, loop=0)
print(f"GIF 已保存为 '{OUTPUT_GIF}'")
if __name__ == '__main__':
main()

@ -0,0 +1,160 @@
import collections # 用于词频统计
import json # 用于处理JSON数据
import requests # 用于发送HTTP请求
import re # 正则表达式模块,用于解析弹幕
import time # 用于时间相关操作
import openpyxl # 用于处理Excel文件
import pandas as pd # 用于数据处理
from concurrent.futures import ThreadPoolExecutor, as_completed # 用于并发操作
import cProfile
profile = cProfile.Profile()
profile.enable()
# 定义开始和结束日期,用于生成日期范围
startdate = '20240710'
enddate = '20240910'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')] # 生成日期列表
# 定义Excel文件名用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建Excel工作簿和工作表并添加标题行
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义B站弹幕API的基础URL{number}是占位符用于填充视频的cid号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头包含cookie和user-agent用于伪装请求
headers = {
'cookie':"buvid3=D65868DE-AFD5-34A4-1714-A1C0F783C5DC27124infoc; b_nut=1725930527; _uuid=FF569C27-D2C6-10814-36A8-48AA8141364924857infoc; CURRENT_FNVAL=4048; buvid_fp=2ba89565eab107e1e14c7982fc1ef9ea; buvid4=FAB9A58B-B8F5-8DAF-2AC4-4E874D3D1F0E28371-024091001-a%2FA7nVxQVETBwJOeuHlVsQ%3D%3D; rpdid=|(u))kkYu|lu0J'u~klmJ|lkm; SESSDATA=e8f35e7e%2C1741482645%2Cb3572%2A91CjC7hBYEVq-d38AwweerB9sclbgqT78LR6aribbsaBRVlJ0BoUjCMidR-nm82eDlo70SVlVibjl1UnQ0Y0NzSFFCb21DRGNNSXp4YnRSbFdzMXo0NjR4QkM0TlBKejUweW1TbDJkT0g3Z2Z6bTdmQVJzdmpvVHZmR1JWOEhtbnFGZmpuQUt6WXZnIIEC; bili_jct=7d37b038ea7714a0c41ec3d26603737b; DedeUserID=1917958039; DedeUserID__ckMd5=eaa26b970b7e3104; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjYxOTIwNTMsImlhdCI6MTcyNTkzMjc5MywicGx0IjotMX0.82V6_w7kGoSvzDy9rT-9DpsL7U_BrB24GefbBM0Vvb8; bili_ticket_expires=1726191993; header_theme_version=CLOSE; enable_web_push=DISABLE; home_feed_column=5; browser_resolution=1536-730; b_lsid=953CBCA8_191E441EE95; bp_t_offset_1917958039=976131738646347776; sid=hl295qcj",
'user-agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
# 定义函数获取搜索结果中的bvid视频的唯一标识符
def get_bvid(page_number, number):
# 构造搜索API的URLpage_number是页码number是该页中的视频编号
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page_number}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
response = requests.get(url=url, headers=headers) # 发送请求
try:
# 解析返回的JSON数据提取视频的bvid
json_data = json.loads(response.text)
print(json_data)
bvid = json_data['data']['result'][number]['bvid']
print(f"获取到bvid: {bvid}")
return bvid # 返回bvid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException) as e:
print(f"获取bvid时出错: {e}")
# 捕获错误并返回None防止程序崩溃
return None
# 定义函数根据bvid获取视频的cid弹幕对应的唯一标识符
def get_cid(bvid):
try:
# 通过bvid构造获取cid的API请求URL
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
response = requests.get(url, headers=headers) # 发送请求
if response.status_code != 200:
# 如果请求状态码不是200返回None
return None
# 解析返回的JSON数据提取cid
json_dict = json.loads(response.text)
return json_dict['data'][0]['cid'] # 返回cid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException):
return None # 捕获错误并返回None
# 定义函数,获取并保存某个视频的弹幕
def fetch_and_save_bulletchat(cid):
# 用cid替换API中的占位符
url = tempApi.replace("{number}", str(cid))
try:
# 发送请求获取弹幕数据
response = requests.get(url, headers=headers)
response.encoding = response.apparent_encoding # 设置编码
# 使用正则表达式解析弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response.text)
if data:
return data # 返回弹幕列表
except requests.RequestException:
return [] # 如果请求失败,返回空列表
# 定义函数批量获取bvid和cid并创建并发任务
def put_api():
tasks = []
# 使用ThreadPoolExecutor创建线程池用于并发请求
with ThreadPoolExecutor(max_workers=10) as executor:
# 控制页码范围1到5页每页50个视频
for i in range(1, 7):
for j in range(50):
bvid = get_bvid(i, j) # 获取bvid
if bvid:
cid = get_cid(bvid) # 获取cid
if cid:
# 提交弹幕抓取任务到线程池
tasks.append(executor.submit(fetch_and_save_bulletchat, cid))
return tasks # 返回任务列表
# 定义函数,处理并发任务,收集所有弹幕数据
def get_data(tasks):
all_bulletchats = []
# 遍历所有完成的任务,获取结果
for task in as_completed(tasks):
bulletchat_data = task.result()
if bulletchat_data:
all_bulletchats.extend(bulletchat_data) # 将弹幕数据加入总列表
return all_bulletchats # 返回所有弹幕数据
# 定义函数将弹幕数据保存到文件和Excel中
def save_to_file(bulletchats):
# 打开文本文件,将弹幕逐行写入
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
file_txt.write(index + '\n')
total_sheet.append([index]) # 将弹幕写入Excel表格
total_workbook.save(file_xlsx) # 保存Excel文件
# 定义函数计算弹幕频次并保存到Excel
def calculate_frequency():
try:
# 读取弹幕Excel文件
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
words = text.split() # 将弹幕分割为单词
word_counts = collections.Counter(words) # 统计单词频次
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True) # 按频次排序
# 创建新的Excel工作簿用于保存频次统计结果
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次']) # 添加标题行
# 将排序后的词频结果写入Excel
for word, count in sorted_word_counts:
sheet.append([word, count])
workbook.save('我的统计弹幕出现次数.xlsx') # 保存频次统计的Excel文件
except Exception as e:
print(f"计算频次时出错: {e}")
# 主函数,负责执行整个流程
def main():
tasks = put_api() # 获取bvid和cid并创建并发任务
bulletchats = get_data(tasks) # 获取所有弹幕数据
save_to_file(bulletchats) # 保存弹幕数据到文件和Excel
calculate_frequency() # 计算弹幕频次
print("Finished") # 输出流程结束信息
# 如果此脚本被直接运行则调用main函数
if __name__ == '__main__':
main()
profile.disable()
# Save the profiling data to a file
profile.dump_stats('./output.prof')

@ -0,0 +1,149 @@
import collections # 用于词频统计
import json # 用于处理JSON数据
import requests # 用于发送HTTP请求
import re # 正则表达式模块,用于解析弹幕
import time # 用于时间相关操作
import openpyxl # 用于处理Excel文件
import pandas as pd # 用于数据处理
from concurrent.futures import ThreadPoolExecutor, as_completed # 用于并发操作
# 定义开始和结束日期,用于生成日期范围
startdate = '20240710'
enddate = '20240910'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')] # 生成日期列表
# 定义Excel文件名用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建Excel工作簿和工作表并添加标题行
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义B站弹幕API的基础URL{number}是占位符用于填充视频的cid号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头包含cookie和user-agent用于伪装请求
headers = {
'cookie':"buvid3=D65868DE-AFD5-34A4-1714-A1C0F783C5DC27124infoc; b_nut=1725930527; _uuid=FF569C27-D2C6-10814-36A8-48AA8141364924857infoc; CURRENT_FNVAL=4048; buvid_fp=2ba89565eab107e1e14c7982fc1ef9ea; buvid4=FAB9A58B-B8F5-8DAF-2AC4-4E874D3D1F0E28371-024091001-a%2FA7nVxQVETBwJOeuHlVsQ%3D%3D; rpdid=|(u))kkYu|lu0J'u~klmJ|lkm; SESSDATA=e8f35e7e%2C1741482645%2Cb3572%2A91CjC7hBYEVq-d38AwweerB9sclbgqT78LR6aribbsaBRVlJ0BoUjCMidR-nm82eDlo70SVlVibjl1UnQ0Y0NzSFFCb21DRGNNSXp4YnRSbFdzMXo0NjR4QkM0TlBKejUweW1TbDJkT0g3Z2Z6bTdmQVJzdmpvVHZmR1JWOEhtbnFGZmpuQUt6WXZnIIEC; bili_jct=7d37b038ea7714a0c41ec3d26603737b; DedeUserID=1917958039; DedeUserID__ckMd5=eaa26b970b7e3104; bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjYxOTIwNTMsImlhdCI6MTcyNTkzMjc5MywicGx0IjotMX0.82V6_w7kGoSvzDy9rT-9DpsL7U_BrB24GefbBM0Vvb8; bili_ticket_expires=1726191993; header_theme_version=CLOSE; enable_web_push=DISABLE; home_feed_column=5; browser_resolution=1536-730; b_lsid=953CBCA8_191E441EE95; bp_t_offset_1917958039=976131738646347776; sid=hl295qcj",
'user-agent':"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/128.0.0.0 Safari/537.36"
}
# 定义函数获取搜索结果中的bvid视频的唯一标识符
def get_bvid(page_number, number):
# 构造搜索API的URLpage_number是页码number是该页中的视频编号
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page_number}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
response = requests.get(url=url, headers=headers) # 发送请求
try:
# 解析返回的JSON数据提取视频的bvid
json_data = json.loads(response.text)
print(json_data)
bvid = json_data['data']['result'][number]['bvid']
print(f"获取到bvid: {bvid}")
return bvid # 返回bvid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException) as e:
print(f"获取bvid时出错: {e}")
# 捕获错误并返回None防止程序崩溃
return None
# 定义函数根据bvid获取视频的cid弹幕对应的唯一标识符
def get_cid(bvid):
try:
# 通过bvid构造获取cid的API请求URL
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
response = requests.get(url, headers=headers) # 发送请求
if response.status_code != 200:
# 如果请求状态码不是200返回None
return None
# 解析返回的JSON数据提取cid
json_dict = json.loads(response.text)
return json_dict['data'][0]['cid'] # 返回cid
except (KeyError, IndexError, json.JSONDecodeError, requests.RequestException):
return None # 捕获错误并返回None
# 定义函数,获取并保存某个视频的弹幕
def fetch_and_save_bulletchat(cid):
# 用cid替换API中的占位符
url = tempApi.replace("{number}", str(cid))
try:
# 发送请求获取弹幕数据
response = requests.get(url, headers=headers)
response.encoding = response.apparent_encoding # 设置编码
# 使用正则表达式解析弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response.text)
if data:
return data # 返回弹幕列表
except requests.RequestException:
return [] # 如果请求失败,返回空列表
# 定义函数批量获取bvid和cid并创建并发任务
def put_api():
tasks = []
# 使用ThreadPoolExecutor创建线程池用于并发请求
with ThreadPoolExecutor(max_workers=10) as executor:
# 控制页码范围1到5页每页50个视频
for i in range(1, 7):
for j in range(50):
bvid = get_bvid(i, j) # 获取bvid
if bvid:
cid = get_cid(bvid) # 获取cid
if cid:
# 提交弹幕抓取任务到线程池
tasks.append(executor.submit(fetch_and_save_bulletchat, cid))
return tasks # 返回任务列表
# 定义函数,处理并发任务,收集所有弹幕数据
def get_data(tasks):
all_bulletchats = []
# 遍历所有完成的任务,获取结果
for task in as_completed(tasks):
bulletchat_data = task.result()
if bulletchat_data:
all_bulletchats.extend(bulletchat_data) # 将弹幕数据加入总列表
return all_bulletchats # 返回所有弹幕数据
# 定义函数将弹幕数据保存到文件和Excel中
def save_to_file(bulletchats):
# 打开文本文件,将弹幕逐行写入
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
file_txt.write(index + '\n')
total_sheet.append([index]) # 将弹幕写入Excel表格
total_workbook.save(file_xlsx) # 保存Excel文件
# 定义函数计算弹幕频次并保存到Excel
def calculate_frequency():
try:
# 读取弹幕Excel文件
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
words = text.split() # 将弹幕分割为单词
word_counts = collections.Counter(words) # 统计单词频次
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True) # 按频次排序
# 创建新的Excel工作簿用于保存频次统计结果
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次']) # 添加标题行
# 将排序后的词频结果写入Excel
for word, count in sorted_word_counts:
sheet.append([word, count])
workbook.save('我的统计弹幕出现次数.xlsx') # 保存频次统计的Excel文件
except Exception as e:
print(f"计算频次时出错: {e}")
# 主函数,负责执行整个流程
def main():
tasks = put_api() # 获取bvid和cid并创建并发任务
bulletchats = get_data(tasks) # 获取所有弹幕数据
save_to_file(bulletchats) # 保存弹幕数据到文件和Excel
calculate_frequency() # 计算弹幕频次
print("Finished") # 输出流程结束信息
# 如果此脚本被直接运行则调用main函数
if __name__ == '__main__':
main()

@ -0,0 +1,211 @@
import sys
import asyncio
# 如果是在 Windows 平台上运行,则设置事件循环策略为 SelectorEventLoopPolicy
# 这是为了避免在 Windows 上运行 asyncio 时可能出现的问题
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
import collections # 用于词频统计
import json # 用于处理 JSON 数据
import aiohttp # 用于异步 HTTP 请求
import asyncio # 用于异步操作
import re # 正则表达式模块,用于解析弹幕
import openpyxl # 用于处理 Excel 文件
import pandas as pd # 用于数据处理
import cProfile # 用于性能分析
# 创建性能分析器实例,并开始性能分析
profile = cProfile.Profile()
profile.enable()
# 定义开始和结束日期,用于生成日期范围(虽然在代码中未使用此变量)
startdate = '2024-07-10'
enddate = '2024-09-10'
# 生成日期列表,格式为 'YYYY-MM-DD'
date = [x for x in pd.date_range(startdate, enddate).strftime('%Y-%m-%d')]
# 定义 Excel 文件名,用于保存弹幕数据
file_xlsx = '我的全部弹幕.xlsx'
# 创建一个新的 Excel 工作簿和工作表,并添加标题行 '弹幕'
total_workbook = openpyxl.Workbook()
total_sheet = total_workbook.active
total_sheet.append(['弹幕'])
# 定义 B 站弹幕 API 的基础 URL其中 {number} 是占位符,用于填充视频的 cid 号
tempApi = 'https://api.bilibili.com/x/v1/dm/list.so?oid={number}'
# 定义请求头,包含 cookie 和 user-agent用于伪装请求
headers = {
'cookie': "您的 B 站 Cookie 值",
'user-agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64)..."
}
# 全局缓存,用于存储 bvid 和 cid避免重复请求
bvid_cache = {}
cid_cache = {}
# 异步函数:获取 bvid带缓存功能
async def get_bvid(session, page, index):
# 如果已经在缓存中,则直接返回缓存的 bvid
if (page, index) in bvid_cache:
return bvid_cache[(page, index)]
# 构造 API 请求的 URL查询指定页码和关键字的视频
url = f'https://api.bilibili.com/x/web-interface/search/type?page={page}&page_size=50&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A&search_type=video'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_data = await response.json()
# 提取第 index 个视频的 bvid
bvid = json_data['data']['result'][index]['bvid']
# 将 bvid 存入缓存
bvid_cache[(page, index)] = bvid
return bvid
except (KeyError, IndexError, json.JSONDecodeError) as e:
# 如果出现异常,打印错误信息和响应内容,返回 None
print(f"获取 bvid 时出错: {e}")
print(f"响应状态码: {response.status}")
text = await response.text()
print(f"响应内容: {text}")
return None
# 异步函数:获取 cid带缓存功能
async def get_cid(session, bvid):
# 如果 bvid 已经在缓存中,则直接返回缓存的 cid
if bvid in cid_cache:
return cid_cache[bvid]
# 构造 API 请求的 URL查询指定 bvid 的视频信息
url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}&jsonp=jsonp'
# 发送异步 GET 请求
async with session.get(url) as response:
try:
# 尝试将响应内容解析为 JSON 格式
json_dict = await response.json()
# 提取第一个视频的 cid
cid = json_dict['data'][0]['cid']
# 将 cid 存入缓存
cid_cache[bvid] = cid
return cid
except (KeyError, IndexError, json.JSONDecodeError):
# 如果出现异常,返回 None
return None
# 异步函数:获取并保存某个视频的弹幕
async def fetch_and_save_bulletchat(session, cid):
# 使用 cid 构造弹幕 API 的 URL
url = tempApi.replace("{number}", str(cid))
try:
# 发送异步 GET 请求
async with session.get(url) as response:
# 获取响应的文本内容XML 格式)
response_text = await response.text()
# 使用正则表达式提取所有弹幕内容
data = re.findall('<d p=".*?">(.*?)</d>', response_text)
# 如果有弹幕数据,返回列表,否则返回空列表
return data if data else []
except:
# 如果出现异常,返回空列表
return []
# 异步函数:处理并发任务,收集所有弹幕数据
async def fetch_all_bulletchats(session):
all_bulletchats = [] # 用于存储所有的弹幕数据
tasks = [] # 用于存储所有的异步任务
total_requests = 6 * 50 # 总共请求 6 页,每页 50 个视频,共 300 个视频
for i in range(total_requests):
page_number = i // 50 + 1 # 计算当前请求的页码
index = i % 50 # 计算当前页内的索引
# 创建异步任务,获取每个视频的弹幕数据
tasks.append(asyncio.ensure_future(fetch_bulletchat_data(session, page_number, index)))
# 使用 asyncio.as_completed 来迭代已完成的任务
for task in asyncio.as_completed(tasks):
bulletchat_data = await task
if bulletchat_data:
# 将获取的弹幕数据添加到总列表中
all_bulletchats.extend(bulletchat_data)
return all_bulletchats # 返回所有的弹幕数据
# 异步函数:获取单个视频的弹幕数据
async def fetch_bulletchat_data(session, page_number, index):
# 获取视频的 bvid
bvid = await get_bvid(session, page_number, index)
if bvid:
# 获取视频的 cid
cid = await get_cid(session, bvid)
if cid:
# 获取并返回视频的弹幕数据
return await fetch_and_save_bulletchat(session, cid)
return [] # 如果获取失败,返回空列表
# 函数:保存弹幕数据到文本文件和 Excel 文件
def save_to_file(bulletchats):
# 以追加模式打开文本文件,编码为 utf-8
with open('我的全部弹幕.txt', 'a', encoding='utf-8') as file_txt:
for index in bulletchats:
# 将每条弹幕写入文本文件,并换行
file_txt.write(index + '\n')
# 将弹幕写入 Excel 表格
total_sheet.append([index])
# 保存 Excel 文件
total_workbook.save(file_xlsx)
# 函数:计算弹幕频次,并保存到 Excel 文件
def calculate_frequency():
try:
# 读取 Excel 文件中的弹幕数据
fd = pd.read_excel(file_xlsx)
lines = fd['弹幕']
# 将所有弹幕拼接成一个字符串
text = ' '.join(lines.astype(str))
# 将字符串按照空格分割为单词列表
words = text.split()
# 使用 collections.Counter 统计词频
word_counts = collections.Counter(words)
# 将词频按照出现次数从高到低排序
sorted_word_counts = sorted(word_counts.items(), key=lambda x: x[1], reverse=True)
# 创建一个新的 Excel 工作簿和工作表,并添加标题行
workbook = openpyxl.Workbook()
sheet = workbook.active
sheet.append(['弹幕', '频次'])
# 将排序后的词频数据写入 Excel 表格
for word, count in sorted_word_counts:
sheet.append([word, count])
# 保存统计结果到新的 Excel 文件
workbook.save('我的统计弹幕出现次数.xlsx')
except Exception as e:
# 如果出现异常,打印错误信息
print(f"计算频次时出错: {e}")
# 异步主函数,负责执行整个流程
async def main():
# 创建一个异步的 HTTP 会话,使用指定的请求头
async with aiohttp.ClientSession(headers=headers) as session:
# 异步获取所有弹幕数据
bulletchats = await fetch_all_bulletchats(session)
# 保存弹幕数据到文件
save_to_file(bulletchats)
# 计算弹幕频次并保存结果
calculate_frequency()
# 输出流程结束信息
print("Finished")
# 启动异步任务
if __name__ == '__main__':
asyncio.run(main())
# 停止性能分析
profile.disable()
# 将性能分析数据保存到文件中
profile.dump_stats('./youhua.prof')

Binary file not shown.

@ -0,0 +1,62 @@
import requests
import json
import certifi
# 请在此处替换为您的 News API 密钥
API_KEY = '04c1848491d143f9a9af8a64655167e8'
# 定义查询参数
query = '2024 Paris Olympics AND AI technology'
language = 'en'
page_size = 100
# 定义请求 URL 和参数
url = 'https://newsapi.org/v2/everything'
params = {
'q': query,
'language': language,
'pageSize': page_size,
'apiKey': API_KEY
}
def fetch_articles():
all_articles = []
page = 1
while True:
params['page'] = page
try:
response = requests.get(url, params=params, proxies={"http": None, "https": None})
response.raise_for_status() # 检查HTTP错误
data = response.json()
if data['status'] != 'ok':
print(f"获取文章时出错:{data.get('message')}")
break
articles = data.get('articles', [])
if not articles:
break
all_articles.extend(articles)
print(f"已获取第 {page} 页,共 {len(all_articles)} 篇文章")
page += 1
if len(all_articles) >= 100:
break
except requests.exceptions.RequestException as e:
print(f"请求出现异常:{e}")
break
return all_articles
def save_articles(articles, filename='articles.json'):
with open(filename, 'w', encoding='utf-8') as f:
json.dump(articles, f, ensure_ascii=False, indent=4)
print(f"已将 {len(articles)} 篇文章保存到 {filename}")
if __name__ == '__main__':
articles = fetch_articles()
if articles:
save_articles(articles)
Loading…
Cancel
Save