diff --git a/src/DjangoBlog-master/DjangoBlog-master/djangoblog/utils.py b/src/DjangoBlog-master/DjangoBlog-master/djangoblog/utils.py index 57f63dc..45b4cc2 100644 --- a/src/DjangoBlog-master/DjangoBlog-master/djangoblog/utils.py +++ b/src/DjangoBlog-master/DjangoBlog-master/djangoblog/utils.py @@ -1,7 +1,6 @@ #!/usr/bin/env python # encoding: utf-8 - import logging import os import random @@ -9,41 +8,61 @@ import string import uuid from hashlib import sha256 -import bleach -import markdown -import requests +import bleach # HTML清理库,用于防止XSS攻击 +import markdown # Markdown解析库 +import requests # HTTP请求库 from django.conf import settings -from django.contrib.sites.models import Site -from django.core.cache import cache -from django.templatetags.static import static +from django.contrib.sites.models import Site # Django站点框架 +from django.core.cache import cache # Django缓存框架 +from django.templatetags.static import static # 静态文件URL生成 logger = logging.getLogger(__name__) def get_max_articleid_commentid(): + """获取最大的文章ID和评论ID""" from blog.models import Article from comments.models import Comment return (Article.objects.latest().pk, Comment.objects.latest().pk) def get_sha256(str): + """计算字符串的SHA256哈希值 + + Args: + str: 要计算哈希的字符串 + + Returns: + str: 64位的十六进制哈希值 + """ m = sha256(str.encode('utf-8')) return m.hexdigest() def cache_decorator(expiration=3 * 60): + """缓存装饰器,用于缓存函数结果 + + Args: + expiration: 缓存过期时间(秒),默认3分钟 + + Returns: + function: 装饰器函数 + """ def wrapper(func): def news(*args, **kwargs): try: + # 尝试从视图类获取缓存键 view = args[0] key = view.get_cache_key() except: key = None if not key: + # 如果没有特定的缓存键,根据函数参数生成唯一键 unique_str = repr((func, args, kwargs)) - m = sha256(unique_str.encode('utf-8')) key = m.hexdigest() + + # 尝试从缓存获取结果 value = cache.get(key) if value is not None: # logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key)) @@ -52,9 +71,8 @@ def cache_decorator(expiration=3 * 60): else: return value else: - logger.debug( - 'cache_decorator set cache:%s key:%s' % - (func.__name__, key)) + # 缓存未命中,执行函数并缓存结果 + logger.debug('cache_decorator set cache:%s key:%s' % (func.__name__, key)) value = func(*args, **kwargs) if value is None: cache.set(key, '__default_cache_value__', expiration) @@ -68,21 +86,26 @@ def cache_decorator(expiration=3 * 60): def expire_view_cache(path, servername, serverport, key_prefix=None): - ''' - 刷新视图缓存 - :param path:url路径 - :param servername:host - :param serverport:端口 - :param key_prefix:前缀 - :return:是否成功 + '''刷新视图缓存 + + Args: + path: URL路径 + servername: 服务器主机名 + serverport: 服务器端口 + key_prefix: 缓存键前缀 + + Returns: + bool: 是否成功删除 ''' from django.http import HttpRequest from django.utils.cache import get_cache_key + # 创建模拟请求对象 request = HttpRequest() request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport} request.path = path + # 获取缓存键并删除 key = get_cache_key(request, key_prefix=key_prefix, cache=cache) if key: logger.info('expire_view_cache:get key:{path}'.format(path=path)) @@ -92,40 +115,62 @@ def expire_view_cache(path, servername, serverport, key_prefix=None): return False -@cache_decorator() +@cache_decorator() # 应用缓存装饰器 def get_current_site(): + """获取当前站点信息""" site = Site.objects.get_current() return site class CommonMarkdown: + """Markdown处理工具类""" + @staticmethod def _convert_markdown(value): + """内部方法:转换Markdown为HTML + + Args: + value: Markdown格式文本 + + Returns: + tuple: (HTML内容, 目录) + """ + # 配置Markdown扩展 md = markdown.Markdown( extensions=[ - 'extra', - 'codehilite', - 'toc', - 'tables', + 'extra', # 额外语法支持 + 'codehilite', # 代码高亮 + 'toc', # 目录生成 + 'tables', # 表格支持 ] ) - body = md.convert(value) - toc = md.toc + body = md.convert(value) # 转换Markdown为HTML + toc = md.toc # 获取目录 return body, toc @staticmethod def get_markdown_with_toc(value): + """获取带目录的Markdown转换结果""" body, toc = CommonMarkdown._convert_markdown(value) return body, toc @staticmethod def get_markdown(value): + """获取Markdown转换结果(不含目录)""" body, toc = CommonMarkdown._convert_markdown(value) return body def send_email(emailto, title, content): + """发送邮件(通过信号机制) + + Args: + emailto: 收件人邮箱 + title: 邮件标题 + content: 邮件内容 + """ from djangoblog.blog_signals import send_email_signal + # 使用Django信号发送邮件,实现解耦[9,10,11](@ref) send_email_signal.send( send_email.__class__, emailto=emailto, @@ -134,11 +179,19 @@ def send_email(emailto, title, content): def generate_code() -> str: - """生成随机数验证码""" + """生成6位随机数字验证码""" return ''.join(random.sample(string.digits, 6)) def parse_dict_to_url(dict): + """将字典转换为URL参数字符串 + + Args: + dict: 参数字典 + + Returns: + str: URL参数字符串 + """ from urllib.parse import quote url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/')) for k, v in dict.items()]) @@ -146,12 +199,19 @@ def parse_dict_to_url(dict): def get_blog_setting(): + """获取博客设置,使用缓存提高性能[6,8](@ref) + + Returns: + BlogSettings: 博客设置对象 + """ value = cache.get('get_blog_setting') if value: return value else: + # 缓存未命中,从数据库获取 from blog.models import BlogSettings if not BlogSettings.objects.count(): + # 如果不存在设置,创建默认设置 setting = BlogSettings() setting.site_name = 'djangoblog' setting.site_description = '基于Django的博客系统' @@ -169,39 +229,44 @@ def get_blog_setting(): setting.save() value = BlogSettings.objects.first() logger.info('set cache get_blog_setting') - cache.set('get_blog_setting', value) + cache.set('get_blog_setting', value) # 设置缓存 return value def save_user_avatar(url): - ''' - 保存用户头像 - :param url:头像url - :return: 本地路径 + '''保存用户头像到本地 + + Args: + url: 头像URL地址 + + Returns: + str: 本地静态文件路径 ''' logger.info(url) try: basedir = os.path.join(settings.STATICFILES, 'avatar') - rsp = requests.get(url, timeout=2) + rsp = requests.get(url, timeout=2) # 下载头像 if rsp.status_code == 200: if not os.path.exists(basedir): - os.makedirs(basedir) + os.makedirs(basedir) # 创建目录 + # 检查图片扩展名 image_extensions = ['.jpg', '.png', 'jpeg', '.gif'] isimage = len([i for i in image_extensions if url.endswith(i)]) > 0 ext = os.path.splitext(url)[1] if isimage else '.jpg' - save_filename = str(uuid.uuid4().hex) + ext + save_filename = str(uuid.uuid4().hex) + ext # 生成唯一文件名 logger.info('保存用户头像:' + basedir + save_filename) with open(os.path.join(basedir, save_filename), 'wb+') as file: - file.write(rsp.content) - return static('avatar/' + save_filename) + file.write(rsp.content) # 保存文件 + return static('avatar/' + save_filename) # 返回静态文件URL except Exception as e: logger.error(e) - return static('blog/img/avatar.png') + return static('blog/img/avatar.png') # 返回默认头像 def delete_sidebar_cache(): + """删除侧边栏相关缓存""" from blog.models import LinkShowType keys = ["sidebar" + x for x in LinkShowType.values] for k in keys: @@ -210,12 +275,19 @@ def delete_sidebar_cache(): def delete_view_cache(prefix, keys): + """删除视图缓存 + + Args: + prefix: 缓存前缀 + keys: 缓存键 + """ from django.core.cache.utils import make_template_fragment_key key = make_template_fragment_key(prefix, keys) cache.delete(key) def get_resource_url(): + """获取资源URL基础路径""" if settings.STATIC_URL: return settings.STATIC_URL else: @@ -223,10 +295,19 @@ def get_resource_url(): return 'http://' + site.domain + '/static/' -ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1', - 'h2', 'p'] +# HTML标签和属性白名单,用于防止XSS攻击 +ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', + 'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p'] ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']} def sanitize_html(html): - return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES) + """清理HTML,移除不安全的标签和属性 + + Args: + html: 要清理的HTML内容 + + Returns: + str: 安全的HTML内容 + """ + return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES) \ No newline at end of file