import pymysql def generate_insert_sql(table_name, data): # 构建字段列表和值列表 fields = ', '.join(data.keys()) values = ', '.join(['%s'] * len(data)) # 构建SQL语句 sql = "INSERT INTO {} ({}) VALUES ({})".format(table_name, fields, values) # 构建值列表 val = tuple(data.values()) return sql, val class sqlclass(): def __init__(self,host:str,port:int,user:str,passwd:str,db:str): self.host = host self.port = port self.user = user self.passwd = passwd self.db = db def sql_connect(self): # 数据库连接操作 self.conn = pymysql.connect(host=self.host,port=self.port,user=self.user,passwd=self.passwd,db=self.db) def sql_insert(self,table_name,data): # 插入数据接口 self.sql_connect() try: insert_sql,val = generate_insert_sql(table_name,data) cur = self.conn.cursor() cur.execute(insert_sql, val) self.conn.commit() except pymysql.Error as error: print("插入数据失败"+str(error)) self.conn.rollback() return -1 finally: self.conn.close() return 1 def sql_query(self,table_name,condition=None): # 构建SELECT语句 sql = "SELECT * FROM {}".format(table_name) # 添加条件语句 if condition: sql += " WHERE {}".format(condition) self.sql_connect() try: cur = self.conn.cursor() cur.execute(sql) results = cur.fetchall() return results except pymysql.Error as error: print("查询失败"+str(error)) return -1 finally: self.conn.close() def sql_del(self,table_name,condition=None): # 构建SELECT语句 sql = "DELETE FROM {}".format(table_name) # 添加条件语句 if condition: sql += " WHERE {}".format(condition) self.sql_connect() try: cur = self.conn.cursor() cur.execute(sql) self.conn.commit() # results = cur.fetchall() # return results except pymysql.Error as error: print("删除数据失败"+str(error)) self.conn.rollback() return -1 finally: self.conn.close() return 1 def sql_up(self,table_name,new_values,condition): # 构建sql语句 sql = f"UPDATE {table_name} SET " # 构造 SET 子句 set_clause = ", ".join([f"{key} = %s" for key in new_values.keys()]) sql += set_clause sql += " WHERE {}".format(condition) # 构建值列表 val = tuple(new_values.values()) # 连接数据库 self.sql_connect() # 创建游标 try: cur = self.conn.cursor() aa =cur.execute(sql, val) # 提交事务 self.conn.commit() if aa < 1: return -1 return 1 except pymysql.Error as error: print("更新数据失败"+str(error)) self.conn.rollback() return -1 finally: self.conn.close() if __name__ == "__main__": sql = sqlclass('localhost',3306,'root','2363305350','python_cs') # sql.sql_insert('T_user',{'user_account':'admin','user_pass':'admin','user_aut':0}) # sql.sql_up('T_user',{'user_pass':'123456','user_account':'123456'},f"user_id=1") # sql.sql_query('123') # data = sql.sql_query('t_commode') rest = sql.sql_query('t_commode',f"commod_name='苹果'") print(('id','类别','名称','品牌','产地','数量','进价','售价')) # for i in data: # print(i) # print(data)