import os
import uuid
from datetime import datetime

from flask import Flask, send_from_directory, jsonify, request, session
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from flask_socketio import SocketIO, emit, join_room, leave_room
from werkzeug.security import generate_password_hash, check_password_hash

app = Flask(__name__, static_folder='../frontend/dist')
# NOTE(review): the fallback key is visible in the source; SECRET_KEY should be
# set in the environment for any non-development deployment.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_goldminer')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///goldminer.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

CORS(app, supports_credentials=True)  # Enable CORS with credentials
db = SQLAlchemy(app)
# NOTE(review): "*" accepts Socket.IO connections from any origin while the
# handlers rely on the session cookie; consider restricting the origin list.
socketio = SocketIO(app, cors_allowed_origins="*")


# User model
class User(db.Model):
    """Registered player account with a hashed password and a best score."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(200), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime, nullable=True)
    high_score = db.Column(db.Integer, default=0)

    def set_password(self, password):
        """Store a salted hash of ``password``; the plain text is not kept."""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """Return True when ``password`` matches the stored hash."""
        return check_password_hash(self.password_hash, password)


# Game history model
class GameHistory(db.Model):
    """One finished game played by a user."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    score = db.Column(db.Integer, nullable=False)
    level_reached = db.Column(db.Integer, nullable=False)
    duration = db.Column(db.Integer, nullable=False)  # game duration in seconds
    gold_earned = db.Column(db.Integer, nullable=False)  # gold earned in the game
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationship to the owning user (exposes User.game_histories)
    user = db.relationship('User', backref=db.backref('game_histories', lazy=True))


# Chat room model
class ChatRoom(db.Model):
    """Chat room created by a user; ``is_active`` controls whether it is listed."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    creator_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_active = db.Column(db.Boolean, default=True)

    # Relationship to the creating user (exposes User.created_rooms)
    creator = db.relationship('User', backref=db.backref('created_rooms', lazy=True))


# Chat message model
class ChatMessage(db.Model):
    """Single message posted by a user in a chat room."""

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('chat_room.id'), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    message = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationships (expose ChatRoom.messages and User.messages)
    room = db.relationship('ChatRoom', backref=db.backref('messages', lazy=True))
    user = db.relationship('User', backref=db.backref('messages', lazy=True))


# Create the database tables
with app.app_context():
    db.create_all()


# Registration API
@app.route('/api/register', methods=['POST'])
def register():
    """Create a new account from a JSON body with ``username`` and ``password``.

    Returns 201 with the new username on success, or 400 when a field is
    missing/invalid or the username is already taken.
    """
    # silent=True turns malformed or non-JSON bodies into None, so they reach
    # the JSON 400 response below instead of a framework error page.
    data = request.get_json(silent=True)
    # A JSON list or scalar has no .get(); treat it like a missing body.
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    # Both fields are required and must be non-empty strings
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({'error': '用户名和密码是必填项'}), 400

    # Reject usernames that already exist
    if User.query.filter_by(username=username).first():
        return jsonify({'error': '用户名已存在'}), 400

    # Create the new user
    user = User(username=username)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()

    return jsonify({'message': '注册成功', 'username': user.username}), 201


# Login API
@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate a user and start a session.

    Expects a JSON body with ``username`` and ``password``. On success the
    user id and a fresh UUID ``session_id`` are stored in the Flask session
    and the user's public profile is returned. Returns 400 for missing or
    invalid fields and 401 for wrong credentials.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    # Both fields are required and must be non-empty strings
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({'error': '用户名和密码是必填项'}), 400

    # Look up the user
    user = User.query.filter_by(username=username).first()

    # Same response for unknown user and wrong password
    if user is None or not user.check_password(password):
        return jsonify({'error': '用户名或密码不正确'}), 401

    # Drop any state left by a previous login before establishing the new one
    session.clear()
    session_id = str(uuid.uuid4())
    session['user_id'] = user.id
    session['session_id'] = session_id

    # Record the last login time
    user.last_login = datetime.utcnow()
    db.session.commit()

    return jsonify({
        'message': '登录成功',
        'user': {
            'id': user.id,
            'username': user.username,
            'high_score': user.high_score
        },
        'session_id': session_id
    })


# Logout API
@app.route('/api/logout', methods=['POST'])
def logout():
    """Clear the current session and confirm the logout."""
    session.clear()
    return jsonify({'message': '已成功登出'})


# Get the current user's info
@app.route('/api/user', methods=['GET'])
def get_user():
    """Return the logged-in user's profile, or 401 when there is none."""
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    user = User.query.get(user_id)
    if not user:
        # The session points at a user that no longer exists: reset it.
        session.clear()
        return jsonify({'error': '用户不存在'}), 401

    return jsonify({
        'user': {
            'id': user.id,
            'username': user.username,
            'high_score': user.high_score
        }
    })


# Upper bound on per_page for the game-history endpoint
_MAX_HISTORY_PAGE_SIZE = 100


