import mysql.connector from mysql.connector import Error from math import * import tkinter as t import psutil import os import time import threading import datetime db_config = { 'host': 'localhost', 'user': 'root', 'password': '2015wocani', 'database': 'ProcessData' } def init_db_connection(): global db_connection, cursor try: db_connection = mysql.connector.connect(**db_config) cursor = db_connection.cursor() print("Database connection successful") except Error as e: print(f"Error connecting to MySQL: {e}") # 关闭数据库连接 def close_db_connection(): if db_connection.is_connected(): cursor.close() db_connection.close() print("MySQL connection is closed") def get_db_cursor(connection): """获取并返回数据库游标""" return connection.cursor() db_connection = create_db_connection(**db_config) # 定义获取游标的函数 def get_db_cursor(connection): return connection.cursor() # 使用上面创建的连接来获取游标 db_cursor = get_db_cursor(db_connection) root= t.Tk() #主窗口 root.title('任务管理器') #标题 root.geometry('730x630') #窗口尺寸 root.resizable(width=True, height=True)# 设置窗口宽度,高度可变 root.attributes('-alpha',0.9) #设置窗口透明度 root.wm_attributes('-topmost',1) #实现root窗口的置顶显示 sb=t.Scrollbar(root) sb.pack(side='left',fill='y') text=t.Text(root,width=100,height=40) text.place(x=10,y=36) sb.config(command=text.yview) text.config(yscrollcommand=sb.set) sb.pack(side='right',fill='y') t1=t.Label(text='') t2=t.Label(text='') t3=t.Label(text='') t1.place(x=10,y=580,width=120) t2.place(x=150,y=580,width=120) t3.place(x=300,y=580,width=120) def create_db_connection(): connection = pymysql.connect(host='localhost', user='root', password='2015wocani', database='ProcessData') return connection db_connection = create_db_connection(**db_config) db_cursor = get_db_cursor(db_connection) def insert_process(pid, name, path, mem_usage, cpu_usage): """插入进程数据到数据库""" try: sql = """ INSERT INTO ProcessData (pid, process_name, path, memory_usage, cpu_usage) VALUES (%s, %s, %s, %s, %s) """ val = (pid, name, path, mem_usage, cpu_usage) db_cursor.execute(sql, val) db_connection.commit() # 提交事务 except Exception as e: print(f"Error inserting process: {e}") def yy(): text.delete(1.0, 'end') text.insert('insert', '进程号 ' + '进程名 ' + ' 进程文件路径' + '\n') for y in psutil.pids(): a = psutil.Process(y) if a.name() == 'System Idle Process': continue else: # 确保这里的缩进与'else:'对齐,通常使用四个空格 mem_info = a.memory_info() mem_usage = mem_info.rss / 1024 # 单位转换为KB,可根据需要调整 # 插入数据到数据库 insert_process(a.pid, a.name(), a.exe(), mem_usage, psutil.cpu_percent(interval=None)) text.insert('insert', f"{y} {a.name()} {a.exe()}\n\n") # 确保root.update()与yy()函数体对齐,不在else块内 root.update() def jc(): text.delete(1.0,'end') mm=os.popen('tasklist') text.insert('insert',mm.read()) root.update() def fw(): text.delete(1.0,'end') mm=os.popen('sc query type= service') text.insert('insert',mm.read()) root.update() def xn(): text.delete(1.0,'end') l1=t.Label(root,text='开机时间:') tm=datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S") l2=t.Label(root,text=str(tm)) l3=t.Label(root,text='当前时间:') l4=t.Label(root,text='') dq=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) l4.configure(text=str(dq)) l5=t.Label(root,text='物理内存使用情况(MB):') l6=t.Label(root,text='') jh=psutil.virtual_memory() tt=int((jh.total)/1024/1024) us=int((jh.used)/1024/1024) fr=int((jh.free)/1024/1024) l6.configure(text='总量:' + str(tt) +'\n'+'使用:'+str(us) +'\n'+'剩余:'+str(fr)) l7=t.Label(root,text='交换内存使用情况(MB):') l8=t.Label(root,text='') hj=psutil.swap_memory() ht=int((hj.total)/1024/1024) hu=int((hj.used)/1024/1024) hf=int((hj.free)/1024/1024) l8.configure(text='总量:' + str(ht) + ' '+'使用:'+str(hu) +' '+'剩余:'+str(hf)) text.window_create('insert',window=l1) text.window_create('insert',window=l2) text.insert('insert','\n\n') text.window_create('insert',window=l3) text.window_create('insert',window=l4) text.insert('insert','\n\n') text.window_create('insert',window=l5) text.window_create('insert',window=l6) text.insert('insert','\n\n') text.window_create('insert',window=l7) text.window_create('insert',window=l8) root.update() def lw(): text.delete(1.0,'end') n = psutil.net_io_counters() r=str(float(n.bytes_recv / 1024 / 1024))+'MB' s= str(float(n.bytes_sent / 1024 / 1024))+'MB' text.insert('insert','网卡接收流量: '+str(r)+'\n'+'网卡发送流量:'+str(s)+'\n') root.update() def yh(): text.delete(1.0,'end') use=' 用户'+' '+' 状态'+'\n' text.insert('insert',use) for y in psutil.users(): text.insert('2.0',str(y.name)+' '+'运行中。。。。'+'\n') root.update() def end_process(): """根据用户输入的PID结束进程""" pid = pid_entry.get() try: pid = int(pid) # 将输入转换为整型 process = psutil.Process(pid) process.terminate() # 尝试结束进程 text.insert('insert', f"尝试结束PID为{pid}的进程...\n") root.update() except psutil.NoSuchProcess: text.insert('insert', f"没有找到PID为{pid}的进程。\n") root.update() except ValueError: text.insert('insert', "请输入有效的数字PID。\n") root.update() except Exception as e: text.insert('insert', f"结束进程时发生错误: {e}\n") root.update() def search_processes(): """根据用户输入的关键词搜索进程并更新显示""" keyword = search_entry.get().strip().lower() if not keyword: text.insert('insert', "请输入搜索关键词...\n") return found_processes = [] for proc in psutil.process_iter(['pid', 'name']): if keyword in proc.info['name'].lower(): found_processes.append(proc.info) if found_processes: text.delete(1.0, 'end') for proc_info in found_processes: text.insert('insert', f"{proc_info['pid']} | {proc_info['name']}\n") else: text.insert('insert', f"未找到包含'{keyword}'的进程。\n") root.update() # 确保窗口大小适应新的组件 search_frame = t.Frame(root) search_frame.pack(side='right', anchor='ne') # 定位在右侧上角 # 通过grid布局管理器,可以更好地控制组件之间的对齐和间距 search_frame.columnconfigure(0, weight=1) # 让输入框所在的列自动填充空白 search_frame.rowconfigure(0, weight=1) search_label = t.Label(search_frame, text="搜索进程:") search_label.grid(row=0, column=0, sticky='w') # 保持左侧对齐,但不额外增加宽度 search_entry = t.Entry(search_frame) search_entry.grid(row=0, column=1, sticky='ew') # 让输入框填充可用空间 search_entry.config(width=20) # 可选:限制输入框的初始宽度 search_button = t.Button(search_frame, text="搜索", command=search_processes) search_button.grid(row=0, column=2, padx=(0, 5), pady=(0, 5), sticky='e') # 靠右对齐,可加微小间隔 m=t.Menu(root) #文件菜单 file=t.Menu(m,tearoff=False) m.add_cascade(label='文件', menu=file) file.add_command(label='新建任务',accelerator='(N)') file.add_command(label='退出任务栏管理器',command=root.quit,accelerator='(x)') pid_end_frame = t.Frame(root) pid_end_frame.pack(side=t.RIGHT, anchor=t.SE, padx=10, pady=10) # 添加padx和pady以留出空间 pid_label = t.Label(pid_end_frame, text="PID:") pid_label.pack(side=t.LEFT) pid_entry = t.Entry(pid_end_frame, width=5) pid_entry.pack(side=t.LEFT, padx=(10, 0)) end_process_button = t.Button(pid_end_frame, text="结束进程", command=end_process) end_process_button.pack(side=t.LEFT, padx=(10, 0)) # 可能需要根据实际调整padx以保持布局美观 root.geometry("400x600") #选项菜单 ii=t.IntVar() ii.set(1) o=t.Menu(m,tearoff=False) m.add_cascade(label='选项',menu=o) o.add_radiobutton(label='前端显示',variable=ii, value=0) o.add_radiobutton(label='使用时最小化',variable=ii, value=1) o.add_radiobutton(label='最小化时隐藏',variable=ii, value=2) #查看菜单 v=t.Menu(m,tearoff=False) m.add_cascade(label='查看',menu=v) v.add_command(label='立即刷新') #二级菜单 iv=t.IntVar() iv.set(1) s=t.Menu(v,tearoff=False) v.add_cascade(label='更新速度',menu=s) s.add_radiobutton(label='高',variable=iv, value=0) s.add_radiobutton(label='普通',variable=iv, value=1) s.add_radiobutton(label='低',variable=iv, value=2) s.add_radiobutton(label='暂停',variable=iv, value=3) v.add_command(label='选项列') #帮助菜单 h=t.Menu(m,tearoff=False) m.add_cascade(label='帮助',menu=h) h.add_command(label='任务管理器帮助主体') h.add_command(label='关于任务管理器') b1=t.Button(root,text='应用程序',command=yy) b2=t.Button(root,text='进程',command=jc) b3=t.Button(root,text='服务',command=fw) b4=t.Button(root,text='性能',command=xn) b5=t.Button(root,text='联网',command=lw) b6=t.Button(root,text='用户',command=yh) b1.place(x=10,y=15,height=20,width=60) b2.place(x=70,y=15,height=20,width=60) b3.place(x=130,y=15,height=20,width=60) b4.place(x=190,y=15,height=20,width=60) b5.place(x=250,y=15,height=20,width=60) b6.place(x=310,y=15,height=20,width=60) def jcs(): t1.configure(text='进程数:'+str(len(psutil.pids()))) root.after(3000,jcs) def cpu(): pp=str(ceil(psutil.cpu_percent(1))) t2.configure(text='CPU 使用率:'+pp+'%') root.after(1500,cpu) def wlnc(): f= psutil.virtual_memory().free #剩余内存 t=psutil.virtual_memory().total#总内存 wl= float(t-f)/float(t) #为使得最后值更精确,必须用float t3.configure(text='物理内存:'+str(floor(wl*100))+'%') root.after(2000,wlnc) root.bind('',yy) #当打开窗口时显示第一个按钮选项 root.bind('',jcs()) root.bind('',cpu()) root.bind('',wlnc()) root.configure(menu=m) root.mainloop() #主窗口循环显示