"""Train an LSTM on air-quality readings loaded from a MySQL table.

Pipeline: read rows with SQLAlchemy -> min-max scale them -> split
chronologically -> zero-pad each sample to ``n_steps`` -> fit a single-layer
LSTM -> report the mean squared error -> persist the model and the scaler.
"""
import logging
import os
from datetime import datetime
from urllib.parse import quote

import pandas as pd
import numpy as np
from sqlalchemy import create_engine, text
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.metrics import mean_squared_error
from keras.models import Sequential
from keras.layers import LSTM, Dense
from tensorflow.keras.layers import Input
from keras.preprocessing.sequence import pad_sequences
import tensorflow as tf
# The tensorflow.keras imports below intentionally come after the keras ones:
# they rebind the same names, exactly as in the original import order.
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import Model
import matplotlib.pyplot as plt
import joblib
import pymysql  # MySQL driver named by the "mysql+pymysql://" URL scheme

logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)

# config key -> (environment variable, default). A default of None marks the
# variable as required; the password deliberately has no in-source default.
_DB_ENV_SETTINGS = {
    "host": ("DB_HOST", "127.0.0.1"),
    "user": ("DB_USER", "root"),
    "password": ("DB_PASSWORD", None),
    "db": ("DB_NAME", "airquility"),
}

# Reference instant used to turn datetime columns into plain numbers.
_EPOCH = pd.Timestamp("1970-01-01")


def get_db_config():
    """Build the database configuration from environment variables.

    Returns:
        dict with the keys ``host``, ``user``, ``password``, ``db`` and
        ``charset`` (in that order).

    Raises:
        ValueError: if a required variable (``DB_PASSWORD``) is not set.
    """
    config = {}
    for key, (env_name, default) in _DB_ENV_SETTINGS.items():
        value = os.getenv(env_name, default)
        if value is None:
            raise ValueError(f"缺少环境变量: {env_name}")
        config[key] = value
    config["charset"] = 'utf8'
    return config


def create_database_engine(config):
    """Create a SQLAlchemy engine for the MySQL database described by *config*.

    Args:
        config: mapping with ``user``, ``password``, ``host``, ``db`` and
            ``charset`` entries, as returned by :func:`get_db_config`.

    Returns:
        The engine returned by ``sqlalchemy.create_engine``.

    Raises:
        Exception: whatever ``create_engine`` raises, after logging it.
    """
    # Percent-encode the credentials so characters such as "@", ":", "/" or
    # ">" inside them cannot corrupt the URL structure.
    user = quote(config["user"], safe="")
    password = quote(config["password"], safe="")
    db_string = (
        f'mysql+pymysql://{user}:{password}@{config["host"]}/{config["db"]}'
        f'?charset={config["charset"]}'
    )
    try:
        engine = create_engine(db_string)
    except Exception as e:
        logger.error("创建数据库引擎失败: %s", e)
        raise
    logger.debug("created engine of type %s", type(engine))
    return engine


def fetch_data(engine, query):
    """Run *query* against *engine* and return the result as a DataFrame.

    Args:
        engine: SQLAlchemy engine (or connection) accepted by ``pd.read_sql``.
        query: SQL statement as a non-empty string.

    Returns:
        pandas.DataFrame holding the query result.

    Raises:
        ValueError: if *query* is not a string or is blank.
        Exception: any error raised while executing the query (logged first).
    """
    if not isinstance(query, str):
        logger.error("查询字符串类型错误,query应为字符串。")
        raise ValueError("查询字符串类型错误,query应为字符串。")
    if not query.strip():
        logger.error("查询字符串为空。")
        raise ValueError("查询字符串为空。")
    try:
        return pd.read_sql(text(query), engine)
    except Exception as e:
        logger.error("执行SQL查询失败: %s", e)
        raise  # let the caller decide how to recover


def _datetime_columns_to_seconds(frame):
    """Return a copy of *frame* whose datetime columns are seconds since 1970-01-01."""
    converted = frame.copy()
    for col in converted.columns:
        if pd.api.types.is_datetime64_any_dtype(converted[col]):
            converted[col] = (converted[col] - _EPOCH).dt.total_seconds()
    return converted


