You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

524 lines
19 KiB

from flask import Flask, render_template, request, redirect, url_for, flash, jsonify
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from flask_cors import CORS
from models import db, User, Email, EmailFilter, Log, Contact
from config import Config, load_config, save_config
from smtp_server import SMTPServer
from pop3_server import POP3Server
from api import api
import socket
# Flask application object; settings are read from the Config class.
app = Flask(__name__)
app.config.from_object(Config)
CORS(app) # allow cross-origin requests
db.init_app(app)
app.register_blueprint(api) # register the API blueprint
# Flask-Login setup; 'login' is the endpoint name of the login() view in this file.
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'
# Server settings loaded once at import; the admin views read and update this object.
server_config = load_config()
# Mail server handles; assigned in the __main__ block and in admin_toggle_service().
smtp_server = None
pop3_server = None
@login_manager.user_loader
def load_user(user_id):
    """Resolve the id stored in the session to a ``User`` row for Flask-Login.

    Args:
        user_id: The identifier Flask-Login stored for the session.

    Returns:
        The matching ``User``, or ``None`` when ``user_id`` is not an integer
        or no such row exists.
    """
    try:
        primary_key = int(user_id)
    except (TypeError, ValueError):
        # A user loader must answer "no such user" rather than raise, so a
        # malformed id ends the session instead of producing a 500.
        return None
    return User.query.get(primary_key)
def admin_required(f):
    """Decorate a view so that only authenticated administrators can run it.

    Any other caller gets an error flash message and is redirected to the
    ``index`` endpoint; the wrapped view is not called.
    """
    from functools import wraps

    @wraps(f)
    def guarded(*args, **kwargs):
        allowed = current_user.is_authenticated and current_user.is_admin
        if allowed:
            return f(*args, **kwargs)
        flash('需要管理员权限', 'error')
        return redirect(url_for('index'))

    return guarded
@app.route('/')
def index():
    """Send signed-in users to their inbox and everyone else to the login page."""
    landing = 'inbox' if current_user.is_authenticated else 'login'
    return redirect(url_for(landing))
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, authenticate the submitted credentials.

    A successful login redirects to the inbox. Any failure (unknown user,
    wrong password, inactive account) yields the same flash message and the
    form is rendered again.
    """
    if request.method == 'POST':
        # Default to '' (as compose() does) so a request without these fields
        # never hands None to check_password().
        username = request.form.get('username', '')
        password = request.form.get('password', '')
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password) and user.is_active:
            login_user(user)
            return redirect(url_for('inbox'))
        flash('用户名或密码错误', 'error')
    return render_template('login.html')
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form and, on POST, create a new account.

    Validation failures flash an error and re-render the form. On success
    the user is stored and the browser is redirected to the login page.
    """
    if request.method == 'POST':
        # Missing fields become '' so the checks below (notably len()) cannot
        # raise on None.
        username = request.form.get('username', '')
        email = request.form.get('email', '')
        password = request.form.get('password', '')
        confirm = request.form.get('confirm', '')
        if not username or not email:
            flash('用户名和邮箱不能为空', 'error')
        elif password != confirm:
            flash('两次密码输入不一致', 'error')
        elif User.query.filter_by(username=username).first():
            flash('用户名已存在', 'error')
        elif User.query.filter_by(email=email).first():
            flash('邮箱已被注册', 'error')
        elif len(password) < 6:
            flash('密码至少6位', 'error')
        else:
            user = User(username=username, email=email)
            user.set_password(password)
            db.session.add(user)
            db.session.commit()
            flash('注册成功,请登录', 'success')
            return redirect(url_for('login'))
    return render_template('register.html')
@app.route('/logout')
@login_required
def logout():
    """End the current session and return the browser to the login page."""
    logout_user()
    login_page = url_for('login')
    return redirect(login_page)
@app.route('/inbox')
@login_required
def inbox():
    """Render the current user's non-deleted received mail, newest first."""
    received = Email.query.filter_by(recipient_id=current_user.id, is_deleted=False)
    newest_first = received.order_by(Email.created_at.desc())
    return render_template('inbox.html', emails=newest_first.all())
