From 9633864a1abcf1be5d55fce83ccf28a0076bbcae Mon Sep 17 00:00:00 2001 From: djq <1092424998@qq.com> Date: Sat, 8 Nov 2025 00:32:16 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DjangoBlog-master/accounts/utils.py | 9 +- .../DjangoBlog-master/djangoblog/utils.py | 255 ++++++++++-------- 2 files changed, 154 insertions(+), 110 deletions(-) diff --git a/src/DjangoBlog-master(1)/DjangoBlog-master/accounts/utils.py b/src/DjangoBlog-master(1)/DjangoBlog-master/accounts/utils.py index 4b94bdf..0de05f9 100644 --- a/src/DjangoBlog-master(1)/DjangoBlog-master/accounts/utils.py +++ b/src/DjangoBlog-master(1)/DjangoBlog-master/accounts/utils.py @@ -29,14 +29,13 @@ def verify(email: str, code: str) -> typing.Optional[str]: email: 请求邮箱 code: 验证码 Return: - 如果有错误就返回错误str - Node: - 这里的错误处理不太合理,应该采用raise抛出 - 否测调用方也需要对error进行处理 + 验证失败返回错误信息字符串,验证成功返回None """ cache_code = get_code(email) if cache_code != code: return gettext("Verification code error") + # 验证成功时显式返回None,确保所有分支返回值类型一致 + return None def set_code(email: str, code: str): @@ -46,4 +45,4 @@ def set_code(email: str, code: str): def get_code(email: str) -> typing.Optional[str]: """获取code""" - return cache.get(email) + return cache.get(email) \ No newline at end of file diff --git a/src/DjangoBlog-master(1)/DjangoBlog-master/djangoblog/utils.py b/src/DjangoBlog-master(1)/DjangoBlog-master/djangoblog/utils.py index 57f63dc..c686785 100644 --- a/src/DjangoBlog-master(1)/DjangoBlog-master/djangoblog/utils.py +++ b/src/DjangoBlog-master(1)/DjangoBlog-master/djangoblog/utils.py @@ -8,6 +8,7 @@ import random import string import uuid from hashlib import sha256 +from typing import Optional import bleach import markdown @@ -15,6 +16,7 @@ import requests from django.conf import settings from django.contrib.sites.models import Site from django.core.cache import cache +from django.http import HttpRequest from django.templatetags.static import static logger = logging.getLogger(__name__) @@ -26,57 +28,62 @@ def get_max_articleid_commentid(): return (Article.objects.latest().pk, Comment.objects.latest().pk) -def get_sha256(str): - m = sha256(str.encode('utf-8')) +def get_sha256(str_val: str) -> str: + """计算字符串的SHA256哈希值""" + m = sha256(str_val.encode('utf-8')) return m.hexdigest() -def cache_decorator(expiration=3 * 60): +def cache_decorator(expiration: int = 3 * 60): + """缓存装饰器,带过期时间参数""" + def wrapper(func): - def news(*args, **kwargs): + def news(*args, **kwargs) -> Optional[any]: + key: Optional[str] = None try: + # 尝试从视图对象获取缓存键(针对视图函数) view = args[0] - key = view.get_cache_key() - except: - key = None - if not key: + key = view.get_cache_key() # 可能抛出AttributeError + except AttributeError: + # 非视图函数,生成唯一缓存键 unique_str = repr((func, args, kwargs)) + key = sha256(unique_str.encode('utf-8')).hexdigest() + except Exception as e: + # 捕获其他特定异常,避免泛型异常屏蔽问题 + logger.warning(f"获取缓存键失败: {e}") + key = None - m = sha256(unique_str.encode('utf-8')) - key = m.hexdigest() - value = cache.get(key) - if value is not None: - # logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key)) - if str(value) == '__default_cache_value__': - return None - else: + if key: + # 从缓存获取数据 + value = cache.get(key) + if value is not None: + if str(value) == '__default_cache_value__': + return None return value + + # 缓存未命中,执行原函数 + logger.debug(f'cache_decorator set cache: {func.__name__} key: {key}') + value = func(*args, **kwargs) + + # 处理空值缓存 + if value is None: + cache.set(key, '__default_cache_value__', expiration) else: - logger.debug( - 'cache_decorator set cache:%s key:%s' % - (func.__name__, key)) - value = func(*args, **kwargs) - if value is None: - cache.set(key, '__default_cache_value__', expiration) - else: - cache.set(key, value, expiration) - return value + cache.set(key, value, expiration) + return value return news return wrapper -def expire_view_cache(path, servername, serverport, key_prefix=None): - ''' - 刷新视图缓存 - :param path:url路径 - :param servername:host - :param serverport:端口 - :param key_prefix:前缀 - :return:是否成功 - ''' - from django.http import HttpRequest +def expire_view_cache( + path: str, + servername: str, + serverport: str, + key_prefix: Optional[str] = None +) -> bool: + """刷新视图缓存""" from django.utils.cache import get_cache_key request = HttpRequest() @@ -85,7 +92,7 @@ def expire_view_cache(path, servername, serverport, key_prefix=None): key = get_cache_key(request, key_prefix=key_prefix, cache=cache) if key: - logger.info('expire_view_cache:get key:{path}'.format(path=path)) + logger.info(f'expire_view_cache: get key: {path}') if cache.get(key): cache.delete(key) return True @@ -93,14 +100,15 @@ def expire_view_cache(path, servername, serverport, key_prefix=None): @cache_decorator() -def get_current_site(): - site = Site.objects.get_current() - return site +def get_current_site() -> Site: + """获取当前站点信息""" + return Site.objects.get_current() class CommonMarkdown: @staticmethod - def _convert_markdown(value): + def _convert_markdown(value: str) -> tuple[str, str]: + """转换Markdown为HTML和目录""" md = markdown.Markdown( extensions=[ 'extra', @@ -114,119 +122,156 @@ class CommonMarkdown: return body, toc @staticmethod - def get_markdown_with_toc(value): - body, toc = CommonMarkdown._convert_markdown(value) - return body, toc + def get_markdown_with_toc(value: str) -> tuple[str, str]: + """获取带目录的Markdown转换结果""" + return CommonMarkdown._convert_markdown(value) @staticmethod - def get_markdown(value): - body, toc = CommonMarkdown._convert_markdown(value) + def get_markdown(value: str) -> str: + """获取纯HTML的Markdown转换结果""" + body, _ = CommonMarkdown._convert_markdown(value) return body -def send_email(emailto, title, content): +def send_email(emailto: list, title: str, content: str) -> None: + """发送邮件(通过信号机制)""" from djangoblog.blog_signals import send_email_signal send_email_signal.send( send_email.__class__, emailto=emailto, title=title, - content=content) + content=content + ) def generate_code() -> str: - """生成随机数验证码""" + """生成6位数字随机验证码""" return ''.join(random.sample(string.digits, 6)) -def parse_dict_to_url(dict): +def parse_dict_to_url(dict_data: dict) -> str: + """将字典转换为URL查询字符串""" from urllib.parse import quote - url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/')) - for k, v in dict.items()]) - return url + return '&'.join([ + f"{quote(k, safe='/')}={quote(v, safe='/')}" + for k, v in dict_data.items() + ]) -def get_blog_setting(): +def get_blog_setting() -> 'BlogSettings': # 类型注解使用字符串避免循环导入 + """获取博客系统设置(带缓存)""" value = cache.get('get_blog_setting') if value: return value - else: - from blog.models import BlogSettings - if not BlogSettings.objects.count(): - setting = BlogSettings() - setting.site_name = 'djangoblog' - setting.site_description = '基于Django的博客系统' - setting.site_seo_description = '基于Django的博客系统' - setting.site_keywords = 'Django,Python' - setting.article_sub_length = 300 - setting.sidebar_article_count = 10 - setting.sidebar_comment_count = 5 - setting.show_google_adsense = False - setting.open_site_comment = True - setting.analytics_code = '' - setting.beian_code = '' - setting.show_gongan_code = False - setting.comment_need_review = False - setting.save() + + # 缓存未命中,从数据库获取 + from blog.models import BlogSettings + try: + # 尝试获取已有设置 value = BlogSettings.objects.first() + if not value: + # 无设置时初始化默认配置 + value = BlogSettings( + site_name='djangoblog', + site_description='基于Django的博客系统', + site_seo_description='基于Django的博客系统', + site_keywords='Django,Python', + article_sub_length=300, + sidebar_article_count=10, + sidebar_comment_count=5, + show_google_adsense=False, + open_site_comment=True, + analytics_code='', + beian_code='', + show_gongan_code=False, + comment_need_review=False + ) + value.save() + # 更新缓存 logger.info('set cache get_blog_setting') cache.set('get_blog_setting', value) return value + except Exception as e: + logger.error(f"获取博客设置失败: {e}") + # 确保始终返回有效值(即使数据库操作失败) + if not value: + value = BlogSettings() # 返回空对象避免调用方报错 + return value -def save_user_avatar(url): - ''' - 保存用户头像 - :param url:头像url - :return: 本地路径 - ''' - logger.info(url) - +def save_user_avatar(url: str) -> str: + """保存用户头像到本地并返回URL""" + logger.info(f"处理头像URL: {url}") try: basedir = os.path.join(settings.STATICFILES, 'avatar') - rsp = requests.get(url, timeout=2) + # 发送请求获取图片(指定超时和用户代理) + headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} + rsp = requests.get(url, timeout=5, headers=headers) + if rsp.status_code == 200: - if not os.path.exists(basedir): - os.makedirs(basedir) - - image_extensions = ['.jpg', '.png', 'jpeg', '.gif'] - isimage = len([i for i in image_extensions if url.endswith(i)]) > 0 - ext = os.path.splitext(url)[1] if isimage else '.jpg' - save_filename = str(uuid.uuid4().hex) + ext - logger.info('保存用户头像:' + basedir + save_filename) - with open(os.path.join(basedir, save_filename), 'wb+') as file: + os.makedirs(basedir, exist_ok=True) # 确保目录存在 + + # 验证图片扩展名 + image_extensions = ('.jpg', '.png', '.jpeg', '.gif') + ext = os.path.splitext(url)[1].lower() + if ext not in image_extensions: + ext = '.jpg' # 默认扩展名 + + # 生成唯一文件名并保存 + save_filename = f"{uuid.uuid4().hex}{ext}" + save_path = os.path.join(basedir, save_filename) + with open(save_path, 'wb+') as file: file.write(rsp.content) - return static('avatar/' + save_filename) + + return static(f'avatar/{save_filename}') + + except requests.exceptions.RequestException as e: + logger.error(f"头像下载失败: {e}") + except OSError as e: + logger.error(f"头像保存失败: {e}") except Exception as e: - logger.error(e) - return static('blog/img/avatar.png') + logger.error(f"头像处理异常: {e}") + + # 异常时返回默认头像 + return static('blog/img/avatar.png') -def delete_sidebar_cache(): +def delete_sidebar_cache() -> None: + """删除侧边栏缓存""" from blog.models import LinkShowType - keys = ["sidebar" + x for x in LinkShowType.values] + keys = [f"sidebar{x}" for x in LinkShowType.values] for k in keys: - logger.info('delete sidebar key:' + k) + logger.info(f'delete sidebar key: {k}') cache.delete(k) -def delete_view_cache(prefix, keys): +def delete_view_cache(prefix: str, keys: list) -> None: + """删除视图模板片段缓存""" from django.core.cache.utils import make_template_fragment_key key = make_template_fragment_key(prefix, keys) cache.delete(key) -def get_resource_url(): +def get_resource_url() -> str: + """获取静态资源基础URL""" if settings.STATIC_URL: return settings.STATIC_URL - else: - site = get_current_site() - return 'http://' + site.domain + '/static/' + site = get_current_site() + return f'http://{site.domain}/static/' -ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1', - 'h2', 'p'] -ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']} +# HTML清理配置 +ALLOWED_TAGS = [ + 'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', + 'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p' +] +ALLOWED_ATTRIBUTES = { + 'a': ['href', 'title'], + 'abbr': ['title'], + 'acronym': ['title'] +} -def sanitize_html(html): - return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES) +def sanitize_html(html: str) -> str: + """清理HTML内容,仅保留允许的标签和属性""" + return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES) \ No newline at end of file