You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Django/djangoblog-yql/utils.py

247 lines
7.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# encoding: utf-8
# 余秋玲: 此文件定义了DjangoBlog的通用工具函数
# 余秋玲: 导入必要的模块
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
# 余秋玲: 获取日志记录器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
# 余秋玲: 获取最大的文章ID和评论ID
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
# 余秋玲: 计算字符串的SHA256哈希值
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
# 余秋玲: 缓存装饰器,用于缓存函数结果
def wrapper(func):
def news(*args, **kwargs):
try:
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
# 余秋玲: 刷新视图缓存
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
# 余秋玲: 获取当前站点,带缓存
site = Site.objects.get_current()
return site
class CommonMarkdown:
# 余秋玲: Markdown处理工具类
@staticmethod
def _convert_markdown(value):
# 余秋玲: 内部方法转换Markdown为HTML
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
]
)
body = md.convert(value)
toc = md.toc
return body, toc
@staticmethod
def get_markdown_with_toc(value):
# 余秋玲: 获取带目录的Markdown转换结果
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
# 余秋玲: 获取Markdown转换结果
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
# 余秋玲: 发送邮件函数
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
# 余秋玲: 生成随机数验证码
"""生成随机数验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
# 余秋玲: 将字典解析为URL查询字符串
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
def get_blog_setting():
# 余秋玲: 获取博客设置,带缓存
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
return value
def save_user_avatar(url):
# 余秋玲: 保存用户头像
'''
保存用户头像
:param url:头像url
:return: 本地路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
def delete_sidebar_cache():
# 余秋玲: 删除侧边栏缓存
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
# 余秋玲: 删除视图缓存
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
# 余秋玲: 获取资源URL
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
# 余秋玲: 允许的HTML标签和属性
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', '