def preprocess_data(df, target_col, default_year=2024, features=None):
    """Parse the ``ptime`` column and min-max scale features and target.

    ``ptime`` values that are not already datetimes are parsed as
    ``"<month>/<day>"`` strings belonging to *default_year*. Datetime columns
    are converted to seconds since the Unix epoch before scaling, because
    ``MinMaxScaler`` works on numeric data. The input frame is not modified.

    Args:
        df: input DataFrame; must contain a ``ptime`` column.
        target_col: column to predict. If it is absent a warning is logged,
            no column is dropped from the features and ``ptime`` is used as
            the target.
        default_year: year prepended to ``ptime`` values when parsing them.
        features: optional explicit list of feature columns. When ``None``,
            every column except *target_col* is used.

    Returns:
        ``(scaled_features, scaled_target, scaler)`` where both arrays are 2-D
        ``numpy`` arrays and ``scaler`` is the ``MinMaxScaler`` fitted on the
        target (so it can invert predictions). ``(None, None, None)`` is
        returned, after logging an error, when the input cannot be processed.
    """
    if df.empty:
        logger.error("输入的DataFrame为空")
        return None, None, None
    if 'ptime' not in df.columns:
        logger.error("DataFrame中不存在'ptime'列")
        return None, None, None

    data = df.copy()  # never mutate the caller's frame
    if pd.api.types.is_datetime64_any_dtype(data['ptime']):
        logger.info("ptime列已经是以日期时间格式存储。")
    else:
        try:
            data['ptime'] = pd.to_datetime(
                data['ptime'].apply(
                    lambda x: datetime.strptime(f"{default_year}/{x}", "%Y/%m/%d")
                )
            )
        except (ValueError, TypeError):
            logger.error("ptime列转换为datetime类型失败,可能是因为格式不正确。")
            return None, None, None
    logger.debug("parsed frame head:\n%s", data.head())

    # Decide which columns are features and which one is the target.
    if features is not None:
        feature_cols = list(features)
        missing = [col for col in feature_cols if col not in data.columns]
        if missing:
            logger.error("未找到任何特征列。")
            return None, None, None
    elif target_col in data.columns:
        feature_cols = list(data.columns.drop(target_col))
    else:
        logger.warning(f"目标列 '{target_col}' 在DataFrame中未找到,将不进行列删除操作。")
        feature_cols = list(data.columns)
    if not feature_cols:
        logger.error("未找到任何特征列。")
        return None, None, None
    target_name = target_col if target_col in data.columns else 'ptime'
    logger.debug("features=%s target=%s", feature_cols, target_name)

    numeric = _datetime_columns_to_seconds(data)

    # Separate scalers: refitting one scaler on the target would silently
    # discard the statistics it learned from the features.
    feature_scaler = MinMaxScaler()
    scaled_features = feature_scaler.fit_transform(
        numeric[feature_cols].to_numpy(dtype=float)
    )
    target_scaler = MinMaxScaler()
    scaled_target = target_scaler.fit_transform(
        numeric[[target_name]].to_numpy(dtype=float)
    )
    return scaled_features, scaled_target, target_scaler


def split_dataset_into_train_test(features, target, test_size=0.2):
    """Split features and target chronologically into train and test parts.

    The first ``int(len(features) * (1 - test_size))`` rows form the training
    set and the remaining rows the test set; no shuffling is performed.

    Args:
        features: ``numpy`` array of samples.
        target: ``numpy`` array with one entry per sample.
        test_size: fraction of rows assigned to the test set, in ``(0, 1)``.

    Returns:
        ``(train_features, test_features, train_target, test_target)``.

    Raises:
        TypeError: if either input is not a ``numpy`` array.
        ValueError: if the lengths differ or *test_size* is out of range.
    """
    if not isinstance(features, np.ndarray) or not isinstance(target, np.ndarray):
        raise TypeError("features and target must be numpy arrays")
    if len(features) != len(target):
        raise ValueError("features and target must have the same length")
    if not 0 < test_size < 1:
        raise ValueError("test_size must be between 0 and 1")

    train_size = int(len(features) * (1 - test_size))
    # Basic slicing returns views, so no data is copied here.
    train_features, test_features = features[:train_size], features[train_size:]
    train_target, test_target = target[:train_size], target[train_size:]
    logger.debug(
        "split shapes: train=%s/%s test=%s/%s",
        train_features.shape, train_target.shape,
        test_features.shape, test_target.shape,
    )
    return train_features, test_features, train_target, test_target


def validate_data_shapes(train_features, test_features, n_steps):
    """Check that axis 1 of both feature arrays has length *n_steps*.

    Raises:
        ValueError: if either array has fewer than two dimensions or its
            second dimension differs from *n_steps*.
    """
    for array in (train_features, test_features):
        if array.ndim < 2 or array.shape[1] != n_steps:
            raise ValueError(f"训练和测试特征的第二维度(时间步长)应为{n_steps}")


