You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DjangoBlog/djangoblog/utils.py

372 lines
12 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
#姜雨菲: 创建当前模块的日志记录器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""
获取最新文章和评论的ID
用于获取当前系统中最新发布的文章ID和最新的评论ID
"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""
对字符串进行SHA256加密
:param str: 需要加密的字符串
:return: 加密后的十六进制字符串
"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""
缓存装饰器
用于缓存函数返回结果减少重复计算默认缓存3分钟
:param expiration: 缓存过期时间(秒)
:return: 装饰器函数
"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从第一个参数(通常是视图实例)获取缓存键
view = args[0]
key = view.get_cache_key()
except:
# 获取失败时自动生成缓存键
key = None
if not key:
# 根据函数和参数生成唯一字符串,用于创建缓存键
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
# 尝试从缓存获取数据
value = cache.get(key)
if value is not None:
# 缓存命中时返回结果(过滤默认占位值)
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
# 缓存未命中时执行原函数并缓存结果
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
手动删除指定URL路径的视图缓存
:param path: URL路径
:param servername: 主机名
:param serverport: 端口号
:param key_prefix: 缓存键前缀
:return: 是否刷新成功(布尔值)
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
# 构造模拟请求对象
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
# 获取缓存键并删除缓存
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
"""
获取当前站点信息(带缓存)
从Django的Site模型获取当前站点配置结果缓存3分钟
:return: Site模型实例
"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""
Markdown解析工具类
提供Markdown文本转HTML的功能支持代码高亮、目录生成等
"""
@staticmethod
def _convert_markdown(value):
"""
内部Markdown转换方法
配置Markdown解析器并转换文本
:param value: Markdown格式文本
:return: (转换后的HTML内容, 目录HTML)
"""
md = markdown.Markdown(
extensions=[
'extra', # 额外功能(表格、脚注等)
'codehilite', # 代码高亮
'toc', # 目录生成
'tables', # 表格支持
]
)
body = md.convert(value)
toc = md.toc
return body, toc
@staticmethod
def get_markdown_with_toc(value):
"""
Markdown转HTML带目录
:param value: Markdown格式文本
:return: (HTML内容, 目录HTML)
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""
Markdown转HTML仅内容不含目录
:param value: Markdown格式文本
:return: 转换后的HTML内容
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""
发送邮件(通过信号机制)
触发邮件发送信号,解耦邮件发送逻辑
:param emailto: 收件人邮箱
:param title: 邮件标题
:param content: 邮件内容
"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""生成6位随机数字验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""
将字典转换为URL查询字符串
对键值对进行URL编码避免特殊字符问题
:param dict: 待转换的字典
:return: URL查询字符串格式key1=value1&key2=value2
"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
def get_blog_setting():
"""
获取博客系统配置(带缓存)
从数据库获取博客全局配置,无配置时创建默认配置,结果缓存
:return: BlogSettings模型实例
"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
# 无配置时初始化默认配置
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
return value
def save_user_avatar(url):
'''
保存用户头像到本地静态文件目录
从URL下载头像并保存支持常见图片格式失败时返回默认头像
:param url: 头像图片URL
:return: 本地头像的静态文件URL
'''
logger.info(url)
try:
# 定义头像保存目录
basedir = os.path.join(settings.STATICFILES, 'avatar')
# 下载头像图片
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
# 目录不存在时创建
if not os.path.exists(basedir):
os.makedirs(basedir)
# 验证图片格式
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
# 生成唯一文件名
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
# 写入文件
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
# 返回静态文件URL
return static('avatar/' + save_filename)
except Exception as e:
logger.error(e)
# 异常时返回默认头像
return static('blog/img/avatar.png')
def delete_sidebar_cache():
"""
删除侧边栏缓存
根据LinkShowType的所有值生成缓存键并删除
"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
"""
删除模板片段缓存
:param prefix: 缓存前缀
:param keys: 缓存键列表
"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""
获取静态资源基础URL
优先使用settings中的STATIC_URL无配置时使用站点域名拼接/static/
:return: 静态资源基础URL
"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
# HTML清理配置 - 防止XSS攻击
# 允许的HTML标签白名单
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p', 'span', 'div']
# 允许的CSS类白名单主要用于代码高亮
ALLOWED_CLASSES = [
'codehilite', 'highlight', 'hll', 'c', 'err', 'k', 'l', 'n', 'o', 'p', 'cm', 'cp', 'c1', 'cs',
'gd', 'ge', 'gr', 'gh', 'gi', 'go', 'gp', 'gs', 'gu', 'gt', 'kc', 'kd', 'kn', 'kp', 'kr', 'kt',
'ld', 'm', 'mf', 'mh', 'mi', 'mo', 'na', 'nb', 'nc', 'no', 'nd', 'ni', 'ne', 'nf', 'nl', 'nn',
'nt', 'nv', 'ow', 'w', 'mb', 'mh', 'mi', 'mo', 'sb', 'sc', 'sd', 'se', 'sh', 'si', 'sx', 's2',
's1', 'ss', 'bp', 'vc', 'vg', 'vi', 'il'
]
def class_filter(tag, name, value):
"""
自定义class属性过滤器
只保留ALLOWED_CLASSES中的CSS类过滤危险或未授权的类名
:param tag: HTML标签名
:param name: 属性名
:param value: 属性值
:return: 过滤后的属性值无合法类时返回False
"""
if name == 'class':
allowed_classes = [cls for cls in value.split() if cls in ALLOWED_CLASSES]
return ' '.join(allowed_classes) if allowed_classes else False
return value
# 允许的HTML属性白名单
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title'], # 链接标签允许的属性
'abbr': ['title'], # 缩写标签允许的属性
'acronym': ['title'], # 首字母缩写标签允许的属性
'span': class_filter, # span标签的class属性使用自定义过滤器
'div': class_filter, # div标签的class属性使用自定义过滤器
'pre': class_filter, # pre标签的class属性使用自定义过滤器
'code': class_filter # code标签的class属性使用自定义过滤器
}
# 允许的URL协议白名单防止javascript:等危险协议)
ALLOWED_PROTOCOLS = ['http', 'https', 'mailto']
def sanitize_html(html):
"""
安全的HTML清理函数
使用bleach库过滤危险HTML内容防止XSS攻击
:param html: 需要清理的HTML字符串
:return: 安全的HTML字符串
"""
return bleach.clean(
html,
tags=ALLOWED_TAGS, # 只允许白名单中的标签
attributes=ALLOWED_ATTRIBUTES, # 只允许白名单中的属性
protocols=ALLOWED_PROTOCOLS, # 限制URL协议
strip=True, # 移除不允许的标签(而非转义)
strip_comments=True # 移除HTML注释
)