|
|
#gq:
|
|
|
#!/usr/bin/env python
|
|
|
# encoding: utf-8
|
|
|
|
|
|
import logging
|
|
|
import os
|
|
|
import random
|
|
|
import string
|
|
|
import uuid
|
|
|
from hashlib import sha256
|
|
|
from urllib.parse import quote
|
|
|
|
|
|
import bleach
|
|
|
import markdown
|
|
|
import requests
|
|
|
from django.conf import settings
|
|
|
from django.contrib.sites.models import Site
|
|
|
from django.core.cache import cache
|
|
|
from django.core.cache.utils import make_template_fragment_key
|
|
|
from django.http import HttpRequest
|
|
|
from django.templatetags.static import static
|
|
|
from django.utils.cache import get_cache_key
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
def get_max_articleid_commentid():
|
|
|
"""获取最新文章和评论的ID"""
|
|
|
from blog.models import Article
|
|
|
from comments.models import Comment
|
|
|
return (Article.objects.latest().pk, Comment.objects.latest().pk)
|
|
|
|
|
|
|
|
|
def get_sha256(str):
|
|
|
"""计算字符串的SHA-256哈希值"""
|
|
|
m = sha256(str.encode('utf-8'))
|
|
|
return m.hexdigest()
|
|
|
|
|
|
|
|
|
def cache_decorator(expiration=3 * 60):
|
|
|
"""函数缓存装饰器,默认缓存3分钟"""
|
|
|
|
|
|
def wrapper(func):
|
|
|
def news(*args, **kwargs):
|
|
|
try:
|
|
|
# 尝试从请求对象获取缓存键
|
|
|
view = args[0]
|
|
|
key = view.get_cache_key()
|
|
|
except:
|
|
|
# 否则根据函数和参数生成唯一键
|
|
|
unique_str = repr((func, args, kwargs))
|
|
|
key = get_sha256(unique_str)
|
|
|
|
|
|
value = cache.get(key)
|
|
|
if value is not None:
|
|
|
# 返回缓存值,处理空值标记
|
|
|
return None if str(value) == '__default_cache_value__' else value
|
|
|
|
|
|
# 缓存未命中,执行函数并缓存结果
|
|
|
logger.debug(f'cache_decorator set cache:{func.__name__} key:{key}')
|
|
|
value = func(*args, **kwargs)
|
|
|
cache.set(key, value if value is not None else '__default_cache_value__', expiration)
|
|
|
return value
|
|
|
|
|
|
return news
|
|
|
|
|
|
return wrapper
|
|
|
|
|
|
|
|
|
def expire_view_cache(path, servername, serverport, key_prefix=None):
|
|
|
"""刷新指定URL的视图缓存"""
|
|
|
request = HttpRequest()
|
|
|
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
|
|
|
request.path = path
|
|
|
|
|
|
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
|
|
|
if key:
|
|
|
logger.info(f'expire_view_cache:get key:{path}')
|
|
|
cache.delete(key)
|
|
|
return True
|
|
|
return False
|
|
|
|
|
|
|
|
|
@cache_decorator()
|
|
|
def get_current_site():
|
|
|
"""获取当前站点信息(带缓存)"""
|
|
|
return Site.objects.get_current()
|
|
|
|
|
|
|
|
|
class CommonMarkdown:
|
|
|
"""Markdown解析工具类"""
|
|
|
|
|
|
@staticmethod
|
|
|
def _convert_markdown(value):
|
|
|
"""内部方法:执行Markdown转换,返回HTML和目录"""
|
|
|
md = markdown.Markdown(extensions=['extra', 'codehilite', 'toc', 'tables'])
|
|
|
return md.convert(value), md.toc
|
|
|
|
|
|
@staticmethod
|
|
|
def get_markdown_with_toc(value):
|
|
|
"""转换Markdown为HTML(含目录)"""
|
|
|
return CommonMarkdown._convert_markdown(value)
|
|
|
|
|
|
@staticmethod
|
|
|
def get_markdown(value):
|
|
|
"""转换Markdown为HTML(不含目录)"""
|
|
|
body, _ = CommonMarkdown._convert_markdown(value)
|
|
|
return body
|
|
|
|
|
|
|
|
|
def send_email(emailto, title, content):
|
|
|
"""发送邮件(通过信号解耦)"""
|
|
|
from djangoblog.blog_signals import send_email_signal
|
|
|
send_email_signal.send(send_email.__class__, emailto=emailto, title=title, content=content)
|
|
|
|
|
|
|
|
|
def generate_code() -> str:
|
|
|
"""生成6位随机数字验证码"""
|
|
|
return ''.join(random.sample(string.digits, 6))
|
|
|
|
|
|
|
|
|
def parse_dict_to_url(dict):
|
|
|
"""将字典转换为URL查询字符串"""
|
|
|
return '&'.join([f'{quote(k, safe="/")}={quote(v, safe="/")}' for k, v in dict.items()])
|
|
|
|
|
|
|
|
|
def get_blog_setting():
|
|
|
"""获取博客系统设置(带缓存,无数据时初始化)"""
|
|
|
value = cache.get('get_blog_setting')
|
|
|
if value:
|
|
|
return value
|
|
|
|
|
|
from blog.models import BlogSettings
|
|
|
if not BlogSettings.objects.count():
|
|
|
# 初始化默认设置
|
|
|
setting = BlogSettings(
|
|
|
site_name='djangoblog',
|
|
|
site_description='基于Django的博客系统',
|
|
|
site_seo_description='基于Django的博客系统',
|
|
|
site_keywords='Django,Python',
|
|
|
article_sub_length=300,
|
|
|
sidebar_article_count=10,
|
|
|
sidebar_comment_count=5,
|
|
|
show_google_adsense=False,
|
|
|
open_site_comment=True,
|
|
|
analytics_code='',
|
|
|
beian_code='',
|
|
|
show_gongan_code=False,
|
|
|
comment_need_review=False
|
|
|
)
|
|
|
setting.save()
|
|
|
|
|
|
value = BlogSettings.objects.first()
|
|
|
cache.set('get_blog_setting', value)
|
|
|
return value
|
|
|
|
|
|
|
|
|
def save_user_avatar(url):
|
|
|
"""下载并保存用户头像到本地,返回静态文件URL"""
|
|
|
try:
|
|
|
basedir = os.path.join(settings.STATICFILES, 'avatar')
|
|
|
rsp = requests.get(url, timeout=2)
|
|
|
if rsp.status_code == 200:
|
|
|
os.makedirs(basedir, exist_ok=True)
|
|
|
|
|
|
# 确定文件扩展名
|
|
|
ext = os.path.splitext(url)[1] if any(
|
|
|
url.endswith(ext) for ext in ['.jpg', '.png', 'jpeg', '.gif']) else '.jpg'
|
|
|
save_filename = f'{uuid.uuid4().hex}{ext}'
|
|
|
|
|
|
with open(os.path.join(basedir, save_filename), 'wb+') as file:
|
|
|
file.write(rsp.content)
|
|
|
return static(f'avatar/{save_filename}')
|
|
|
except Exception as e:
|
|
|
logger.error(e)
|
|
|
return static('blog/img/avatar.png') # 返回默认头像
|
|
|
|
|
|
|
|
|
def delete_sidebar_cache():
|
|
|
"""删除侧边栏相关缓存"""
|
|
|
from blog.models import LinkShowType
|
|
|
keys = [f"sidebar{x}" for x in LinkShowType.values]
|
|
|
for k in keys:
|
|
|
logger.info(f'delete sidebar key:{k}')
|
|
|
cache.delete(k)
|
|
|
|
|
|
|
|
|
def delete_view_cache(prefix, keys):
|
|
|
"""删除指定模板片段缓存"""
|
|
|
key = make_template_fragment_key(prefix, keys)
|
|
|
cache.delete(key)
|
|
|
|
|
|
|
|
|
def get_resource_url():
|
|
|
"""获取静态资源基础URL"""
|
|
|
if settings.STATIC_URL:
|
|
|
return settings.STATIC_URL
|
|
|
site = get_current_site()
|
|
|
return f'http://{site.domain}/static/'
|
|
|
|
|
|
|
|
|
# HTML清理配置
|
|
|
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
|
|
|
'h2', 'p']
|
|
|
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
|
|
|
|
|
|
|
|
|
def sanitize_html(html):
|
|
|
"""清理HTML,只保留允许的标签和属性"""
|
|
|
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES) |