Compare commits

..

9 Commits
main ... lzh

@ -1,2 +1,6 @@
# battlecar
python app.py
登陆 127.0.0.1:8000/login

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

Binary file not shown.

@ -0,0 +1,38 @@
import qrcode
import os
def read_file_content(file_path):
"""读取文件内容并返回字符串"""
with open(file_path, 'r', encoding='utf-8') as file:
return file.read()
def generate_qr_code(data, output_path):
"""生成包含指定数据的二维码图像并保存到指定路径"""
qr = qrcode.QRCode(
version=1,
error_correction=qrcode.constants.ERROR_CORRECT_L,
box_size=10,
border=4,
)
qr.add_data(data)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img.save(output_path)
# 指定文件路径
file_path = r"E:\_Ufo\000jiegou\TheBattleCar\coordinate.txt"
output_qr_path = r"E:\_Ufo\000jiegou\TheBattleCar\coordinate_qr.png"
# 检查文件是否存在
if not os.path.exists(file_path):
print(f"文件 {file_path} 不存在。")
exit(1)
# 读取文件内容
file_content = read_file_content(file_path)
# 生成二维码图像
generate_qr_code(file_content, output_qr_path)
print(f"二维码已成功生成并保存到 {output_qr_path}")

File diff suppressed because it is too large Load Diff

