Compare commits

..

114 Commits

Author SHA1 Message Date
盛钧涛 9be7adbd14 Merge branch 'sjt_branch'
2 months ago
盛钧涛 beb070948b add 1111
2 months ago
盛钧涛 322e77599c Merge branch 'develop'
2 months ago
盛钧涛 3808109f8a add 111
2 months ago
盛钧涛 9b51b07881 Merge branch 'master' of https://bdgit.educoder.net/puhanfmc3/tentest
2 months ago
盛钧涛 466a6864bc add new code
2 months ago
陌渝 8bc899d943 Merge branch 'develop'
2 months ago
陌渝 fb5f469f4a Merge branch 'master' of https://bdgit.educoder.net/puhanfmc3/tentest
2 months ago
陌渝 30400814ab 1
2 months ago
陌渝 2cb5514f11 Merge remote-tracking branch 'origin' into develop
2 months ago
陌渝 54ab827919 Merge branch 'wr_branch' into develop
2 months ago
陌渝 9c3bf6e105 1
2 months ago
陌渝 4abe1ee4a9 2
2 months ago
盛钧涛 1c77fe8e22 Merge branch 'develop'
2 months ago
盛钧涛 184d73b9e4 Merge branch 'mk_branch' into develop
2 months ago
陌渝 75628202e3 1
2 months ago
陌渝 5c389a179c 1
2 months ago
陌渝 c47c8ea166 1
2 months ago
陌渝 9928651766 2
2 months ago
陌渝 24c10c00ce Merge branch 'wr_branch' of https://bdgit.educoder.net/puhanfmc3/tentest into wr_branch
2 months ago
陌渝 6352e52bed 1
2 months ago
陌渝 f9bbb4c87e 1
2 months ago
陌渝 26d4c9752f 1
2 months ago
陌渝 87d4d4e590 1
2 months ago
陌渝 c8413e7577 6
2 months ago
盛钧涛 c51b8b0c56 add oauth
2 months ago
盛钧涛 cedd0a20cf add oauth
2 months ago
mk 72b8e67060 质量分析报告
2 months ago
mk 18e424dd97 质量分析报告
2 months ago
盛钧涛 d52dc2800c Merge branch 'mk_branch'
2 months ago
盛钧涛 e9ed2007f8 Merge branch 'mk_branch' into develop
2 months ago
mk 3ad5c5a754 6
2 months ago
mk dfe9b32351 6
2 months ago
盛钧涛 48708d60dd Merge branch 'frr_branch'
2 months ago
盛钧涛 d6d92db1f3 Merge branch 'frr_branch' into develop
2 months ago
盛钧涛 8641273b52 11
2 months ago
盛钧涛 d888017d59 Merge branch 'frr_branch'
2 months ago
盛钧涛 afe4e723e0 del
2 months ago
盛钧涛 4c2e221de9 Merge branch 'frr_branch' into develop
2 months ago
盛钧涛 ebd9538c68 del
2 months ago
盛钧涛 0cc903fbc4 del
2 months ago
盛钧涛 8380c07eab del others
2 months ago
frr 2da8518e4f 修改
2 months ago
frr 37d35baab4 首次提交
2 months ago
frr db19f9ab12 111
2 months ago
frr de046165a5 2
2 months ago
frr d4b3780968 1
2 months ago
pxksbc67f 763ee35ee5 Add frrweek8work3
2 months ago
盛钧涛 c36961b293 change accounts
2 months ago
盛钧涛 6143a4a432 Merge branch 'master' into develop
2 months ago
盛钧涛 f91eca2d4d add DjangoBlog
2 months ago
盛钧涛 51967e47d9 add DjangoBlog
2 months ago
盛钧涛 0d52b1cd2e Merge branch 'develop'
2 months ago
盛钧涛 12ee321c7f del DjangoBlog
2 months ago
盛钧涛 017f932fe9 del DjangoBlog
2 months ago
盛钧涛 506541d6f7 add DjangoBlog
2 months ago
mk c06d15ba90 Merge branch 'mk_branch' into develop
3 months ago
盛钧涛 074e9f5476 Merge branch 'sjt_branch' into develop
3 months ago
盛钧涛 dcbee62795 add change
3 months ago
盛钧涛 58fcb6b24a Merge branch 'sjt_branch' into develop
3 months ago
盛钧涛 9e1408389e add change
3 months ago
mk 746a6adc0b add
3 months ago
mk d208e833ff Merge remote-tracking branch 'origin/develop' into develop
3 months ago
盛钧涛 2c96334b4b add week6work
3 months ago
盛钧涛 260a92ff65 Merge branch 'sjt_branch' into develop
3 months ago
盛钧涛 7fd7fde93d del test
3 months ago
盛钧涛 f2cdcfba87 add week5work
3 months ago
盛钧涛 245a18b53f add test
3 months ago
陌渝 b59800b7a9 删除文件
3 months ago
陌渝 509ec8360d 添加文件
3 months ago
陌渝 32dd36a0e2 添加文件
3 months ago
陌渝 8e8d9ea64c 添加文件
3 months ago
陌渝 e899883087 添加文件
3 months ago
陌渝 5be757fbbf 添加文件
3 months ago
盛钧涛 59897191ac del 2test
3 months ago
盛钧涛 a07a4e068b Merge branch 'frr_branch' of https://bdgit.educoder.net/puhanfmc3/tentest into develop
3 months ago
pxksbc67f 9792d18d9a Add frrweek4work2
3 months ago
盛钧涛 ee8fcfefcb add doc and src
3 months ago
盛钧涛 3e0fc26d2c shanchu
3 months ago
盛钧涛 22a24281be Merge branch 'develop' of https://bdgit.educoder.net/puhanfmc3/tentest into develop
3 months ago
puhanfmc3 62e81357dd Add week4work3
3 months ago
盛钧涛 15da01e6e9 rm week4work
3 months ago
puhanfmc3 84f4c9e8f3 Add week4work
3 months ago
盛钧涛 77df3e9e9e add src
3 months ago
盛钧涛 ac177291fe add doc
3 months ago
盛钧涛 43fabaeb51 add doc
3 months ago
盛钧涛 580968353e add src
3 months ago
盛钧涛 65f769084f add doc
3 months ago
盛钧涛 a7541d3093 test2
3 months ago
盛钧涛 1a332d9c92 test
3 months ago
盛钧涛 ed17f8fd02 Merge branch 'sjt_branch' of https://bdgit.educoder.net/puhanfmc3/tentest into sjt_branch
3 months ago
盛钧涛 9bdaac90a5 add doc and src
3 months ago
puhanfmc3 afc75321c9 Add src
3 months ago
puhanfmc3 3b612a260f Delete 'src'
3 months ago
puhanfmc3 04a70ac42b Add src
3 months ago
puhanfmc3 feddbef978 Add doc
3 months ago
puhanfmc3 cf77dfa0e7 Add src
3 months ago
puhanfmc3 2bb0abdc49 Add doc
3 months ago
puhanfmc3 cca871a53b Delete 'src.md'
3 months ago
puhanfmc3 4762f52d39 Delete 'doc.md'
3 months ago
puhanfmc3 cc874b667f Delete '.idea/vcs.xml'
3 months ago
puhanfmc3 797f57fe57 Delete '.idea/modules.xml'
3 months ago
puhanfmc3 45f725295e Delete '.idea/misc.xml'
3 months ago
puhanfmc3 2b8a829300 Delete '.idea/PythonGITproject.iml'
3 months ago
puhanfmc3 c6fb2238ac Delete '.idea/inspectionProfiles/profiles_settings.xml'
3 months ago
puhanfmc3 d4f097745d Delete '.idea/.gitignore'
3 months ago
puhanfmc3 fd175c6b8f Delete 'test1.py'
3 months ago
盛钧涛 731d3625dd add test1
3 months ago
盛钧涛 d0115e3660 add test1
3 months ago
陌渝 735a5e5a66 Merge branch 'master' of https://bdgit.educoder.net/puhanfmc3/tentest
4 months ago
陌渝 8583e895d3 提交wr666
4 months ago
pxksbc67f c138ab8d57 Add src
4 months ago
pxksbc67f c3c378b76b Add doc
4 months ago
盛钧涛 51df660506 xiugaichenggtest
4 months ago

@ -1,3 +0,0 @@
# 默认忽略的文件
/shelf/
/workspace.xml

@ -1,5 +0,0 @@
from django.apps import AppConfig
class AccountsConfig(AppConfig):
name = 'accounts'

@ -1,26 +0,0 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
class EmailOrUsernameModelBackend(ModelBackend):
"""
允许使用用户名或邮箱登录
"""
def authenticate(self, request, username=None, password=None, **kwargs):
if '@' in username:
kwargs = {'email': username}
else:
kwargs = {'username': username}
try:
user = get_user_model().objects.get(**kwargs)
if user.check_password(password):
return user
except get_user_model().DoesNotExist:
return None
def get_user(self, username):
try:
return get_user_model().objects.get(pk=username)
except get_user_model().DoesNotExist:
return None

