ADD file via upload

main
pfaskoir9 2 months ago
parent aad36980b6
commit 484c06c402

@@ -0,0 +1,633 @@
import random
import pandas as pd
import numpy as np
import json
import os
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler
import joblib
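# Pipeline overview (summarizing the modules below): generate simulated spending data,
# import it (CSV / Excel / JSON), clean it (deduplicate, fill missing values, handle
# outliers with the 3σ rule), compute per-student statistics, visualize them (Top-10
# bar charts, PCA scatter), then build and apply a rule-based financial-need profile model.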
# Configure matplotlib so the Chinese labels used below render correctly
plt.rcParams['font.sans-serif'] = ['SimHei']  # use a font that contains CJK glyphs
plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly
# ===================== Data generation module =====================
def generate_student_consume_data(
n_train=100, n_test=5,
save_paths={"csv": "student_consume_train.csv", "excel": "student_consume_train.xlsx",
"json": "student_consume_train.json"},
test_path="student_consume_test.csv"
):
"""
    Generate simulated student spending data (with missing values and outliers).
    :param n_train: number of training records
    :param n_test: number of test records
    :param save_paths: save paths for the training data (csv/excel/json)
    :param test_path: save path for the test data
"""
    # Basic student info (IDs and names)
student_ids = [f"2024{str(i).zfill(6)}" for i in range(1, n_train + n_test + 1)]
names = [f"学生{j}" for j in range(1, n_train + n_test + 1)]
    # Spending data: 10 records per student, with missing values / outliers mixed in
consume_data = []
for idx, (sid, name) in enumerate(zip(student_ids, names)):
row = [sid, name]
        # 10 spending amounts (normal range 10-50 yuan, with random outliers / missing values)
        for _ in range(10):
            # 10% chance of a missing value; otherwise a 5% chance of an outlier (>200 or <0)
if random.random() < 0.1:
row.append(None)
elif random.random() < 0.05:
row.append(random.choice([random.randint(-50, 0), random.randint(200, 500)]))
else:
row.append(round(random.uniform(10, 50), 2))
consume_data.append(row)
    # Split into training / test data
    train_data = consume_data[:n_train]
    test_data = consume_data[n_train:]
    # Column names
    cols = ["学号", "姓名"] + [f"消费{i + 1}" for i in range(10)]
    # Save the training data in multiple formats
    # CSV
df_train = pd.DataFrame(train_data, columns=cols)
df_train.to_csv(save_paths["csv"], index=False)
    # Excel
    df_train.to_excel(save_paths["excel"], index=False)
    # JSON (nested format)
json_train = []
for row in train_data:
json_train.append({
"student_info": {"学号": row[0], "姓名": row[1]},
"consume_records": row[2:]
})
with open(save_paths["json"], "w", encoding="utf-8") as f:
json.dump(json_train, f, ensure_ascii=False, indent=2)
    # Save the test data
df_test = pd.DataFrame(test_data, columns=cols)
df_test.to_csv(test_path, index=False)
print(f"模拟数据生成完成!训练数据{len(train_data)}条,测试数据{len(test_data)}")
return save_paths, test_path
# ===================== Data import module =====================
def import_csv_data(file_path):
"""导入CSV文件缺失值补None返回DataFrame"""
if not os.path.exists(file_path):
print(f"错误:文件{file_path}不存在!")
return pd.DataFrame()
if os.path.getsize(file_path) == 0:
print(f"提示:文件{file_path}为空!")
return pd.DataFrame()
df = pd.read_csv(file_path, na_values=["", "NaN", "None"], keep_default_na=False)
    df = df.where(pd.notna(df), None)  # normalize missing values to None
return df
def import_excel_data(file_path):
"""导入Excel文件兼容xls/xlsx自动识别表头缺失值补None"""
if not os.path.exists(file_path):
print(f"错误:文件{file_path}不存在!")
return pd.DataFrame()
if os.path.getsize(file_path) == 0:
print(f"提示:文件{file_path}为空!")
return pd.DataFrame()
    df = pd.read_excel(file_path, header=0)  # first row is used as the header
    df = df.where(pd.notna(df), None)  # normalize missing values to None
return df
def import_json_data(file_path):
"""导入JSON文件解析嵌套字段缺失值补None"""
if not os.path.exists(file_path):
print(f"错误:文件{file_path}不存在!")
return {}
if os.path.getsize(file_path) == 0:
print(f"提示:文件{file_path}为空!")
return {}
with open(file_path, "r", encoding="utf-8") as f:
data = json.load(f)
    # Flatten the nested student_info / consume_records fields into a standard structure
parsed_data = []
for item in data:
row = {
"学号": item.get("student_info", {}).get("学号", None),
"姓名": item.get("student_info", {}).get("姓名", None),
"消费记录": item.get("consume_records", [None] * 10)
}
parsed_data.append(row)
return {"data": parsed_data}
# ===================== Data cleaning module =====================
def remove_duplicates(data, key_col="学号"):
"""
    Deduplicate by primary key (keep the first occurrence).
    :param data: DataFrame or dict-style data
    :param key_col: primary-key column name
    :return: deduplicated data
"""
if isinstance(data, pd.DataFrame):
if key_col not in data.columns:
print(f"错误:主键列{key_col}不存在!")
return data
cleaned = data.drop_duplicates(subset=[key_col], keep="first")
elif isinstance(data, dict) and "data" in data:
        # Handle dict-style data
seen_keys = set()
cleaned_list = []
for item in data["data"]:
key = item.get(key_col, None)
if key is None:
cleaned_list.append(item)
continue
if key not in seen_keys:
seen_keys.add(key)
cleaned_list.append(item)
cleaned = {"data": cleaned_list}
else:
print("错误:不支持的数据类型!")
return data
    print(
        f"去重完成!原数据{len(data) if isinstance(data, pd.DataFrame) else len(data['data'])}条,清洗后{len(cleaned) if isinstance(cleaned, pd.DataFrame) else len(cleaned['data'])}条")
return cleaned
def fill_missing_values(data, strategy="mean", key_col="学号"):
"""
    Fill missing values (mean / mode / forward fill).
    :param data: DataFrame or dict-style data
    :param strategy: fill strategy, one of "mean", "mode", "ffill"
    :param key_col: primary-key column (kept aside together with the other non-numeric columns)
    :return: data with missing values filled
"""
if isinstance(data, pd.DataFrame):
        # Split numeric and non-numeric columns
        non_numeric_cols = [col for col in data.columns if col in [key_col, "姓名"]]
        numeric_cols = [col for col in data.columns if col not in non_numeric_cols]
        df_numeric = data[numeric_cols].copy()
        # Apply the fill strategy
if strategy == "mean":
fill_value = df_numeric.astype(float).mean()
elif strategy == "mode":
fill_value = df_numeric.astype(float).mode().iloc[0]
        elif strategy == "ffill":
            df_filled = df_numeric.ffill()
else:
print(f"错误:不支持的填充策略{strategy}")
return data
if strategy != "ffill":
df_filled = df_numeric.fillna(fill_value)
        # Re-attach the non-numeric columns
cleaned = pd.concat([data[non_numeric_cols].reset_index(drop=True), df_filled.reset_index(drop=True)], axis=1)
elif isinstance(data, dict) and "data" in data:
        # Handle dict-style data
cleaned_list = []
for item in data["data"]:
consume_records = item.get("消费记录", [None] * 10)
            # Drop None entries before computing the fill value
valid_vals = [v for v in consume_records if v is not None and isinstance(v, (int, float))]
if not valid_vals:
                filled_records = [0.0] * 10  # no valid values: fall back to 0
else:
if strategy == "mean":
fill_val = np.mean(valid_vals)
                elif strategy == "mode":
                    fill_val = max(set(valid_vals), key=valid_vals.count)
elif strategy == "ffill":
                    # Forward fill within this student's records
filled_records = []
prev_val = 0.0
for val in consume_records:
if val is None or not isinstance(val, (int, float)):
filled_records.append(prev_val)
else:
filled_records.append(val)
prev_val = val
fill_val = None
else:
print(f"错误:不支持的填充策略{strategy}")
return data
if strategy != "ffill":
filled_records = [fill_val if v is None else v for v in consume_records]
cleaned_item = {
"学号": item.get("学号"),
"姓名": item.get("姓名"),
"消费记录": filled_records
}
cleaned_list.append(cleaned_item)
cleaned = {"data": cleaned_list}
else:
print("错误:不支持的数据类型!")
return data
print(f"缺失值填充完成(策略:{strategy}")
return cleaned
def handle_outliers(data, strategy="mean", key_col="学号", alpha=3):
"""
    Handle outliers using the 3σ rule.
    :param data: DataFrame or dict-style data
    :param strategy: handling strategy, one of "drop", "mean", "mode", "ffill"
    :param key_col: primary-key column
    :param alpha: number of standard deviations for the rule (3 gives the classic 3σ rule)
    :return: data with outliers handled
"""
if isinstance(data, pd.DataFrame):
        # Split numeric and non-numeric columns
        non_numeric_cols = [col for col in data.columns if col in [key_col, "姓名"]]
        numeric_cols = [col for col in data.columns if col not in non_numeric_cols]
        df_numeric = data[numeric_cols].astype(float).copy()
        # Compute the mean ± alpha*std bounds
mean_vals = df_numeric.mean()
std_vals = df_numeric.std()
lower_bound = mean_vals - alpha * std_vals
upper_bound = mean_vals + alpha * std_vals
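        # Illustrative example (numbers assumed, not taken from the data): with a column
        # mean of 30 and std of 10, alpha=3 gives bounds [0, 60], so a 250-yuan record
        # would be flagged as an outlier below.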
        # Mark outliers
outliers_mask = (df_numeric < lower_bound) | (df_numeric > upper_bound)
        if strategy == "drop":
            # Drop rows that contain any outlier
row_mask = outliers_mask.any(axis=1)
df_cleaned = df_numeric[~row_mask]
non_numeric_cleaned = data[non_numeric_cols][~row_mask].reset_index(drop=True)
        else:
            # Fill outliers column by column
            df_cleaned = df_numeric.copy()
            for col in numeric_cols:
                # Fill value for this column, computed from its non-outlier entries
                valid_vals = df_cleaned[col][~outliers_mask[col]]
if len(valid_vals) == 0:
fill_val = 0.0
elif strategy == "mean":
fill_val = valid_vals.mean()
elif strategy == "mode":
fill_val = valid_vals.mode().iloc[0]
                elif strategy == "ffill":
                    df_cleaned[col] = df_cleaned[col].mask(outliers_mask[col]).ffill()
continue
else:
print(f"错误:不支持的异常值处理策略{strategy}")
return data
                # Replace this column's outliers with its fill value
                if strategy != "ffill":
                    df_cleaned[col] = df_cleaned[col].mask(outliers_mask[col], fill_val)
            non_numeric_cleaned = data[non_numeric_cols].reset_index(drop=True)
        # Merge the numeric and non-numeric parts back together
cleaned = pd.concat([non_numeric_cleaned, df_cleaned.reset_index(drop=True)], axis=1)
elif isinstance(data, dict) and "data" in data:
        # Handle dict-style data
cleaned_list = []
for item in data["data"]:
consume_records = item.get("消费记录", [])
valid_vals = [v for v in consume_records if v is not None and isinstance(v, (int, float))]
if not valid_vals:
cleaned_list.append(item)
continue
            # Compute the mean ± alpha*std bounds
mean_val = np.mean(valid_vals)
std_val = np.std(valid_vals)
lower = mean_val - alpha * std_val
upper = mean_val + alpha * std_val
            # Handle outliers within this student's records
            cleaned_records = []
            has_outlier = False  # whether an outlier was seen (needed by the "drop" strategy)
            prev_val = mean_val  # starting value for forward fill
for val in consume_records:
if val is None:
cleaned_records.append(val)
continue
                # Check whether the value is an outlier
is_outlier = val < lower or val > upper
if not is_outlier:
cleaned_records.append(val)
prev_val = val
                else:
                    has_outlier = True
                    if strategy == "drop":
                        break  # discard this row entirely
elif strategy == "mean":
cleaned_records.append(mean_val)
                    elif strategy == "mode":
                        cleaned_records.append(max(set(valid_vals), key=valid_vals.count))
elif strategy == "ffill":
cleaned_records.append(prev_val)
else:
cleaned_records.append(val)
            if not (strategy == "drop" and has_outlier):  # with "drop", keep only rows without outliers
cleaned_item = {
"学号": item.get("学号"),
"姓名": item.get("姓名"),
"消费记录": cleaned_records
}
cleaned_list.append(cleaned_item)
cleaned = {"data": cleaned_list}
else:
print("错误:不支持的数据类型!")
return data
print(f"异常值处理完成(策略:{strategy}3σ准则")
return cleaned
# ===================== Data analysis module =====================
def statistical_analysis(data, key_col="学号"):
"""
    Per-student statistics: number of purchases, mean spend, and standard deviation.
    :param data: cleaned DataFrame
    :param key_col: primary-key column name
    :return: DataFrame of per-student statistics
"""
if not isinstance(data, pd.DataFrame):
print("错误仅支持DataFrame类型数据的统计分析")
return pd.DataFrame()
    # Identify the spending columns
    consume_cols = [col for col in data.columns if "消费" in col]
    # Per-student statistics
    stats = []
for idx, row in data.iterrows():
sid = row[key_col]
name = row["姓名"]
        # Keep only valid (numeric, non-missing) spending records
consume_vals = [v for v in row[consume_cols] if v is not None and isinstance(v, (int, float))]
        # Compute count, mean, and standard deviation
count = len(consume_vals)
mean_val = round(np.mean(consume_vals), 2) if count > 0 else 0.0
std_val = round(np.std(consume_vals), 2) if count > 1 else 0.0
stats.append({
"学号": sid,
"姓名": name,
"消费次数": count,
"消费均值": mean_val,
"消费标准差": std_val
})
stats_df = pd.DataFrame(stats)
print("统计特征分析完成!")
return stats_df
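# For reference, statistical_analysis produces one row per student, for example
# (values illustrative only): 学号=2024000001, 姓名=学生1, 消费次数=9, 消费均值=27.84, 消费标准差=9.12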
def plot_top10_bar(stats_df, save_dir="plots"):
"""
    Plot and save the Top-10 bar charts (count / mean / standard deviation).
    :param stats_df: DataFrame of per-student statistics
    :param save_dir: directory for the saved figures
"""
if not os.path.exists(save_dir):
os.makedirs(save_dir)
    # 1. Top 10 by number of purchases
top10_count = stats_df.nlargest(10, "消费次数")
plt.figure(figsize=(12, 4))
plt.bar(top10_count["姓名"], top10_count["消费次数"], color="skyblue")
plt.title("学生消费次数Top10")
plt.xlabel("学生姓名")
plt.ylabel("消费次数")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(os.path.join(save_dir, "top10_count.png"))
plt.show()
    # 2. Top 10 by mean spend
top10_mean = stats_df.nlargest(10, "消费均值")
plt.figure(figsize=(12, 4))
plt.bar(top10_mean["姓名"], top10_mean["消费均值"], color="lightcoral")
plt.title("学生消费均值Top10")
plt.xlabel("学生姓名")
plt.ylabel("消费均值(元)")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(os.path.join(save_dir, "top10_mean.png"))
plt.show()
    # 3. Top 10 by spend standard deviation
top10_std = stats_df.nlargest(10, "消费标准差")
plt.figure(figsize=(12, 4))
plt.bar(top10_std["姓名"], top10_std["消费标准差"], color="lightgreen")
plt.title("学生消费标准差Top10")
plt.xlabel("学生姓名")
plt.ylabel("消费标准差(元)")
plt.xticks(rotation=45)
plt.tight_layout()
plt.savefig(os.path.join(save_dir, "top10_std.png"))
plt.show()
print(f"Top10柱状图已保存至{save_dir}目录!")
def plot_pca_scatter(stats_df, save_dir="plots"):
"""
    Reduce the statistics to 2D with PCA and plot/save a scatter chart.
    :param stats_df: DataFrame of per-student statistics
    :param save_dir: directory for the saved figure
"""
if not os.path.exists(save_dir):
os.makedirs(save_dir)
    # Extract the numeric features
    features = ["消费次数", "消费均值", "消费标准差"]
    X = stats_df[features].values
    # Standardize the features before PCA
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    # Reduce to two principal components
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X_scaled)
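    # Note: pca.explained_variance_ratio_ reports how much of the original variance each of
    # the two components retains, which is useful when judging whether 2D is representative.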
    # Scatter plot of the two principal components
plt.figure(figsize=(10, 8))
plt.scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.7, c="orange")
    # Annotate only the first 20 students, to keep the plot readable
    for i in range(min(20, len(stats_df))):
plt.annotate(stats_df.iloc[i]["姓名"], (X_pca[i, 0], X_pca[i, 1]), fontsize=8)
plt.title("学生消费特征PCA降维散点图")
plt.xlabel("主成分1")
plt.ylabel("主成分2")
plt.grid(alpha=0.3)
plt.tight_layout()
plt.savefig(os.path.join(save_dir, "pca_scatter.png"))
plt.show()
print(f"PCA散点图已保存至{save_dir}目录!")
# ===================== Student-profile model: build & apply =====================
def build_portrait_model(stats_df, model_path="student_portrait_model.pkl"):
"""
    Build the student-profile model (assign a financial-need label from the spending features).
    Label rules:
    - 特别困难: mean spend < 15 and purchase count < 5
    - 困难: 15 <= mean spend < 20 and purchase count < 7
    - 一般困难: 20 <= mean spend < 25, or 7 <= purchase count <= 8
    - 不困难: everything else
    :param stats_df: DataFrame of per-student statistics
    :param model_path: path to save the model
    :return: DataFrame with the label column added
"""
def get_label(row):
mean_val = row["消费均值"]
count = row["消费次数"]
if mean_val < 15 and count < 5:
return "特别困难"
elif 15 <= mean_val < 20 and count < 7:
return "困难"
elif (20 <= mean_val < 25) or (7 <= count <= 8):
return "一般困难"
else:
return "不困难"
    # Assign the need label to every student
    stats_df["困难标签"] = stats_df.apply(get_label, axis=1)
    # Save the "model" (simplified here to the rule thresholds and label counts; a trained ML model could be saved instead)
model = {
"rules": {
"特别困难": {"消费均值<15": 15, "消费次数<5": 5},
"困难": {"15≤消费均值<20": (15, 20), "消费次数<7": 7},
"一般困难": {"20≤消费均值<25": (20, 25), "7≤消费次数≤8": (7, 8)},
"不困难": "其他"
},
"label_counts": stats_df["困难标签"].value_counts().to_dict()
}
joblib.dump(model, model_path)
print(f"画像模型构建完成!标签分布:{model['label_counts']}")
print(f"模型已保存至{model_path}")
return stats_df
def get_aid_strategy(label):
"""根据标签返回精准帮扶策略"""
strategies = {
"特别困难": "提供全额助学金+生活补贴+勤工俭学优先安排",
"困难": "提供半额助学金+生活补贴",
"一般困难": "提供临时补助+学业辅导",
"不困难": "无需资助,可推荐奖学金申请"
}
return strategies.get(label, "暂无适配的帮扶策略")
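# Usage example (taken from the mapping above): get_aid_strategy("困难") returns
# "提供半额助学金+生活补贴"; an unknown label falls back to "暂无适配的帮扶策略".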
def apply_portrait_model(test_data, model_path="student_portrait_model.pkl"):
"""
    Apply the profile model: label the test data and return aid recommendations.
    :param test_data: test DataFrame
    :param model_path: path of the saved model
    :return: dict of labels and recommendations keyed by student ID
"""
    # Load the model
if not os.path.exists(model_path):
print(f"错误:模型文件{model_path}不存在!")
return {}
model = joblib.load(model_path)
    # Compute per-student statistics for the test data first
test_stats = statistical_analysis(test_data)
    # Label each student and generate an aid recommendation
results = {}
for idx, row in test_stats.iterrows():
sid = row["学号"]
name = row["姓名"]
mean_val = row["消费均值"]
count = row["消费次数"]
        # Apply the model rules
        rules = model["rules"]
        if mean_val < rules["特别困难"]["消费均值<15"] and count < rules["特别困难"]["消费次数<5"]:
            label = "特别困难"
        elif (rules["困难"]["15≤消费均值<20"][0] <= mean_val < rules["困难"]["15≤消费均值<20"][1]
              and count < rules["困难"]["消费次数<7"]):
            label = "困难"
        elif (rules["一般困难"]["20≤消费均值<25"][0] <= mean_val < rules["一般困难"]["20≤消费均值<25"][1]
              or rules["一般困难"]["7≤消费次数≤8"][0] <= count <= rules["一般困难"]["7≤消费次数≤8"][1]):
            label = "一般困难"
        else:
            label = "不困难"
        # Look up the aid strategy for this label
strategy = get_aid_strategy(label)
results[sid] = {
"姓名": name,
"消费次数": count,
"消费均值": mean_val,
"困难标签": label,
"帮扶策略": strategy
}
print("画像模型应用完成!测试数据标签及帮扶策略如下:")
for sid, info in results.items():
print(f"学号:{sid},姓名:{info['姓名']},标签:{info['困难标签']},策略:{info['帮扶策略']}")
return results
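# For reference, apply_portrait_model returns a dict shaped like the following
# (student ID and numbers are illustrative only):
# {"2024000101": {"姓名": "学生101", "消费次数": 8, "消费均值": 22.4,
#                 "困难标签": "一般困难", "帮扶策略": "提供临时补助+学业辅导"}}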
# ===================== Main (end-to-end pipeline) =====================
def main():
    # 1. Generate the simulated data
    save_paths = {"csv": "student_consume_train.csv", "excel": "student_consume_train.xlsx",
                  "json": "student_consume_train.json"}
    test_path = "student_consume_test.csv"
    generate_student_consume_data(n_train=100, n_test=5, save_paths=save_paths, test_path=test_path)
    # 2. Import the data (CSV here; the Excel/JSON loaders work the same way)
train_data = import_csv_data(save_paths["csv"])
if train_data.empty:
return
    # 3. Clean the data
    # Deduplicate by student ID
    train_data_dedup = remove_duplicates(train_data)
    # Fill missing values (mean imputation)
    train_data_fill = fill_missing_values(train_data_dedup, strategy="mean")
    # Handle outliers (replace with the column mean)
    train_data_clean = handle_outliers(train_data_fill, strategy="mean")
    # 4. Analyze the data
    # Per-student statistics
    stats_df = statistical_analysis(train_data_clean)
    # Visualizations
plot_top10_bar(stats_df)
plot_pca_scatter(stats_df)
    # 5. Build the profile model
    stats_df_with_label = build_portrait_model(stats_df)
    # 6. Apply the model to the test data
    test_data = import_csv_data(test_path)
    if not test_data.empty:
        # Clean the test data the same way as the training data
        test_data_dedup = remove_duplicates(test_data)
        test_data_fill = fill_missing_values(test_data_dedup, strategy="mean")
        test_data_clean = handle_outliers(test_data_fill, strategy="mean")
        # Apply the model
aid_results = apply_portrait_model(test_data_clean)
if __name__ == "__main__":
main()