diff --git a/src/utils.py b/src/utils.py new file mode 100644 index 0000000..e416eff --- /dev/null +++ b/src/utils.py @@ -0,0 +1,354 @@ +#!/usr/bin/env python +# encoding: utf-8 + + +# 导入必要模块:日志、文件操作、随机数生成、加密、HTTP请求等 +import logging +import os +import random +import string +import uuid +from hashlib import sha256 + +# 导入第三方库:HTML过滤、Markdown转换、HTTP请求 +import bleach +import markdown +import requests +# 导入Django核心模块:配置、缓存、站点模型、静态文件工具 +from django.conf import settings +from django.contrib.sites.models import Site +from django.core.cache import cache +from django.templatetags.static import static + +# 创建当前模块的日志记录器 +logger = logging.getLogger(__name__) + + +def get_max_articleid_commentid(): + """ + 获取最大的文章ID和评论ID + 用于系统统计或数据同步场景,快速获取最新数据的ID边界 + """ + # 延迟导入模型(避免循环导入问题) + from blog.models import Article + from comments.models import Comment + # 返回最新文章和评论的主键(ID) + return (Article.objects.latest().pk, Comment.objects.latest().pk) + + +def get_sha256(str): + """ + 对字符串进行SHA256加密 + 用于密码加密、数据校验等场景(如生成唯一标识) + """ + m = sha256(str.encode('utf-8')) # 编码为UTF-8后加密 + return m.hexdigest() # 返回十六进制加密结果 + + +def cache_decorator(expiration=3 * 60): + """ + 缓存装饰器:为函数添加缓存功能,减少重复计算或数据库查询 + 默认缓存时间为3分钟(180秒),可通过参数调整 + + Args: + expiration: 缓存过期时间(秒) + """ + + def wrapper(func): + def news(*args, **kwargs): + # 尝试从请求对象中获取缓存键(适用于视图函数) + try: + view = args[0] + key = view.get_cache_key() + except: + key = None # 非视图函数则生成自定义键 + + # 生成自定义缓存键(基于函数和参数的唯一标识) + if not key: + unique_str = repr((func, args, kwargs)) # 序列化函数和参数 + m = sha256(unique_str.encode('utf-8')) + key = m.hexdigest() # 生成唯一哈希键 + + # 从缓存获取数据 + value = cache.get(key) + if value is not None: + # 处理空值标记(避免缓存None导致的重复计算) + if str(value) == '__default_cache_value__': + return None + else: + return value # 返回缓存数据 + else: + # 缓存未命中,执行原函数并缓存结果 + logger.debug(f'cache_decorator set cache:{func.__name__} key:{key}') + value = func(*args, **kwargs) + # 缓存空值时用特殊标记,避免缓存穿透 + if value is None: + cache.set(key, '__default_cache_value__', expiration) + else: + cache.set(key, value, expiration) + return value + + return news + + return wrapper + + +def expire_view_cache(path, servername, serverport, key_prefix=None): + ''' + 主动刷新视图缓存:删除指定URL路径的缓存 + 用于数据更新后同步清理缓存,确保用户看到最新内容 + + Args: + path: URL路径(如'/article/1/') + servername: 服务器域名/主机名 + serverport: 服务器端口 + key_prefix: 缓存键前缀 + + Returns: + bool: 缓存是否成功删除 + ''' + from django.http import HttpRequest + from django.utils.cache import get_cache_key + + # 构造模拟请求对象(用于生成缓存键) + request = HttpRequest() + request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport} + request.path = path + + # 获取缓存键并删除缓存 + key = get_cache_key(request, key_prefix=key_prefix, cache=cache) + if key: + logger.info(f'expire_view_cache:get key:{path}') + if cache.get(key): + cache.delete(key) + return True + return False + + +@cache_decorator() +def get_current_site(): + """ + 获取当前站点信息(缓存装饰器确保高效获取) + 基于Django的sites框架,用于生成绝对URL等场景 + """ + site = Site.objects.get_current() + return site + + +class CommonMarkdown: + """ + Markdown转换工具类:将Markdown文本转换为HTML,并支持生成目录(TOC) + 集成代码高亮、表格等扩展功能 + """ + + @staticmethod + def _convert_markdown(value): + """内部转换方法:执行Markdown到HTML的转换,返回内容和目录""" + md = markdown.Markdown( + extensions=[ + 'extra', # 基础扩展(表格、脚注等) + 'codehilite', # 代码高亮 + 'toc', # 目录生成 + 'tables', # 表格支持 + ] + ) + body = md.convert(value) # 转换正文为HTML + toc = md.toc # 提取目录 + return body, toc + + @staticmethod + def get_markdown_with_toc(value): + """获取带目录的HTML内容""" + body, toc = CommonMarkdown._convert_markdown(value) + return body, toc + + @staticmethod + def get_markdown(value): + """仅获取转换后的HTML正文(不含目录)""" + body, toc = CommonMarkdown._convert_markdown(value) + return body + + +def send_email(emailto, title, content): + """ + 发送邮件的封装函数:通过信号机制发送邮件,解耦邮件发送逻辑 + 实际发送由信号接收者处理(如调用Django邮件后端) + """ + from djangoblog.blog_signals import send_email_signal + # 发送信号,传递邮件参数 + send_email_signal.send( + send_email.__class__, + emailto=emailto, + title=title, + content=content) + + +def generate_code() -> str: + """生成6位数字随机验证码,用于邮箱验证、登录等场景""" + return ''.join(random.sample(string.digits, 6)) # 从0-9中随机选择6个数字 + + +def parse_dict_to_url(dict): + """ + 将字典转换为URL查询参数字符串(如{'a':1,'b':2} → 'a=1&b=2') + 自动对键值进行URL编码,支持特殊字符 + + Args: + dict: 键值对字典 + + Returns: + str: URL查询参数字符串 + """ + from urllib.parse import quote + return '&'.join([ + f'{quote(k, safe="/")}={quote(v, safe="/")}' + for k, v in dict.items() + ]) + + +def get_blog_setting(): + """ + 获取博客系统设置(单例模式),并缓存结果 + 包含站点名称、描述、SEO配置等核心设置 + + Returns: + BlogSettings对象:系统设置实例 + """ + # 先从缓存获取,减少数据库查询 + value = cache.get('get_blog_setting') + if value: + return value + else: + from blog.models import BlogSettings + # 若不存在设置记录,创建默认配置 + if not BlogSettings.objects.count(): + setting = BlogSettings() + setting.site_name = 'djangoblog' + setting.site_description = '基于Django的博客系统' + setting.site_seo_description = '基于Django的博客系统' + setting.site_keywords = 'Django,Python' + setting.article_sub_length = 300 + setting.sidebar_article_count = 10 + setting.sidebar_comment_count = 5 + setting.show_google_adsense = False + setting.open_site_comment = True + setting.analytics_code = '' + setting.beian_code = '' + setting.show_gongan_code = False + setting.comment_need_review = False + setting.save() + # 获取设置并缓存 + value = BlogSettings.objects.first() + logger.info('set cache get_blog_setting') + cache.set('get_blog_setting', value) + return value + + +def save_user_avatar(url): + ''' + 保存用户头像到本地静态目录,并返回访问URL + 用于处理第三方登录(如GitHub)的头像保存 + + Args: + url: 头像的远程URL + + Returns: + str: 本地头像的静态文件URL(默认返回系统默认头像) + ''' + logger.info(url) + try: + # 定义本地保存路径(static/avatar目录) + basedir = os.path.join(settings.STATICFILES, 'avatar') + # 下载头像图片 + rsp = requests.get(url, timeout=2) + if rsp.status_code == 200: + # 创建目录(若不存在) + if not os.path.exists(basedir): + os.makedirs(basedir) + + # 验证文件类型并确定扩展名 + image_extensions = ['.jpg', '.png', 'jpeg', '.gif'] + isimage = len([i for i in image_extensions if url.endswith(i)]) > 0 + ext = os.path.splitext(url)[1] if isimage else '.jpg' + # 生成唯一文件名(UUID避免冲突) + save_filename = str(uuid.uuid4().hex) + ext + logger.info(f'保存用户头像:{basedir}{save_filename}') + # 写入文件 + with open(os.path.join(basedir, save_filename), 'wb+') as file: + file.write(rsp.content) + # 返回静态文件URL + return static('avatar/' + save_filename) + except Exception as e: + logger.error(e) + # 失败时返回默认头像 + return static('blog/img/avatar.png') + + +def delete_sidebar_cache(): + """ + 删除侧边栏缓存:当侧边栏数据(如链接、文章列表)更新时调用 + 确保用户看到最新的侧边栏内容 + """ + from blog.models import LinkShowType + # 生成所有侧边栏缓存键(基于链接展示类型) + keys = ["sidebar" + x for x in LinkShowType.values] + for k in keys: + logger.info(f'delete sidebar key:{k}') + cache.delete(k) + + +def delete_view_cache(prefix, keys): + """ + 删除模板片段缓存:用于删除指定前缀和参数的模板缓存 + 如文章详情页的评论区缓存 + + Args: + prefix: 缓存前缀(模板中定义) + keys: 缓存键参数列表 + """ + from django.core.cache.utils import make_template_fragment_key + key = make_template_fragment_key(prefix, keys) + cache.delete(key) + + +def get_resource_url(): + """ + 获取静态资源基础URL + 优先使用settings中的STATIC_URL,否则基于当前站点域名生成 + + Returns: + str: 静态资源URL前缀(如'http://example.com/static/') + """ + if settings.STATIC_URL: + return settings.STATIC_URL + else: + site = get_current_site() + return f'http://{site.domain}/static/' + + +# HTML过滤配置:限制允许的标签和属性,防止XSS攻击 +ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', + 'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p'] +ALLOWED_ATTRIBUTES = { + 'a': ['href', 'title'], + 'abbr': ['title'], + 'acronym': ['title'] +} + + +def sanitize_html(html): + """ + 净化HTML内容:仅保留允许的标签和属性,过滤恶意代码 + 用于处理用户输入的HTML(如评论、文章内容),防止XSS攻击 + + Args: + html: 原始HTML字符串 + + Returns: + str: 净化后的安全HTML + """ + return bleach.clean( + html, + tags=ALLOWED_TAGS, + attributes=ALLOWED_ATTRIBUTES + ) \ No newline at end of file