You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

633 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import random
import pandas as pd
import numpy as np
import json
import os
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import joblib
# Configure matplotlib for Chinese text rendering
plt.rcParams['font.sans-serif'] = ['SimHei'] # SimHei font so CJK labels display instead of boxes
plt.rcParams['axes.unicode_minus'] = False # render the minus sign correctly when a CJK font is active
# ===================== 数据生成模块 =====================
def generate_student_consume_data(
        n_train=100, n_test=5,
        save_paths=None,
        test_path="student_consume_test.csv"
):
    """
    Generate simulated student consumption data (with missing values and outliers).

    :param n_train: number of training rows
    :param n_test: number of test rows
    :param save_paths: dict of training-data save paths with keys "csv"/"excel"/"json";
                       None selects the default paths. (BUGFIX: the original used a
                       mutable dict literal as the default, shared across calls.)
    :param test_path: test-data save path
    :return: (save_paths, test_path)
    """
    # Build a fresh default per call to avoid the shared-mutable-default pitfall.
    if save_paths is None:
        save_paths = {"csv": "student_consume_train.csv",
                      "excel": "student_consume_train.xlsx",
                      "json": "student_consume_train.json"}
    # Basic identity columns
    student_ids = [f"2024{str(i).zfill(6)}" for i in range(1, n_train + n_test + 1)]
    names = [f"学生{j}" for j in range(1, n_train + n_test + 1)]
    # 10 consumption records per student; normal range 10-50 yuan, with
    # randomly injected missing values (~10%) and outliers (~5%, <0 or >200).
    consume_data = []
    for sid, name in zip(student_ids, names):
        row = [sid, name]
        for _ in range(10):
            if random.random() < 0.1:
                row.append(None)
            elif random.random() < 0.05:
                row.append(random.choice([random.randint(-50, 0), random.randint(200, 500)]))
            else:
                row.append(round(random.uniform(10, 50), 2))
        consume_data.append(row)
    # Split train/test
    train_data = consume_data[:n_train]
    test_data = consume_data[n_train:]
    cols = ["学号", "姓名"] + [f"消费{i + 1}" for i in range(10)]
    # Save training data in all three formats
    df_train = pd.DataFrame(train_data, columns=cols)
    df_train.to_csv(save_paths["csv"], index=False)
    df_train.to_excel(save_paths["excel"], index=False)
    # JSON uses a nested layout: identity block + raw records list
    json_train = []
    for row in train_data:
        json_train.append({
            "student_info": {"学号": row[0], "姓名": row[1]},
            "consume_records": row[2:]
        })
    with open(save_paths["json"], "w", encoding="utf-8") as f:
        json.dump(json_train, f, ensure_ascii=False, indent=2)
    # Save test data
    df_test = pd.DataFrame(test_data, columns=cols)
    df_test.to_csv(test_path, index=False)
    print(f"模拟数据生成完成!训练数据{len(train_data)}条,测试数据{len(test_data)}")
    return save_paths, test_path
# ===================== 数据导入模块 =====================
def import_csv_data(file_path):
    """Import a CSV file as a DataFrame; missing cells become None. Empty/missing file -> empty DataFrame."""
    if not os.path.exists(file_path):
        print(f"错误:文件{file_path}不存在!")
        return pd.DataFrame()
    if os.path.getsize(file_path) == 0:
        print(f"提示:文件{file_path}为空!")
        return pd.DataFrame()
    frame = pd.read_csv(file_path, na_values=["", "NaN", "None"], keep_default_na=False)
    # Normalize every missing cell to None
    return frame.where(pd.notna(frame), None)
def import_excel_data(file_path):
    """Import an Excel file (xls/xlsx); the first row is the header; missing cells become None."""
    if not os.path.exists(file_path):
        print(f"错误:文件{file_path}不存在!")
        return pd.DataFrame()
    if os.path.getsize(file_path) == 0:
        print(f"提示:文件{file_path}为空!")
        return pd.DataFrame()
    frame = pd.read_excel(file_path, header=0)  # row 0 becomes the header
    # Normalize every missing cell to None
    return frame.where(pd.notna(frame), None)