@ -1,49 +0,0 @@
from django.contrib import admin
from django.urls import reverse
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
def disable_commentstatus(modeladmin, request, queryset):
queryset.update(is_enable=False)
def enable_commentstatus(modeladmin, request, queryset):
queryset.update(is_enable=True)
disable_commentstatus.short_description = _('Disable comments')
enable_commentstatus.short_description = _('Enable comments')
class CommentAdmin(admin.ModelAdmin):
list_per_page = 20
list_display = (
'id',
'body',
'link_to_userinfo',
'link_to_article',
'is_enable',
'creation_time')
list_display_links = ('id', 'body', 'is_enable')
list_filter = ('is_enable',)
exclude = ('creation_time', 'last_modify_time')
actions = [disable_commentstatus, enable_commentstatus]
raw_id_fields = ('author', 'article')
search_fields = ('body',)
def link_to_userinfo(self, obj):
info = (obj.author._meta.app_label, obj.author._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
def link_to_article(self, obj):
info = (obj.article._meta.app_label, obj.article._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.article.id,))
return format_html(
u'<a href="%s">%s</a>' % (link, obj.article.title))
link_to_userinfo.short_description = _('User')
link_to_article.short_description = _('Article')

@ -1,5 +0,0 @@
from django.apps import AppConfig
class CommentsConfig(AppConfig):
name = 'comments'

@ -1,13 +0,0 @@
from django import forms
from django.forms import ModelForm
from .models import Comment
class CommentForm(ModelForm):
parent_comment_id = forms.IntegerField(
widget=forms.HiddenInput, required=False)
class Meta:
model = Comment
fields = ['body']

@ -1,39 +0,0 @@
from django.conf import settings
from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from blog.models import Article
# Create your models here.
class Comment(models.Model):
body = models.TextField('正文', max_length=300)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
on_delete=models.CASCADE)
article = models.ForeignKey(
Article,
verbose_name=_('article'),
on_delete=models.CASCADE)
parent_comment = models.ForeignKey(
'self',
verbose_name=_('parent comment'),
blank=True,
null=True,
on_delete=models.CASCADE)
is_enable = models.BooleanField(_('enable'),
default=False, blank=False, null=False)
class Meta:
ordering = ['-id']
verbose_name = _('comment')
verbose_name_plural = verbose_name
get_latest_by = 'id'
def __str__(self):
return self.body

@ -1,109 +0,0 @@
from django.test import Client, RequestFactory, TransactionTestCase
from django.urls import reverse
from accounts.models import BlogUser
from blog.models import Category, Article
from comments.models import Comment
from comments.templatetags.comments_tags import *
from djangoblog.utils import get_max_articleid_commentid
# Create your tests here.
class CommentsTest(TransactionTestCase):
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
from blog.models import BlogSettings
value = BlogSettings()
value.comment_need_review = True
value.save()
self.user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="liangliangyy1")
def update_article_comment_status(self, article):
comments = article.comment_set.all()
for comment in comments:
comment.is_enable = True
comment.save()
def test_validate_comment(self):
self.client.login(username='liangliangyy1', password='liangliangyy1')
category = Category()
category.name = "categoryccc"
category.save()
article = Article()
article.title = "nicetitleccc"
article.body = "nicecontentccc"
article.author = self.user
article.category = category
article.type = 'a'
article.status = 'p'
article.save()
comment_url = reverse(
'comments:postcomment', kwargs={
'article_id': article.id})
response = self.client.post(comment_url,
{
'body': '123ffffffffff'
})
self.assertEqual(response.status_code, 302)
article = Article.objects.get(pk=article.pk)
self.assertEqual(len(article.comment_list()), 0)
self.update_article_comment_status(article)
self.assertEqual(len(article.comment_list()), 1)
response = self.client.post(comment_url,
{
'body': '123ffffffffff',
})
self.assertEqual(response.status_code, 302)
article = Article.objects.get(pk=article.pk)
self.update_article_comment_status(article)
self.assertEqual(len(article.comment_list()), 2)
parent_comment_id = article.comment_list()[0].id
response = self.client.post(comment_url,
{
'body': '''
# Title1
```python
import os
```
[url](https://www.lylinux.net/)
[ddd](http://www.baidu.com)
''',
'parent_comment_id': parent_comment_id
})
self.assertEqual(response.status_code, 302)
self.update_article_comment_status(article)
article = Article.objects.get(pk=article.pk)
self.assertEqual(len(article.comment_list()), 3)
comment = Comment.objects.get(id=parent_comment_id)
tree = parse_commenttree(article.comment_list(), comment)
self.assertEqual(len(tree), 1)
data = show_comment_item(comment, True)
self.assertIsNotNone(data)
s = get_max_articleid_commentid()
self.assertIsNotNone(s)
from comments.utils import send_comment_email
send_comment_email(comment)

@ -1,11 +0,0 @@
from django.urls import path
from . import views
app_name = "comments"
urlpatterns = [
path(
'article/<int:article_id>/postcomment',
views.CommentPostView.as_view(),
name='postcomment'),
]

@ -1,38 +0,0 @@
import logging
from django.utils.translation import gettext_lazy as _
from djangoblog.utils import get_current_site
from djangoblog.utils import send_email
logger = logging.getLogger(__name__)
def send_comment_email(comment):
site = get_current_site().domain
subject = _('Thanks for your comment')
article_url = f"https://{site}{comment.article.get_absolute_url()}"
html_content = _("""<p>Thank you very much for your comments on this site</p>
You can visit <a href="%(article_url)s" rel="bookmark">%(article_title)s</a>
to review your comments,
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
%(article_url)s""") % {'article_url': article_url, 'article_title': comment.article.title}
tomail = comment.author.email
send_email([tomail], subject, html_content)
try:
if comment.parent_comment:
html_content = _("""Your comment on <a href="%(article_url)s" rel="bookmark">%(article_title)s</a><br/> has
received a reply. <br/> %(comment_body)s
<br/>
go check it out!
<br/>
If the link above cannot be opened, please copy this link to your browser.
%(article_url)s
""") % {'article_url': article_url, 'article_title': comment.article.title,
'comment_body': comment.parent_comment.body}
tomail = comment.parent_comment.author.email
send_email([tomail], subject, html_content)
except Exception as e:
logger.error(e)

@ -1,63 +0,0 @@
# Create your views here.
from django.core.exceptions import ValidationError
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.utils.decorators import method_decorator
from django.views.decorators.csrf import csrf_protect
from django.views.generic.edit import FormView
from accounts.models import BlogUser
from blog.models import Article
from .forms import CommentForm
from .models import Comment
class CommentPostView(FormView):
form_class = CommentForm
template_name = 'blog/article_detail.html'
@method_decorator(csrf_protect)
def dispatch(self, *args, **kwargs):
return super(CommentPostView, self).dispatch(*args, **kwargs)
def get(self, request, *args, **kwargs):
article_id = self.kwargs['article_id']
article = get_object_or_404(Article, pk=article_id)
url = article.get_absolute_url()
return HttpResponseRedirect(url + "#comments")
def form_invalid(self, form):
article_id = self.kwargs['article_id']
article = get_object_or_404(Article, pk=article_id)
return self.render_to_response({
'form': form,
'article': article
})
def form_valid(self, form):
"""提交的数据验证合法后的逻辑"""
user = self.request.user
author = BlogUser.objects.get(pk=user.pk)
article_id = self.kwargs['article_id']
article = get_object_or_404(Article, pk=article_id)
if article.comment_status == 'c' or article.status == 'c':
raise ValidationError("该文章评论已关闭.")
comment = form.save(False)
comment.article = article
from djangoblog.utils import get_blog_setting
settings = get_blog_setting()
if not settings.comment_need_review:
comment.is_enable = True
comment.author = author
if form.cleaned_data['parent_comment_id']:
parent_comment = Comment.objects.get(
pk=form.cleaned_data['parent_comment_id'])
comment.parent_comment = parent_comment
comment.save(True)
return HttpResponseRedirect(
"%s#div-comment-%d" %
(article.get_absolute_url(), comment.pk))

@ -1 +0,0 @@
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'

@ -1,64 +0,0 @@
from django.contrib.admin import AdminSite
from django.contrib.admin.models import LogEntry
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
from accounts.admin import *
from blog.admin import *
from blog.models import *
from comments.admin import *
from comments.models import *
from djangoblog.logentryadmin import LogEntryAdmin
from oauth.admin import *
from oauth.models import *
from owntracks.admin import *
from owntracks.models import *
from servermanager.admin import *
from servermanager.models import *
class DjangoBlogAdminSite(AdminSite):
site_header = 'djangoblog administration'
site_title = 'djangoblog site admin'
def __init__(self, name='admin'):
super().__init__(name)
def has_permission(self, request):
return request.user.is_superuser
# def get_urls(self):
# urls = super().get_urls()
# from django.urls import path
# from blog.views import refresh_memcache
#
# my_urls = [
# path('refresh/', self.admin_view(refresh_memcache), name="refresh"),
# ]
# return urls + my_urls
admin_site = DjangoBlogAdminSite(name='admin')
admin_site.register(Article, ArticlelAdmin)
admin_site.register(Category, CategoryAdmin)
admin_site.register(Tag, TagAdmin)
admin_site.register(Links, LinksAdmin)
admin_site.register(SideBar, SideBarAdmin)
admin_site.register(BlogSettings, BlogSettingsAdmin)
admin_site.register(commands, CommandsAdmin)
admin_site.register(EmailSendLog, EmailSendLogAdmin)
admin_site.register(BlogUser, BlogUserAdmin)
admin_site.register(Comment, CommentAdmin)
admin_site.register(OAuthUser, OAuthUserAdmin)
admin_site.register(OAuthConfig, OAuthConfigAdmin)
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
admin_site.register(Site, SiteAdmin)
admin_site.register(LogEntry, LogEntryAdmin)

