You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

452 lines
14 KiB

from flask import Flask, send_from_directory, jsonify, request, session
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash
from flask_socketio import SocketIO, emit, join_room, leave_room
import os
import uuid
from datetime import datetime
# Flask application; static files are served from the built frontend bundle.
app = Flask(__name__, static_folder='../frontend/dist')
# Key used to sign the session cookie.
# NOTE(review): the hard-coded fallback is used whenever SECRET_KEY is absent
# from the environment — confirm that deployments always set it.
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev_key_for_goldminer')
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///goldminer.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# NOTE(review): a wildcard origin combined with supports_credentials=True is
# rejected by browsers for credentialed requests under the CORS spec; confirm
# how Flask-CORS resolves this and consider listing the real frontend origins.
CORS(app, supports_credentials=True, origins=['*']) # Enable CORS with credentials for all origins
# Configuration above must be set before the extensions below bind to `app`.
db = SQLAlchemy(app)
# NOTE(review): Socket.IO also accepts any origin, and engineio_logger=True
# presumably produces verbose transport logging — confirm this is intended
# outside development.
socketio = SocketIO(app, cors_allowed_origins="*", engineio_logger=True)
# User model
class User(db.Model):
    """Registered player account: credentials, timestamps and best score."""

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(200), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    last_login = db.Column(db.DateTime, nullable=True)
    high_score = db.Column(db.Integer, default=0)

    def set_password(self, password):
        """Hash ``password`` and store the hash on this user."""
        hashed = generate_password_hash(password)
        self.password_hash = hashed

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash."""
        stored_hash = self.password_hash
        return check_password_hash(stored_hash, password)
# Game history model
class GameHistory(db.Model):
    """One finished game played by a user."""

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    score = db.Column(db.Integer, nullable=False)
    level_reached = db.Column(db.Integer, nullable=False)
    # Game duration in seconds
    duration = db.Column(db.Integer, nullable=False)
    # Gold earned during the game
    gold_earned = db.Column(db.Integer, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationship to the owning user; exposes ``User.game_histories``.
    user = db.relationship(
        'User',
        backref=db.backref('game_histories', lazy=True),
    )
# Chat room model
class ChatRoom(db.Model):
    """A named chat room created by a user."""

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    creator_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    is_active = db.Column(db.Boolean, default=True)

    # Relationship to the creating user; exposes ``User.created_rooms``.
    creator = db.relationship(
        'User',
        backref=db.backref('created_rooms', lazy=True),
    )
# Chat message model
class ChatMessage(db.Model):
    """A single message posted by a user in a chat room."""

    id = db.Column(db.Integer, primary_key=True)
    room_id = db.Column(
        db.Integer,
        db.ForeignKey('chat_room.id'),
        nullable=False,
    )
    user_id = db.Column(
        db.Integer,
        db.ForeignKey('user.id'),
        nullable=False,
    )
    message = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    # Relationships; both expose a ``messages`` backref on the other side.
    room = db.relationship(
        'ChatRoom',
        backref=db.backref('messages', lazy=True),
    )
    user = db.relationship(
        'User',
        backref=db.backref('messages', lazy=True),
    )
# Create the database tables for every model declared above.
# create_all() needs an application context, which is not active at import time.
with app.app_context():
    db.create_all()
# Registration API
@app.route('/api/register', methods=['POST'])
def register():
    """Create a new user account from a JSON request body.

    Expects a JSON object with string fields ``username`` and ``password``.

    Returns:
        201 with a success message and the new username, or 400 with an
        ``error`` message when the body is missing or malformed, a field is
        empty or not a string, or the username is already taken.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # client gets this endpoint's JSON error rather than a framework error.
    data = request.get_json(silent=True)
    is_object = isinstance(data, dict)
    username = data.get('username') if is_object else None
    password = data.get('password') if is_object else None

    # Both fields must be non-empty strings; a non-string password would
    # otherwise make the hashing call raise and surface as a 500.
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({'error': '用户名和密码是必填项'}), 400

    # Reject usernames that already exist.
    # NOTE(review): two concurrent registrations of the same name can both
    # pass this check; the unique constraint would then fail at commit.
    if User.query.filter_by(username=username).first():
        return jsonify({'error': '用户名已存在'}), 400

    # Create the new user with a hashed password.
    user = User(username=username)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    return jsonify({'message': '注册成功', 'username': user.username}), 201