def import_json_data(file_path):
    """Import a JSON file and flatten the nested fields; missing values become None."""
    if not os.path.exists(file_path):
        print(f"错误:文件{file_path}不存在!")
        return {}
    if os.path.getsize(file_path) == 0:
        print(f"提示:文件{file_path}为空!")
        return {}
    with open(file_path, "r", encoding="utf-8") as f:
        raw = json.load(f)
    # Flatten each nested record into a standardized flat dict
    parsed = [
        {
            "学号": entry.get("student_info", {}).get("学号", None),
            "姓名": entry.get("student_info", {}).get("姓名", None),
            "消费记录": entry.get("consume_records", [None] * 10),
        }
        for entry in raw
    ]
    return {"data": parsed}
# ===================== 数据清洗模块 =====================
def remove_duplicates(data, key_col="学号"):
    """
    Drop duplicate records by primary key, keeping the first occurrence.

    :param data: DataFrame, or dict shaped like {"data": [record, ...]}
    :param key_col: primary-key column/field name
    :return: deduplicated data (input returned unchanged on error)
    """
    if isinstance(data, pd.DataFrame):
        if key_col not in data.columns:
            print(f"错误:主键列{key_col}不存在!")
            return data
        before = len(data)
        cleaned = data.drop_duplicates(subset=[key_col], keep="first")
        after = len(cleaned)
    elif isinstance(data, dict) and "data" in data:
        seen = set()
        kept = []
        for record in data["data"]:
            pk = record.get(key_col, None)
            # Records without a key are always kept
            if pk is None:
                kept.append(record)
            elif pk not in seen:
                seen.add(pk)
                kept.append(record)
        cleaned = {"data": kept}
        before = len(data["data"])
        after = len(kept)
    else:
        print("错误:不支持的数据类型!")
        return data
    print(f"去重完成!原数据{before}条,清洗后{after}")
    return cleaned
def fill_missing_values(data, strategy="mean", key_col="学号"):
    """
    Fill missing consumption values.

    :param data: DataFrame, or dict shaped like {"data": [record, ...]}
    :param strategy: fill strategy — "mean", "mode", or "ffill"
                     (docstring previously claimed "median", which was never implemented)
    :param key_col: primary-key column, kept aside with the other non-numeric column
    :return: filled data (input returned unchanged on an unsupported strategy)
    """
    from collections import Counter  # stdlib mode; np.mode does not exist

    if isinstance(data, pd.DataFrame):
        # Split numeric consumption columns from identifier columns.
        non_numeric_cols = [col for col in data.columns if col in [key_col, "姓名"]]
        numeric_cols = [col for col in data.columns if col not in non_numeric_cols]
        df_numeric = data[numeric_cols].copy()
        if strategy == "mean":
            fill_value = df_numeric.astype(float).mean()
        elif strategy == "mode":
            # mode() returns one row per tie rank; take the first row
            fill_value = df_numeric.astype(float).mode().iloc[0]
        elif strategy == "ffill":
            # BUGFIX: fillna(method="ffill") is deprecated/removed; use .ffill()
            df_filled = df_numeric.ffill()
        else:
            print(f"错误:不支持的填充策略{strategy}")
            return data
        if strategy != "ffill":
            df_filled = df_numeric.fillna(fill_value)
        # Re-attach the identifier columns
        cleaned = pd.concat([data[non_numeric_cols].reset_index(drop=True), df_filled.reset_index(drop=True)], axis=1)
    elif isinstance(data, dict) and "data" in data:
        cleaned_list = []
        for item in data["data"]:
            consume_records = item.get("消费记录", [None] * 10)
            # Only finite numeric entries participate in the fill value
            valid_vals = [v for v in consume_records if v is not None and isinstance(v, (int, float))]
            if not valid_vals:
                filled_records = [0.0] * 10  # nothing usable: fill with zeros
            else:
                if strategy == "mean":
                    fill_val = np.mean(valid_vals)
                elif strategy == "mode":
                    # BUGFIX: np.mode does not exist — use a frequency count
                    fill_val = Counter(valid_vals).most_common(1)[0][0]
                elif strategy == "ffill":
                    # Forward fill, seeding a leading gap with 0.0
                    filled_records = []
                    prev_val = 0.0
                    for val in consume_records:
                        if val is None or not isinstance(val, (int, float)):
                            filled_records.append(prev_val)
                        else:
                            filled_records.append(val)
                            prev_val = val
                    fill_val = None
                else:
                    print(f"错误:不支持的填充策略{strategy}")
                    return data
                if strategy != "ffill":
                    filled_records = [fill_val if v is None else v for v in consume_records]
            cleaned_item = {
                "学号": item.get("学号"),
                "姓名": item.get("姓名"),
                "消费记录": filled_records
            }
            cleaned_list.append(cleaned_item)
        cleaned = {"data": cleaned_list}
    else:
        print("错误:不支持的数据类型!")
        return data
    print(f"缺失值填充完成(策略:{strategy}")
    return cleaned
