"""Fetch monthly PM readings from MySQL and draw April/May averages as bars."""

import logging
import subprocess
from tkinter import Tk

import numpy as np
import pymysql
from matplotlib import pyplot as plt
from pyecharts import options as _opts
from pyecharts.charts import Bar

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

# Module-level placeholders. get_data() builds and returns fresh lists; it
# does not fill these in place.
jan_list = []
feb_list = []
mar_list = []
apr_list = []
may_list = []
ptime_list = []


def _fetch_pm(cursor, table):
    """Return the ``pm`` value of every row in ``table``."""
    # ``table`` is always one of the constant names used in get_data(), never
    # user input; SQL identifiers cannot be bound as query parameters.
    cursor.execute(f"SELECT pm FROM {table}")
    return [row['pm'] for row in cursor.fetchall() if 'pm' in row]


def get_data(db_config):
    """Read the ``ptime`` column and the per-month ``pm`` columns.

    :param db_config: keyword arguments forwarded to ``pymysql.connect``
    :return: tuple ``(ptime_list, may_list, feb_list, jan_list, mar_list,
        apr_list)``; six empty lists when a database error occurs
    """
    try:
        with pymysql.connect(**db_config) as conn:
            logging.info("数据库连接成功")
            # Ask for dict rows explicitly so the by-name column lookups work
            # even when db_config does not set "cursorclass".
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                # A single query for both columns keeps ptime and pm aligned
                # row by row. The table name is lower-case like the other
                # month tables, since MySQL table names can be case-sensitive.
                cursor.execute("SELECT ptime, pm FROM may")
                may_rows = cursor.fetchall()
                ptime_values = [row['ptime'] for row in may_rows
                                if 'ptime' in row]
                may_values = [row['pm'] for row in may_rows if 'pm' in row]

                feb_values = _fetch_pm(cursor, "feb")
                jan_values = _fetch_pm(cursor, "jan")
                mar_values = _fetch_pm(cursor, "mar")
                apr_values = _fetch_pm(cursor, "apr")
        return (ptime_values, may_values, feb_values, jan_values,
                mar_values, apr_values)
    except pymysql.err.MySQLError as e:
        # Any driver-level failure (connection refused, missing table, ...)
        # is logged and reported to the caller as "no data".
        logging.error(f"数据库操作失败: {e}")
        return [], [], [], [], [], []


def average(str_list):
    """Return the arithmetic mean of the values in ``str_list``.

    :param str_list: values accepted by ``float()`` (numbers or numeric
        strings); ``None`` entries are skipped
    :return: the mean as a float, or 0 when there is nothing to average
    """
    if not str_list:
        return 0
    # Skip None so a NULL reading does not make float() raise TypeError.
    values = [float(item) for item in str_list if item is not None]
    if not values:
        return 0
    return sum(values) / len(values)


def visualize_data(ptime_list, may_list, feb_list, jan_list, mar_list,
                   apr_list):
    """Show a bar chart with the April and May averages at each ptime entry.

    :param ptime_list: x-axis labels, one bar pair per entry
    :param may_list: May ``pm`` values
    :param feb_list: February ``pm`` values
    :param jan_list: January ``pm`` values
    :param mar_list: March ``pm`` values
    :param apr_list: April ``pm`` values
    :return: None; logs an error and returns early when required data is empty
    """
    # ptime_list and apr_list are checked too because they are what is drawn.
    if (not may_list or not feb_list or not jan_list
            or not ptime_list or not apr_list):
        logging.error("数据为空,无法进行可视化")
        return

    bar_width = 0.2

    averages = {
        "jan": average(jan_list),
        "feb": average(feb_list),
        "mar": average(mar_list),
        "apr": average(apr_list),
        "may": average(may_list),
    }
    logging.info("各月pm平均值: %s", averages)

    # Both series use the same numeric x positions so they sit side by side;
    # the number of positions follows the data instead of a fixed count.
    positions = np.arange(len(ptime_list))
    tick_labels = [str(ptime) for ptime in ptime_list]

    plt.figure("average_pm2.5", figsize=(15, 10), dpi=80)
    # NOTE(review): each series is drawn with one scalar height (the month's
    # average), so all of its bars are equally tall. If a per-row comparison
    # was intended, pass the value lists instead -- confirm with the author.
    plt.bar(positions, averages["apr"], tick_label=tick_labels,
            width=bar_width, label="apr")
    plt.bar(positions + bar_width, averages["may"], width=bar_width,
            label="may")
    plt.legend()
    plt.show()


if __name__ == "__main__":
    # NOTE(review): credentials are hard-coded here; consider loading them
    # from the environment or a config file kept out of version control.
    db_config = {
        "host": '127.0.0.1',
        "user": "root",
        "password": 'mysql>hyx123',
        "db": 'airquility',
        "charset": 'utf8',
        "cursorclass": pymysql.cursors.DictCursor
    }
    ptime_list, may_list, feb_list, jan_list, mar_list, apr_list = get_data(
        db_config)
    visualize_data(ptime_list, may_list, feb_list, jan_list, mar_list,
                   apr_list)

# Disabled Tk status-window code, kept as an inert string literal.
'''
window= Tk()
window.title("每日空气质量")
window.geometry("300x200")
#Label(window, text="图表已生成为air_quality_index.html").pack()
window.mainloop()
'''