def _json_payload():
    """Return the request body as a dict, or None if it is not a JSON object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else None


def _as_int(value):
    """Return ``value`` as an int, or None when it is not a usable number."""
    # bool is a subclass of int; true/false are not meaningful scores.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        return int(value)
    except (ValueError, OverflowError):  # NaN / infinity
        return None


def _room_id_from(data):
    """Extract a usable ``room_id`` from a socket payload, or return None."""
    if not isinstance(data, dict):
        return None
    room_id = data.get('room_id')
    # Accept ints and strings; reject lists, dicts, booleans and empty values.
    if isinstance(room_id, bool) or not isinstance(room_id, (int, str)):
        return None
    return room_id or None


# Update the high score
@app.route('/api/update_score', methods=['POST'])
def update_score():
    """Raise the logged-in user's high score if the submitted score beats it.

    Expects a JSON body with a numeric ``score``. Returns 401 when not logged
    in, 400 when the score is missing or not a number, 404 when the user no
    longer exists.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    data = _json_payload()
    score = _as_int(data.get('score')) if data else None
    if score is None:
        # Missing and non-numeric scores are both reported as a missing parameter.
        return jsonify({'error': '缺少分数参数'}), 400

    user = User.query.get(user_id)
    if not user:
        return jsonify({'error': '用户不存在'}), 404

    # A row without a stored score is treated as 0 so the comparison cannot fail.
    current_best = user.high_score or 0

    # Only update when the new score is higher
    if score > current_best:
        user.high_score = score
        db.session.commit()
        return jsonify({'message': '高分已更新', 'high_score': user.high_score})

    return jsonify({'message': '分数未超过历史最高分', 'high_score': user.high_score})


# Save a game history record
@app.route('/api/save_game_history', methods=['POST'])
def save_game_history():
    """Store one finished game for the logged-in user.

    Expects a JSON body with ``score`` (required) and optional ``level``,
    ``duration`` and ``goldEarned``. Returns 201 with the new record id,
    401 when not logged in, 400 when a value is missing or not a number.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    data = _json_payload()
    if not data or 'score' not in data:
        return jsonify({'error': '缺少必要参数'}), 400

    # Column name -> validated integer (None marks an invalid value)
    values = {
        'score': _as_int(data.get('score')),
        'level_reached': _as_int(data.get('level', 1)),
        'duration': _as_int(data.get('duration', 0)),
        'gold_earned': _as_int(data.get('goldEarned', 0)),
    }
    # All four columns are NOT NULL integers; reject bad input up front
    # rather than failing at commit time.
    if any(value is None for value in values.values()):
        return jsonify({'error': '缺少必要参数'}), 400

    # Create the new game history record
    game_history = GameHistory(user_id=user_id, **values)
    db.session.add(game_history)
    db.session.commit()

    return jsonify({
        'message': '游戏历史记录已保存',
        'history_id': game_history.id
    }), 201


# Get the user's game history
@app.route('/api/game_history', methods=['GET'])
def get_game_history():
    """Return the logged-in user's game history, newest first, paginated.

    Query parameters: ``page`` (default 1) and ``per_page`` (default 10,
    clamped to 1.._MAX_HISTORY_PAGE_SIZE).
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # Pagination parameters; clamp so a client cannot request an unbounded page
    page = max(1, request.args.get('page', 1, type=int))
    per_page = request.args.get('per_page', 10, type=int)
    per_page = max(1, min(per_page, _MAX_HISTORY_PAGE_SIZE))

    # The user's records, newest first
    history_query = (
        GameHistory.query
        .filter_by(user_id=user_id)
        .order_by(GameHistory.created_at.desc())
    )

    # Paginate
    history_paginated = history_query.paginate(page=page, per_page=per_page, error_out=False)

    history_records = [
        {
            'id': record.id,
            'score': record.score,
            'level_reached': record.level_reached,
            'duration': record.duration,
            'gold_earned': record.gold_earned,
            'created_at': record.created_at.isoformat()
        }
        for record in history_paginated.items
    ]

    return jsonify({
        'history': history_records,
        'total': history_paginated.total,
        'pages': history_paginated.pages,
        'current_page': history_paginated.page
    })


# Get the leaderboard
@app.route('/api/leaderboard', methods=['GET'])
def get_leaderboard():
    """Return the ten users with the highest ``high_score``."""
    top_users = User.query.order_by(User.high_score.desc()).limit(10).all()

    leaderboard = [
        {
            'username': user.username,
            'high_score': user.high_score,
            'last_login': user.last_login.isoformat() if user.last_login else None
        }
        for user in top_users
    ]

    return jsonify({'leaderboard': leaderboard})


# Health check API
@app.route('/api/health')
def health_check():
    """Return a static OK payload."""
    return jsonify({"status": "ok"})


# Chat room API
@app.route('/api/chat/rooms', methods=['GET'])
def get_chat_rooms():
    """List all active chat rooms, newest first (login required)."""
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # All active chat rooms
    rooms = ChatRoom.query.filter_by(is_active=True).order_by(ChatRoom.created_at.desc()).all()

    room_list = [
        {
            'id': room.id,
            'name': room.name,
            # Use the declared relationship instead of a manual lookup
            'creator': room.creator.username if room.creator else 'Unknown',
            'created_at': room.created_at.isoformat()
        }
        for room in rooms
    ]

    return jsonify({'rooms': room_list})