def build_model(n_steps, lstm_units, dense_units, input_shape, optimizer='adam', loss='mse'):
    """Build and compile an ``Input -> LSTM -> Dense`` Keras model.

    Args:
        n_steps: accepted for interface compatibility; the time dimension is
            taken from *input_shape*.
        lstm_units: number of units in the LSTM layer.
        dense_units: number of units in the output ``Dense`` layer.
        input_shape: per-sample input shape passed to ``Input``.
        optimizer: optimizer passed to ``model.compile``.
        loss: loss passed to ``model.compile``.

    Returns:
        The compiled ``tf.keras.Model``.
    """
    inputs = Input(shape=input_shape)
    x = LSTM(lstm_units)(inputs)
    outputs = Dense(dense_units)(x)
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    model.compile(optimizer=optimizer, loss=loss)
    return model


def validate_params(epochs, batch_size):
    """Ensure *epochs* and *batch_size* are positive integers.

    Raises:
        ValueError: if either value is not an ``int`` or is not positive.
    """
    if not isinstance(epochs, int) or epochs <= 0:
        raise ValueError("epochs 应该是一个正整数")
    if not isinstance(batch_size, int) or batch_size <= 0:
        raise ValueError("batch_size 应该是一个正整数")


def ensure_directory_safety(path: str):
    """Make sure the parent directory of the absolute *path* exists.

    Raises:
        ValueError: if *path* is not absolute.
        PermissionError: if the directory cannot be created for lack of
            permission.
        Exception: any other error raised while creating the directory.
    """
    if not os.path.isabs(path):
        raise ValueError("路径应该是绝对路径")
    directory = os.path.dirname(path)
    if os.path.exists(directory):
        return
    logger.info(f"目录 {directory} 不存在,开始创建。")
    try:
        # exist_ok=True tolerates another process creating it concurrently.
        os.makedirs(directory, exist_ok=True)
    except PermissionError:
        logger.error(f"没有权限在 {directory} 创建目录。")
        raise
    except Exception as e:
        logger.error(f"创建目录 {directory} 时发生未知错误:{e}")
        raise
    logger.info(f"目录 {directory} 创建成功。")


def train_model(model: Model, train_features, train_target, test_features, test_target,
                epochs: int, batch_size: int, patience: int, save_best_only: bool = True,
                monitor: str = 'val_loss', mode: str = 'min',
                model_path: str = "best_model.h5") -> "tf.keras.callbacks.History":
    """Fit *model* with early stopping and checkpointing of the best weights.

    :param model: compiled Keras model
    :param train_features: training inputs
    :param train_target: training targets
    :param test_features: validation inputs
    :param test_target: validation targets
    :param epochs: maximum number of epochs (positive int)
    :param batch_size: batch size (positive int)
    :param patience: epochs without improvement before stopping early
    :param save_best_only: whether the checkpoint keeps only the best model
    :param monitor: metric watched by both callbacks
    :param mode: ``"min"`` or ``"max"``, direction in which *monitor* improves
    :param model_path: checkpoint file; relative paths are resolved against
        the current working directory
    :return: the ``History`` object returned by ``model.fit``
    :raises ValueError: for invalid ``epochs``/``batch_size`` or fit arguments
    :raises OSError: if the checkpoint directory or file cannot be written
    """
    validate_params(epochs, batch_size)
    # Honour the caller's path (it used to be overwritten by a placeholder)
    # and make it absolute, as ensure_directory_safety requires.
    model_path = os.path.abspath(model_path)
    ensure_directory_safety(model_path)

    checkpoint = ModelCheckpoint(model_path, monitor=monitor, verbose=1,
                                 save_best_only=save_best_only, mode=mode)
    early_stopping = EarlyStopping(monitor=monitor, patience=patience,
                                   mode=mode, verbose=1)
    try:
        history = model.fit(train_features, train_target,
                            epochs=epochs,
                            batch_size=batch_size,
                            validation_data=(test_features, test_target),
                            verbose=1,
                            callbacks=[early_stopping, checkpoint])
    except ValueError as ve:
        logger.error("参数错误: %s", ve)
        raise
    except OSError as oe:
        logger.error("文件操作错误: %s", oe)
        raise
    except Exception as e:
        logger.error("模型训练过程中发生异常: %s", e)
        raise
    logger.info("###")
    return history


def _as_sequences(array):
    """Return *array* as ``(samples, steps, features)``; 2-D input gains a feature axis."""
    return array[..., np.newaxis] if array.ndim == 2 else array


