#!/usr/bin/env python
# encoding: utf-8
"""Shared helper functions for the blog project.

Contains cache helpers, Markdown rendering, e-mail dispatch, verification-code
generation, blog-settings lookup, avatar download and HTML sanitising.
"""

import logging
import os
import random
import secrets
import string
import uuid
from functools import wraps
from hashlib import sha256

import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static

# Module-level logger.
logger = logging.getLogger(__name__)

# Placeholder stored in the cache when a decorated function returned ``None``,
# so that a cached ``None`` can be told apart from a cache miss.
_CACHE_NONE_SENTINEL = '__default_cache_value__'


def get_max_articleid_commentid():
    """Return ``(article_pk, comment_pk)`` of the latest article and comment.

    Both values come from ``Model.objects.latest()``; whatever that call
    raises (for example when a table is empty) propagates to the caller.
    """
    # Imported lazily, as in the original code (presumably to avoid import
    # cycles with the app models -- confirm before hoisting).
    from blog.models import Article
    from comments.models import Comment
    return (Article.objects.latest().pk, Comment.objects.latest().pk)


def get_sha256(str):
    """Return the hexadecimal SHA-256 digest of the UTF-8 encoded text.

    NOTE: the parameter name shadows the ``str`` builtin; it is kept so that
    callers passing it by keyword keep working.
    """
    return sha256(str.encode('utf-8')).hexdigest()


def cache_decorator(expiration=3 * 60):
    """Build a decorator that caches a function's return value.

    The cache key is taken from ``args[0].get_cache_key()`` when the first
    positional argument provides that method and it returns a truthy value;
    otherwise it is the SHA-256 of the function's qualified name together with
    the ``repr`` of its arguments.

    :param expiration: cache timeout in seconds (default: three minutes).
    :return: a decorator to apply to the function being cached.
    """

    def wrapper(func):
        @wraps(func)  # keep the wrapped function's name and docstring
        def news(*args, **kwargs):
            try:
                view = args[0]
                key = view.get_cache_key()
            except Exception:
                # No positional argument, or it cannot provide a key.
                # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt
                # and SystemExit propagate.
                key = None
            if not key:
                # ``repr(func)`` would embed a per-process memory address, so
                # identical calls from different worker processes would never
                # share a cache entry; the qualified name is stable.
                unique_str = repr(
                    (func.__module__, func.__qualname__, args, kwargs))
                key = sha256(unique_str.encode('utf-8')).hexdigest()

            value = cache.get(key)
            if value is not None:
                # Translate the stored placeholder back into ``None``.
                if str(value) == _CACHE_NONE_SENTINEL:
                    return None
                return value

            logger.debug(
                'cache_decorator set cache:%s key:%s', func.__name__, key)
            value = func(*args, **kwargs)
            if value is None:
                cache.set(key, _CACHE_NONE_SENTINEL, expiration)
            else:
                cache.set(key, value, expiration)
            return value

        return news

    return wrapper


def expire_view_cache(path, servername, serverport, key_prefix=None):
    """Delete the cached response of a view, if there is one.

    :param path: URL path of the view.
    :param servername: value used for ``SERVER_NAME`` of the fake request.
    :param serverport: value used for ``SERVER_PORT`` of the fake request.
    :param key_prefix: optional prefix passed on to ``get_cache_key``.
    :return: ``True`` if a cached entry was found and deleted, else ``False``.
    """
    from django.http import HttpRequest
    from django.utils.cache import get_cache_key

    # Build a minimal request so Django can derive the view's cache key.
    request = HttpRequest()
    request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
    request.path = path

    key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
    if key:
        logger.info('expire_view_cache:get key:{path}'.format(path=path))
        if cache.get(key):
            cache.delete(key)
        return True
    return False


@cache_decorator()
def get_current_site():
    """Return the current ``Site`` object (cached by ``cache_decorator``)."""
    site = Site.objects.get_current()
    return site


class CommonMarkdown:
    """Markdown-to-HTML conversion helpers."""

    @staticmethod
    def _convert_markdown(value):
        """Convert Markdown text and return ``(html_body, toc_html)``."""
        md = markdown.Markdown(
            extensions=['extra', 'codehilite', 'toc', 'tables'])
        body = md.convert(value)
        toc = md.toc
        return body, toc

    @staticmethod
    def get_markdown_with_toc(value):
        """Return ``(html_body, toc_html)`` for the given Markdown text."""
        body, toc = CommonMarkdown._convert_markdown(value)
        return body, toc

    @staticmethod
    def get_markdown(value):
        """Return only the rendered HTML body for the given Markdown text."""
        body, _ = CommonMarkdown._convert_markdown(value)
        return body


def send_email(emailto, title, content):
    """Dispatch ``send_email_signal`` with the recipient, title and content."""
    from djangoblog.blog_signals import send_email_signal
    send_email_signal.send(
        send_email.__class__,
        emailto=emailto,
        title=title,
        content=content)


