|
|
import json
|
|
|
from typing import List
|
|
|
|
|
|
import numpy as np
|
|
|
# from statsmodels.tsa.api import VAR
|
|
|
import pandas as pd
|
|
|
import statsmodels.api as sm
|
|
|
|
|
|
|
|
|
def convert_timestamp_index(data: pd.DataFrame,
|
|
|
to_period: bool) -> pd.DataFrame:
|
|
|
"""
|
|
|
根据to_period参数,选择将数据的时间索引转换为DatetimeIndex或PeriodIndex。
|
|
|
|
|
|
Args:
|
|
|
data (pd.DataFrame): 输入的数据。
|
|
|
to_period (bool): 如果为True,则将DatetimeIndex转换为PeriodIndex;
|
|
|
如果为False,则将PeriodIndex转换为DatetimeIndex。
|
|
|
|
|
|
Returns:
|
|
|
pd.DataFrame: 索引被转换后的数据。
|
|
|
"""
|
|
|
if to_period:
|
|
|
data.index = pd.DatetimeIndex(data.index).to_period('D')
|
|
|
else:
|
|
|
data.index = data.index.to_timestamp()
|
|
|
|
|
|
return data
|
|
|
|
|
|
|
|
|
def train_VAR_model(data: pd.DataFrame, max_lags: int = 30):
|
|
|
"""
|
|
|
利用输入的时间序列数据训练VAR模型,通过比较BIC值确定最优滞后阶数。
|
|
|
|
|
|
Args:
|
|
|
data (pd.DataFrame): 用于模型训练的时间序列数据。
|
|
|
max_lags (int, default=30): 最大滞后阶数,默认为 30。
|
|
|
|
|
|
Returns:
|
|
|
VARResultsWrapper: 训练得到的VAR模型。
|
|
|
"""
|
|
|
model = sm.tsa.VAR(data)
|
|
|
criteria = []
|
|
|
lags = range(1, max_lags + 1)
|
|
|
|
|
|
# 通过比较每个滞后阶数模型的BIC值,选择最优滞后阶数
|
|
|
for lag in lags:
|
|
|
result = model.fit(maxlags=lag)
|
|
|
criteria.append(result.bic)
|
|
|
|
|
|
# 使用最优滞后阶数再次训练模型
|
|
|
best_lag = lags[criteria.index(min(criteria))]
|
|
|
results = model.fit(maxlags=best_lag)
|
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
def VAR_run(input_data: pd.DataFrame,
|
|
|
forecast_target: str,
|
|
|
_: List[str],
|
|
|
steps: int = 20) -> pd.DataFrame:
|
|
|
"""
|
|
|
运行函数,执行一系列步骤,包括索引转换、训练模型、数据预测。
|
|
|
|
|
|
Args:
|
|
|
input_data (pd.DataFrame): 输入的DataFrame数据。
|
|
|
forecast_target (str): 需要被预测的目标变量的列名。
|
|
|
_ (List[str]): 占位参数,用于保持和其他模型函数的接口一致性。
|
|
|
steps (int, default=20): 预测步数。
|
|
|
|
|
|
Returns:
|
|
|
pd.DataFrame: 预测结果的DataFrame对象。
|
|
|
"""
|
|
|
input_data = input_data.replace([np.inf, -np.inf], np.nan).dropna()
|
|
|
# 将DataFrame对象的时间索引转换为PeriodIndex
|
|
|
input_data = convert_timestamp_index(input_data, to_period=True)
|
|
|
# 添加正则化项以确保协方差矩阵正定
|
|
|
input_data += np.random.normal(0, 1e-10, input_data.shape)
|
|
|
# 训练 VAR 模型
|
|
|
model = train_VAR_model(input_data, max_lags=10)
|
|
|
|
|
|
# 将DataFrame对象的时间索引转回原样
|
|
|
input_data = convert_timestamp_index(input_data, to_period=False)
|
|
|
|
|
|
# 利用VAR模型进行预测
|
|
|
pred = model.forecast(input_data.values[-model.k_ar:], steps=steps)
|
|
|
forecast_df = pd.DataFrame(
|
|
|
pred,
|
|
|
index=pd.date_range(start=input_data.index.max() +
|
|
|
pd.Timedelta(days=1),
|
|
|
periods=steps),
|
|
|
columns=input_data.columns)
|
|
|
|
|
|
return forecast_df[forecast_target]
|