You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
django/utils.py

370 lines
9.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# encoding: utf-8
"""
DjangoBlog 通用工具函数模块
提供缓存、Markdown处理、邮件发送、文件操作等通用功能
"""
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
# 获取当前模块的日志器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""
获取最大的文章ID和评论ID
Returns:
tuple: (最大文章ID, 最大评论ID)
"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""
计算字符串的SHA256哈希值
Args:
str (str): 输入字符串
Returns:
str: SHA256哈希值的十六进制表示
"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""
缓存装饰器:自动缓存函数结果
Args:
expiration (int): 缓存过期时间默认3分钟
Returns:
function: 装饰器函数
"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从视图类获取缓存键
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
# 如果没有特定缓存键,根据函数参数生成
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
# 尝试从缓存获取结果
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
# 缓存未命中,执行函数并缓存结果
logger.debug('cache_decorator set cache:%s key:%s' % (func.__name__, key))
value = func(*args, **kwargs)
if value is None:
# 对None值进行特殊处理
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
Args:
path (str): URL路径
servername (str): 主机名
serverport (str): 端口号
key_prefix (str): 缓存键前缀
Returns:
bool: 是否成功删除缓存
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
# 创建模拟请求对象
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
# 获取缓存键并删除缓存
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
"""
获取当前站点信息(带缓存)
Returns:
Site: 当前站点对象
"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""
Markdown处理工具类
提供Markdown到HTML的转换功能
"""
@staticmethod
def _convert_markdown(value):
"""
内部方法执行Markdown转换
Args:
value (str): Markdown文本
Returns:
tuple: (HTML内容, 目录HTML)
"""
# 配置Markdown扩展
md = markdown.Markdown(
extensions=[
'extra', # 额外语法支持
'codehilite', # 代码高亮
'toc', # 目录生成
'tables', # 表格支持
]
)
body = md.convert(value) # 转换Markdown
toc = md.toc # 提取目录
return body, toc
@staticmethod
def get_markdown_with_toc(value):
"""
获取带目录的Markdown转换结果
Args:
value (str): Markdown文本
Returns:
tuple: (HTML内容, 目录HTML)
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""
获取不带目录的Markdown转换结果
Args:
value (str): Markdown文本
Returns:
str: HTML内容
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""
发送邮件(通过信号机制)
Args:
emailto (str): 收件人邮箱
title (str): 邮件标题
content (str): 邮件内容
"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""生成随机数验证码
Returns:
str: 6位数字验证码
"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""
将字典转换为URL参数字符串
Args:
dict (dict): 参数字典
Returns:
str: URL参数字符串
"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
@cache_decorator()
def get_blog_setting():
"""
获取博客设置(带缓存)
如果不存在默认设置,则创建默认设置
Returns:
BlogSettings: 博客设置对象
"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
# 如果没有设置记录,创建默认设置
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value) # 缓存设置
return value
def save_user_avatar(url):
'''
保存用户头像到本地
Args:
url (str): 头像URL地址
Returns:
str: 本地静态文件路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
# 下载头像文件
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
# 检查文件是否为图片
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
# 生成唯一文件名并保存
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename) # 返回静态文件URL
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png') # 返回默认头像
def delete_sidebar_cache():
"""
删除侧边栏缓存
"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
"""
删除视图缓存
Args:
prefix (str): 缓存前缀
keys (list): 缓存键列表
"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""
获取资源URL
Returns:
str: 静态资源URL
"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
# HTML净化配置
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
"""
净化HTML防止XSS攻击
Args:
html (str): 原始HTML
Returns:
str: 净化后的HTML
"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)