You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
gitProject/src/AiDiary/main.py

1016 lines
33 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from flask import Flask, render_template, request, redirect, url_for, session, flash, jsonify, send_file
from flask_cors import CORS
from functools import wraps
from openai import OpenAI
import json
import markdown
import sqlite3
import hashlib
from datetime import datetime
import os
import re
from flask import send_from_directory
import random
# 初始化Flask应用
app = Flask(__name__)
app.secret_key = 'your-secret-key-here' # 添加密钥用于session
CORS(app)
# 提供mypo文件夹中的图片访问
@app.route('/mypo/<path:filename>')
def serve_mypo_file(filename):
return send_from_directory('mypo', filename)
# 登录验证装饰器
def login_required(func):
@wraps(func)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('请先登录!', 'error')
return redirect(url_for('login'))
return func(*args, **kwargs)
return decorated_function
# 配置DeepSeek API
DEEPSEEK_API_KEY = "sk-2685bfd0fe054c9f82a12ac7905dd80d"
client = OpenAI(
api_key=DEEPSEEK_API_KEY,
base_url="https://api.deepseek.com"
)
# 数据库初始化
def init_db():
conn = sqlite3.connect('database.db')
c = conn.cursor()
# 用户表
c.execute('''CREATE TABLE IF NOT EXISTS users
(id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP)''')
# 日记表
c.execute('''CREATE TABLE IF NOT EXISTS diaries
(id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
tags TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id))''')
# 情绪分析结果表
c.execute('''CREATE TABLE IF NOT EXISTS emotion_analyses
(id INTEGER PRIMARY KEY AUTOINCREMENT,
diary_id INTEGER NOT NULL,
dominant_emotion TEXT NOT NULL,
analysis TEXT NOT NULL,
emotion_scores TEXT NOT NULL,
suggestion TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (diary_id) REFERENCES diaries(id) ON DELETE CASCADE)''')
# 收藏卡片表
c.execute('''CREATE TABLE IF NOT EXISTS collected_cards
(id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
card_name TEXT NOT NULL,
card_type TEXT NOT NULL,
card_content TEXT NOT NULL,
card_image_url TEXT NOT NULL,
collected_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id))''')
# 周报表
c.execute('''CREATE TABLE IF NOT EXISTS weekly_reports
(id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
diary_count INTEGER NOT NULL,
date_range TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(id))''')
conn.commit()
conn.close()
# 密码哈希函数
def hash_password(password):
return hashlib.sha256(password.encode()).hexdigest()
# 主页
@app.route('/')
def index():
return render_template('index.html')
# 情绪分析器页面已在下方使用login_required装饰器定义
# 为避免重复路由定义,此处已移除
# 用户注册
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
conn = sqlite3.connect('database.db')
c = conn.cursor()
try:
c.execute('INSERT INTO users (username, password) VALUES (?, ?)',
(username, hash_password(password)))
conn.commit()
flash('注册成功!请登录。', 'success')
return redirect(url_for('login'))
except sqlite3.IntegrityError:
flash('用户名已存在!', 'error')
finally:
conn.close()
return render_template('register.html')
# 用户登录
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('SELECT * FROM users WHERE username = ? AND password = ?',
(username, hash_password(password)))
user = c.fetchone()
conn.close()
if user:
session['user_id'] = user[0]
session['username'] = user[1]
flash('登录成功!', 'success')
return redirect(url_for('index'))
else:
flash('用户名或密码错误!', 'error')
return render_template('login.html')
# 用户登出
@app.route('/logout')
def logout():
session.clear()
flash('已成功登出!', 'info')
return redirect(url_for('index'))
# 个人中心
@app.route('/profile')
@login_required
def profile():
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('SELECT created_at FROM users WHERE id = ?', (session['user_id'],))
created_at = c.fetchone()[0]
conn.close()
# 格式化日期显示
created_at_obj = datetime.strptime(created_at, '%Y-%m-%d %H:%M:%S')
formatted_date = created_at_obj.strftime('%Y年%m月%d')
return render_template('profile.html', created_at=formatted_date)
# 修改密码
@app.route('/change_password', methods=['POST'])
@login_required
def change_password():
old_password = request.form['old_password']
new_password = request.form['new_password']
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('SELECT password FROM users WHERE id = ?', (session['user_id'],))
current_password = c.fetchone()[0]
if hash_password(old_password) == current_password:
c.execute('UPDATE users SET password = ? WHERE id = ?',
(hash_password(new_password), session['user_id']))
conn.commit()
flash('密码修改成功!', 'success')
else:
flash('原密码错误!', 'error')
conn.close()
return redirect(url_for('profile'))
# 情绪分析功能
# def analyze_emotion(content):
# """调用DeepSeek API进行情绪分析"""
# try:
# system_prompt = """
# 你是一个专业的情绪分析专家。请分析用户提供的的日记内容,并返回以下信息:
# 1. 主要情绪(积极、消极、中性或其他具体情绪)
# 2. 情绪分析说明(简要解释为什么是这种情绪)
# 3. 情绪得分以JSON格式返回各种情绪的百分比
# 4. 三条适合的建议或回应
# 5.500字
#
# 请将结果以JSON格式返回包含dominant_emotion、analysis、emotion_scores和suggestion四个字段。
# """
#
# response = client.chat.completions.create(
# model="deepseek-chat",
# messages=[
# {"role": "system", "content": system_prompt},
# {"role": "user", "content": content}
# ],
# stream=False
# )
#
# result = json.loads(response.choices[0].message.content)
# return result
#
# except Exception as e:
# print(f"情绪分析出错: {str(e)}")
# return {
# "error": f"分析失败: {str(e)}",
# "dominant_emotion": "未知",
# "analysis": "无法分析情绪",
# "emotion_scores": {"未知": 1.0},
# "suggestion": "请尝试重新输入内容"
# }
# 保存周报
@app.route('/save_weekly_report', methods=['POST'])
@login_required
def save_weekly_report():
data = request.get_json()
if not data or 'title' not in data or 'content' not in data:
return jsonify({"error": "缺少必要参数"}), 400
title = data['title']
content = data['content']
diary_count = data.get('diary_count', 1)
date_range = data.get('date_range', '')
conn = sqlite3.connect('database.db')
c = conn.cursor()
try:
c.execute('''INSERT INTO weekly_reports
(user_id, title, content, diary_count, date_range)
VALUES (?, ?, ?, ?, ?)''',
(session['user_id'], title, content, diary_count, date_range))
conn.commit()
report_id = c.lastrowid
return jsonify({
"success": True,
"message": "报告已成功保存!",
"report_id": report_id
})
except Exception as e:
print(f"保存周报失败: {str(e)}")
return jsonify({"error": "保存报告失败"}), 500
finally:
conn.close()
# 获取用户的所有周报
@app.route('/get_weekly_reports')
@login_required
def get_weekly_reports():
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('''SELECT id, title, content, diary_count, date_range, created_at
FROM weekly_reports
WHERE user_id = ?
ORDER BY created_at DESC''',
(session['user_id'],))
reports = c.fetchall()
conn.close()
formatted_reports = []
for report in reports:
created_at = datetime.strptime(report[5], '%Y-%m-%d %H:%M:%S')
formatted_reports.append({
'id': report[0],
'title': report[1],
'content': report[2],
'diary_count': report[3],
'date_range': report[4],
'created_at': created_at.strftime('%Y-%m-%d %H:%M')
})
return jsonify(formatted_reports)
# 获取单个周报详情
@app.route('/get_weekly_report/<int:report_id>')
@login_required
def get_weekly_report(report_id):
conn = sqlite3.connect('database.db')
c = conn.cursor()
# 验证报告属于当前用户
c.execute('''SELECT id, title, content, diary_count, date_range, created_at
FROM weekly_reports
WHERE id = ? AND user_id = ?''',
(report_id, session['user_id']))
report = c.fetchone()
conn.close()
if not report:
return jsonify({"error": "报告不存在或您没有权限访问"}), 404
created_at = datetime.strptime(report[5], '%Y-%m-%d %H:%M:%S')
return jsonify({
'id': report[0],
'title': report[1],
'content': report[2],
'diary_count': report[3],
'date_range': report[4],
'created_at': created_at.strftime('%Y-%m-%d %H:%M')
})
# 删除周报
@app.route('/delete_weekly_report/<int:report_id>', methods=['DELETE'])
@login_required
def delete_weekly_report(report_id):
conn = sqlite3.connect('database.db')
c = conn.cursor()
# 验证报告属于当前用户
c.execute('SELECT id FROM weekly_reports WHERE id = ? AND user_id = ?',
(report_id, session['user_id']))
report = c.fetchone()
if not report:
conn.close()
return jsonify({"error": "报告不存在或您没有权限删除"}), 404
try:
c.execute('DELETE FROM weekly_reports WHERE id = ?', (report_id,))
conn.commit()
conn.close()
return jsonify({"success": True, "message": "报告已删除"})
except Exception as e:
print(f"删除周报失败: {str(e)}")
conn.close()
return jsonify({"error": "删除报告失败"}), 500
# 生成周报分析(流式版本)
@app.route('/generate_weekly_report_stream', methods=['POST'])
@login_required
def generate_weekly_report_stream():
data = request.get_json()
if not data or 'content' not in data:
return jsonify({"error": "请提供日记内容"}), 400
try:
diary_count = data.get('diary_count', 1)
content = data['content']
# 调用DeepSeek API生成流式情绪分析
system_prompt = f"""
请根据用户提供的{diary_count}篇日记内容,生成一段详细的情感分析和建议。
要求:
1. 详细分析控制在500-800字左右
2. 包含情绪状态分析、趋势观察和实用建议
3. 语言温暖、积极、有建设性
4. 使用完整的标点符号,确保内容通顺
5. 直接给出分析结果,不要有前言后语
请直接返回分析文本内容。
"""
def generate():
try:
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": f"请分析以下日记内容:\n\n{content}"}
],
stream=True,
max_tokens=2000
)
for chunk in response:
if chunk.choices[0].delta.content is not None:
yield f"data: {json.dumps({'content': chunk.choices[0].delta.content})}\n\n"
except Exception as e:
print(f"流式生成周报分析出错: {str(e)}")
# 返回错误信息
error_msg = "分析过程中出现错误,请稍后重试。"
yield f"data: {json.dumps({'content': error_msg})}\n\n"
return app.response_class(generate(), mimetype='text/plain')
except Exception as e:
print(f"初始化流式生成周报分析出错: {str(e)}")
# 返回一个默认的错误响应
def error_generator():
error_msg = "分析服务暂时不可用,请稍后重试。"
yield f"data: {json.dumps({'content': error_msg})}\n\n"
return app.response_class(error_generator(), mimetype='text/plain')
# 生成周报分析(流式版本)- 改进版
# @app.route('/generate_weekly_report_stream', methods=['POST'])
# @login_required
# def generate_weekly_report_stream():
# data = request.get_json()
#
# if not data or 'content' not in data:
# return jsonify({"error": "请提供日记内容"}), 400
#
# try:
# diary_count = data.get('diary_count', 1)
# content = data['content']
#
# # 改进的系统提示词,要求结构化和详细的分析
# system_prompt = f"""
# 你是一个专业的心理咨询师和情绪分析专家。请根据用户提供的{diary_count}篇日记内容,生成一份详细、结构化的情绪分析周报。
#
# 请按照以下JSON格式返回结果确保分析和建议都详细且有逻辑性
# {{
# "emotional_tags": ["标签1", "标签2", "标签3", "标签4", "标签5"],
# "emotional_trend": "上升/下降/平稳/波动",
# "main_emotion": "主要情绪名称",
# "intensity_score": 数字1-10,
# "analysis_content": \"\"\"
# 【情绪状态概述】
# 这里写总体情绪状态的概括性描述约100字。
#
# 【具体表现分析】
# 这里写具体的情绪表现分析包括积极情绪和消极情绪的具体表现约200字。
#
# 【情绪变化规律】
# 这里写情绪变化的规律和模式分析约150字。
#
# 【潜在关注点】
# 这里写需要关注的情绪问题或潜在风险约100字。
#
# 【改善建议方向】
# 这里写总体的改善方向和建议框架约100字。
# \"\"\",
# "suggestions": [
# "第一条具体可行的建议,包含具体的行动步骤和时间安排",
# "第二条具体可行的建议,包含具体的行动步骤和时间安排",
# "第三条具体可行的建议,包含具体的行动步骤和时间安排",
# "第四条具体可行的建议,包含具体的行动步骤和时间安排",
# "第五条具体可行的建议,包含具体的行动步骤和时间安排"
# ]
# }}
#
# 要求:
# 1. emotional_tags提取5个最精准的情绪标签
# 2. analysis_content使用清晰的分段结构每个部分用【】标注总共约650字
# 3. suggestions提供5条具体、可行、详细的建议每条建议都包含具体的行动指南
# 4. 语言专业但温暖,有建设性
#
# 请确保返回纯JSON格式。
# """
#
# def generate():
# try:
# response = client.chat.completions.create(
# model="deepseek-chat",
# messages=[
# {"role": "system", "content": system_prompt},
# {"role": "user", "content": f"请分析以下{diary_count}篇日记内容:\n\n{content}"}
# ],
# stream=False,
# max_tokens=3000
# )
#
# full_response = response.choices[0].message.content.strip()
# print("AI原始响应:", full_response)
#
# # 清理响应提取JSON部分
# json_start = full_response.find('{')
# json_end = full_response.rfind('}') + 1
#
# if json_start >= 0 and json_end > json_start:
# json_str = full_response[json_start:json_end]
# try:
# parsed_data = json.loads(json_str)
#
# # 先发送情绪标签和指标
# yield f"data: {json.dumps({'type': 'emotional_metrics', 'data': {
# 'emotional_tags': parsed_data.get('emotional_tags', []),
# 'main_emotion': parsed_data.get('main_emotion', '平静'),
# 'intensity_score': parsed_data.get('intensity_score', 5),
# 'emotional_trend': parsed_data.get('emotional_trend', '平稳')
# }})}\n\n"
#
# # 流式发送分析内容
# analysis_content = parsed_data.get('analysis_content', '')
# if analysis_content:
# for i in range(0, len(analysis_content), 5):
# chunk = analysis_content[i:i + 5]
# yield f"data: {json.dumps({'type': 'stream_content', 'content': chunk})}\n\n"
#
# # 流式发送建议内容
# suggestions = parsed_data.get('suggestions', [])
# if suggestions:
# suggestions_text = "\n\n".join(
# [f"{i + 1}. {suggestion}" for i, suggestion in enumerate(suggestions)])
# for i in range(0, len(suggestions_text), 5):
# chunk = suggestions_text[i:i + 5]
# yield f"data: {json.dumps({'type': 'suggestions_stream', 'content': chunk})}\n\n"
#
# # 最后发送完整数据
# yield f"data: {json.dumps({'type': 'structured', 'data': parsed_data})}\n\n"
#
# except json.JSONDecodeError as e:
# print(f"JSON解析错误: {e}")
# #default_data = create_default_analysis(content, diary_count)
# #yield f"data: {json.dumps({'type': 'structured', 'data': default_data})}\n\n"
#
#
#
#
# except Exception as e:
# print(f"生成周报分析出错: {str(e)}")
# #default_data = create_default_analysis(content, diary_count)
# #yield f"data: {json.dumps({'type': 'structured', 'data': default_data})}\n\n"
#
# return app.response_class(generate(), mimetype='text/plain')
#
# except Exception as e:
# print(f"初始化流式生成周报分析出错: {str(e)}")
# 其他页面路由
@app.route('/emotionanalyzer')
@login_required
def emotionanalyzer():
return render_template('emotionanalyzer.html')
# 日记管理功能
@app.route('/diary', methods=['GET', 'POST'])
@login_required
def diary():
if request.method == 'POST':
# 创建或更新日记
title = request.form['title']
# 使用HTML内容而不是普通文本
content = request.form.get('content-html', request.form.get('content', ''))
tags = request.form.get('tags', '')
diary_id = request.form.get('diary_id')
# 检查内容是否为空移除HTML标签后检查
if not re.sub(r'<[^>]+>', '', content).strip():
flash('日记内容不能为空。', 'error')
return redirect(url_for('diary_list'))
conn = sqlite3.connect('database.db')
c = conn.cursor()
if diary_id:
# 更新现有日记
c.execute('''UPDATE diaries
SET title = ?, content = ?, tags = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ? AND user_id = ?''',
(title, content, tags, diary_id, session['user_id']))
flash('日记已更新!', 'success')
else:
# 创建新日记
c.execute('''INSERT INTO diaries (user_id, title, content, tags)
VALUES (?, ?, ?, ?)''',
(session['user_id'], title, content, tags))
flash('日记已保存!', 'success')
conn.commit()
conn.close()
return redirect(url_for('diary_list'))
# GET 请求,显示日记编辑页面
diary_id = request.args.get('id')
diary_data = None
if diary_id:
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('SELECT id, title, content, tags FROM diaries WHERE id = ? AND user_id = ?',
(diary_id, session['user_id']))
diary_data = c.fetchone()
conn.close()
if not diary_data:
flash('您没有权限访问此日记!', 'error')
return redirect(url_for('diary_list'))
# 添加支持编辑模式下的HTML内容显示
return render_template('diary.html', diary=diary_data, content_as_html=True)
# 日记列表
@app.route('/diary_list')
@login_required
def diary_list():
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('''SELECT id, title, content, tags, created_at, updated_at
FROM diaries
WHERE user_id = ?
ORDER BY updated_at DESC''',
(session['user_id'],))
diaries = c.fetchall()
conn.close()
return render_template('diary_list.html', diaries=diaries)
# 删除日记
@app.route('/diary_delete/<int:diary_id>')
@login_required
def diary_delete(diary_id):
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('DELETE FROM diaries WHERE id = ? AND user_id = ?',
(diary_id, session['user_id']))
conn.commit()
conn.close()
flash('日记已删除!', 'success')
return redirect(url_for('diary_list'))
# 日记查看功能
@app.route('/diary_view/<int:diary_id>')
@login_required
def diary_view(diary_id):
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('SELECT id, title, content, tags FROM diaries WHERE id = ? AND user_id = ?',
(diary_id, session['user_id']))
diary_data = c.fetchone()
conn.close()
if not diary_data:
flash('您没有权限访问此日记!', 'error')
return redirect(url_for('diary_list'))
# 直接使用HTML内容显示如果内容已经是HTML格式
diary_html = diary_data
return render_template('diary.html', diary=diary_html, view_only=True, content_as_html=True)
# 保存情绪分析结果到数据库
@app.route('/save_emotion_analysis', methods=['POST'])
@login_required
def save_emotion_analysis():
data = request.get_json()
if not data or 'diary_id' not in data or 'analysis_result' not in data:
return jsonify({"error": "缺少必要参数"}), 400
diary_id = data['diary_id']
analysis_result = data['analysis_result']
# 验证日记属于当前用户
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('SELECT id FROM diaries WHERE id = ? AND user_id = ?',
(diary_id, session['user_id']))
diary = c.fetchone()
if not diary:
conn.close()
return jsonify({"error": "您没有权限访问此日记"}), 403
# 插入新的情绪分析结果
c.execute('''INSERT INTO emotion_analyses
(diary_id, dominant_emotion, analysis, emotion_scores, suggestion)
VALUES (?, ?, ?, ?, ?)''',
(diary_id,
analysis_result.get('dominant_emotion', '未知'),
analysis_result.get('analysis', ''),
json.dumps(analysis_result.get('emotion_scores', {})),
analysis_result.get('suggestion', '')))
# 检查并限制分析结果数量不超过99条
c.execute('''SELECT COUNT(*) FROM emotion_analyses WHERE diary_id = ?''', (diary_id,))
count = c.fetchone()[0]
if count > 99:
# 删除最旧的记录
c.execute('''DELETE FROM emotion_analyses
WHERE diary_id = ?
AND id IN (SELECT id FROM emotion_analyses
WHERE diary_id = ?
ORDER BY created_at ASC
LIMIT ?)''',
(diary_id, diary_id, count - 99))
conn.commit()
conn.close()
return jsonify({"success": True})
# 获取日记的情绪分析结果
@app.route('/get_emotion_analysis/<int:diary_id>')
@login_required
def get_emotion_analysis(diary_id):
conn = sqlite3.connect('database.db')
c = conn.cursor()
# 验证日记属于当前用户
c.execute('SELECT id FROM diaries WHERE id = ? AND user_id = ?',
(diary_id, session['user_id']))
diary = c.fetchone()
if not diary:
conn.close()
return jsonify({"error": "您没有权限访问此日记"}), 403
c.execute('''SELECT dominant_emotion, analysis, emotion_scores, suggestion
FROM emotion_analyses WHERE diary_id = ?''',
(diary_id,))
analysis = c.fetchone()
conn.close()
if not analysis:
return jsonify({"exists": False})
return jsonify({
"exists": True,
"dominant_emotion": analysis[0],
"analysis": analysis[1],
"emotion_scores": json.loads(analysis[2]),
"suggestion": analysis[3]
})
# 获取用户日记列表数据(用于情绪分析页面下拉选择)
@app.route('/diary_list_data')
@login_required
def diary_list_data():
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute('''SELECT id, title, created_at
FROM diaries
WHERE user_id = ?
ORDER BY updated_at DESC''',
(session['user_id'],))
diaries = c.fetchall()
conn.close()
# 格式化日期时间
formatted_diaries = []
for diary in diaries:
created_at = datetime.strptime(diary[2], '%Y-%m-%d %H:%M:%S')
formatted_diaries.append({
'id': diary[0],
'title': diary[1],
'created_at': created_at.strftime('%Y-%m-%d')
})
return jsonify(formatted_diaries)
# 获取单篇日记内容(用于情绪分析页面加载)
@app.route('/get_diary/<int:diary_id>')
@login_required
def get_diary(diary_id):
conn = sqlite3.connect('database.db')
c = conn.cursor()
# 验证日记属于当前用户
c.execute('SELECT id, title, content FROM diaries WHERE id = ? AND user_id = ?',
(diary_id, session['user_id']))
diary = c.fetchone()
conn.close()
if not diary:
return jsonify({"error": "您没有权限访问此日记"}), 403
return jsonify({
'id': diary[0],
'title': diary[1],
'content': diary[2]
})
# 插入的卡片
@app.route('/get_user_cards')
@login_required
def get_user_cards():
try:
user_id = session['user_id']
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("""
SELECT card_name, card_type, card_image_url
FROM collected_cards
WHERE user_id = ?
ORDER BY collected_at DESC
""", (user_id,))
cards = c.fetchall()
conn.close()
# 转换为前端需要的格式
user_cards = []
for card in cards:
user_cards.append({
"name": card[0], # card_name
"directory": card[1], # card_type
"imageUrl": card[2] # card_image_url
})
return jsonify(user_cards)
except Exception as e:
print(f"获取用户卡片失败: {str(e)}")
return jsonify([])
# 卡册页面
@app.route('/chuka')
@login_required
def chuka():
return render_template('chuka.html')
# 抽卡结果展示
@app.route('/showcard')
@login_required
def showcard():
return render_template('showcard.html')
# 获取所有卡片
def get_all_cards():
try:
imgs_root = os.path.join(app.root_path, 'static', 'imgs')
if not os.path.exists(imgs_root):
return []
all_cards = []
image_exts = ['.jpg', '.jpeg', '.png', '.gif'] # 支持的图片格式
# 遍历imgs下的子文件夹
for dirpath, dirnames, filenames in os.walk(imgs_root):
# 跳过根目录,只处理子文件夹
if dirpath == imgs_root:
continue
# 分类名 = 子文件夹名
category = os.path.basename(dirpath)
# 遍历文件夹下的所有图片,生成卡片信息
for filename in filenames:
if any(filename.lower().endswith(ext) for ext in image_exts):
card_name = os.path.splitext(filename)[0] # 卡片去扩展名
image_url = f"/static/imgs/{category}/{filename}" # 前端可访问的图片路径
all_cards.append({
"name": card_name,
"directory": category, # 分类
"imageUrl": image_url
})
return all_cards
except Exception as e:
print(f"获取卡片列表失败:{e}")
return []
# 抽卡接口
@app.route('/draw_card', methods=['POST'])
@login_required
def draw_card():
user_id = session['user_id']
# 获取系统中所有卡片(按分类)
all_cards = get_all_cards()
if not all_cards:
return jsonify({"error": "无可用卡片", "is_full": False}), 500
# 查询用户已拥有的卡片
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("""
SELECT card_name, card_type
FROM collected_cards
WHERE user_id = ?
""", (user_id,))
owned_cards = set((row[1], row[0]) for row in c.fetchall())
conn.close()
# 没有的卡片
unowned_cards = [
card for card in all_cards
if (card['directory'], card['name']) not in owned_cards
]
if not unowned_cards:
# 已收集所有卡片
return jsonify({"error": "你已经收集所有卡片啦!", "is_full": True}), 200
# 随机抽取一张未拥有的卡片
drawn_card = random.choice(unowned_cards)
# 存入
conn = sqlite3.connect('database.db')
c = conn.cursor()
c.execute("""
INSERT INTO collected_cards
(user_id, card_name, card_type, card_content, card_image_url)
VALUES (?, ?, ?, ?, ?)
""", (
user_id,
drawn_card['name'], # 卡片名
drawn_card['directory'], # 分类
f"获得{drawn_card['directory']}系列卡片:{drawn_card['name']}", # 卡片描述
drawn_card['imageUrl'] # 图片路径
))
conn.commit()
conn.close()
# 返回抽中结果
return jsonify({
"success": True,
"drawn_card": drawn_card # 包含name、directory、imageUrl
})
# 新增:按分类获取用户卡片
@app.route('/get_cards_by_category')
@login_required
def get_cards_by_category():
user_id = session['user_id']
conn = sqlite3.connect('database.db')
c = conn.cursor()
# 查询用户卡片按分类card_type=文件夹名)分组
c.execute("""
SELECT card_type, card_name, card_image_url, collected_at
FROM collected_cards
WHERE user_id = ?
ORDER BY card_type, collected_at DESC
""", (user_id,))
cards = c.fetchall()
conn.close()
# 按分类整理数据:{分类名: [卡片列表]}
category_cards = {}
for card in cards:
category = card[0] # 分类名(文件夹名)
if category not in category_cards:
category_cards[category] = []
# 格式化卡片信息(去掉稀有度)
collected_at = datetime.strptime(card[3], '%Y-%m-%d %H:%M:%S')
category_cards[category].append({
"name": card[1],
"imageUrl": card[2],
"collectedAt": collected_at.strftime('%Y-%m-%d') # 收集时间(可选)
})
return jsonify(category_cards)
# 初始化数据库
init_db()
if __name__ == '__main__':
app.run(debug=True, port=5000)