@ -1,11 +0,0 @@
from django.apps import AppConfig
class DjangoblogAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'djangoblog'
def ready(self):
super().ready()
# Import and load plugins here
from .plugin_manage.loader import load_plugins
load_plugins()

@ -1,122 +0,0 @@
import _thread
import logging
import django.dispatch
from django.conf import settings
from django.contrib.admin.models import LogEntry
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.core.mail import EmailMultiAlternatives
from django.db.models.signals import post_save
from django.dispatch import receiver
from comments.models import Comment
from comments.utils import send_comment_email
from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, delete_view_cache
from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
logger = logging.getLogger(__name__)
oauth_user_login_signal = django.dispatch.Signal(['id'])
send_email_signal = django.dispatch.Signal(
['emailto', 'title', 'content'])
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
emailto = kwargs['emailto']
title = kwargs['title']
content = kwargs['content']
msg = EmailMultiAlternatives(
title,
content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=emailto)
msg.content_subtype = "html"
from servermanager.models import EmailSendLog
log = EmailSendLog()
log.title = title
log.content = content
log.emailto = ','.join(emailto)
try:
result = msg.send()
log.send_result = result > 0
except Exception as e:
logger.error(f"失败邮箱号: {emailto}, {e}")
log.send_result = False
log.save()
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
id = kwargs['id']
oauthuser = OAuthUser.objects.get(id=id)
site = get_current_site().domain
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar
oauthuser.picture = save_user_avatar(oauthuser.picture)
oauthuser.save()
delete_sidebar_cache()
@receiver(post_save)
def model_post_save_callback(
sender,
instance,
created,
raw,
using,
update_fields,
**kwargs):
clearcache = False
if isinstance(instance, LogEntry):
return
if 'get_full_url' in dir(instance):
is_update_views = update_fields == {'views'}
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url()
SpiderNotify.baidu_notify([notify_url])
except Exception as ex:
logger.error("notify sipder", ex)
if not is_update_views:
clearcache = True
if isinstance(instance, Comment):
if instance.is_enable:
path = instance.article.get_absolute_url()
site = get_current_site().domain
if site.find(':') > 0:
site = site[0:site.find(':')]
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail')
if cache.get('seo_processor'):
cache.delete('seo_processor')
comment_cache_key = 'article_comments_{id}'.format(
id=instance.article.id)
cache.delete(comment_cache_key)
delete_sidebar_cache()
delete_view_cache('article_comments', [str(instance.article.pk)])
_thread.start_new_thread(send_comment_email, (instance,))
if clearcache:
cache.clear()
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
if user and user.username:
logger.info(user)
delete_sidebar_cache()
# cache.clear()

@ -1,183 +0,0 @@
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
def __init__(self, connection_alias, **connection_options):
super(
ElasticSearchBackend,
self).__init__(
connection_alias,
**connection_options)
self.manager = ArticleDocumentManager()
self.include_spelling = True
def _get_models(self, iterable):
models = iterable if iterable and iterable[0] else Article.objects.all()
docs = self.manager.convert_to_doc(models)
return docs
def _create(self, models):
self.manager.create_index()
docs = self._get_models(models)
self.manager.rebuild(docs)
def _delete(self, models):
for m in models:
m.delete()
return True
def _rebuild(self, models):
models = models if models else Article.objects.all()
docs = self.manager.convert_to_doc(models)
self.manager.update_docs(docs)
def update(self, index, iterable, commit=True):
models = self._get_models(iterable)
self.manager.update_docs(models)
def remove(self, obj_or_string):
models = self._get_models([obj_or_string])
self._delete(models)
def clear(self, models=None, commit=True):
self.remove(None)
@staticmethod
def get_suggestion(query: str) -> str:
"""获取推荐词, 如果没有找到添加原搜索词"""
search = ArticleDocument.search() \
.query("match", body=query) \
.suggest('suggest_search', query, term={'field': 'body'}) \
.execute()
keywords = []
for suggest in search.suggest.suggest_search:
if suggest["options"]:
keywords.append(suggest["options"][0]["text"])
else:
keywords.append(suggest["text"])
return ' '.join(keywords)
@log_query
def search(self, query_string, **kwargs):
logger.info('search query_string:' + query_string)
start_offset = kwargs.get('start_offset')
end_offset = kwargs.get('end_offset')
# 推荐词搜索
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string)
else:
suggestion = query_string
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
search = ArticleDocument.search() \
.query('bool', filter=[q]) \
.filter('term', status='p') \
.filter('term', type='a') \
.source(False)[start_offset: end_offset]
results = search.execute()
hits = results['hits'].total
raw_results = []
for raw_result in results['hits']['hits']:
app_label = 'blog'
model_name = 'Article'
additional_fields = {}
result_class = SearchResult
result = result_class(
app_label,
model_name,
raw_result['_id'],
raw_result['_score'],
**additional_fields)
raw_results.append(result)
facets = {}
spelling_suggestion = None if query_string == suggestion else suggestion
return {
'results': raw_results,
'hits': hits,
'facets': facets,
'spelling_suggestion': spelling_suggestion,
}
class ElasticSearchQuery(BaseSearchQuery):
def _convert_datetime(self, date):
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
else:
return force_str(date.strftime('%Y%m%d000000'))
def clean(self, query_fragment):
"""
Provides a mechanism for sanitizing user input before presenting the
value to the backend.
Whoosh 1.X differs here in that you can no longer use a backslash
to escape reserved characters. Instead, the whole word should be
quoted.
"""
words = query_fragment.split()
cleaned_words = []
for word in words:
if word in self.backend.RESERVED_WORDS:
word = word.replace(word, word.lower())
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word
break
cleaned_words.append(word)
return ' '.join(cleaned_words)
def build_query_fragment(self, field, filter_type, value):
return value.query_string
def get_count(self):
results = self.get_results()
return len(results) if results else 0
def get_spelling_suggestion(self, preferred_query=None):
return self._spelling_suggestion
def build_params(self, spelling_query=None):
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
class ElasticSearchModelSearchForm(ModelSearchForm):
def search(self):
# 是否建议搜索
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
sqs = super().search()
return sqs
class ElasticSearchEngine(BaseEngine):
backend = ElasticSearchBackend
query = ElasticSearchQuery

@ -1,40 +0,0 @@
from django.contrib.auth import get_user_model
from django.contrib.syndication.views import Feed
from django.utils import timezone
from django.utils.feedgenerator import Rss201rev2Feed
from blog.models import Article
from djangoblog.utils import CommonMarkdown
class DjangoBlogFeed(Feed):
feed_type = Rss201rev2Feed
description = '大巧无工,重剑无锋.'
title = "且听风吟 大巧无工,重剑无锋. "
link = "/feed/"
def author_name(self):
return get_user_model().objects.first().nickname
def author_link(self):
return get_user_model().objects.first().get_absolute_url()
def items(self):
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]
def item_title(self, item):
return item.title
def item_description(self, item):
return CommonMarkdown.get_markdown(item.body)
def feed_copyright(self):
now = timezone.now()
return "Copyright© {year} 且听风吟".format(year=now.year)
def item_link(self, item):
return item.get_absolute_url()
def item_guid(self, item):
return

@ -1,91 +0,0 @@
from django.contrib import admin
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse, NoReverseMatch
from django.utils.encoding import force_str
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
class LogEntryAdmin(admin.ModelAdmin):
list_filter = [
'content_type'
]
search_fields = [
'object_repr',
'change_message'
]
list_display_links = [
'action_time',
'get_change_message',
]
list_display = [
'action_time',
'user_link',
'content_type',
'object_link',
'get_change_message',
]
def has_add_permission(self, request):
return False
def has_change_permission(self, request, obj=None):
return (
request.user.is_superuser or
request.user.has_perm('admin.change_logentry')
) and request.method != 'POST'
def has_delete_permission(self, request, obj=None):
return False
def object_link(self, obj):
object_link = escape(obj.object_repr)
content_type = obj.content_type
if obj.action_flag != DELETION and content_type is not None:
# try returning an actual link instead of object repr string
try:
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.object_id]
)
object_link = '<a href="{}">{}</a>'.format(url, object_link)
except NoReverseMatch:
pass
return mark_safe(object_link)
object_link.admin_order_field = 'object_repr'
object_link.short_description = _('object')
def user_link(self, obj):
content_type = ContentType.objects.get_for_model(type(obj.user))
user_link = escape(force_str(obj.user))
try:
# try returning an actual link instead of object repr string
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.user.pk]
)
user_link = '<a href="{}">{}</a>'.format(url, user_link)
except NoReverseMatch:
pass
return mark_safe(user_link)
user_link.admin_order_field = 'user'
user_link.short_description = _('user')
def get_queryset(self, request):
queryset = super(LogEntryAdmin, self).get_queryset(request)
return queryset.prefetch_related('content_type')
def get_actions(self, request):
actions = super(LogEntryAdmin, self).get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions

