diff --git a/script.py b/script.py
new file mode 100644
index 0000000..6f91f85
--- /dev/null
+++ b/script.py
@@ -0,0 +1,633 @@
+import random
+import pandas as pd
+import numpy as np
+import json
+import os
+import matplotlib.pyplot as plt
+from sklearn.decomposition import PCA
+from sklearn.preprocessing import StandardScaler
+import joblib
+
+# Matplotlib setup for Chinese text
+plt.rcParams['font.sans-serif'] = ['SimHei']  # render Chinese glyphs
+plt.rcParams['axes.unicode_minus'] = False  # render the minus sign correctly
+
+
+# ===================== Data generation module =====================
+def generate_student_consume_data(
+        n_train=100, n_test=5,
+        save_paths={"csv": "student_consume_train.csv", "excel": "student_consume_train.xlsx",
+                    "json": "student_consume_train.json"},
+        test_path="student_consume_test.csv"
+):
+    """
+    Generate simulated student spending data (with missing values and outliers).
+    :param n_train: number of training rows
+    :param n_test: number of test rows
+    :param save_paths: save paths for the training data (csv/excel/json)
+    :param test_path: save path for the test data
+    """
+    # Basic student info
+    student_ids = [f"2024{str(i).zfill(6)}" for i in range(1, n_train + n_test + 1)]
+    names = [f"学生{j}" for j in range(1, n_train + n_test + 1)]
+
+    # Spending data: 10 records per student, with missing values and outliers mixed in
+    consume_data = []
+    for sid, name in zip(student_ids, names):
+        row = [sid, name]
+        # Normal range 10-50 yuan; ~10% chance of a missing value,
+        # ~5% chance of an outlier (>200 or <0)
+        for _ in range(10):
+            if random.random() < 0.1:
+                row.append(None)
+            elif random.random() < 0.05:
+                row.append(random.choice([random.randint(-50, -1), random.randint(201, 500)]))
+            else:
+                row.append(round(random.uniform(10, 50), 2))
+        consume_data.append(row)
+
+    # Split into training/test sets
+    train_data = consume_data[:n_train]
+    test_data = consume_data[n_train:]
+
+    # Column names
+    cols = ["学号", "姓名"] + [f"消费{i + 1}" for i in range(10)]
+
+    # Save training data in several formats
+    # CSV
+    df_train = pd.DataFrame(train_data, columns=cols)
+    df_train.to_csv(save_paths["csv"], index=False)
+    # Excel
+    df_train.to_excel(save_paths["excel"], index=False)
+    # JSON (nested layout)
+    json_train = []
+    for row in train_data:
+        json_train.append({
+            "student_info": {"学号": row[0], "姓名": row[1]},
+            "consume_records": row[2:]
+        })
+    with open(save_paths["json"], "w", encoding="utf-8") as f:
+        json.dump(json_train, f, ensure_ascii=False, indent=2)
+
+    # Save test data
+    df_test = pd.DataFrame(test_data, columns=cols)
+    df_test.to_csv(test_path, index=False)
+
+    print(f"Simulated data generated: {len(train_data)} training rows, {len(test_data)} test rows")
+    return save_paths, test_path
+
+
+# ===================== Data import module =====================
+def import_csv_data(file_path):
+    """Import a CSV file; missing values become None; returns a DataFrame."""
+    if not os.path.exists(file_path):
+        print(f"Error: file {file_path} does not exist!")
+        return pd.DataFrame()
+    if os.path.getsize(file_path) == 0:
+        print(f"Note: file {file_path} is empty!")
+        return pd.DataFrame()
+
+    df = pd.read_csv(file_path, na_values=["", "NaN", "None"], keep_default_na=False)
+    df = df.where(pd.notna(df), None)  # normalize every missing value to None
+    return df
+
+
+def import_excel_data(file_path):
+    """Import an Excel file (xls/xlsx); first row is the header; missing values become None."""
+    if not os.path.exists(file_path):
+        print(f"Error: file {file_path} does not exist!")
+        return pd.DataFrame()
+    if os.path.getsize(file_path) == 0:
+        print(f"Note: file {file_path} is empty!")
+        return pd.DataFrame()
+
+    df = pd.read_excel(file_path, header=0)  # use the first row as the header
+    df = df.where(pd.notna(df), None)  # normalize every missing value to None
+    return df
+
+
+def import_json_data(file_path):
+    """Import a JSON file, flattening the nested fields; missing values become None."""
+    if not os.path.exists(file_path):
+        print(f"Error: file {file_path} does not exist!")
+        return {}
+    if os.path.getsize(file_path) == 0:
+        print(f"Note: file {file_path} is empty!")
+        return {}
+
+    with open(file_path, "r", encoding="utf-8") as f:
+        data = json.load(f)
+
+    # Flatten the nested fields into a standard layout
+    parsed_data = []
+    for item in data:
+        row = {
+            "学号": item.get("student_info", {}).get("学号", None),
+            "姓名": item.get("student_info", {}).get("姓名", None),
+            "消费记录": item.get("consume_records", [None] * 10)
+        }
+        parsed_data.append(row)
+    return {"data": parsed_data}
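+
+
+# Editor's note (sketch, not part of the original pipeline): the
+# `where(pd.notna(df), None)` idiom above replaces every NaN with None, which
+# typically upcasts numeric columns to object dtype; the cleaning functions
+# below therefore cast back with .astype(float) before doing arithmetic.
+# Minimal REPL illustration (hypothetical column):
+#   >>> df = pd.DataFrame({"消费1": [10.5, None]})
+#   >>> df.where(pd.notna(df), None)["消费1"].tolist()
+#   [10.5, None]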
+
+
+# ===================== Data cleaning module =====================
+def remove_duplicates(data, key_col="学号"):
+    """
+    Drop duplicates by primary key, keeping the first occurrence.
+    :param data: DataFrame or dict data
+    :param key_col: primary-key column name
+    :return: deduplicated data
+    """
+    if isinstance(data, pd.DataFrame):
+        if key_col not in data.columns:
+            print(f"Error: key column {key_col} does not exist!")
+            return data
+        cleaned = data.drop_duplicates(subset=[key_col], keep="first")
+    elif isinstance(data, dict) and "data" in data:
+        # dict-shaped data: keep the first item per key; rows without a key are kept
+        seen_keys = set()
+        cleaned_list = []
+        for item in data["data"]:
+            key = item.get(key_col, None)
+            if key is None:
+                cleaned_list.append(item)
+                continue
+            if key not in seen_keys:
+                seen_keys.add(key)
+                cleaned_list.append(item)
+        cleaned = {"data": cleaned_list}
+    else:
+        print("Error: unsupported data type!")
+        return data
+    n_before = len(data) if isinstance(data, pd.DataFrame) else len(data["data"])
+    n_after = len(cleaned) if isinstance(cleaned, pd.DataFrame) else len(cleaned["data"])
+    print(f"Dedup done: {n_before} rows in, {n_after} rows out")
+    return cleaned
+
+
+def fill_missing_values(data, strategy="mean", key_col="学号"):
+    """
+    Fill missing values (mean / mode / forward fill).
+    :param data: DataFrame or dict data
+    :param strategy: fill strategy (mean/mode/ffill)
+    :param key_col: primary-key column (kept aside with other non-numeric columns)
+    :return: filled data
+    """
+    if isinstance(data, pd.DataFrame):
+        # Split numeric and non-numeric columns
+        non_numeric_cols = [col for col in data.columns if col in [key_col, "姓名"]]
+        numeric_cols = [col for col in data.columns if col not in non_numeric_cols]
+        df_numeric = data[numeric_cols].copy()
+
+        # Pick the fill value / method
+        if strategy == "mean":
+            fill_value = df_numeric.astype(float).mean()
+        elif strategy == "mode":
+            fill_value = df_numeric.astype(float).mode().iloc[0]
+        elif strategy == "ffill":
+            df_filled = df_numeric.ffill()
+        else:
+            print(f"Error: unsupported fill strategy {strategy}!")
+            return data
+
+        if strategy != "ffill":
+            df_filled = df_numeric.fillna(fill_value)
+
+        # Re-attach the non-numeric columns
+        cleaned = pd.concat([data[non_numeric_cols].reset_index(drop=True), df_filled.reset_index(drop=True)], axis=1)
+
+    elif isinstance(data, dict) and "data" in data:
+        # dict-shaped data
+        cleaned_list = []
+        for item in data["data"]:
+            consume_records = item.get("消费记录", [None] * 10)
+            # Filter out None before computing the fill value
+            valid_vals = [v for v in consume_records if v is not None and isinstance(v, (int, float))]
+            if not valid_vals:
+                filled_records = [0.0] * 10  # no valid values: fill with 0
+            else:
+                if strategy == "mean":
+                    fill_val = np.mean(valid_vals)
+                elif strategy == "mode":
+                    fill_val = pd.Series(valid_vals).mode().iloc[0]  # np.mode does not exist
+                elif strategy == "ffill":
+                    # Forward fill, seeding with 0.0
+                    filled_records = []
+                    prev_val = 0.0
+                    for val in consume_records:
+                        if val is None or not isinstance(val, (int, float)):
+                            filled_records.append(prev_val)
+                        else:
+                            filled_records.append(val)
+                            prev_val = val
+                    fill_val = None
+                else:
+                    print(f"Error: unsupported fill strategy {strategy}!")
+                    return data
+
+                if strategy != "ffill":
+                    filled_records = [fill_val if v is None else v for v in consume_records]
+
+            cleaned_item = {
+                "学号": item.get("学号"),
+                "姓名": item.get("姓名"),
+                "消费记录": filled_records
+            }
+            cleaned_list.append(cleaned_item)
+        cleaned = {"data": cleaned_list}
+    else:
+        print("Error: unsupported data type!")
+        return data
+
+    print(f"Missing-value fill done (strategy: {strategy})")
+    return cleaned
+
+
+def handle_outliers(data, strategy="mean", key_col="学号", alpha=3):
+    """
+    Handle outliers with the 3σ rule.
+    :param data: DataFrame or dict data
+    :param strategy: handling strategy (drop/mean/mode/ffill)
+    :param key_col: primary-key column
+    :param alpha: σ multiplier of the 3σ rule (default 3)
+    :return: processed data
+    """
+    if isinstance(data, pd.DataFrame):
+        # Split numeric and non-numeric columns
+        non_numeric_cols = [col for col in data.columns if col in [key_col, "姓名"]]
+        numeric_cols = [col for col in data.columns if col not in non_numeric_cols]
+        df_numeric = data[numeric_cols].astype(float).copy()
+
+        # Per-column 3σ bounds
+        mean_vals = df_numeric.mean()
+        std_vals = df_numeric.std()
+        lower_bound = mean_vals - alpha * std_vals
+        upper_bound = mean_vals + alpha * std_vals
+
+        # Mark outliers
+        outliers_mask = (df_numeric < lower_bound) | (df_numeric > upper_bound)
+
+        if strategy == "drop":
+            # Drop rows that contain any outlier
+            row_mask = outliers_mask.any(axis=1)
+            df_cleaned = df_numeric[~row_mask]
+            non_numeric_cleaned = data[non_numeric_cols][~row_mask].reset_index(drop=True)
+        else:
+            # Replace outliers column by column
+            df_cleaned = df_numeric.copy()
+            for col in numeric_cols:
+                # Fill value computed from the non-outlier values of this column
+                valid_vals = df_cleaned[col][~outliers_mask[col]]
+                if len(valid_vals) == 0:
+                    fill_val = 0.0
+                elif strategy == "mean":
+                    fill_val = valid_vals.mean()
+                elif strategy == "mode":
+                    fill_val = valid_vals.mode().iloc[0]
+                elif strategy == "ffill":
+                    df_cleaned[col] = df_cleaned[col].mask(outliers_mask[col]).ffill()
+                    continue
+                else:
+                    print(f"Error: unsupported outlier strategy {strategy}!")
+                    return data
+                # Replace outliers in this column only (masking the whole frame
+                # with one column's fill value would corrupt the other columns)
+                df_cleaned[col] = df_cleaned[col].mask(outliers_mask[col], fill_val)
+            non_numeric_cleaned = data[non_numeric_cols].reset_index(drop=True)
+
+        # Re-attach the non-numeric columns
+        cleaned = pd.concat([non_numeric_cleaned, df_cleaned.reset_index(drop=True)], axis=1)
+
+    elif isinstance(data, dict) and "data" in data:
+        # dict-shaped data: bounds computed per student over their own records
+        cleaned_list = []
+        for item in data["data"]:
+            consume_records = item.get("消费记录", [])
+            valid_vals = [v for v in consume_records if v is not None and isinstance(v, (int, float))]
+            if not valid_vals:
+                cleaned_list.append(item)
+                continue
+
+            # 3σ bounds
+            mean_val = np.mean(valid_vals)
+            std_val = np.std(valid_vals)
+            lower = mean_val - alpha * std_val
+            upper = mean_val + alpha * std_val
+
+            # Handle outliers record by record
+            cleaned_records = []
+            has_outlier = False
+            prev_val = mean_val  # seed for forward fill
+            for val in consume_records:
+                if val is None:
+                    cleaned_records.append(val)
+                    continue
+                is_outlier = val < lower or val > upper
+                if not is_outlier:
+                    cleaned_records.append(val)
+                    prev_val = val
+                    continue
+                has_outlier = True
+                if strategy == "drop":
+                    break  # the whole row will be discarded
+                elif strategy == "mean":
+                    cleaned_records.append(mean_val)
+                elif strategy == "mode":
+                    cleaned_records.append(pd.Series(valid_vals).mode().iloc[0])
+                elif strategy == "ffill":
+                    cleaned_records.append(prev_val)
+                else:
+                    cleaned_records.append(val)
+
+            if strategy == "drop" and has_outlier:
+                continue  # only rows that contained an outlier are discarded
+            cleaned_list.append({
+                "学号": item.get("学号"),
+                "姓名": item.get("姓名"),
+                "消费记录": cleaned_records
+            })
+
+        cleaned = {"data": cleaned_list}
+    else:
+        print("Error: unsupported data type!")
+        return data
+
+    print(f"Outlier handling done (strategy: {strategy}, 3σ rule)")
+    return cleaned
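+
+
+# Editor's sketch: a minimal, self-contained version of the 3σ check used in
+# handle_outliers above (the name demo_three_sigma is ours; nothing calls it):
+def demo_three_sigma(values, alpha=3):
+    """Flag x where |x - mean| > alpha * std; sample std (ddof=1), as above."""
+    s = pd.Series(values, dtype=float)
+    return ((s - s.mean()).abs() > alpha * s.std()).tolist()
+# Caveat worth knowing: within one student's 10 records a single extreme value
+# can inflate the std enough to mask itself; the DataFrame branch sidesteps
+# this by computing the bounds column-wise across all students.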
+
+
+# ===================== Data analysis module =====================
+def statistical_analysis(data, key_col="学号"):
+    """
+    Statistical features: per-student spend count, mean, and standard deviation.
+    :param data: cleaned DataFrame
+    :param key_col: primary-key column
+    :return: statistics DataFrame
+    """
+    if not isinstance(data, pd.DataFrame):
+        print("Error: statistical analysis only supports DataFrame data!")
+        return pd.DataFrame()
+
+    # Spending columns
+    consume_cols = [col for col in data.columns if "消费" in col]
+    # Per-student features
+    stats = []
+    for _, row in data.iterrows():
+        sid = row[key_col]
+        name = row["姓名"]
+        # Valid spending records only
+        consume_vals = [v for v in row[consume_cols] if v is not None and isinstance(v, (int, float))]
+        # Feature values (np.std is the population std, ddof=0)
+        count = len(consume_vals)
+        mean_val = round(np.mean(consume_vals), 2) if count > 0 else 0.0
+        std_val = round(np.std(consume_vals), 2) if count > 1 else 0.0
+
+        stats.append({
+            "学号": sid,
+            "姓名": name,
+            "消费次数": count,
+            "消费均值": mean_val,
+            "消费标准差": std_val
+        })
+
+    stats_df = pd.DataFrame(stats)
+    print("Statistical analysis done!")
+    return stats_df
+
+
+def plot_top10_bar(stats_df, save_dir="plots"):
+    """
+    Plot and save the Top-10 bar charts (count / mean / std).
+    :param stats_df: statistics DataFrame
+    :param save_dir: directory for the saved images
+    """
+    if not os.path.exists(save_dir):
+        os.makedirs(save_dir)
+
+    # (column, title, y-label, bar color, file name) for the three charts
+    charts = [
+        ("消费次数", "学生消费次数Top10", "消费次数", "skyblue", "top10_count.png"),
+        ("消费均值", "学生消费均值Top10", "消费均值(元)", "lightcoral", "top10_mean.png"),
+        ("消费标准差", "学生消费标准差Top10", "消费标准差(元)", "lightgreen", "top10_std.png"),
+    ]
+    for col, title, ylabel, color, fname in charts:
+        top10 = stats_df.nlargest(10, col)
+        plt.figure(figsize=(12, 4))
+        plt.bar(top10["姓名"], top10[col], color=color)
+        plt.title(title)
+        plt.xlabel("学生姓名")
+        plt.ylabel(ylabel)
+        plt.xticks(rotation=45)
+        plt.tight_layout()
+        plt.savefig(os.path.join(save_dir, fname))
+        plt.show()
+
+    print(f"Top-10 bar charts saved to {save_dir}/")
+
+
+def plot_pca_scatter(stats_df, save_dir="plots"):
+    """
+    Reduce the features to 2-D with PCA and save the scatter plot.
+    :param stats_df: statistics DataFrame
+    :param save_dir: directory for the saved images
+    """
+    if not os.path.exists(save_dir):
+        os.makedirs(save_dir)
+
+    # Numeric features
+    features = ["消费次数", "消费均值", "消费标准差"]
+    X = stats_df[features].values
+    # Standardize
+    scaler = StandardScaler()
+    X_scaled = scaler.fit_transform(X)
+    # PCA down to 2-D
+    pca = PCA(n_components=2)
+    X_pca = pca.fit_transform(X_scaled)
+
+    # Scatter plot
+    plt.figure(figsize=(10, 8))
+    plt.scatter(X_pca[:, 0], X_pca[:, 1], alpha=0.7, c="orange")
+    # Annotate only the first 20 students to keep the plot readable
+    for i in range(min(20, len(stats_df))):
+        plt.annotate(stats_df.iloc[i]["姓名"], (X_pca[i, 0], X_pca[i, 1]), fontsize=8)
+    plt.title("学生消费特征PCA降维散点图")
+    plt.xlabel("主成分1")
+    plt.ylabel("主成分2")
+    plt.grid(alpha=0.3)
+    plt.tight_layout()
+    plt.savefig(os.path.join(save_dir, "pca_scatter.png"))
+    plt.show()
+
+    print(f"PCA scatter plot saved to {save_dir}/")
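+
+
+# Editor's sketch: PCA keeps only part of the variance in 2-D, and the retained
+# ratio tells you how faithful the scatter plot is. demo_pca_variance is our
+# name for this illustration; nothing in the pipeline calls it:
+def demo_pca_variance(stats_df):
+    """Return the explained-variance ratio of the 2-D projection used above."""
+    X = StandardScaler().fit_transform(stats_df[["消费次数", "消费均值", "消费标准差"]].values)
+    return PCA(n_components=2).fit(X).explained_variance_ratio_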
+
+
+# ===================== Student portrait model: build & apply =====================
+def build_portrait_model(stats_df, model_path="student_portrait_model.pkl"):
+    """
+    Build the student portrait model (labels derived from spending features).
+    Label rules:
+    - 特别困难: mean < 15 and count < 5
+    - 困难: 15 <= mean < 20 and count < 7
+    - 一般困难: 20 <= mean < 25, or 7 <= count <= 8
+    - 不困难: everything else
+    :param stats_df: statistics DataFrame
+    :param model_path: model save path
+    :return: DataFrame with labels
+    """
+
+    def get_label(row):
+        mean_val = row["消费均值"]
+        count = row["消费次数"]
+        if mean_val < 15 and count < 5:
+            return "特别困难"
+        elif 15 <= mean_val < 20 and count < 7:
+            return "困难"
+        elif (20 <= mean_val < 25) or (7 <= count <= 8):
+            return "一般困难"
+        else:
+            return "不困难"
+
+    # Assign labels
+    stats_df["困难标签"] = stats_df.apply(get_label, axis=1)
+    # Save the "model" (simplified here to the thresholds behind the rules;
+    # could be swapped for a trained ML model)
+    model = {
+        "rules": {
+            "特别困难": {"消费均值<15": 15, "消费次数<5": 5},
+            "困难": {"15≤消费均值<20": (15, 20), "消费次数<7": 7},
+            "一般困难": {"20≤消费均值<25": (20, 25), "7≤消费次数≤8": (7, 8)},
+            "不困难": "其他"
+        },
+        "label_counts": stats_df["困难标签"].value_counts().to_dict()
+    }
+    joblib.dump(model, model_path)
+
+    print(f"Portrait model built! Label distribution: {model['label_counts']}")
+    print(f"Model saved to {model_path}")
+    return stats_df
+
+
+def get_aid_strategy(label):
+    """Return the targeted aid strategy for a label."""
+    strategies = {
+        "特别困难": "Full grant + living allowance + priority for work-study placement",
+        "困难": "Half grant + living allowance",
+        "一般困难": "One-off subsidy + academic tutoring",
+        "不困难": "No aid needed; may recommend a scholarship application"
+    }
+    return strategies.get(label, "No matching aid strategy")
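+
+
+# Editor's note: worked examples of the labeling rules above -
+#   mean 14.2, 4 purchases  -> 特别困难 (mean < 15 and count < 5)
+#   mean 18.0, 6 purchases  -> 困难 (15 <= mean < 20 and count < 7)
+#   mean 22.0, 9 purchases  -> 一般困难 (20 <= mean < 25)
+#   mean 30.0, 10 purchases -> 不困难 (no rule matches)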
print(f"画像模型构建完成!标签分布:{model['label_counts']}") + print(f"模型已保存至{model_path}") + return stats_df + + +def get_aid_strategy(label): + """根据标签返回精准帮扶策略""" + strategies = { + "特别困难": "提供全额助学金+生活补贴+勤工俭学优先安排", + "困难": "提供半额助学金+生活补贴", + "一般困难": "提供临时补助+学业辅导", + "不困难": "无需资助,可推荐奖学金申请" + } + return strategies.get(label, "暂无适配的帮扶策略") + + +def apply_portrait_model(test_data, model_path="student_portrait_model.pkl"): + """ + 应用画像模型对测试数据打标签并返回资助建议 + :param test_data: 测试数据(DataFrame) + :param model_path: 模型路径 + :return: 带标签和建议的字典 + """ + # 加载模型 + if not os.path.exists(model_path): + print(f"错误:模型文件{model_path}不存在!") + return {} + model = joblib.load(model_path) + + # 先对测试数据做统计分析 + test_stats = statistical_analysis(test_data) + + # 打标签+生成建议 + results = {} + for idx, row in test_stats.iterrows(): + sid = row["学号"] + name = row["姓名"] + mean_val = row["消费均值"] + count = row["消费次数"] + + # 应用模型规则 + if mean_val < model["rules"]["特别困难"]["消费均值<15"] and count < model["rules"]["特别困难"]["消费次数<5"]: + label = "特别困难" + elif (model["rules"]["困难"]["15≤消费均值<20"][0] <= mean_val < model["rules"]["困难"]["15≤消费均值<20"][ + 1]) and count < model["rules"]["困难"]["消费次数<7"]: + label = "困难" + elif (model["rules"]["一般困难"]["20≤消费均值<25"][0] <= mean_val < + model["rules"]["一般困难"]["20≤消费均值<25"][1]) or ( + model["rules"]["一般困难"]["7≤消费次数≤8"][0] <= count <= model["rules"]["一般困难"]["7≤消费次数≤8"][ + 1]): + label = "一般困难" + else: + label = "不困难" + + # 获取帮扶策略 + strategy = get_aid_strategy(label) + + results[sid] = { + "姓名": name, + "消费次数": count, + "消费均值": mean_val, + "困难标签": label, + "帮扶策略": strategy + } + + print("画像模型应用完成!测试数据标签及帮扶策略如下:") + for sid, info in results.items(): + print(f"学号:{sid},姓名:{info['姓名']},标签:{info['困难标签']},策略:{info['帮扶策略']}") + return results + + +# ===================== 主函数(全流程执行) ===================== +def main(): + # 1. 生成模拟数据 + save_paths = {"csv": "student_consume_train.csv", "excel": "student_consume_train.xlsx", + "json": "student_consume_train.json"} + test_path = "student_consume_test.csv" + generate_student_consume_data(n_train=100, n_test=5, save_paths=save_paths, test_path=test_path) + + # 2. 数据导入(以CSV为例,可替换为excel/json) + train_data = import_csv_data(save_paths["csv"]) + if train_data.empty: + return + + # 3. 数据清洗 + # 去重 + train_data_dedup = remove_duplicates(train_data) + # 缺失值填充(均值填充) + train_data_fill = fill_missing_values(train_data_dedup, strategy="mean") + # 异常值处理(均值填充) + train_data_clean = handle_outliers(train_data_fill, strategy="mean") + + # 4. 数据分析 + # 统计特征分析 + stats_df = statistical_analysis(train_data_clean) + # 可视化 + plot_top10_bar(stats_df) + plot_pca_scatter(stats_df) + + # 5. 构建画像模型 + stats_df_with_label = build_portrait_model(stats_df) + + # 6. 模型应用(测试数据) + test_data = import_csv_data(test_path) + if not test_data.empty: + # 测试数据先清洗 + test_data_dedup = remove_duplicates(test_data) + test_data_fill = fill_missing_values(test_data_dedup, strategy="mean") + test_data_clean = handle_outliers(test_data_fill, strategy="mean") + # 应用模型 + aid_results = apply_portrait_model(test_data_clean) + + +if __name__ == "__main__": + main() \ No newline at end of file