@ -11,6 +11,38 @@ from flask_login import LoginManager, UserMixin, login_user, login_required, cur
from wtforms import Form, StringField
from wtforms.validators import Length, EqualTo, ValidationError
import pymysql
from flask import flash
from cryptography.fernet import Fernet,InvalidToken
# 生成密钥并保存到文件(仅在第一次运行时生成)
def generate_key():
key = Fernet.generate_key()
with open("secret.key", "wb") as key_file:
key_file.write(key)
# 加载密钥
def load_key():
return open("secret.key", "rb").read()
# 加密函数
def encrypt_message(message):
key = load_key()
f = Fernet(key)
encrypted_message = f.encrypt(message.encode())
return encrypted_message
# 解密函数
def decrypt_message(encrypted_message):
key = load_key()
f = Fernet(key)
try:
#print(f"Encrypted message: {encrypted_message}")
decrypted_message = f.decrypt(encrypted_message).decode()
return decrypted_message
except (InvalidToken, ValueError) as e:
logger.error(f"Failed to decrypt message: {e}")
return None
conn = pymysql.connect(host="localhost", port=3306,
user="root", password="lin556688",
@ -32,6 +64,7 @@ app.config['MYSQL_USER'] = 'root' # 替换为你的 MySQL 用户名
app.config['MYSQL_PASSWORD'] = 'lin556688' # 替换为你的 MySQL 密码
app.config['MYSQL_DB'] = 'mydatabase'
app.config['MYSQL_CURSORCLASS'] = 'DictCursor'
app.config['MAX_CONTENT_LENGTH'] = 2 * 1024 * 1024 # 2MB
mysql = MySQL(app)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
@ -110,6 +143,14 @@ def upload_file():
logger.warning("No selected file")
return jsonify({'message': 'No selected file'}), 400
if file and allowed_file(file.filename):
# 检查文件大小
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
if file_size > app.config['MAX_CONTENT_LENGTH']:
logger.warning("File size exceeds the limit")
return jsonify({'message': 'File size exceeds the limit'}), 400
filename = secure_filename(f"{uuid.uuid4().hex}_{file.filename}")
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
@ -119,6 +160,11 @@ def upload_file():
logger.warning("File type not allowed")
return jsonify({'message': 'File type not allowed'}), 400
# 检查文件类型
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# 提供上传文件的访问
@app.route('/uploads/<filename>')
def uploaded_file(filename):
@ -133,42 +179,75 @@ def login():
username = request.form['username']
password = request.form['password']
# 过滤掉一些常见的SQL注入字符
forbidden_chars = ['AND', 'OR', '1=', "'", '"', '--', ';']
for char in forbidden_chars:
if char in username.upper() or char in password.upper():
logger.warning("Detected forbidden characters in input")
flash("Invalid input. Please try again.")
return render_template('login.html')
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM users WHERE username = %s AND password = %s", (username, password))
# 直接拼接查询语句
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
# 使用参数化查询检查用户是否存在
#cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
user_data = cursor.fetchone()
cursor.close()
# print(user_data['password'].encode('utf-8'))
#print(user_data)
if user_data:
user = User(user_data['id'], user_data['username'], user_data['role'])
login_user(user)
logger.info(f"User {username} logged in with role {user_data['role']}")
if user_data['role'] == '侦查者':
return redirect(url_for('scout'))
elif user_data['role'] == '指挥者':
return redirect(url_for('commander'))
elif user_data['role'] == '攻击者':
return redirect(url_for('attacker'))
# 解密密码
decrypted_password = decrypt_message(user_data['password'].encode('utf-8'))
if decrypted_password == password:
user = User(user_data['id'], user_data['username'], user_data['role'])
login_user(user)
logger.info(f"User {username} logged in with role {user_data['role']}")
if user_data['role'] == '侦查者':
return redirect(url_for('scout'))
elif user_data['role'] == '指挥者':
return redirect(url_for('commander'))
elif user_data['role'] == '攻击者':
return redirect(url_for('attacker'))
else:
logger.warning("Invalid password")
flash("Invalid password. Please try again.")
else:
logger.warning("Invalid credentials")
return "Invalid credentials. Please try again."
#print(1)
logger.warning("Invalid username")
return "Invalid username. Please try again."
return render_template('login.html')
# 处理数据库语句
def con_my_sql(sql_code):
def con_my_sql(sql_code, params=None):
try:
# 尝试连接数据库
conn.ping(reconnect=True)
cursor= conn.cursor(pymysql.cursors.DictCursor)
cursor.execute(sql_code)
# 创建游标对象,结果以字典形式返回
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 如果传入了参数,使用参数化查询
if params:
cursor.execute(sql_code, params)
else:
cursor.execute(sql_code)
# 提交事务
conn.commit()
conn.close()
# 返回游标对象
return cursor
except pymysql.MySQLError as err_massage:
# 捕获异常,回滚事务
conn.rollback()
# 关闭连接
conn.close()
# 返回异常类型和异常信息
return type(err_massage), err_massage
class RegisterForm(Form):
captcha = StringField(validators=[Length(min=4,max=4,message='校验码格式错误')])
username = StringField(validators=[Length(min=3,max=10,message='用户名长度必须在3到10个字符之间')])
@ -178,6 +257,8 @@ class RegisterForm(Form):
def validate_captcha(self, filed):
if filed.data not in ['1111', '2222', '3333']:
raise ValidationError('校验码错误')
# 注册页面
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
@ -191,20 +272,26 @@ def register():
# 静态注册码进行角色注册
if captcha == "1111":
role = "侦查者"
if captcha == "2222":
elif captcha == "2222":
role = "指挥者"
if captcha == "3333":
elif captcha == "3333":
role = "攻击者"
else:
return '无效的校验码 <a href="/">返回登录</a>'
code = "select * from users where username = '%s'" % username
cursor_ans = con_my_sql(code)
# 使用参数化查询检查用户是否存在
code = "SELECT * FROM users WHERE username = %s"
cursor_ans = con_my_sql(code, (username,))
cursor_select = cursor_ans.fetchall()
if len(cursor_select) > 0:
return '用户已经存在 <a href="/">返回登录</a>'
else:
code = "INSERT INTO users(username, password, role) VALUES('%s', '%s', '%s')" % (username, password, role)
print(con_my_sql(code))
# 加密密码
encrypted_password = encrypt_message(password)
# 使用参数化查询插入新用户
code = "INSERT INTO users(username, password, role) VALUES(%s, %s, %s)"
con_my_sql(code, (username, encrypted_password, role))
return '注册成功 <a href="/">返回登录</a>'
else:
print(form.errors)
@ -218,7 +305,81 @@ def scout():
if current_user.role != '侦查者':
logger.warning("Unauthorized access to scout page")
return '用户无此页面访问权限 <a href="/">返回登录<a/>'
return render_template('scout.html')
# 获取侦察者的通知
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM notifications WHERE user_id = %s ORDER BY created_at DESC", (current_user.id,))
notifications = cursor.fetchall()
cursor.close()
logger.info(f"Notifications fetched: {notifications}")
# 解密通知
for notification in notifications:
notification['message'] = decrypt_message(notification['message'])
return render_template('scout.html', notifications=notifications)
# 处理消息
@app.route('/handle_message/<int:message_id>/<action>', methods=['POST'])
@login_required
def handle_message(message_id, action):
logger.info(f"Handling message action: {action} for ID: {message_id}")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to handle message")
return redirect(url_for('login'))
cursor = mysql.connection.cursor()
if action == 'accept':
cursor.execute("UPDATE messages SET status = 'accepted' WHERE id = %s", (message_id,))
flash('消息已接受')
elif action == 'reject':
cursor.execute("UPDATE messages SET status = 'rejected' WHERE id = %s", (message_id,))
flash('消息已打回')
mysql.connection.commit()
cursor.close()
logger.info(f"Message status updated: {action}")
# 获取消息内容
cursor = mysql.connection.cursor()
cursor.execute("SELECT message FROM messages WHERE id = %s", (message_id,))
message_data = cursor.fetchone()
cursor.close()
logger.info(f"Message data fetched: {message_data}")
# 解密消息
decrypted_message = decrypt_message(message_data['message'])
# 将结果反馈给侦察者
cursor = mysql.connection.cursor()
cursor.execute("SELECT id FROM users WHERE role = '侦查者'")
scouts = cursor.fetchall()
cursor.close()
logger.info(f"Scouts found: {scouts}")
for scout in scouts:
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO notifications (user_id, message_id, action, message) VALUES (%s, %s, %s, %s)",
(scout['id'], message_id, action, message_data['message']))
mysql.connection.commit()
cursor.close()
logger.info(f"Notification inserted for scout: {scout['id']}")
return redirect(url_for('commander'))
# 清除通知
@app.route('/clear_notifications', methods=['POST'])
@login_required
def clear_notifications():
user_id = request.json.get('user_id')
if not user_id:
return jsonify({'success': False, 'message': 'User ID is required'}), 400
cursor = mysql.connection.cursor()
cursor.execute("DELETE FROM notifications WHERE user_id = %s", (user_id,))
mysql.connection.commit()
cursor.close()
return jsonify({'success': True, 'message': 'Notifications cleared successfully'})
# 指挥者页面
@app.route('/commander')
@ -246,7 +407,21 @@ def commander():
attacks = cursor.fetchall()
cursor.close()
return render_template('commander.html', media_items=media_items, attacks=attacks)
# 解密坐标
for attack in attacks:
attack['coordinate'] = decrypt_message(attack['coordinate'])
# 获取侦察者发送的消息
cursor = mysql.connection.cursor()
cursor.execute("SELECT id, message, photo_url, created_at, status FROM messages ORDER BY created_at DESC")
messages = cursor.fetchall()
cursor.close()
# 解密消息
for message in messages:
message['message'] = decrypt_message(message['message'])
return render_template('commander.html', media_items=media_items, attacks=attacks, messages=messages)
# 指挥者发送攻击坐标
@app.route('/send_attack', methods=['POST'])
@ -262,6 +437,10 @@ def send_attack():
logger.warning("No coordinate provided")
return "No coordinate provided", 400
# 加密坐标
encrypted_coordinate = encrypt_message(coordinate)
coordinate = encrypted_coordinate
# 插入攻击坐标到数据库
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO attacks (coordinate) VALUES (%s)", (coordinate,))
@ -272,6 +451,47 @@ def send_attack():
flash('攻击坐标已发送')
return redirect(url_for('commander'))
# 清除照片
@app.route('/clear_photos', methods=['POST'])
@login_required
def clear_photos():
logger.info("Handling clear photos request")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to clear photos")
return redirect(url_for('login'))
directory = app.config['UPLOAD_FOLDER']
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
logger.error(f"Error deleting file: {str(e)}")
flash(f"Error deleting file: {str(e)}")
logger.info("Photos cleared successfully")
flash('照片已清空')
return redirect(url_for('commander'))
# 清除侦察者发送的消息
@app.route('/clear_messages', methods=['POST'])
@login_required
def clear_messages():
logger.info("Handling clear messages request")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to clear messages")
return redirect(url_for('login'))
cursor = mysql.connection.cursor()
cursor.execute("DELETE FROM messages")
mysql.connection.commit()
cursor.close()
logger.info("Messages cleared successfully")
flash('消息已清空')
return redirect(url_for('commander'))
# 攻击者页面
@app.route('/attacker')
@login_required
@ -287,6 +507,10 @@ def attacker():
attacks = cursor.fetchall()
cursor.close()
# 解密坐标
for attack in attacks:
attack['coordinate'] = decrypt_message(attack['coordinate'])
return render_template('attacker.html', attacks=attacks)
# 攻击者执行攻击
@ -340,9 +564,13 @@ def send_message():
logger.warning("Allowed file types are png, jpg, jpeg, gif")
return "Allowed file types are png, jpg, jpeg, gif", 400
# 加密数据
encrypted_message = encrypt_message(message)
message = encrypted_message
# 插入数据到 MySQL
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO messages (message, photo_url) VALUES (%s, %s)", (message, photo_url))
cursor.execute("INSERT INTO messages (message, photo_url, status) VALUES (%s, %s, %s)", (message, photo_url, 'pending'))
mysql.connection.commit()
cursor.close()
logger.info(f"Message sent: {message}, Photo URL: {photo_url}")
@ -351,26 +579,54 @@ def send_message():
return render_template('send_message.html')
# 获取所有消息
@app.route('/messages', methods=['GET'])
@login_required
def get_messages():
logger.info("Handling get messages request")
def messages():
logger.info("Handling messages request")
# 角色检查
if current_user.role != '指挥者':
logger.warning("Unauthorized access to messages")
if request.accept_mimetypes.best_match(['application/json', 'text/html']) == 'application/json':
return jsonify({"error": "Unauthorized access"}), 403
else:
return redirect(url_for('login'))
# 获取消息
cursor = mysql.connection.cursor()
cursor.execute("SELECT id, message, photo_url, created_at FROM messages")
cursor.execute("SELECT id, message, photo_url, created_at, status FROM messages ORDER BY created_at DESC")
result = cursor.fetchall()
cursor.close()
# 解密消息
messages = []
for row in result:
decrypted_message = decrypt_message(row['message'])
messages.append({
'id': row['id'],
'message': row['message'],
'message': decrypted_message,
'photo_url': row['photo_url'],
'created_at': row['created_at']
'created_at': row['created_at'],
'status': row['status']
})
return jsonify(messages)
# 根据请求类型返回不同的响应
if request.accept_mimetypes.best_match(['application/json', 'text/html']) == 'application/json':
return jsonify(messages)
else:
return render_template('view_messages.html', messages=messages)
if __name__ == '__main__':
# generate_key() # 仅在第一次运行时生成密钥
app.run(debug=True, host='0.0.0.0', port=8000)
# # 测试加密和解密
# test_message = "This is a test message"
# encrypted_message = encrypt_message(test_message)
# decrypted_message = decrypt_message(encrypted_message)
# print(f"Original message: {test_message}")
# print(f"Encrypted message: {encrypted_message}")
# print(f"Decrypted message: {decrypted_message}")

