"""Scrape Bilibili search results and rank the most frequent danmaku.

Workflow (driven by ``main``):
  1. Query the Bilibili search API page by page and collect BV ids.
  2. For each BV id resolve its cid, download the danmaku XML and count
     how often every danmaku text occurs.
  3. Write the top 8 and the full frequency table to an Excel workbook.

Settings are read from ``config/config.ini`` in the ``__main__`` block.
"""
import asyncio
import configparser
import itertools
import os
import re
import time
from collections import Counter
from multiprocessing import Pool
from random import random

import requests
from bilibili_api import video
from fake_useragent import UserAgent
from openpyxl import Workbook

# Module-level state
uA = UserAgent()
BVs = []  # collected BV ids
counts = Counter()  # danmaku text -> number of occurrences, across all videos

# Request headers copied from a browser session; used for the search API.
header = {
    # Accepted response types: JSON, plain text and anything else
    'Accept': 'application/json, text/plain, */*',
    # Accepted content encodings.
    # NOTE(review): 'br' can only be decoded when a brotli package is
    # installed alongside requests/urllib3 - confirm for the target env.
    'Accept-Encoding': 'gzip, deflate, br',
    # Accepted languages, Simplified Chinese preferred
    'Accept-Language': 'zh-CN,zh;q=0.9',
    # Origin of the request
    'Origin': 'https://search.bilibili.com',
    # Page the request claims to come from
    'Referer': 'https://search.bilibili.com/all?vt=17031316&keyword=2024%E5%B7%B4%E9%BB%8E%E5%A5%A5%E8%BF%90%E4%BC%9A',
    # Client browser information
    'Sec-Ch-Ua': '"Chromium";v="128", "Not;A=Brand";v="24", "Microsoft Edge";v="128"',
    # Whether the client is a mobile device; ?0 means it is not
    'Sec-Ch-Ua-Mobile': '?0',
    # Client operating system
    'Sec-Ch-Ua-Platform': '"Windows"',
    # Request destination type
    'Sec-Fetch-Dest': 'empty',
    # Request mode: cross-origin resource sharing (CORS)
    'Sec-Fetch-Mode': 'cors',
    # Relationship between request origin and target
    'Sec-Fetch-Site': 'same-site',
    # Random user-agent string produced by fake_useragent
    'User-Agent': uA.random,
}

# Minimal headers for the pagelist / danmaku endpoints.
# The key must be 'User-Agent'; the previous 'UserAgent' is not a real header
# name, so requests fell back to its default "python-requests" agent.
header1 = {
    'User-Agent': uA.random,
}

# Matches the text of one <d ...>text</d> danmaku element. Anchoring on the
# <d> tag keeps metadata elements (<chatserver>, <chatid>, ...) out of the
# frequency table.
_DANMU_PATTERN = re.compile(r'<d\s[^>]*>([^<]+)</d>')

# Matches every "bvid":"..." pair in the raw search API response.
_BVID_PATTERN = re.compile(r'"bvid":"([^"]+)"')


def get_Response(html_Url, h=header, timeout=10):
    """Fetch ``html_Url`` and return the response, or None on failure.

    Args:
        html_Url: URL to request.
        h: Request headers; defaults to the module-level ``header`` dict.
        timeout: Seconds to wait before giving up, so that a stalled
            connection cannot hang a worker forever.

    Returns:
        The ``requests.Response`` with its encoding forced to UTF-8, or None
        when the request failed or returned an error status.
    """
    try:
        response = requests.get(html_Url, headers=h, timeout=timeout)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        print(f"请求网页发生错误,错误信息:{e}")
        time.sleep(1)  # brief back-off before the caller moves on
        return None
    response.encoding = 'utf-8'
    return response


