You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tentest/doc/DjangoBlog/djangoblog/utils.py

408 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# encoding: utf-8
# 指定脚本解释器为Python编码格式为UTF-8
import logging #wr 日志模块,用于记录系统运行信息
import os #wr 操作系统交互模块,用于文件路径、目录操作
import random #wr 随机数模块,用于生成随机数据
import string #wr 字符串模块,包含字符串常量(如数字、字母等)
import uuid #wr 用于生成唯一标识符
from hashlib import sha256 #wr 哈希算法模块用于计算SHA-256哈希值
import bleach #wr HTML清理库用于过滤不安全的HTML标签和属性防止XSS攻击
import markdown #wr Markdown转换库用于将Markdown文本转为HTML
import requests #wr HTTP请求库用于发送网络请求如下载图片
from django.conf import settings #wr Django配置模块用于获取项目设置
from django.contrib.sites.models import Site #wr Django站点模型用于管理网站域名等信息
from django.core.cache import cache #wr Django缓存模块用于缓存数据提升性能
from django.templatetags.static import static #wr Django静态文件工具用于生成静态文件URL
#wr 初始化日志记录器(指定记录器名称为当前模块)
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""
wr获取文章和评论的最大ID
用于系统初始化或数据校验时获取当前最大的文章ID和评论ID
:return: 元组 (最大文章ID, 最大评论ID)
"""
from blog.models import Article # 导入文章模型
from comments.models import Comment # 导入评论模型
#wr 返回最新文章的ID和最新评论的ID假设模型有pk字段作为主键
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""
wr计算字符串的SHA-256哈希值
用于密码加密、数据校验等场景(哈希值不可逆,确保数据安全)
:param str: 待哈希的字符串
:return: 哈希后的十六进制字符串
"""
#wr 创建SHA-256哈希对象对字符串进行UTF-8编码后计算哈希
m = sha256(str.encode('utf-8'))
return m.hexdigest() #wr 返回十六进制格式的哈希结果
def cache_decorator(expiration=3 * 60):
"""
wr缓存装饰器用于缓存函数返回结果减少重复计算或数据库查询
:param expiration: 缓存过期时间默认3分钟
:return: 装饰器函数
"""
def wrapper(func):
def news(*args, **kwargs):
try:
#wr 尝试从视图对象中获取缓存键适用于Django视图函数
view = args[0]
key = view.get_cache_key()
except:
#wr 若无法从视图获取,则基于函数、参数生成唯一缓存键
key = None
if not key:
#wr 将函数和参数转换为字符串,确保唯一性
unique_str = repr((func, args, kwargs))
#wr 计算字符串的SHA-256哈希作为缓存键避免键过长
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
# wr尝试从缓存中获取数据
value = cache.get(key)
if value is not None:
#wr 缓存命中:返回缓存值(处理默认空值标记)
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
#wr 缓存未命中:执行原函数获取结果
logger.debug(f'cache_decorator set cache:{func.__name__} key:{key}')
value = func(*args, **kwargs)
# wr根据结果设置缓存空结果用特殊标记避免缓存穿透
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
wr刷新指定视图的缓存
用于在数据更新后主动清除对应视图的缓存,确保用户获取最新数据
:param path: 视图对应的URL路径'/article/1/'
:param servername: 服务器域名(如'example.com'
:param serverport: 服务器端口(如'80'
:param key_prefix: 缓存键前缀(可选)
:return: 布尔值True表示缓存清除成功False表示未找到缓存
'''
from django.http import HttpRequest # wr导入HTTP请求类
from django.utils.cache import get_cache_key #wr 导入获取缓存键的工具
# wr构建模拟请求对象用于生成缓存键
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
# wr获取该请求对应的缓存键
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info(f'expire_view_cache:get key:{path}')
# wr若缓存存在则删除
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator() # wr应用缓存装饰器默认缓存3分钟
def get_current_site():
"""
wr 用于生成绝对URL、网站标题等场景缓存减轻数据库压力
:return: Site模型实例
"""
site = Site.objects.get_current() # 获取当前站点Django默认功能
return site
class CommonMarkdown:
"""
wrMarkdown处理工具类提供Markdown文本到HTML的转换功能支持代码高亮和目录生成
"""
@staticmethod
def _convert_markdown(value):
"""
wr 内部方法执行Markdown转换
:param value: Markdown格式的文本
:return: 元组 (转换后的HTML内容, 目录HTML)
"""
# wr初始化Markdown转换器启用必要扩展
# - extra: 支持表格、脚注等扩展语法
# - codehilite: 代码高亮
# - toc: 生成目录
# - tables: 表格支持extra已包含这里冗余确保兼容性
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
]
)
body = md.convert(value) # 转换文本为HTML
toc = md.toc # 获取生成的目录HTML
return body, toc
@staticmethod
def get_markdown_with_toc(value):
"""
wr转换Markdown文本为HTML并返回内容和目录
:param value: Markdown文本
:return: 元组 (HTML内容, 目录HTML)
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""
wr转换Markdown文本为HTML仅返回内容忽略目录
:param value: Markdown文本
:return: HTML内容字符串
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""
wr 发送邮件通过Django信号机制解耦邮件发送逻辑
用于用户注册验证、评论通知等场景
:param emailto: 收件人邮箱
:param title: 邮件标题
:param content: 邮件内容
"""
from djangoblog.blog_signals import send_email_signal # 导入邮件发送信号
#wr 发送信号实际发送逻辑由信号接收者实现如SMTP发送
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""
wr生成6位数字验证码
用于用户登录、注册等场景的身份验证
:return: 6位数字字符串
"""
#wr 从数字字符集中随机选择6个字符并拼接
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""
wr将字典转换为URL查询参数字符串{'a':1, 'b':2} → 'a=1&b=2'
用于构建带参数的URL
:param dict: 键值对字典
:return: URL查询参数字符串
"""
from urllib.parse import quote #wr 导入URL编码工具
#wr 对键和值进行URL编码处理特殊字符然后拼接为key=value&key=value格式
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
def get_blog_setting():
"""
wr获取博客系统设置如网站名称、描述等带缓存机制
用于网站全局配置的统一管理
:return: BlogSettings模型实例
"""
#wr 尝试从缓存获取
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings #wr 导入博客设置模型
#wr 若数据库中无设置记录,创建默认设置
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog' #wr 网站名称
setting.site_description = '基于Django的博客系统' #wr 网站描述
setting.site_seo_description = '基于Django的博客系统' #wr SEO描述
setting.site_keywords = 'Django,Python' #wr 网站关键词
setting.article_sub_length = 300 #wr 文章摘要长度
setting.sidebar_article_count = 10 #wr 侧边栏文章数量
setting.sidebar_comment_count = 5 #wr 侧边栏评论数量
setting.show_google_adsense = False #wr 是否显示谷歌广告
setting.open_site_comment = True #wr 是否开启评论
setting.analytics_code = '' #wr 统计代码如Google Analytics
setting.beian_code = '' #wr 备案号
setting.show_gongan_code = False #wr 是否显示公安备案号
setting.comment_need_review = False #wr 评论是否需要审核
setting.save() #wr 保存默认设置
#wr 从数据库获取设置
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value) #wr 缓存设置
return value
def save_user_avatar(url):
'''
wr下载并保存用户头像到本地静态文件目录
用于用户上传头像或第三方登录时获取头像
:param url: 头像图片的URL
:return: 本地头像的静态文件URL异常时返回默认头像
'''
logger.info(url) #wr 记录头像URL
try:
#wr 定义头像保存目录项目静态文件目录下的avatar文件夹
basedir = os.path.join(settings.STATICFILES, 'avatar')
#wr 发送GET请求下载图片超时2秒
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200: # 下载成功
#wr 若目录不存在则创建
if not os.path.exists(basedir):
os.makedirs(basedir)
#wr 支持的图片扩展名
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
#wr 判断URL是否指向图片文件
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
#wr 提取扩展名(非图片文件默认用.jpg
ext = os.path.splitext(url)[1] if isimage else '.jpg'
#wr 生成唯一文件名UUID避免重复
save_filename = str(uuid.uuid4().hex) + ext
logger.info(f'保存用户头像:{basedir}{save_filename}')
#wr 写入文件
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
#wr 返回静态文件URL如/static/avatar/xxx.jpg
return static('avatar/' + save_filename)
except Exception as e:
#wr 异常处理(如网络错误、文件写入失败等)
logger.error(e)
#wr 返回默认头像URL
return static('blog/img/avatar.png')
def delete_sidebar_cache():
"""
wr删除侧边栏相关缓存
用于侧边栏数据(如热门文章、最新评论)更新后刷新缓存
"""
from blog.models import LinkShowType # 导入链接展示类型模型
#wr 生成所有侧边栏缓存键基于LinkShowType的取值
keys = ["sidebar" + x for x in LinkShowType.values]
#wr 逐个删除缓存
for k in keys:
logger.info(f'delete sidebar key:{k}')
cache.delete(k)
def delete_view_cache(prefix, keys):
"""
wr删除Django模板片段缓存
用于模板中通过{% cache %}标签缓存的内容更新后刷新
:param prefix: 缓存前缀(对应{% cache %}的第一个参数)
:param keys: 缓存键的动态部分(对应{% cache %}的后续参数)
"""
from django.core.cache.utils import make_template_fragment_key #wr 生成模板缓存键的工具
#wr 生成模板片段缓存键
key = make_template_fragment_key(prefix, keys)
cache.delete(key) #wr 删除缓存
def get_resource_url():
"""
wr获取静态资源的基础URL
用于统一管理静态文件路径如CSS、JS、图片等
:return: 静态资源基础URL字符串
"""
if settings.STATIC_URL:
#wr 若项目设置中定义了STATIC_URL直接使用
return settings.STATIC_URL
else:
#wr 否则使用当前站点域名拼接静态目录路径
site = get_current_site()
return 'http://' + site.domain + '/static/'
#wr HTML清理配置限制允许的HTML标签防止XSS攻击
#wr 只允许必要的标签,避免<script>、<iframe>等危险标签
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p', 'span', 'div']
# 允许的class属性值白名单仅包含代码高亮相关的class如codehilite、hll等
# 防止恶意注入样式类影响页面布局或触发XSS
ALLOWED_CLASSES = [
'codehilite', 'highlight', 'hll', 'c', 'err', 'k', 'l', 'n', 'o', 'p', 'cm', 'cp', 'c1', 'cs',
'gd', 'ge', 'gr', 'gh', 'gi', 'go', 'gp', 'gs', 'gu', 'gt', 'kc', 'kd', 'kn', 'kp', 'kr', 'kt',
'ld', 'm', 'mf', 'mh', 'mi', 'mo', 'na', 'nb', 'nc', 'no', 'nd', 'ni', 'ne', 'nf', 'nl', 'nn',
'nt', 'nv', 'ow', 'w', 'mb', 'mh', 'mi', 'mo', 'sb', 'sc', 'sd', 'se', 'sh', 'si', 'sx', 's2',
's1', 'ss', 'bp', 'vc', 'vg', 'vi', 'il'
]
def class_filter(tag, name, value):
"""
wr自定义class属性过滤器仅保留白名单中的class值
用于bleach清理HTML时过滤不安全的class属性
:param tag: HTML标签名'span'
:param name: 属性名(这里固定为'class'
:param value: 属性值(如'codehilite myclass'
:return: 过滤后的class值仅包含白名单中的类或False表示移除该属性
"""
if name == 'class':
#wr 拆分class值保留在白名单中的类
allowed_classes = [cls for cls in value.split() if cls in ALLOWED_CLASSES]
#wr 拼接允许的类若为空则返回False移除class属性
return ' '.join(allowed_classes) if allowed_classes else False
return value # 非class属性直接返回
#wr 允许的HTML属性白名单为每个标签指定允许的属性
#wr 如'a'标签允许'href'和'title',防止'onclick'等危险属性
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title'],
'abbr': ['title'],
'acronym': ['title'],
'span': class_filter, #wr 使用自定义过滤器处理span的class属性
'div': class_filter, #wr 使用自定义过滤器处理div的class属性
'pre': class_filter, #wr 使用自定义过滤器处理pre的class属性
'code': class_filter #wr 使用自定义过滤器处理code的class属性
}
#wr 允许的URL协议白名单限制链接只能使用安全协议
#wr 防止javascript:、data:等危险协议触发XSS
ALLOWED_PROTOCOLS = ['http', 'https', 'mailto']
def sanitize_html(html):
"""
wr安全清理HTML内容防止XSS攻击
使用bleach库按白名单过滤不安全的标签、属性和协议
:param html: 原始HTML字符串
:return: 清理后的安全HTML字符串
"""
return bleach.clean(
html,
tags=ALLOWED_TAGS, #wr 允许的标签
attributes=ALLOWED_ATTRIBUTES, #wr 允许的属性(含过滤器)
protocols=ALLOWED_PROTOCOLS, #wr 允许的URL协议
strip=True, #wr 移除不允许的标签(而非转义)
strip_comments=True #wr 移除HTML注释防止注释中的恶意代码
)