@app.route('/api/chat/rooms', methods=['POST'])
def create_chat_room():
    """Create a chat room owned by the logged-in user.

    Expects a JSON body with a non-blank string ``name``. Returns 201 with
    the room id and name, 401 when not logged in, 400 for a missing name.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    data = _json_payload()
    name = data.get('name') if data else None
    if not isinstance(name, str) or not name.strip():
        return jsonify({'error': '聊天室名称是必填项'}), 400

    # Create the new chat room
    room = ChatRoom(name=name, creator_id=user_id)
    db.session.add(room)
    db.session.commit()

    return jsonify({
        'message': '聊天室创建成功',
        'room': {
            'id': room.id,
            'name': room.name
        }
    }), 201


# The room id must be captured by the route; without the converter the view
# is called with no room_id and every request fails.
@app.route('/api/chat/rooms/<int:room_id>/messages', methods=['GET'])
def get_room_messages(room_id):
    """Return every message in a chat room, oldest first (login required).

    Returns 401 when not logged in and 404 when the room does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # Check that the chat room exists
    room = ChatRoom.query.get(room_id)
    if not room:
        return jsonify({'error': '聊天室不存在'}), 404

    # All messages of this room in chronological order
    messages = ChatMessage.query.filter_by(room_id=room_id).order_by(ChatMessage.created_at).all()

    message_list = [
        {
            'id': msg.id,
            'user_id': msg.user_id,
            'username': msg.user.username if msg.user else 'Unknown',
            'message': msg.message,
            'created_at': msg.created_at.isoformat()
        }
        for msg in messages
    ]

    return jsonify({'messages': message_list})


# Static file serving
@app.route('/')
def serve():
    """Serve the frontend entry page."""
    return send_from_directory(app.static_folder, 'index.html')


# The path converter supplies the ``path`` argument and lets it contain
# slashes; a bare '/' rule would clash with serve() and pass no argument.
@app.route('/<path:path>')
def static_proxy(path):
    """Serve any other file from the frontend build directory."""
    return send_from_directory(app.static_folder, path)


# WebSocket event handlers
@socketio.on('connect')
def handle_connect():
    """Accept the socket only for logged-in sessions."""
    user_id = session.get('user_id')
    if not user_id:
        return False  # Reject connections from users who are not logged in
    print(f'Client connected: {request.sid}')


@socketio.on('disconnect')
def handle_disconnect():
    """Log the disconnect."""
    print(f'Client disconnected: {request.sid}')


@socketio.on('join_room')
def handle_join_room(data):
    """Add the caller to a chat room and announce it to the room.

    ``data`` must be a dict with ``room_id``; anything else is ignored.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    room_id = _room_id_from(data)
    if room_id is None:
        return

    # Look up the user and the room
    user = User.query.get(user_id)
    room = ChatRoom.query.get(room_id)
    if not user or not room:
        return

    # Join the room
    join_room(str(room_id))

    # Notify everyone in the room
    emit('user_joined', {
        'user_id': user_id,
        'username': user.username,
        'message': f'{user.username} 加入了聊天室'
    }, room=str(room_id))


@socketio.on('leave_room')
def handle_leave_room(data):
    """Remove the caller from a chat room and announce it to the room.

    ``data`` must be a dict with ``room_id``; anything else is ignored.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    room_id = _room_id_from(data)
    if room_id is None:
        return

    # Look up the user
    user = User.query.get(user_id)
    if not user:
        return

    # Leave the room
    leave_room(str(room_id))

    # Notify the users remaining in the room
    emit('user_left', {
        'user_id': user_id,
        'username': user.username,
        'message': f'{user.username} 离开了聊天室'
    }, room=str(room_id))


@socketio.on('send_message')
def handle_send_message(data):
    """Persist a chat message and broadcast it to the room.

    ``data`` must be a dict with ``room_id`` and a non-empty string
    ``message``; anything else is ignored.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    room_id = _room_id_from(data)
    if room_id is None:
        return

    message_text = data.get('message')
    # The message column is text; reject non-string or empty payloads.
    if not isinstance(message_text, str) or not message_text:
        return

    # Look up the user and the room
    user = User.query.get(user_id)
    room = ChatRoom.query.get(room_id)
    if not user or not room:
        return

    # Save the message to the database
    message = ChatMessage(
        room_id=room_id,
        user_id=user_id,
        message=message_text
    )
    db.session.add(message)
    db.session.commit()

    # Broadcast the message to everyone in the room
    emit('new_message', {
        'id': message.id,
        'user_id': user_id,
        'username': user.username,
        'message': message_text,
        'created_at': message.created_at.isoformat()
    }, room=str(room_id))


if __name__ == '__main__':
    # The interactive debugger allows code execution, and the server listens
    # on all interfaces, so debug mode is opt-in via FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true')
    socketio.run(app, debug=debug_enabled, host='0.0.0.0')