You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
homework/src%2FDjangoBlog/utils.py

262 lines
8.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
#yyb encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
#yyb 获取日志记录器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""#yyb 获取最新的文章ID和评论ID"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""#yyb 计算字符串的SHA256哈希值"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""#yyb 缓存装饰器,用于缓存函数结果
Args:
expiration: 缓存过期时间默认3分钟
"""
def wrapper(func):
def news(*args, **kwargs):
try:
#yyb 尝试从视图获取缓存键
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
#yyb 如果没有缓存键,根据函数和参数生成唯一键
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
#yyb logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
#yyb 如果函数返回None设置默认缓存值
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
#yyb 刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
#yyb 创建模拟请求对象
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
#yyb 获取缓存键并删除缓存
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
"""#yyb 获取当前站点(带缓存)"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""#yyb Markdown处理工具类"""
@staticmethod
def _convert_markdown(value):
"""#yyb 内部方法转换Markdown文本为HTML"""
md = markdown.Markdown(
extensions=[
'extra', #yyb 额外扩展
'codehilite', #yyb 代码高亮
'toc', #yyb 目录生成
'tables', #yyb 表格支持
]
)
body = md.convert(value) #yyb 转换Markdown为HTML
toc = md.toc #yyb 获取目录
return body, toc
@staticmethod
def get_markdown_with_toc(value):
"""#yyb 获取带目录的Markdown转换结果"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""#yyb 获取Markdown转换结果不带目录"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""#yyb 发送邮件(通过信号机制)"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""#yyb 生成6位随机数验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""#yyb 将字典转换为URL参数字符串"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
def get_blog_setting():
"""#yyb 获取博客设置(带缓存)"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
#yyb 如果数据库中没有博客设置,创建默认设置
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value) #yyb 设置缓存
return value
def save_user_avatar(url):
'''
#yyb 保存用户头像到本地
:param url:头像url
:return: 本地路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2) #yyb 下载头像
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir) #yyb 创建头像目录
#yyb 检查文件是否为图片格式
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext #yyb 生成唯一文件名
logger.info('保存用户头像:' + basedir + save_filename)
#yyb 保存头像文件
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename) #yyb 返回静态文件URL
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png') #yyb 返回默认头像
def delete_sidebar_cache():
"""#yyb 删除侧边栏相关缓存"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
"""#yyb 删除视图缓存"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""#yyb 获取资源URL静态文件URL"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
#yyb HTML清理允许的标签
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
#yyb HTML清理允许的属性
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
"""#yyb 清理HTML移除不安全的标签和属性"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)