You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

487 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
import os
import uuid
from datetime import datetime
from urllib.parse import quote_plus

from flask import Flask, send_from_directory, jsonify, request, session
from flask_cors import CORS
from flask_socketio import SocketIO, emit, join_room, leave_room
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import joinedload
from werkzeug.security import generate_password_hash, check_password_hash
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = Flask(__name__, static_folder='../frontend/dist')
# NOTE(review): the fallback secret key is committed to source control. Any
# deployment that does not set SECRET_KEY signs its session cookies with a
# publicly known key; set SECRET_KEY in the environment outside local dev.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_goldminer')

# Cloud MySQL database configuration; every value can be overridden through
# the environment.
DB_USER = os.environ.get('DB_USER', 'goldminer')
# NOTE(review): a database password is committed here as the fallback value.
# Consider rotating it and requiring DB_PASSWORD from the environment.
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'nBAWq9DDwJ14Fugq')
DB_HOST = os.environ.get('DB_HOST', 'mysql2.sqlpub.com')
DB_PORT = os.environ.get('DB_PORT', '3307')
DB_NAME = os.environ.get('DB_NAME', 'goldminer')

# Build the MySQL connection URI, including the port. The user name and
# password are percent-encoded so that characters such as '@', ':' or '/'
# in a credential cannot corrupt the URI structure.
app.config['SQLALCHEMY_DATABASE_URI'] = (
    f'mysql+pymysql://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}'
    f'@{DB_HOST}:{DB_PORT}/{DB_NAME}'
)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True  # Log every SQL statement to ease debugging
logger.info("连接到云端数据库: %s:%s/%s", DB_HOST, DB_PORT, DB_NAME)

# Enable CORS with credentials for all origins.
# NOTE(review): allowing credentials from every origin lets any website issue
# authenticated requests on behalf of a logged-in user; restrict `origins`
# (and `cors_allowed_origins` below) to the real frontend hosts if possible.
CORS(app, supports_credentials=True, origins=['*'])
db = SQLAlchemy(app)
socketio = SocketIO(app, cors_allowed_origins="*", engineio_logger=True)
# User model
class User(db.Model):
    """Account row: unique username, password hash, timestamps and best score."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(200), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime, nullable=True)
    high_score = db.Column(db.Integer, default=0)

    def set_password(self, password):
        """Hash ``password`` and store the result in ``password_hash``."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash."""
        stored_hash = self.password_hash
        return check_password_hash(stored_hash, password)
# Game history model
class GameHistory(db.Model):
    """One finished game of a user: score, level, duration and gold."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    score = db.Column(db.Integer, nullable=False)
    level_reached = db.Column(db.Integer, nullable=False)
    # Game duration in seconds
    duration = db.Column(db.Integer, nullable=False)
    # Gold earned during the game
    gold_earned = db.Column(db.Integer, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationship to the owning user; also exposes User.game_histories
    user = db.relationship(
        'User',
        backref=db.backref('game_histories', lazy=True),
    )
# Chat room model
class ChatRoom(db.Model):
    """A named chat room created by a user, with an active flag."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    creator_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_active = db.Column(db.Boolean, default=True)

    # Relationship to the creating user; also exposes User.created_rooms
    creator = db.relationship(
        'User',
        backref=db.backref('created_rooms', lazy=True),
    )
# Chat message model
class ChatMessage(db.Model):
    """A text message posted by a user in a chat room."""

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(db.Integer, db.ForeignKey('chat_room.id'), nullable=False)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    message = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationships; these also expose ChatRoom.messages and User.messages
    room = db.relationship(
        'ChatRoom',
        backref=db.backref('messages', lazy=True),
    )
    user = db.relationship(
        'User',
        backref=db.backref('messages', lazy=True),
    )
