You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

385 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import random
import string
from flask import Flask, render_template, request, redirect, url_for, flash, session, get_flashed_messages, jsonify, g
from datetime import timedelta
import os
from flask_mail import Message
from flask_migrate import Migrate
from werkzeug.security import generate_password_hash, check_password_hash
from werkzeug.utils import secure_filename
import config
from exts import db, mail
from forms import RegisterForm, LoginForm
from models import UserModel, EmailCaptchaModel, VideoUri, FavoriteVideo
from sqlalchemy.orm import sessionmaker
app = Flask(__name__)
app.config.from_object(config)
app.config['SESSION_TYPE'] = 'filesystem'
app.config['SECRET_KEY'] = os.urandom(24) # 随机数密钥
app.config['PERMANENT_SESSION_LIFETIME'] = timedelta(minutes=500) # 生成过期日期
db.init_app(app)
mail.init_app(app)
migration = Migrate(app, db)
video_information = []
connected_video_information = []
@app.route('/index')
def index():
return render_template("index.html")
@app.before_request
def my_before_request():
username = session.get('username')
if username:
user = UserModel.query.filter_by(username=username).first()
setattr(g, "user", user)
# print(f"{username}+cookie") # 已成功,测试用
else:
setattr(g, "user", None)
@app.route('/resources')
def resources():
Session_Mysql = sessionmaker()
session_Mysql = Session_Mysql()
id = session.get('user_id')
user_id = id
user = UserModel.query.filter_by(id=user_id).first()
# if user:
# session['user_url'] = user.user_avatar_url
# print(session.get('user_url'))
# session.update('user_url') 开了就会报错不想解决了就是要去更新session['user_url']的值,但是不知道怎么更新
video = session_Mysql.query(VideoUri.id, VideoUri.video_img_src, VideoUri.video_url,
VideoUri.video_tite).filter_by()
video_lists = db.session.execute(video).all()
video_information.clear()
for video_list in video_lists:
video_information.append({
'video_id': video_list[0],
'video_href': video_list[2],
'video_img_src': video_list[1],
'video_title': video_list[3]
})
user_name = session.get('username')
id = session.get('user_id')
favorite = session_Mysql.query(FavoriteVideo.video_id).filter_by(favorite_id=id)
favorite_list = db.session.execute(favorite).all() # 根据用户id获得所有收藏视频id
connected_video_information.clear()
for i in favorite_list: # 遍历
for j in i:
video = session_Mysql.query(VideoUri.id, VideoUri.video_img_src, VideoUri.video_url,
VideoUri.video_tite).filter_by(id=j)
video_list = db.session.execute(video).all()
for k in video_list: # 更新视频收藏表
connected_video_information.append({
'video_id': k[0],
'video_href': k[2],
'video_img_src': k[1],
'video_title': k[3]
})
print(f"connected_video_information={connected_video_information}")
# print(f"video_information={video_information}")
# print(f"first_session['video_information']={session.get('video_information')}")
return render_template("resources.html", video_information=video_information,
connected_video_information=connected_video_information, user= user)
@app.context_processor
def my_context_processor():
return {'user': g.user}
# g.user是全局变量可以在html中使用
# user的表
# 测试用实际要sql操作
# 有关数据库的操作(以UserModel表为例)
# 1.添加行
# user = UserModel(email=email, username=username, password=generate_password_hash(password),stu_id=stu_id)
# db.session.add(user)
# db.session.commit()
# 2.删除行,下面的所有first保证有值输出(在数据库中找不到会用NONE代替),以免报错
# user = UserModel.query.filter_by(email=email).first()
# UserModel表 表里的email字段=传入的email
# db.session.delete(user)
# db.session.commit()
# 3.修改行
# user = UserModel.query.filter_by(email=email).first()
# user.username = username
# db.session.commit()
# 4.查询行
# user = UserModel.query.filter_by(email=email).first()
# print(user.username)
# 5.查询所有行
# users = UserModel.query.all()
# for user in users:
# print(user.username)
# 测试用实际要sql操作
@app.route('/login', methods=['POST', 'GET'])
def login():
if request.method == 'POST':
form = LoginForm(request.form)
if form.validate():
email = form.email.data
password = form.password.data
user = UserModel.query.filter_by(email=email).first()
if not user:
print("邮箱未注册")
return redirect('/login')
# 下面是验证密码是否正确,check_password_hash()函数的第一个参数是数据库中的密码,第二个参数是用户输入的密码
if check_password_hash(user.password, password):
print("登录成功")
# return jsonify({'code': 200, 'message': '登录成功'})
session['username'] = user.username
session['user_id'] = user.id
session['user_avatar_url'] = user.user_avatar_url
session['user_signature'] = user.user_say
print(f"session['user_signature']={session.get('user_signature')}")
# print(f"{session['username']}+test") # 已成功,测试用
return render_template("resources.html", user_name=user.username, video_information=video_information)
else:
print("密码错误")
return redirect('/login')
else:
print(form.errors)
return redirect('/login')
return render_template('login.html')
return render_template('login.html')
@app.route('/users_manage', methods=['POST', 'GET'])
def users_managing():
user_id = session.get('user_id')
user = UserModel.query.filter_by(id=user_id).first()
return render_template('user_manage.html', user=user)
@app.route('/upload-avatar', methods=['POST', 'GET'])
def upload_avatar():
file = request.files['avatar']
user_id = session.get('user_id')
user = UserModel.query.filter_by(id=user_id).first()
if file:
# Generate a secure filename
filename = secure_filename(file.filename)
print(f"filename={filename}")
# Specify the directory path for saving uploaded files
upload_folder = 'static/imgs/users'
os.makedirs(upload_folder, exist_ok=True)
# Save the file with the new name in the upload folder
new_filename = str(filename) # Provide the desired new filename with the appropriate extension
file.save(os.path.join(upload_folder, new_filename))
user.user_avatar_url = f"imgs/users/{new_filename}"
db.session.commit()
print(f"user.user_avatar_url={session.get('user_url')}")
# Continue processing or return a response
return 'File uploaded successfully.'
return 'No file uploaded.'
@app.route('/upload-stu_id', methods=['POST', 'GET'])
def upload_stu_id():
update_stu_id_dict = request.get_json()
user_id = update_stu_id_dict['user_id']
user = UserModel.query.filter_by(id=user_id).first()
user.stu_id = update_stu_id_dict['stu_id']
db.session.commit()
print(f"user.stu_id={user.stu_id}")
return jsonify(list({'statick', 'ok'}))
@app.route('/upload-email', methods=['POST', 'GET'])
def upload_email():
return 'ok'
@app.route('/upload-address', methods=['POST', 'GET'])
def upload_address():
update_address_dict = request.get_json()
user_id = update_address_dict['user_id']
user = UserModel.query.filter_by(id=user_id).first()
user.address = update_address_dict['address']
db.session.commit()
return jsonify(list({'statick', 'ok'}))
@app.route('/upload-enterprise', methods=['POST', 'GET'])
def upload_enterprise():
update_enterprise_dict = request.get_json()
user_id = update_enterprise_dict['user_id']
user = UserModel.query.filter_by(id=user_id).first()
user.enterprise = update_enterprise_dict['enterprise']
db.session.commit()
return jsonify(list({'statick', 'ok'}))
@app.route('/user_data', methods=['GET'])
def user_data():
username = session.get('username')
user_id = session.get('user_id')
user_url = session.get('user_avatar_url')
user_signature = session.get('user_signature')
print(f"{user_url}+1111111111111") # 已成功,测试用
# print(f"{username}+1111111111111") # 已成功,测试用
if username:
# 根据需要获取用户数据的逻辑,例如从数据库中查询用户信息
# ...
# 构造包含用户名的响应数据
# data = {'username': username}
data = {'username': username, 'user_id': user_id, 'user_url': user_url, 'user_signature': user_signature}
print(jsonify(data))
print(data)
# 返回 JSON 格式的响应数据
return jsonify(data)
else:
# 如果用户未登录,则返回空数据或错误信息
return jsonify({'error': 'User not logged in'})
@app.route('/logout')
def logout():
session.clear()
return redirect('/index')
'''@app.route('/user/favorite')
def get_favorite():
Session_Mysql = sessionmaker()
session_Mysql = Session_Mysql()
user = get_flashed_messages(session['username']) #从session中获取用户名
id = session_Mysql.query(UserModel.id).filter_by(username=user) #查找id
favorite = session_Mysql.query(FavoriteVideo.video_id).filter_by(favorite_id=id) #用id查找收藏的视频id
favorite_uri_list = [] #创建收藏视频地址的列表
for i in favorite:
favorite_uri_list.append(session_Mysql.query(VideoUri.video_uri).filter_by(id=i))
session['favorite_list'] = favorite_uri_list #将视频地址列表放入session中
return render_template()这里填一个页面
''' # 这个可能需要改进,还要完成用户收藏页面(应该也可以直接在用户页面显示)
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
return render_template('register.html')
else:
form = RegisterForm(request.form)
if form.validate():
email = form.email.data
username = form.username.data
password = form.password.data
stu_id = form.stu_id.data
user = UserModel(email=email, username=username, password=generate_password_hash(password), stu_id=stu_id)
db.session.add(user)
db.session.commit()
return redirect('/login')
else:
print(form.errors)
return redirect('/register')
@app.route('/captcha/email')
def captcha_email():
email = request.args.get('email')
source = string.digits * 4
captcha = ''.join(random.sample(source, 4))
message = Message(subject="数据库作业验证码", recipients=[email], body=f"验证码为:{captcha}")
mail.send(message)
email_captcha = EmailCaptchaModel(email=email, captcha=captcha)
db.session.add(email_captcha)
db.session.commit()
# RESTful API
# {code: 200/400/500, message: '', data: {}}
return jsonify({"code": 200, "message": '', "data": None})
@app.after_request
def add_header(response):
response.cache_control.no_store = True
return response
@app.route('/resources/video/collection_update', methods=['POST'])
def update_video_collection():
data = request.get_json() # Get the JSON data from the request
print(data)
username = data['username']
video_href = data['video_href']
id = data['id']
user_id = data['user_id']
# video_tite = data['video_tite']
favo = FavoriteVideo.query.filter_by(video_id=id,
favorite_id=user_id).first() # 这里查找是否已经收藏过了如果为空则没有收藏因为进入resources时就已经更新过了
if favo is None:
fav = FavoriteVideo(video_id=id, favorite_id=user_id)
db.session.add(fav)
db.session.commit()
video = VideoUri.query.filter_by(id=id).first()
video_information.append({
'video_id': id,
'video_href': video_href,
'video_img_src': video.video_img_src,
# 'video_tite': video_tite,
})
return jsonify({'static': 'OK'})
@app.route('/resources/video/collecting_cancel', methods=['POST'])
def cancel_collecting():
data = request.get_json()
print(data)
username = data['username']
video_href = data['video_href']
id = data['id']
user_id = data['user_id']
fav = FavoriteVideo.query.filter_by(favorite_id=user_id, video_id=id).first()
db.session.delete(fav)
db.session.commit()
return jsonify({'static': 'OK'})
@app.route('/resources/video/init', methods=['POST'])
def videoPage_init():
local_connected_video_information = []
Session_Mysql = sessionmaker()
session_Mysql = Session_Mysql()
id = request.get_json()['user_id']
favorite = session_Mysql.query(FavoriteVideo.video_id).filter_by(favorite_id=id)
favorite_list = db.session.execute(favorite).all() # 根据用户id获得所有收藏视频id
local_connected_video_information.clear()
for i in favorite_list: # 遍历
for j in i:
video = session_Mysql.query(VideoUri.id, VideoUri.video_img_src, VideoUri.video_url,
VideoUri.video_tite).filter_by(id=j)
video_list = db.session.execute(video).all()
print(f"video_list: {video_list}")
for k in video_list: # 更新视频收藏表
local_connected_video_information.append({
'video_id': k[0],
'video_href': k[2],
'video_img_src': k[1],
'video_tite': k[3]
})
print(f"local_connected_video_information: {local_connected_video_information}")
return jsonify(local_connected_video_information)
if __name__ == '__main__':
app.run(debug=True)