#计算各功能耗时的库 import time #爬取网站网址的库 import requests from bs4 import BeautifulSoup #多线程爬取的库 from concurrent.futures import ThreadPoolExecutor # 创建CSV文件和生成文件夹的库 import csv import os #数据可视化的库 import pandas as pd import matplotlib.pyplot as plt #生成词云的库 from wordcloud import WordCloud import numpy as np from PIL import Image # 定义爬取单页数据的函数 def crawl_page(start): print(f'正在爬取第{start // 25 + 1}页') time.sleep(3) #模拟人为访问网页,减少被目标网站识别为爬虫的风险 # 构造URL url_page = url.format(start) # 发起请求 response = requests.get(url=url_page, headers=headers) # 解析HTML soup = BeautifulSoup(response.text, 'html.parser') # 获取电影列表 movie_list = soup.find_all('div', class_='item') page_data = [] # 遍历电影列表 for index, movie in enumerate(movie_list): image = movie.find('div', class_='pic').find('img')['src'] #电影图片的网址 rank = movie.find('em').text # 电影排名 name = movie.find('span', class_='title').text # 电影名称 rating = movie.find('span', class_='rating_num').text # 电影评分 num = movie.find('div', class_='star').find_all('span')[-1].text.strip('人评价') # 评价人数 url1 = movie.find('a')['href'] # 电影的详细信息网址 data = movie.select('.bd p')[0].text.split('\n') time1 = data[2].replace(' ', '').split('/')[0] #上映时间 country = data[2].replace(' ', '').split('/')[1] #制片国家 type1 = data[2].replace(' ', '').split('/')[2] #电影类型 page_data.append([int(rank), name, time1, country, type1, float(rating), int(num), image, url1]) return page_data # 定义下载图片的多线程任务 def download_image_task(data): # 图片链接在第7个位置,电影名称在第1个位置 title=data[1] #title为电影名称,电影名称在列表data的第1个位置 image_url=data[7] #image_url为电影图片的下载链接,下载链接在列表data的第7个位置 #以电影名称命名图片 img_filename = f"{title}.{image_url.split('.')[-1]}" #发送图片链接的请求并返回图片的内容 response = requests.get(image_url) #将图片内容存到文件夹 with open(os.path.join(folder_path, img_filename), 'wb') as f: f.write(response.content) #定义数据可视化的函数:(1)评分分布折线图;(2)高分榜前10电影的条形图 def AnalyseData(filename): # 读取 CSV 文件 df = pd.read_csv(filename) # 设置中文字体 plt.rcParams['font.sans-serif'] = ['SimHei'] # 设置中文显示 #(1)评分分布折线图 rating_counts = df['评分'].value_counts().sort_index() # 计算每个评分的频率并按评分排序 # 绘制第一个表 plt.plot(rating_counts.index, rating_counts.values, marker='o', color='skyblue', linestyle='-') plt.xlabel('评分') plt.ylabel('评分人数') plt.title(' 高分榜电影评分') # 在折线图上显示评分数值 for x, y in zip(rating_counts.index, rating_counts.values): plt.text(x, y, f'{y}', ha='center', va='bottom', fontsize=10) # 保存评分分布折线图的图片为PNG格式 plt.savefig('评分分布折线图.png', bbox_inches='tight') # 保存为PNG格式图片,bbox_inches参数用于确保保存整个图像 # 显示图像 plt.show() plt.close() # 关闭当前图形,以便绘制下一个图形 #(2)高分榜前10电影的条形图 top_10 = df.sort_values(by='评分', ascending=False).head(10) # 绘制第二个表 bars = plt.barh(top_10['电影名称'], top_10['评分'], color='skyblue') plt.xlabel('评分') plt.ylabel('电影名称') plt.title('高分榜TOP10电影') plt.gca().invert_yaxis() # 反转 y 轴,让评分高的电影在上方 # 在条形图上显示评分数值 for bar in bars: width = bar.get_width() plt.text(width, bar.get_y() + bar.get_height() / 2, f'{width}', ha='left', va='center') # 高分榜前10电影的条形图的图片为PNG格式 plt.savefig('高分榜前10电影的条形图.png', bbox_inches='tight') # 保存为PNG格式图片,bbox_inches参数用于确保保存整个图像 # 显示图像 plt.show() plt.close() # 关闭当前图形 #生成词云图片:(1)电影的类型为关键词;(2)制片的国家为关键词 def WordClound(filename): # 读取数据 df = pd.read_csv(filename, encoding='utf-8-sig') # 提取电影类型和制片国家的信息 movie_types = df['电影类型'].values.astype(str) countries = df['制片国家'].values.astype(str) # 设置字体路径 font_path = r'C:\Windows\Fonts\simsun.ttc' # 生成电影类型词云 movie_types_text = ' '.join(movie_types) wc_movie_types = WordCloud( font_path=font_path, background_color="white", max_words=150, width=800, height=400, mask=np.array(Image.open('雪花遮罩图片.jpg')) #遮罩图片为雪花 ).generate(movie_types_text) # 保存电影类型词云图片名称 wc_movie_types.to_file("电影类型词云.png") # 生成制片国家词云 countries_text = ' '.join(countries) wc_countries = WordCloud( font_path=font_path, background_color="white", max_words=150, width=800, height=400, mask=np.array(Image.open('爱心遮罩图片.jpg')) #遮罩图片为爱心 ).generate(countries_text) # 保存制片国家词云图片 wc_countries.to_file("制片国家词云.png") # 显示词云图像 plt.imshow(wc_movie_types, interpolation='bilinear') plt.show() plt.imshow(wc_countries, interpolation='bilinear') #关闭绘图时的坐标轴显示 plt.axis('off') plt.show() if __name__ == '__main__': start_time = time.time() # 设置请求头 headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36 Edg/123.0.0.0' } # 设置要爬取的网站URL url = 'https://movie.douban.com/top250?start={}&filter=' # 创建保存电影信息的CSV文件 filename = '电影信息.csv' if not os.path.isfile(filename): file = open(filename, 'w', newline='', encoding='utf-8') writer = csv.writer(file) writer.writerow(['排名', '电影名称', '上映时间', '制片国家', '电影类型', '评分', '评价人数', '图片', '网址']) file.close() # 创建保存图片的文件夹 folder_path = "电影图片" if not os.path.exists(folder_path): os.makedirs(folder_path) # 采用多线程爬取每一页的数据,创建一个ThreadPoolExecutor线程池,最大工作线程数为10,使用完线程池后自动关闭线程池 with ThreadPoolExecutor(max_workers=10) as executor: #创建一个空列表futures,用于存储提交给线程池的任务的返回结果,即电影的信息 futures = [] for i in range(0, 250, 25): futures.append(executor.submit(crawl_page, i)) #调用executor.submit()方法提交任务 print("下载图片中...") # 等待所有任务完成 for future in futures: page_data = future.result() # 将电影信息写入CSV文件 with open(filename, 'a', newline='', encoding='utf-8') as file: writer = csv.writer(file) writer.writerows(page_data) # 并行下载图片 for data in page_data: executor.submit(download_image_task, data) print("图片全部下载完成") crawling_time = time.time() print("完成爬取信息和保存图片,此时耗时:{}秒".format(crawling_time - start_time)) # 数据可视化 AnalyseData(filename) print("数据可视化完毕") # 生成词云 WordClound(filename) print("词云生成完毕") analyse_time = time.time() print("完成数据可视化和生成词云,此时耗时:{}秒".format(crawling_time - start_time))