@ -0,0 +1,599 @@
import os
import uuid
import logging
from datetime import datetime
from flask import Flask, current_app, session, redirect, url_for, request, flash, render_template, jsonify, send_from_directory
from flask_sqlalchemy import SQLAlchemy
from flask_mysqldb import MySQL
import MySQLdb.cursors
from werkzeug.utils import secure_filename
from flask_login import LoginManager, UserMixin, login_user, login_required, current_user
from wtforms import Form, StringField
from wtforms.validators import Length, EqualTo, ValidationError
import pymysql
from cryptography.fernet import Fernet,InvalidToken
# 生成密钥并保存到文件(仅在第一次运行时生成)
def generate_key():
key = Fernet.generate_key()
with open("secret.key", "wb") as key_file:
key_file.write(key)
# 加载密钥
def load_key():
return open("secret.key", "rb").read()
# 加密函数
def encrypt_message(message):
key = load_key()
f = Fernet(key)
encrypted_message = f.encrypt(message.encode())
return encrypted_message
# 解密函数
def decrypt_message(encrypted_message):
key = load_key()
f = Fernet(key)
try:
print(f"Encrypted message: {encrypted_message}")
decrypted_message = f.decrypt(encrypted_message).decode()
return decrypted_message
except (InvalidToken, ValueError) as e:
logger.error(f"Failed to decrypt message: {e}")
return None
conn = pymysql.connect(host="localhost", port=3306,
user="root", password="lin556688",
database="mydatabase", charset="utf8mb4")
# 初始化 Flask 应用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your_secret_key'
app.secret_key = os.urandom(24) # 生成更安全的会话密钥
app.config['UPLOAD_FOLDER'] = 'uploads/' # 设置上传文件存储目录
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) # 确保目录存在
# 配置日志
logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# MySQL 配置
app.config['MYSQL_HOST'] = 'localhost'
app.config['MYSQL_USER'] = 'root' # 替换为你的 MySQL 用户名
app.config['MYSQL_PASSWORD'] = 'lin556688' # 替换为你的 MySQL 密码
app.config['MYSQL_DB'] = 'mydatabase'
app.config['MYSQL_CURSORCLASS'] = 'DictCursor'
mysql = MySQL(app)
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif'}
# 配置日志记录器
logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 自定义日志处理器类
class DatabaseLogHandler(logging.Handler):
def emit(self, record):
log_entry = self.format(record)
# 去掉时间戳中的毫秒部分
timestamp = datetime.strptime(record.asctime, '%Y-%m-%d %H:%M:%S,%f').strftime('%Y-%m-%d %H:%M:%S')
cursor = mysql.connection.cursor()
cursor.execute(
"INSERT INTO logs (timestamp, level, message) VALUES (%s, %s, %s)",
(timestamp, record.levelname, log_entry)
)
mysql.connection.commit()
cursor.close()
# 添加自定义日志处理器
db_handler = DatabaseLogHandler()
db_handler.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
db_handler.setFormatter(formatter)
logger.addHandler(db_handler)
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# 初始化 Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
class User(UserMixin):
def __init__(self, id, username, role):
self.id = id
self.username = username
self.role = role
@login_manager.user_loader
def load_user(user_id):
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user_data = cursor.fetchone()
cursor.close()
if user_data:
return User(user_data['id'], user_data['username'], user_data['role'])
return None
@app.route('/')
def home():
return render_template('login.html')
@app.route('/view_logs')
@login_required
def view_logs():
with open('app.log', 'r') as log_file:
logs = log_file.readlines()
return render_template('logs.html', logs=logs)
# 处理文件上传
@app.route('/upload', methods=['POST'])
def upload_file():
logger.info("Handling file upload request")
if 'file' not in request.files:
logger.warning("No file part in request")
return jsonify({'message': 'No file part'}), 400
file = request.files['file']
if file.filename == '':
logger.warning("No selected file")
return jsonify({'message': 'No selected file'}), 400
if file and allowed_file(file.filename):
filename = secure_filename(f"{uuid.uuid4().hex}_{file.filename}")
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
logger.info(f"File uploaded successfully: {filename}")
return jsonify({'message': 'File uploaded successfully', 'filepath': filepath}), 201
else:
logger.warning("File type not allowed")
return jsonify({'message': 'File type not allowed'}), 400
# 提供上传文件的访问
@app.route('/uploads/<filename>')
def uploaded_file(filename):
logger.info(f"Accessing uploaded file: {filename}")
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
# 登录页面
@app.route('/login', methods=['GET', 'POST'])
def login():
logger.info("Handling login request")
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
cursor = mysql.connection.cursor()
# 使用参数化查询检查用户是否存在
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
user_data = cursor.fetchone()
cursor.close()
print(user_data['password'].encode('utf-8'))
if user_data:
# 解密密码
decrypted_password = decrypt_message(user_data['password'].encode('utf-8'))
print("解密:",decrypted_password)
user = User(user_data['id'], user_data['username'], user_data['role'])
login_user(user)
logger.info(f"User {username} logged in with role {user_data['role']}")
if user_data['role'] == '侦查者':
return redirect(url_for('scout'))
elif user_data['role'] == '指挥者':
return redirect(url_for('commander'))
elif user_data['role'] == '攻击者':
return redirect(url_for('attacker'))
else:
logger.warning("Invalid credentials")
return "Invalid credentials. Please try again."
return render_template('login.html')
# 处理数据库语句
def con_my_sql(sql_code, params=None):
try:
# 尝试连接数据库
conn.ping(reconnect=True)
# 创建游标对象,结果以字典形式返回
cursor = conn.cursor(pymysql.cursors.DictCursor)
# 如果传入了参数,使用参数化查询
if params:
cursor.execute(sql_code, params)
else:
cursor.execute(sql_code)
# 提交事务
conn.commit()
# 返回游标对象
return cursor
except pymysql.MySQLError as err_massage:
# 捕获异常,回滚事务
conn.rollback()
# 关闭连接
conn.close()
# 返回异常类型和异常信息
return type(err_massage), err_massage
class RegisterForm(Form):
captcha = StringField(validators=[Length(min=4,max=4,message='校验码格式错误')])
username = StringField(validators=[Length(min=3,max=10,message='用户名长度必须在3到10个字符之间')])
password = StringField(validators=[Length(min=6,max=20,message='密码长度必须在6到20个字符之间')])
password_confirm = StringField(validators=[EqualTo('password',message='两次输入的密码不一致')])
def validate_captcha(self, filed):
if filed.data not in ['1111', '2222', '3333']:
raise ValidationError('校验码错误')
# 注册页面
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
return render_template('register.html')
else:
form = RegisterForm(request.form)
if form.validate():
username = form.username.data
password = form.password.data
captcha = form.captcha.data
# 静态注册码进行角色注册
if captcha == "1111":
role = "侦查者"
elif captcha == "2222":
role = "指挥者"
elif captcha == "3333":
role = "攻击者"
else:
return '无效的校验码 <a href="/">返回登录</a>'
# 使用参数化查询检查用户是否存在
code = "SELECT * FROM users WHERE username = %s"
cursor_ans = con_my_sql(code, (username,))
cursor_select = cursor_ans.fetchall()
if len(cursor_select) > 0:
return '用户已经存在 <a href="/">返回登录</a>'
else:
# 加密密码
encrypted_password = encrypt_message(password)
# 使用参数化查询插入新用户
code = "INSERT INTO users(username, password, role) VALUES(%s, %s, %s)"
con_my_sql(code, (username, encrypted_password, role))
return '注册成功 <a href="/">返回登录</a>'
else:
print(form.errors)
return redirect(url_for('register'))
# 侦查者页面
@app.route('/scout')
@login_required
def scout():
logger.info("Accessing scout page")
if current_user.role != '侦查者':
logger.warning("Unauthorized access to scout page")
return '用户无此页面访问权限 <a href="/">返回登录<a/>'
# 获取侦察者的通知
cursor = mysql.connection.cursor()
cursor.execute("SELECT * FROM notifications WHERE user_id = %s ORDER BY created_at DESC", (current_user.id,))
notifications = cursor.fetchall()
cursor.close()
logger.info(f"Notifications fetched: {notifications}")
# 解密通知
for notification in notifications:
notification['message'] = decrypt_message(notification['message'])
return render_template('scout.html', notifications=notifications)
# 处理消息
@app.route('/handle_message/<int:message_id>/<action>', methods=['POST'])
@login_required
def handle_message(message_id, action):
logger.info(f"Handling message action: {action} for ID: {message_id}")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to handle message")
return redirect(url_for('login'))
cursor = mysql.connection.cursor()
if action == 'accept':
cursor.execute("UPDATE messages SET status = 'accepted' WHERE id = %s", (message_id,))
flash('消息已接受')
elif action == 'reject':
cursor.execute("UPDATE messages SET status = 'rejected' WHERE id = %s", (message_id,))
flash('消息已打回')
mysql.connection.commit()
cursor.close()
logger.info(f"Message status updated: {action}")
# 获取消息内容
cursor = mysql.connection.cursor()
cursor.execute("SELECT message FROM messages WHERE id = %s", (message_id,))
message_data = cursor.fetchone()
cursor.close()
logger.info(f"Message data fetched: {message_data}")
# 解密消息
decrypted_message = decrypt_message(message_data['message'])
# 将结果反馈给侦察者
cursor = mysql.connection.cursor()
cursor.execute("SELECT id FROM users WHERE role = '侦查者'")
scouts = cursor.fetchall()
cursor.close()
logger.info(f"Scouts found: {scouts}")
for scout in scouts:
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO notifications (user_id, message_id, action, message) VALUES (%s, %s, %s, %s)",
(scout['id'], message_id, action, message_data['message']))
mysql.connection.commit()
cursor.close()
logger.info(f"Notification inserted for scout: {scout['id']}")
return redirect(url_for('commander'))
# 清除通知
@app.route('/clear_notifications', methods=['POST'])
@login_required
def clear_notifications():
user_id = request.json.get('user_id')
if not user_id:
return jsonify({'success': False, 'message': 'User ID is required'}), 400
cursor = mysql.connection.cursor()
cursor.execute("DELETE FROM notifications WHERE user_id = %s", (user_id,))
mysql.connection.commit()
cursor.close()
return jsonify({'success': True, 'message': 'Notifications cleared successfully'})
# 指挥者页面
@app.route('/commander')
@login_required
def commander():
logger.info("Accessing commander page")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to commander page")
return '用户无此页面访问权限 <a href="/">返回登录<a/>'
# 获取特定目录下的所有文件和攻击坐标状态
directory = 'E:/_Ufo/0000jiegou/TheBattleCar/uploads'
media_items = []
try:
media_items = [f for f in os.listdir(directory)
if os.path.isfile(os.path.join(directory, f)) and allowed_file(f)]
except Exception as e:
logger.error(f"Error accessing directory: {str(e)}")
flash(f"Error accessing directory: {str(e)}")
# 获取攻击状态
cursor = mysql.connection.cursor()
cursor.execute("SELECT id, coordinate, attacked, created_at FROM attacks ORDER BY created_at DESC")
attacks = cursor.fetchall()
cursor.close()
# 解密坐标
for attack in attacks:
attack['coordinate'] = decrypt_message(attack['coordinate'])
# 获取侦察者发送的消息
cursor = mysql.connection.cursor()
cursor.execute("SELECT id, message, photo_url, created_at, status FROM messages ORDER BY created_at DESC")
messages = cursor.fetchall()
cursor.close()
# 解密消息
for message in messages:
message['message'] = decrypt_message(message['message'])
return render_template('commander.html', media_items=media_items, attacks=attacks, messages=messages)
# 指挥者发送攻击坐标
@app.route('/send_attack', methods=['POST'])
@login_required
def send_attack():
logger.info("Handling send attack request")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to send attack")
return redirect(url_for('login'))
coordinate = request.form.get('coordinate')
if not coordinate:
logger.warning("No coordinate provided")
return "No coordinate provided", 400
# 加密坐标
encrypted_coordinate = encrypt_message(coordinate)
coordinate = encrypted_coordinate
# 插入攻击坐标到数据库
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO attacks (coordinate) VALUES (%s)", (coordinate,))
mysql.connection.commit()
cursor.close()
logger.info(f"Attack coordinate sent: {coordinate}")
flash('攻击坐标已发送')
return redirect(url_for('commander'))
# 清除照片
@app.route('/clear_photos', methods=['POST'])
@login_required
def clear_photos():
logger.info("Handling clear photos request")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to clear photos")
return redirect(url_for('login'))
directory = app.config['UPLOAD_FOLDER']
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
try:
if os.path.isfile(file_path):
os.unlink(file_path)
except Exception as e:
logger.error(f"Error deleting file: {str(e)}")
flash(f"Error deleting file: {str(e)}")
logger.info("Photos cleared successfully")
flash('照片已清空')
return redirect(url_for('commander'))
# 清除侦察者发送的消息
@app.route('/clear_messages', methods=['POST'])
@login_required
def clear_messages():
logger.info("Handling clear messages request")
if current_user.role != '指挥者':
logger.warning("Unauthorized access to clear messages")
return redirect(url_for('login'))
cursor = mysql.connection.cursor()
cursor.execute("DELETE FROM messages")
mysql.connection.commit()
cursor.close()
logger.info("Messages cleared successfully")
flash('消息已清空')
return redirect(url_for('commander'))
# 攻击者页面
@app.route('/attacker')
@login_required
def attacker():
logger.info("Accessing attacker page")
if current_user.role != '攻击者':
logger.warning("Unauthorized access to attacker page")
return '用户无此页面访问权限 <a href="/">返回登录<a/>'
# 获取攻击坐标列表
cursor = mysql.connection.cursor()
cursor.execute("SELECT id, coordinate, created_at FROM attacks ORDER BY created_at DESC")
attacks = cursor.fetchall()
cursor.close()
# 解密坐标
for attack in attacks:
attack['coordinate'] = decrypt_message(attack['coordinate'])
return render_template('attacker.html', attacks=attacks)
# 攻击者执行攻击
@app.route('/execute_attack/<int:attack_id>', methods=['POST'])
@login_required
def execute_attack(attack_id):
logger.info(f"Handling execute attack request for ID: {attack_id}")
if current_user.role != '攻击者':
logger.warning("Unauthorized access to execute attack")
return redirect(url_for('login'))
cursor = mysql.connection.cursor()
cursor.execute("UPDATE attacks SET attacked = TRUE WHERE id = %s", (attack_id,))
mysql.connection.commit()
cursor.close()
logger.info(f"Attack executed for ID: {attack_id}")
flash(f'已对坐标ID为 {attack_id} 的目标完成打击')
return redirect(url_for('attacker'))
# 退出登录(清除会话)
@app.route('/logout')
@login_required
def logout():
logger.info("Handling logout request")
session.pop('username', None)
session.pop('role', None)
return redirect(url_for('login'))
# 发送消息
@app.route('/send_message', methods=['GET', 'POST'])
@login_required
def send_message():
logger.info("Handling send message request")
if request.method == 'POST':
message = request.form.get('message')
if not message:
logger.warning("No message provided")
return "No message provided", 400
photo_url = None
if 'photo' in request.files:
file = request.files['photo']
if file.filename != '':
if file and allowed_file(file.filename):
filename = secure_filename(f"{uuid.uuid4().hex}_{file.filename}")
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
photo_url = url_for('uploaded_file', filename=filename, _external=True)
else:
logger.warning("Allowed file types are png, jpg, jpeg, gif")
return "Allowed file types are png, jpg, jpeg, gif", 400
# 加密数据
encrypted_message = encrypt_message(message)
message = encrypted_message
# 插入数据到 MySQL
cursor = mysql.connection.cursor()
cursor.execute("INSERT INTO messages (message, photo_url, status) VALUES (%s, %s, %s)", (message, photo_url, 'pending'))
mysql.connection.commit()
cursor.close()
logger.info(f"Message sent: {message}, Photo URL: {photo_url}")
return f"Message and photo (if uploaded) have been received. Message: {message}\nPhoto URL: {photo_url if photo_url else 'N/A'}"
return render_template('send_message.html')
@app.route('/messages', methods=['GET'])
@login_required
def messages():
logger.info("Handling messages request")
# 角色检查
if current_user.role != '指挥者':
logger.warning("Unauthorized access to messages")
if request.accept_mimetypes.best_match(['application/json', 'text/html']) == 'application/json':
return jsonify({"error": "Unauthorized access"}), 403
else:
return redirect(url_for('login'))
# 获取消息
cursor = mysql.connection.cursor()
cursor.execute("SELECT id, message, photo_url, created_at, status FROM messages ORDER BY created_at DESC")
result = cursor.fetchall()
cursor.close()
# 解密消息
messages = []
for row in result:
decrypted_message = decrypt_message(row['message'])
messages.append({
'id': row['id'],
'message': decrypted_message,
'photo_url': row['photo_url'],
'created_at': row['created_at'],
'status': row['status']
})
# 根据请求类型返回不同的响应
if request.accept_mimetypes.best_match(['application/json', 'text/html']) == 'application/json':
return jsonify(messages)
else:
return render_template('view_messages.html', messages=messages)
if __name__ == '__main__':
# generate_key() # 仅在第一次运行时生成密钥
app.run(debug=True, host='0.0.0.0', port=8000)
# # 测试加密和解密
# test_message = "This is a test message"
# encrypted_message = encrypt_message(test_message)
# decrypted_message = decrypt_message(encrypted_message)
# print(f"Original message: {test_message}")
# print(f"Encrypted message: {encrypted_message}")
# print(f"Decrypted message: {decrypted_message}")