def handle_outliers(data, strategy="mean", key_col="学号", alpha=3):
    """
    Handle outliers with the n-sigma rule (values outside mean +/- alpha*std).

    :param data: DataFrame, or dict shaped like {"data": [record, ...]}
    :param strategy: "drop" (remove rows), or fill with "mean"/"mode"/"ffill"
    :param key_col: primary-key column, kept aside with the other non-numeric column
    :param alpha: sigma multiplier (3 gives the classic 3-sigma rule)
    :return: cleaned data (input returned unchanged on an unsupported strategy)
    """
    from collections import Counter  # stdlib mode; np.mode does not exist

    if isinstance(data, pd.DataFrame):
        non_numeric_cols = [col for col in data.columns if col in [key_col, "姓名"]]
        numeric_cols = [col for col in data.columns if col not in non_numeric_cols]
        df_numeric = data[numeric_cols].astype(float).copy()
        # Per-column n-sigma bounds (pandas std uses ddof=1)
        mean_vals = df_numeric.mean()
        std_vals = df_numeric.std()
        lower_bound = mean_vals - alpha * std_vals
        upper_bound = mean_vals + alpha * std_vals
        outliers_mask = (df_numeric < lower_bound) | (df_numeric > upper_bound)
        if strategy == "drop":
            # Remove any row containing at least one outlier
            row_mask = outliers_mask.any(axis=1)
            df_cleaned = df_numeric[~row_mask]
            non_numeric_cleaned = data[non_numeric_cols][~row_mask].reset_index(drop=True)
        else:
            df_cleaned = df_numeric.copy()
            for col in numeric_cols:
                # Fill value is computed from this column's non-outlier values
                valid_vals = df_cleaned[col][~outliers_mask[col]]
                if len(valid_vals) == 0:
                    fill_val = 0.0
                elif strategy == "mean":
                    fill_val = valid_vals.mean()
                elif strategy == "mode":
                    fill_val = valid_vals.mode().iloc[0]
                elif strategy == "ffill":
                    # BUGFIX: fillna(method="ffill") is deprecated/removed; use .ffill()
                    df_cleaned[col] = df_cleaned[col].mask(outliers_mask[col]).ffill()
                    continue
                else:
                    print(f"错误:不支持的异常值处理策略{strategy}")
                    return data
                # BUGFIX: mask only THIS column's outliers. The original masked the
                # whole frame with a scalar on every loop iteration, so every
                # outlier ended up holding the LAST column's fill value.
                df_cleaned[col] = df_cleaned[col].mask(outliers_mask[col], fill_val)
            non_numeric_cleaned = data[non_numeric_cols].reset_index(drop=True)
        cleaned = pd.concat([non_numeric_cleaned, df_cleaned.reset_index(drop=True)], axis=1)
    elif isinstance(data, dict) and "data" in data:
        cleaned_list = []
        for item in data["data"]:
            consume_records = item.get("消费记录", [])
            valid_vals = [v for v in consume_records if v is not None and isinstance(v, (int, float))]
            if not valid_vals:
                cleaned_list.append(item)
                continue
            # n-sigma bounds over this record (np.std is population std, ddof=0)
            mean_val = np.mean(valid_vals)
            std_val = np.std(valid_vals)
            lower = mean_val - alpha * std_val
            upper = mean_val + alpha * std_val
            cleaned_records = []
            prev_val = mean_val  # forward-fill seed when the first value is an outlier
            for val in consume_records:
                if val is None:
                    cleaned_records.append(val)
                    continue
                is_outlier = val < lower or val > upper
                if not is_outlier:
                    cleaned_records.append(val)
                    prev_val = val
                else:
                    if strategy == "drop":
                        break  # discard the whole record
                    elif strategy == "mean":
                        cleaned_records.append(mean_val)
                    elif strategy == "mode":
                        # BUGFIX: np.mode does not exist — use a frequency count
                        cleaned_records.append(Counter(valid_vals).most_common(1)[0][0])
                    elif strategy == "ffill":
                        cleaned_records.append(prev_val)
                    else:
                        cleaned_records.append(val)
            if strategy != "drop":
                cleaned_item = {
                    "学号": item.get("学号"),
                    "姓名": item.get("姓名"),
                    "消费记录": cleaned_records
                }
                cleaned_list.append(cleaned_item)
        cleaned = {"data": cleaned_list}
    else:
        print("错误:不支持的数据类型!")
        return data
    print(f"异常值处理完成(策略:{strategy}3σ准则")
    return cleaned
