import os
import logging
from datetime import datetime

import joblib
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pymysql  # MySQL driver used by SQLAlchemy's mysql+pymysql:// URL
import tensorflow as tf
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from sqlalchemy import create_engine, text
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.layers import Dense, Input, LSTM
from tensorflow.keras.models import Model
from tensorflow.keras.preprocessing.sequence import pad_sequences

logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def get_db_config():
    """Read the database configuration from environment variables."""
    config = {
        "host": os.getenv('DB_HOST', '127.0.0.1'),
        "user": os.getenv('DB_USER', 'root'),
        "password": os.getenv('DB_PASSWORD', 'mysql>hyx123'),
        "db": os.getenv('DB_NAME', 'airquility'),
        "charset": 'utf8',
    }
    # Make sure every setting resolved to a value.
    for key, value in config.items():
        if value is None:
            raise ValueError(f"Missing environment variable: {key}")
    return config
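# A minimal sketch of the (assumed) shell setup before running this script;
# the variable names match the os.getenv() calls above, the values are
# placeholders, not real credentials:
#
#   export DB_HOST=127.0.0.1
#   export DB_USER=root
#   export DB_PASSWORD=...
#   export DB_NAME=airquility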
def create_database_engine(config):
    """
    Create a SQLAlchemy engine from the configuration dict.
    """
    db_string = (
        f'mysql+pymysql://{config["user"]}:{config["password"]}'
        f'@{config["host"]}/{config["db"]}?charset={config["charset"]}'
    )
    try:
        engine = create_engine(db_string)
    except Exception as e:
        logger.error(f"Failed to create database engine: {e}")
        raise  # Re-raise so the caller can log, retry, or abort as needed.
    return engine
def fetch_data(engine, query):
    """
    Fetch data from the database.

    Args:
        engine: database engine object.
        query: SQL query string.

    Returns:
        A DataFrame (df) holding the query result.
    """
    # Make sure the query is a non-empty string.
    if not isinstance(query, str):
        logger.error("Invalid query type: query must be a string.")
        raise ValueError("Invalid query type: query must be a string.")
    if not query.strip():
        logger.error("Query string is empty.")
        raise ValueError("Query string is empty.")
    try:
        return pd.read_sql(text(query), engine)
    except Exception as e:
        logger.error(f"SQL query failed: {e}")
        raise  # Re-raise so the caller can handle it.
def preprocess_data(df, target_col, default_year=2024, features=None):
    """
    Preprocess the data: parse the date column, set it as the index,
    and scale the features and the target.
    """
    if df.empty:
        logger.error("Input DataFrame is empty.")
        return None, None, None
    if 'ptime' not in df.columns:
        logger.error("Column 'ptime' is missing from the DataFrame.")
        return None, None, None
    # 'ptime' is stored as 'month/day'; prepend default_year so it parses to a
    # full date. After apply(), the column dtype is datetime64[ns].
    df['ptime'] = df['ptime'].apply(
        lambda x: datetime.strptime(f"{default_year}/{x}", "%Y/%m/%d"))
    logger.info(f"Data preview:\n{df.head()}")
    # Decide which columns are features: everything except the target column.
    if target_col in df.columns:
        features = df.columns.drop(target_col)
    else:
        logger.warning(f"Target column '{target_col}' not found in the DataFrame; "
                       "using all columns as features.")
        features = df.columns
    if len(features) == 0:
        logger.error("No feature columns found.")
        return None, None, None
    try:
        df.set_index('ptime', inplace=True)
    except KeyError:
        logger.error("Column 'ptime' does not exist and cannot be set as the index.")
        return None, None, None
    # Scale features and target with separate MinMaxScaler instances so the
    # returned scaler can later inverse-transform predictions of the target.
    feature_scaler = MinMaxScaler()
    scaled_features = feature_scaler.fit_transform(df[features])
    target_scaler = MinMaxScaler()
    # The index holds datetimes; cast to int64 (nanoseconds) before scaling.
    scaled_target = target_scaler.fit_transform(
        df.index.values.astype('int64').reshape(-1, 1))
    return scaled_features, scaled_target, target_scaler
def split_dataset_into_train_test(features, target, test_size=0.2):
    """
    Split the dataset into training and test sets, preserving time order.
    """
    # Validate types and matching lengths.
    if not isinstance(features, np.ndarray) or not isinstance(target, np.ndarray):
        raise TypeError("features and target must be numpy arrays")
    if len(features) != len(target):
        raise ValueError("features and target must have the same length")
    # Validate that test_size is a sensible fraction.
    if not 0 < test_size < 1:
        raise ValueError("test_size must be between 0 and 1")
    # Compute the training set size.
    train_size = int(len(features) * (1 - test_size))
    # numpy slicing returns views rather than copies, so this split is cheap.
    train_features, test_features = features[:train_size], features[train_size:]
    train_target, test_target = target[:train_size], target[train_size:]
    return train_features, test_features, train_target, test_target
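# A minimal usage sketch with toy arrays (the shapes are assumptions, not the
# real data): an 80/20 chronological split keeps the last rows for testing.
#
#   X = np.arange(10, dtype=float).reshape(10, 1)
#   y = np.arange(10, dtype=float).reshape(10, 1)
#   X_tr, X_te, y_tr, y_te = split_dataset_into_train_test(X, y, test_size=0.2)
#   # X_tr holds the first 8 rows, X_te the final 2 rows.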
def validate_data_shapes(train_features, test_features, n_steps):
    """
    Check that the train and test feature shapes match expectations.
    """
    if train_features.shape[1] != n_steps or test_features.shape[1] != n_steps:
        raise ValueError(f"The second dimension (time steps) of the train and test features must be {n_steps}")
def build_model(n_steps, lstm_units, dense_units, input_shape):
    """Build and compile a simple LSTM regression model (functional API)."""
    inputs = Input(shape=input_shape)  # e.g. (n_steps, n_features)
    x = LSTM(lstm_units)(inputs)       # single LSTM layer over the sequence
    outputs = Dense(dense_units)(x)    # linear output for regression
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer='adam', loss='mse')
    return model
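# A minimal sketch of calling build_model; the shapes are assumptions
# (5 time steps, 1 feature per step, a single regression output):
#
#   model = build_model(n_steps=5, lstm_units=50, dense_units=1,
#                       input_shape=(5, 1))
#   model.summary()  # Input -> LSTM(50) -> Dense(1), compiled with Adam/MSE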
def validate_params(epochs, batch_size):
    """
    Ensure that epochs and batch_size are valid parameters.
    """
    if not isinstance(epochs, int) or epochs <= 0:
        raise ValueError("epochs must be a positive integer")
    if not isinstance(batch_size, int) or batch_size <= 0:
        raise ValueError("batch_size must be a positive integer")
def ensure_directory_safety(path: str):
    """
    Ensure the path is safe (absolute) and that its directory exists.
    """
    if not os.path.isabs(path):
        raise ValueError("path must be an absolute path")
    directory = os.path.dirname(path)
    try:
        # Create the directory only if it does not exist yet.
        if not os.path.exists(directory):
            logger.info(f"Directory {directory} does not exist; creating it.")
            # exist_ok=True avoids an error if the directory appears meanwhile.
            os.makedirs(directory, exist_ok=True)
            logger.info(f"Directory {directory} created.")
    except PermissionError:
        # Permission problems get a clear, specific message.
        logger.error(f"No permission to create directory {directory}.")
        raise
    except Exception as e:
        # Log and re-raise anything unexpected.
        logger.error(f"Unexpected error while creating directory {directory}: {e}")
        raise
def train_model(model: Model, train_features, train_target, test_features, test_target, epochs: int, batch_size: int,
                patience: int, save_best_only: bool = True, monitor: str = 'val_loss', mode: str = 'min',
                model_path: str = "best_model.h5"):
    """
    Train the model, saving the best checkpoint and stopping early.

    :param model: Keras model instance
    :param train_features: training features
    :param train_target: training target
    :param test_features: test features
    :param test_target: test target
    :param epochs: number of training epochs
    :param batch_size: batch size
    :param patience: patience for the early-stopping policy
    :param save_best_only: whether to keep only the best checkpoint
    :param monitor: metric to monitor
    :param mode: direction of the monitored metric ('min'/'max')
    :param model_path: where to save the model
    :return: the training History object
    """
    validate_params(epochs, batch_size)
    ensure_directory_safety(model_path)
    # Save the best model seen so far, judged by the monitored metric.
    checkpoint = ModelCheckpoint(model_path, monitor=monitor, verbose=1,
                                 save_best_only=save_best_only, mode=mode)
    # Stop training once the monitored metric stops improving.
    early_stopping = EarlyStopping(monitor=monitor, patience=patience, verbose=1)
    try:
        history = model.fit(train_features, train_target, epochs=epochs, batch_size=batch_size,
                            validation_data=(test_features, test_target), verbose=1,
                            callbacks=[early_stopping, checkpoint])
        return history
    except ValueError as ve:
        logger.error(f"Invalid argument: {ve}")
        raise
    except OSError as oe:
        logger.error(f"File operation failed: {oe}")
        raise
    except Exception as e:
        logger.error(f"Exception during model training: {e}")
        raise
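# A sketch of calling train_model directly (the argument values are
# assumptions; the __main__ block below uses build_and_train_model instead).
# ensure_directory_safety requires an absolute model_path:
#
#   history = train_model(model, X_tr, y_tr, X_te, y_te,
#                         epochs=100, batch_size=32, patience=10,
#                         model_path='/tmp/best_model.h5')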
def build_and_train_model(n_steps, features, target, train_features, train_target, test_features, test_target,
                          lstm_units=50, dense_units=1, optimizer='adam', loss='mse', epochs=100, batch_size=32,
                          patience=10, model_save_path='model.h5'):
    """
    Build and train the LSTM model, with configurable parameters,
    early stopping, and model checkpointing.
    """
    # Input validation.
    if not (isinstance(train_features, np.ndarray) and isinstance(train_target, np.ndarray)
            and isinstance(test_features, np.ndarray) and isinstance(test_target, np.ndarray)):
        raise ValueError("train_features, train_target, test_features and test_target must be numpy arrays")
    # Shape validation.
    validate_data_shapes(train_features, test_features, n_steps)
    # Each sample is a sequence of n_steps values with one feature per step,
    # so reshape the 2-D arrays to (samples, n_steps, 1) for the LSTM.
    train_features = train_features.reshape((train_features.shape[0], n_steps, 1))
    test_features = test_features.reshape((test_features.shape[0], n_steps, 1))
    input_shape = (n_steps, 1)
    model = build_model(n_steps, lstm_units, dense_units, input_shape)
    # Checkpoint the best model by validation loss.
    checkpoint = ModelCheckpoint(filepath=model_save_path,
                                 monitor='val_loss',
                                 verbose=1,
                                 save_best_only=True,
                                 mode='min')
    # Early-stopping policy.
    early_stopping = EarlyStopping(monitor='val_loss', patience=patience, verbose=1)
    history = model.fit(train_features, train_target,
                        validation_data=(test_features, test_target),
                        epochs=epochs,
                        batch_size=batch_size,
                        callbacks=[checkpoint, early_stopping])
    # Basic path validation before saving; real applications may need more
    # elaborate checks to prevent security issues.
    if not model_save_path.endswith('.h5'):
        model_save_path += '.h5'
    model.save(model_save_path)
    return model, history
def evaluate_model(model, scaler, test_target, predictions):
    """
    Evaluate model performance after inverse-transforming the scaled
    predictions and targets back to their original units.
    """
    predictions = scaler.inverse_transform(predictions)
    test_target_inv = scaler.inverse_transform(test_target.reshape(-1, 1))
    mse = mean_squared_error(test_target_inv, predictions)
    print(f'Mean Squared Error: {mse}')
    return mse
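# A small sketch of why the inverse transform matters (toy numbers, not the
# real data): MinMaxScaler maps values into [0, 1], so an MSE computed on the
# scaled values would not be in the target's original units.
#
#   s = MinMaxScaler().fit(np.array([[0.0], [100.0]]))
#   s.transform(np.array([[50.0]]))          # -> [[0.5]]
#   s.inverse_transform(np.array([[0.5]]))   # -> [[50.0]]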
if __name__ == "__main__":
    engine = create_database_engine(get_db_config())
    query = "SELECT ptime, ci FROM may"
    df = fetch_data(engine, query)
    # 'ptime' is treated as the target column, so the remaining column ('ci')
    # becomes the input feature.
    target_col = 'ptime'
    features, target, scaler = preprocess_data(df, target_col)
    train_features, test_features, train_target, test_target = split_dataset_into_train_test(
        features, target, test_size=0.2)
    n_steps = 5
    # Pad (or truncate) every feature row to a fixed length of n_steps time
    # steps; dtype='float32' keeps the scaled values from being cast to int.
    maxlen = n_steps
    train_features_padded = pad_sequences(train_features, maxlen=maxlen, dtype='float32',
                                          padding='post', truncating='post')
    test_features_padded = pad_sequences(test_features, maxlen=maxlen, dtype='float32',
                                         padding='post', truncating='post')
    model, history = build_and_train_model(n_steps=n_steps,
                                           features=features,
                                           target=target,
                                           train_target=train_target,
                                           test_target=test_target,
                                           train_features=train_features_padded,
                                           test_features=test_features_padded)
    # Predict on the padded test data, reshaped the same way as in training.
    predictions = model.predict(
        test_features_padded.reshape((test_features_padded.shape[0], n_steps, 1)))
    mse = evaluate_model(model, scaler, test_target, predictions)
    # Visualize the predictions (optional).
    # plt.plot(test_target, label='Actual')
    plt.plot(predictions, label='Predicted')
    plt.legend()
    plt.xlabel('Ptime')
    plt.ylabel('CI')
    plt.show()
    # Save the trained model and the fitted scaler for later use.
    model.save('trained_model.h5')
    joblib.dump(scaler, 'scaler.joblib')
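# A possible follow-up (the paths are the ones used by the save calls above):
# reload the saved artifacts for inference in another process.
#
#   from tensorflow.keras.models import load_model
#   model = load_model('trained_model.h5')
#   scaler = joblib.load('scaler.joblib')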