You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

78 lines
2.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import psutil
import threading
import time
import os
from datetime import datetime, timedelta
from collections import deque
from dotenv import load_dotenv
load_dotenv()
log_file_path = os.getenv("NET_SPEED_PATH", '/data/ww/py_sys_monitor/info/net_speed')
def ensure_log_file_exists():
if not os.path.exists(log_file_path):
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
with open(log_file_path, 'w') as f:
now = datetime.now()
for i in range(12):
past_time = (now - timedelta(hours=i)).replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
f.write(f"{past_time},0,0\n")
ensure_log_file_exists()
def convert_to_mbps(bytes_per_second):
# Convert to Megabits per second (Mbps) 保留3位小数
# 做演示将数据乘以10
return round(bytes_per_second * 10 * 8 / (1024 * 1024), 3)
def log_network_speed():
while True:
net_io_start = psutil.net_io_counters()
time.sleep(3600) # Sleep for 1 hour
net_io_end = psutil.net_io_counters()
bytes_sent_per_second = (net_io_end.bytes_sent - net_io_start.bytes_sent) / 3600
bytes_recv_per_second = (net_io_end.bytes_recv - net_io_start.bytes_recv) / 3600
upload_speed_mbps = convert_to_mbps(bytes_sent_per_second)
download_speed_mbps = convert_to_mbps(bytes_recv_per_second)
# Get the current time and round down to the nearest hour
now = datetime.now()
current_time = now.replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S')
with open(log_file_path, 'a') as f:
f.write(f"{current_time},{upload_speed_mbps},{download_speed_mbps}\n")
def get_last_12_hours_speed():
speeds = []
now = datetime.now()
# Generate the last 12 hours' timestamps
timestamps = [(now - timedelta(hours=i)).replace(minute=0, second=0, microsecond=0).strftime('%Y-%m-%d %H:%M:%S') for i in range(12)]
timestamp_speed_map = {timestamp: {'upload_speed_mbps': 0, 'download_speed_mbps': 0} for timestamp in timestamps}
with open(log_file_path, 'r') as f:
lines = deque(f, maxlen=12) # Read only the last 12 lines
for line in lines:
timestamp, upload_speed, download_speed = line.strip().split(',')
if timestamp in timestamp_speed_map:
timestamp_speed_map[timestamp] = {
'upload_speed_mbps': float(upload_speed),
'download_speed_mbps': float(download_speed)
}
for timestamp in timestamps:
speeds.append({
'timestamp': timestamp,
'upload_speed_mbps': timestamp_speed_map[timestamp]['upload_speed_mbps'],
'download_speed_mbps': timestamp_speed_map[timestamp]['download_speed_mbps']
})
return speeds
# Start the thread to log network speed
thread = threading.Thread(target=log_network_speed)
thread.daemon = True
thread.start()