# ===================== 数据分析模块 =====================
def statistical_analysis(data, key_col="学号"):
    """
    Per-student statistics: consumption count, mean, and standard deviation.

    :param data: cleaned DataFrame with 消费* columns
    :param key_col: primary-key column name
    :return: DataFrame with columns 学号/姓名/消费次数/消费均值/消费标准差
    """
    if not isinstance(data, pd.DataFrame):
        print("错误仅支持DataFrame类型数据的统计分析")
        return pd.DataFrame()
    # All consumption-record columns
    consume_cols = [col for col in data.columns if "消费" in col]
    stats = []
    for idx, row in data.iterrows():
        sid = row[key_col]
        name = row["姓名"]
        # BUGFIX: NaN is a float, so the isinstance check alone let it through
        # and poisoned mean/std; explicitly exclude missing values.
        consume_vals = [v for v in row[consume_cols]
                        if v is not None and isinstance(v, (int, float)) and not pd.isna(v)]
        count = len(consume_vals)
        mean_val = round(np.mean(consume_vals), 2) if count > 0 else 0.0
        # np.std is population std (ddof=0); needs at least 2 points to be meaningful
        std_val = round(np.std(consume_vals), 2) if count > 1 else 0.0
        stats.append({
            "学号": sid,
            "姓名": name,
            "消费次数": count,
            "消费均值": mean_val,
            "消费标准差": std_val
        })
    stats_df = pd.DataFrame(stats)
    print("统计特征分析完成!")
    return stats_df