# Login API
@app.route('/api/login', methods=['POST'])
def login():
    """Authenticate a user and start a session.

    Expects a JSON object with string fields ``username`` and ``password``.

    Returns:
        200 with the user's public profile and a fresh ``session_id`` on
        success; 400 when the body is missing or malformed or a field is
        empty or not a string; 401 when the credentials do not match.
    """
    # silent=True yields None for a missing or malformed JSON body, so the
    # client gets this endpoint's JSON error rather than a framework error.
    data = request.get_json(silent=True)
    is_object = isinstance(data, dict)
    username = data.get('username') if is_object else None
    password = data.get('password') if is_object else None

    # Both fields must be non-empty strings; a non-string password would
    # otherwise make the hash comparison raise and surface as a 500.
    if not (isinstance(username, str) and username
            and isinstance(password, str) and password):
        return jsonify({'error': '用户名和密码是必填项'}), 400

    # Look the user up and verify the password.
    user = User.query.filter_by(username=username).first()
    if user is None or not user.check_password(password):
        return jsonify({'error': '用户名或密码不正确'}), 401

    # Generate a session id and remember the user in the session.
    session_id = str(uuid.uuid4())
    session['user_id'] = user.id
    session['session_id'] = session_id

    # Record the time of this login.
    user.last_login = datetime.utcnow()
    db.session.commit()

    return jsonify({
        'message': '登录成功',
        'user': {
            'id': user.id,
            'username': user.username,
            'high_score': user.high_score
        },
        'session_id': session_id
    })
# Logout API
@app.route('/api/logout', methods=['POST'])
def logout():
    """Clear the current session and confirm the logout."""
    session.clear()
    response_body = {'message': '已成功登出'}
    return jsonify(response_body)
# Current user info
@app.route('/api/user', methods=['GET'])
def get_user():
    """Return the logged-in user's public profile.

    Responds 401 when no user is logged in, or when the session refers to a
    user that no longer exists (the session is cleared in that case).
    """
    current_id = session.get('user_id')
    if not current_id:
        return jsonify({'error': '未登录'}), 401

    account = User.query.get(current_id)
    if account is None:
        # Stale session: drop it so the client has to log in again.
        session.clear()
        return jsonify({'error': '用户不存在'}), 401

    profile = {
        'id': account.id,
        'username': account.username,
        'high_score': account.high_score
    }
    return jsonify({'user': profile})