@ -1,194 +0,0 @@
import logging
from pathlib import Path
from django.template import TemplateDoesNotExist
from django.template.loader import render_to_string
logger = logging.getLogger(__name__)
class BasePlugin:
# 插件元数据
PLUGIN_NAME = None
PLUGIN_DESCRIPTION = None
PLUGIN_VERSION = None
PLUGIN_AUTHOR = None
# 插件配置
SUPPORTED_POSITIONS = [] # 支持的显示位置
DEFAULT_PRIORITY = 100 # 默认优先级(数字越小优先级越高)
POSITION_PRIORITIES = {} # 各位置的优先级 {'sidebar': 50, 'article_bottom': 80}
def __init__(self):
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
# 设置插件路径
self.plugin_dir = self._get_plugin_directory()
self.plugin_slug = self._get_plugin_slug()
self.init_plugin()
self.register_hooks()
def _get_plugin_directory(self):
"""获取插件目录路径"""
import inspect
plugin_file = inspect.getfile(self.__class__)
return Path(plugin_file).parent
def _get_plugin_slug(self):
"""获取插件标识符(目录名)"""
return self.plugin_dir.name
def init_plugin(self):
"""
插件初始化逻辑
子类可以重写此方法来实现特定的初始化操作
"""
logger.info(f'{self.PLUGIN_NAME} initialized.')
def register_hooks(self):
"""
注册插件钩子
子类可以重写此方法来注册特定的钩子
"""
pass
# === 位置渲染系统 ===
def render_position_widget(self, position, context, **kwargs):
"""
根据位置渲染插件组件
Args:
position: 位置标识
context: 模板上下文
**kwargs: 额外参数
Returns:
dict: {'html': 'HTML内容', 'priority': 优先级} None
"""
if position not in self.SUPPORTED_POSITIONS:
return None
# 检查条件显示
if not self.should_display(position, context, **kwargs):
return None
# 调用具体的位置渲染方法
method_name = f'render_{position}_widget'
if hasattr(self, method_name):
html = getattr(self, method_name)(context, **kwargs)
if html:
priority = self.POSITION_PRIORITIES.get(position, self.DEFAULT_PRIORITY)
return {
'html': html,
'priority': priority,
'plugin_name': self.PLUGIN_NAME
}
return None
def should_display(self, position, context, **kwargs):
"""
判断插件是否应该在指定位置显示
子类可重写此方法实现条件显示逻辑
Args:
position: 位置标识
context: 模板上下文
**kwargs: 额外参数
Returns:
bool: 是否显示
"""
return True
# === 各位置渲染方法 - 子类重写 ===
def render_sidebar_widget(self, context, **kwargs):
"""渲染侧边栏组件"""
return None
def render_article_bottom_widget(self, context, **kwargs):
"""渲染文章底部组件"""
return None
def render_article_top_widget(self, context, **kwargs):
"""渲染文章顶部组件"""
return None
def render_header_widget(self, context, **kwargs):
"""渲染页头组件"""
return None
def render_footer_widget(self, context, **kwargs):
"""渲染页脚组件"""
return None
def render_comment_before_widget(self, context, **kwargs):
"""渲染评论前组件"""
return None
def render_comment_after_widget(self, context, **kwargs):
"""渲染评论后组件"""
return None
# === 模板系统 ===
def render_template(self, template_name, context=None):
"""
渲染插件模板
Args:
template_name: 模板文件名
context: 模板上下文
Returns:
HTML字符串
"""
if context is None:
context = {}
template_path = f"plugins/{self.plugin_slug}/{template_name}"
try:
return render_to_string(template_path, context)
except TemplateDoesNotExist:
logger.warning(f"Plugin template not found: {template_path}")
return ""
# === 静态资源系统 ===
def get_static_url(self, static_file):
"""获取插件静态文件URL"""
from django.templatetags.static import static
return static(f"{self.plugin_slug}/static/{self.plugin_slug}/{static_file}")
def get_css_files(self):
"""获取插件CSS文件列表"""
return []
def get_js_files(self):
"""获取插件JavaScript文件列表"""
return []
def get_head_html(self, context=None):
"""获取需要插入到<head>中的HTML内容"""
return ""
def get_body_html(self, context=None):
"""获取需要插入到<body>底部的HTML内容"""
return ""
def get_plugin_info(self):
"""
获取插件信息
:return: 包含插件元数据的字典
"""
return {
'name': self.PLUGIN_NAME,
'description': self.PLUGIN_DESCRIPTION,
'version': self.PLUGIN_VERSION,
'author': self.PLUGIN_AUTHOR,
'slug': self.plugin_slug,
'directory': str(self.plugin_dir),
'supported_positions': self.SUPPORTED_POSITIONS,
'priorities': self.POSITION_PRIORITIES
}

@ -1,22 +0,0 @@
ARTICLE_DETAIL_LOAD = 'article_detail_load'
ARTICLE_CREATE = 'article_create'
ARTICLE_UPDATE = 'article_update'
ARTICLE_DELETE = 'article_delete'
ARTICLE_CONTENT_HOOK_NAME = "the_content"
# 位置钩子常量
POSITION_HOOKS = {
'article_top': 'article_top_widgets',
'article_bottom': 'article_bottom_widgets',
'sidebar': 'sidebar_widgets',
'header': 'header_widgets',
'footer': 'footer_widgets',
'comment_before': 'comment_before_widgets',
'comment_after': 'comment_after_widgets',
}
# 资源注入钩子
HEAD_RESOURCES_HOOK = 'head_resources'
BODY_RESOURCES_HOOK = 'body_resources'

@ -1,44 +0,0 @@
import logging
logger = logging.getLogger(__name__)
_hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
_hooks[hook_name].append(callback)
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
callback(*args, **kwargs)
except Exception as e:
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
"""
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
value = callback(value, *args, **kwargs)
except Exception as e:
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value

@ -1,64 +0,0 @@
import os
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
# 全局插件注册表
_loaded_plugins = []
def load_plugins():
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
"""
global _loaded_plugins
_loaded_plugins = []
for plugin_name in settings.ACTIVE_PLUGINS:
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
# 导入插件模块
plugin_module = __import__(f'plugins.{plugin_name}.plugin', fromlist=['plugin'])
# 获取插件实例
if hasattr(plugin_module, 'plugin'):
plugin_instance = plugin_module.plugin
_loaded_plugins.append(plugin_instance)
logger.info(f"Successfully loaded plugin: {plugin_name} - {plugin_instance.PLUGIN_NAME}")
else:
logger.warning(f"Plugin {plugin_name} does not have 'plugin' instance")
except ImportError as e:
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
except AttributeError as e:
logger.error(f"Failed to get plugin instance: {plugin_name}", exc_info=e)
except Exception as e:
logger.error(f"Unexpected error loading plugin: {plugin_name}", exc_info=e)
def get_loaded_plugins():
"""获取所有已加载的插件"""
return _loaded_plugins
def get_plugin_by_name(plugin_name):
"""根据名称获取插件"""
for plugin in _loaded_plugins:
if plugin.plugin_slug == plugin_name:
return plugin
return None
def get_plugin_by_slug(plugin_slug):
"""根据slug获取插件"""
for plugin in _loaded_plugins:
if plugin.plugin_slug == plugin_slug:
return plugin
return None
def get_plugins_info():
"""获取所有插件的信息"""
return [plugin.get_plugin_info() for plugin in _loaded_plugins]
def get_plugins_by_position(position):
"""获取支持指定位置的插件"""
return [plugin for plugin in _loaded_plugins if position in plugin.SUPPORTED_POSITIONS]