@app.route('/starred')
@login_required
def starred():
    """Render the current user's starred, non-deleted received mail, newest first."""
    criteria = {
        'recipient_id': current_user.id,
        'is_deleted': False,
        'is_starred': True,
    }
    newest_first = Email.query.filter_by(**criteria).order_by(Email.created_at.desc())
    return render_template('starred.html', emails=newest_first.all())
@app.route('/email/<int:id>/star', methods=['POST'])
@login_required
def toggle_star(id):
    """Flip the starred flag on one of the current user's received emails.

    Responds with 404 when the email does not exist or is not addressed to
    the current user, so a success reply is only sent after a real change
    and never reveals the star state of someone else's mail.

    Requests carrying ``X-Requested-With: XMLHttpRequest`` get a JSON reply;
    all others get a flash message and a redirect back to the referrer
    (or the inbox when there is none).
    """
    # Scope the lookup to the owner, as delete_draft() does for drafts.
    email = Email.query.filter_by(id=id, recipient_id=current_user.id).first_or_404()
    email.is_starred = not email.is_starred
    db.session.commit()
    if request.headers.get('X-Requested-With') == 'XMLHttpRequest':
        return jsonify({'success': True, 'is_starred': email.is_starred})
    flash('已更新星标状态', 'success')
    return redirect(request.referrer or url_for('inbox'))
@app.route('/sent')
@login_required
def sent():
    """Render the mail the current user has sent (drafts excluded), newest first."""
    outgoing = Email.query.filter_by(sender_id=current_user.id, is_draft=False)
    newest_first = outgoing.order_by(Email.created_at.desc())
    return render_template('sent.html', emails=newest_first.all())
@app.route('/drafts')
@login_required
def drafts():
    """Render the current user's saved drafts, newest first."""
    unsent = Email.query.filter_by(sender_id=current_user.id, is_draft=True)
    newest_first = unsent.order_by(Email.created_at.desc())
    return render_template('drafts.html', emails=newest_first.all())
@app.route('/email/<int:id>')
@login_required
def view_email(id):
    """Render a single email that the current user sent or received.

    Responds with 404 when the email does not exist or the current user is
    neither its sender nor its recipient, so message ids cannot be used to
    read other users' mail. Viewing as the recipient marks the email read.
    """
    from sqlalchemy import or_

    email = Email.query.filter(
        Email.id == id,
        or_(
            Email.sender_id == current_user.id,
            Email.recipient_id == current_user.id,
        ),
    ).first_or_404()
    # Only the recipient opening the message counts as reading it.
    if email.recipient_id == current_user.id:
        email.is_read = True
        db.session.commit()
    return render_template('view_email.html', email=email)
def _header_safe(value):
    """Collapse line breaks so ``value`` cannot start a new header line."""
    return ' '.join((value or '').splitlines())


def _build_raw_message(sender, recipient, subject, body):
    """Return the From/To/Subject header block plus body stored in ``raw_data``."""
    return (
        f'From: {_header_safe(sender)}\r\n'
        f'To: {_header_safe(recipient)}\r\n'
        f'Subject: {_header_safe(subject)}\r\n'
        f'\r\n{body}'
    )


def _remember_contact(owner_id, address, fallback_name, contact_user_id):
    """Add ``address`` to ``owner_id``'s contacts unless it is already there.

    The contact name defaults to the part of ``address`` before '@', or to
    ``fallback_name`` when the address has no '@'.
    """
    if Contact.query.filter_by(user_id=owner_id, email=address).first():
        return
    name = address.split('@')[0] if '@' in address else fallback_name
    db.session.add(Contact(
        user_id=owner_id,
        contact_user_id=contact_user_id,
        name=name,
        email=address,
    ))