# Check the database connection and create the tables
def init_db():
    """Create all model tables inside an application context.

    Failures are logged and swallowed, so a broken database connection does
    not stop the application from starting.
    """
    with app.app_context():
        logger.info("正在连接到云端MySQL数据库...")
        try:
            # Create the table structure (tables that already exist are kept)
            db.create_all()
        except OperationalError as e:
            troubleshooting = (
                "无法连接到云端MySQL数据库。请检查您的数据库配置和网络连接。",
                f"错误详情: {e}",
                "请确保:",
                f"1. 能够访问数据库服务器 {DB_HOST}:{DB_PORT}",
                f"2. 数据库 '{DB_NAME}' 存在且用户 '{DB_USER}' 有权限访问。",
                "3. 提供的密码是正确的。",
            )
            # Startup is not aborted here; the problem is only logged
            for line in troubleshooting:
                logger.error(line)
        except Exception as e:
            logger.error(f"初始化数据库时发生未知错误: {e}")
        else:
            logger.info("数据库连接成功,并且所有表都已创建(如果不存在)。")
# Registration API
@app.route('/api/register', methods=['POST'])
def register():
    """Create a new user from a JSON body with ``username`` and ``password``.

    Returns:
        201 with the new username on success; 400 when the body is not a JSON
        object, a field is missing/empty/not a string, or the username is
        already taken.
    """
    # silent=True turns a malformed or non-JSON body into None so that the
    # client receives the same JSON error as for missing fields.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    # Both fields are required and must be non-empty strings
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({'error': '用户名和密码是必填项'}), 400

    # Fast path: reject a username that already exists
    if User.query.filter_by(username=username).first():
        return jsonify({'error': '用户名已存在'}), 400

    # Create the new user
    user = User(username=username)
    user.set_password(password)
    db.session.add(user)
    try:
        db.session.commit()
    except IntegrityError:
        # A concurrent request inserted the same username between the check
        # above and this commit; the unique constraint rejected ours.
        db.session.rollback()
        return jsonify({'error': '用户名已存在'}), 400

    return jsonify({'message': '注册成功', 'username': user.username}), 201
# Login API
@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate a user and store the user id in the session.

    Returns:
        200 with the user summary and a fresh ``session_id`` on success;
        400 when the body is not a JSON object or a credential is
        missing/empty/not a string; 401 when the credentials do not match.
    """
    # silent=True turns a malformed or non-JSON body into None so that the
    # client receives the same JSON error as for missing fields.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    username = data.get('username')
    password = data.get('password')

    # Both fields are required and must be non-empty strings; a non-string
    # password would otherwise raise inside the hash comparison.
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({'error': '用户名和密码是必填项'}), 400

    # Look up the user and verify the password
    user = User.query.filter_by(username=username).first()
    if user is None or not user.check_password(password):
        return jsonify({'error': '用户名或密码不正确'}), 401

    # Generate a session id and remember the user in the session
    session_id = str(uuid.uuid4())
    session['user_id'] = user.id
    session['session_id'] = session_id

    # Update the last-login timestamp
    user.last_login = datetime.utcnow()
    db.session.commit()

    return jsonify({
        'message': '登录成功',
        'user': {
            'id': user.id,
            'username': user.username,
            'high_score': user.high_score,
        },
        'session_id': session_id,
    })
# Logout API
@app.route('/api/logout', methods=['POST'])
def logout():
    """Remove every key from the current session and confirm the logout."""
    session.clear()
    confirmation = {'message': '已成功登出'}
    return jsonify(confirmation)
# Fetch the current user's information
@app.route('/api/user', methods=['GET'])
def get_user():
    """Return id, username and high score of the logged-in user.

    Responds with 401 when the session has no user id, or when that id no
    longer matches a user row (the session is cleared in that case).
    """
    current_id = session.get('user_id')
    if not current_id:
        return jsonify({'error': '未登录'}), 401

    account = User.query.get(current_id)
    if not account:
        # Stale session: the referenced user is gone
        session.clear()
        return jsonify({'error': '用户不存在'}), 401

    profile = {
        'id': account.id,
        'username': account.username,
        'high_score': account.high_score,
    }
    return jsonify({'user': profile})
# Update the high score
@app.route('/api/update_score', methods=['POST'])
def update_score():
    """Raise the logged-in user's high score when the submitted score beats it.

    Returns:
        200 with the resulting high score (updated or not); 400 when the JSON
        body lacks ``score`` or ``score`` is not a number; 401 when not logged
        in; 404 when the session's user no longer exists.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'score' not in data:
        return jsonify({'error': '缺少分数参数'}), 400

    score = data['score']
    # bool is a subclass of int, so it is excluded explicitly; anything that
    # is not a number would raise TypeError in the comparison below.
    if isinstance(score, bool) or not isinstance(score, (int, float)):
        return jsonify({'error': '分数必须是数字'}), 400

    user = User.query.get(user_id)
    if not user:
        return jsonify({'error': '用户不存在'}), 404

    # The column is nullable, so a row may hold NULL; treat that as 0
    current_best = user.high_score or 0

    # Only update when the new score is higher
    if score > current_best:
        user.high_score = score
        db.session.commit()
        return jsonify({'message': '高分已更新', 'high_score': user.high_score})

    return jsonify({'message': '分数未超过历史最高分', 'high_score': current_best})