@ -1,386 +0,0 @@
"""
Django settings for djangoblog project.
Generated by 'django-admin startproject' using Django 1.10.2.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.10/ref/settings/
"""
import os
import sys
from pathlib import Path
from django.utils.translation import gettext_lazy as _
def env_to_bool(env, default):
str_val = os.environ.get(env)
return default if str_val is None else str_val == 'True'
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.environ.get(
'DJANGO_SECRET_KEY') or 'n9ceqv38)#&mwuat@(mjb_p%em$e8$qyr#fw9ot!=ba6lijx-6'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = env_to_bool('DJANGO_DEBUG', True)
# DEBUG = False
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'
# ALLOWED_HOSTS = []
ALLOWED_HOSTS = ['*', '127.0.0.1', 'example.com']
# django 4.0新增配置
CSRF_TRUSTED_ORIGINS = ['http://example.com']
# Application definition
INSTALLED_APPS = [
# 'django.contrib.admin',
'django.contrib.admin.apps.SimpleAdminConfig',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.sites',
'django.contrib.sitemaps',
'mdeditor',
'haystack',
'blog',
'accounts',
'comments',
'oauth',
'servermanager',
'owntracks',
'compressor',
'djangoblog'
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.locale.LocaleMiddleware',
'django.middleware.gzip.GZipMiddleware',
# 'django.middleware.cache.UpdateCacheMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.cache.FetchFromCacheMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.http.ConditionalGetMiddleware',
'blog.middleware.OnlineMiddleware'
]
ROOT_URLCONF = 'djangoblog.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates')],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
'blog.context_processors.seo_processor'
],
},
},
]
WSGI_APPLICATION = 'djangoblog.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog',
'USER': os.environ.get('DJANGO_MYSQL_USER') or 'root',
'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'root',
'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1',
'PORT': int(
os.environ.get('DJANGO_MYSQL_PORT') or 3306),
'OPTIONS': {
'charset': 'utf8mb4'},
}}
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
LANGUAGES = (
('en', _('English')),
('zh-hans', _('Simplified Chinese')),
('zh-hant', _('Traditional Chinese')),
)
LOCALE_PATHS = (
os.path.join(BASE_DIR, 'locale'),
)
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = False
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.whoosh_cn_backend.WhooshEngine',
'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
},
}
# Automatically update searching index
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
# Allow user login with username and password
AUTHENTICATION_BACKENDS = [
'accounts.user_login_backend.EmailOrUsernameModelBackend']
STATIC_ROOT = os.path.join(BASE_DIR, 'collectedstatic')
STATIC_URL = '/static/'
STATICFILES = os.path.join(BASE_DIR, 'static')
# 添加插件静态文件目录
STATICFILES_DIRS = [
os.path.join(BASE_DIR, 'plugins'), # 让Django能找到插件的静态文件
]
AUTH_USER_MODEL = 'accounts.BlogUser'
LOGIN_URL = '/login/'
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_TIME_FORMAT = '%Y-%m-%d'
# bootstrap color styles
BOOTSTRAP_COLOR_TYPES = [
'default', 'primary', 'success', 'info', 'warning', 'danger'
]
# paginate
PAGINATE_BY = 10
# http cache timeout
CACHE_CONTROL_MAX_AGE = 2592000
# cache setting
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'TIMEOUT': 10800,
'LOCATION': 'unique-snowflake',
}
}
# 使用redis作为缓存
if os.environ.get("DJANGO_REDIS_URL"):
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': f'redis://{os.environ.get("DJANGO_REDIS_URL")}',
}
}
SITE_ID = 1
BAIDU_NOTIFY_URL = os.environ.get('DJANGO_BAIDU_NOTIFY_URL') \
or 'http://data.zz.baidu.com/urls?site=https://www.lylinux.net&token=1uAOGrMsUm5syDGn'
# Email:
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_TLS = env_to_bool('DJANGO_EMAIL_TLS', False)
EMAIL_USE_SSL = env_to_bool('DJANGO_EMAIL_SSL', True)
EMAIL_HOST = os.environ.get('DJANGO_EMAIL_HOST') or 'smtp.mxhichina.com'
EMAIL_PORT = int(os.environ.get('DJANGO_EMAIL_PORT') or 465)
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
# Setting debug=false did NOT handle except email notifications
ADMINS = [('admin', os.environ.get('DJANGO_ADMIN_EMAIL') or 'admin@admin.com')]
# WX ADMIN password(Two times md5)
WXADMIN = os.environ.get(
'DJANGO_WXADMIN_PASSWORD') or '995F03AC401D6CABABAEF756FC4D43C7'
LOG_PATH = os.path.join(BASE_DIR, 'logs')
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH, exist_ok=True)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'root': {
'level': 'INFO',
'handlers': ['console', 'log_file'],
},
'formatters': {
'verbose': {
'format': '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d %(module)s] %(message)s',
}
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse',
},
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
'handlers': {
'log_file': {
'level': 'INFO',
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': os.path.join(LOG_PATH, 'djangoblog.log'),
'when': 'D',
'formatter': 'verbose',
'interval': 1,
'delay': True,
'backupCount': 5,
'encoding': 'utf-8'
},
'console': {
'level': 'DEBUG',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'null': {
'class': 'logging.NullHandler',
},
'mail_admins': {
'level': 'ERROR',
'filters': ['require_debug_false'],
'class': 'django.utils.log.AdminEmailHandler'
}
},
'loggers': {
'djangoblog': {
'handlers': ['log_file', 'console'],
'level': 'INFO',
'propagate': True,
}
}
}
STATICFILES_FINDERS = (
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
# other
'compressor.finders.CompressorFinder',
)
COMPRESS_ENABLED = True
# 根据环境变量决定是否启用离线压缩
COMPRESS_OFFLINE = os.environ.get('COMPRESS_OFFLINE', 'False').lower() == 'true'
# 压缩输出目录
COMPRESS_OUTPUT_DIR = 'compressed'
# 压缩文件名模板 - 包含哈希值用于缓存破坏
COMPRESS_CSS_HASHING_METHOD = 'mtime'
COMPRESS_JS_HASHING_METHOD = 'mtime'
# 高级CSS压缩过滤器
COMPRESS_CSS_FILTERS = [
# 创建绝对URL
'compressor.filters.css_default.CssAbsoluteFilter',
# CSS压缩器 - 高压缩等级
'compressor.filters.cssmin.CSSCompressorFilter',
]
# 高级JS压缩过滤器
COMPRESS_JS_FILTERS = [
# JS压缩器 - 高压缩等级
'compressor.filters.jsmin.SlimItFilter',
]
# 压缩缓存配置
COMPRESS_CACHE_BACKEND = 'default'
COMPRESS_CACHE_KEY_FUNCTION = 'compressor.cache.simple_cachekey'
# 预压缩配置
COMPRESS_PRECOMPILERS = (
# 支持SCSS/SASS
('text/x-scss', 'django_libsass.SassCompiler'),
('text/x-sass', 'django_libsass.SassCompiler'),
)
# 压缩性能优化
COMPRESS_MINT_DELAY = 30 # 压缩延迟(秒)
COMPRESS_MTIME_DELAY = 10 # 修改时间检查延迟
COMPRESS_REBUILD_TIMEOUT = 2592000 # 重建超时30天
# 压缩等级配置
COMPRESS_CSS_COMPRESSOR = 'compressor.css.CssCompressor'
COMPRESS_JS_COMPRESSOR = 'compressor.js.JsCompressor'
# 静态文件缓存配置
STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage'
# 浏览器缓存配置(通过中间件或服务器配置)
COMPRESS_URL = STATIC_URL
COMPRESS_ROOT = STATIC_ROOT
MEDIA_ROOT = os.path.join(BASE_DIR, 'uploads')
MEDIA_URL = '/media/'
X_FRAME_OPTIONS = 'SAMEORIGIN'
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
if os.environ.get('DJANGO_ELASTICSEARCH_HOST'):
ELASTICSEARCH_DSL = {
'default': {
'hosts': os.environ.get('DJANGO_ELASTICSEARCH_HOST')
},
}
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
},
}
# Plugin System
PLUGINS_DIR = BASE_DIR / 'plugins'
ACTIVE_PLUGINS = [
'article_copyright',
'reading_time',
'external_links',
'view_count',
'seo_optimizer',
'image_lazy_loading',
'article_recommendation',
]

@ -1,59 +0,0 @@
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
priority = 0.5
changefreq = 'daily'
def items(self):
return ['blog:index', ]
def location(self, item):
return reverse(item)
class ArticleSiteMap(Sitemap):
changefreq = "monthly"
priority = "0.6"
def items(self):
return Article.objects.filter(status='p')
def lastmod(self, obj):
return obj.last_modify_time
class CategorySiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.6"
def items(self):
return Category.objects.all()
def lastmod(self, obj):
return obj.last_modify_time
class TagSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
def items(self):
return Tag.objects.all()
def lastmod(self, obj):
return obj.last_modify_time
class UserSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
def items(self):
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
return obj.date_joined

@ -1,21 +0,0 @@
import logging
import requests
from django.conf import settings
logger = logging.getLogger(__name__)
class SpiderNotify():
@staticmethod
def baidu_notify(urls):
try:
data = '\n'.join(urls)
result = requests.post(settings.BAIDU_NOTIFY_URL, data=data)
logger.info(result.text)
except Exception as e:
logger.error(e)
@staticmethod
def notify(url):
SpiderNotify.baidu_notify(url)

@ -1,32 +0,0 @@
from django.test import TestCase
from djangoblog.utils import *
class DjangoBlogTest(TestCase):
def setUp(self):
pass
def test_utils(self):
md5 = get_sha256('test')
self.assertIsNotNone(md5)
c = CommonMarkdown.get_markdown('''
# Title1
```python
import os
```
[url](https://www.lylinux.net/)
[ddd](http://www.baidu.com)
''')
self.assertIsNotNone(c)
d = {
'd': 'key1',
'd2': 'key2'
}
data = parse_dict_to_url(d)
self.assertIsNotNone(data)

@ -1,78 +0,0 @@
"""djangoblog URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/1.10/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: url(r'^$', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.conf.urls import url, include
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
"""
from django.conf import settings
from django.conf.urls.i18n import i18n_patterns
from django.conf.urls.static import static
from django.contrib.sitemaps.views import sitemap
from django.urls import path, include
from django.urls import re_path
from haystack.views import search_view_factory
from django.http import JsonResponse
import time
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
from djangoblog.feeds import DjangoBlogFeed
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
sitemaps = {
'blog': ArticleSiteMap,
'Category': CategorySiteMap,
'Tag': TagSiteMap,
'User': UserSiteMap,
'static': StaticViewSitemap
}
handler404 = 'blog.views.page_not_found_view'
handler500 = 'blog.views.server_error_view'
handle403 = 'blog.views.permission_denied_view'
def health_check(request):
"""
健康检查接口
简单返回服务健康状态
"""
return JsonResponse({
'status': 'healthy',
'timestamp': time.time()
})
urlpatterns = [
path('i18n/', include('django.conf.urls.i18n')),
path('health/', health_check, name='health_check'),
]
urlpatterns += i18n_patterns(
re_path(r'^admin/', admin_site.urls),
re_path(r'', include('blog.urls', namespace='blog')),
re_path(r'mdeditor/', include('mdeditor.urls')),
re_path(r'', include('comments.urls', namespace='comment')),
re_path(r'', include('accounts.urls', namespace='account')),
re_path(r'', include('oauth.urls', namespace='oauth')),
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
name='django.contrib.sitemaps.views.sitemap'),
re_path(r'^feed/$', DjangoBlogFeed()),
re_path(r'^rss/$', DjangoBlogFeed()),
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
name='search'),
re_path(r'', include('servermanager.urls', namespace='servermanager')),
re_path(r'', include('owntracks.urls', namespace='owntracks'))
, prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT)