@app.route('/compose', methods=['GET', 'POST'])
@login_required
def compose():
    """Show the compose form and, on POST, save a draft or send the message.

    An optional ``draft_id`` query argument selects one of the current
    user's drafts to edit or send. The form field ``action`` chooses between
    ``'draft'`` (save and go to the drafts list) and anything else (send and
    go to the sent list). Sending also records the recipient in the sender's
    contacts and, when the recipient is a local user, the sender in theirs.

    Header values written to ``raw_data`` have line breaks collapsed so a
    crafted recipient or subject cannot inject extra headers.
    """
    draft_id = request.args.get('draft_id', type=int)
    draft = None
    if draft_id:
        draft = Email.query.filter_by(
            id=draft_id, sender_id=current_user.id, is_draft=True
        ).first()

    if request.method != 'POST':
        return render_template('compose.html', draft=draft)

    recipient = request.form.get('recipient', '')
    subject = request.form.get('subject', '')
    body = request.form.get('body', '')
    action = request.form.get('action', 'send')

    if action == 'draft':
        if draft:
            draft.recipient_address = recipient
            draft.subject = subject
            draft.body = body
        else:
            draft = Email(
                sender_id=current_user.id,
                sender_address=current_user.email,
                recipient_address=recipient,
                subject=subject,
                body=body,
                is_draft=True,
            )
            db.session.add(draft)
        db.session.commit()
        flash('草稿已保存', 'success')
        return redirect(url_for('drafts'))

    # Sending: the recipient may or may not be a user of this system.
    recipient_user = User.query.filter_by(email=recipient).first()
    recipient_id = recipient_user.id if recipient_user else None
    raw_data = _build_raw_message(current_user.email, recipient, subject, body)

    if draft:
        # Promote the existing draft to a sent message instead of duplicating it.
        draft.recipient_id = recipient_id
        draft.recipient_address = recipient
        draft.subject = subject
        draft.body = body
        draft.is_draft = False
        draft.raw_data = raw_data
    else:
        db.session.add(Email(
            sender_id=current_user.id,
            recipient_id=recipient_id,
            sender_address=current_user.email,
            recipient_address=recipient,
            subject=subject,
            body=body,
            raw_data=raw_data,
        ))

    # Automatically add the recipient to the sender's contacts.
    if recipient:
        _remember_contact(current_user.id, recipient, recipient, recipient_id)
    # If the recipient is a local user, also add the sender to their contacts.
    if recipient_user:
        _remember_contact(
            recipient_user.id,
            current_user.email,
            current_user.username,
            current_user.id,
        )

    db.session.commit()
    flash('邮件发送成功', 'success')
    return redirect(url_for('sent'))
@app.route('/draft/delete/<int:id>')
@login_required
def delete_draft(id):
    """Permanently remove one of the current user's drafts (404 if not found)."""
    owned_drafts = Email.query.filter_by(id=id, sender_id=current_user.id, is_draft=True)
    db.session.delete(owned_drafts.first_or_404())
    db.session.commit()
    flash('草稿已删除', 'success')
    return redirect(url_for('drafts'))
@app.route('/delete/<int:id>')
@login_required
def delete_email(id):
    """Soft-delete one of the current user's received emails.

    Responds with 404 when the email does not exist or is not addressed to
    the current user, so the success message is only shown after the email
    was actually flagged as deleted.
    """
    # Scope the lookup to the owner, as delete_draft() does for drafts.
    email = Email.query.filter_by(id=id, recipient_id=current_user.id).first_or_404()
    email.is_deleted = True
    db.session.commit()
    flash('邮件已删除', 'success')
    return redirect(url_for('inbox'))
# Address book
@app.route('/contacts')
@login_required
def contacts():
    """Render the current user's contacts ordered by name."""
    own_contacts = Contact.query.filter_by(user_id=current_user.id)
    by_name = own_contacts.order_by(Contact.name).all()
    return render_template('contacts.html', contacts=by_name)
@app.route('/contacts/add', methods=['POST'])
@login_required
def add_contact():
    """Add a contact to the current user's address book from the posted form.

    The email address is required; a blank name falls back to the part of
    the address before '@'. Duplicate addresses are rejected with an error
    flash. Always redirects back to the contacts list.
    """
    name = request.form.get('name', '').strip()
    email = request.form.get('email', '').strip()
    note = request.form.get('note', '')

    if not email:
        flash('邮箱不能为空', 'error')
        return redirect(url_for('contacts'))
    if not name:
        # Same default the compose view uses for auto-created contacts.
        name = email.split('@')[0]

    existing = Contact.query.filter_by(user_id=current_user.id, email=email).first()
    if existing:
        flash('该邮箱已在通讯录中', 'error')
    else:
        # Link the contact to a local account when the address belongs to one.
        contact_user = User.query.filter_by(email=email).first()
        contact = Contact(
            user_id=current_user.id,
            contact_user_id=contact_user.id if contact_user else None,
            name=name,
            email=email,
            note=note,
        )
        db.session.add(contact)
        db.session.commit()
        flash('联系人添加成功', 'success')
    return redirect(url_for('contacts'))
