#!/usr/bin/env python
# encoding: utf-8
"""Shared helpers for the blog: caching, Markdown, e-mail, avatars, HTML cleanup."""
import logging
import os
import random
import string
import uuid
from functools import wraps
from hashlib import sha256
from urllib.parse import urlparse

import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static

# Module-level logger.
logger = logging.getLogger(__name__)

# Placeholder stored in the cache when a decorated function returned None,
# so that a cached None can be told apart from a cache miss.
_NONE_CACHE_SENTINEL = '__default_cache_value__'

# File extensions accepted as-is when saving a downloaded avatar.
_AVATAR_EXTENSIONS = frozenset(['.jpg', '.png', '.jpeg', '.gif'])

# OS-backed random source used for verification codes.
_secure_random = random.SystemRandom()


def get_max_articleid_commentid():
    """Return a ``(article_pk, comment_pk)`` tuple of the latest Article and Comment.

    "Latest" is whatever ``Model.objects.latest()`` resolves to for each model.
    """
    # Imported lazily, as in the rest of this module's model access.
    from blog.models import Article
    from comments.models import Comment
    return (Article.objects.latest().pk, Comment.objects.latest().pk)


def get_sha256(str):
    """Return the hexadecimal SHA-256 digest of the UTF-8 encoding of ``str``."""
    # The parameter name shadows the builtin but is kept for keyword callers.
    m = sha256(str.encode('utf-8'))
    return m.hexdigest()


def cache_decorator(expiration=3 * 60):
    """Build a decorator that caches a function's return value.

    The cache key is taken from ``args[0].get_cache_key()`` when that call
    succeeds and returns a truthy value; otherwise it is the SHA-256 of
    ``repr((func, args, kwargs))``.

    Args:
        expiration: Cache timeout in seconds (default: 3 minutes).

    Returns:
        A decorator; the decorated function returns the cached value when one
        exists and otherwise calls through and stores the result.
    """

    def wrapper(func):
        @wraps(func)
        def news(*args, **kwargs):
            try:
                # Views may supply their own cache key.
                view = args[0]
                key = view.get_cache_key()
            except Exception:
                # No positional argument, no get_cache_key(), or it failed:
                # fall back to the generated key below.
                key = None
            if not key:
                # NOTE(review): repr(func) may embed an object address, in
                # which case this key is not stable across processes - confirm
                # whether that matters for the configured cache backend.
                unique_str = repr((func, args, kwargs))
                key = sha256(unique_str.encode('utf-8')).hexdigest()

            value = cache.get(key)
            if value is not None:
                if str(value) == _NONE_CACHE_SENTINEL:
                    # The function itself returned None last time.
                    return None
                return value

            logger.debug(
                'cache_decorator set cache:%s key:%s', func.__name__, key)
            value = func(*args, **kwargs)
            if value is None:
                # Store a placeholder so the None result is cached too.
                cache.set(key, _NONE_CACHE_SENTINEL, expiration)
            else:
                cache.set(key, value, expiration)
            return value

        return news

    return wrapper


def expire_view_cache(path, servername, serverport, key_prefix=None):
    """Expire the cached response of a view.

    :param path: URL path of the view
    :param servername: host name placed in the fake request
    :param serverport: port placed in the fake request
    :param key_prefix: cache key prefix
    :return: True when a cache key was found for the request, else False
    """
    from django.http import HttpRequest
    from django.utils.cache import get_cache_key

    # Build a minimal fake request so Django can derive the cache key.
    request = HttpRequest()
    request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
    request.path = path

    key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
    if key:
        logger.info('expire_view_cache:get key:{path}'.format(path=path))
        if cache.get(key):
            cache.delete(key)
        return True
    return False


@cache_decorator()
def get_current_site():
    """Return the current ``Site`` (cached through ``cache_decorator``)."""
    site = Site.objects.get_current()
    return site


class CommonMarkdown:
    """Helpers for rendering Markdown text to HTML."""

    @staticmethod
    def _convert_markdown(value):
        """Render ``value`` and return a ``(html_body, toc)`` tuple."""
        md = markdown.Markdown(
            extensions=[
                'extra',
                'codehilite',
                'toc',
                'tables',
            ]
        )
        body = md.convert(value)
        # md.toc is read after convert(), as in the original code.
        toc = md.toc
        return body, toc

    @staticmethod
    def get_markdown_with_toc(value):
        """Render ``value`` and return ``(html_body, toc)``."""
        body, toc = CommonMarkdown._convert_markdown(value)
        return body, toc

    @staticmethod
    def get_markdown(value):
        """Render ``value`` and return only the HTML body."""
        body, toc = CommonMarkdown._convert_markdown(value)
        return body