@ -1,272 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
def wrapper(func):
def news(*args, **kwargs):
try:
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
site = Site.objects.get_current()
return site
class CommonMarkdown:
@staticmethod
def _convert_markdown(value):
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
]
)
body = md.convert(value)
toc = md.toc
return body, toc
@staticmethod
def get_markdown_with_toc(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""生成随机数验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
def get_blog_setting():
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
return value
def save_user_avatar(url):
'''
保存用户头像
:param url:头像url
:return: 本地路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
def delete_sidebar_cache():
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p', 'span', 'div']
# 安全的class值白名单 - 只允许代码高亮相关的class
ALLOWED_CLASSES = [
'codehilite', 'highlight', 'hll', 'c', 'err', 'k', 'l', 'n', 'o', 'p', 'cm', 'cp', 'c1', 'cs',
'gd', 'ge', 'gr', 'gh', 'gi', 'go', 'gp', 'gs', 'gu', 'gt', 'kc', 'kd', 'kn', 'kp', 'kr', 'kt',
'ld', 'm', 'mf', 'mh', 'mi', 'mo', 'na', 'nb', 'nc', 'no', 'nd', 'ni', 'ne', 'nf', 'nl', 'nn',
'nt', 'nv', 'ow', 'w', 'mb', 'mh', 'mi', 'mo', 'sb', 'sc', 'sd', 'se', 'sh', 'si', 'sx', 's2',
's1', 'ss', 'bp', 'vc', 'vg', 'vi', 'il'
]
def class_filter(tag, name, value):
"""自定义class属性过滤器"""
if name == 'class':
# 只允许预定义的安全class值
allowed_classes = [cls for cls in value.split() if cls in ALLOWED_CLASSES]
return ' '.join(allowed_classes) if allowed_classes else False
return value
# 安全的属性白名单
ALLOWED_ATTRIBUTES = {
'a': ['href', 'title'],
'abbr': ['title'],
'acronym': ['title'],
'span': class_filter,
'div': class_filter,
'pre': class_filter,
'code': class_filter
}
# 安全的协议白名单 - 防止javascript:等危险协议
ALLOWED_PROTOCOLS = ['http', 'https', 'mailto']
def sanitize_html(html):
"""
安全的HTML清理函数
使用bleach库进行白名单过滤防止XSS攻击
"""
return bleach.clean(
html,
tags=ALLOWED_TAGS,
attributes=ALLOWED_ATTRIBUTES,
protocols=ALLOWED_PROTOCOLS, # 限制允许的协议
strip=True, # 移除不允许的标签而不是转义
strip_comments=True # 移除HTML注释
)

File diff suppressed because it is too large Load Diff

@ -1,16 +0,0 @@
"""
WSGI config for djangoblog project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")
application = get_wsgi_application()

@ -1,25 +0,0 @@
from django.urls import path
from . import views
app_name = "oauth"
urlpatterns = [
path(
r'oauth/authorize',
views.authorize),
path(
r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
path(
r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm,
name='email_confirm'),
path(
r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
path(
r'oauth/oauthlogin',
views.oauthlogin,
name='oauthlogin')]

@ -9,15 +9,16 @@ from .models import BlogUser
class BlogUserCreationForm(forms.ModelForm):
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput)
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput)
#sjt 后台创建用户表单:处理密码设置和验证
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput) # sjt 密码字段
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput) # sjt 确认密码字段
class Meta:
model = BlogUser
fields = ('email',)
model = BlogUser # sjt 关联BlogUser模型
fields = ('email',) # sjt 表单包含的字段
def clean_password2(self):
# Check that the two password entries match
#sjt 验证两次输入密码一致性
password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2")
if password1 and password2 and password1 != password2:
@ -25,29 +26,31 @@ class BlogUserCreationForm(forms.ModelForm):
return password2
def save(self, commit=True):
# Save the provided password in hashed format
#sjt 保存用户:加密密码并标记来源为后台
user = super().save(commit=False)
user.set_password(self.cleaned_data["password1"])
user.set_password(self.cleaned_data["password1"]) # sjt 加密密码
if commit:
user.source = 'adminsite'
user.source = 'adminsite' # sjt 标记来源为后台添加
user.save()
return user
class BlogUserChangeForm(UserChangeForm):
#sjt 后台修改用户表单:配置修改用户的字段
class Meta:
model = BlogUser
fields = '__all__'
field_classes = {'username': UsernameField}
model = BlogUser # sjt 关联BlogUser模型
fields = '__all__' # sjt 包含所有字段
field_classes = {'username': UsernameField} # sjt 用户名字段类
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class BlogUserAdmin(UserAdmin):
form = BlogUserChangeForm
add_form = BlogUserCreationForm
list_display = (
#sjt Admin后台用户管理配置定义显示和操作方式
form = BlogUserChangeForm # sjt 修改用户表单
add_form = BlogUserCreationForm # sjt 创建用户表单
list_display = ( # sjt 列表页显示的字段
'id',
'nickname',
'username',
@ -55,6 +58,6 @@ class BlogUserAdmin(UserAdmin):
'last_login',
'date_joined',
'source')
list_display_links = ('id', 'username')
ordering = ('-id',)
search_fields = ('username', 'nickname', 'email')
list_display_links = ('id', 'username') # sjt 列表页可点击的链接字段
ordering = ('-id',) # sjt 按ID倒序排列
search_fields = ('username', 'nickname', 'email') # sjt 搜索字段

@ -0,0 +1,6 @@
from django.apps import AppConfig
class AccountsConfig(AppConfig):
#sjt accounts应用的配置类定义应用名称
name = 'accounts'

@ -9,39 +9,48 @@ from .models import BlogUser
class LoginForm(AuthenticationForm):
#sjt 登录表单:定义登录表单字段样式
def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
# sjt 设置用户名输入框样式
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# sjt 设置密码输入框样式
self.fields['password'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
class RegisterForm(UserCreationForm):
#sjt 注册表单:验证用户名、邮箱、密码合法性
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
# sjt 设置用户名输入框样式
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# sjt 设置邮箱输入框样式
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
# sjt 设置密码输入框样式
self.fields['password1'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
# sjt 设置确认密码输入框样式
self.fields['password2'].widget = widgets.PasswordInput(
attrs={'placeholder': "repeat password", "class": "form-control"})
def clean_email(self):
#sjt 验证邮箱唯一性:已存在则抛出异常
email = self.cleaned_data['email']
if get_user_model().objects.filter(email=email).exists():
raise ValidationError(_("email already exists"))
return email
class Meta:
model = get_user_model()
fields = ("username", "email")
model = get_user_model() # sjt 关联用户模型
fields = ("username", "email") # sjt 表单包含的字段
class ForgetPasswordForm(forms.Form):
#sjt 密码找回表单:验证邮箱、验证码、新密码合法性
new_password1 = forms.CharField(
label=_("New password"),
widget=forms.PasswordInput(
@ -50,7 +59,7 @@ class ForgetPasswordForm(forms.Form):
'placeholder': _("New password")
}
),
)
) # sjt 新密码字段
new_password2 = forms.CharField(
label="确认密码",
@ -60,7 +69,7 @@ class ForgetPasswordForm(forms.Form):
'placeholder': _("Confirm password")
}
),
)
) # sjt 确认密码字段
email = forms.EmailField(
label='邮箱',
@ -70,7 +79,7 @@ class ForgetPasswordForm(forms.Form):
'placeholder': _("Email")
}
),
)
) # sjt 邮箱字段
code = forms.CharField(
label=_('Code'),
@ -80,27 +89,28 @@ class ForgetPasswordForm(forms.Form):
'placeholder': _("Code")
}
),
)
) # sjt 验证码字段
def clean_new_password2(self):
#sjt 验证两次输入密码一致性,并检查密码强度
password1 = self.data.get("new_password1")
password2 = self.data.get("new_password2")
if password1 and password2 and password1 != password2:
raise ValidationError(_("passwords do not match"))
password_validation.validate_password(password2)
password_validation.validate_password(password2) # sjt 调用Django密码验证器
return password2
def clean_email(self):
#sjt 验证邮箱是否已注册
user_email = self.cleaned_data.get("email")
if not BlogUser.objects.filter(
email=user_email
).exists():
# todo 这里的报错提示可以判断一个邮箱是不是注册过,如果不想暴露可以修改
raise ValidationError(_("email does not exist"))
return user_email
def clean_code(self):
#sjt 验证验证码有效性
code = self.cleaned_data.get("code")
error = utils.verify(
email=self.cleaned_data.get("email"),
@ -112,6 +122,7 @@ class ForgetPasswordForm(forms.Form):
class ForgetPasswordCodeForm(forms.Form):
#sjt 发送验证码表单:验证邮箱格式
email = forms.EmailField(
label=_('Email'),
)
) # sjt 邮箱字段(用于发送验证码)

@ -9,27 +9,31 @@ from djangoblog.utils import get_current_site
# Create your models here.
class BlogUser(AbstractUser):
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
source = models.CharField(_('create source'), max_length=100, blank=True)
#sjt 扩展Django内置用户模型增加博客系统所需的用户属性
nickname = models.CharField(_('nick name'), max_length=100, blank=True) # sjt 用户昵称
creation_time = models.DateTimeField(_('creation time'), default=now) # sjt 账号创建时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now) # sjt 信息最后修改时间
source = models.CharField(_('create source'), max_length=100, blank=True) # sjt 账号创建来源(如注册、后台添加)
def get_absolute_url(self):
#sjt 获取用户详情页的相对URL
return reverse(
'blog:author_detail', kwargs={
'author_name': self.username})
def __str__(self):
#sjt 对象字符串表示,返回用户邮箱
return self.email
def get_full_url(self):
#sjt 获取用户详情页的完整URL包含域名
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
ordering = ['-id']
verbose_name = _('user')
ordering = ['-id'] # sjt 按ID倒序排列
verbose_name = _('user') # sjt 模型显示名称
verbose_name_plural = verbose_name
get_latest_by = 'id'
get_latest_by = 'id' # sjt 最新记录按ID判断

@ -13,6 +13,7 @@ from . import utils
class AccountTest(TestCase):
def setUp(self):
#sjt 测试初始化:创建测试客户端和测试用户
self.client = Client()
self.factory = RequestFactory()
self.blog_user = BlogUser.objects.create_user(
@ -20,16 +21,19 @@ class AccountTest(TestCase):
email="admin@admin.com",
password="12345678"
)
self.new_test = "xxx123--="
self.new_test = "xxx123--=" # sjt 测试用新密码
def test_validate_account(self):
#sjt 测试用户账号验证:管理员登录、文章创建等功能
site = get_current_site().domain
# sjt 创建超级用户
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="qwer!@#$ggg")
testuser = BlogUser.objects.get(username='liangliangyy1')
# sjt 测试登录
loginresult = self.client.login(
username='liangliangyy1',
password='qwer!@#$ggg')
@ -37,12 +41,14 @@ class AccountTest(TestCase):
response = self.client.get('/admin/')
self.assertEqual(response.status_code, 200)
# sjt 测试创建分类
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
# sjt 测试创建文章
article = Article()
article.title = "nicetitleaaa"
article.body = "nicecontentaaa"
@ -52,24 +58,30 @@ class AccountTest(TestCase):
article.status = 'p'
article.save()
# sjt 测试访问文章管理页
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
def test_validate_register(self):
#sjt 测试注册流程:注册->邮箱验证->登录->功能验证
# sjt 验证初始状态无此用户
self.assertEquals(
0, len(
BlogUser.objects.filter(
email='user123@user.com')))
# sjt 提交注册表单
response = self.client.post(reverse('account:register'), {
'username': 'user1233',
'email': 'user123@user.com',
'password1': 'password123!q@wE#R$T',
'password2': 'password123!q@wE#R$T',
})
# sjt 验证用户创建成功
self.assertEquals(
1, len(
BlogUser.objects.filter(
email='user123@user.com')))
# sjt 验证邮箱并激活用户
user = BlogUser.objects.filter(email='user123@user.com')[0]
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
path = reverse('accounts:result')
@ -78,59 +90,32 @@ class AccountTest(TestCase):
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
# sjt 测试登录及权限
self.client.login(username='user1233', password='password123!q@wE#R$T')
user = BlogUser.objects.filter(email='user123@user.com')[0]
user.is_superuser = True
user.is_staff = True
user.save()
delete_sidebar_cache()
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
article = Article()
article.category = category
article.title = "nicetitle333"
article.body = "nicecontentttt"
article.author = user
article.type = 'a'
article.status = 'p'
article.save()
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
response = self.client.get(reverse('account:logout'))
self.assertIn(response.status_code, [301, 302, 200])
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
response = self.client.post(reverse('account:login'), {
'username': 'user1233',
'password': 'password123'
})
self.assertIn(response.status_code, [301, 302, 200])
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
# ... 后续测试创建分类、文章等功能
def test_verify_email_code(self):
#sjt 测试验证码验证功能
to_email = "admin@admin.com"
code = generate_code()
utils.set_code(to_email, code)
utils.send_verify_email(to_email, code)
code = generate_code() # sjt 生成验证码
utils.set_code(to_email, code) # sjt 缓存验证码
utils.send_verify_email(to_email, code) # sjt 发送验证码邮件
# sjt 验证正确验证码
err = utils.verify("admin@admin.com", code)
self.assertEqual(err, None)
# sjt 验证错误邮箱的验证码
err = utils.verify("admin@123.com", code)
self.assertEqual(type(err), str)
def test_forget_password_email_code_success(self):
#sjt 测试成功发送密码找回验证码
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@admin.com")
@ -140,6 +125,7 @@ class AccountTest(TestCase):
self.assertEqual(resp.content.decode("utf-8"), "ok")
def test_forget_password_email_code_fail(self):
#sjt 测试发送密码找回验证码失败情况(无效邮箱)
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict()
@ -153,6 +139,7 @@ class AccountTest(TestCase):
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
def test_forget_password_email_success(self):
#sjt 测试成功找回密码流程
code = generate_code()
utils.set_code(self.blog_user.email, code)
data = dict(
@ -167,14 +154,15 @@ class AccountTest(TestCase):
)
self.assertEqual(resp.status_code, 302)
# 验证用户密码是否修改成功
# sjt 验证密码是否修改成功
blog_user = BlogUser.objects.filter(
email=self.blog_user.email,
).first() # type: BlogUser
).first()
self.assertNotEqual(blog_user, None)
self.assertEqual(blog_user.check_password(data["new_password1"]), True)
def test_forget_password_email_not_user(self):
#sjt 测试使用未注册邮箱找回密码
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
@ -188,20 +176,19 @@ class AccountTest(TestCase):
self.assertEqual(resp.status_code, 200)
def test_forget_password_email_code_error(self):
#sjt 测试使用错误验证码找回密码
code = generate_code()
utils.set_code(self.blog_user.email, code)
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email=self.blog_user.email,
code="111111",
code="111111", # sjt 错误验证码
)
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.status_code, 200)

@ -4,25 +4,25 @@ from django.urls import re_path
from . import views
from .forms import LoginForm
app_name = "accounts"
app_name = "accounts" # sjt 应用命名空间
urlpatterns = [re_path(r'^login/$',
urlpatterns = [re_path(r'^login/$', # sjt 登录页面路由
views.LoginView.as_view(success_url='/'),
name='login',
kwargs={'authentication_form': LoginForm}),
re_path(r'^register/$',
re_path(r'^register/$', # sjt 注册页面路由
views.RegisterView.as_view(success_url="/"),
name='register'),
re_path(r'^logout/$',
re_path(r'^logout/$', # sjt 注销功能路由
views.LogoutView.as_view(),
name='logout'),
path(r'account/result.html',
path(r'account/result.html', # sjt 注册/验证结果页路由
views.account_result,
name='result'),
re_path(r'^forget_password/$',
re_path(r'^forget_password/$', # sjt 密码找回页面路由
views.ForgetPasswordView.as_view(),
name='forget_password'),
re_path(r'^forget_password_code/$',
re_path(r'^forget_password_code/$', # sjt 发送密码找回验证码路由
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'),
]
]

@ -0,0 +1,26 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.backends import ModelBackend
class EmailOrUsernameModelBackend(ModelBackend):
#sjt 自定义认证后端:支持使用用户名或邮箱登录
def authenticate(self, request, username=None, password=None, **kwargs):
#sjt 认证逻辑:根据输入判断是邮箱还是用户名,验证密码
if '@' in username: # sjt 包含@符号则视为邮箱登录
kwargs = {'email': username}
else: # sjt 否则视为用户名登录
kwargs = {'username': username}
try:
user = get_user_model().objects.get(** kwargs) # sjt 查询用户
if user.check_password(password): # sjt 验证密码
return user
except get_user_model().DoesNotExist: # sjt 用户不存在
return None
def get_user(self, username):
#sjt 根据用户ID获取用户对象
try:
return get_user_model().objects.get(pk=username)
except get_user_model().DoesNotExist:
return None

@ -7,16 +7,16 @@ from django.utils.translation import gettext_lazy as _
from djangoblog.utils import send_email
_code_ttl = timedelta(minutes=5)
_code_ttl = timedelta(minutes=5) # sjt 验证码有效期5分钟
def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email")):
"""发送重设密码验证码
'''sjt 发送密码重置验证码邮件
Args:
to_mail: 邮箱
to_mail: 邮箱
subject: 邮件主题
code: 验证码
"""
'''
html_content = _(
"You are resetting the password, the verification code is%(code)s, valid within 5 minutes, please keep it "
"properly") % {'code': code}
@ -24,26 +24,23 @@ def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email"))
def verify(email: str, code: str) -> typing.Optional[str]:
"""验证code是否有效
'''sjt 验证验证码有效性
Args:
email: 请求邮箱
code: 验证码
Return:
如果有错误就返回错误str
Node:
这里的错误处理不太合理应该采用raise抛出
否测调用方也需要对error进行处理
"""
cache_code = get_code(email)
if cache_code != code:
如果有错误返回错误信息字符串否则返回None
'''
cache_code = get_code(email) # sjt 从缓存获取验证码
if cache_code != code: # sjt 验证码不匹配
return gettext("Verification code error")
def set_code(email: str, code: str):
"""设置code"""
#sjt 将验证码存入缓存有效期5分钟
cache.set(email, code, _code_ttl.seconds)
def get_code(email: str) -> typing.Optional[str]:
"""获取code"""
return cache.get(email)
#sjt 从缓存获取验证码
return cache.get(email)

@ -32,28 +32,34 @@ logger = logging.getLogger(__name__)
# Create your views here.
class RegisterView(FormView):
form_class = RegisterForm
template_name = 'account/registration_form.html'
#sjt 用户注册视图:处理注册表单提交,发送邮箱验证链接
form_class = RegisterForm # sjt 注册表单类
template_name = 'account/registration_form.html' # sjt 注册页面模板
@method_decorator(csrf_protect)
def dispatch(self, *args, **kwargs):
#sjt 处理请求分发添加CSRF保护
return super(RegisterView, self).dispatch(*args, **kwargs)
def form_valid(self, form):
#sjt 表单验证通过后:创建未激活用户,生成验证链接并发送邮件
if form.is_valid():
user = form.save(False)
user.is_active = False
user.source = 'Register'
user.save(True)
site = get_current_site().domain
user = form.save(False) # sjt 不立即保存用户
user.is_active = False # sjt 初始为未激活状态(需邮箱验证)
user.source = 'Register' # sjt 标记来源为注册
user.save(True) # sjt 保存用户
site = get_current_site().domain # sjt 获取当前站点域名
# sjt 生成双重加密签名(防止链接篡改)
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
if settings.DEBUG:
if settings.DEBUG: # sjt 开发环境下使用本地域名
site = '127.0.0.1:8000'
path = reverse('account:result')
path = reverse('account:result') # sjt 获取结果页路由
# sjt 构建验证链接
url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format(
site=site, path=path, id=user.id, sign=sign)
# sjt 构建验证邮件内容
content = """
<p>请点击下面链接验证您的邮箱</p>
@ -64,6 +70,7 @@ class RegisterView(FormView):
如果上面链接无法打开请将此链接复制至浏览器
{url}
""".format(url=url)
# sjt 发送验证邮件
send_email(
emailto=[
user.email,
@ -71,6 +78,7 @@ class RegisterView(FormView):
title='验证您的电子邮箱',
content=content)
# sjt 重定向到注册结果页
url = reverse('accounts:result') + \
'?type=register&id=' + str(user.id)
return HttpResponseRedirect(url)
@ -81,60 +89,66 @@ class RegisterView(FormView):
class LogoutView(RedirectView):
url = '/login/'
#sjt 用户注销视图:处理用户注销并跳转登录页
url = '/login/' # sjt 注销后跳转的URL
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
#sjt 处理请求分发,禁止缓存
return super(LogoutView, self).dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
logout(request)
delete_sidebar_cache()
#sjt 处理GET请求执行注销操作并清除缓存
logout(request) # sjt 注销用户
delete_sidebar_cache() # sjt 删除侧边栏缓存
return super(LogoutView, self).get(request, *args, **kwargs)
class LoginView(FormView):
form_class = LoginForm
template_name = 'account/login.html'
success_url = '/'
redirect_field_name = REDIRECT_FIELD_NAME
login_ttl = 2626560 # 一个月的时间
#sjt 用户登录视图:支持用户名/邮箱登录,处理登录状态
form_class = LoginForm # sjt 登录表单类
template_name = 'account/login.html' # sjt 登录页面模板
success_url = '/' # sjt 登录成功默认跳转页
redirect_field_name = REDIRECT_FIELD_NAME # sjt 重定向字段名
login_ttl = 2626560 # sjt 记住登录状态的有效期(一个月,单位秒)
@method_decorator(sensitive_post_parameters('password'))
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
#sjt 处理请求分发保护敏感参数、CSRF保护、禁止缓存
return super(LoginView, self).dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
#sjt 构建上下文数据获取重定向URL
redirect_to = self.request.GET.get(self.redirect_field_name)
if redirect_to is None:
redirect_to = '/'
kwargs['redirect_to'] = redirect_to
return super(LoginView, self).get_context_data(**kwargs)
return super(LoginView, self).get_context_data(** kwargs)
def form_valid(self, form):
#sjt 表单验证通过后:验证用户凭据,处理登录状态(含"记住我"功能)
form = AuthenticationForm(data=self.request.POST, request=self.request)
if form.is_valid():
delete_sidebar_cache()
if form.is_valid(): # sjt 验证用户凭据
delete_sidebar_cache() # sjt 删除侧边栏缓存
logger.info(self.redirect_field_name)
auth.login(self.request, form.get_user())
if self.request.POST.get("remember"):
self.request.session.set_expiry(self.login_ttl)
auth.login(self.request, form.get_user()) # sjt 登录用户
if self.request.POST.get("remember"): # sjt 如果勾选"记住我"
self.request.session.set_expiry(self.login_ttl) # sjt 设置会话有效期
return super(LoginView, self).form_valid(form)
# return HttpResponseRedirect('/')
else:
return self.render_to_response({
'form': form
})
def get_success_url(self):
#sjt 获取登录成功后的跳转URL验证安全性
redirect_to = self.request.POST.get(self.redirect_field_name)
# sjt 验证跳转URL是否安全
if not url_has_allowed_host_and_scheme(
url=redirect_to, allowed_hosts=[
self.request.get_host()]):
@ -143,31 +157,33 @@ class LoginView(FormView):
def account_result(request):
type = request.GET.get('type')
id = request.GET.get('id')
#sjt 注册/验证结果页:处理邮箱验证逻辑,激活用户账号
type = request.GET.get('type') # sjt 获取操作类型(注册/验证)
id = request.GET.get('id') # sjt 获取用户ID
user = get_object_or_404(get_user_model(), id=id)
user = get_object_or_404(get_user_model(), id=id) # sjt 获取用户对象
logger.info(type)
if user.is_active:
if user.is_active: # sjt 如果用户已激活,直接跳转首页
return HttpResponseRedirect('/')
if type and type in ['register', 'validation']:
if type == 'register':
if type == 'register': # sjt 注册成功结果页
content = '''
恭喜您注册成功一封验证邮件已经发送到您的邮箱请验证您的邮箱后登录本站
'''
title = '注册成功'
else:
else: # sjt 邮箱验证结果页
# sjt 验证签名是否正确
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
sign = request.GET.get('sign')
if sign != c_sign:
if sign != c_sign: # sjt 签名错误返回403
return HttpResponseForbidden()
user.is_active = True
user.is_active = True # sjt 激活用户
user.save()
content = '''
恭喜您已经成功的完成邮箱验证您现在可以使用您的账号来登录本站
'''
title = '验证成功'
return render(request, 'account/result.html', {
return render(request, 'account/result.html', { # sjt 渲染结果页
'title': title,
'content': content
})
@ -176,29 +192,34 @@ def account_result(request):
class ForgetPasswordView(FormView):
form_class = ForgetPasswordForm
template_name = 'account/forget_password.html'
#sjt 密码找回视图:验证验证码后重置密码
form_class = ForgetPasswordForm # sjt 密码找回表单
template_name = 'account/forget_password.html' # sjt 密码找回页面模板
def form_valid(self, form):
#sjt 表单验证通过后:更新用户密码并跳转登录页
if form.is_valid():
blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get()
# sjt 加密并更新密码
blog_user.password = make_password(form.cleaned_data["new_password2"])
blog_user.save()
return HttpResponseRedirect('/login/')
return HttpResponseRedirect('/login/') # sjt 跳转登录页
else:
return self.render_to_response({'form': form})
class ForgetPasswordEmailCode(View):
#sjt 发送密码找回验证码视图:处理发送验证码请求
def post(self, request: HttpRequest):
form = ForgetPasswordCodeForm(request.POST)
if not form.is_valid():
#sjt 处理POST请求验证邮箱并发送验证码
form = ForgetPasswordCodeForm(request.POST) # sjt 验证邮箱表单
if not form.is_valid(): # sjt 邮箱验证失败
return HttpResponse("错误的邮箱")
to_email = form.cleaned_data["email"]
to_email = form.cleaned_data["email"] # sjt 获取目标邮箱
code = generate_code()
utils.send_verify_email(to_email, code)
utils.set_code(to_email, code)
code = generate_code() # sjt 生成验证码
utils.send_verify_email(to_email, code) # sjt 发送验证码邮件
utils.set_code(to_email, code) # sjt 缓存验证码有效期5分钟
return HttpResponse("ok")
return HttpResponse("ok") # sjt 发送成功返回"ok"

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save