@app.route('/contacts/delete/<int:id>')
@login_required
def delete_contact(id):
    """Remove one of the current user's contacts (404 if it is not theirs)."""
    owned = Contact.query.filter_by(id=id, user_id=current_user.id)
    db.session.delete(owned.first_or_404())
    db.session.commit()
    flash('联系人已删除', 'success')
    return redirect(url_for('contacts'))
@app.route('/contacts/edit/<int:id>', methods=['GET', 'POST'])
@login_required
def edit_contact(id):
    """Show the edit form for a contact and, on POST, save its name and note."""
    contact = Contact.query.filter_by(id=id, user_id=current_user.id).first_or_404()
    if request.method != 'POST':
        return render_template('edit_contact.html', contact=contact)

    form = request.form
    # A missing name keeps the stored one; a missing note clears it.
    contact.name = form.get('name', contact.name)
    contact.note = form.get('note', '')
    db.session.commit()
    flash('联系人已更新', 'success')
    return redirect(url_for('contacts'))
@app.route('/contacts/<int:id>')
@login_required
def contact_emails(id):
    """Render all mail exchanged between the current user and one contact."""
    from sqlalchemy import or_, and_

    contact = Contact.query.filter_by(id=id, user_id=current_user.id).first_or_404()

    # Mail the user sent to this address (drafts excluded) ...
    sent_to_contact = and_(
        Email.sender_id == current_user.id,
        Email.recipient_address == contact.email,
        Email.is_draft == False,  # noqa: E712 - SQL expression, not a Python test
    )
    # ... plus mail received from it that the user has not deleted.
    received_from_contact = and_(
        Email.recipient_id == current_user.id,
        Email.sender_address == contact.email,
        Email.is_deleted == False,  # noqa: E712 - SQL expression, not a Python test
    )
    conversation = Email.query.filter(or_(sent_to_contact, received_from_contact))
    emails = conversation.order_by(Email.created_at.desc()).all()
    return render_template('contact_emails.html', contact=contact, emails=emails)
# User settings
@app.route('/settings', methods=['GET', 'POST'])
@login_required
def user_settings():
    """Render the settings page and, on POST with ``action=password``, change
    the current user's password.

    The old password must match, the new password must be entered twice
    identically, and it must be at least 6 characters long. Each failure
    flashes an error; the settings page is rendered in every case.
    """
    if request.method == 'POST' and request.form.get('action') == 'password':
        # Missing fields become '' so the checks below (notably len()) cannot
        # raise on None.
        old_password = request.form.get('old_password', '')
        new_password = request.form.get('new_password', '')
        confirm = request.form.get('confirm_password', '')
        if not current_user.check_password(old_password):
            flash('原密码错误', 'error')
        elif new_password != confirm:
            flash('两次密码输入不一致', 'error')
        elif len(new_password) < 6:
            flash('新密码至少6位', 'error')
        else:
            current_user.set_password(new_password)
            db.session.commit()
            flash('密码修改成功', 'success')
    return render_template('settings.html')
# Admin routes
@app.route('/admin')
@login_required
@admin_required
def admin_dashboard():
    """Render the admin dashboard with user/email totals and the server config."""
    totals = {
        'user_count': User.query.count(),
        'email_count': Email.query.count(),
    }
    return render_template('admin/dashboard.html', config=server_config, **totals)
@app.route('/admin/users')
@login_required
@admin_required
def admin_users():
    """Render the user management page listing every account."""
    all_users = User.query.all()
    return render_template('admin/users.html', users=all_users)
@app.route('/admin/users/add', methods=['POST'])
@login_required
@admin_required
def admin_add_user():
    """Create an account from the admin form and return to the user list.

    Username, email and password are all required. The ``is_admin`` checkbox
    (submitted as ``'on'``) grants administrator rights. Duplicate usernames
    or emails are rejected with an error flash.
    """
    # Missing fields become '' so they are caught here instead of reaching
    # User(...) / set_password() as None.
    username = request.form.get('username', '')
    email = request.form.get('email', '')
    password = request.form.get('password', '')
    is_admin = request.form.get('is_admin') == 'on'

    if not username or not email or not password:
        flash('用户名、邮箱和密码不能为空', 'error')
    elif User.query.filter_by(username=username).first():
        flash('用户名已存在', 'error')
    elif User.query.filter_by(email=email).first():
        flash('邮箱已存在', 'error')
    else:
        user = User(username=username, email=email, is_admin=is_admin)
        user.set_password(password)
        db.session.add(user)
        db.session.commit()
        flash('用户添加成功', 'success')
    return redirect(url_for('admin_users'))