# Update high score
@app.route('/api/update_score', methods=['POST'])
def update_score():
    """Raise the logged-in user's high score if the submitted score beats it.

    Expects a JSON object with a numeric ``score`` field.

    Returns:
        200 with the (possibly updated) ``high_score``; 401 when not logged
        in; 400 when ``score`` is missing or not a number; 404 when the
        session's user no longer exists.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # silent=True turns a malformed body into None instead of an exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'score' not in data:
        return jsonify({'error': '缺少分数参数'}), 400

    # Only real numbers can be compared with the stored score; bool is an
    # int subclass and is rejected explicitly.
    score = data['score']
    if isinstance(score, bool) or not isinstance(score, (int, float)):
        return jsonify({'error': '分数参数无效'}), 400

    user = User.query.get(user_id)
    if not user:
        return jsonify({'error': '用户不存在'}), 404

    # The column is nullable, so treat a missing stored score as 0 for the
    # comparison instead of raising on None.
    current_high = user.high_score or 0

    # Update only when the new score is strictly higher.
    if score > current_high:
        user.high_score = score
        db.session.commit()
        return jsonify({'message': '高分已更新', 'high_score': user.high_score})
    return jsonify({'message': '分数未超过历史最高分', 'high_score': user.high_score})
# Save a game history record
@app.route('/api/save_game_history', methods=['POST'])
def save_game_history():
    """Store one finished game for the logged-in user.

    Expects a JSON object with ``score`` (required) and optional ``level``
    (default 1), ``duration`` in seconds (default 0) and ``goldEarned``
    (default 0). All supplied values must be numbers.

    Returns:
        201 with the new ``history_id``; 401 when not logged in; 400 when
        ``score`` is missing or any supplied value is not a number.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # silent=True turns a malformed body into None instead of an exception.
    data = request.get_json(silent=True)
    if not isinstance(data, dict) or 'score' not in data:
        return jsonify({'error': '缺少必要参数'}), 400

    values = {
        'score': data.get('score', 0),
        'level_reached': data.get('level', 1),
        'duration': data.get('duration', 0),
        'gold_earned': data.get('goldEarned', 0),
    }
    # Every column is a NOT NULL integer; reject nulls, strings and other
    # non-numeric values up front instead of failing at commit time.
    # bool is an int subclass and is rejected explicitly.
    for value in values.values():
        if isinstance(value, bool) or not isinstance(value, (int, float)):
            return jsonify({'error': '参数必须是数字'}), 400

    # Create the new game history record.
    game_history = GameHistory(user_id=user_id, **values)
    db.session.add(game_history)
    db.session.commit()

    return jsonify({
        'message': '游戏历史记录已保存',
        'history_id': game_history.id
    }), 201
# Get the user's game history
@app.route('/api/game_history', methods=['GET'])
def get_game_history():
    """Return one page of the logged-in user's game history, newest first.

    Query parameters:
        page: 1-based page number (default 1; values below 1 become 1).
        per_page: page size (default 10; clamped to the range 1-100).

    Returns:
        200 with ``history``, ``total``, ``pages`` and ``current_page``;
        401 when not logged in.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # Pagination parameters. Non-integer values fall back to the defaults;
    # the clamp keeps a client from requesting an unbounded page size.
    max_per_page = 100
    page = max(request.args.get('page', 1, type=int), 1)
    per_page = request.args.get('per_page', 10, type=int)
    per_page = min(max(per_page, 1), max_per_page)

    # The user's records, newest first.
    history_query = (
        GameHistory.query
        .filter_by(user_id=user_id)
        .order_by(GameHistory.created_at.desc())
    )

    # error_out=False returns an empty page instead of aborting with 404
    # when the requested page is past the end.
    history_paginated = history_query.paginate(
        page=page, per_page=per_page, error_out=False
    )

    history_records = [
        {
            'id': record.id,
            'score': record.score,
            'level_reached': record.level_reached,
            'duration': record.duration,
            'gold_earned': record.gold_earned,
            'created_at': record.created_at.isoformat()
        }
        for record in history_paginated.items
    ]

    return jsonify({
        'history': history_records,
        'total': history_paginated.total,
        'pages': history_paginated.pages,
        'current_page': history_paginated.page
    })
# Leaderboard
@app.route('/api/leaderboard', methods=['GET'])
def get_leaderboard():
    """Return the ten users with the highest ``high_score``, best first."""
    ranking_query = User.query.order_by(User.high_score.desc()).limit(10)
    ranked_players = ranking_query.all()

    entries = [
        {
            'username': player.username,
            'high_score': player.high_score,
            # last_login is nullable: users who never logged in report None.
            'last_login': player.last_login.isoformat() if player.last_login else None
        }
        for player in ranked_players
    ]
    return jsonify({'leaderboard': entries})
# Health check API
@app.route('/api/health')
def health_check():
    """Report that the service is up."""
    status_payload = {"status": "ok"}
    return jsonify(status_payload)
# Chat room API
@app.route('/api/chat/rooms', methods=['GET'])
def get_chat_rooms():
    """List all active chat rooms, newest first (login required)."""
    if not session.get('user_id'):
        return jsonify({'error': '未登录'}), 401

    # Every active room, most recently created first.
    active_rooms = (
        ChatRoom.query
        .filter_by(is_active=True)
        .order_by(ChatRoom.created_at.desc())
        .all()
    )

    def describe(chat_room):
        """Build the JSON-serialisable summary of one room."""
        # The creator is resolved through the model relationship; a missing
        # user is reported as 'Unknown'.
        owner = chat_room.creator
        return {
            'id': chat_room.id,
            'name': chat_room.name,
            'creator': owner.username if owner else 'Unknown',
            'created_at': chat_room.created_at.isoformat()
        }

    return jsonify({'rooms': [describe(chat_room) for chat_room in active_rooms]})
@app.route('/api/chat/rooms', methods=['POST'])
def create_chat_room():
    """Create a chat room owned by the logged-in user.

    Expects a JSON object with a string ``name`` of at most 100 characters
    (the column width). Surrounding whitespace is stripped before storing.

    Returns:
        201 with the new room's ``id`` and ``name``; 401 when not logged in;
        400 when the name is missing, blank, not a string, or too long.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # silent=True turns a malformed body into None instead of an exception.
    data = request.get_json(silent=True)
    name = data.get('name') if isinstance(data, dict) else None

    # The name must be a string with visible content.
    if not isinstance(name, str) or not name.strip():
        return jsonify({'error': '聊天室名称是必填项'}), 400

    name = name.strip()
    # SQLite does not enforce the String(100) width, so check it here.
    if len(name) > 100:
        return jsonify({'error': '聊天室名称不能超过100个字符'}), 400

    # Create the new chat room.
    room = ChatRoom(
        name=name,
        creator_id=user_id
    )
    db.session.add(room)
    db.session.commit()

    return jsonify({
        'message': '聊天室创建成功',
        'room': {
            'id': room.id,
            'name': room.name
        }
    }), 201
