添加 utils.py 注释

jyf_branch
姜雨菲 4 months ago
parent ee999e6f1d
commit 6d0a39a831

@ -17,41 +17,61 @@ from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
#姜雨菲: 创建当前模块的日志记录器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""
获取最新文章和评论的ID
用于获取当前系统中最新发布的文章ID和最新的评论ID
"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""
对字符串进行SHA256加密
:param str: 需要加密的字符串
:return: 加密后的十六进制字符串
"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""
缓存装饰器
用于缓存函数返回结果减少重复计算默认缓存3分钟
:param expiration: 缓存过期时间
:return: 装饰器函数
"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从第一个参数(通常是视图实例)获取缓存键
view = args[0]
key = view.get_cache_key()
except:
# 获取失败时自动生成缓存键
key = None
if not key:
# 根据函数和参数生成唯一字符串,用于创建缓存键
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
# 尝试从缓存获取数据
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
# 缓存命中时返回结果(过滤默认占位值)
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
# 缓存未命中时执行原函数并缓存结果
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
@ -70,19 +90,22 @@ def cache_decorator(expiration=3 * 60):
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
手动删除指定URL路径的视图缓存
:param path: URL路径
:param servername: 主机名
:param serverport: 端口号
:param key_prefix: 缓存键前缀
:return: 是否刷新成功布尔值
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
# 构造模拟请求对象
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
# 获取缓存键并删除缓存
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
@ -94,19 +117,34 @@ def expire_view_cache(path, servername, serverport, key_prefix=None):
@cache_decorator()
def get_current_site():
"""
获取当前站点信息带缓存
从Django的Site模型获取当前站点配置结果缓存3分钟
:return: Site模型实例
"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""
Markdown解析工具类
提供Markdown文本转HTML的功能支持代码高亮目录生成等
"""
@staticmethod
def _convert_markdown(value):
"""
内部Markdown转换方法
配置Markdown解析器并转换文本
:param value: Markdown格式文本
:return: (转换后的HTML内容, 目录HTML)
"""
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
'extra', # 额外功能(表格、脚注等)
'codehilite', # 代码高亮
'toc', # 目录生成
'tables', # 表格支持
]
)
body = md.convert(value)
@ -115,16 +153,33 @@ class CommonMarkdown:
@staticmethod
def get_markdown_with_toc(value):
"""
Markdown转HTML带目录
:param value: Markdown格式文本
:return: (HTML内容, 目录HTML)
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""
Markdown转HTML仅内容不含目录
:param value: Markdown格式文本
:return: 转换后的HTML内容
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""
发送邮件通过信号机制
触发邮件发送信号解耦邮件发送逻辑
:param emailto: 收件人邮箱
:param title: 邮件标题
:param content: 邮件内容
"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
@ -134,11 +189,17 @@ def send_email(emailto, title, content):
def generate_code() -> str:
"""生成随机数验证码"""
"""生成6位随机数验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""
将字典转换为URL查询字符串
对键值对进行URL编码避免特殊字符问题
:param dict: 待转换的字典
:return: URL查询字符串格式key1=value1&key2=value2
"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
@ -146,11 +207,17 @@ def parse_dict_to_url(dict):
def get_blog_setting():
"""
获取博客系统配置带缓存
从数据库获取博客全局配置无配置时创建默认配置结果缓存
:return: BlogSettings模型实例
"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
# 无配置时初始化默认配置
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
@ -175,33 +242,46 @@ def get_blog_setting():
def save_user_avatar(url):
'''
保存用户头像
:param url:头像url
:return: 本地路径
保存用户头像到本地静态文件目录
从URL下载头像并保存支持常见图片格式失败时返回默认头像
:param url: 头像图片URL
:return: 本地头像的静态文件URL
'''
logger.info(url)
try:
# 定义头像保存目录
basedir = os.path.join(settings.STATICFILES, 'avatar')
# 下载头像图片
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
# 目录不存在时创建
if not os.path.exists(basedir):
os.makedirs(basedir)
# 验证图片格式
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
# 生成唯一文件名
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
# 写入文件
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
# 返回静态文件URL
return static('avatar/' + save_filename)
except Exception as e:
logger.error(e)
# 异常时返回默认头像
return static('blog/img/avatar.png')
def delete_sidebar_cache():
"""
删除侧边栏缓存
根据LinkShowType的所有值生成缓存键并删除
"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
@ -210,12 +290,22 @@ def delete_sidebar_cache():
def delete_view_cache(prefix, keys):
"""
删除模板片段缓存
:param prefix: 缓存前缀
:param keys: 缓存键列表
"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""
获取静态资源基础URL
优先使用settings中的STATIC_URL无配置时使用站点域名拼接/static/
:return: 静态资源基础URL
"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
@ -223,10 +313,12 @@ def get_resource_url():
return 'http://' + site.domain + '/static/'
# HTML清理配置 - 防止XSS攻击
# 允许的HTML标签白名单
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p', 'span', 'div']
# 安全的class值白名单 - 只允许代码高亮相关的class
# 允许的CSS类白名单主要用于代码高亮
ALLOWED_CLASSES = [
'codehilite', 'highlight', 'hll', 'c', 'err', 'k', 'l', 'n', 'o', 'p', 'cm', 'cp', 'c1', 'cs',
'gd', 'ge', 'gr', 'gh', 'gi', 'go', 'gp', 'gs', 'gu', 'gt', 'kc', 'kd', 'kn', 'kp', 'kr', 'kt',
@ -236,37 +328,45 @@ ALLOWED_CLASSES = [
]
def class_filter(tag, name, value):
"""自定义class属性过滤器"""
"""
自定义class属性过滤器
只保留ALLOWED_CLASSES中的CSS类过滤危险或未授权的类名
:param tag: HTML标签名
:param name: 属性名
:param value: 属性值
:return: 过滤后的属性值无合法类时返回False
"""
if name == 'class':
# 只允许预定义的安全class值
allowed_classes = [cls for cls in value.split() if cls in ALLOWED_CLASSES]
return ' '.join(allowed_classes) if allowed_classes else False
return value
# 安全的属性白名单
# 允许的HTML属性白名单
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title'],
'abbr': ['title'],
'acronym': ['title'],
'span': class_filter,
'div': class_filter,
'pre': class_filter,
'code': class_filter
'a': ['href', 'title'], # 链接标签允许的属性
'abbr': ['title'], # 缩写标签允许的属性
'acronym': ['title'], # 首字母缩写标签允许的属性
'span': class_filter, # span标签的class属性使用自定义过滤器
'div': class_filter, # div标签的class属性使用自定义过滤器
'pre': class_filter, # pre标签的class属性使用自定义过滤器
'code': class_filter # code标签的class属性使用自定义过滤器
}
# 安全的协议白名单 - 防止javascript:等危险协议
# 允许的URL协议白名单防止javascript:等危险协议)
ALLOWED_PROTOCOLS = ['http', 'https', 'mailto']
def sanitize_html(html):
"""
安全的HTML清理函数
使用bleach库进行白名单过滤防止XSS攻击
使用bleach库过滤危险HTML内容防止XSS攻击
:param html: 需要清理的HTML字符串
:return: 安全的HTML字符串
"""
return bleach.clean(
html,
tags=ALLOWED_TAGS,
attributes=ALLOWED_ATTRIBUTES,
protocols=ALLOWED_PROTOCOLS, # 限制允许的协议
strip=True, # 移除不允许的标签而不是转义
html,
tags=ALLOWED_TAGS, # 只允许白名单中的标签
attributes=ALLOWED_ATTRIBUTES, # 只允许白名单中的属性
protocols=ALLOWED_PROTOCOLS, # 限制URL协议
strip=True, # 移除不允许的标签(而非转义)
strip_comments=True # 移除HTML注释
)
)
Loading…
Cancel
Save