You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

688 lines
20 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# Android客户端API接口 - 仅用于认证和通讯录
from flask import Blueprint, request, jsonify
from functools import wraps
from models import db, User, Contact
import secrets
api = Blueprint('api', __name__, url_prefix='/api')
# Token存储 (生产环境建议使用Redis)
api_tokens = {}
def token_required(f):
@wraps(f)
def decorated(*args, **kwargs):
token = request.headers.get('Authorization')
if not token:
return jsonify({'code': 401, 'message': '缺少认证令牌'}), 401
if token.startswith('Bearer '):
token = token[7:]
user_id = api_tokens.get(token)
if not user_id:
return jsonify({'code': 401, 'message': '无效或过期的令牌'}), 401
user = User.query.get(user_id)
if not user or not user.is_active:
return jsonify({'code': 401, 'message': '用户不存在或已禁用'}), 401
request.current_user = user
return f(*args, **kwargs)
return decorated
def success_response(data=None, message='success'):
return jsonify({'code': 200, 'message': message, 'data': data})
def error_response(message, code=400):
return jsonify({'code': code, 'message': message}), code
# ==================== 认证接口 ====================
@api.route('/auth/login', methods=['POST'])
def api_login():
"""用户登录"""
data = request.get_json()
if not data:
return error_response('请求数据无效')
username = data.get('username')
password = data.get('password')
if not username or not password:
return error_response('用户名和密码不能为空')
user = User.query.filter_by(username=username).first()
if not user or not user.check_password(password):
return error_response('用户名或密码错误')
if not user.is_active:
return error_response('账户已被禁用')
token = secrets.token_hex(32)
api_tokens[token] = user.id
return success_response({
'token': token,
'user': {
'id': user.id,
'username': user.username,
'email': user.email,
'is_admin': user.is_admin
}
}, '登录成功')
@api.route('/auth/register', methods=['POST'])
def api_register():
"""用户注册"""
data = request.get_json()
if not data:
return error_response('请求数据无效')
username = data.get('username')
email = data.get('email')
password = data.get('password')
if not username or not email or not password:
return error_response('用户名、邮箱和密码不能为空')
if len(password) < 6:
return error_response('密码至少6位')
if User.query.filter_by(username=username).first():
return error_response('用户名已存在')
if User.query.filter_by(email=email).first():
return error_response('邮箱已被注册')
user = User(username=username, email=email)
user.set_password(password)
db.session.add(user)
db.session.commit()
return success_response(message='注册成功')
@api.route('/auth/logout', methods=['POST'])
@token_required
def api_logout():
"""用户登出"""
token = request.headers.get('Authorization', '').replace('Bearer ', '')
if token in api_tokens:
del api_tokens[token]
return success_response(message='登出成功')
@api.route('/auth/profile', methods=['GET'])
@token_required
def api_profile():
"""获取用户信息"""
user = request.current_user
return success_response({
'id': user.id,
'username': user.username,
'email': user.email,
'is_admin': user.is_admin,
'created_at': user.created_at.isoformat()
})
@api.route('/auth/password', methods=['PUT'])
@token_required
def api_change_password():
"""修改密码"""
data = request.get_json()
old_password = data.get('old_password')
new_password = data.get('new_password')
if not old_password or not new_password:
return error_response('原密码和新密码不能为空')
if len(new_password) < 6:
return error_response('新密码至少6位')
user = request.current_user
if not user.check_password(old_password):
return error_response('原密码错误')
user.set_password(new_password)
db.session.commit()
return success_response(message='密码修改成功')
# ==================== 通讯录接口 ====================
def contact_to_dict(contact):
return {
'id': contact.id,
'name': contact.name,
'email': contact.email,
'note': contact.note or '',
'created_at': contact.created_at.isoformat()
}
@api.route('/contacts', methods=['GET'])
@token_required
def api_contacts():
"""获取通讯录列表"""
user = request.current_user
contacts = Contact.query.filter_by(user_id=user.id).order_by(Contact.name).all()
return success_response({
'contacts': [contact_to_dict(c) for c in contacts],
'total': len(contacts)
})
@api.route('/contacts', methods=['POST'])
@token_required
def api_add_contact():
"""添加联系人"""
user = request.current_user
data = request.get_json()
name = data.get('name')
email = data.get('email')
note = data.get('note', '')
if not name or not email:
return error_response('姓名和邮箱不能为空')
existing = Contact.query.filter_by(user_id=user.id, email=email).first()
if existing:
return error_response('该邮箱已在通讯录中')
contact_user = User.query.filter_by(email=email).first()
contact = Contact(
user_id=user.id,
contact_user_id=contact_user.id if contact_user else None,
name=name,
email=email,
note=note
)
db.session.add(contact)
db.session.commit()
return success_response(contact_to_dict(contact), '添加成功')
@api.route('/contacts/<int:id>', methods=['DELETE'])
@token_required
def api_delete_contact(id):
"""删除联系人"""
user = request.current_user
contact = Contact.query.filter_by(id=id, user_id=user.id).first_or_404()
db.session.delete(contact)
db.session.commit()
return success_response(message='删除成功')
@api.route('/contacts/<int:id>', methods=['PUT'])
@token_required
def api_update_contact(id):
"""更新联系人"""
user = request.current_user
contact = Contact.query.filter_by(id=id, user_id=user.id).first_or_404()
data = request.get_json()
if data.get('name'):
contact.name = data.get('name')
if 'note' in data:
contact.note = data.get('note', '')
db.session.commit()
return success_response(contact_to_dict(contact), '更新成功')
@api.route('/contacts/<int:id>/emails', methods=['GET'])
@token_required
def api_contact_emails(id):
"""获取与联系人的往来邮件"""
from models import Email
from sqlalchemy import or_, and_
user = request.current_user
contact = Contact.query.filter_by(id=id, user_id=user.id).first_or_404()
emails = Email.query.filter(
or_(
and_(Email.sender_id == user.id, Email.recipient_address == contact.email, Email.is_draft == False),
and_(Email.recipient_id == user.id, Email.sender_address == contact.email, Email.is_deleted == False)
)
).order_by(Email.created_at.desc()).all()
def email_to_dict(e):
return {
'id': e.id,
'sender_address': e.sender_address,
'recipient_address': e.recipient_address,
'subject': e.subject or '(无主题)',
'body': e.body or '',
'created_at': e.created_at.isoformat(),
'is_read': e.is_read,
'is_sent': e.sender_id == user.id,
'is_starred': e.is_starred
}
return success_response({
'contact': contact_to_dict(contact),
'emails': [email_to_dict(e) for e in emails],
'total': len(emails)
})
# ==================== 星标邮件接口 ====================
@api.route('/emails/starred', methods=['GET'])
@token_required
def api_starred_emails():
"""获取星标邮件列表"""
from models import Email
user = request.current_user
emails = Email.query.filter_by(
recipient_id=user.id,
is_deleted=False,
is_starred=True
).order_by(Email.created_at.desc()).all()
def email_to_dict(e):
return {
'id': e.id,
'sender_address': e.sender_address,
'recipient_address': e.recipient_address,
'subject': e.subject or '(无主题)',
'body': e.body or '',
'created_at': e.created_at.isoformat(),
'is_read': e.is_read,
'is_starred': e.is_starred
}
return success_response({
'emails': [email_to_dict(e) for e in emails],
'total': len(emails)
})
@api.route('/emails/<int:id>/star', methods=['POST'])
@token_required
def api_toggle_star(id):
"""切换邮件星标状态"""
from models import Email
user = request.current_user
email = Email.query.filter_by(id=id, recipient_id=user.id).first_or_404()
email.is_starred = not email.is_starred
db.session.commit()
return success_response({
'id': email.id,
'is_starred': email.is_starred
}, '星标状态已更新')
@api.route('/emails/star-by-info', methods=['POST'])
@token_required
def api_toggle_star_by_info():
"""通过邮件信息切换星标状态用于POP3客户端"""
from models import Email
user = request.current_user
data = request.get_json()
sender = data.get('sender', '')
subject = data.get('subject', '')
if not sender:
return error_response('发件人不能为空')
# 通过发件人和主题查找邮件
email = Email.query.filter_by(
recipient_id=user.id,
sender_address=sender,
subject=subject,
is_deleted=False
).order_by(Email.created_at.desc()).first()
if not email:
return error_response('邮件不存在', 404)
email.is_starred = not email.is_starred
db.session.commit()
return success_response({
'id': email.id,
'is_starred': email.is_starred
}, '星标状态已更新')
@api.route('/emails/check-starred', methods=['POST'])
@token_required
def api_check_starred():
"""检查邮件是否已星标用于POP3客户端"""
from models import Email
user = request.current_user
data = request.get_json()
sender = data.get('sender', '')
subject = data.get('subject', '')
email = Email.query.filter_by(
recipient_id=user.id,
sender_address=sender,
subject=subject,
is_deleted=False
).order_by(Email.created_at.desc()).first()
if not email:
return success_response({'is_starred': False, 'found': False})
return success_response({
'id': email.id,
'is_starred': email.is_starred,
'found': True
})
@api.route('/contacts/auto-add', methods=['POST'])
@token_required
def api_auto_add_contact():
"""自动添加联系人(发送邮件时调用)"""
user = request.current_user
data = request.get_json()
email = data.get('email')
if not email:
return error_response('邮箱不能为空')
# 检查是否已存在
existing = Contact.query.filter_by(user_id=user.id, email=email).first()
if existing:
return success_response(contact_to_dict(existing), '联系人已存在')
# 自动添加
contact_user = User.query.filter_by(email=email).first()
name = data.get('name') or email.split('@')[0]
contact = Contact(
user_id=user.id,
contact_user_id=contact_user.id if contact_user else None,
name=name,
email=email
)
db.session.add(contact)
db.session.commit()
return success_response(contact_to_dict(contact), '联系人已添加')
@api.route('/contacts/search', methods=['GET'])
@token_required
def api_search_contacts():
"""搜索联系人"""
user = request.current_user
keyword = request.args.get('q', '')
if not keyword:
return error_response('请输入搜索关键词')
contacts = Contact.query.filter(
Contact.user_id == user.id,
(Contact.name.contains(keyword) | Contact.email.contains(keyword))
).order_by(Contact.name).all()
return success_response({
'contacts': [contact_to_dict(c) for c in contacts],
'total': len(contacts)
})
# ==================== 管理员接口 ====================
def admin_required(f):
@wraps(f)
def decorated(*args, **kwargs):
if not request.current_user.is_admin:
return error_response('需要管理员权限', 403)
return f(*args, **kwargs)
return decorated
@api.route('/admin/dashboard', methods=['GET'])
@token_required
@admin_required
def api_admin_dashboard():
"""获取管理后台仪表盘数据"""
from models import Email, Log
from config import load_config
config = load_config()
user_count = User.query.count()
email_count = Email.query.count()
return success_response({
'user_count': user_count,
'email_count': email_count,
'smtp_port': config.get('smtp_port', 2525),
'pop3_port': config.get('pop3_port', 1100),
'domain': config.get('domain', 'localhost')
})
@api.route('/admin/users', methods=['GET'])
@token_required
@admin_required
def api_admin_users():
"""获取用户列表"""
users = User.query.order_by(User.id).all()
return success_response({
'users': [{
'id': u.id,
'username': u.username,
'email': u.email,
'is_admin': u.is_admin,
'is_active': u.is_active,
'created_at': u.created_at.isoformat()
} for u in users],
'total': len(users)
})
@api.route('/admin/users', methods=['POST'])
@token_required
@admin_required
def api_admin_add_user():
"""添加用户"""
data = request.get_json()
username = data.get('username')
email = data.get('email')
password = data.get('password')
is_admin = data.get('is_admin', False)
if not username or not email or not password:
return error_response('用户名、邮箱和密码不能为空')
if User.query.filter_by(username=username).first():
return error_response('用户名已存在')
if User.query.filter_by(email=email).first():
return error_response('邮箱已存在')
user = User(username=username, email=email, is_admin=is_admin)
user.set_password(password)
db.session.add(user)
db.session.commit()
return success_response(message='用户添加成功')
@api.route('/admin/users/<int:id>/toggle', methods=['POST'])
@token_required
@admin_required
def api_admin_toggle_user(id):
"""启用/禁用用户"""
user = User.query.get_or_404(id)
if user.id == request.current_user.id:
return error_response('不能禁用自己')
user.is_active = not user.is_active
db.session.commit()
return success_response({
'is_active': user.is_active
}, '用户状态已更新')
@api.route('/admin/users/<int:id>', methods=['DELETE'])
@token_required
@admin_required
def api_admin_delete_user(id):
"""删除用户"""
from models import Email, Contact
user = User.query.get_or_404(id)
if user.id == request.current_user.id:
return error_response('不能删除自己')
# 清除其他用户通讯录中对该用户的引用(必须在删除用户前执行)
db.session.execute(
db.text("UPDATE contact SET contact_user_id = NULL WHERE contact_user_id = :uid"),
{'uid': id}
)
# 删除用户的通讯录
db.session.execute(
db.text("DELETE FROM contact WHERE user_id = :uid"),
{'uid': id}
)
# 删除用户发送和接收的邮件
db.session.execute(
db.text("DELETE FROM email WHERE sender_id = :uid OR recipient_id = :uid"),
{'uid': id}
)
# 删除用户
db.session.execute(
db.text("DELETE FROM user WHERE id = :uid"),
{'uid': id}
)
db.session.commit()
return success_response(message='用户已删除')
@api.route('/admin/filters', methods=['GET'])
@token_required
@admin_required
def api_admin_filters():
"""获取过滤规则列表"""
from models import EmailFilter
filters = EmailFilter.query.order_by(EmailFilter.id).all()
return success_response({
'filters': [{
'id': f.id,
'filter_type': f.filter_type,
'value': f.value,
'action': f.action,
'created_at': f.created_at.isoformat()
} for f in filters],
'total': len(filters)
})
@api.route('/admin/filters', methods=['POST'])
@token_required
@admin_required
def api_admin_add_filter():
"""添加过滤规则"""
from models import EmailFilter
data = request.get_json()
filter_type = data.get('filter_type')
value = data.get('value')
action = data.get('action', 'block')
if not filter_type or not value:
return error_response('类型和值不能为空')
f = EmailFilter(filter_type=filter_type, value=value, action=action)
db.session.add(f)
db.session.commit()
return success_response(message='过滤规则添加成功')
@api.route('/admin/filters/<int:id>', methods=['DELETE'])
@token_required
@admin_required
def api_admin_delete_filter(id):
"""删除过滤规则"""
from models import EmailFilter
f = EmailFilter.query.get_or_404(id)
db.session.delete(f)
db.session.commit()
return success_response(message='过滤规则已删除')
@api.route('/admin/logs', methods=['GET'])
@token_required
@admin_required
def api_admin_logs():
"""获取日志列表"""
from models import Log
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 50, type=int)
logs = Log.query.order_by(Log.created_at.desc()).paginate(page=page, per_page=per_page)
return success_response({
'logs': [{
'id': log.id,
'level': log.level,
'message': log.message,
'source': log.source,
'ip_address': log.ip_address,
'created_at': log.created_at.isoformat()
} for log in logs.items],
'total': logs.total,
'page': logs.page,
'pages': logs.pages,
'has_next': logs.has_next,
'has_prev': logs.has_prev
})
@api.route('/admin/logs/clear', methods=['POST'])
@token_required
@admin_required
def api_admin_clear_logs():
"""清空日志"""
from models import Log
Log.query.delete()
db.session.commit()
return success_response(message='日志已清空')
@api.route('/admin/broadcast', methods=['POST'])
@token_required
@admin_required
def api_admin_broadcast():
"""群发邮件"""
from models import Email
data = request.get_json()
subject = data.get('subject', '')
body = data.get('body', '')
if not subject or not body:
return error_response('主题和内容不能为空')
current = request.current_user
users = User.query.filter(User.id != current.id).all()
for user in users:
email = Email(
sender_id=current.id,
recipient_id=user.id,
sender_address=current.email,
recipient_address=user.email,
subject=subject,
body=body,
raw_data=f'From: {current.email}\r\nTo: {user.email}\r\nSubject: {subject}\r\n\r\n{body}'
)
db.session.add(email)
db.session.commit()
return success_response({
'sent_count': len(users)
}, f'已向 {len(users)} 个用户发送群发邮件')