You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
DjangoBlog/djangoblog/utils.py

210 lines
6.4 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#gq:
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
from urllib.parse import quote
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.core.cache.utils import make_template_fragment_key
from django.http import HttpRequest
from django.templatetags.static import static
from django.utils.cache import get_cache_key
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""获取最新文章和评论的ID"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""计算字符串的SHA-256哈希值"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""函数缓存装饰器默认缓存3分钟"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从请求对象获取缓存键
view = args[0]
key = view.get_cache_key()
except:
# 否则根据函数和参数生成唯一键
unique_str = repr((func, args, kwargs))
key = get_sha256(unique_str)
value = cache.get(key)
if value is not None:
# 返回缓存值,处理空值标记
return None if str(value) == '__default_cache_value__' else value
# 缓存未命中,执行函数并缓存结果
logger.debug(f'cache_decorator set cache:{func.__name__} key:{key}')
value = func(*args, **kwargs)
cache.set(key, value if value is not None else '__default_cache_value__', expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
"""刷新指定URL的视图缓存"""
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info(f'expire_view_cache:get key:{path}')
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
"""获取当前站点信息(带缓存)"""
return Site.objects.get_current()
class CommonMarkdown:
"""Markdown解析工具类"""
@staticmethod
def _convert_markdown(value):
"""内部方法执行Markdown转换返回HTML和目录"""
md = markdown.Markdown(extensions=['extra', 'codehilite', 'toc', 'tables'])
return md.convert(value), md.toc
@staticmethod
def get_markdown_with_toc(value):
"""转换Markdown为HTML含目录"""
return CommonMarkdown._convert_markdown(value)
@staticmethod
def get_markdown(value):
"""转换Markdown为HTML不含目录"""
body, _ = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""发送邮件(通过信号解耦)"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(send_email.__class__, emailto=emailto, title=title, content=content)
def generate_code() -> str:
"""生成6位随机数字验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""将字典转换为URL查询字符串"""
return '&'.join([f'{quote(k, safe="/")}={quote(v, safe="/")}' for k, v in dict.items()])
def get_blog_setting():
"""获取博客系统设置(带缓存,无数据时初始化)"""
value = cache.get('get_blog_setting')
if value:
return value
from blog.models import BlogSettings
if not BlogSettings.objects.count():
# 初始化默认设置
setting = BlogSettings(
site_name='djangoblog',
site_description='基于Django的博客系统',
site_seo_description='基于Django的博客系统',
site_keywords='Django,Python',
article_sub_length=300,
sidebar_article_count=10,
sidebar_comment_count=5,
show_google_adsense=False,
open_site_comment=True,
analytics_code='',
beian_code='',
show_gongan_code=False,
comment_need_review=False
)
setting.save()
value = BlogSettings.objects.first()
cache.set('get_blog_setting', value)
return value
def save_user_avatar(url):
"""下载并保存用户头像到本地返回静态文件URL"""
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
os.makedirs(basedir, exist_ok=True)
# 确定文件扩展名
ext = os.path.splitext(url)[1] if any(
url.endswith(ext) for ext in ['.jpg', '.png', 'jpeg', '.gif']) else '.jpg'
save_filename = f'{uuid.uuid4().hex}{ext}'
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static(f'avatar/{save_filename}')
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png') # 返回默认头像
def delete_sidebar_cache():
"""删除侧边栏相关缓存"""
from blog.models import LinkShowType
keys = [f"sidebar{x}" for x in LinkShowType.values]
for k in keys:
logger.info(f'delete sidebar key:{k}')
cache.delete(k)
def delete_view_cache(prefix, keys):
"""删除指定模板片段缓存"""
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""获取静态资源基础URL"""
if settings.STATIC_URL:
return settings.STATIC_URL
site = get_current_site()
return f'http://{site.domain}/static/'
# HTML清理配置
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
"""清理HTML只保留允许的标签和属性"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)