#!/usr/bin/env python # encoding: utf-8 """ djangoblog 工具函数模块(utils.py) 提供博客系统所需的通用工具函数,包括缓存、安全、Markdown解析、邮件发送、文件处理等功能。 """ # 导入标准库和第三方库 import logging import os import random import string import uuid from hashlib import sha256 import bleach # 用于清理 HTML 标签,防止 XSS 攻击 import markdown # 用于将 Markdown 文本转换为 HTML import requests # 用于发起 HTTP 请求 from django.conf import settings from django.contrib.sites.models import Site # Django 的站点框架 from django.core.cache import cache # 使用 Django 缓存后端 from django.templatetags.static import static # 用于生成静态文件 URL # 获取当前模块的日志记录器 logger = logging.getLogger(__name__) def get_max_articleid_commentid(): """ 获取当前数据库中最新(主键最大)的文章和评论的 ID。 返回: tuple: (最新文章的主键, 最新评论的主键) """ from ..blog.models import Article from ..comments.models import Comment return (Article.objects.latest().pk, Comment.objects.latest().pk) #wwc def get_sha256(str): """ 计算输入字符串的 SHA256 哈希值。 参数: str (str): 要哈希的字符串 返回: str: SHA256 哈希值的十六进制字符串表示 """ m = sha256(str.encode('utf-8')) return m.hexdigest() #wwc def cache_decorator(expiration=3 * 60): """ 缓存装饰器,用于为函数添加缓存功能,避免重复计算或数据库查询。 参数: expiration (int): 缓存过期时间(秒),默认 3 分钟 返回: decorator: 可用于装饰函数的装饰器 工作原理: 1. 尝试从视图对象获取缓存键(通过 get_cache_key 方法) 2. 如果没有,则基于函数名和参数生成唯一哈希作为键 3. 查询缓存,命中则返回缓存值 4. 未命中则执行原函数,将结果存入缓存并返回 5. 特殊处理返回值为 None 的情况,避免缓存穿透 """ #wwc def wrapper(func): def news(*args, **kwargs): try: view = args[0] key = view.get_cache_key() except: key = None if not key: unique_str = repr((func, args, kwargs)) m = sha256(unique_str.encode('utf-8')) key = m.hexdigest() value = cache.get(key) if value is not None: if str(value) == '__default_cache_value__': return None else: return value else: logger.debug( 'cache_decorator set cache:%s key:%s' % (func.__name__, key)) value = func(*args, **kwargs) if value is None: cache.set(key, '__default_cache_value__', expiration) else: cache.set(key, value, expiration) return value return news return wrapper #wwc def expire_view_cache(path, servername, serverport, key_prefix=None): """ 手动清除特定视图的缓存(用于内容更新后刷新缓存)。 参数: path (str): URL 路径(如 '/article/1/') servername (str): 服务器域名 serverport (str): 服务器端口 key_prefix (str, optional): 缓存键前缀 返回: bool: 缓存是否成功删除 实现: 1. 构造一个模拟的 HttpRequest 对象 2. 使用 Django 的 get_cache_key 工具生成与视图缓存对应的键 3. 如果键存在且缓存中有值,则删除该缓存 """ from django.http import HttpRequest from django.utils.cache import get_cache_key request = HttpRequest() request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport} request.path = path key = get_cache_key(request, key_prefix=key_prefix, cache=cache) if key: logger.info('expire_view_cache:get key:{path}'.format(path=path)) if cache.get(key): cache.delete(key) return True return False #wwc @cache_decorator() def get_current_site(): """ 获取当前 Django 站点对象,并使用缓存优化性能。 返回: Site: 当前站点实例(包含域名、名称等信息) """ site = Site.objects.get_current() return site #wwc class CommonMarkdown: """ 提供统一的 Markdown 解析功能,支持代码高亮、目录生成等。 """ #wwc @staticmethod def _convert_markdown(value): """ 内部方法:将 Markdown 字符串转换为 HTML,并提取目录(TOC)。 参数: value (str): Markdown 格式的文本 返回: tuple: (HTML 内容字符串, 目录 HTML 字符串) """ md = markdown.Markdown( extensions=[ 'extra', # 标准扩展(表格、脚注等) 'codehilite', # 代码高亮 'toc', # 自动生成目录 'tables', # 表格支持 ] ) body = md.convert(value) toc = md.toc return body, toc #wwc @staticmethod def get_markdown_with_toc(value): """ 解析 Markdown 文本,同时返回 HTML 内容和目录。 参数: value (str): Markdown 文本 返回: tuple: (HTML 内容, TOC 目录) """ body, toc = CommonMarkdown._convert_markdown(value) return body, toc #wwc @staticmethod def get_markdown(value): """ 仅解析 Markdown 文本为 HTML 内容,不返回目录。 参数: value (str): Markdown 文本 返回: str: 转换后的 HTML 字符串 """ body, toc = CommonMarkdown._convert_markdown(value) return body #wwc def send_email(emailto, title, content): """ 发送邮件的快捷方法,通过 Django 信号机制解耦。 参数: emailto (str): 收件人邮箱 title (str): 邮件标题 content (str): 邮件正文 实现: 触发自定义的 send_email_signal 信号,由信号处理器完成实际的邮件发送逻辑。 """ from ..djangoblog.blog_signals import send_email_signal send_email_signal.send( send_email.__class__, emailto=emailto, title=title, content=content) #wwc def generate_code() -> str: """ 生成一个 6 位的随机数字验证码。 返回: str: 6 位数字组成的字符串(如 '123456') """ return ''.join(random.sample(string.digits, 6)) #wwc def parse_dict_to_url(dict): """ 将字典转换为 URL 查询参数字符串(键值对用 & 连接)。 参数: dict (dict): 要转换的字典 返回: str: URL 编码后的查询字符串(如 'key1=value1&key2=value2') 注意: 使用 urllib.parse.quote 对键和值进行 URL 编码,safe='/' 表示斜杠不编码。 """ from urllib.parse import quote url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/')) for k, v in dict.items()]) return url #wwc def get_blog_setting(): """ 获取博客系统设置,优先从缓存读取,未命中则从数据库获取并缓存。 如果数据库中没有设置记录,则创建一个默认设置并保存。 返回: BlogSettings: 博客设置模型实例 """ value = cache.get('get_blog_setting') if value: return value else: from ..blog.models import BlogSettings if not BlogSettings.objects.count(): setting = BlogSettings() setting.site_name = 'djangoblog' setting.site_description = '基于Django的博客系统' setting.site_seo_description = '基于Django的博客系统' setting.site_keywords = 'Django,Python' setting.article_sub_length = 300 setting.sidebar_article_count = 10 setting.sidebar_comment_count = 5 setting.show_google_adsense = False setting.open_site_comment = True setting.analytics_code = '' setting.beian_code = '' setting.show_gongan_code = False setting.comment_need_review = False setting.save() value = BlogSettings.objects.first() logger.info('set cache get_blog_setting') cache.set('get_blog_setting', value) return value #wwc def save_user_avatar(url): """ 从指定 URL 下载用户头像并保存到本地静态文件目录。 参数: url (str): 头像图片的远程 URL 返回: str: 保存后的本地静态文件 URL(如 '/static/avatar/abc123.jpg') 下载失败时返回默认头像路径。 流程: 1. 创建本地头像存储目录 2. 下载图片内容 3. 根据原始 URL 判断文件类型,生成唯一文件名 4. 保存文件 5. 返回静态 URL """ logger.info(url) try: basedir = os.path.join(settings.STATICFILES, 'avatar') rsp = requests.get(url, timeout=2) if rsp.status_code == 200: if not os.path.exists(basedir): os.makedirs(basedir) image_extensions = ['.jpg', '.png', 'jpeg', '.gif'] isimage = len([i for i in image_extensions if url.endswith(i)]) > 0 ext = os.path.splitext(url)[1] if isimage else '.jpg' save_filename = str(uuid.uuid4().hex) + ext logger.info('保存用户头像:' + basedir + save_filename) with open(os.path.join(basedir, save_filename), 'wb+') as file: file.write(rsp.content) return static('avatar/' + save_filename) except Exception as e: logger.error(e) return static('blog/img/avatar.png') #wwc def delete_sidebar_cache(): """ 清除侧边栏所有缓存。 侧边栏内容(如最新文章、热门评论)通常会被缓存以提高性能。 当内容更新时,需调用此函数清除相关缓存。 实现: 遍历 LinkShowType 的所有值,删除以 'sidebar' 为前缀的缓存键。 """ from ..blog.models import LinkShowType keys = ["sidebar" + x for x in LinkShowType.values] for k in keys: logger.info('delete sidebar key:' + k) cache.delete(k) #wwc def delete_view_cache(prefix, keys): """ 删除基于模板片段缓存(@cache)的缓存。 参数: prefix (str): 缓存片段的名称(与模板中 cache 标签的第一个参数对应) keys (list): 缓存的变量列表(用于生成唯一键) 实现: 使用 make_template_fragment_key 生成正确的缓存键,然后删除。 """ from django.core.cache.utils import make_template_fragment_key key = make_template_fragment_key(prefix, keys) cache.delete(key) #wwc def get_resource_url(): """ 获取静态资源的基础 URL。 如果设置了 STATIC_URL,则直接返回。 否则,构建一个完整的 URL(http://domain/static/)。 返回: str: 静态资源基础 URL """ if settings.STATIC_URL: return settings.STATIC_URL else: site = get_current_site() return 'http://' + site.domain + '/static/' # 定义允许在用户内容中使用的 HTML 标签和属性 # 用于 sanitize_html 函数,防止 XSS 攻击 ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p'] ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']} #wwc def sanitize_html(html): """ 清理 HTML 内容,移除不安全的标签和属性,防止跨站脚本(XSS)攻击。 参数: html (str): 待清理的 HTML 字符串 返回: str: 清理后的 HTML 字符串,仅包含 ALLOWED_TAGS 和 ALLOWED_ATTRIBUTES 中定义的内容 """ return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)