def get_Target_Video(arg):
    """Return the BV ids found on one page of Bilibili search results.

    Args:
        arg: Tuple ``(page, target, step, Cookie)`` - the 1-based page number,
            the search keyword, the number of videos per page and the Cookie
            header value. Packed into one tuple so it can go through
            ``Pool.map``.

    Returns:
        A list of BV id strings; empty when the request fails.
    """
    page, target, step, Cookie = arg
    header['Cookie'] = Cookie
    # Query parameters copied from the browser's search request.
    # NOTE(review): 'w_rid' and 'wts' are hard-coded values captured from a
    # browser session - confirm the API still accepts them.
    params = {
        '__refresh__': 'true',
        '_extra': '',
        'context': '',
        'page': page,
        'page_size': step,  # videos per page
        'from_source': '',
        'from_spmid': '333.337',
        'platform': 'pc',
        'highlight': '1',
        'single_column': '0',
        'keyword': target,  # search text
        'qv_id': '00d1q7iUbrvfsPdFZv2zXXtDqfcWuzER',
        'ad_resource': '5654',
        'source_tag': '3',
        'gaia_vtoken': '',
        'category_id': '',
        'search_type': 'video',
        # NOTE(review): formula kept from the original; confirm (step + 1)
        # is intended rather than step.
        'dynamic_offset': (step + 1) * (page - 1),
        'web_location': '1430654',
        'w_rid': 'f879864ca2210cc6a911552ceacadd6e',
        'wts': '1693922979',
    }
    api_Url = "https://api.bilibili.com/x/web-interface/wbi/search/type"  # search API

    print(f"开始爬取第{page}页视频")
    try:
        response = requests.get(api_Url, params, headers=header, timeout=10)
    except requests.exceptions.RequestException as e:
        # A network error on one page must not abort the whole pool.map.
        print(f"请求网页发生错误,错误信息:{e}")
        return []

    if not response.ok:
        print(f"请求网页失败,状态码:{response.status_code},可能是Cookie有误")
        return []

    print(f"已爬取第{page}页视频")
    return _BVID_PATTERN.findall(response.text)


def get_Cid(bvid):
    """Resolve a BV id to the cid of the video's first part.

    Example video URL: https://www.bilibili.com/video/BV1PK4y1b7dt?t=1

    Args:
        bvid: The BV id string.

    Returns:
        The cid of the first entry in the page list, or None when the request
        fails or the response does not have the expected shape (the error is
        printed here).
    """
    url = f'https://api.bilibili.com/x/player/pagelist?bvid={bvid}'
    response = get_Response(url, header1)
    if response is None:
        return None
    try:
        # The JSON body is decoded into a dict; "data" lists the video parts.
        return response.json()["data"][0]['cid']
    except (ValueError, KeyError, IndexError, TypeError) as e:
        print(f"解析 cid 失败, BV号: {bvid},错误信息:{e!r}")
        return None


def get_Danmu(BV):
    """Download the danmaku of one video and count each distinct text.

    Args:
        BV: The BV id of the video.

    Returns:
        A ``Counter`` mapping danmaku text to its number of occurrences, or
        None when the cid or the danmaku XML could not be fetched.
    """
    # get_Cid is a plain synchronous function, so it is called directly;
    # passing its result to an event loop would raise TypeError.
    cid = get_Cid(BV)
    if not cid:
        print(f"未能解析到 cid: {BV}")
        return None

    print(f"正在解析cid:{cid}的弹幕")
    # NOTE(review): the '.xml' suffix on the oid value is kept from the
    # original URL - confirm the endpoint accepts it.
    danmu_Url = f"https://api.bilibili.com/x/v1/dm/list.so?oid={cid}.xml"
    content = get_Response(danmu_Url, header1)
    if not content:
        print(f"获取弹幕内容失败, BV号: {BV}")
        return None

    # Every danmaku text of this video, counted by occurrence.
    return Counter(_DANMU_PATTERN.findall(content.text))


def _map_in_pool(func, items, processes):
    """Run ``func`` over ``items`` in a process pool and always release it."""
    if not items:
        return []
    pool = Pool(processes)
    try:
        return pool.map(func, items)
    finally:
        pool.close()
        pool.join()


def main():
    """Run the scrape / count / export pipeline.

    Reads the module-level settings ``target``, ``total_Num``, ``step``,
    ``Cookie`` and ``max_Processing_Num`` that the ``__main__`` block defines,
    updates the module-level ``counts`` and writes
    ``output/Top8_danmu.xlsx``.
    """
    # Step 1: collect the BV ids of the top-ranked search results.
    print("Step 1: 利用线程池找到前综合排序前300个视频")
    # Ceiling division so a final partial page is still requested.
    page_Count = (total_Num + step - 1) // step
    args = [(page, target, step, Cookie) for page in range(1, page_Count + 1)]
    bv = _map_in_pool(get_Target_Video, args, 10)  # one list of BV ids per page
    # Flatten the per-page lists and keep at most total_Num videos.
    BVs = list(itertools.chain.from_iterable(bv))[:total_Num]

    # Step 2: fetch and count the danmaku of every video.
    print("Step 2: 利用线程池对每一个视频的弹幕进行爬取并分析频率")
    # Too many workers trigger HTTP 403 / 412 responses, so keep this small.
    all_counts = _map_in_pool(get_Danmu, BVs, max_Processing_Num)
    for count in all_counts:
        if count:  # videos that failed return None
            counts.update(count)
    print("分析完成,正在导入Excel....")

    # Step 3: export the 8 most frequent danmaku plus the full table.
    print("Step 3: 取出现次数前8的弹幕,将他导入Excel")
    Top_8_Count = counts.most_common(8)
    all_counts = counts.most_common()

    workbook = Workbook()
    worksheet1 = workbook.active
    worksheet1.title = "Top 8"
    # Sheet 1: the 8 most frequent danmaku and their counts.
    for row, (item, count) in enumerate(Top_8_Count, start=1):
        worksheet1.cell(row=row, column=1, value=item)
        worksheet1.cell(row=row, column=2, value=count)

    # Sheet 2: every danmaku, kept for reference.
    worksheet2 = workbook.create_sheet(title="所有弹幕")
    for row, (item, count) in enumerate(all_counts, start=1):
        worksheet2.cell(row=row, column=1, value=item)
        worksheet2.cell(row=row, column=2, value=count)

    # Saving fails if the target directory is missing, so create it first.
    os.makedirs('output', exist_ok=True)
    workbook.save('output/Top8_danmu.xlsx')
    print("导入完成!")


