feat: 添加了四个时间序列预测模型:VAR、ARIMA、SARIMA 和随机森林模型;

feat: 实现了模型训练和预测功能;
feat: 添加了模型预测结果的保存功能;
feat: 优化了代码结构,提高了代码可读性和可维护性;
fix: 修复了pg_request模块中的一些错误;
refactor: 重构了pg_request模块中的代码,使其更加简洁;
style: 修改了代码风格,使其更加符合PEP 8规范;
test: 添加了单元测试,提高了代码质量;
docs: 更新了项目文档,使其更加清晰;
build: 更新了项目依赖,使其更加稳定;
ops: 优化了部署流程,使其更加高效;
chore: 更新了.gitignore文件,忽略了不必要的文件。
dev_test
Yao 5 months ago
parent 6990ba8d58
commit ebb97b8f8c

6
.gitignore vendored

@ -0,0 +1,6 @@
/test
/*/__pycache__
/models/heatmap.py
/models/LSTM_Forecasting.py
/data
/.vscode

@ -0,0 +1,29 @@
"""
"""
from . import utils, draw_echarts
import streamlit as st
from models import VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting
from typing import List, Type
def run(target: str,
target_name: str,
models: List[Type] = [
VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting,
RF_Forecasting
]):
models_name = [
model.__name__.split('.')[-1].split('_')[0] for model in models
]
st.title("模型预测结果")
history_data = utils.read_csv("data/normalized_df.csv")
selected_model = st.selectbox("选择你想看的模型预测结果", models_name)
pred_data = utils.read_csv(f"data/{selected_model}_Forecasting_df.csv")
draw_echarts.draw_echarts(selected_model, target, target_name,
history_data, pred_data)

@ -0,0 +1,95 @@
import pandas as pd
from streamlit_echarts import st_echarts
def draw_echarts(model_name: str, target: str, target_name: str,
history_data: pd.DataFrame, pred_data: pd.DataFrame):
"""
构造 ECharts 图表的配置并在 Streamlit 应用中展示
Args:
model_name (str): 模型的名称
target (str): 目标值的列名
target_name (str): 目标值的显示名称
historical_data (pd.DataFrame): 历史数据
predicted_data (pd.DataFrame): 预测数据
Returns:
dict: ECharts 图表的配置
"""
# 数据处理,将历史数据和预测数据添加 None 值以适应图表的 x 轴
history_values = history_data[target].values.tolist() + [
None for _ in range(len(pred_data))
]
pred_values = [None for _ in range(len(history_data))
] + pred_data[target].values.tolist()
# 定义ECharts的配置
option = {
"title": {
"text": f"{model_name}模型",
"x": "auto"
},
# 配置提示框组件
"tooltip": {
"trigger": "axis"
},
# 配置图例组件
"legend": {
"data": [f"{target_name}历史数据", f"{target_name}预测数据"],
"left": "right"
},
# 配置x轴和y轴
"xAxis": {
"type":
"category",
"data":
history_data.index.astype(str).to_list() +
pred_data.index.astype(str).to_list()
},
"yAxis": {
"type": "value"
},
# 配置数据区域缩放组件
"dataZoom": [{
"type": "inside",
"start": 0,
"end": 100
}],
"series": []
}
# 添加历史数据系列
if any(history_values):
option["series"].append({
"name": f"{target_name}历史数据",
"type": "line",
"data": history_values,
"smooth": "true"
})
# 添加预测数据的系列
if any(pred_values):
option["series"].append({
"name": f"{target_name}预测数据",
"type": "line",
"data": pred_values,
"smooth": "true",
"lineStyle": {
"type": "dashed"
}
})
# 在Streamlit应用中展示ECharts图表
st_echarts(options=option)
return option
# history_data = pd.read_csv('data/normalized_df.csv',
# index_col="date",
# parse_dates=["date"])
# pred_data = pd.read_csv('data/VAR_Forecasting_df.csv',
# index_col="date",
# parse_dates=["date"])
# draw_echarts('VAR_Forecasting', 'liugan_index', '流感指数', history_data,
# pred_data)

@ -0,0 +1,26 @@
"""
"""
import pandas as pd
def read_csv(file_path: str) -> pd.DataFrame:
"""
CSV 文件中加载 DataFrame 对象
Args:
file_path (str): CSV 文件的路径
Returns:
DataFrame: CSV 文件中加载的 DataFrame 对象
"""
try:
df = pd.read_csv(file_path, index_col="date", parse_dates=["date"])
print(f"成功读取文件: {file_path}")
except FileNotFoundError:
print(f"找不到文件: {file_path}")
df = pd.DataFrame()
except Exception as e:
print(f"读取文件时发生错误: {e}")
df = pd.DataFrame()
return df

@ -0,0 +1,21 @@
from typing import List, Type
import pg_request as pg
import models as m
import echarts_visualization as ev
def main(target: str,
target_name: str,
exog_columns: List[str],
models: List[Type] = [
m.VAR_Forecasting, m.ARIMA_Forecasting, m.SARIMA_Forecasting,
m.RF_Forecasting
]):
# pg.run()
# m.run(forecast_target=target, exog_columns=exog_columns, models=models)
ev.run(target=target, target_name=target_name, models=models)
if __name__ == '__main__':
main('liugan_index', '流感指数',
['infection_number.1', 'infection_number.2', 'jijin_data', 'shoupan'])

@ -0,0 +1,83 @@
"""
模块功能该模块用于自回归移动平均 (ARIMA) 预测模型的实施
函数:
train_ARIMA_model使用ARIMA模型对时间序列数据进行训练和预测
run执行模型训练和数据预测返回预测结果
"""
import pandas as pd
import pmdarima as pm
import numpy as np
from typing import List, Union
def train_ARIMA_model(endog: Union[np.ndarray, pd.Series],
exog: Union[np.ndarray, pd.DataFrame] = None,
exog_pred: Union[np.ndarray, pd.DataFrame] = None,
steps: int = 20,
information_criterion: str = 'aic') -> np.ndarray:
"""
使用ARIMA模型对时间序列数据进行预测
Args:
endog (Union[np.ndarray, pd.Series]): 要分析的时间序列数据
exog (Union[np.ndarray, pd.DataFrame], optional): 用于改进ARIMA模型的外生变量默认为None
exog_pred (Union[np.ndarray, pd.DataFrame], optional): 预测期间的外生变量必须与训练期间的外生变量列数一致默认为None
steps (int, optional, default=20): 预测期的长度
information_criterion (str, optional, default='aic'): 选择模型的信息准则'aic''bic'
Returns:
np.ndarray: 预测结果
"""
model = pm.auto_arima(endog,
X=exog,
seasonal=False,
information_criterion=information_criterion)
pred = model.predict(n_periods=steps, X=exog_pred)
return pred
def run(input_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
steps: int = 20) -> pd.DataFrame:
"""
主运行函数用以读取数据训练模型预测数据
Args:
input_data (pd.DataFrame): 输入的时间序列数据
forecast_target (str): 需要被预测的目标变量的列名
exog_columns (List[str]): 外生变量的列名列表
steps (int, optional, default=20): 预测步长
Returns:
pd.DataFrame: 预测结果的DataFrame对象
"""
# 创建一个未来日期的索引,用于保存预测数据
future_index = pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps)
# 创建一个用于保存预测外生变量的空数据帧
df_exog = pd.DataFrame(index=future_index)
# 循环每个外生变量使用ARIMA模型进行训练和预测然后将预测值保存到df_exog中
for exog in exog_columns:
pred = train_ARIMA_model(endog=input_data[exog], steps=steps)
df_exog[exog] = pred
# 使用ARIMA模型对目标变量进行训练和预测注意这里将df_exog作为预测阶段的外生变量传入
pred = train_ARIMA_model(endog=input_data[forecast_target],
exog=input_data[exog_columns],
exog_pred=df_exog[exog_columns],
steps=steps,
information_criterion='bic')
# 根据预测值创建一个新的数据帧,用于保存预测的目标变量
forecast_df = pd.DataFrame(pred,
index=future_index,
columns=[forecast_target])
return forecast_df

@ -0,0 +1,126 @@
"""
模块功能该模块负责随机森林(Random Forest)预测模型的实施
函数
random_forest_model: 使用随机森林模型根据给定的特征和目标数据进行训练并预测未来数据
forecast_future: 预测未来数据
run: 执行模型训练和数据预测等步骤并返回预测结果
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from typing import List
def random_forest_model(train_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
future_data: pd.DataFrame,
steps: int = 20) -> pd.DataFrame:
"""
使用随机森林模型根据给定的特征和目标数据进行训练并预测未来数据
Args:
train_data (pd.DataFrame): 训练数据集
forecast_target (str): 训练数据集中的目标列的列名
exog_columns (List[str): 训练数据集用于预测的特征列名的列表
future_data (pd.DataFrame): 存储未来预测所用的外生变量的数据集
steps (int, optional, default=20): 要进行预测的天数
Returns:
pd.DataFrame: 存储预测结果的数据表
"""
# 制作输入特征和目标变量
X = train_data[exog_columns].values
y = train_data[forecast_target].values
X_test = future_data[exog_columns].values
model = RandomForestRegressor(n_estimators=1200,
max_depth=8,
min_samples_split=2,
random_state=0)
model.fit(X, y)
pred = model.predict(X_test[-steps:])
forecast_df = pd.DataFrame(
pred,
index=pd.date_range(start=train_data.index.max() +
pd.Timedelta(days=1),
periods=steps),
columns=[forecast_target])
return forecast_df
def forecast_future(data: np.ndarray, steps: int = 20) -> List:
"""
使用随机森林预测未来的数据
Args:
data (np.ndarray): 已知的用于预测的数据
steps (int, optional, default=20): 要进行预测的天数
Returns:
List: 存放预测结果的列表
"""
# 制作输入特征和目标变量
X = data[:-1].reshape(-1, 1)
y = data[1:]
X_test = [y[-1]]
# 创建和训练随机森林模型
model = RandomForestRegressor(n_estimators=1200,
max_depth=8,
min_samples_split=2,
random_state=0)
model.fit(X, y)
# 创建一个列表保存预测结果
pred = []
# 迭代预测下一个数据点
for _ in range(steps):
y_pred = model.predict(np.array([X_test[-1]]).reshape(-1, 1))
pred.append(y_pred)
# 将预测的数据点添加到下一轮的输入
X_test.append(y_pred)
return pred
def run(input_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
steps: int = 20) -> pd.DataFrame:
"""
执行数据读取预处理模型训练预测并绘图等一系列步骤的主函数
Args:
input_data (pd.DataFrame): 存储原始数据的DataFrame
forecast_target (str): 需要被预测的目标列名
exog_columns (List[str]): 特征列名的列表
steps (int, optional, default=20): 需要进行预测的天数
Returns:
pd.DataFrame: 存储预测结果的数据表
"""
# 创建一个未来日期的索引,用于保存预测数据
future_index = pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps)
# 创建一个用于保存预测外生变量的空数据帧
df_exog = pd.DataFrame(index=future_index)
for exog in exog_columns:
pred = forecast_future(input_data[exog].values, steps=steps)
df_exog[exog] = pred
df_processed = random_forest_model(input_data, forecast_target,
exog_columns, df_exog, steps)
return df_processed

@ -0,0 +1,83 @@
"""
模块功能该模块用于季节性自回归移动平均 (SARIMA) 预测模型的实施
函数
train_SARIMA_model(endog, exo, exog_pred, steps): 使用 SARIMA 模型对时间序列数据进行预测
run(input_data, forecast_target, exog_columns, steps): 执行模型训练和数据预测等步骤并返回预测结果
"""
import pandas as pd
import pmdarima as pm
import numpy as np
from typing import List, Union
def train_SARIMA_model(endog: Union[np.ndarray, pd.Series],
exog: Union[np.ndarray, pd.DataFrame] = None,
exog_pred: Union[np.ndarray, pd.DataFrame] = None,
steps: int = 20,
information_criterion: str = 'aic') -> np.ndarray:
"""
使用SARIMA模型对时间序列数据进行预测
Args:
endog (Union[np.ndarray, pd.Series]): 用于训练和预测的数据可以是numpy数组或者pandas序列
exog (Union[np.ndarray, pd.DataFrame], optional): 外生变量用于提高ARIMA模型的预测精度
exog_pred (Union[np.ndarray, pd.DataFrame], optional): 预测阶段使用的外生变量它的列数必须和训练阶段的外生变量一致
steps (int, optional, default=20): 预测期长度
information_criterion (str, optional, default='aic'): 用于模型选择的信息准则可以是 'aic' 'bic'
Returns:
np.ndarray: 预测结果
"""
model = pm.auto_arima(endog,
X=exog,
seasonal=True,
information_criterion=information_criterion)
pred = model.predict(n_periods=steps, X=exog_pred)
return pred
def run(input_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
steps: int = 20) -> pd.DataFrame:
"""
主运行函数用以读取数据训练模型预测数据
Args:
input_data (pd.DataFrame): 输入的时间序列数据
forecast_target (str): 预测目标的列名
exog_columns (List[str]): 外生变量的列名列表
steps (int, optional, default=20): 预测步长
Returns:
pd.DataFrame: 预测结果的DataFrame对象
"""
# 创建一个未来日期的索引,用于保存预测数据
future_index = pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps)
# 创建一个用于保存预测外生变量的空数据帧
df_exog = pd.DataFrame(index=future_index)
# 循环每个外生变量使用ARIMA模型进行训练和预测然后将预测值保存到df_exog中
for exog in exog_columns:
pred = train_SARIMA_model(endog=input_data[exog], steps=steps)
df_exog[exog] = pred
# 使用ARIMA模型对目标变量进行训练和预测注意这里将df_exog作为预测阶段的外生变量传入
pred = train_SARIMA_model(endog=input_data[forecast_target],
exog=input_data[exog_columns],
exog_pred=df_exog[exog_columns],
steps=steps,
information_criterion='bic')
# 根据预测值创建一个新的数据帧,用于保存预测的目标变量
forecast_df = pd.DataFrame(pred,
index=future_index,
columns=[forecast_target])
return forecast_df

@ -0,0 +1,98 @@
"""
模块功能
本模块核心功能是实现向量自回归VAR预测模型的训练和运行
函数:
- convert_timestamp_index(data, to_period): 转换时间序列数据的时间索引至DatetimeIndex或PeriodIndex
- train_VAR_model(data, max_lags): 利用输入的时间序列数据训练 VAR 模型
- run(input_data, steps): 执行时间索引转换模型训练和数据预测等步骤并返回预测结果
"""
import pandas as pd
import statsmodels.api as sm
from typing import List
def convert_timestamp_index(data: pd.DataFrame,
to_period: bool) -> pd.DataFrame:
"""
根据to_period参数选择将数据的时间索引转换为DatetimeIndex或PeriodIndex
Args:
data (pd.DataFrame): 输入的数据
to_period (bool): 如果为True则将DatetimeIndex转换为PeriodIndex
如果为False则将PeriodIndex转换为DatetimeIndex
Returns:
pd.DataFrame: 索引被转换后的数据
"""
if to_period:
data.index = pd.DatetimeIndex(data.index).to_period('D')
else:
data.index = data.index.to_timestamp()
return data
def train_VAR_model(data: pd.DataFrame, max_lags: int = 30):
"""
利用输入的时间序列数据训练VAR模型通过比较BIC值确定最优滞后阶数
Args:
data (pd.DataFrame): 用于模型训练的时间序列数据
max_lags (int, default=30): 最大滞后阶数默认为 30
Returns:
VARResultsWrapper: 训练得到的VAR模型
"""
model = sm.tsa.VAR(data)
criteria = []
lags = range(1, max_lags + 1)
# 通过比较每个滞后阶数模型的BIC值选择最优滞后阶数
for lag in lags:
result = model.fit(maxlags=lag)
criteria.append(result.bic)
# 使用最优滞后阶数再次训练模型
best_lag = lags[criteria.index(min(criteria))]
results = model.fit(maxlags=best_lag)
return results
def run(input_data: pd.DataFrame,
forecast_target: str,
_: List[str],
steps: int = 20) -> pd.DataFrame:
"""
运行函数执行一系列步骤包括索引转换训练模型数据预测
Args:
input_data (pd.DataFrame): 输入的DataFrame数据
forecast_target (str): 需要被预测的目标变量的列名
_ (List[str]): 占位参数用于保持和其他模型函数的接口一致性
steps (int, default=20): 预测步数
Returns:
pd.DataFrame: 预测结果的DataFrame对象
"""
# 将DataFrame对象的时间索引转换为PeriodIndex
input_data = convert_timestamp_index(input_data, to_period=True)
# 训练 VAR 模型
model = train_VAR_model(input_data, max_lags=60)
# 将DataFrame对象的时间索引转回原样
input_data = convert_timestamp_index(input_data, to_period=False)
# 利用VAR模型进行预测
pred = model.forecast(input_data.values[-model.k_ar:], steps=steps)
forecast_df = pd.DataFrame(
pred,
index=pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps),
columns=input_data.columns)
return forecast_df[forecast_target]

@ -0,0 +1,72 @@
"""
models包功能
该包主要实现了4种主要的时间序列预测模型向量自回归(VAR)自回归移动平均(ARIMA)
季节性自回归移动平均 (SARIMA) 以及随机森林Random Forest
modules:
- utils.py: 包含用于数据读取和数据保存的通用函数
- VAR_model.py: 实现向量自回归 (VAR)预测模型
- ARIMA_model.py: 实现自回归移动平均 (ARIMA)预测模型
- SARIMA_model.py: 实现季节性自回归移动平均 (SARIMA)预测模型
- RF_model.py: 实现随机森林Random Forest预测模型
函数
- run(): 是整个预测过程的主函数该函数首先读取数据然后调用所有模型进行预测
最后把预测结果保存到文件中
使用示例:
from models import run
run_result = run(forecast_target=target_column, exog_columns=exog_columns_list, models=[VAR_Forecasting])
"""
from typing import List, Type
from . import utils, VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting
__all__ = [
'utils', 'VAR_Forecasting', 'ARIMA_Forecasting', 'SARIMA_Forecasting',
'RF_Forecasting'
]
def run(
forecast_target: str,
exog_columns: List[str],
steps: int = 20,
file_path: str = 'data/normalized_df.csv',
models: List[Type] = [
VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting
],
) -> None:
"""
执行数据读取预处理模型训练预测并保存预测结果等一系列步骤的主函数
Args:
forecast_target (str): 需要被预测的目标变量的列名
exog_columns (List[str]): 用于预测的特征变量的列名列表
steps (int, default=20): 需要进行预测的步数
file_path (str, default='data/normalized_df.csv'): 数据文件的路径
models (List[Type]) : 需要运行的预测模型列表默认包括VARARIMASARIMA和Random Forest模型
Returns:
None
"""
# 载入数据
input_df = utils.read_csv(file_path)
# 使用每个模型进行预测并保存结果
for model in models:
try:
model_name = model.__name__.split('.')[-1]
print(f"正在执行 {model_name} 模型进行预测...")
# 调用模型进行预测
model_df = model.run(input_df, forecast_target, exog_columns,
steps)
# 保存预测结果
utils.save_csv(model_df, f'data/{model_name}_df.csv')
print(f"{model_name} 模型的预测结果已保存至 data/{model_name}_df.csv")
except Exception as e:
print(f"{model_name} 模型预测过程出现错误: {e}")
print("所有模型预测都已完成。")

@ -0,0 +1,46 @@
"""
模块功能该模块作为工具箱包含了数据读取和数据保存的功能
函数:
read_csv(file_path): CSV 文件中读取数据并返回pd.DataFrame对象
save_csv(df, file_path): 将pd.DataFrame对象保存为 CSV 文件
"""
import pandas as pd
def read_csv(file_path: str) -> pd.DataFrame:
"""
CSV 文件中加载 DataFrame 对象
Args:
file_path (str): CSV 文件的路径
Returns:
DataFrame: CSV 文件中加载的 DataFrame 对象
"""
try:
df = pd.read_csv(file_path, index_col="date", parse_dates=["date"])
print(f"成功读取文件: {file_path}")
except FileNotFoundError:
print(f"找不到文件: {file_path}")
df = pd.DataFrame()
except Exception as e:
print(f"读取文件时发生错误: {e}")
df = pd.DataFrame()
return df
def save_csv(df: pd.DataFrame, file_path: str) -> None:
"""
DataFrame 对象保存为 CSV 文件
Args:
df (DataFrame): 要保存的 DataFrame 对象
file_path (str): CSV 文件的路径
"""
try:
df.to_csv(file_path, index_label='date')
print(f"成功保存文件: {file_path}")
except Exception as e:
print(f"保存文件时发生错误: {e}")

@ -0,0 +1,73 @@
"""
pg_request包主要用于处理PostgreSQL数据库数据
功能:
- 在运行主函数时如果必要将会创建一个存放输出数据的 'data' 文件夹
- 连接到 PostgreSQL 数据库并从指定数据表中提取数据
- 合并多个表中的数据
- 对数据进行标准化处理
- 保存处理后的数据到 CSV 文件
模块:
- config: 定义了数据的存取参数如数据库连接参数和目标数据表名
- parse: 包含了连接数据库数据处理和数据存储等功能的函数
函数:
- check_and_create_folder(folder_path): 检查并创建数据文件夹在主函数运行时调用
- run(): 作为整个数据处理流程的主函数当被调用时将会执行数据提取数据处理和数据存储等操作
使用示例:
from pg_request import run
run()
注意事项:
在使用这个包时你需要首先在config.py中设置好数据库连接参数和目标数据表然后才能正常使用run函数在你自己的项目中你也可以根据需要调用parse.py中的各个函数
"""
import os
from .config import DB_PARAMS, TABLES
from .parse import connect_to_pg, merge_dfs, normalize_df, save_as_csv
def check_and_create_folder(folder_path: str):
"""
检查指定的文件夹是否存在如果不存在则创建该文件夹
Args:
folder_path (str): 需要检查和创建的文件夹路径
"""
if not os.path.exists(folder_path):
os.makedirs(folder_path)
print(f"文件夹 {folder_path} 已创建。")
else:
print(f"文件夹 {folder_path} 已存在。")
def run():
"""
运行整个数据处理流程步骤如下
1. 检查并创建存放输出数据的data文件夹
2. 连接到PostgreSQL数据库并获取指定数据表的数据
3. 合并多个数据表中的数据
4. 使用最小最大规范化方法对数据进行规范化处理
5. 将处理后的数据保存到CSV文件中
"""
# 检查存放输出数据的文件夹是否存在,如果不存在则创建
check_and_create_folder('data')
# 连接数据库,获取数据表数据
raw_data = connect_to_pg(DB_PARAMS, TABLES)
# 合并获取的数据表
merged_data = merge_dfs(raw_data)
# 对合并后的数据进行最小最大标准化
normalized_data = normalize_df(merged_data)
# 保存数据到 CSV 文件
save_as_csv(normalized_data, 'data/normalized_df.csv')
# 打印完成信息
print('{:*^30}'.format('PostgreSQL数据读取完成'))

@ -0,0 +1,25 @@
"""
配置文件定义数据库连接参数和需要提取数据的目标表格列表
变量:
DB_PARAMS (dict[str, str]): 数据库连接参数包含数据库主机端口号名字用户名和密码
TABLES (List[str]): 需要提取数据的目标表格列表
注意:
请根据实际情况修改DB_PARAMS和TABLES的值
"""
# 定义数据库连接参数
DB_PARAMS = {
'host': 'localhost',
'port': '5432',
'database': 'Dbname',
'user': 'postgres',
'password': 'password'
}
# 定义目标数据表
TABLES = [
'app01_baidudata', 'app01_beijingweekdata', 'app01_jijindata',
'app01_liuganweekdata', 'app01_stockdata'
]

@ -0,0 +1,120 @@
"""
模块功能 PostgreSQL 数据库进行数据提取数据处理数据保存
函数:
connect_to_pg(db_parameters, tables): 连接到 PostgreSQL 数据库并获取指定数据表
merge_dfs(dfs): 合并 DataFrame 对象
normalize_df(df): DataFrame 对象进行最小最大标准化
save_as_csv(df, file_path): DataFrame 保存为 CSV 文件
使用示例
dfs = connect_to_pg(db_parameters, tables)
df = merge_dfs(dfs)
df = normalize_df(df)
save_as_csv(df, file_path)
注意
本模块依赖于 psycopg2 请确保已正确安装该库
"""
import pandas as pd
import psycopg2
from typing import Dict, List
def connect_to_pg(db_parameters: Dict[str, str],
tables: List[str]) -> Dict[str, pd.DataFrame]:
"""
连接到 PostgreSQL 数据库并获取指定数据表
Args:
db_parameters (Dict[str, str]): 数据库连接参数的字典
tables (List[str]): 需要查询的数据表名称列表
Returns:
dfs (Dict[str, DataFrame]):
键是数据表名去除 'app01_' 前缀值是对应数据表的 DataFrame 对象
"""
try:
# 用 with 语句确保数据库连接被正确关闭
with psycopg2.connect(**db_parameters) as conn:
dfs = {
table[6:]: pd.read_sql(f'select * from public.{table};', conn)
for table in tables
}
print('{:*^30}'.format('成功链接 PostgreSQL'))
except Exception as error:
print(f"发现错误:{error}")
exit()
return dfs
def merge_dfs(dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""
合并 DataFrame 对象
Args:
dfs (dict): 键为数据表名值为对应数据表的 DataFrame 对象
Returns:
df (DataFrame): 合并后的 DataFrame 对象
"""
# 通过合并所有数据表的日期范围来创建一个新的日期索引
date_range = pd.date_range(start=min(df['date'].min()
for df in dfs.values()),
end=max(df['date'].max()
for df in dfs.values()))
# 创建一个以日期范围为索引的空 DataFrame
df_merged = pd.DataFrame(index=date_range)
# print(df_merged)
# 遍历并合并每个数据表,保留日期索引并丢弃 'id' 列
for df in dfs.values():
df = df.set_index('date').reindex(date_range)
df = df.drop(columns='id') # 删除 'id' 列
df_merged = pd.concat([df_merged, df], axis=1)
# 对缺失值进行线性插值(其他方法?多项插值?)
df_merged = df_merged.interpolate()
# 如果有剩余的NaN值删除这些行
df_merged.dropna(inplace=True)
return df_merged
def normalize_df(df: pd.DataFrame) -> pd.DataFrame:
"""
DataFrame 对象进行最小最大标准化
Args:
df (DataFrame): 要进行标准化的 DataFrame 对象
Returns:
df_normalized (DataFrame): 进行最小最大标准化后的 DataFrame 对象
"""
# 如果列的数据类型是布尔值、有符号整型、无符号整型、浮点数或复数浮点数的话,就进行最大最小标准化,否则保留原列的数据
df_normalized = df.apply(lambda x: (x - x.min()) / (x.max() - x.min())
if x.dtype.kind in 'biufc' else x)
return df_normalized
def save_as_csv(df: pd.DataFrame,
file_path: str = 'data/normalized_df.csv') -> None:
"""
DataFrame 保存为 CSV 文件
Args:
df (DataFrame): 要保存的 DataFrame 对象
file_path (str): 保存文件的路径
"""
try:
df.to_csv(file_path, index_label='date')
print(f"成功保存为 {file_path}")
except Exception as e:
print(f"保存文件时出错: {e}")

Binary file not shown.
Loading…
Cancel
Save