From ebb97b8f8ca365ee77356360c8852cd2ee725947 Mon Sep 17 00:00:00 2001 From: Yao <1928814540@qq.com> Date: Wed, 12 Jun 2024 12:43:43 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BA=86=E5=9B=9B?= =?UTF-8?q?=E4=B8=AA=E6=97=B6=E9=97=B4=E5=BA=8F=E5=88=97=E9=A2=84=E6=B5=8B?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=EF=BC=9AVAR=E3=80=81ARIMA=E3=80=81SARIMA=20?= =?UTF-8?q?=E5=92=8C=E9=9A=8F=E6=9C=BA=E6=A3=AE=E6=9E=97=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=EF=BC=9B=20feat:=20=E5=AE=9E=E7=8E=B0=E4=BA=86=E6=A8=A1?= =?UTF-8?q?=E5=9E=8B=E8=AE=AD=E7=BB=83=E5=92=8C=E9=A2=84=E6=B5=8B=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=EF=BC=9B=20feat:=20=E6=B7=BB=E5=8A=A0=E4=BA=86?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B=E9=A2=84=E6=B5=8B=E7=BB=93=E6=9E=9C=E7=9A=84?= =?UTF-8?q?=E4=BF=9D=E5=AD=98=E5=8A=9F=E8=83=BD=EF=BC=9B=20feat:=20?= =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BA=86=E4=BB=A3=E7=A0=81=E7=BB=93=E6=9E=84?= =?UTF-8?q?=EF=BC=8C=E6=8F=90=E9=AB=98=E4=BA=86=E4=BB=A3=E7=A0=81=E5=8F=AF?= =?UTF-8?q?=E8=AF=BB=E6=80=A7=E5=92=8C=E5=8F=AF=E7=BB=B4=E6=8A=A4=E6=80=A7?= =?UTF-8?q?=EF=BC=9B=20fix:=20=E4=BF=AE=E5=A4=8D=E4=BA=86pg=5Frequest?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=E4=B8=AD=E7=9A=84=E4=B8=80=E4=BA=9B=E9=94=99?= =?UTF-8?q?=E8=AF=AF=EF=BC=9B=20refactor:=20=E9=87=8D=E6=9E=84=E4=BA=86pg?= =?UTF-8?q?=5Frequest=E6=A8=A1=E5=9D=97=E4=B8=AD=E7=9A=84=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=EF=BC=8C=E4=BD=BF=E5=85=B6=E6=9B=B4=E5=8A=A0=E7=AE=80?= =?UTF-8?q?=E6=B4=81=EF=BC=9B=20style:=20=E4=BF=AE=E6=94=B9=E4=BA=86?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E9=A3=8E=E6=A0=BC=EF=BC=8C=E4=BD=BF=E5=85=B6?= =?UTF-8?q?=E6=9B=B4=E5=8A=A0=E7=AC=A6=E5=90=88PEP=208=E8=A7=84=E8=8C=83?= =?UTF-8?q?=EF=BC=9B=20test:=20=E6=B7=BB=E5=8A=A0=E4=BA=86=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95=EF=BC=8C=E6=8F=90=E9=AB=98=E4=BA=86?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=E8=B4=A8=E9=87=8F=EF=BC=9B=20docs:=20?= =?UTF-8?q?=E6=9B=B4=E6=96=B0=E4=BA=86=E9=A1=B9=E7=9B=AE=E6=96=87=E6=A1=A3?= =?UTF-8?q?=EF=BC=8C=E4=BD=BF=E5=85=B6=E6=9B=B4=E5=8A=A0=E6=B8=85=E6=99=B0?= =?UTF-8?q?=EF=BC=9B=20build:=20=E6=9B=B4=E6=96=B0=E4=BA=86=E9=A1=B9?= =?UTF-8?q?=E7=9B=AE=E4=BE=9D=E8=B5=96=EF=BC=8C=E4=BD=BF=E5=85=B6=E6=9B=B4?= =?UTF-8?q?=E5=8A=A0=E7=A8=B3=E5=AE=9A=EF=BC=9B=20ops:=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BA=86=E9=83=A8=E7=BD=B2=E6=B5=81=E7=A8=8B=EF=BC=8C?= =?UTF-8?q?=E4=BD=BF=E5=85=B6=E6=9B=B4=E5=8A=A0=E9=AB=98=E6=95=88=EF=BC=9B?= =?UTF-8?q?=20chore:=20=E6=9B=B4=E6=96=B0=E4=BA=86.gitignore=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E5=BF=BD=E7=95=A5=E4=BA=86=E4=B8=8D=E5=BF=85?= =?UTF-8?q?=E8=A6=81=E7=9A=84=E6=96=87=E4=BB=B6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 6 ++ echarts_visualization/__init__.py | 29 ++++++ echarts_visualization/draw_echarts.py | 95 +++++++++++++++++++ echarts_visualization/utils.py | 26 ++++++ main.py | 21 +++++ models/ARIMA_Forecasting.py | 83 +++++++++++++++++ models/RF_Forecasting.py | 126 ++++++++++++++++++++++++++ models/SARIMA_Forecasting.py | 83 +++++++++++++++++ models/VAR_Forecasting.py | 98 ++++++++++++++++++++ models/__init__.py | 72 +++++++++++++++ models/utils.py | 46 ++++++++++ pg_request/__init__.py | 73 +++++++++++++++ pg_request/config.py | 25 +++++ pg_request/parse.py | 120 ++++++++++++++++++++++++ requirements.txt | Bin 0 -> 310 bytes 15 files changed, 903 insertions(+) create mode 100644 .gitignore create mode 100644 echarts_visualization/__init__.py create mode 100644 echarts_visualization/draw_echarts.py create mode 100644 echarts_visualization/utils.py create mode 100644 main.py create mode 100644 models/ARIMA_Forecasting.py create mode 100644 models/RF_Forecasting.py create mode 100644 models/SARIMA_Forecasting.py create mode 100644 models/VAR_Forecasting.py create mode 100644 models/__init__.py create mode 100644 models/utils.py create mode 100644 pg_request/__init__.py create mode 100644 pg_request/config.py create mode 100644 pg_request/parse.py create mode 100644 requirements.txt diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bfc970f --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +/test +/*/__pycache__ +/models/heatmap.py +/models/LSTM_Forecasting.py +/data +/.vscode \ No newline at end of file diff --git a/echarts_visualization/__init__.py b/echarts_visualization/__init__.py new file mode 100644 index 0000000..0e2c005 --- /dev/null +++ b/echarts_visualization/__init__.py @@ -0,0 +1,29 @@ +""" + +""" +from . import utils, draw_echarts +import streamlit as st + +from models import VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting +from typing import List, Type + + +def run(target: str, + target_name: str, + models: List[Type] = [ + VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, + RF_Forecasting + ]): + models_name = [ + model.__name__.split('.')[-1].split('_')[0] for model in models + ] + + st.title("模型预测结果") + history_data = utils.read_csv("data/normalized_df.csv") + + selected_model = st.selectbox("选择你想看的模型预测结果", models_name) + + pred_data = utils.read_csv(f"data/{selected_model}_Forecasting_df.csv") + + draw_echarts.draw_echarts(selected_model, target, target_name, + history_data, pred_data) diff --git a/echarts_visualization/draw_echarts.py b/echarts_visualization/draw_echarts.py new file mode 100644 index 0000000..aff4cc7 --- /dev/null +++ b/echarts_visualization/draw_echarts.py @@ -0,0 +1,95 @@ +import pandas as pd +from streamlit_echarts import st_echarts + + +def draw_echarts(model_name: str, target: str, target_name: str, + history_data: pd.DataFrame, pred_data: pd.DataFrame): + """ + 构造 ECharts 图表的配置并在 Streamlit 应用中展示。 + + Args: + model_name (str): 模型的名称 + target (str): 目标值的列名 + target_name (str): 目标值的显示名称 + historical_data (pd.DataFrame): 历史数据 + predicted_data (pd.DataFrame): 预测数据 + + Returns: + dict: ECharts 图表的配置 + """ + # 数据处理,将历史数据和预测数据添加 None 值以适应图表的 x 轴 + history_values = history_data[target].values.tolist() + [ + None for _ in range(len(pred_data)) + ] + pred_values = [None for _ in range(len(history_data)) + ] + pred_data[target].values.tolist() + + # 定义ECharts的配置 + option = { + "title": { + "text": f"{model_name}模型", + "x": "auto" + }, + # 配置提示框组件 + "tooltip": { + "trigger": "axis" + }, + # 配置图例组件 + "legend": { + "data": [f"{target_name}历史数据", f"{target_name}预测数据"], + "left": "right" + }, + # 配置x轴和y轴 + "xAxis": { + "type": + "category", + "data": + history_data.index.astype(str).to_list() + + pred_data.index.astype(str).to_list() + }, + "yAxis": { + "type": "value" + }, + # 配置数据区域缩放组件 + "dataZoom": [{ + "type": "inside", + "start": 0, + "end": 100 + }], + "series": [] + } + + # 添加历史数据系列 + if any(history_values): + option["series"].append({ + "name": f"{target_name}历史数据", + "type": "line", + "data": history_values, + "smooth": "true" + }) + + # 添加预测数据的系列 + if any(pred_values): + option["series"].append({ + "name": f"{target_name}预测数据", + "type": "line", + "data": pred_values, + "smooth": "true", + "lineStyle": { + "type": "dashed" + } + }) + + # 在Streamlit应用中展示ECharts图表 + st_echarts(options=option) + return option + + +# history_data = pd.read_csv('data/normalized_df.csv', +# index_col="date", +# parse_dates=["date"]) +# pred_data = pd.read_csv('data/VAR_Forecasting_df.csv', +# index_col="date", +# parse_dates=["date"]) +# draw_echarts('VAR_Forecasting', 'liugan_index', '流感指数', history_data, +# pred_data) diff --git a/echarts_visualization/utils.py b/echarts_visualization/utils.py new file mode 100644 index 0000000..239f89b --- /dev/null +++ b/echarts_visualization/utils.py @@ -0,0 +1,26 @@ +""" + +""" +import pandas as pd + + +def read_csv(file_path: str) -> pd.DataFrame: + """ + 从 CSV 文件中加载 DataFrame 对象。 + + Args: + file_path (str): CSV 文件的路径。 + + Returns: + DataFrame: 从 CSV 文件中加载的 DataFrame 对象。 + """ + try: + df = pd.read_csv(file_path, index_col="date", parse_dates=["date"]) + print(f"成功读取文件: {file_path}") + except FileNotFoundError: + print(f"找不到文件: {file_path}") + df = pd.DataFrame() + except Exception as e: + print(f"读取文件时发生错误: {e}") + df = pd.DataFrame() + return df diff --git a/main.py b/main.py new file mode 100644 index 0000000..f1f92cb --- /dev/null +++ b/main.py @@ -0,0 +1,21 @@ +from typing import List, Type +import pg_request as pg +import models as m +import echarts_visualization as ev + + +def main(target: str, + target_name: str, + exog_columns: List[str], + models: List[Type] = [ + m.VAR_Forecasting, m.ARIMA_Forecasting, m.SARIMA_Forecasting, + m.RF_Forecasting + ]): + # pg.run() + # m.run(forecast_target=target, exog_columns=exog_columns, models=models) + ev.run(target=target, target_name=target_name, models=models) + + +if __name__ == '__main__': + main('liugan_index', '流感指数', + ['infection_number.1', 'infection_number.2', 'jijin_data', 'shoupan']) diff --git a/models/ARIMA_Forecasting.py b/models/ARIMA_Forecasting.py new file mode 100644 index 0000000..cfe68c3 --- /dev/null +++ b/models/ARIMA_Forecasting.py @@ -0,0 +1,83 @@ +""" +模块功能:该模块用于自回归移动平均 (ARIMA) 预测模型的实施。 + +函数: + train_ARIMA_model:使用ARIMA模型对时间序列数据进行训练和预测。 + run:执行模型训练和数据预测,返回预测结果。 +""" +import pandas as pd +import pmdarima as pm +import numpy as np + +from typing import List, Union + + +def train_ARIMA_model(endog: Union[np.ndarray, pd.Series], + exog: Union[np.ndarray, pd.DataFrame] = None, + exog_pred: Union[np.ndarray, pd.DataFrame] = None, + steps: int = 20, + information_criterion: str = 'aic') -> np.ndarray: + """ + 使用ARIMA模型对时间序列数据进行预测。 + + Args: + endog (Union[np.ndarray, pd.Series]): 要分析的时间序列数据。 + exog (Union[np.ndarray, pd.DataFrame], optional): 用于改进ARIMA模型的外生变量。默认为None。 + exog_pred (Union[np.ndarray, pd.DataFrame], optional): 预测期间的外生变量,必须与训练期间的外生变量列数一致。默认为None。 + steps (int, optional, default=20): 预测期的长度。 + information_criterion (str, optional, default='aic'): 选择模型的信息准则,'aic'或'bic'。 + + Returns: + np.ndarray: 预测结果。 + """ + model = pm.auto_arima(endog, + X=exog, + seasonal=False, + information_criterion=information_criterion) + + pred = model.predict(n_periods=steps, X=exog_pred) + return pred + + +def run(input_data: pd.DataFrame, + forecast_target: str, + exog_columns: List[str], + steps: int = 20) -> pd.DataFrame: + """ + 主运行函数,用以读取数据、训练模型、预测数据。 + + Args: + input_data (pd.DataFrame): 输入的时间序列数据。 + forecast_target (str): 需要被预测的目标变量的列名。 + exog_columns (List[str]): 外生变量的列名列表。 + steps (int, optional, default=20): 预测步长 + + Returns: + pd.DataFrame: 预测结果的DataFrame对象。 + """ + # 创建一个未来日期的索引,用于保存预测数据 + future_index = pd.date_range(start=input_data.index.max() + + pd.Timedelta(days=1), + periods=steps) + + # 创建一个用于保存预测外生变量的空数据帧 + df_exog = pd.DataFrame(index=future_index) + + # 循环每个外生变量,使用ARIMA模型进行训练和预测,然后将预测值保存到df_exog中 + for exog in exog_columns: + pred = train_ARIMA_model(endog=input_data[exog], steps=steps) + df_exog[exog] = pred + + # 使用ARIMA模型对目标变量进行训练和预测,注意这里将df_exog作为预测阶段的外生变量传入 + pred = train_ARIMA_model(endog=input_data[forecast_target], + exog=input_data[exog_columns], + exog_pred=df_exog[exog_columns], + steps=steps, + information_criterion='bic') + + # 根据预测值创建一个新的数据帧,用于保存预测的目标变量 + forecast_df = pd.DataFrame(pred, + index=future_index, + columns=[forecast_target]) + + return forecast_df diff --git a/models/RF_Forecasting.py b/models/RF_Forecasting.py new file mode 100644 index 0000000..d578b8f --- /dev/null +++ b/models/RF_Forecasting.py @@ -0,0 +1,126 @@ +""" +模块功能:该模块负责随机森林(Random Forest)预测模型的实施。 + +函数: + random_forest_model: 使用随机森林模型根据给定的特征和目标数据进行训练,并预测未来数据。 + forecast_future: 预测未来数据。 + run: 执行模型训练和数据预测等步骤,并返回预测结果。 +""" +import pandas as pd +import numpy as np +from sklearn.ensemble import RandomForestRegressor + +from typing import List + + +def random_forest_model(train_data: pd.DataFrame, + forecast_target: str, + exog_columns: List[str], + future_data: pd.DataFrame, + steps: int = 20) -> pd.DataFrame: + """ + 使用随机森林模型根据给定的特征和目标数据进行训练,并预测未来数据。 + + Args: + train_data (pd.DataFrame): 训练数据集。 + forecast_target (str): 训练数据集中的目标列的列名。 + exog_columns (List[str): 训练数据集用于预测的特征列名的列表。 + future_data (pd.DataFrame): 存储未来预测所用的外生变量的数据集。 + steps (int, optional, default=20): 要进行预测的天数。 + + Returns: + pd.DataFrame: 存储预测结果的数据表。 + """ + # 制作输入特征和目标变量 + X = train_data[exog_columns].values + y = train_data[forecast_target].values + X_test = future_data[exog_columns].values + + model = RandomForestRegressor(n_estimators=1200, + max_depth=8, + min_samples_split=2, + random_state=0) + + model.fit(X, y) + + pred = model.predict(X_test[-steps:]) + + forecast_df = pd.DataFrame( + pred, + index=pd.date_range(start=train_data.index.max() + + pd.Timedelta(days=1), + periods=steps), + columns=[forecast_target]) + + return forecast_df + + +def forecast_future(data: np.ndarray, steps: int = 20) -> List: + """ + 使用随机森林预测未来的数据。 + + Args: + data (np.ndarray): 已知的用于预测的数据。 + steps (int, optional, default=20): 要进行预测的天数。 + + Returns: + List: 存放预测结果的列表。 + """ + # 制作输入特征和目标变量 + X = data[:-1].reshape(-1, 1) + y = data[1:] + X_test = [y[-1]] + + # 创建和训练随机森林模型 + model = RandomForestRegressor(n_estimators=1200, + max_depth=8, + min_samples_split=2, + random_state=0) + + model.fit(X, y) + + # 创建一个列表保存预测结果 + pred = [] + + # 迭代预测下一个数据点 + for _ in range(steps): + y_pred = model.predict(np.array([X_test[-1]]).reshape(-1, 1)) + pred.append(y_pred) + + # 将预测的数据点添加到下一轮的输入 + X_test.append(y_pred) + return pred + + +def run(input_data: pd.DataFrame, + forecast_target: str, + exog_columns: List[str], + steps: int = 20) -> pd.DataFrame: + """ + 执行数据读取、预处理、模型训练、预测并绘图等一系列步骤的主函数。 + + Args: + input_data (pd.DataFrame): 存储原始数据的DataFrame。 + forecast_target (str): 需要被预测的目标列名。 + exog_columns (List[str]): 特征列名的列表。 + steps (int, optional, default=20): 需要进行预测的天数。 + + Returns: + pd.DataFrame: 存储预测结果的数据表。 + """ + # 创建一个未来日期的索引,用于保存预测数据 + future_index = pd.date_range(start=input_data.index.max() + + pd.Timedelta(days=1), + periods=steps) + + # 创建一个用于保存预测外生变量的空数据帧 + df_exog = pd.DataFrame(index=future_index) + + for exog in exog_columns: + pred = forecast_future(input_data[exog].values, steps=steps) + df_exog[exog] = pred + + df_processed = random_forest_model(input_data, forecast_target, + exog_columns, df_exog, steps) + + return df_processed diff --git a/models/SARIMA_Forecasting.py b/models/SARIMA_Forecasting.py new file mode 100644 index 0000000..29c9202 --- /dev/null +++ b/models/SARIMA_Forecasting.py @@ -0,0 +1,83 @@ +""" +模块功能:该模块用于季节性自回归移动平均 (SARIMA) 预测模型的实施。 + +函数: + train_SARIMA_model(endog, exo, exog_pred, steps): 使用 SARIMA 模型对时间序列数据进行预测。 + run(input_data, forecast_target, exog_columns, steps): 执行模型训练和数据预测等步骤,并返回预测结果。 +""" +import pandas as pd +import pmdarima as pm +import numpy as np + +from typing import List, Union + + +def train_SARIMA_model(endog: Union[np.ndarray, pd.Series], + exog: Union[np.ndarray, pd.DataFrame] = None, + exog_pred: Union[np.ndarray, pd.DataFrame] = None, + steps: int = 20, + information_criterion: str = 'aic') -> np.ndarray: + """ + 使用SARIMA模型对时间序列数据进行预测。 + + Args: + endog (Union[np.ndarray, pd.Series]): 用于训练和预测的数据,可以是numpy数组或者pandas序列。 + exog (Union[np.ndarray, pd.DataFrame], optional): 外生变量,用于提高ARIMA模型的预测精度。 + exog_pred (Union[np.ndarray, pd.DataFrame], optional): 预测阶段使用的外生变量,它的列数必须和训练阶段的外生变量一致。 + steps (int, optional, default=20): 预测期长度。 + information_criterion (str, optional, default='aic'): 用于模型选择的信息准则,可以是 'aic' 或 'bic'。 + + Returns: + np.ndarray: 预测结果。 + """ + model = pm.auto_arima(endog, + X=exog, + seasonal=True, + information_criterion=information_criterion) + + pred = model.predict(n_periods=steps, X=exog_pred) + return pred + + +def run(input_data: pd.DataFrame, + forecast_target: str, + exog_columns: List[str], + steps: int = 20) -> pd.DataFrame: + """ + 主运行函数,用以读取数据、训练模型、预测数据。 + + Args: + input_data (pd.DataFrame): 输入的时间序列数据。 + forecast_target (str): 预测目标的列名。 + exog_columns (List[str]): 外生变量的列名列表。 + steps (int, optional, default=20): 预测步长 + + Returns: + pd.DataFrame: 预测结果的DataFrame对象。 + """ + # 创建一个未来日期的索引,用于保存预测数据 + future_index = pd.date_range(start=input_data.index.max() + + pd.Timedelta(days=1), + periods=steps) + + # 创建一个用于保存预测外生变量的空数据帧 + df_exog = pd.DataFrame(index=future_index) + + # 循环每个外生变量,使用ARIMA模型进行训练和预测,然后将预测值保存到df_exog中 + for exog in exog_columns: + pred = train_SARIMA_model(endog=input_data[exog], steps=steps) + df_exog[exog] = pred + + # 使用ARIMA模型对目标变量进行训练和预测,注意这里将df_exog作为预测阶段的外生变量传入 + pred = train_SARIMA_model(endog=input_data[forecast_target], + exog=input_data[exog_columns], + exog_pred=df_exog[exog_columns], + steps=steps, + information_criterion='bic') + + # 根据预测值创建一个新的数据帧,用于保存预测的目标变量 + forecast_df = pd.DataFrame(pred, + index=future_index, + columns=[forecast_target]) + + return forecast_df diff --git a/models/VAR_Forecasting.py b/models/VAR_Forecasting.py new file mode 100644 index 0000000..41aa882 --- /dev/null +++ b/models/VAR_Forecasting.py @@ -0,0 +1,98 @@ +""" +模块功能: +本模块核心功能是实现向量自回归(VAR)预测模型的训练和运行。 + +函数: + - convert_timestamp_index(data, to_period): 转换时间序列数据的时间索引至DatetimeIndex或PeriodIndex。 + - train_VAR_model(data, max_lags): 利用输入的时间序列数据训练 VAR 模型。 + - run(input_data, steps): 执行时间索引转换、模型训练和数据预测等步骤,并返回预测结果。 +""" +import pandas as pd +import statsmodels.api as sm + +from typing import List + + +def convert_timestamp_index(data: pd.DataFrame, + to_period: bool) -> pd.DataFrame: + """ + 根据to_period参数,选择将数据的时间索引转换为DatetimeIndex或PeriodIndex。 + + Args: + data (pd.DataFrame): 输入的数据。 + to_period (bool): 如果为True,则将DatetimeIndex转换为PeriodIndex; + 如果为False,则将PeriodIndex转换为DatetimeIndex。 + + Returns: + pd.DataFrame: 索引被转换后的数据。 + """ + if to_period: + data.index = pd.DatetimeIndex(data.index).to_period('D') + else: + data.index = data.index.to_timestamp() + + return data + + +def train_VAR_model(data: pd.DataFrame, max_lags: int = 30): + """ + 利用输入的时间序列数据训练VAR模型,通过比较BIC值确定最优滞后阶数。 + + Args: + data (pd.DataFrame): 用于模型训练的时间序列数据。 + max_lags (int, default=30): 最大滞后阶数,默认为 30。 + + Returns: + VARResultsWrapper: 训练得到的VAR模型。 + """ + model = sm.tsa.VAR(data) + criteria = [] + lags = range(1, max_lags + 1) + + # 通过比较每个滞后阶数模型的BIC值,选择最优滞后阶数 + for lag in lags: + result = model.fit(maxlags=lag) + criteria.append(result.bic) + + # 使用最优滞后阶数再次训练模型 + best_lag = lags[criteria.index(min(criteria))] + results = model.fit(maxlags=best_lag) + + return results + + +def run(input_data: pd.DataFrame, + forecast_target: str, + _: List[str], + steps: int = 20) -> pd.DataFrame: + """ + 运行函数,执行一系列步骤,包括索引转换、训练模型、数据预测。 + + Args: + input_data (pd.DataFrame): 输入的DataFrame数据。 + forecast_target (str): 需要被预测的目标变量的列名。 + _ (List[str]): 占位参数,用于保持和其他模型函数的接口一致性。 + steps (int, default=20): 预测步数。 + + Returns: + pd.DataFrame: 预测结果的DataFrame对象。 + """ + # 将DataFrame对象的时间索引转换为PeriodIndex + input_data = convert_timestamp_index(input_data, to_period=True) + + # 训练 VAR 模型 + model = train_VAR_model(input_data, max_lags=60) + + # 将DataFrame对象的时间索引转回原样 + input_data = convert_timestamp_index(input_data, to_period=False) + + # 利用VAR模型进行预测 + pred = model.forecast(input_data.values[-model.k_ar:], steps=steps) + forecast_df = pd.DataFrame( + pred, + index=pd.date_range(start=input_data.index.max() + + pd.Timedelta(days=1), + periods=steps), + columns=input_data.columns) + + return forecast_df[forecast_target] diff --git a/models/__init__.py b/models/__init__.py new file mode 100644 index 0000000..2c2e33c --- /dev/null +++ b/models/__init__.py @@ -0,0 +1,72 @@ +""" +models包功能: +该包主要实现了4种主要的时间序列预测模型:向量自回归(VAR)、自回归移动平均(ARIMA)、 +季节性自回归移动平均 (SARIMA) 以及随机森林(Random Forest)。 + +modules: + - utils.py: 包含用于数据读取和数据保存的通用函数。 + - VAR_model.py: 实现向量自回归 (VAR)预测模型。 + - ARIMA_model.py: 实现自回归移动平均 (ARIMA)预测模型。 + - SARIMA_model.py: 实现季节性自回归移动平均 (SARIMA)预测模型。 + - RF_model.py: 实现随机森林(Random Forest)预测模型。 + +函数: + - run(): 是整个预测过程的主函数。该函数首先读取数据,然后调用所有模型进行预测, + 最后把预测结果保存到文件中。 + +使用示例: + from models import run + run_result = run(forecast_target=target_column, exog_columns=exog_columns_list, models=[VAR_Forecasting]) +""" +from typing import List, Type +from . import utils, VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting + +__all__ = [ + 'utils', 'VAR_Forecasting', 'ARIMA_Forecasting', 'SARIMA_Forecasting', + 'RF_Forecasting' +] + + +def run( + forecast_target: str, + exog_columns: List[str], + steps: int = 20, + file_path: str = 'data/normalized_df.csv', + models: List[Type] = [ + VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting + ], +) -> None: + """ + 执行数据读取、预处理、模型训练、预测并保存预测结果等一系列步骤的主函数。 + + Args: + forecast_target (str): 需要被预测的目标变量的列名。 + exog_columns (List[str]): 用于预测的特征变量的列名列表。 + steps (int, default=20): 需要进行预测的步数。 + file_path (str, default='data/normalized_df.csv'): 数据文件的路径。 + models (List[Type]) : 需要运行的预测模型列表,默认包括VAR、ARIMA、SARIMA和Random Forest模型。 + + Returns: + None + """ + # 载入数据 + input_df = utils.read_csv(file_path) + + # 使用每个模型进行预测并保存结果 + for model in models: + try: + model_name = model.__name__.split('.')[-1] + print(f"正在执行 {model_name} 模型进行预测...") + + # 调用模型进行预测 + model_df = model.run(input_df, forecast_target, exog_columns, + steps) + + # 保存预测结果 + utils.save_csv(model_df, f'data/{model_name}_df.csv') + + print(f"{model_name} 模型的预测结果已保存至 data/{model_name}_df.csv") + + except Exception as e: + print(f"{model_name} 模型预测过程出现错误: {e}") + print("所有模型预测都已完成。") diff --git a/models/utils.py b/models/utils.py new file mode 100644 index 0000000..302c1e4 --- /dev/null +++ b/models/utils.py @@ -0,0 +1,46 @@ +""" +模块功能:该模块作为工具箱,包含了数据读取和数据保存的功能。 + +函数: + read_csv(file_path): 从 CSV 文件中读取数据并返回pd.DataFrame对象。 + save_csv(df, file_path): 将pd.DataFrame对象保存为 CSV 文件。 +""" +import pandas as pd + + +def read_csv(file_path: str) -> pd.DataFrame: + """ + 从 CSV 文件中加载 DataFrame 对象。 + + Args: + file_path (str): CSV 文件的路径。 + + Returns: + DataFrame: 从 CSV 文件中加载的 DataFrame 对象。 + """ + try: + df = pd.read_csv(file_path, index_col="date", parse_dates=["date"]) + print(f"成功读取文件: {file_path}") + except FileNotFoundError: + print(f"找不到文件: {file_path}") + df = pd.DataFrame() + except Exception as e: + print(f"读取文件时发生错误: {e}") + df = pd.DataFrame() + return df + + +def save_csv(df: pd.DataFrame, file_path: str) -> None: + """ + 将 DataFrame 对象保存为 CSV 文件。 + + Args: + df (DataFrame): 要保存的 DataFrame 对象。 + file_path (str): CSV 文件的路径。 + + """ + try: + df.to_csv(file_path, index_label='date') + print(f"成功保存文件: {file_path}") + except Exception as e: + print(f"保存文件时发生错误: {e}") diff --git a/pg_request/__init__.py b/pg_request/__init__.py new file mode 100644 index 0000000..467a1b7 --- /dev/null +++ b/pg_request/__init__.py @@ -0,0 +1,73 @@ +""" +pg_request包,主要用于处理PostgreSQL数据库数据。 + +功能: + - 在运行主函数时,如果必要,将会创建一个存放输出数据的 'data' 文件夹。 + - 连接到 PostgreSQL 数据库并从指定数据表中提取数据。 + - 合并多个表中的数据。 + - 对数据进行标准化处理。 + - 保存处理后的数据到 CSV 文件。 + +模块: + - config: 定义了数据的存取参数,如数据库连接参数和目标数据表名。 + - parse: 包含了连接数据库、数据处理和数据存储等功能的函数。 + +函数: + - check_and_create_folder(folder_path): 检查并创建数据文件夹,在主函数运行时调用。 + - run(): 作为整个数据处理流程的主函数,当被调用时,将会执行数据提取、数据处理和数据存储等操作。 + +使用示例: + from pg_request import run + + run() + +注意事项: + 在使用这个包时,你需要首先在config.py中设置好数据库连接参数和目标数据表,然后才能正常使用run函数。在你自己的项目中,你也可以根据需要调用parse.py中的各个函数。 +""" +import os + +from .config import DB_PARAMS, TABLES +from .parse import connect_to_pg, merge_dfs, normalize_df, save_as_csv + + +def check_and_create_folder(folder_path: str): + """ + 检查指定的文件夹是否存在,如果不存在则创建该文件夹。 + + Args: + folder_path (str): 需要检查和创建的文件夹路径。 + """ + if not os.path.exists(folder_path): + os.makedirs(folder_path) + print(f"文件夹 {folder_path} 已创建。") + else: + print(f"文件夹 {folder_path} 已存在。") + + +def run(): + """ + 运行整个数据处理流程,步骤如下: + + 1. 检查并创建存放输出数据的data文件夹。 + 2. 连接到PostgreSQL数据库并获取指定数据表的数据。 + 3. 合并多个数据表中的数据。 + 4. 使用最小最大规范化方法对数据进行规范化处理。 + 5. 将处理后的数据保存到CSV文件中。 + """ + # 检查存放输出数据的文件夹是否存在,如果不存在则创建 + check_and_create_folder('data') + + # 连接数据库,获取数据表数据 + raw_data = connect_to_pg(DB_PARAMS, TABLES) + + # 合并获取的数据表 + merged_data = merge_dfs(raw_data) + + # 对合并后的数据进行最小最大标准化 + normalized_data = normalize_df(merged_data) + + # 保存数据到 CSV 文件 + save_as_csv(normalized_data, 'data/normalized_df.csv') + + # 打印完成信息 + print('{:*^30}'.format('PostgreSQL数据读取完成')) diff --git a/pg_request/config.py b/pg_request/config.py new file mode 100644 index 0000000..3ab05d5 --- /dev/null +++ b/pg_request/config.py @@ -0,0 +1,25 @@ +""" +配置文件,定义数据库连接参数和需要提取数据的目标表格列表。 + +变量: + DB_PARAMS (dict[str, str]): 数据库连接参数,包含数据库主机、端口号、名字、用户名和密码。 + TABLES (List[str]): 需要提取数据的目标表格列表。 + +注意: + 请根据实际情况修改DB_PARAMS和TABLES的值。 +""" + +# 定义数据库连接参数 +DB_PARAMS = { + 'host': 'localhost', + 'port': '5432', + 'database': 'Dbname', + 'user': 'postgres', + 'password': 'password' +} + +# 定义目标数据表 +TABLES = [ + 'app01_baidudata', 'app01_beijingweekdata', 'app01_jijindata', + 'app01_liuganweekdata', 'app01_stockdata' +] diff --git a/pg_request/parse.py b/pg_request/parse.py new file mode 100644 index 0000000..f30bcdc --- /dev/null +++ b/pg_request/parse.py @@ -0,0 +1,120 @@ +""" +模块功能:对 PostgreSQL 数据库进行数据提取,数据处理,数据保存。 + +函数: + connect_to_pg(db_parameters, tables): 连接到 PostgreSQL 数据库并获取指定数据表。 + merge_dfs(dfs): 合并 DataFrame 对象。 + normalize_df(df): 对 DataFrame 对象进行最小最大标准化。 + save_as_csv(df, file_path): 将 DataFrame 保存为 CSV 文件。 + +使用示例: + dfs = connect_to_pg(db_parameters, tables) + df = merge_dfs(dfs) + df = normalize_df(df) + save_as_csv(df, file_path) + +注意: + 本模块依赖于 psycopg2 库,请确保已正确安装该库。 +""" + +import pandas as pd +import psycopg2 + +from typing import Dict, List + + +def connect_to_pg(db_parameters: Dict[str, str], + tables: List[str]) -> Dict[str, pd.DataFrame]: + """ + 连接到 PostgreSQL 数据库并获取指定数据表。 + + Args: + db_parameters (Dict[str, str]): 数据库连接参数的字典 + tables (List[str]): 需要查询的数据表名称列表 + + Returns: + dfs (Dict[str, DataFrame]): + 键是数据表名(去除 'app01_' 前缀),值是对应数据表的 DataFrame 对象。 + """ + try: + # 用 with 语句确保数据库连接被正确关闭 + with psycopg2.connect(**db_parameters) as conn: + dfs = { + table[6:]: pd.read_sql(f'select * from public.{table};', conn) + for table in tables + } + print('{:*^30}'.format('成功链接 PostgreSQL')) + except Exception as error: + print(f"发现错误:{error}") + exit() + + return dfs + + +def merge_dfs(dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame: + """ + 合并 DataFrame 对象。 + + Args: + dfs (dict): 键为数据表名,值为对应数据表的 DataFrame 对象。 + + Returns: + df (DataFrame): 合并后的 DataFrame 对象。 + """ + # 通过合并所有数据表的日期范围来创建一个新的日期索引 + date_range = pd.date_range(start=min(df['date'].min() + for df in dfs.values()), + end=max(df['date'].max() + for df in dfs.values())) + + # 创建一个以日期范围为索引的空 DataFrame + df_merged = pd.DataFrame(index=date_range) + # print(df_merged) + + # 遍历并合并每个数据表,保留日期索引并丢弃 'id' 列 + for df in dfs.values(): + df = df.set_index('date').reindex(date_range) + df = df.drop(columns='id') # 删除 'id' 列 + df_merged = pd.concat([df_merged, df], axis=1) + + # 对缺失值进行线性插值(其他方法?多项插值?) + df_merged = df_merged.interpolate() + + # 如果有剩余的NaN值,删除这些行 + df_merged.dropna(inplace=True) + + return df_merged + + +def normalize_df(df: pd.DataFrame) -> pd.DataFrame: + """ + 对 DataFrame 对象进行最小最大标准化。 + + Args: + df (DataFrame): 要进行标准化的 DataFrame 对象。 + + Returns: + df_normalized (DataFrame): 进行最小最大标准化后的 DataFrame 对象。 + """ + # 如果列的数据类型是布尔值、有符号整型、无符号整型、浮点数或复数浮点数的话,就进行最大最小标准化,否则保留原列的数据 + df_normalized = df.apply(lambda x: (x - x.min()) / (x.max() - x.min()) + if x.dtype.kind in 'biufc' else x) + + return df_normalized + + +def save_as_csv(df: pd.DataFrame, + file_path: str = 'data/normalized_df.csv') -> None: + """ + 将 DataFrame 保存为 CSV 文件。 + + Args: + df (DataFrame): 要保存的 DataFrame 对象。 + file_path (str): 保存文件的路径。 + + """ + try: + df.to_csv(file_path, index_label='date') + print(f"成功保存为 {file_path}") + except Exception as e: + print(f"保存文件时出错: {e}") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..236a7bf32c805d2e044286941cb4fb5501b951a0 GIT binary patch literal 310 zcmZXQ+X})k5Jcx$@K>a1Md*XyBDLO%F$qaQ`t$0{27Qsj7P4p0PV#