From 7ecd2edd649802790265aa0114bf36c567d6cf90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=8F=E7=92=90?= <3217621994@qq.com> Date: Sun, 26 Oct 2025 23:59:24 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=81=A2=E5=A4=8Dsitemap.py=E5=B9=B6?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0utils.py=E8=AF=A6=E7=BB=86=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- utils.py | 370 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 370 insertions(+) create mode 100644 utils.py diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..a8ebbb1 --- /dev/null +++ b/utils.py @@ -0,0 +1,370 @@ +#!/usr/bin/env python +# encoding: utf-8 +""" +DjangoBlog 通用工具函数模块 +提供缓存、Markdown处理、邮件发送、文件操作等通用功能 +""" + +import logging +import os +import random +import string +import uuid +from hashlib import sha256 + +import bleach +import markdown +import requests +from django.conf import settings +from django.contrib.sites.models import Site +from django.core.cache import cache +from django.templatetags.static import static + +# 获取当前模块的日志器 +logger = logging.getLogger(__name__) + + +def get_max_articleid_commentid(): + """ + 获取最大的文章ID和评论ID + + Returns: + tuple: (最大文章ID, 最大评论ID) + """ + from blog.models import Article + from comments.models import Comment + return (Article.objects.latest().pk, Comment.objects.latest().pk) + + +def get_sha256(str): + """ + 计算字符串的SHA256哈希值 + + Args: + str (str): 输入字符串 + + Returns: + str: SHA256哈希值的十六进制表示 + """ + m = sha256(str.encode('utf-8')) + return m.hexdigest() + + +def cache_decorator(expiration=3 * 60): + """ + 缓存装饰器:自动缓存函数结果 + + Args: + expiration (int): 缓存过期时间,默认3分钟 + + Returns: + function: 装饰器函数 + """ + def wrapper(func): + def news(*args, **kwargs): + try: + # 尝试从视图类获取缓存键 + view = args[0] + key = view.get_cache_key() + except: + key = None + if not key: + # 如果没有特定缓存键,根据函数参数生成 + unique_str = repr((func, args, kwargs)) + m = sha256(unique_str.encode('utf-8')) + key = m.hexdigest() + + # 尝试从缓存获取结果 + value = cache.get(key) + if value is not None: + # logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key)) + if str(value) == '__default_cache_value__': + return None + else: + return value + else: + # 缓存未命中,执行函数并缓存结果 + logger.debug('cache_decorator set cache:%s key:%s' % (func.__name__, key)) + value = func(*args, **kwargs) + if value is None: + # 对None值进行特殊处理 + cache.set(key, '__default_cache_value__', expiration) + else: + cache.set(key, value, expiration) + return value + return news + return wrapper + + +def expire_view_cache(path, servername, serverport, key_prefix=None): + ''' + 刷新视图缓存 + + Args: + path (str): URL路径 + servername (str): 主机名 + serverport (str): 端口号 + key_prefix (str): 缓存键前缀 + + Returns: + bool: 是否成功删除缓存 + ''' + from django.http import HttpRequest + from django.utils.cache import get_cache_key + + # 创建模拟请求对象 + request = HttpRequest() + request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport} + request.path = path + + # 获取缓存键并删除缓存 + key = get_cache_key(request, key_prefix=key_prefix, cache=cache) + if key: + logger.info('expire_view_cache:get key:{path}'.format(path=path)) + if cache.get(key): + cache.delete(key) + return True + return False + + +@cache_decorator() +def get_current_site(): + """ + 获取当前站点信息(带缓存) + + Returns: + Site: 当前站点对象 + """ + site = Site.objects.get_current() + return site + + +class CommonMarkdown: + """ + Markdown处理工具类 + 提供Markdown到HTML的转换功能 + """ + + @staticmethod + def _convert_markdown(value): + """ + 内部方法:执行Markdown转换 + + Args: + value (str): Markdown文本 + + Returns: + tuple: (HTML内容, 目录HTML) + """ + # 配置Markdown扩展 + md = markdown.Markdown( + extensions=[ + 'extra', # 额外语法支持 + 'codehilite', # 代码高亮 + 'toc', # 目录生成 + 'tables', # 表格支持 + ] + ) + body = md.convert(value) # 转换Markdown + toc = md.toc # 提取目录 + return body, toc + + @staticmethod + def get_markdown_with_toc(value): + """ + 获取带目录的Markdown转换结果 + + Args: + value (str): Markdown文本 + + Returns: + tuple: (HTML内容, 目录HTML) + """ + body, toc = CommonMarkdown._convert_markdown(value) + return body, toc + + @staticmethod + def get_markdown(value): + """ + 获取不带目录的Markdown转换结果 + + Args: + value (str): Markdown文本 + + Returns: + str: HTML内容 + """ + body, toc = CommonMarkdown._convert_markdown(value) + return body + + +def send_email(emailto, title, content): + """ + 发送邮件(通过信号机制) + + Args: + emailto (str): 收件人邮箱 + title (str): 邮件标题 + content (str): 邮件内容 + """ + from djangoblog.blog_signals import send_email_signal + send_email_signal.send( + send_email.__class__, + emailto=emailto, + title=title, + content=content) + + +def generate_code() -> str: + """生成随机数验证码 + + Returns: + str: 6位数字验证码 + """ + return ''.join(random.sample(string.digits, 6)) + + +def parse_dict_to_url(dict): + """ + 将字典转换为URL参数字符串 + + Args: + dict (dict): 参数字典 + + Returns: + str: URL参数字符串 + """ + from urllib.parse import quote + url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/')) + for k, v in dict.items()]) + return url + + +@cache_decorator() +def get_blog_setting(): + """ + 获取博客设置(带缓存) + 如果不存在默认设置,则创建默认设置 + + Returns: + BlogSettings: 博客设置对象 + """ + value = cache.get('get_blog_setting') + if value: + return value + else: + from blog.models import BlogSettings + # 如果没有设置记录,创建默认设置 + if not BlogSettings.objects.count(): + setting = BlogSettings() + setting.site_name = 'djangoblog' + setting.site_description = '基于Django的博客系统' + setting.site_seo_description = '基于Django的博客系统' + setting.site_keywords = 'Django,Python' + setting.article_sub_length = 300 + setting.sidebar_article_count = 10 + setting.sidebar_comment_count = 5 + setting.show_google_adsense = False + setting.open_site_comment = True + setting.analytics_code = '' + setting.beian_code = '' + setting.show_gongan_code = False + setting.comment_need_review = False + setting.save() + value = BlogSettings.objects.first() + logger.info('set cache get_blog_setting') + cache.set('get_blog_setting', value) # 缓存设置 + return value + + +def save_user_avatar(url): + ''' + 保存用户头像到本地 + + Args: + url (str): 头像URL地址 + + Returns: + str: 本地静态文件路径 + ''' + logger.info(url) + + try: + basedir = os.path.join(settings.STATICFILES, 'avatar') + # 下载头像文件 + rsp = requests.get(url, timeout=2) + if rsp.status_code == 200: + if not os.path.exists(basedir): + os.makedirs(basedir) + + # 检查文件是否为图片 + image_extensions = ['.jpg', '.png', 'jpeg', '.gif'] + isimage = len([i for i in image_extensions if url.endswith(i)]) > 0 + ext = os.path.splitext(url)[1] if isimage else '.jpg' + + # 生成唯一文件名并保存 + save_filename = str(uuid.uuid4().hex) + ext + logger.info('保存用户头像:' + basedir + save_filename) + with open(os.path.join(basedir, save_filename), 'wb+') as file: + file.write(rsp.content) + return static('avatar/' + save_filename) # 返回静态文件URL + except Exception as e: + logger.error(e) + return static('blog/img/avatar.png') # 返回默认头像 + + +def delete_sidebar_cache(): + """ + 删除侧边栏缓存 + """ + from blog.models import LinkShowType + keys = ["sidebar" + x for x in LinkShowType.values] + for k in keys: + logger.info('delete sidebar key:' + k) + cache.delete(k) + + +def delete_view_cache(prefix, keys): + """ + 删除视图缓存 + + Args: + prefix (str): 缓存前缀 + keys (list): 缓存键列表 + """ + from django.core.cache.utils import make_template_fragment_key + key = make_template_fragment_key(prefix, keys) + cache.delete(key) + + +def get_resource_url(): + """ + 获取资源URL + + Returns: + str: 静态资源URL + """ + if settings.STATIC_URL: + return settings.STATIC_URL + else: + site = get_current_site() + return 'http://' + site.domain + '/static/' + + +# HTML净化配置 +ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1', + 'h2', 'p'] +ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']} + + +def sanitize_html(html): + """ + 净化HTML,防止XSS攻击 + + Args: + html (str): 原始HTML + + Returns: + str: 净化后的HTML + """ + return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES) \ No newline at end of file