xupengfei 6 months ago
parent cb66d6e67e
commit cf43093d33

328
122.py

@ -0,0 +1,328 @@
import mysql.connector
from mysql.connector import Error
from math import *
import tkinter as t
import psutil
import os
import time
import threading
import datetime
db_config = {
'host': 'localhost',
'user': 'root',
'password': '2015wocani',
'database': 'ProcessData'
}
def init_db_connection():
global db_connection, cursor
try:
db_connection = mysql.connector.connect(**db_config)
cursor = db_connection.cursor()
print("Database connection successful")
except Error as e:
print(f"Error connecting to MySQL: {e}")
# 关闭数据库连接
def close_db_connection():
if db_connection.is_connected():
cursor.close()
db_connection.close()
print("MySQL connection is closed")
def get_db_cursor(connection):
"""获取并返回数据库游标"""
return connection.cursor()
db_connection = create_db_connection(**db_config)
# 定义获取游标的函数
def get_db_cursor(connection):
return connection.cursor()
# 使用上面创建的连接来获取游标
db_cursor = get_db_cursor(db_connection)
root= t.Tk() #主窗口
root.title('任务管理器') #标题
root.geometry('730x630') #窗口尺寸
root.resizable(width=True, height=True)# 设置窗口宽度,高度可变
root.attributes('-alpha',0.9) #设置窗口透明度
root.wm_attributes('-topmost',1) #实现root窗口的置顶显示
sb=t.Scrollbar(root)
sb.pack(side='left',fill='y')
text=t.Text(root,width=100,height=40)
text.place(x=10,y=36)
sb.config(command=text.yview)
text.config(yscrollcommand=sb.set)
sb.pack(side='right',fill='y')
t1=t.Label(text='')
t2=t.Label(text='')
t3=t.Label(text='')
t1.place(x=10,y=580,width=120)
t2.place(x=150,y=580,width=120)
t3.place(x=300,y=580,width=120)
def create_db_connection():
connection = pymysql.connect(host='localhost', user='root', password='2015wocani', database='ProcessData')
return connection
db_connection = create_db_connection(**db_config)
db_cursor = get_db_cursor(db_connection)
def insert_process(pid, name, path, mem_usage, cpu_usage):
"""插入进程数据到数据库"""
try:
sql = """
INSERT INTO ProcessData
(pid, process_name, path, memory_usage, cpu_usage)
VALUES
(%s, %s, %s, %s, %s)
"""
val = (pid, name, path, mem_usage, cpu_usage)
db_cursor.execute(sql, val)
db_connection.commit() # 提交事务
except Exception as e:
print(f"Error inserting process: {e}")
def yy():
text.delete(1.0, 'end')
text.insert('insert', '进程号 ' + '进程名 ' + ' 进程文件路径' + '\n')
for y in psutil.pids():
a = psutil.Process(y)
if a.name() == 'System Idle Process':
continue
else:
# 确保这里的缩进与'else:'对齐,通常使用四个空格
mem_info = a.memory_info()
mem_usage = mem_info.rss / 1024 # 单位转换为KB可根据需要调整
# 插入数据到数据库
insert_process(a.pid, a.name(), a.exe(), mem_usage, psutil.cpu_percent(interval=None))
text.insert('insert', f"{y} {a.name()} {a.exe()}\n\n")
# 确保root.update()与yy()函数体对齐不在else块内
root.update()
def jc():
text.delete(1.0,'end')
mm=os.popen('tasklist')
text.insert('insert',mm.read())
root.update()
def fw():
text.delete(1.0,'end')
mm=os.popen('sc query type= service')
text.insert('insert',mm.read())
root.update()
def xn():
text.delete(1.0,'end')
l1=t.Label(root,text='开机时间:')
tm=datetime.datetime.fromtimestamp(psutil.boot_time()).strftime("%Y-%m-%d %H:%M:%S")
l2=t.Label(root,text=str(tm))
l3=t.Label(root,text='当前时间:')
l4=t.Label(root,text='')
dq=time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
l4.configure(text=str(dq))
l5=t.Label(root,text='物理内存使用情况(MB)')
l6=t.Label(root,text='')
jh=psutil.virtual_memory()
tt=int((jh.total)/1024/1024)
us=int((jh.used)/1024/1024)
fr=int((jh.free)/1024/1024)
l6.configure(text='总量:' + str(tt) +'\n'+'使用:'+str(us) +'\n'+'剩余:'+str(fr))
l7=t.Label(root,text='交换内存使用情况(MB)')
l8=t.Label(root,text='')
hj=psutil.swap_memory()
ht=int((hj.total)/1024/1024)
hu=int((hj.used)/1024/1024)
hf=int((hj.free)/1024/1024)
l8.configure(text='总量:' + str(ht) + ' '+'使用:'+str(hu) +' '+'剩余:'+str(hf))
text.window_create('insert',window=l1)
text.window_create('insert',window=l2)
text.insert('insert','\n\n')
text.window_create('insert',window=l3)
text.window_create('insert',window=l4)
text.insert('insert','\n\n')
text.window_create('insert',window=l5)
text.window_create('insert',window=l6)
text.insert('insert','\n\n')
text.window_create('insert',window=l7)
text.window_create('insert',window=l8)
root.update()
def lw():
text.delete(1.0,'end')
n = psutil.net_io_counters()
r=str(float(n.bytes_recv / 1024 / 1024))+'MB'
s= str(float(n.bytes_sent / 1024 / 1024))+'MB'
text.insert('insert','网卡接收流量: '+str(r)+'\n'+'网卡发送流量:'+str(s)+'\n')
root.update()
def yh():
text.delete(1.0,'end')
use=' 用户'+' '+' 状态'+'\n'
text.insert('insert',use)
for y in psutil.users():
text.insert('2.0',str(y.name)+' '+'运行中。。。。'+'\n')
root.update()
def end_process():
"""根据用户输入的PID结束进程"""
pid = pid_entry.get()
try:
pid = int(pid) # 将输入转换为整型
process = psutil.Process(pid)
process.terminate() # 尝试结束进程
text.insert('insert', f"尝试结束PID为{pid}的进程...\n")
root.update()
except psutil.NoSuchProcess:
text.insert('insert', f"没有找到PID为{pid}的进程。\n")
root.update()
except ValueError:
text.insert('insert', "请输入有效的数字PID。\n")
root.update()
except Exception as e:
text.insert('insert', f"结束进程时发生错误: {e}\n")
root.update()
def search_processes():
"""根据用户输入的关键词搜索进程并更新显示"""
keyword = search_entry.get().strip().lower()
if not keyword:
text.insert('insert', "请输入搜索关键词...\n")
return
found_processes = []
for proc in psutil.process_iter(['pid', 'name']):
if keyword in proc.info['name'].lower():
found_processes.append(proc.info)
if found_processes:
text.delete(1.0, 'end')
for proc_info in found_processes:
text.insert('insert', f"{proc_info['pid']} | {proc_info['name']}\n")
else:
text.insert('insert', f"未找到包含'{keyword}'的进程。\n")
root.update()
# 确保窗口大小适应新的组件
search_frame = t.Frame(root)
search_frame.pack(side='right', anchor='ne') # 定位在右侧上角
# 通过grid布局管理器可以更好地控制组件之间的对齐和间距
search_frame.columnconfigure(0, weight=1) # 让输入框所在的列自动填充空白
search_frame.rowconfigure(0, weight=1)
search_label = t.Label(search_frame, text="搜索进程:")
search_label.grid(row=0, column=0, sticky='w') # 保持左侧对齐,但不额外增加宽度
search_entry = t.Entry(search_frame)
search_entry.grid(row=0, column=1, sticky='ew') # 让输入框填充可用空间
search_entry.config(width=20) # 可选:限制输入框的初始宽度
search_button = t.Button(search_frame, text="搜索", command=search_processes)
search_button.grid(row=0, column=2, padx=(0, 5), pady=(0, 5), sticky='e') # 靠右对齐,可加微小间隔
m=t.Menu(root)
#文件菜单
file=t.Menu(m,tearoff=False)
m.add_cascade(label='文件', menu=file)
file.add_command(label='新建任务',accelerator='(N)')
file.add_command(label='退出任务栏管理器',command=root.quit,accelerator='(x)')
pid_end_frame = t.Frame(root)
pid_end_frame.pack(side=t.RIGHT, anchor=t.SE, padx=10, pady=10) # 添加padx和pady以留出空间
pid_label = t.Label(pid_end_frame, text="PID:")
pid_label.pack(side=t.LEFT)
pid_entry = t.Entry(pid_end_frame, width=5)
pid_entry.pack(side=t.LEFT, padx=(10, 0))
end_process_button = t.Button(pid_end_frame, text="结束进程", command=end_process)
end_process_button.pack(side=t.LEFT, padx=(10, 0)) # 可能需要根据实际调整padx以保持布局美观
root.geometry("400x600")
#选项菜单
ii=t.IntVar()
ii.set(1)
o=t.Menu(m,tearoff=False)
m.add_cascade(label='选项',menu=o)
o.add_radiobutton(label='前端显示',variable=ii, value=0)
o.add_radiobutton(label='使用时最小化',variable=ii, value=1)
o.add_radiobutton(label='最小化时隐藏',variable=ii, value=2)
#查看菜单
v=t.Menu(m,tearoff=False)
m.add_cascade(label='查看',menu=v)
v.add_command(label='立即刷新')
#二级菜单
iv=t.IntVar()
iv.set(1)
s=t.Menu(v,tearoff=False)
v.add_cascade(label='更新速度',menu=s)
s.add_radiobutton(label='',variable=iv, value=0)
s.add_radiobutton(label='普通',variable=iv, value=1)
s.add_radiobutton(label='',variable=iv, value=2)
s.add_radiobutton(label='暂停',variable=iv, value=3)
v.add_command(label='选项列')
#帮助菜单
h=t.Menu(m,tearoff=False)
m.add_cascade(label='帮助',menu=h)
h.add_command(label='任务管理器帮助主体')
h.add_command(label='关于任务管理器')
b1=t.Button(root,text='应用程序',command=yy)
b2=t.Button(root,text='进程',command=jc)
b3=t.Button(root,text='服务',command=fw)
b4=t.Button(root,text='性能',command=xn)
b5=t.Button(root,text='联网',command=lw)
b6=t.Button(root,text='用户',command=yh)
b1.place(x=10,y=15,height=20,width=60)
b2.place(x=70,y=15,height=20,width=60)
b3.place(x=130,y=15,height=20,width=60)
b4.place(x=190,y=15,height=20,width=60)
b5.place(x=250,y=15,height=20,width=60)
b6.place(x=310,y=15,height=20,width=60)
def jcs():
t1.configure(text='进程数:'+str(len(psutil.pids())))
root.after(3000,jcs)
def cpu():
pp=str(ceil(psutil.cpu_percent(1)))
t2.configure(text='CPU 使用率:'+pp+'%')
root.after(1500,cpu)
def wlnc():
f= psutil.virtual_memory().free #剩余内存
t=psutil.virtual_memory().total#总内存
wl= float(t-f)/float(t) #为使得最后值更精确必须用float
t3.configure(text='物理内存:'+str(floor(wl*100))+'%')
root.after(2000,wlnc)
root.bind('<Visibility>',yy) #当打开窗口时显示第一个按钮选项
root.bind('<Visibility>',jcs())
root.bind('<Visibility>',cpu())
root.bind('<Visibility>',wlnc())
root.configure(menu=m)
root.mainloop() #主窗口循环显示
Loading…
Cancel
Save