# Save a game history record
@app.route('/api/save_game_history', methods=['POST'])
def save_game_history():
    """Store one finished game for the logged-in user.

    Reads ``score`` (required), ``level``, ``duration`` and ``goldEarned``
    from the JSON body; the optional fields default to 1, 0 and 0.

    Returns:
        201 with the new record id; 400 when the body lacks ``score`` or a
        field is not a number; 401 when not logged in.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'score' not in data:
        return jsonify({'error': '缺少必要参数'}), 400

    # Map request keys onto model columns, applying the defaults
    fields = {
        'score': data.get('score', 0),
        'level_reached': data.get('level', 1),
        'duration': data.get('duration', 0),
        'gold_earned': data.get('goldEarned', 0),
    }
    # Reject null and non-numeric values here instead of letting the
    # database fail on the NOT NULL integer columns.
    for value in fields.values():
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return jsonify({'error': '参数必须是数字'}), 400

    # Create the new game history record
    game_history = GameHistory(user_id=user_id, **fields)
    db.session.add(game_history)
    db.session.commit()

    return jsonify({
        'message': '游戏历史记录已保存',
        'history_id': game_history.id,
    }), 201
# Fetch the user's game history
@app.route('/api/game_history', methods=['GET'])
def get_game_history():
    """Return one page of the logged-in user's games, newest first.

    Query parameters:
        page: page number, default 1.
        per_page: page size, default 10, capped at 100.

    Returns:
        200 with ``history``, ``total``, ``pages`` and ``current_page``;
        401 when not logged in.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # Pagination parameters
    page = request.args.get('page', 1, type=int)
    per_page = request.args.get('per_page', 10, type=int)

    # The user's records, newest first
    history_query = (
        GameHistory.query
        .filter_by(user_id=user_id)
        .order_by(GameHistory.created_at.desc())
    )

    # max_per_page stops a client from requesting the whole table in a
    # single response through a very large per_page value.
    history_paginated = history_query.paginate(
        page=page,
        per_page=per_page,
        error_out=False,
        max_per_page=100,
    )

    history_records = [
        {
            'id': record.id,
            'score': record.score,
            'level_reached': record.level_reached,
            'duration': record.duration,
            'gold_earned': record.gold_earned,
            # created_at is nullable in the schema; guard against NULL rows
            'created_at': record.created_at.isoformat() if record.created_at else None,
        }
        for record in history_paginated.items
    ]

    return jsonify({
        'history': history_records,
        'total': history_paginated.total,
        'pages': history_paginated.pages,
        'current_page': history_paginated.page,
    })
# Fetch the leaderboard
@app.route('/api/leaderboard', methods=['GET'])
def get_leaderboard():
    """Return the ten users with the highest ``high_score``, best first."""
    ranking_query = User.query.order_by(User.high_score.desc()).limit(10)

    entries = [
        {
            'username': player.username,
            'high_score': player.high_score,
            'last_login': player.last_login.isoformat() if player.last_login else None,
        }
        for player in ranking_query.all()
    ]
    return jsonify({'leaderboard': entries})
# Health check API
@app.route('/api/health')
def health_check():
    """Report a static ``{"status": "ok"}`` payload."""
    return jsonify(status="ok")