def plot_top10_bar(stats_df, save_dir="plots"):
    """
    Draw and save Top-10 bar charts (consumption count / mean / std).

    :param stats_df: statistics DataFrame from statistical_analysis
    :param save_dir: directory the PNGs are written to (created if absent)
    """
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    # One spec per chart: (metric column, bar color, title, y-label, file name)
    chart_specs = [
        ("消费次数", "skyblue", "学生消费次数Top10", "消费次数", "top10_count.png"),
        ("消费均值", "lightcoral", "学生消费均值Top10", "消费均值(元)", "top10_mean.png"),
        ("消费标准差", "lightgreen", "学生消费标准差Top10", "消费标准差(元)", "top10_std.png"),
    ]
    for metric, color, title, ylabel, filename in chart_specs:
        top10 = stats_df.nlargest(10, metric)
        plt.figure(figsize=(12, 4))
        plt.bar(top10["姓名"], top10[metric], color=color)
        plt.title(title)
        plt.xlabel("学生姓名")
        plt.ylabel(ylabel)
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig(os.path.join(save_dir, filename))
        plt.show()
    print(f"Top10柱状图已保存至{save_dir}目录!")
def plot_pca_scatter(stats_df, save_dir="plots"):
    """
    Project the statistic features to 2D with PCA and save a scatter plot.

    :param stats_df: statistics DataFrame from statistical_analysis
    :param save_dir: directory the PNG is written to (created if absent)
    """
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    feature_cols = ["消费次数", "消费均值", "消费标准差"]
    matrix = stats_df[feature_cols].values
    # Standardize first so each feature contributes equally, then reduce to 2 components
    standardized = StandardScaler().fit_transform(matrix)
    reduced = PCA(n_components=2).fit_transform(standardized)
    plt.figure(figsize=(10, 8))
    plt.scatter(reduced[:, 0], reduced[:, 1], alpha=0.7, c="orange")
    # Annotate at most the first 20 students to keep the plot readable
    for i in range(min(20, len(stats_df))):
        plt.annotate(stats_df.iloc[i]["姓名"], (reduced[i, 0], reduced[i, 1]), fontsize=8)
    plt.title("学生消费特征PCA降维散点图")
    plt.xlabel("主成分1")
    plt.ylabel("主成分2")
    plt.grid(alpha=0.3)
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, "pca_scatter.png"))
    plt.show()
    print(f"PCA散点图已保存至{save_dir}目录!")
# ===================== 学生画像模型构建与应用模块 =====================
def build_portrait_model(stats_df, model_path="student_portrait_model.pkl"):
    """
    Build the student portrait model by labelling consumption features.

    Labelling rules:
    - 特别困难: mean < 15 and count < 5
    - 困难: 15 <= mean < 20 and count < 7
    - 一般困难: 20 <= mean < 25, or 7 <= count <= 8
    - 不困难: everything else
    :param stats_df: statistics DataFrame (mutated: a 困难标签 column is added)
    :param model_path: path the simplified rule "model" is dumped to
    :return: stats_df with the label column added
    """
    def classify(record):
        avg = record["消费均值"]
        times = record["消费次数"]
        if avg < 15 and times < 5:
            return "特别困难"
        if 15 <= avg < 20 and times < 7:
            return "困难"
        if (20 <= avg < 25) or (7 <= times <= 8):
            return "一般困难"
        return "不困难"

    stats_df["困难标签"] = stats_df.apply(classify, axis=1)
    # Simplified "model": the rule thresholds plus the label distribution
    # (could be replaced with a trained ML model later).
    model = {
        "rules": {
            "特别困难": {"消费均值<15": 15, "消费次数<5": 5},
            "困难": {"15≤消费均值<20": (15, 20), "消费次数<7": 7},
            "一般困难": {"20≤消费均值<25": (20, 25), "7≤消费次数≤8": (7, 8)},
            "不困难": "其他"
        },
        "label_counts": stats_df["困难标签"].value_counts().to_dict()
    }
    joblib.dump(model, model_path)
    print(f"画像模型构建完成!标签分布:{model['label_counts']}")
    print(f"模型已保存至{model_path}")
    return stats_df
def get_aid_strategy(label):
    """Map a difficulty label to its targeted aid strategy (fallback for unknown labels)."""
    label_to_strategy = {
        "特别困难": "提供全额助学金+生活补贴+勤工俭学优先安排",
        "困难": "提供半额助学金+生活补贴",
        "一般困难": "提供临时补助+学业辅导",
        "不困难": "无需资助,可推荐奖学金申请",
    }
    return label_to_strategy.get(label, "暂无适配的帮扶策略")