def send_email(emailto, title, content):
    """Dispatch ``send_email_signal`` with the given recipient(s), title and content."""
    from djangoblog.blog_signals import send_email_signal
    send_email_signal.send(
        send_email.__class__,
        emailto=emailto,
        title=title,
        content=content)


def generate_code() -> str:
    """Return a 6-character verification code made of decimal digits.

    Digits are drawn independently from ``random.SystemRandom``, so a digit
    may appear more than once.
    """
    return ''.join(_secure_random.choices(string.digits, k=6))


def parse_dict_to_url(dict):
    """Join the items of ``dict`` into a ``k=v&k2=v2`` query string.

    Keys and values are percent-encoded with ``/`` left unescaped.
    """
    from urllib.parse import quote
    url = '&'.join(
        '{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
        for k, v in dict.items())
    return url


def get_blog_setting():
    """Return the blog settings object, using the cache when possible.

    On a cache miss the first ``BlogSettings`` row is loaded (a default row is
    created first when the table is empty) and stored under the
    ``'get_blog_setting'`` cache key.
    """
    value = cache.get('get_blog_setting')
    if value:
        return value

    from blog.models import BlogSettings
    if not BlogSettings.objects.exists():
        # No settings row yet: create one with default values.
        setting = BlogSettings()
        setting.site_name = 'djangoblog'
        setting.site_description = '基于Django的博客系统'
        setting.site_seo_description = '基于Django的博客系统'
        setting.site_keywords = 'Django,Python'
        setting.article_sub_length = 300
        setting.sidebar_article_count = 10
        setting.sidebar_comment_count = 5
        setting.show_google_adsense = False
        setting.open_site_comment = True
        setting.analytics_code = ''
        setting.beian_code = ''
        setting.show_gongan_code = False
        setting.comment_need_review = False
        setting.save()
    value = BlogSettings.objects.first()
    logger.info('set cache get_blog_setting')
    cache.set('get_blog_setting', value)
    return value


def save_user_avatar(url):
    """Download a user's avatar and store it under the static ``avatar`` directory.

    :param url: URL of the avatar image
    :return: static URL of the saved file, or the static URL of the default
        avatar when the download fails, the response status is not 200, or
        the file cannot be written
    """
    logger.info(url)

    try:
        basedir = os.path.join(settings.STATICFILES, 'avatar')
        rsp = requests.get(url, timeout=2)
        if rsp.status_code == 200:
            os.makedirs(basedir, exist_ok=True)

            # Take the extension from the URL path only, so a query string
            # does not hide it; anything unrecognised is saved as .jpg.
            ext = os.path.splitext(urlparse(url).path)[1].lower()
            if ext not in _AVATAR_EXTENSIONS:
                ext = '.jpg'

            save_filename = uuid.uuid4().hex + ext
            save_path = os.path.join(basedir, save_filename)
            logger.info('保存用户头像:' + save_path)
            with open(save_path, 'wb') as file:
                file.write(rsp.content)
            return static('avatar/' + save_filename)
    except Exception as e:
        # Best effort: any failure falls back to the default avatar below.
        logger.error(e)
    return static('blog/img/avatar.png')


def delete_sidebar_cache():
    """Delete the ``"sidebar" + value`` cache entry for every ``LinkShowType`` value."""
    from blog.models import LinkShowType
    keys = ["sidebar" + x for x in LinkShowType.values]
    for k in keys:
        logger.info('delete sidebar key:' + k)
        cache.delete(k)


def delete_view_cache(prefix, keys):
    """Delete the template-fragment cache entry identified by ``prefix`` and ``keys``."""
    from django.core.cache.utils import make_template_fragment_key
    key = make_template_fragment_key(prefix, keys)
    cache.delete(key)


def get_resource_url():
    """Return the base URL for static resources.

    Uses ``settings.STATIC_URL`` when it is set; otherwise builds
    ``http://<current site domain>/static/``.
    """
    if settings.STATIC_URL:
        return settings.STATIC_URL
    site = get_current_site()
    return 'http://' + site.domain + '/static/'


# Tags kept by sanitize_html.
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i',
                'li', 'ol', 'pre', 'strong', 'ul', 'h1', 'h2', 'p']
# Attributes kept by sanitize_html, per tag.
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'],
                      'acronym': ['title']}


def sanitize_html(html):
    """Clean ``html`` with bleach, allowing only ALLOWED_TAGS / ALLOWED_ATTRIBUTES."""
    return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)