Binary file not shown.

File diff suppressed because it is too large Load Diff

@ -0,0 +1,20 @@
from cryptography.fernet import Fernet
import os
# 生成一个新的Fernet密钥
key = Fernet.generate_key()
# 指定密钥文件的路径
key_path = 'secret.key'
# 将密钥保存到文件中
with open(key_path, 'wb') as key_file:
key_file.write(key)
# 打印密钥以便查看
# print(f"Generated key: {key.decode()}")
# 确保密钥文件的权限设置为只读
os.chmod(key_path, 0o400)
print(f"Key has been saved to {key_path}")

@ -0,0 +1,75 @@
import re
import random
def generate_valid_latitude():
# 生成有效的纬度坐标 (0°-90°)
latitude_degree = random.randint(0, 90)
latitude_minute = random.randint(0, 59)
latitude_second = random.randint(0, 59)
latitude_direction = random.choice(['N', 'S'])
return f"{latitude_degree}°{latitude_minute}{latitude_second}{latitude_direction}"
def generate_valid_longitude():
# 生成有效的经度坐标 (0°-180°)
longitude_degree = random.randint(0, 180)
longitude_minute = random.randint(0, 59)
longitude_second = random.randint(0, 59)
longitude_direction = random.choice(['E', 'W'])
return f"{longitude_degree}°{longitude_minute}{longitude_second}{longitude_direction}"
def generate_invalid_latitude():
# 生成错误的纬度坐标 (超过90°或负值)
latitude_degree = random.randint(91, 180)
latitude_minute = random.randint(0, 59)
latitude_second = random.randint(0, 59)
latitude_direction = random.choice(['N', 'S'])
return f"{latitude_degree}°{latitude_minute}{latitude_second}{latitude_direction}"
def generate_invalid_longitude():
# 生成错误的经度坐标 (超过180°或负值)
longitude_degree = random.randint(181, 360)
longitude_minute = random.randint(0, 59)
longitude_second = random.randint(0, 59)
longitude_direction = random.choice(['E', 'W'])
return f"{longitude_degree}°{longitude_minute}{longitude_second}{longitude_direction}"
def generate_invalid_coordinate():
# 随机决定生成哪种类型的错误坐标
if random.choice([True, False]):
# 纬度正确,经度错误
return f"{generate_valid_latitude()},{generate_invalid_longitude()}"
else:
# 纬度错误,经度正确
return f"{generate_invalid_latitude()},{generate_valid_longitude()}"
def insert_invalid_coordinates(file_path, num_invalid_coords):
# 读取文件内容
with open(file_path, 'r', encoding='utf-8') as file:
original_content = file.read()
# 正则表达式匹配现有的经纬度坐标
pattern = r'(\d{1,2}°[0-5]?\d[0-5]?\d″[NS]),(\d{1,3}°[0-5]?\d[0-5]?\d″[WE])'
matches = re.findall(pattern, original_content)
# 将现有的坐标转换为字符串列表
valid_coords = [f"{match[0]},{match[1]}" for match in matches]
# 生成并插入错误坐标
invalid_coords = [generate_invalid_coordinate() for _ in range(num_invalid_coords)]
all_coords = valid_coords + invalid_coords
# 打乱所有坐标的顺序
random.shuffle(all_coords)
# 写回文件
with open(file_path, 'w', encoding='utf-8') as file:
file.write("\n".join(all_coords))
# 指定文件路径和要插入的错误坐标数量
file_path = r"E:\_Ufo\000jiegou\TheBattleCar\coordinate.txt"
num_invalid_coords = 1000
# 插入错误坐标
insert_invalid_coordinates(file_path, num_invalid_coords)
print(f"已成功将 {num_invalid_coords} 个错误坐标插入到 {file_path} 文件中。")