def apply_portrait_model(test_data, model_path="student_portrait_model.pkl"):
    """
    Apply the portrait model: label each test student and attach an aid suggestion.

    :param test_data: test DataFrame (same layout as the training data)
    :param model_path: path of the dumped rule model
    :return: dict keyed by student id with name, stats, label, and strategy
    """
    if not os.path.exists(model_path):
        print(f"错误:模型文件{model_path}不存在!")
        return {}
    model = joblib.load(model_path)
    # Statistics are computed the same way as for training data
    test_stats = statistical_analysis(test_data)
    # Hoist the rule thresholds out of the per-student loop
    rules = model["rules"]
    special_mean_max = rules["特别困难"]["消费均值<15"]
    special_count_max = rules["特别困难"]["消费次数<5"]
    hard_mean_lo, hard_mean_hi = rules["困难"]["15≤消费均值<20"]
    hard_count_max = rules["困难"]["消费次数<7"]
    mild_mean_lo, mild_mean_hi = rules["一般困难"]["20≤消费均值<25"]
    mild_count_lo, mild_count_hi = rules["一般困难"]["7≤消费次数≤8"]
    results = {}
    for _, record in test_stats.iterrows():
        sid = record["学号"]
        name = record["姓名"]
        mean_val = record["消费均值"]
        count = record["消费次数"]
        # Apply the model rules in priority order
        if mean_val < special_mean_max and count < special_count_max:
            label = "特别困难"
        elif hard_mean_lo <= mean_val < hard_mean_hi and count < hard_count_max:
            label = "困难"
        elif (mild_mean_lo <= mean_val < mild_mean_hi) or (mild_count_lo <= count <= mild_count_hi):
            label = "一般困难"
        else:
            label = "不困难"
        results[sid] = {
            "姓名": name,
            "消费次数": count,
            "消费均值": mean_val,
            "困难标签": label,
            "帮扶策略": get_aid_strategy(label)
        }
    print("画像模型应用完成!测试数据标签及帮扶策略如下:")
    for sid, info in results.items():
        print(f"学号:{sid},姓名:{info['姓名']},标签:{info['困难标签']},策略:{info['帮扶策略']}")
    return results
# ===================== 主函数(全流程执行) =====================
def main():
    """Run the full pipeline: generate -> import -> clean -> analyze -> model -> apply."""
    # 1. Generate simulated data
    save_paths = {"csv": "student_consume_train.csv",
                  "excel": "student_consume_train.xlsx",
                  "json": "student_consume_train.json"}
    test_path = "student_consume_test.csv"
    generate_student_consume_data(n_train=100, n_test=5, save_paths=save_paths, test_path=test_path)
    # 2. Import training data (CSV here; the excel/json importers are drop-in alternatives)
    train_data = import_csv_data(save_paths["csv"])
    if train_data.empty:
        return
    # 3. Clean: dedupe, then fill missing values (mean), then handle outliers (mean)
    deduped = remove_duplicates(train_data)
    filled = fill_missing_values(deduped, strategy="mean")
    train_clean = handle_outliers(filled, strategy="mean")
    # 4. Analyze and visualize
    stats_df = statistical_analysis(train_clean)
    plot_top10_bar(stats_df)
    plot_pca_scatter(stats_df)
    # 5. Build the portrait model
    build_portrait_model(stats_df)
    # 6. Apply the model to the (equally cleaned) test data
    test_data = import_csv_data(test_path)
    if not test_data.empty:
        test_clean = handle_outliers(
            fill_missing_values(remove_duplicates(test_data), strategy="mean"),
            strategy="mean")
        apply_portrait_model(test_clean)
# Script entry point: run the whole demo pipeline
if __name__ == "__main__":
    main()