Update utils.py

hjn_branch
plqo32bax 3 months ago
parent d76912faaa
commit 2103ba08dd

@ -1,7 +1,6 @@
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
@ -9,41 +8,61 @@ import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
import bleach # HTML清理库用于防止XSS攻击
import markdown # Markdown解析库
import requests # HTTP请求库
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
from django.contrib.sites.models import Site # Django站点框架
from django.core.cache import cache # Django缓存框架
from django.templatetags.static import static # 静态文件URL生成
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""获取最大的文章ID和评论ID"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""计算字符串的SHA256哈希值
Args:
str: 要计算哈希的字符串
Returns:
str: 64位的十六进制哈希值
"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""缓存装饰器,用于缓存函数结果
Args:
expiration: 缓存过期时间默认3分钟
Returns:
function: 装饰器函数
"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从视图类获取缓存键
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
# 如果没有特定的缓存键,根据函数参数生成唯一键
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
# 尝试从缓存获取结果
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
@ -52,9 +71,8 @@ def cache_decorator(expiration=3 * 60):
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
# 缓存未命中,执行函数并缓存结果
logger.debug('cache_decorator set cache:%s key:%s' % (func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
@ -68,21 +86,26 @@ def cache_decorator(expiration=3 * 60):
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''刷新视图缓存
Args:
path: URL路径
servername: 服务器主机名
serverport: 服务器端口
key_prefix: 缓存键前缀
Returns:
bool: 是否成功删除
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
# 创建模拟请求对象
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
# 获取缓存键并删除
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
@ -92,40 +115,62 @@ def expire_view_cache(path, servername, serverport, key_prefix=None):
return False
@cache_decorator()
@cache_decorator() # 应用缓存装饰器
def get_current_site():
"""获取当前站点信息"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""Markdown处理工具类"""
@staticmethod
def _convert_markdown(value):
"""内部方法转换Markdown为HTML
Args:
value: Markdown格式文本
Returns:
tuple: (HTML内容, 目录)
"""
# 配置Markdown扩展
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
'extra', # 额外语法支持
'codehilite', # 代码高亮
'toc', # 目录生成
'tables', # 表格支持
]
)
body = md.convert(value)
toc = md.toc
body = md.convert(value) # 转换Markdown为HTML
toc = md.toc # 获取目录
return body, toc
@staticmethod
def get_markdown_with_toc(value):
"""获取带目录的Markdown转换结果"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""获取Markdown转换结果不含目录"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""发送邮件(通过信号机制)
Args:
emailto: 收件人邮箱
title: 邮件标题
content: 邮件内容
"""
from djangoblog.blog_signals import send_email_signal
# 使用Django信号发送邮件实现解耦[9,10,11](@ref)
send_email_signal.send(
send_email.__class__,
emailto=emailto,
@ -134,11 +179,19 @@ def send_email(emailto, title, content):
def generate_code() -> str:
"""生成随机数验证码"""
"""生成6位随机数验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""将字典转换为URL参数字符串
Args:
dict: 参数字典
Returns:
str: URL参数字符串
"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
@ -146,12 +199,19 @@ def parse_dict_to_url(dict):
def get_blog_setting():
"""获取博客设置,使用缓存提高性能[6,8](@ref)
Returns:
BlogSettings: 博客设置对象
"""
value = cache.get('get_blog_setting')
if value:
return value
else:
# 缓存未命中,从数据库获取
from blog.models import BlogSettings
if not BlogSettings.objects.count():
# 如果不存在设置,创建默认设置
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
@ -169,39 +229,44 @@ def get_blog_setting():
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
cache.set('get_blog_setting', value) # 设置缓存
return value
def save_user_avatar(url):
'''
保存用户头像
:param url:头像url
:return: 本地路径
'''保存用户头像到本地
Args:
url: 头像URL地址
Returns:
str: 本地静态文件路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
rsp = requests.get(url, timeout=2) # 下载头像
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
os.makedirs(basedir) # 创建目录
# 检查图片扩展名
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext
save_filename = str(uuid.uuid4().hex) + ext # 生成唯一文件名
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
file.write(rsp.content) # 保存文件
return static('avatar/' + save_filename) # 返回静态文件URL
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
return static('blog/img/avatar.png') # 返回默认头像
def delete_sidebar_cache():
"""删除侧边栏相关缓存"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
@ -210,12 +275,19 @@ def delete_sidebar_cache():
def delete_view_cache(prefix, keys):
"""删除视图缓存
Args:
prefix: 缓存前缀
keys: 缓存键
"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""获取资源URL基础路径"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
@ -223,10 +295,19 @@ def get_resource_url():
return 'http://' + site.domain + '/static/'
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
# HTML标签和属性白名单用于防止XSS攻击
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i',
'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
"""清理HTML移除不安全的标签和属性
Args:
html: 要清理的HTML内容
Returns:
str: 安全的HTML内容
"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
Loading…
Cancel
Save