@ -0,0 +1,33 @@
import re
def parse_coordinates(file_path):
valid_coordinates = []
with open(file_path, 'r', encoding='utf-8') as file:
for line in file:
match = re.match(r'(\d+°\d+\d+″[NS]),(\d+°\d+\d+″[EW])', line.strip())
if match:
lat, lon = match.groups()
lat_value = parse_dms(lat)
lon_value = parse_dms(lon)
if -90 <= lat_value <= 90 and -180 <= lon_value <= 180:
valid_coordinates.append((lat, lon))
return valid_coordinates
def parse_dms(dms):
parts = re.split('[°′″]', dms)
degrees = int(parts[0])
minutes = int(parts[1])
seconds = float(parts[2])
direction = parts[3]
total_degrees = degrees + minutes / 60 + seconds / 3600
if direction in ['S', 'W']:
total_degrees = -total_degrees
return total_degrees
file_path = 'e:\\_Ufo\\000jiegou\\TheBattleCar\\coordinate.txt'
valid_coords = parse_coordinates(file_path)
for coord in valid_coords:
print(coord)

@ -0,0 +1 @@
Ex5GV0VAWNvGixTKY9rKEf_kTrAFPeEO7L3zL3rHzVY=

@ -0,0 +1,29 @@
// server.js
const express = require('express');
const http = require('http');
const socketIo = require('socket.io');
const app = express();
const server = http.createServer(app);
const io = socketIo(server);
let clients = {};
io.on('connection', (socket) => {
console.log('A client connected:', socket.id);
clients[socket.id] = { status: 'connected' };
socket.on('disconnect', () => {
console.log('A client disconnected:', socket.id);
clients[socket.id] = { status: 'disconnected' };
});
// 定时发送客户端状态
setInterval(() => {
io.emit('clients_status', clients);
}, 5000);
});
server.listen(3000, () => {
console.log('Server listening on port 3000');
});