@app.route('/admin/users/toggle/<int:id>')
@login_required
@admin_required
def admin_toggle_user(id):
    """Enable or disable another user's account; admins cannot toggle themselves."""
    user = User.query.get_or_404(id)
    is_self = user.id == current_user.id
    if not is_self:
        user.is_active = not user.is_active
        db.session.commit()
    return redirect(url_for('admin_users'))
@app.route('/admin/users/delete/<int:id>')
@login_required
@admin_required
def admin_delete_user(id):
    """Delete another user's account and return to the user list.

    An administrator cannot delete their own account; that attempt is
    refused with an explicit error message. The success message is shown
    only when a row was actually deleted.
    """
    user = User.query.get_or_404(id)
    if user.id == current_user.id:
        flash('不能删除当前登录的账户', 'error')
    else:
        db.session.delete(user)
        db.session.commit()
        flash('用户已删除', 'success')
    return redirect(url_for('admin_users'))
@app.route('/admin/settings', methods=['GET', 'POST'])
@login_required
@admin_required
def admin_settings():
    """Show the server settings form and, on POST, validate and persist it.

    All numeric fields are parsed and range-checked before ``server_config``
    is touched, so invalid input produces an error flash instead of a 500
    and never leaves the in-memory config partially updated.
    """
    if request.method == 'POST':
        try:
            smtp_port = int(request.form.get('smtp_port', 2525))
            pop3_port = int(request.form.get('pop3_port', 1100))
            max_message_size = int(request.form.get('max_message_size', 10485760))
        except ValueError:
            flash('端口和邮件大小必须是整数', 'error')
        else:
            ports_valid = all(1 <= port <= 65535 for port in (smtp_port, pop3_port))
            if not ports_valid or max_message_size <= 0:
                flash('端口必须在 1-65535 之间，邮件大小必须大于 0', 'error')
            else:
                server_config['smtp_port'] = smtp_port
                server_config['pop3_port'] = pop3_port
                server_config['domain'] = request.form.get('domain', 'localhost')
                server_config['max_message_size'] = max_message_size
                save_config(server_config)
                flash('设置已保存,重启服务后生效', 'success')
    return render_template('admin/settings.html', config=server_config)
@app.route('/admin/services')
@login_required
@admin_required
def admin_services():
    """Render the service page with the running state of the SMTP and POP3 servers."""

    def is_running(server):
        # A server that was never created counts as stopped.
        if not server:
            return False
        return server.running

    return render_template(
        'admin/services.html',
        smtp_status=is_running(smtp_server),
        pop3_status=is_running(pop3_server),
        config=server_config,
    )
@app.route('/admin/services/toggle/<service>')
@login_required
@admin_required
def admin_toggle_service(service):
    """Stop the named mail service if it is running, otherwise start a new one.

    ``service`` is ``'smtp'`` or ``'pop3'``; any other value changes nothing.
    Starting builds a fresh server object from the current ``server_config``.
    """
    global smtp_server, pop3_server

    if service == 'smtp':
        smtp_up = smtp_server and smtp_server.running
        if not smtp_up:
            smtp_server = SMTPServer(app, server_config['smtp_port'], server_config['domain'])
            smtp_server.start()
        else:
            smtp_server.stop()
    elif service == 'pop3':
        pop3_up = pop3_server and pop3_server.running
        if not pop3_up:
            pop3_server = POP3Server(app, server_config['pop3_port'], server_config['domain'])
            pop3_server.start()
        else:
            pop3_server.stop()
    return redirect(url_for('admin_services'))
@app.route('/admin/filters')
@login_required
@admin_required
def admin_filters():
    """Render the page listing every email filter rule."""
    rules = EmailFilter.query.all()
    return render_template('admin/filters.html', filters=rules)