def generate_code() -> str:
    """Return a random six-digit verification code.

    Digits are drawn independently with ``secrets.choice``:
    ``random.sample`` could never repeat a digit (shrinking the code space
    from 1,000,000 to 151,200 values) and is not meant for security-sensitive
    values.
    """
    return ''.join(secrets.choice(string.digits) for _ in range(6))


def parse_dict_to_url(dict):
    """Serialise a mapping as a ``key=value&key=value`` query string.

    Keys and values are percent-quoted, with ``/`` left unescaped. Items that
    are neither ``str`` nor ``bytes`` (e.g. integers) are converted with
    ``str()`` first instead of raising ``TypeError``.

    NOTE: the parameter name shadows the ``dict`` builtin; it is kept so that
    callers passing it by keyword keep working.
    """
    from urllib.parse import quote

    def _quoted(item):
        text = item if isinstance(item, (str, bytes)) else str(item)
        return quote(text, safe='/')

    return '&'.join(
        '{}={}'.format(_quoted(k), _quoted(v)) for k, v in dict.items())


def get_blog_setting():
    """Return the blog settings object, using the cache when possible.

    On a cache miss the first ``BlogSettings`` row is loaded; if the table is
    empty a row with default values is created first. The result is stored
    under the cache key ``get_blog_setting``.
    """
    value = cache.get('get_blog_setting')
    if value:
        return value

    from blog.models import BlogSettings
    if not BlogSettings.objects.count():
        # No settings yet: create a row with default values.
        setting = BlogSettings()
        setting.site_name = 'djangoblog'
        setting.site_description = '基于Django的博客系统'
        setting.site_seo_description = '基于Django的博客系统'
        setting.site_keywords = 'Django,Python'
        setting.article_sub_length = 300
        setting.sidebar_article_count = 10
        setting.sidebar_comment_count = 5
        setting.show_google_adsense = False
        setting.open_site_comment = True
        setting.analytics_code = ''
        setting.beian_code = ''
        setting.show_gongan_code = False
        setting.comment_need_review = False
        setting.save()
    value = BlogSettings.objects.first()
    logger.info('set cache get_blog_setting')
    cache.set('get_blog_setting', value)
    return value


def save_user_avatar(url):
    """Download an avatar image and return the static URL it is served from.

    The image is written to ``<settings.STATICFILES>/avatar`` under a random
    file name. If the download fails, the response status is not 200, or any
    other error occurs, the URL of the default avatar is returned instead.

    :param url: remote URL of the avatar image.
    :return: static URL of the saved avatar, or of the default avatar.
    """
    logger.info(url)
    try:
        # NOTE(review): ``STATICFILES`` is not a standard Django setting;
        # presumably it is defined by this project -- confirm.
        basedir = os.path.join(settings.STATICFILES, 'avatar')
        rsp = requests.get(url, timeout=2)
        if rsp.status_code == 200:
            # ``exist_ok`` avoids the check-then-create race.
            os.makedirs(basedir, exist_ok=True)

            image_extensions = ('.jpg', '.png', '.jpeg', '.gif')
            # Keep the URL's own extension only when it is a known image
            # type; otherwise fall back to ``.jpg``.
            if url.endswith(image_extensions):
                ext = os.path.splitext(url)[1]
            else:
                ext = '.jpg'
            save_filename = uuid.uuid4().hex + ext
            save_path = os.path.join(basedir, save_filename)
            logger.info('保存用户头像:' + save_path)
            with open(save_path, 'wb') as file:
                file.write(rsp.content)
            return static('avatar/' + save_filename)
    except Exception as e:
        # Deliberately best-effort: any failure falls back to the default.
        logger.error(e)
    return static('blog/img/avatar.png')


def delete_sidebar_cache():
    """Delete the cached sidebar for every ``LinkShowType`` value."""
    from blog.models import LinkShowType
    for show_type in LinkShowType.values:
        k = "sidebar" + show_type
        logger.info('delete sidebar key:' + k)
        cache.delete(k)


def delete_view_cache(prefix, keys):
    """Delete a cached template fragment.

    :param prefix: fragment name passed to ``make_template_fragment_key``.
    :param keys: ``vary_on`` values passed to ``make_template_fragment_key``.
    """
    from django.core.cache.utils import make_template_fragment_key
    key = make_template_fragment_key(prefix, keys)
    cache.delete(key)


def get_resource_url():
    """Return the base URL for static resources.

    Uses ``settings.STATIC_URL`` when it is set; otherwise builds
    ``http://<current site domain>/static/``.
    """
    if settings.STATIC_URL:
        return settings.STATIC_URL
    site = get_current_site()
    return 'http://' + site.domain + '/static/'


# HTML tags and attributes allowed to pass through ``sanitize_html``.
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i',
                'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p']
ALLOWED_ATTRIBUTES = {
    'a': ['href', 'title'],
    'abbr': ['title'],
    'acronym': ['title'],
}


def sanitize_html(html):
    """Return ``html`` cleaned by ``bleach`` using the allow-lists above."""
    return bleach.clean(
        html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)