# Chat room API
@app.route('/api/chat/rooms', methods=['GET'])
def get_chat_rooms():
    """List all active chat rooms, newest first, with their creator's name.

    Returns:
        200 with ``rooms`` (id, name, creator, created_at); 401 when not
        logged in.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # Load each room together with its creator in a single joined query
    # instead of issuing one extra user query per room.
    rooms = (
        ChatRoom.query
        .options(joinedload(ChatRoom.creator))
        .filter_by(is_active=True)
        # id breaks ties between rooms that share a creation timestamp
        .order_by(ChatRoom.created_at.desc(), ChatRoom.id.desc())
        .all()
    )

    room_list = [
        {
            'id': room.id,
            'name': room.name,
            'creator': room.creator.username if room.creator else 'Unknown',
            'created_at': room.created_at.isoformat(),
        }
        for room in rooms
    ]
    return jsonify({'rooms': room_list})
@app.route('/api/chat/rooms', methods=['POST'])
def create_chat_room():
    """Create a chat room owned by the logged-in user.

    Reads ``name`` from the JSON body.

    Returns:
        201 with the new room's id and name; 400 when the name is missing,
        blank, not a string, or longer than the column allows; 401 when not
        logged in.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    data = request.get_json(silent=True)
    name = data.get('name') if isinstance(data, dict) else None

    # The name must be a string containing more than whitespace
    if not isinstance(name, str) or not name.strip():
        return jsonify({'error': '聊天室名称是必填项'}), 400

    # Enforce the column length here; otherwise MySQL either rejects the
    # insert or silently truncates the name, depending on its SQL mode.
    max_length = ChatRoom.__table__.c.name.type.length
    if max_length is not None and len(name) > max_length:
        return jsonify({'error': f'聊天室名称不能超过{max_length}个字符'}), 400

    # Create the new chat room
    room = ChatRoom(name=name, creator_id=user_id)
    db.session.add(room)
    db.session.commit()

    return jsonify({
        'message': '聊天室创建成功',
        'room': {
            'id': room.id,
            'name': room.name,
        },
    }), 201
@app.route('/api/chat/rooms/<int:room_id>/messages', methods=['GET'])
def get_room_messages(room_id):
    """Return every message of a chat room in chronological order.

    Args:
        room_id: primary key of the chat room, taken from the URL.

    Returns:
        200 with ``messages`` (id, user_id, username, message, created_at);
        401 when not logged in; 404 when the room does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # Make sure the chat room exists
    room = ChatRoom.query.get(room_id)
    if not room:
        return jsonify({'error': '聊天室不存在'}), 404

    # NOTE(review): all messages of the room are returned; there is no limit.
    messages = (
        ChatMessage.query
        # Fetch the authors in the same query instead of one query per message
        .options(joinedload(ChatMessage.user))
        .filter_by(room_id=room_id)
        # id breaks ties between messages that share a timestamp, keeping
        # the order stable across requests
        .order_by(ChatMessage.created_at, ChatMessage.id)
        .all()
    )

    message_list = [
        {
            'id': msg.id,
            'user_id': msg.user_id,
            'username': msg.user.username if msg.user else 'Unknown',
            'message': msg.message,
            'created_at': msg.created_at.isoformat(),
        }
        for msg in messages
    ]
    return jsonify({'messages': message_list})
# Static file serving
@app.route('/')
def serve():
    """Send ``index.html`` from the application's static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, 'index.html')
@app.route('/<path:path>')
def static_proxy(path):
    """Send the file at ``path`` from the application's static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, path)
# WebSocket event handlers
@socketio.on('connect')
def handle_connect():
    """Accept a socket connection only when the session holds a user id.

    Returns:
        False to reject the connection when the user is not logged in;
        otherwise None, which accepts it.
    """
    user_id = session.get('user_id')
    if not user_id:
        return False  # Reject connections from users who are not logged in
    # Use the module logger rather than print so that the message follows
    # the logging configuration set up for the rest of the application.
    logger.info('Client connected: %s', request.sid)
@socketio.on('disconnect')
def handle_disconnect():
    """Log that a socket client has disconnected."""
    # Use the module logger rather than print so that the message follows
    # the logging configuration set up for the rest of the application.
    logger.info('Client disconnected: %s', request.sid)
@socketio.on('join_room')
def handle_join_room(data):
    """Add the logged-in client to a chat room and announce it to the room.

    Args:
        data: event payload; expected to be a dict with ``room_id``.

    The event is silently ignored when the user is not logged in, the
    payload is malformed, or the user or room does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    # The payload comes from the client and may be of any JSON type
    raw_room_id = data.get('room_id') if isinstance(data, dict) else None
    if isinstance(raw_room_id, bool) or not isinstance(raw_room_id, (int, str)):
        return
    try:
        room_id = int(raw_room_id)
    except ValueError:
        return
    if not room_id:
        return

    # Load the user and the room
    user = User.query.get(user_id)
    room = ChatRoom.query.get(room_id)
    if not user or not room:
        return

    # Derive the Socket.IO room name from the database id, so that 5, "5"
    # and "05" all address the same broadcast room.
    room_name = str(room.id)
    join_room(room_name)

    # Notify the users in the room
    emit('user_joined', {
        'user_id': user_id,
        'username': user.username,
        'message': f'{user.username} 加入了聊天室'
    }, room=room_name)
