优化代码

develop
djq 3 months ago
parent 2ffaeee7af
commit 9633864a1a

@ -29,14 +29,13 @@ def verify(email: str, code: str) -> typing.Optional[str]:
email: 请求邮箱
code: 验证码
Return:
如果有错误就返回错误str
Node:
这里的错误处理不太合理应该采用raise抛出
否测调用方也需要对error进行处理
验证失败返回错误信息字符串验证成功返回None
"""
cache_code = get_code(email)
if cache_code != code:
return gettext("Verification code error")
# 验证成功时显式返回None确保所有分支返回值类型一致
return None
def set_code(email: str, code: str):
@ -46,4 +45,4 @@ def set_code(email: str, code: str):
def get_code(email: str) -> typing.Optional[str]:
"""获取code"""
return cache.get(email)
return cache.get(email)

@ -8,6 +8,7 @@ import random
import string
import uuid
from hashlib import sha256
from typing import Optional
import bleach
import markdown
@ -15,6 +16,7 @@ import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.http import HttpRequest
from django.templatetags.static import static
logger = logging.getLogger(__name__)
@ -26,57 +28,62 @@ def get_max_articleid_commentid():
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
m = sha256(str.encode('utf-8'))
def get_sha256(str_val: str) -> str:
"""计算字符串的SHA256哈希值"""
m = sha256(str_val.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
def cache_decorator(expiration: int = 3 * 60):
"""缓存装饰器,带过期时间参数"""
def wrapper(func):
def news(*args, **kwargs):
def news(*args, **kwargs) -> Optional[any]:
key: Optional[str] = None
try:
# 尝试从视图对象获取缓存键(针对视图函数)
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
key = view.get_cache_key() # 可能抛出AttributeError
except AttributeError:
# 非视图函数,生成唯一缓存键
unique_str = repr((func, args, kwargs))
key = sha256(unique_str.encode('utf-8')).hexdigest()
except Exception as e:
# 捕获其他特定异常,避免泛型异常屏蔽问题
logger.warning(f"获取缓存键失败: {e}")
key = None
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
if key:
# 从缓存获取数据
value = cache.get(key)
if value is not None:
if str(value) == '__default_cache_value__':
return None
return value
# 缓存未命中,执行原函数
logger.debug(f'cache_decorator set cache: {func.__name__} key: {key}')
value = func(*args, **kwargs)
# 处理空值缓存
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
def expire_view_cache(
path: str,
servername: str,
serverport: str,
key_prefix: Optional[str] = None
) -> bool:
"""刷新视图缓存"""
from django.utils.cache import get_cache_key
request = HttpRequest()
@ -85,7 +92,7 @@ def expire_view_cache(path, servername, serverport, key_prefix=None):
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
logger.info(f'expire_view_cache: get key: {path}')
if cache.get(key):
cache.delete(key)
return True
@ -93,14 +100,15 @@ def expire_view_cache(path, servername, serverport, key_prefix=None):
@cache_decorator()
def get_current_site():
site = Site.objects.get_current()
return site
def get_current_site() -> Site:
"""获取当前站点信息"""
return Site.objects.get_current()
class CommonMarkdown:
@staticmethod
def _convert_markdown(value):
def _convert_markdown(value: str) -> tuple[str, str]:
"""转换Markdown为HTML和目录"""
md = markdown.Markdown(
extensions=[
'extra',
@ -114,119 +122,156 @@ class CommonMarkdown:
return body, toc
@staticmethod
def get_markdown_with_toc(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
def get_markdown_with_toc(value: str) -> tuple[str, str]:
"""获取带目录的Markdown转换结果"""
return CommonMarkdown._convert_markdown(value)
@staticmethod
def get_markdown(value):
body, toc = CommonMarkdown._convert_markdown(value)
def get_markdown(value: str) -> str:
"""获取纯HTML的Markdown转换结果"""
body, _ = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
def send_email(emailto: list, title: str, content: str) -> None:
"""发送邮件(通过信号机制)"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
content=content
)
def generate_code() -> str:
"""生成随机验证码"""
"""生成6位数字随机验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
def parse_dict_to_url(dict_data: dict) -> str:
"""将字典转换为URL查询字符串"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
return '&'.join([
f"{quote(k, safe='/')}={quote(v, safe='/')}"
for k, v in dict_data.items()
])
def get_blog_setting():
def get_blog_setting() -> 'BlogSettings': # 类型注解使用字符串避免循环导入
"""获取博客系统设置(带缓存)"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
# 缓存未命中,从数据库获取
from blog.models import BlogSettings
try:
# 尝试获取已有设置
value = BlogSettings.objects.first()
if not value:
# 无设置时初始化默认配置
value = BlogSettings(
site_name='djangoblog',
site_description='基于Django的博客系统',
site_seo_description='基于Django的博客系统',
site_keywords='Django,Python',
article_sub_length=300,
sidebar_article_count=10,
sidebar_comment_count=5,
show_google_adsense=False,
open_site_comment=True,
analytics_code='',
beian_code='',
show_gongan_code=False,
comment_need_review=False
)
value.save()
# 更新缓存
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
return value
except Exception as e:
logger.error(f"获取博客设置失败: {e}")
# 确保始终返回有效值(即使数据库操作失败)
if not value:
value = BlogSettings() # 返回空对象避免调用方报错
return value
def save_user_avatar(url):
'''
保存用户头像
:param url:头像url
:return: 本地路径
'''
logger.info(url)
def save_user_avatar(url: str) -> str:
"""保存用户头像到本地并返回URL"""
logger.info(f"处理头像URL: {url}")
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
# 发送请求获取图片(指定超时和用户代理)
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
rsp = requests.get(url, timeout=5, headers=headers)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
os.makedirs(basedir, exist_ok=True) # 确保目录存在
# 验证图片扩展名
image_extensions = ('.jpg', '.png', '.jpeg', '.gif')
ext = os.path.splitext(url)[1].lower()
if ext not in image_extensions:
ext = '.jpg' # 默认扩展名
# 生成唯一文件名并保存
save_filename = f"{uuid.uuid4().hex}{ext}"
save_path = os.path.join(basedir, save_filename)
with open(save_path, 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
return static(f'avatar/{save_filename}')
except requests.exceptions.RequestException as e:
logger.error(f"头像下载失败: {e}")
except OSError as e:
logger.error(f"头像保存失败: {e}")
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
logger.error(f"头像处理异常: {e}")
# 异常时返回默认头像
return static('blog/img/avatar.png')
def delete_sidebar_cache():
def delete_sidebar_cache() -> None:
"""删除侧边栏缓存"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
keys = [f"sidebar{x}" for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
logger.info(f'delete sidebar key: {k}')
cache.delete(k)
def delete_view_cache(prefix, keys):
def delete_view_cache(prefix: str, keys: list) -> None:
"""删除视图模板片段缓存"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
def get_resource_url() -> str:
"""获取静态资源基础URL"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
site = get_current_site()
return f'http://{site.domain}/static/'
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
# HTML清理配置
ALLOWED_TAGS = [
'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i',
'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p'
]
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title'],
'abbr': ['title'],
'acronym': ['title']
}
def sanitize_html(html):
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
def sanitize_html(html: str) -> str:
"""清理HTML内容仅保留允许的标签和属性"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
Loading…
Cancel
Save