@app.route('/api/chat/rooms/<int:room_id>/messages', methods=['GET'])
def get_room_messages(room_id):
    """Return a chat room's messages in chronological order.

    Query parameters:
        limit: optional positive integer. When given, only the ``limit``
            most recent messages are returned (still oldest first). When
            absent or not a positive integer, the full history is returned.

    Returns:
        200 with ``messages``; 401 when not logged in; 404 when the room
        does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return jsonify({'error': '未登录'}), 401

    # Make sure the chat room exists.
    room = ChatRoom.query.get(room_id)
    if not room:
        return jsonify({'error': '聊天室不存在'}), 404

    room_messages = ChatMessage.query.filter_by(room_id=room_id)
    limit = request.args.get('limit', type=int)
    if limit is not None and limit > 0:
        # Take the newest `limit` rows, then restore chronological order.
        # The id breaks ties between messages with equal timestamps.
        newest_first = (
            room_messages
            .order_by(ChatMessage.created_at.desc(), ChatMessage.id.desc())
            .limit(limit)
            .all()
        )
        messages = newest_first[::-1]
    else:
        # No limit requested: the whole history, oldest first.
        messages = room_messages.order_by(ChatMessage.created_at).all()

    message_list = []
    for msg in messages:
        # Resolve the author through the model relationship; a missing
        # user is reported as 'Unknown'.
        author = msg.user
        message_list.append({
            'id': msg.id,
            'user_id': msg.user_id,
            'username': author.username if author else 'Unknown',
            'message': msg.message,
            'created_at': msg.created_at.isoformat()
        })

    return jsonify({'messages': message_list})
# Static file serving
@app.route('/')
def serve():
    """Serve the frontend entry page from the static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, 'index.html')
@app.route('/<path:path>')
def static_proxy(path):
    """Serve the file at ``path`` from the static folder."""
    static_root = app.static_folder
    return send_from_directory(static_root, path)
# WebSocket event handlers
@socketio.on('connect')
def handle_connect():
    """Accept the socket connection only for logged-in users."""
    if session.get('user_id'):
        print(f'Client connected: {request.sid}')
        return None
    # Returning False rejects the connection of a user who is not logged in.
    return False
@socketio.on('disconnect')
def handle_disconnect():
    """Log the socket id of a client that disconnected."""
    socket_id = request.sid
    print(f'Client disconnected: {socket_id}')