@socketio.on('leave_room')
def handle_leave_room(data):
    """Remove the logged-in client from a chat room and announce it.

    Args:
        data: event payload; expected to be a dict with ``room_id``.

    The event is silently ignored when the user is not logged in, the
    payload is malformed, the user does not exist, or the client is not
    currently in the room.
    """
    # Imported locally under an alias: ``rooms`` is also used as a local
    # variable name elsewhere in this module.
    from flask_socketio import rooms as joined_rooms

    user_id = session.get('user_id')
    if not user_id:
        return

    # The payload comes from the client and may be of any JSON type
    raw_room_id = data.get('room_id') if isinstance(data, dict) else None
    if isinstance(raw_room_id, bool) or not isinstance(raw_room_id, (int, str)):
        return
    try:
        room_id = int(raw_room_id)
    except ValueError:
        return
    if not room_id:
        return

    # Load the user
    user = User.query.get(user_id)
    if not user:
        return

    # Canonical room name, so that 5, "5" and "05" address the same room
    room_name = str(room_id)

    # Only members may leave; without this check any client could broadcast
    # a 'user_left' notice into a room it never joined.
    if room_name not in joined_rooms():
        return

    # Leave the room
    leave_room(room_name)

    # Notify the users remaining in the room
    emit('user_left', {
        'user_id': user_id,
        'username': user.username,
        'message': f'{user.username} 离开了聊天室'
    }, room=room_name)
@socketio.on('send_message')
def handle_send_message(data):
    """Persist a chat message and broadcast it to the room.

    Args:
        data: event payload; expected to be a dict with ``room_id`` and a
            non-blank string ``message``.

    The event is silently ignored when the user is not logged in, the
    payload is malformed, or the user or room does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    # The payload comes from the client and may be of any JSON type
    if not isinstance(data, dict):
        return
    raw_room_id = data.get('room_id')
    message_text = data.get('message')

    if isinstance(raw_room_id, bool) or not isinstance(raw_room_id, (int, str)):
        return
    try:
        room_id = int(raw_room_id)
    except ValueError:
        return
    # Only non-blank text can be stored in the message column
    if not room_id or not isinstance(message_text, str) or not message_text.strip():
        return

    # Load the user and the room
    user = User.query.get(user_id)
    room = ChatRoom.query.get(room_id)
    if not user or not room:
        return

    # Save the message to the database
    message = ChatMessage(
        room_id=room.id,
        user_id=user_id,
        message=message_text
    )
    db.session.add(message)
    db.session.commit()

    # Broadcast to everyone in the room; the room name is derived from the
    # database id so that 5, "5" and "05" address the same broadcast room.
    emit('new_message', {
        'id': message.id,
        'user_id': user_id,
        'username': user.username,
        'message': message_text,
        'created_at': message.created_at.isoformat()
    }, room=str(room.id))
if __name__ == '__main__':
    # Initialise the database before starting the application
    init_db()

    # Runtime settings can be overridden through the environment; the
    # defaults match the previously hard-coded values.
    # NOTE(review): debug mode combined with host 0.0.0.0 exposes the
    # interactive debugger to the network; set APP_DEBUG=0 (or bind to
    # 127.0.0.1 via APP_HOST) outside local development.
    debug_flag = os.environ.get('APP_DEBUG', '1').strip().lower()
    debug = debug_flag not in ('', '0', 'false', 'no', 'off')
    host = os.environ.get('APP_HOST', '0.0.0.0')
    port = int(os.environ.get('APP_PORT', '5000'))

    # Use eventlet or gevent for better performance
    socketio.run(app, debug=debug, host=host, port=port)