Compare commits

...

5 Commits

Author | SHA1 | Message | Date
Yao | 442b502724 | feat: add influenza weekly-report scraper and user-agent pool | 4 months ago
Yao | 941c9fe7e0 | feat: fix incorrect model invocation in the main function | 8 months ago
Yao | eca2019e33 | feat: refactor code structure; add model prediction example and usage notes | 8 months ago
Yao | 0d1f454dae | feat: remove the README file and streamline the project structure | 8 months ago
Yao | ebb97b8f8c | feat: add four time-series forecasting models: VAR, ARIMA, SARIMA, and random forest | 8 months ago

.gitignore

@ -0,0 +1,6 @@
/test
/*/__pycache__
/models/heatmap.py
/models/LSTM_Forecasting.py
/data
/.vscode

@ -1,2 +0,0 @@
# Influenza_fund_linkage_system

@ -0,0 +1,29 @@
"""
Streamlit front end: render the saved model forecasts as ECharts charts.
"""
from . import utils, draw_echarts
import streamlit as st
from models import VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting
from typing import List, Type
def run(target: str,
target_name: str,
models: List[Type] = [
VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting,
RF_Forecasting
]):
models_name = [
model.__name__.split('.')[-1].split('_')[0] for model in models
]
st.title("模型预测结果")
history_data = utils.read_csv("data/normalized_df.csv")
selected_model = st.selectbox("选择你想看的模型预测结果", models_name)
pred_data = utils.read_csv(f"data/{selected_model}_Forecasting_df.csv")
draw_echarts.draw_echarts(selected_model, target, target_name,
history_data, pred_data)

@ -0,0 +1,95 @@
import pandas as pd
from streamlit_echarts import st_echarts
def draw_echarts(model_name: str, target: str, target_name: str,
history_data: pd.DataFrame, pred_data: pd.DataFrame):
"""
Build the ECharts option and render the chart in the Streamlit app.
Args:
model_name (str): name of the model
target (str): column name of the target value
target_name (str): display name of the target value
history_data (pd.DataFrame): historical data
pred_data (pd.DataFrame): forecast data
Returns:
dict: the ECharts option
"""
# Pad the historical and forecast series with None values so both fit the shared x-axis
history_values = history_data[target].values.tolist() + [
None for _ in range(len(pred_data))
]
pred_values = [None for _ in range(len(history_data))
] + pred_data[target].values.tolist()
# Define the ECharts option
option = {
"title": {
"text": f"{model_name}模型",
"x": "auto"
},
# Tooltip component
"tooltip": {
"trigger": "axis"
},
# Legend component
"legend": {
"data": [f"{target_name}历史数据", f"{target_name}预测数据"],
"left": "right"
},
# x-axis and y-axis
"xAxis": {
"type":
"category",
"data":
history_data.index.astype(str).to_list() +
pred_data.index.astype(str).to_list()
},
"yAxis": {
"type": "value"
},
# Data zoom component
"dataZoom": [{
"type": "inside",
"start": 0,
"end": 100
}],
"series": []
}
# Add the historical-data series
if any(history_values):
option["series"].append({
"name": f"{target_name}历史数据",
"type": "line",
"data": history_values,
"smooth": True
})
# Add the forecast-data series
if any(pred_values):
option["series"].append({
"name": f"{target_name}预测数据",
"type": "line",
"data": pred_values,
"smooth": True,
"lineStyle": {
"type": "dashed"
}
})
# Render the ECharts chart in the Streamlit app
st_echarts(options=option)
return option
# history_data = pd.read_csv('data/normalized_df.csv',
# index_col="date",
# parse_dates=["date"])
# pred_data = pd.read_csv('data/VAR_Forecasting_df.csv',
# index_col="date",
# parse_dates=["date"])
# draw_echarts('VAR_Forecasting', 'liugan_index', '流感指数', history_data,
# pred_data)

@ -0,0 +1,26 @@
"""
Utility helpers for the echarts_visualization package: CSV loading.
"""
import pandas as pd
def read_csv(file_path: str) -> pd.DataFrame:
"""
Load a DataFrame from a CSV file.
Args:
file_path (str): path to the CSV file
Returns:
DataFrame: the DataFrame loaded from the CSV file (empty on failure)
"""
try:
df = pd.read_csv(file_path, index_col="date", parse_dates=["date"])
print(f"成功读取文件: {file_path}")
except FileNotFoundError:
print(f"找不到文件: {file_path}")
df = pd.DataFrame()
except Exception as e:
print(f"读取文件时发生错误: {e}")
df = pd.DataFrame()
return df

@ -0,0 +1,21 @@
from typing import List, Type
import pg_request as pg
import models as m
import echarts_visualization as ev
def main(target: str,
target_name: str,
exog_columns: List[str],
models: List[Type] = [
m.VAR_Forecasting, m.ARIMA_Forecasting, m.SARIMA_Forecasting,
m.RF_Forecasting
]):
pg.run()
m.run(forecast_target=target, exog_columns=exog_columns, models=models)
ev.run(target=target, target_name=target_name, models=models)
if __name__ == '__main__':
main('liugan_index', '流感指数',
['infection_number.1', 'infection_number.2', 'jijin_data', 'shoupan'])
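Since echarts_visualization.run builds the page with Streamlit widgets, the pipeline is presumably meant to be launched through Streamlit rather than plain Python, e.g. with `streamlit run main.py` from the repository root (a hedged assumption, not stated in the diff).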

@ -0,0 +1,83 @@
"""
Module purpose: implements the autoregressive integrated moving average (ARIMA) forecasting model.
Functions:
train_ARIMA_model: fit an ARIMA model to a time series and produce a forecast
run: carry out model training and forecasting and return the forecast
"""
import pandas as pd
import pmdarima as pm
import numpy as np
from typing import List, Union
def train_ARIMA_model(endog: Union[np.ndarray, pd.Series],
exog: Union[np.ndarray, pd.DataFrame] = None,
exog_pred: Union[np.ndarray, pd.DataFrame] = None,
steps: int = 20,
information_criterion: str = 'aic') -> np.ndarray:
"""
Forecast a time series with an ARIMA model.
Args:
endog (Union[np.ndarray, pd.Series]): the time series to analyse
exog (Union[np.ndarray, pd.DataFrame], optional): exogenous variables used to improve the ARIMA model; defaults to None
exog_pred (Union[np.ndarray, pd.DataFrame], optional): exogenous variables for the forecast horizon; must have the same columns as the training exog; defaults to None
steps (int, optional, default=20): length of the forecast horizon
information_criterion (str, optional, default='aic'): information criterion for model selection, 'aic' or 'bic'
Returns:
np.ndarray: the forecast values
"""
model = pm.auto_arima(endog,
X=exog,
seasonal=False,
information_criterion=information_criterion)
pred = model.predict(n_periods=steps, X=exog_pred)
return pred
def run(input_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
steps: int = 20) -> pd.DataFrame:
"""
Main entry point: read the data, train the models, and produce the forecast.
Args:
input_data (pd.DataFrame): input time-series data
forecast_target (str): column name of the variable to forecast
exog_columns (List[str]): column names of the exogenous variables
steps (int, optional, default=20): forecast horizon
Returns:
pd.DataFrame: DataFrame holding the forecast
"""
# Build an index of future dates to hold the forecast
future_index = pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps)
# Empty DataFrame that will hold the forecast exogenous variables
df_exog = pd.DataFrame(index=future_index)
# Forecast each exogenous variable with its own ARIMA model and store the result in df_exog
for exog in exog_columns:
pred = train_ARIMA_model(endog=input_data[exog], steps=steps)
df_exog[exog] = pred
# Fit ARIMA on the target variable, passing df_exog as the exogenous variables for the forecast horizon
pred = train_ARIMA_model(endog=input_data[forecast_target],
exog=input_data[exog_columns],
exog_pred=df_exog[exog_columns],
steps=steps,
information_criterion='bic')
# Wrap the forecast values in a new DataFrame for the target variable
forecast_df = pd.DataFrame(pred,
index=future_index,
columns=[forecast_target])
return forecast_df
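A minimal smoke-test sketch for train_ARIMA_model on synthetic data (the series, dates, and step count below are made up for illustration; only the function above and pmdarima are assumed):

import numpy as np
import pandas as pd

# Hypothetical daily series: a noisy upward trend.
idx = pd.date_range("2024-01-01", periods=100, freq="D")
series = pd.Series(np.arange(100) + np.random.normal(scale=3.0, size=100), index=idx)

# Fit without exogenous variables and forecast 20 steps ahead.
pred = train_ARIMA_model(endog=series, steps=20)
print(len(pred))  # 20 forecast values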

@ -0,0 +1,126 @@
"""
Module purpose: implements the random forest forecasting model.
Functions:
random_forest_model: train a random forest on the given features and target and predict future data
forecast_future: forecast future values of a single series
run: carry out model training and forecasting and return the forecast
"""
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from typing import List
def random_forest_model(train_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
future_data: pd.DataFrame,
steps: int = 20) -> pd.DataFrame:
"""
Train a random forest on the given features and target and predict future data.
Args:
train_data (pd.DataFrame): training data set
forecast_target (str): column name of the target in the training data
exog_columns (List[str]): list of feature column names used for prediction
future_data (pd.DataFrame): data set holding the exogenous variables for the forecast horizon
steps (int, optional, default=20): number of days to forecast
Returns:
pd.DataFrame: table holding the forecast
"""
# Build the input features and the target variable
X = train_data[exog_columns].values
y = train_data[forecast_target].values
X_test = future_data[exog_columns].values
model = RandomForestRegressor(n_estimators=1200,
max_depth=8,
min_samples_split=2,
random_state=0)
model.fit(X, y)
pred = model.predict(X_test[-steps:])
forecast_df = pd.DataFrame(
pred,
index=pd.date_range(start=train_data.index.max() +
pd.Timedelta(days=1),
periods=steps),
columns=[forecast_target])
return forecast_df
def forecast_future(data: np.ndarray, steps: int = 20) -> List:
"""
Forecast future values of a series with a random forest.
Args:
data (np.ndarray): known data used for forecasting
steps (int, optional, default=20): number of days to forecast
Returns:
List: list holding the predictions
"""
# Build the input features and the target variable
X = data[:-1].reshape(-1, 1)
y = data[1:]
X_test = [y[-1]]
# Create and train the random forest model
model = RandomForestRegressor(n_estimators=1200,
max_depth=8,
min_samples_split=2,
random_state=0)
model.fit(X, y)
# List that collects the predictions
pred = []
# Iteratively predict the next data point
for _ in range(steps):
y_pred = model.predict(np.array([X_test[-1]]).reshape(-1, 1))
pred.append(y_pred)
# Feed the prediction back in as the next input
X_test.append(y_pred)
return pred
def run(input_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
steps: int = 20) -> pd.DataFrame:
"""
Main function that runs the data reading, preprocessing, model training, and forecasting steps.
Args:
input_data (pd.DataFrame): DataFrame holding the raw data
forecast_target (str): column name of the variable to forecast
exog_columns (List[str]): list of feature column names
steps (int, optional, default=20): number of days to forecast
Returns:
pd.DataFrame: table holding the forecast
"""
# Build an index of future dates to hold the forecast
future_index = pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps)
# Empty DataFrame that will hold the forecast exogenous variables
df_exog = pd.DataFrame(index=future_index)
for exog in exog_columns:
pred = forecast_future(input_data[exog].values, steps=steps)
df_exog[exog] = pred
df_processed = random_forest_model(input_data, forecast_target,
exog_columns, df_exog, steps)
return df_processed
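forecast_future is a simple recursive (iterated one-step-ahead) strategy: it learns x[t+1] from x[t] and feeds each prediction back in as the next input. A hedged toy sketch (synthetic data; note that each returned element is a length-1 ndarray, so flattening may be wanted before storing the values in a DataFrame column):

import numpy as np

# Hypothetical toy series: roughly linear growth with a little noise.
toy = np.linspace(0.0, 1.0, 60) + np.random.normal(scale=0.02, size=60)

preds = forecast_future(toy, steps=5)
flat = [float(p[0]) for p in preds]  # each prediction comes back as a 1-element array
print(flat)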

@ -0,0 +1,83 @@
"""
Module purpose: implements the seasonal autoregressive integrated moving average (SARIMA) forecasting model.
Functions:
train_SARIMA_model(endog, exog, exog_pred, steps): forecast a time series with a SARIMA model
run(input_data, forecast_target, exog_columns, steps): carry out model training and forecasting and return the forecast
"""
import pandas as pd
import pmdarima as pm
import numpy as np
from typing import List, Union
def train_SARIMA_model(endog: Union[np.ndarray, pd.Series],
exog: Union[np.ndarray, pd.DataFrame] = None,
exog_pred: Union[np.ndarray, pd.DataFrame] = None,
steps: int = 20,
information_criterion: str = 'aic') -> np.ndarray:
"""
Forecast a time series with a SARIMA model.
Args:
endog (Union[np.ndarray, pd.Series]): data used for training and forecasting; a numpy array or a pandas Series
exog (Union[np.ndarray, pd.DataFrame], optional): exogenous variables used to improve the model's accuracy
exog_pred (Union[np.ndarray, pd.DataFrame], optional): exogenous variables for the forecast horizon; must have the same columns as the training exog
steps (int, optional, default=20): length of the forecast horizon
information_criterion (str, optional, default='aic'): information criterion for model selection, 'aic' or 'bic'
Returns:
np.ndarray: the forecast values
"""
model = pm.auto_arima(endog,
X=exog,
seasonal=True,
information_criterion=information_criterion)
pred = model.predict(n_periods=steps, X=exog_pred)
return pred
def run(input_data: pd.DataFrame,
forecast_target: str,
exog_columns: List[str],
steps: int = 20) -> pd.DataFrame:
"""
Main entry point: read the data, train the models, and produce the forecast.
Args:
input_data (pd.DataFrame): input time-series data
forecast_target (str): column name of the variable to forecast
exog_columns (List[str]): column names of the exogenous variables
steps (int, optional, default=20): forecast horizon
Returns:
pd.DataFrame: DataFrame holding the forecast
"""
# Build an index of future dates to hold the forecast
future_index = pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps)
# Empty DataFrame that will hold the forecast exogenous variables
df_exog = pd.DataFrame(index=future_index)
# Forecast each exogenous variable with its own SARIMA model and store the result in df_exog
for exog in exog_columns:
pred = train_SARIMA_model(endog=input_data[exog], steps=steps)
df_exog[exog] = pred
# Fit SARIMA on the target variable, passing df_exog as the exogenous variables for the forecast horizon
pred = train_SARIMA_model(endog=input_data[forecast_target],
exog=input_data[exog_columns],
exog_pred=df_exog[exog_columns],
steps=steps,
information_criterion='bic')
# Wrap the forecast values in a new DataFrame for the target variable
forecast_df = pd.DataFrame(pred,
index=future_index,
columns=[forecast_target])
return forecast_df
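One caveat worth noting: pmdarima's auto_arima takes the seasonal period through its m parameter, which defaults to 1, so seasonal=True on its own may not actually search seasonal orders. A hedged sketch of passing a period explicitly (m=7 is an illustrative guess for daily data with a weekly cycle, not something the module above sets):

import numpy as np
import pandas as pd
import pmdarima as pm

# Synthetic daily series with a weekly cycle.
idx = pd.date_range("2024-01-01", periods=140, freq="D")
series = pd.Series(10 + 3 * np.sin(2 * np.pi * np.arange(140) / 7)
                   + np.random.normal(scale=0.5, size=140), index=idx)

model = pm.auto_arima(series, seasonal=True, m=7, information_criterion="aic")
pred = model.predict(n_periods=20)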

@ -0,0 +1,98 @@
"""
Module purpose:
This module implements training and running of the vector autoregression (VAR) forecasting model.
Functions:
- convert_timestamp_index(data, to_period): convert the time index between a DatetimeIndex and a PeriodIndex
- train_VAR_model(data, max_lags): train a VAR model on the input time-series data
- run(input_data, steps): carry out index conversion, model training, and forecasting, and return the forecast
"""
import pandas as pd
import statsmodels.api as sm
from typing import List
def convert_timestamp_index(data: pd.DataFrame,
to_period: bool) -> pd.DataFrame:
"""
Convert the data's time index to a DatetimeIndex or a PeriodIndex depending on to_period.
Args:
data (pd.DataFrame): input data
to_period (bool): if True, convert the DatetimeIndex to a PeriodIndex;
if False, convert the PeriodIndex back to a DatetimeIndex
Returns:
pd.DataFrame: the data with its index converted
"""
if to_period:
data.index = pd.DatetimeIndex(data.index).to_period('D')
else:
data.index = data.index.to_timestamp()
return data
def train_VAR_model(data: pd.DataFrame, max_lags: int = 30):
"""
Train a VAR model on the input time-series data, choosing the optimal lag order by comparing BIC values.
Args:
data (pd.DataFrame): time-series data used for training
max_lags (int, default=30): maximum lag order
Returns:
VARResultsWrapper: the fitted VAR model
"""
model = sm.tsa.VAR(data)
criteria = []
lags = range(1, max_lags + 1)
# Choose the optimal lag order by comparing the BIC of each candidate lag
for lag in lags:
result = model.fit(maxlags=lag)
criteria.append(result.bic)
# Refit the model with the optimal lag order
best_lag = lags[criteria.index(min(criteria))]
results = model.fit(maxlags=best_lag)
return results
def run(input_data: pd.DataFrame,
forecast_target: str,
_: List[str],
steps: int = 20) -> pd.DataFrame:
"""
Entry point: run index conversion, model training, and forecasting.
Args:
input_data (pd.DataFrame): input DataFrame
forecast_target (str): column name of the variable to forecast
_ (List[str]): placeholder argument kept so the interface matches the other model modules
steps (int, default=20): number of forecast steps
Returns:
pd.DataFrame: DataFrame holding the forecast
"""
# Convert the DataFrame's time index to a PeriodIndex
input_data = convert_timestamp_index(input_data, to_period=True)
# Train the VAR model
model = train_VAR_model(input_data, max_lags=60)
# Convert the time index back to a DatetimeIndex
input_data = convert_timestamp_index(input_data, to_period=False)
# Forecast with the VAR model
pred = model.forecast(input_data.values[-model.k_ar:], steps=steps)
forecast_df = pd.DataFrame(
pred,
index=pd.date_range(start=input_data.index.max() +
pd.Timedelta(days=1),
periods=steps),
columns=input_data.columns)
return forecast_df[[forecast_target]]  # single-column DataFrame, matching the declared return type
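A minimal sketch exercising train_VAR_model on synthetic bivariate data (toy numbers only; as an aside, the statsmodels VAR model object also offers select_order as a built-in alternative to the manual BIC loop above):

import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
idx = pd.period_range("2024-01-01", periods=200, freq="D")
# Two loosely coupled random-walk-like series.
a = rng.normal(size=200).cumsum()
b = 0.5 * a + rng.normal(size=200)
toy = pd.DataFrame({"a": a, "b": b}, index=idx)

results = train_VAR_model(toy, max_lags=10)
forecast = results.forecast(toy.values[-results.k_ar:], steps=20)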

@ -0,0 +1,76 @@
"""
Purpose of the models package:
This package implements four time-series forecasting models: vector autoregression (VAR),
ARIMA, seasonal ARIMA (SARIMA), and random forest.
modules:
- utils.py: shared helpers for reading and saving data
- VAR_Forecasting.py: vector autoregression (VAR) forecasting model
- ARIMA_Forecasting.py: autoregressive integrated moving average (ARIMA) forecasting model
- SARIMA_Forecasting.py: seasonal ARIMA (SARIMA) forecasting model
- RF_Forecasting.py: random forest forecasting model
Functions:
- run(): main entry point of the forecasting stage; it reads the data, runs every model,
and saves the forecasts to file
Usage example:
```
from models import run
run_result = run(forecast_target=target_column,
exog_columns=exog_columns_list,
models=[VAR_Forecasting])
```
"""
from typing import List, Type
from . import utils, VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting
__all__ = [
'utils', 'VAR_Forecasting', 'ARIMA_Forecasting', 'SARIMA_Forecasting',
'RF_Forecasting'
]
def run(
forecast_target: str,
exog_columns: List[str],
steps: int = 20,
file_path: str = 'data/normalized_df.csv',
models: List[Type] = [
VAR_Forecasting, ARIMA_Forecasting, SARIMA_Forecasting, RF_Forecasting
],
) -> None:
"""
Main function that reads the data, trains each model, produces forecasts, and saves the results.
Args:
forecast_target (str): column name of the variable to forecast
exog_columns (List[str]): column names of the feature variables used for forecasting
steps (int, default=20): number of forecast steps
file_path (str, default='data/normalized_df.csv'): path of the data file
models (List[Type]): forecasting models to run; defaults to VAR, ARIMA, SARIMA, and random forest
Returns:
None
"""
# Load the data
input_df = utils.read_csv(file_path)
# Run each model and save its forecast
for model in models:
try:
model_name = model.__name__.split('.')[-1]
print(f"正在执行 {model_name} 模型进行预测...")
# Run the model's forecast
model_df = model.run(input_df, forecast_target, exog_columns,
steps)
# Save the forecast
utils.save_csv(model_df, f'data/{model_name}_df.csv')
print(f"{model_name} 模型的预测结果已保存至 data/{model_name}_df.csv")
except Exception as e:
print(f"{model_name} 模型预测过程出现错误: {e}")
print("所有模型预测都已完成。")

@ -0,0 +1,46 @@
"""
Module purpose: toolbox module providing data reading and data saving helpers.
Functions:
read_csv(file_path): read data from a CSV file and return a pd.DataFrame
save_csv(df, file_path): save a pd.DataFrame to a CSV file
"""
import pandas as pd
def read_csv(file_path: str) -> pd.DataFrame:
"""
Load a DataFrame from a CSV file.
Args:
file_path (str): path to the CSV file
Returns:
DataFrame: the DataFrame loaded from the CSV file (empty on failure)
"""
try:
df = pd.read_csv(file_path, index_col="date", parse_dates=["date"])
print(f"成功读取文件: {file_path}")
except FileNotFoundError:
print(f"找不到文件: {file_path}")
df = pd.DataFrame()
except Exception as e:
print(f"读取文件时发生错误: {e}")
df = pd.DataFrame()
return df
def save_csv(df: pd.DataFrame, file_path: str) -> None:
"""
Save a DataFrame to a CSV file.
Args:
df (DataFrame): the DataFrame to save
file_path (str): path of the CSV file
"""
try:
df.to_csv(file_path, index_label='date')
print(f"成功保存文件: {file_path}")
except Exception as e:
print(f"保存文件时发生错误: {e}")

@ -0,0 +1,74 @@
"""
The pg_request package handles data stored in PostgreSQL.
Features:
- creates a 'data' output folder when the main function runs, if it does not exist yet
- connects to the PostgreSQL database and extracts data from the configured tables
- merges the data from several tables
- normalizes the data
- saves the processed data to a CSV file
Modules:
- config: defines the data-access parameters, i.e. the database connection parameters and the target table names
- parse: functions for connecting to the database, processing the data, and storing it
Functions:
- check_and_create_folder(folder_path): check for and create the data folder; called when the main function runs
- run(): main function of the data-processing stage; when called it extracts, processes, and stores the data
Usage example:
```
from pg_request import run
run()
```
Notes:
Set the database connection parameters and target tables in config.py before calling run; the individual functions in parse.py can also be called directly if needed.
"""
import os
from .config import DB_PARAMS, TABLES
from .parse import connect_to_pg, merge_dfs, normalize_df, save_as_csv
def check_and_create_folder(folder_path: str):
"""
Check whether the given folder exists and create it if it does not.
Args:
folder_path (str): path of the folder to check and create
"""
if not os.path.exists(folder_path):
os.makedirs(folder_path)
print(f"文件夹 {folder_path} 已创建。")
else:
print(f"文件夹 {folder_path} 已存在。")
def run():
"""
Run the whole data-processing pipeline:
1. Check for and create the 'data' output folder.
2. Connect to PostgreSQL and fetch the configured tables.
3. Merge the data from the tables.
4. Normalize the data with min-max scaling.
5. Save the processed data to a CSV file.
"""
# Make sure the output folder exists, creating it if necessary
check_and_create_folder('data')
# Connect to the database and fetch the tables
raw_data = connect_to_pg(DB_PARAMS, TABLES)
# Merge the fetched tables
merged_data = merge_dfs(raw_data)
# Apply min-max normalization to the merged data
normalized_data = normalize_df(merged_data)
# Save the data to a CSV file
save_as_csv(normalized_data, 'data/normalized_df.csv')
# Print a completion message
print('{:*^30}'.format('PostgreSQL数据读取完成'))

@ -0,0 +1,25 @@
"""
Configuration file: defines the database connection parameters and the list of target tables to extract.
Variables:
DB_PARAMS (dict[str, str]): database connection parameters: host, port, database name, user, and password
TABLES (List[str]): list of target tables to extract
Note:
Adjust DB_PARAMS and TABLES to match your environment.
"""
# Database connection parameters
DB_PARAMS = {
'host': 'localhost',
'port': '5432',
'database': 'Dbname',
'user': 'postgres',
'password': 'password'
}
# Target tables
TABLES = [
'app01_baidudata', 'app01_beijingweekdata', 'app01_jijindata',
'app01_liuganweekdata', 'app01_stockdata'
]

@ -0,0 +1,120 @@
"""
Module purpose: extract, process, and save data from a PostgreSQL database.
Functions:
connect_to_pg(db_parameters, tables): connect to PostgreSQL and fetch the given tables
merge_dfs(dfs): merge the DataFrame objects
normalize_df(df): apply min-max normalization to a DataFrame
save_as_csv(df, file_path): save a DataFrame to a CSV file
Usage example:
dfs = connect_to_pg(db_parameters, tables)
df = merge_dfs(dfs)
df = normalize_df(df)
save_as_csv(df, file_path)
Note:
This module depends on psycopg2; make sure it is installed.
"""
import pandas as pd
import psycopg2
from typing import Dict, List
def connect_to_pg(db_parameters: Dict[str, str],
tables: List[str]) -> Dict[str, pd.DataFrame]:
"""
Connect to PostgreSQL and fetch the given tables.
Args:
db_parameters (Dict[str, str]): dictionary of database connection parameters
tables (List[str]): names of the tables to query
Returns:
dfs (Dict[str, DataFrame]):
keys are the table names with the 'app01_' prefix stripped; values are the corresponding DataFrames
"""
try:
# Use a with statement so the database connection is closed properly
with psycopg2.connect(**db_parameters) as conn:
dfs = {
table[6:]: pd.read_sql(f'select * from public.{table};', conn)
for table in tables
}
print('{:*^30}'.format('成功链接 PostgreSQL'))
except Exception as error:
print(f"发现错误:{error}")
exit()
return dfs
def merge_dfs(dfs: Dict[str, pd.DataFrame]) -> pd.DataFrame:
"""
Merge the DataFrame objects.
Args:
dfs (dict): keys are table names, values are the corresponding DataFrames
Returns:
df (DataFrame): the merged DataFrame
"""
# Build a new daily date index spanning the union of all tables' date ranges
date_range = pd.date_range(start=min(df['date'].min()
for df in dfs.values()),
end=max(df['date'].max()
for df in dfs.values()))
# Create an empty DataFrame indexed by the date range
df_merged = pd.DataFrame(index=date_range)
# print(df_merged)
# Merge each table, keeping the date index and dropping the 'id' column
for df in dfs.values():
df = df.set_index('date').reindex(date_range)
df = df.drop(columns='id')  # drop the 'id' column
df_merged = pd.concat([df_merged, df], axis=1)
# Linearly interpolate missing values (alternatives: polynomial interpolation?)
df_merged = df_merged.interpolate()
# Drop any rows that still contain NaN
df_merged.dropna(inplace=True)
return df_merged
def normalize_df(df: pd.DataFrame) -> pd.DataFrame:
"""
Apply min-max normalization to a DataFrame.
Args:
df (DataFrame): the DataFrame to normalize
Returns:
df_normalized (DataFrame): the min-max normalized DataFrame
"""
# Min-max scale a column if its dtype is boolean, signed/unsigned integer, float, or complex; otherwise keep it unchanged
df_normalized = df.apply(lambda x: (x - x.min()) / (x.max() - x.min())
if x.dtype.kind in 'biufc' else x)
return df_normalized
def save_as_csv(df: pd.DataFrame,
file_path: str = 'data/normalized_df.csv') -> None:
"""
Save a DataFrame to a CSV file.
Args:
df (DataFrame): the DataFrame to save
file_path (str): path to write the file to
"""
try:
df.to_csv(file_path, index_label='date')
print(f"成功保存为 {file_path}")
except Exception as e:
print(f"保存文件时出错: {e}")
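A hedged toy sketch of how merge_dfs and normalize_df fit together, using two made-up stand-ins for database tables (each needs 'date' and 'id' columns, since merge_dfs reindexes on 'date' and drops 'id'):

import pandas as pd

t1 = pd.DataFrame({"id": [1, 2, 3],
                   "date": pd.to_datetime(["2024-01-01", "2024-01-03", "2024-01-05"]),
                   "liugan_index": [10.0, 30.0, 50.0]})
t2 = pd.DataFrame({"id": [1, 2],
                   "date": pd.to_datetime(["2024-01-01", "2024-01-05"]),
                   "jijin_data": [1.0, 2.0]})

merged = merge_dfs({"baidudata": t1, "jijindata": t2})  # daily index, gaps filled by linear interpolation
normalized = normalize_df(merged)                       # numeric columns min-max scaled to [0, 1]
print(normalized)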

Binary file not shown.

@ -0,0 +1,41 @@
# Pool of user agents available for rotation
agent_list = [
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 \
Firefox/87.0",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, li\
ke Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0",
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHT\
ML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0',
"Mozilla/5.0 (Linux; Android 7.0; SM-G950U Build/NRD90M) AppleWebK\
it/537.36 (KHTML, like Gecko) Chrome/62.0.3202.84 Mobile Safari/53\
7.36",
"Mozilla/5.0 (Linux; Android 8.0.0; SM-G965U Build/R16NW) AppleWeb\
Kit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.111 Mobile Safari/\
537.36",
"Mozilla/5.0 (Linux; Android 8.1.0; SM-T837A) AppleWebKit/537.36 (\
KHTML, like Gecko) Chrome/70.0.3538.80 Safari/537.36",
"Mozilla/5.0 (Linux; U; en-us; KFAPWI Build/JDQ39) AppleWebKit/535\
.19 (KHTML, like Gecko) Silk/3.13 Safari/535.19 Silk-Accelerated=t\
rue",
"Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia \
550) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob\
ile Safari/537.36 Edge/14.14263",
"Mozilla/5.0 (Windows Phone 10.0; Android 4.2.1; Microsoft; Lumia \
950) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/46.0.2486.0 Mob\
ile Safari/537.36 Edge/14.14263",
"Mozilla/5.0 (Linux; Android 11; moto g power (2022)) AppleWebKit/\
537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 6.0.1; Moto G (4)) AppleWebKit/537.36\
(KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 6.0.1; Nexus 10 Build/MOB31T) AppleWe\
bKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
"Mozilla/5.0 (Linux; Android 4.4.2; Nexus 4 Build/KOT49H) AppleWeb\
Kit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.\
36",
"Mozilla/5.0 (Linux; Android 6.0; Nexus 5 Build/MRA58N) AppleWebKi\
t/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile Safari/537.36\
",
"Mozilla/5.0 (Linux; Android 8.0.0; Nexus 5X Build/OPR4.170623.006\
) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Mobile S\
afari/537.36",
]
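A short hedged sketch of how the pool is meant to be consumed by the scrapers (the target URL is a placeholder):

import random
import requests
from user_agents_pool import agent_list

headers = {"User-Agent": random.choice(agent_list)}
response = requests.get("https://example.com", headers=headers, timeout=10)
print(response.status_code)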

@ -0,0 +1,164 @@
import requests
import random
import pandas as pd
import re
from pylab import mpl
from datetime import datetime, timedelta, date
from multiprocessing.pool import ThreadPool
mpl.rcParams["font.sans-serif"] = ["SimHei"]
mpl.rcParams["axes.unicode_minus"] = False
class GetBeijingGanranShuju(object):
def __init__(self):
ua_list = [
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 Firefox/87.0",
"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3314.0 Safari/537.36 SE 2.X MetaSr 1.0",
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0'
]
user_Agent = random.choice(ua_list)
self.headers = {
"User-Agent": random.choice(ua_list),
}
def get_Link_2023(self, url):
import time
response = requests.get(url=url, headers=self.headers)
time.sleep(random.uniform(1, 3))
html = response.content.decode("utf-8")
link_2023 = re.findall('<a href="[.]*?(/.*?2023.*?)">', html)
for i in link_2023:
url_head = "https://www.bjcdc.org/"
i = url_head + i
link_list_2023.append(i)
return link_list_2023
def get_Link_2024(self, url):
import time
response = requests.get(url=url, headers=self.headers)
time.sleep(random.uniform(1, 3))
html = response.content.decode("utf-8")
link_2024 = re.findall('<a href="[.]*?(/.*?2024.*?)">', html)
for i in link_2024:
url_head = "https://www.bjcdc.org/"
i = url_head + i
link_list_2024.append(i)
return link_list_2024
def get_content_2023(self, link):
number = ''
response = requests.get(url=link, headers=self.headers)
import time
time.sleep(random.uniform(1, 3))
html = response.content.decode("utf-8")
number_list = re.findall(r'(\d+)例', html, re.DOTALL)
if number_list != []:
number = number_list[0]
time_list = re.findall(r'(\d+月\d+日至2023年\d+月\d+日)', html)
if time_list != []:
time = time_list[0]
time1 = re.match(r'\d+月\d+日?', time).group()
month_number = re.match(r'\d{1,2}', time1).group()
day_number = re.findall(r'月(\d{1,2})', time1)[0]
time = '2023-' + str("%02s" % month_number) + '-' + str(
"%02s" % day_number)
time = time.replace(' ', '0')
if number.isdigit():
data.append([time, number])
def get_content_2024(self, link):
number = ''
response = requests.get(url=link, headers=self.headers)
html = response.content.decode("utf-8")
if '' in html:
return None
else:
number_list = re.findall(r'(\d+)例', html, re.DOTALL)
if number_list != []:
number = number_list[0]
time_list = re.findall(r'(\d+年\d+月)', html)
if time_list != []:
time = time_list[0]
if number.isdigit():
month_data.append([time, number])
# Instantiate the Beijing infectious-disease data scraper
get_beijing_ganran_shuju = GetBeijingGanranShuju()
data, link_list_2023, link_list_2024 = [], [], []
url_1 = ['https://www.bjcdc.org/cdcmodule/jkdt/yqbb/index.shtml']
url_list2 = [
f'https://www.bjcdc.org/cdcmodule/jkdt/yqbb/index_{i}.shtml'
for i in range(2, 5)
]
url_list = url_1 + url_list2
# 2023
for url in url_list:
get_beijing_ganran_shuju.get_Link_2023(url)
# Process the links concurrently with a thread pool
pool = ThreadPool(100)
courses_list = pool.map(get_beijing_ganran_shuju.get_content_2023,
reversed(link_list_2023))
pool.close()
pool.join()
# Sorting
# print(data)
# 2024
month_data = []
for url in url_list:
get_beijing_ganran_shuju.get_Link_2024(url)
# print(link_list_2024)
for x in reversed(link_list_2024):
get_beijing_ganran_shuju.get_content_2024(x)
# print(month_data)
# print(data)
# print(type(data))
df = pd.DataFrame(data, columns=['日期', '感染数量'])
df = df[df['日期'] != '2023-12-26']
df['日期'] = pd.to_datetime(df['日期'])
df_week = df.sort_values(by='日期')
# print(df_week)
today = date.today()
# Convert the monthly data into weekly data
# Start date and today's date
start_date = datetime(2024, 1, 2)
end_date = datetime.now()
# Generate the list of dates
dates = []
while start_date <= end_date:
dates.append(start_date)
start_date += timedelta(days=7)
# Infection data list
infection_data = month_data
# Convert the infection data into a dict keyed by year-month (YYYY-MM)
infections = {
datetime.strptime(month, "%Y年%m月").strftime("%Y-%m"): int(int(total) / 4)
for month, total in infection_data
}
# Build the list of (date, infection count) pairs
date_infections = []
for date in dates:
# Format the date as YYYY-MM to match the dict keys
month_key = date.strftime("%Y-%m")
if month_key in infections:
date_infections.append([date, infections[month_key]])
# Build the DataFrame
month_df = pd.DataFrame(date_infections, columns=['日期', '感染数量'])
# Combine the weekly and the monthly data
df = pd.concat([df_week, month_df])
# Print the DataFrame
df = df.rename(columns={'日期': 'date', '感染数量': 'beijing_number'})
print(df)
df.to_csv('beijin_zhoubao.csv', encoding="utf_8")
print('成功爬取北京传染病数据并保存在beijin_zhoubao.csv中')

@ -0,0 +1,131 @@
import requests
import re
from multiprocessing.pool import ThreadPool
import pandas as pd
def get_jijin_data(*args):
"""
Fetch one page of historical net asset value (NAV) data for a given fund.
:param fundCode:
:param page:
:return: list
"""
cookies = {
'qgqp_b_id': '5c08ebc12f489b4f5ba9e76c2539ce0b',
'emshistory':
'%5B%2200005%22%2C%2200002%22%2C%2200002%E3%80%81%22%2C%2200001%22%5D',
'HAList':
'ty-0-300411-%u91D1%u76FE%u80A1%u4EFD%2Cty-0-399366-%u80FD%u6E90%u91D1%u5C5E%2Cty-116-00002-%u4E2D%u7535%u63A7%u80A1%2Cty-116-03119-GX%u4E9A%u6D32%u534A%u5BFC%u4F53%2Cty-116-00007-%u667A%u5BCC%u8D44%u6E90%u6295%u8D44%2Cty-116-00001-%u957F%u548C%2Cty-116-00016-%u65B0%u9E3F%u57FA%u5730%u4EA7%2Cty-0-301075-%u591A%u745E%u533B%u836F%2Cty-90-BK1042-%u533B%u836F%u5546%u4E1A%2Cty-1-601607-%u4E0A%u6D77%u533B%u836F',
'mtp': '1',
'ct':
'Rc8QhLQwVpXSsLuf4UOMLbPMtE9gFAEkMTisAatrxh1rv-WFWG9EC-2zw_WFCJnVfsaViwejVO4ziLTZig1GUptw6NORwx36yfzDu9g9zstYkLdwIWvQ-9QqGL-F5C1GCS7xhUtoBrFAibnr_-HA078LL8tr7yWiGM9V3ZmooC8',
'ut':
'FobyicMgeV54OLFNgnrRk4fT26HSX01NG2N55VZbVzZlqOMDJ-67DsHyCMk6G-yTMaqRhIAFuiYbVkK6Y-sYY8ghkJ3v9gyvUZyHWYpJnreP78yw4o-H8FNcTvUXmOj4KLsGaYuV1TAHltcdN0WDTy-YCOJ8OlzrX-MQbQc_CBvXfUYn10iBhXwvJY94XBkg4eOCJpu6Dok3ot9Xsr8flPIDz6f3KxJcIgnXZ7QpZKDMIvavpSunuMiR8Q5ezUD2y-JiBEgNkeoH_36wg0elojOfd5k61gTK',
'pi':
'6293426663250936%3Bm6293426663250936%3B%E4%BA%89%E5%88%86%E5%A4%BA%E7%A7%92%E7%9A%84%E9%A3%8E%E8%BE%B02%3B4qqIkcy3NvmegD2EnE%2BsOg2O1jjgPTjDxX3du3GmlWaCk8fr0sJ%2FmubqRXtUqqRoZWsMMmMvcfSg1wNNX8p93XE3fanPRZvbcs7bYEjCeUqg5RMcJtmbM9jEifMzwRAAmCipwh9KbqrYLdkLenTwJYqOaG9qmaZ2qDmn2Pa66eitUxhH2q0aU0kerTnJCi2qJnM8Y0Oc%3Bz%2Bzk7gxq8gdHwxSGucOoQSvBZ44Uaf7Um0f7bFnTUgwLnxWm2OMnlrG9SZX6ezbrsEoqVVrOk%2FVRGekqxUH%2BufKtmb89UVNnA0x62lxu6z84Y8dT0sXAWUELHmWZf8cnumRIL8kPvuAcHSXq5P6pTC3OaxbBeQ%3D%3D',
'uidal':
'6293426663250936%e4%ba%89%e5%88%86%e5%a4%ba%e7%a7%92%e7%9a%84%e9%a3%8e%e8%be%b02',
'sid': '',
'vtpst': '|',
'websitepoptg_api_time': '1715218615434',
'st_si': '46368340182479',
'EmFundFavorVersion': '1686749115372',
'EmFundFavorVersion2': '1686749115372',
'st_asi': 'delete',
'EMFUND0': 'null',
'st_pvi': '35290886003252',
'st_sp': '2023-12-17%2018%3A51%3A34',
'st_inirUrl': 'https%3A%2F%2Fcn.bing.com%2F',
'st_sn': '27',
'st_psi': '20240509100744555-112200305283-5067673963',
}
headers = {
'Accept':
'*/*',
'Accept-Language':
'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
'Connection':
'keep-alive',
# 'Cookie': 'qgqp_b_id=5c08ebc12f489b4f5ba9e76c2539ce0b; emshistory=%5B%2200005%22%2C%2200002%22%2C%2200002%E3%80%81%22%2C%2200001%22%5D; HAList=ty-0-300411-%u91D1%u76FE%u80A1%u4EFD%2Cty-0-399366-%u80FD%u6E90%u91D1%u5C5E%2Cty-116-00002-%u4E2D%u7535%u63A7%u80A1%2Cty-116-03119-GX%u4E9A%u6D32%u534A%u5BFC%u4F53%2Cty-116-00007-%u667A%u5BCC%u8D44%u6E90%u6295%u8D44%2Cty-116-00001-%u957F%u548C%2Cty-116-00016-%u65B0%u9E3F%u57FA%u5730%u4EA7%2Cty-0-301075-%u591A%u745E%u533B%u836F%2Cty-90-BK1042-%u533B%u836F%u5546%u4E1A%2Cty-1-601607-%u4E0A%u6D77%u533B%u836F; mtp=1; ct=Rc8QhLQwVpXSsLuf4UOMLbPMtE9gFAEkMTisAatrxh1rv-WFWG9EC-2zw_WFCJnVfsaViwejVO4ziLTZig1GUptw6NORwx36yfzDu9g9zstYkLdwIWvQ-9QqGL-F5C1GCS7xhUtoBrFAibnr_-HA078LL8tr7yWiGM9V3ZmooC8; ut=FobyicMgeV54OLFNgnrRk4fT26HSX01NG2N55VZbVzZlqOMDJ-67DsHyCMk6G-yTMaqRhIAFuiYbVkK6Y-sYY8ghkJ3v9gyvUZyHWYpJnreP78yw4o-H8FNcTvUXmOj4KLsGaYuV1TAHltcdN0WDTy-YCOJ8OlzrX-MQbQc_CBvXfUYn10iBhXwvJY94XBkg4eOCJpu6Dok3ot9Xsr8flPIDz6f3KxJcIgnXZ7QpZKDMIvavpSunuMiR8Q5ezUD2y-JiBEgNkeoH_36wg0elojOfd5k61gTK; pi=6293426663250936%3Bm6293426663250936%3B%E4%BA%89%E5%88%86%E5%A4%BA%E7%A7%92%E7%9A%84%E9%A3%8E%E8%BE%B02%3B4qqIkcy3NvmegD2EnE%2BsOg2O1jjgPTjDxX3du3GmlWaCk8fr0sJ%2FmubqRXtUqqRoZWsMMmMvcfSg1wNNX8p93XE3fanPRZvbcs7bYEjCeUqg5RMcJtmbM9jEifMzwRAAmCipwh9KbqrYLdkLenTwJYqOaG9qmaZ2qDmn2Pa66eitUxhH2q0aU0kerTnJCi2qJnM8Y0Oc%3Bz%2Bzk7gxq8gdHwxSGucOoQSvBZ44Uaf7Um0f7bFnTUgwLnxWm2OMnlrG9SZX6ezbrsEoqVVrOk%2FVRGekqxUH%2BufKtmb89UVNnA0x62lxu6z84Y8dT0sXAWUELHmWZf8cnumRIL8kPvuAcHSXq5P6pTC3OaxbBeQ%3D%3D; uidal=6293426663250936%e4%ba%89%e5%88%86%e5%a4%ba%e7%a7%92%e7%9a%84%e9%a3%8e%e8%be%b02; sid=; vtpst=|; websitepoptg_api_time=1715218615434; st_si=46368340182479; EmFundFavorVersion=1686749115372; EmFundFavorVersion2=1686749115372; st_asi=delete; EMFUND0=null; EMFUND1=05-09%2009%3A49%3A02@%23%24%u534E%u590F%u6210%u957F%u6DF7%u5408@%23%24000001; EMFUND2=05-09%2009%3A53%3A36@%23%24%u5BCC%u56FD%u7CBE%u51C6%u533B%u7597%u6DF7%u5408A@%23%24005176; EMFUND3=05-09%2009%3A54%3A07@%23%24%u94F6%u6CB3%u533B%u836F%u6DF7%u5408A@%23%24011335; EMFUND4=05-09%2009%3A54%3A13@%23%24%u4E1C%u65B9%u7EA2%u533B%u7597%u5347%u7EA7%u80A1%u7968%u53D1%u8D77A@%23%24015052; EMFUND5=05-09%2009%3A57%3A40@%23%24%u5B9D%u76C8%u73B0%u4EE3%u670D%u52A1%u4E1A%u6DF7%u5408A@%23%24009223; EMFUND6=05-09%2009%3A57%3A51@%23%24%u4E1C%u65B9%u7EA2%u533B%u7597%u5347%u7EA7%u80A1%u7968%u53D1%u8D77C@%23%24015053; EMFUND7=05-09%2009%3A58%3A04@%23%24%u5E7F%u53D1%u521B%u65B0%u533B%u7597%u4E24%u5E74%u6301%u6709%u6DF7%u5408A@%23%24010731; EMFUND8=05-09%2009%3A58%3A56@%23%24%u5BCC%u56FD%u751F%u7269%u533B%u836F%u79D1%u6280%u6DF7%u5408A@%23%24006218; EMFUND9=05-09 09:59:24@#$%u534E%u5546%u533B%u836F%u533B%u7597%u884C%u4E1A%u80A1%u7968@%23%24008107; st_pvi=35290886003252; st_sp=2023-12-17%2018%3A51%3A34; st_inirUrl=https%3A%2F%2Fcn.bing.com%2F; st_sn=27; st_psi=20240509100744555-112200305283-5067673963',
'Referer':
'https://fundf10.eastmoney.com/',
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36 Edg/124.0.0.0',
}
params = {
'callback': 'jQuery183019015669101010957_1715220464680',
'fundCode': args[0][0],
'pageIndex': args[0][1],
'pageSize': '20',
'startDate': '',
'endDate': '',
'_': '1715220492762',
}
response = requests.get('https://api.fund.eastmoney.com/f10/lsjz',
params=params,
cookies=cookies,
headers=headers)
pattern = r'"FSRQ":"(.*?)","DWJZ":"(.*?)"'
text = response.text
data_page = re.findall(pattern, text)
data_list = []
for data in data_page:
data_list.append(list(data))
return data_list
def get_hx_data():
"""
Fetch the historical NAV data of the Huashang Pharmaceutical & Medical Industry equity fund (code 008107).
:return: list of hx_data
"""
fundcode = '008107'
page_list = range(1, 29)
hx_data = []
args_list = [(fundcode, i) for i in page_list]
# Fetch the pages concurrently with a thread pool
pool = ThreadPool(100)
data_list = pool.map(get_jijin_data, args_list)
pool.close()
pool.join()
for data in data_list:
hx_data += data
print(hx_data)
# Data to be stored
return hx_data
def get_gf_data():
"""
Fetch the historical NAV data of the GF Innovative Healthcare two-year holding hybrid fund (code 010731).
:return: list of gf_data
"""
fundcode = '010731'
page_list = range(1, 29)
gf_data = []
args_list = [(fundcode, i) for i in page_list]
# Fetch the pages concurrently with a thread pool
pool = ThreadPool(100)
data_list = pool.map(get_jijin_data, args_list)
pool.close()
pool.join()
for data in data_list:
gf_data += data
print(gf_data)
return gf_data
def save_data_to_csv(data, filename):
df = pd.DataFrame(data, columns=['date', filename])
df['date'] = pd.to_datetime(df['date'])
df = df.sort_values(by='date')
df.to_csv(f'{filename}.csv', encoding="utf_8")
print(f'成功爬取流感基金数据并保存在{filename}.csv中')
save_data_to_csv(get_hx_data(), 'hx_jijin_data')
save_data_to_csv(get_gf_data(), 'gf_jijin_data')

@ -0,0 +1,110 @@
import requests
import random
import pandas as pd
from lxml import etree
import time
import re
from datetime import datetime
from tqdm import *
from multiprocessing.pool import ThreadPool
from user_agents_pool import *
url_1 = ['https://ivdc.chinacdc.cn/cnic/zyzx/lgzb/index.htm']
url_list2 = [
f'https://ivdc.chinacdc.cn/cnic/zyzx/lgzb/index_{i}.htm'
for i in range(1, 4)
]
url_list = url_1 + url_list2
user_Agent = random.choice(agent_list)
headers = {
"User-Agent": user_Agent,
}
def get_Link(url):
link_list = []
response = requests.get(url=url, headers=headers)
time.sleep(2)
html = response.content.decode("utf-8")
tree = etree.HTML(html)
li_list = tree.xpath('/html/body/div[2]/div/div[1]/div/div[2]/ul/li')
# print(len(li_list))
for table in li_list:
link = table.xpath("./span[1]/a/@href")[0]
link = link.replace('.', '')
url_head = "https://ivdc.chinacdc.cn/cnic/zyzx/lgzb"
link = url_head + link
link = link.replace('htm', '.htm')
link_list.append(link)
return link_list
def get_content(link):
response = requests.get(url=link, headers=headers)
time.sleep(2)
html = response.content.decode("utf-8")
# print(html)
tree = etree.HTML(html)
date = tree.xpath(
'/html/body/div[2]/div/div[1]/div/div[2]/div/div/div/p[1]/span/text()'
)[1]
# print(time)
year = tree.xpath(
'/html/body/div[2]/div/div[1]/div/div[2]/div/div/div/p[1]/span/span/text()'
)[0]
# print(year)
date = year + date
date = date.replace('', '')
date_format = '%Y年%m月%d'
target_date = datetime.strptime(date, date_format)
# print(target_date)
start_time = '2023年2月18日'
start_date = datetime.strptime(start_time, date_format)
if target_date > start_date:
specific_number = re.search(
r'(.?<=font-size: 10pt;\">|<span lang=\"EN-US\">)(\d+)(?=</span>起|起)',
html)
number = specific_number.group(2) if specific_number else None
if number == None:
pattern = r'<span lang="EN-US" style="font-size: 10pt;">(\d+)</span><span style="font-size: 10pt'
number_list = re.findall(pattern, html)
if number_list:
number = number_list[0]
else:
number = 0
# print(html)
return [date, number]
else:
return None
def get_liuganzhoubao():
link_list_all = []
for url in url_list:
link_list_all += get_Link(url)
link_list_all = list(reversed(link_list_all))
data_all = []
# Fetch the pages concurrently with a thread pool
pool = ThreadPool(30)
data_list = pool.map(get_content, link_list_all)
pool.close()
pool.join()
for data in data_list:
if data:
data_all.append(data)
print(data_all)
df = pd.DataFrame(data_all, columns=['date', 'infection_number'])
# Convert the date column to datetime
df['date'] = pd.to_datetime(df['date'], format='%Y年%m月%d')
# Format the datetime column as the desired string format
df['date'] = df['date'].dt.strftime('%Y-%m-%d')
print(df)
df.to_csv('liugan_zhoubao.csv', encoding='utf-8')
print('流感周报数据已经储存在liugan_zhoubao.csv中')
# Run the scraper
get_liuganzhoubao()

@ -0,0 +1,75 @@
import requests
import time
from lxml import etree
from datetime import datetime
import re
from multiprocessing.pool import ThreadPool
# Constants and configuration
START_DATE = datetime.strptime('2023年2月18日', '%Y年%m月%d')
HEADERS = {
"User-Agent":
"Mozilla/5.0 (Windows NT 6.1; Win64; x64; rv:87.0) Gecko/20100101 Firefox/87.0"
}
SESSION = requests.Session()
# Build the URL list
BASE_URL = 'https://ivdc.chinacdc.cn/cnic/zyzx/lgzb/index'
url_list = [
f'{BASE_URL}_{i}.htm' if i > 0 else f'{BASE_URL}.htm' for i in range(4)
]
def get_links(url):
try:
response = SESSION.get(url, headers=HEADERS)
time.sleep(0.3)
print(response.status_code)
tree = etree.HTML(response.content.decode("utf-8"))
links = tree.xpath('//li/span[1]/a/@href')
print([
"https://ivdc.chinacdc.cn/cnic/zyzx/lgzb" +
url.replace('.', '').replace('htm', '.htm') for url in links
])
return [
"https://ivdc.chinacdc.cn/cnic/zyzx/lgzb" +
url.replace('.', '').replace('htm', '.htm') for url in links
]
except Exception as e:
print(f"Error fetching links from {url}: {e}")
return []
def get_content(link):
try:
response = SESSION.get(link, headers=HEADERS)
time.sleep(0.3)
html = response.content.decode("utf-8")
tree = etree.HTML(html)
date_text = tree.xpath('//div[@class="content"]//p[1]/span/text()')[1]
year = tree.xpath('//div[@class="content"]//p[1]/span/span/text()')[0]
date = datetime.strptime(year + date_text.replace('', ''),
'%Y年%m月%d')
if date > START_DATE:
number = re.search(r'(\d+)(?=起)', html)
return [
date.strftime('%Y-%m-%d'),
number.group(0) if number else 0
]
except Exception as e:
print(f"Error fetching content from {link}: {e}")
return None
def get_liuganzhoubao():
links = []
for url in url_list:
links += get_links(url)
print(links)
with ThreadPool(10) as pool:
data_list = pool.map(get_content, links)
return [data for data in data_list if data]
if __name__ == "__main__":
data = get_liuganzhoubao()
print(data)

@ -0,0 +1,145 @@
import requests
from pylab import mpl
import pandas as pd
import time
from datetime import datetime, timedelta, date
mpl.rcParams["font.sans-serif"] = ["SimHei"]
mpl.rcParams["axes.unicode_minus"] = False
class DownloadBaiDuIndex(object):
# Class that downloads the Baidu Index
def __init__(self, cookie):
self.cookie = cookie
# Request headers
self.headers = {
"Connection":
"keep-alive",
"Accept":
"application/json, text/plain, */*",
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/86.0.4240.198 Safari/537.36",
"Sec-Fetch-Site":
"same-origin",
"Sec-Fetch-Mode":
"cors",
"Sec-Fetch-Dest":
"empty",
"Referer":
"https://index.baidu.com/v2/main/index.html",
"Accept-Language":
"zh-CN,zh;q=0.9",
'Cookie':
self.cookie,
"Host":
"index.baidu.com",
"X-Requested-With":
"XMLHttpRequest",
"Cipher-Text":
"1656572408684_1656582701256_Nvm1pABkNsfD7V9VhZSzzFiFKylr3l5NR3YDrmHmH9yfFicm+Z9kmmwKVqVV6unvzAEh5hgXmgelP+OyOeaK8F21LyRVX1BDjxm+ezsglwoe1yfp6lEpuvu5Iggg1dz3PLF8e2II0e80ocXeU0jQFBhSbnB2wjhKl57JggTej12CzuL+h9eeVWdaMO4DSBWU2XX6PfbN8pv9+cdfFhVRHCzb0BJBU3iccoFczwNQUvzLn0nZsu0YPtG5DxDkGlRlZrCfKMtqKAe1tXQhg3+Oww4N3CQUM+6A/tKZA7jfRE6CGTFetC7QQyKlD7nxabkQ5CReAhFYAFAVYJ+sEqmY5pke8s3+RZ6jR7ASOih6Afl35EArbJzzLpnNPgrPCHoJiDUlECJveul7P5vvXl/O/Q==",
}
def decrypt(self, ptbk, index_data):
n = len(ptbk) // 2
a = dict(zip(ptbk[:n], ptbk[n:]))
return "".join([a[s] for s in index_data])
def get_index_data_json(self, keys, start=None, end=None):
words = [[{"name": key, "wordType": 1}] for key in keys]
words = str(words).replace(" ", "").replace("'", "\"")
url = f'http://index.baidu.com/api/SearchApi/index?area=0&word={words}&area=0&startDate={start}&endDate={end}'
res = requests.get(url, headers=self.headers)
html = res.content.decode("UTF-8")
data = res.json()['data']
uniqid = data['uniqid']
url = f'http://index.baidu.com/Interface/ptbk?uniqid={uniqid}'
# print(url)
res = requests.get(url, headers=self.headers)
html2 = res.content.decode("UTF-8")
time.sleep(3)
ptbk = res.json()['data']
result = {}
result["startDate"] = start
result["endDate"] = end
for userIndexe in data['userIndexes']:
name = userIndexe['word'][0]['name']
tmp = {}
index_all = userIndexe['all']['data']
index_all_data = [
int(e) for e in self.decrypt(ptbk, index_all).split(",")
]
tmp["all"] = index_all_data
index_pc = userIndexe['pc']['data']
index_pc_data = [
int(e) for e in self.decrypt(ptbk, index_pc).split(",")
]
tmp["pc"] = index_pc_data
index_wise = userIndexe['wise']['data']
index_wise_data = [
int(e) for e in self.decrypt(ptbk, index_wise).split(",")
]
tmp["wise"] = index_wise_data
result[name] = tmp
return result
def GetIndex(self, keys, start=None, end=None):
today = date.today()
if start is None:
start = str(today - timedelta(days=8))
if end is None:
end = str(today - timedelta(days=2))
try:
raw_data = self.get_index_data_json(keys=keys,
start=start,
end=end)
raw_data = pd.DataFrame(raw_data[keys[0]])
raw_data.index = pd.date_range(start=start, end=end)
except Exception as e:
print(e)
raw_data = pd.DataFrame({'all': [], 'pc': [], 'wise': []})
# Columns: total, PC, and mobile
finally:
return raw_data
def get_baidu_index():
cookie = 'BIDUPSID=84B8FDC3134DE2D8E0E6B86E2BFCC3DC; \
PSTM=1697213335; \
BAIDUID=84B8FDC3134DE2D8E0E6B86E2BFCC3DC:SL=0:NR=10:FG=1; BAIDUID_BFESS=84B8FDC3134DE2D8E0E6B86E2BFCC3DC:SL=0:NR=10:FG=1; Hm_lvt_d101ea4d2a5c67dab98251f0b5de24dc=1701483117; BDUSS=RUU3ZtM0RwcU9VeW0zV0ltMGhWZXNvd3hoMXc3YmtoZmxOOXktTDNFM3JMNUpsRUFBQUFBJCQAAAAAAQAAAAEAAADwtxh-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAOuiamXromplSH; SIGNIN_UC=70a2711cf1d3d9b1a82d2f87d633bd8a04514997999zSyIXXcI1QTeZqm4c8hyxlWksvkordeK7x1ZPceY2CR3NLufUujm7MOZ3p6TYUaUvd3Qjet3M3JcQfM5hy8%2FuP9HNu4dCG7B6RoS3S4L25PQZlnh3joEA0cArzaShqjtNyIlDOFD7nF4m%2FHL%2FxUXMnks0IYh6ZyO0xZ1iCY3pJruPDK3dBKJPJ%2BTsLIUPckisDLv5o4FBynumqVmNrIcRJauvv%2BcQtioTBjGMshtfwaZjDT2WCz713NtlH6uxabBdf8gRHMu6r8uSWjXKPG3dAflk5ycDG%2F1BoioLYK697k%3D91877884685963653296273632513192; __cas__rn__=451499799; __cas__st__212=b5f51a7b5b20cb36d3ced6764c8b0e567b436d1a2aa46e1f861833387e9d43267ac11419a4d630081274b162; __cas__id__212=51862268; CPTK_212=1671659797; CPID_212=51862268; bdindexid=473uetvtav5o3d1jfb3m9s3d34; RT="z=1&dm=baidu.com&si=0751b751-3767-4525-9566-4b5f1cd26e3a&ss=lpnhlcxe&sl=8&tt=fr3&bcn=https%3A%2F%2Ffclog.baidu.com%2Flog%2Fweirwood%3Ftype%3Dperf"; Hm_lpvt_d101ea4d2a5c67dab98251f0b5de24dc=1701490081; ab_sr=1.0.1_MjQ2ODNmNmI4NzI5MzFhZDAxYzIzZDQzYmMyZDAwOTZiYWE5NDY4OGQxMDNkYzA0NGM4OGU1ZDk5YjZmYjdkMTkyNTYxMDJiZmVlMjllNGU1MWQ1YjgwYTAzZGQxMWFkYzEyMDQ3ZjYxMThkNWI1NTg1ZTliOWVmYTQ1M2E3NjhmMDUzNTllNjU3YzYwNDlhOTU0ODRhMzJlZDAwMWY5Yg==; BDUSS_BFESS=RUU3ZtM0RwcU9VeW0zV0ltMGhWZXNvd3hoMXc3YmtoZmxOOXktTDNFM3JMNUpsRUFBQUFBJCQAAAAAAQAAAAEAAADwtxh-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAOuiamXromplSH'
# Create an instance of the downloader
downloadbaiduindex = DownloadBaiDuIndex(cookie=cookie)
# key = input('请输入关键词')
key = '流感'
# Get today's date
# from datetime import date
today = str(date.today())
data = downloadbaiduindex.get_index_data_json(keys=[key],
start='2012-01-01',
end=today)
liugan_data = (data['流感']['all'])
# Set the start and end dates
start_date = date(2012, 1, 1)
end_date = datetime.now().date() + timedelta(days=7)
# Build a list of dates at weekly intervals
date_list = []
current_date = start_date
while current_date <= end_date:
date_list.append(current_date)
current_date += timedelta(weeks=1)  # advance one week at a time
date_list = date_list[:len(liugan_data)]
df = pd.DataFrame({'date': date_list, 'liugan_index': liugan_data})
df = df.drop(df.index[-1])
print(df)
# Save the data
df.to_csv('./test/data/baidu_index.csv', encoding='utf-8')
print('成功爬取百度流感指数并储存在baidu_index.csv')
# Run the downloader
get_baidu_index()
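The decrypt method above is a plain substitution cipher: the first half of the ptbk key lists the cipher characters and the second half the corresponding plaintext characters, so zipping the two halves gives the mapping. A toy illustration with made-up values (not a real Baidu key):

ptbk = "abc123"        # 'a'->'1', 'b'->'2', 'c'->'3'
index_data = "cab"     # encrypted payload
n = len(ptbk) // 2
mapping = dict(zip(ptbk[:n], ptbk[n:]))
print("".join(mapping[ch] for ch in index_data))  # prints 312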