def build_and_train_model(n_steps, features, target, train_features, train_target,
                          test_features, test_target, lstm_units=50, dense_units=1,
                          optimizer='adam', loss='mse', epochs=100, batch_size=32,
                          patience=10, model_save_path='model.h5'):
    """Build an LSTM, train it with early stopping, and save it.

    Args:
        n_steps: expected length of axis 1 (time steps) of the feature arrays.
        features: unused; kept for interface compatibility.
        target: unused; kept for interface compatibility.
        train_features: training inputs, ``(samples, n_steps)`` or
            ``(samples, n_steps, n_features)``.
        train_target: training targets.
        test_features: validation inputs, same layout as *train_features*.
        test_target: validation targets.
        lstm_units: number of LSTM units.
        dense_units: number of output units.
        optimizer: optimizer used to compile the model.
        loss: loss used to compile the model.
        epochs: maximum number of epochs.
        batch_size: batch size.
        patience: early-stopping patience.
        model_save_path: where the final model is written; ``.h5`` is appended
            unless the path already ends in ``.h5`` or ``.keras``. The best
            checkpoint is written next to it with a ``_best`` suffix.

    Returns:
        ``(model, history)``.

    Raises:
        ValueError: if an input is not a ``numpy`` array or has a wrong shape.
    """
    arrays = (train_features, train_target, test_features, test_target)
    if not all(isinstance(array, np.ndarray) for array in arrays):
        raise ValueError("输入数据train_features, train_target, test_features, test_target必须是numpy数组")

    # The LSTM layer needs a trailing feature axis; add it explicitly instead
    # of relying on implicit rank promotion by the framework.
    train_features = _as_sequences(train_features)
    test_features = _as_sequences(test_features)
    validate_data_shapes(train_features, test_features, n_steps)

    # Derive the input shape from the data rather than from a module global.
    input_shape = tuple(train_features.shape[1:])
    model = build_model(n_steps, lstm_units, dense_units, input_shape,
                        optimizer=optimizer, loss=loss)

    if not model_save_path.endswith(('.h5', '.keras')):
        model_save_path += '.h5'
    # Keep the best checkpoint separate so the final save cannot overwrite it.
    root, ext = os.path.splitext(model_save_path)
    checkpoint_path = f"{root}_best{ext}"

    history = train_model(model, train_features, train_target,
                          test_features, test_target,
                          epochs=epochs, batch_size=batch_size, patience=patience,
                          model_path=checkpoint_path)

    model.save(model_save_path)
    return model, history


def evaluate_model(model, scaler, test_target, predictions):
    """Invert the scaling of predictions and targets and print their MSE.

    Args:
        model: unused; kept for interface compatibility.
        scaler: fitted scaler whose ``inverse_transform`` undoes the target
            scaling.
        test_target: scaled ground-truth targets.
        predictions: scaled model outputs, 2-D as ``inverse_transform`` expects.

    Returns:
        The mean squared error in the target's original (unscaled) units.
    """
    predictions = scaler.inverse_transform(predictions)
    test_target_inv = scaler.inverse_transform(test_target.reshape(-1, 1))
    mse = mean_squared_error(test_target_inv, predictions)
    print(f'Mean Squared Error: {mse}')
    return mse


def main():
    """Run the full load -> preprocess -> train -> evaluate -> save pipeline."""
    engine = create_database_engine(get_db_config())
    query = "SELECT ptime, ci FROM may"
    df = fetch_data(engine, query)

    # NOTE(review): the target is 'ptime' and 'ci' is the only feature, while
    # the plot below is labelled as if CI were predicted — confirm intent.
    target_col = 'ptime'
    features, target, scaler = preprocess_data(df, target_col)
    if features is None:
        raise SystemExit("Preprocessing failed; see the log for details.")

    train_features, test_features, train_target, test_target = split_dataset_into_train_test(
        features, target, test_size=0.2)

    n_steps = 5
    # dtype must be float: pad_sequences defaults to int32, which would
    # truncate the [0, 1]-scaled values to 0 or 1.
    # NOTE(review): every sample is its own row zero-padded to n_steps, not a
    # sliding window over consecutive rows — confirm this is intended.
    train_features_padded = pad_sequences(train_features, maxlen=n_steps, dtype='float32',
                                          padding='post', truncating='post')
    test_features_padded = pad_sequences(test_features, maxlen=n_steps, dtype='float32',
                                         padding='post', truncating='post')
    # (samples, n_steps) -> (samples, n_steps, 1) as the LSTM expects.
    train_sequences = train_features_padded[..., np.newaxis]
    test_sequences = test_features_padded[..., np.newaxis]

    model, history = build_and_train_model(n_steps=n_steps, features=features, target=target,
                                           train_target=train_target, test_target=test_target,
                                           train_features=train_sequences,
                                           test_features=test_sequences)

    # Predict on the same padded representation the model was trained on.
    predictions = model.predict(test_sequences)
    evaluate_model(model, scaler, test_target, predictions)

    # Optional visualisation of the (scaled) predictions.
    plt.plot(predictions, label='Predicted')
    plt.legend()
    plt.xlabel('Ptime')
    plt.ylabel('CI')
    plt.show()

    # Persist the trained model and the target scaler.
    model.save('trained_model.h5')
    joblib.dump(scaler, 'scaler.joblib')


if __name__ == "__main__":
    main()