@socketio.on('join_room')
def handle_join_room(data):
    """Add the current socket to a chat room and announce the arrival.

    Args:
        data: event payload; expected to be a dict with an integer-like
            ``room_id``. Any other payload is ignored.

    The event is ignored when the user is not logged in, the payload is
    malformed, or the user or room does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    # The payload comes straight from the client; anything but a dict
    # would make the .get() below raise.
    if not isinstance(data, dict):
        return
    room_id = data.get('room_id')
    if not room_id:
        return
    try:
        room_id = int(room_id)
    except (TypeError, ValueError, OverflowError):
        return

    # Load the user and the room.
    user = User.query.get(user_id)
    room = ChatRoom.query.get(room_id)
    if not user or not room:
        return

    # Name the Socket.IO room after the canonical database id so that
    # payloads such as "1", "01" and 1 all land in the same room.
    room_key = str(room.id)
    join_room(room_key)

    # Notify everyone in the room.
    emit('user_joined', {
        'user_id': user_id,
        'username': user.username,
        'message': f'{user.username} 加入了聊天室'
    }, room=room_key)
@socketio.on('leave_room')
def handle_leave_room(data):
    """Remove the current socket from a chat room and announce the departure.

    Args:
        data: event payload; expected to be a dict with an integer-like
            ``room_id``. Any other payload is ignored.

    The event is ignored when the user is not logged in, the payload is
    malformed, or the user does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    # The payload comes straight from the client; anything but a dict
    # would make the .get() below raise.
    if not isinstance(data, dict):
        return
    room_id = data.get('room_id')
    if not room_id:
        return
    try:
        # Normalise so "1", "01" and 1 all name the same Socket.IO room.
        room_key = str(int(room_id))
    except (TypeError, ValueError, OverflowError):
        return

    # Load the user.
    user = User.query.get(user_id)
    if not user:
        return

    # Leave the room.
    leave_room(room_key)

    # Notify the users remaining in the room.
    emit('user_left', {
        'user_id': user_id,
        'username': user.username,
        'message': f'{user.username} 离开了聊天室'
    }, room=room_key)
@socketio.on('send_message')
def handle_send_message(data):
    """Persist a chat message and broadcast it to its room.

    Args:
        data: event payload; expected to be a dict with an integer-like
            ``room_id`` and a non-empty string ``message``. Any other
            payload is ignored.

    The event is ignored when the user is not logged in, the payload is
    malformed, or the user or room does not exist.
    """
    user_id = session.get('user_id')
    if not user_id:
        return

    # The payload comes straight from the client; anything but a dict
    # would make the .get() calls below raise.
    if not isinstance(data, dict):
        return
    room_id = data.get('room_id')
    message_text = data.get('message')
    # The message column is text, so only non-empty strings are stored.
    if not room_id or not isinstance(message_text, str) or not message_text:
        return
    try:
        room_id = int(room_id)
    except (TypeError, ValueError, OverflowError):
        return

    # Load the user and the room.
    user = User.query.get(user_id)
    room = ChatRoom.query.get(room_id)
    if not user or not room:
        return

    # Save the message to the database.
    message = ChatMessage(
        room_id=room.id,
        user_id=user_id,
        message=message_text
    )
    db.session.add(message)
    db.session.commit()

    # Broadcast to the room named after the canonical database id, so the
    # message reaches the room regardless of how the client spelled the id.
    emit('new_message', {
        'id': message.id,
        'user_id': user_id,
        'username': user.username,
        'message': message_text,
        'created_at': message.created_at.isoformat()
    }, room=str(room.id))
if __name__ == '__main__':
    # The server listens on every interface, and the interactive debugger
    # lets anyone who can reach it execute code. Debug mode is therefore
    # opt-in: set FLASK_DEBUG=1 (or true/yes/on) for local development.
    debug_flag = os.environ.get('FLASK_DEBUG', '').strip().lower()
    debug_enabled = debug_flag in ('1', 'true', 'yes', 'on')
    socketio.run(app, debug=debug_enabled, host='0.0.0.0')