@app.route('/admin/filters/add', methods=['POST'])
@login_required
@admin_required
def admin_add_filter():
    """Store a new filter rule from the posted form; ``action`` defaults to 'block'."""
    form = request.form
    rule = EmailFilter(
        filter_type=form.get('filter_type'),
        value=form.get('value'),
        action=form.get('action', 'block'),
    )
    db.session.add(rule)
    db.session.commit()
    flash('过滤规则添加成功', 'success')
    return redirect(url_for('admin_filters'))
@app.route('/admin/filters/delete/<int:id>')
@login_required
@admin_required
def admin_delete_filter(id):
    """Delete the filter rule with the given id (404 if it does not exist)."""
    rule = EmailFilter.query.get_or_404(id)
    db.session.delete(rule)
    db.session.commit()
    flash('过滤规则已删除', 'success')
    return redirect(url_for('admin_filters'))
@app.route('/admin/logs')
@login_required
@admin_required
def admin_logs():
    """Render one page (50 entries) of the log, newest first.

    The page number comes from the ``page`` query argument and defaults to 1.
    """
    page = request.args.get('page', 1, type=int)
    newest_first = Log.query.order_by(Log.created_at.desc())
    current_page = newest_first.paginate(page=page, per_page=50)
    return render_template('admin/logs.html', logs=current_page)
@app.route('/admin/logs/clear')
@login_required
@admin_required
def admin_clear_logs():
    """Delete every log entry and return to the log page."""
    all_entries = Log.query
    all_entries.delete()
    db.session.commit()
    flash('日志已清空', 'success')
    return redirect(url_for('admin_logs'))
@app.route('/admin/broadcast', methods=['GET', 'POST'])
@login_required
@admin_required
def admin_broadcast():
    """Show the broadcast form and, on POST, send one email to every other user.

    One ``Email`` row is created per recipient and all rows are committed
    together. Header values written to ``raw_data`` have line breaks
    collapsed so a crafted subject cannot inject extra headers.
    """
    if request.method != 'POST':
        return render_template('admin/broadcast.html')

    def header_safe(value):
        # Collapse CR/LF so the value stays on a single header line.
        return ' '.join((value or '').splitlines())

    subject = request.form.get('subject', '')
    body = request.form.get('body', '')
    from_header = header_safe(current_user.email)
    subject_header = header_safe(subject)

    users = User.query.filter(User.id != current_user.id).all()
    for user in users:
        db.session.add(Email(
            sender_id=current_user.id,
            recipient_id=user.id,
            sender_address=current_user.email,
            recipient_address=user.email,
            subject=subject,
            body=body,
            raw_data=(
                f'From: {from_header}\r\n'
                f'To: {header_safe(user.email)}\r\n'
                f'Subject: {subject_header}\r\n'
                f'\r\n{body}'
            ),
        ))
    db.session.commit()
    flash(f'已向 {len(users)} 个用户发送群发邮件', 'success')
    return redirect(url_for('admin_broadcast'))
def init_db():
    """Create all tables and make sure an ``admin`` account exists.

    When no user named ``admin`` is found, one is created with administrator
    rights and its credentials are printed.
    """
    with app.app_context():
        db.create_all()
        if User.query.filter_by(username='admin').first():
            return
        # NOTE(review): the default password is hard-coded and echoed to
        # stdout; it should be changed after first login.
        bootstrap_admin = User(username='admin', email='admin@localhost', is_admin=True)
        bootstrap_admin.set_password('admin123')
        db.session.add(bootstrap_admin)
        db.session.commit()
        print('Created admin user: admin / admin123')
if __name__ == '__main__':
    import os

    # Prepare the database, start both mail servers, then serve the web UI.
    init_db()
    smtp_server = SMTPServer(app, server_config['smtp_port'], server_config['domain'])
    pop3_server = POP3Server(app, server_config['pop3_port'], server_config['domain'])
    smtp_server.start()
    pop3_server.start()
    # The Werkzeug debugger allows arbitrary code execution, so it must be
    # opt-in (FLASK_DEBUG=1) while the app listens on every interface.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes', 'on')
    # NOTE(review): use_reloader=False presumably avoids a second process
    # starting the mail servers again - confirm before changing.
    app.run(host='0.0.0.0', debug=debug_enabled, port=5000, use_reloader=False)