@ -9,7 +9,7 @@
display: flex;
justify-content: center;
align-items: center;
background: linear-gradient(#141e30,#243b55);
background: linear-gradient(#141e30, #243b55);
}
h1, h2 {
text-align: center;
@ -71,6 +71,10 @@
li img {
vertical-align: middle;
margin-right: 10px;
max-width: 100px;
}
.center-button{
text-align: center;
}
</style>
</head>
@ -113,6 +117,36 @@
<li>No images found in the directory.</li>
{% endif %}
</ul>
<form action="{{ url_for('clear_photos') }}" method="post" class="center-button">
<button type="submit">清空照片</button>
</form>
<h2>侦察者发送的消息和图片</h2>
<ul>
{% if messages %}
{% for message in messages %}
<li>
<strong>消息:</strong> {{ message.message }}
- <em>{{ message.created_at }}</em>
- <span>状态: {{ message.status }}</span>
{% if message.photo_url %}
<img src="{{ message.photo_url }}" alt="Message Photo">
{% endif %}
<form action="{{ url_for('handle_message', message_id=message.id, action='accept') }}" method="post" style="display: inline;">
<button type="submit">接受</button>
</form>
<form action="{{ url_for('handle_message', message_id=message.id, action='reject') }}" method="post" style="display: inline;">
<button type="submit">打回</button>
</form>
</li>
{% endfor %}
{% else %}
<li>没有消息和图片。</li>
{% endif %}
</ul>
<form action="{{ url_for('clear_messages') }}" method="post" class="center-button">
<button type="submit">清空消息</button>
</form>
</div>
</body>
</html>

@ -0,0 +1,39 @@
<!-- templates/network.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Network Connection Status</title>
<script src="https://cdnjs.cloudflare.com/ajax/libs/socket.io/4.0.1/socket.io.min.js"></script>
<style>
body { font-family: Arial, sans-serif; }
#network { padding: 20px; }
.client {
color: white;
padding: 10px;
margin: 5px;
border-radius: 5px;
}
.connected { background-color: green; }
.disconnected { background-color: red; }
</style>
</head>
<body>
<h1>Client Connection Status</h1>
<div id="network"></div>
<script>
const socket = io('http://localhost:3000');
const networkElement = document.getElementById('network');
socket.on('clients_status', (clients) => {
networkElement.innerHTML = '';
for (const [id, info] of Object.entries(clients)) {
const clientElement = document.createElement('div');
clientElement.className = 'client ' + info.status;
clientElement.textContent = `Client ID: ${id} - Status: ${info.status}`;
networkElement.appendChild(clientElement);
}
});
</script>
</body>
</html>

@ -5,12 +5,88 @@
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>侦查者</title>
<link rel="stylesheet" href="/static/login.css">
<style>
body {
color: black; /* 字体颜色改为白色 */
font-size: 12px; /* 字体大小调小 */
}
.login h2, .login button {
color: white; /* 按钮和标题字体颜色也改为白色 */
font-size: 14px; /* 调整按钮和标题的字体大小 */
}
.notification-list {
list-style-type: none;
padding: 0;
}
.notification-list li {
padding: 10px;
margin: 10px 0;
border: 1px solid #ccc;
border-radius: 4px;
background-color: #f9f9f9;
word-wrap: break-word; /* 防止长单词溢出 */
}
.notification-list li strong {
margin-right: 10px;
}
.notification-list li span {
font-style: italic;
color: #777;
}
</style>
</head>
<body>
<div class="login">
<h2>侦查者</h2>
<button onclick="window.location.href='http://192.168.146.178:5000/'" class="register-button">控制小车</button>
<button onclick="window.location.href='http://169.254.142.1:5000/'" class="register-button">控制小车</button>
<button onclick="window.location.href='{{ url_for('send_message') }}'" class="register-button">发送消息</button>
<h2>通知</h2>
<ul class="notification-list">
{% if notifications %}
{% for notification in notifications %}
<li>
消息ID: {{ notification.message_id }} - 操作: {{ notification.action }} - 时间: {{ notification.created_at }} - 内容: {{ notification.message }}
</li>
{% endfor %}
{% else %}
<li>没有通知。</li>
{% endif %}
</ul>
<!-- 文件上传表单 -->
<h2>上传文件</h2>
<form action="{{ url_for('upload_file') }}" method="post" enctype="multipart/form-data">
<input type="file" name="file" accept=".png, .jpg, .jpeg, .gif">
<input type="submit" value="Upload">
</form>
<button onclick="clearNotifications()" class="register-button">清空通知</button>
</div>
<script>
function clearNotifications() {
// 发送请求清空通知
fetch('/clear_notifications', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify({ user_id: {{ current_user.id }} })
})
.then(response => response.json())
.then(data => {
if (data.success) {
location.reload(); // 刷新页面以显示更新后的通知列表
} else {
alert('清空通知失败');
}
})
.catch(error => {
console.error('Error:', error);
alert('清空通知失败');
});
}
</script>
</body>
</html>

@ -1,6 +1,16 @@
# test.py
from models import db, User
import re
user = User(username='commander', role='指挥者')
db.session.add(user)
db.session.commit()
# 隐藏坐标字符串
hidden_string = "x3f9a2c8d1e5b7f4c39d52f48n1a2f116d24f20e9b8c7d2e3f9a2c8d1e5b7f4cg9m5h6k2l0j7n8p4qv2wx3r4t5y6u7i8o9p0l1k2m3h4g5f6e7d8c9n4p3o2i1u0y5t4r3w2x1qvx9g8m7h6k5l4j3n2p1o0v9b8e7d6c5f4a3g2d1e0z39°5248″N39d52f48n1a2f116d24f20e9b8c7d2e3f9a2c8d1e5b7f4cg9m5h6k2l0j7n8p4qv2wx3r4t5y6u7i8o9p0l1k2m3h4g5f6e7d8c9n4p3o2i1u0y5t4r3w2x1qvx9g8m7h6k5l4j3n2p1o0v9b8e7d6c5f4a3g2d1e0z116°2420″E39d52f48n1a2f116d24f20e9b8c7d2e3f9a2c8d1e5b7f4cg9m5h6k2l0j7n8p4qv2wx3r4t5y6u7i8o9p0l1k2m3h4g5f6e7d8c9n4p3o2i1u0y5t4r3w2x1qvx9g8m7h6k5l4j3n2p1o0v9b8e7d6c5f4a3g2d1e0z"
# 使用正则表达式提取经纬度坐标
pattern = r'(\d{2}°\d{2}\d{2}″[NE])'
matches = re.findall(pattern, hidden_string)
# 打印解码后的经纬度坐标
if len(matches) >= 2:
latitude = matches[0]
longitude = matches[1]
print(f"解码后的经纬度坐标是:{latitude}, {longitude}")
else:
print("未能解码出完整的经纬度坐标。")

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 49 KiB

Loading…
Cancel
Save