# encoding: utf-8 import pymysql import time from app.utils import get_cur_date, get_pre_n_date class MyDB: def __init__(self, host, user, passwd, db): self.conn = pymysql.connect(host, user, passwd, db) self.cursor = self.conn.cursor() def __del__(self): if self.conn is not None: self.conn.close() # 测试:获取当前数据库中的所有表 def test_query_tables(self): # sql = 'show tables' sql = "select * from province_daily_datas where pub_time like '2020.08.03%'" self.cursor.execute(sql) results = self.cursor.fetchall() return results # 获取最近一日的数据 # table: 查指定表 # timeHeader: 该表中时间字段 def fetch_datas(self, table='', timeHeader='pub_time'): curdatestr = get_cur_date() sql = "select * from %s where %s like '%s'" % (table, timeHeader, curdatestr + '%') print('+++ [mydb] execute "%s"' % sql) self.cursor.execute(sql) results = self.cursor.fetchall() # 如果当天没有取到数据,则尝试取前一天的数据,直到取到数据为止 n = 1 targetdate = curdatestr while len(results) <= 0: predatestr = get_pre_n_date(n) sql = "select * from %s where %s like '%s'" % (table, timeHeader, predatestr + '%') print('+++ [mydb] execute "%s"' % sql) self.cursor.execute(sql) results = self.cursor.fetchall() n += 1 if len(results) > 0: targetdate = predatestr else: time.sleep(2) # 休眠2s if n >= 100: print('+++ [mydb] 最近三个月都没有数据') return tuple() print('+++ [mydb] 获取到%s的数据,数量:%d' % (targetdate, len(results))) return results # 获取省份数据top5 def province_curConfirm_top_n(self, limit=5): curdatestr = get_cur_date() sql = 'select curConfirm,area,pub_date from province_daily_datas where pub_date like "%s" order by curConfirm desc limit %s' % (curdatestr+'%', limit) print('+++ [mydb] execute "%s"' % sql) self.cursor.execute(sql) results = self.cursor.fetchall() n = 1 targetdate = curdatestr while len(results) <= 0: predatestr = get_pre_n_date(n) sql = 'select curConfirm,area,pub_date from province_daily_datas where pub_date like "%s" order by curConfirm desc limit %s' % (predatestr+'%', limit) print('+++ [mydb] execute "%s"' % sql) self.cursor.execute(sql) results = self.cursor.fetchall() n += 1 if len(results) > 0: targetdate = predatestr else: time.sleep(2) # 休眠2s if n >= 100: print('+++ [mydb] 最近三个月都没有数据') return tuple() print('+++ [mydb] 获取到%s的数据,数量:%d' % (targetdate, len(results))) return results # 根据条件字典生成字符串 # def generate_condition_text(self, condition_dict, flag='%'): # text = '' # for i in range(len(condition_dict)): # text += key + flag + condition_dict[key] # 统计境外输入病例总数以及现有境外输入病例数(前20个省份)柱状图 def province_import_top_n(self, limit=20): curdatestr = get_cur_date() sql = 'select province,confirmed,curConfirm,pub_date from city_daily_datas where city like "%s" and pub_date like "%s" order by confirmed desc limit %s' % ('境外输入%', curdatestr+'%', limit) print('+++ [mydb] execute "%s"' % sql) self.cursor.execute(sql) results = self.cursor.fetchall() n = 1 targetdate = curdatestr while len(results) <= 0: predatestr = get_pre_n_date(n) sql = 'select province,confirmed,curConfirm,pub_date from city_daily_datas where city like "%s" and pub_date like "%s" order by confirmed desc limit %s' % ('境外输入%', predatestr+'%', limit) print('+++ [mydb] execute "%s"' % sql) self.cursor.execute(sql) results = self.cursor.fetchall() n += 1 if len(results) > 0: targetdate = predatestr else: time.sleep(2) # 休眠2s if n >= 100: print('+++ [mydb] 最近三个月都没有数据') return tuple() print('+++ [mydb] 获取到%s的数据,数量:%d' % (targetdate, len(results))) return results # 获取昨日新增确诊病例数前5的城市 # 指定城市最近n日新增确诊数趋势 # 现有确诊数量前10个城市的昨日新增确诊和新增出院人数柱状图 if __name__ == '__main__': mydb = MyDB('localhost', 'root', '123456', 'conv19_datas') results = mydb.fetch_datas('province_daily_datas') print(type(results)) print(len(results)) print(results)