if __name__ == "__main__":
    config = configparser.ConfigParser()
    # Read the configuration file explicitly as UTF-8.
    try:
        with open('config/config.ini', encoding='utf-8') as f:
            config.read_file(f)
    except FileNotFoundError:
        # Every option below has a fallback, so a missing file is not fatal.
        print("未找到配置文件 config/config.ini,将使用默认配置")
    except UnicodeDecodeError as e:
        print(f"读取配置文件时发生编码错误: {e}")
        raise SystemExit(1)

    config_Section_Name = "CR_DEFAULT"

    # NOTE(review): this fallback embeds what looks like a real session cookie
    # (SESSDATA / bili_jct). Credentials should not live in source control;
    # consider requiring the value from config.ini instead.
    fallback_Cookie = (
        'buvid4=99B52D59-84E1-D6E5-3334-00A5ABF5485951328-023051720-QsQqaACvYRbeH92Bxo5wCA%3D%3D; '
        'enable_web_push=DISABLE; '
        'header_theme_version=CLOSE; '
        '_uuid=FE44153E-EED7-BA1F-B241-10F7389A82B2B15963infoc; '
        'buvid3=46549500-1464-AD65-9582-A44F9C4A512231965infoc; '
        'b_nut=1720278731; '
        'DedeUserID=357687474; '
        'DedeUserID__ckMd5=611d95a70851412a; '
        'buvid_fp_plain=undefined; '
        'PVID=1; '
        'CURRENT_QUALITY=64; '
        'CURRENT_FNVAL=4048; '
        'rpdid=|(JlklRl)~Yu0Ju~kYuJkk)); '
        'SESSDATA=58accf91%2C1741958959%2Ce9f13%2A92CjCG0FTU3yXRKTjvmpd9wl7RXlqJ6qUlwhLziQUN_oRfPs8uPa7egHmLWm8l6qhWVxUSVmcxM0ZYbUM3LUhiYkwxckVKNFA4aDNoWFQ0cWR3clROMnZvR1YwRXBTcDN4WWM4aWZrT2RRUlBkZ2haMVJzbGx0N0JZT1VLNWpmZUJTajJHeHE0TnBRIIEC; '
        'bili_jct=29dfdcaafd6930788ec4b9d06acc07cd; '
        'bili_ticket=eyJhbGciOiJIUzI1NiIsImtpZCI6InMwMyIsInR5cCI6IkpXVCJ9.eyJleHAiOjE3MjY2NjYzMjUsImlhdCI6MTcyNjQwNzA2NSwicGx0IjotMX0.xjzf0c6AFp5o6yLJ6ahcJvyGwJZHK-NDBZsmlX0F6As; '
        'bili_ticket_expires=1726666265; '
        'fingerprint=556cd93cd4317211728f459c230e65c0; '
        'bsource=search_bing; '
        'home_feed_column=5; '
        'browser_resolution=1482-787; '
        'bp_t_offset_357687474=977795428588191744; '
        'sid=6jfn4n6l; '
        'buvid_fp=556cd93cd4317211728f459c230e65c0; '
        'b_lsid=D64E4486_191FB90E96C'
    )

    # Settings; each falls back to a default when the option is absent.
    target = config.get(config_Section_Name, 'target', fallback="2024巴黎奥运会")
    total_Num = config.getint(config_Section_Name, 'total_Num', fallback=300)
    step = config.getint(config_Section_Name, 'step', fallback=30)
    max_Processing_Num = config.getint(config_Section_Name, 'max_Processing_Num', fallback=2)
    # NOTE: ConfigParser's default interpolation treats '%' specially, so a
    # Cookie stored in config.ini must write each '%' as '%%'.
    Cookie = config.get(config_Section_Name, 'Cookie', fallback=fallback_Cookie)

    # Add the cookie to the existing header dict. Rebinding ``header`` to a
    # fresh dict here would discard every header prepared above.
    header['Cookie'] = Cookie

    main()