parent
0f484f1e8b
commit
7a23a1891d
@ -0,0 +1,10 @@
|
||||
[run]
|
||||
source = .
|
||||
include = *.py
|
||||
omit =
|
||||
*migrations*
|
||||
*tests*
|
||||
*.html
|
||||
*whoosh_cn_backend*
|
||||
*settings.py*
|
||||
*venv*
|
||||
@ -0,0 +1,59 @@
|
||||
from django import forms
|
||||
from django.contrib.auth.admin import UserAdmin
|
||||
from django.contrib.auth.forms import UserChangeForm
|
||||
from django.contrib.auth.forms import UsernameField
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
# Register your models here.
|
||||
from .models import BlogUser
|
||||
|
||||
|
||||
class BlogUserCreationForm(forms.ModelForm):
|
||||
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput)
|
||||
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput)
|
||||
|
||||
class Meta:
|
||||
model = BlogUser
|
||||
fields = ('email',)
|
||||
|
||||
def clean_password2(self):
|
||||
# Check that the two password entries match
|
||||
password1 = self.cleaned_data.get("password1")
|
||||
password2 = self.cleaned_data.get("password2")
|
||||
if password1 and password2 and password1 != password2:
|
||||
raise forms.ValidationError(_("passwords do not match"))
|
||||
return password2
|
||||
|
||||
def save(self, commit=True):
|
||||
# Save the provided password in hashed format
|
||||
user = super().save(commit=False)
|
||||
user.set_password(self.cleaned_data["password1"])
|
||||
if commit:
|
||||
user.source = 'adminsite'
|
||||
user.save()
|
||||
return user
|
||||
|
||||
|
||||
class BlogUserChangeForm(UserChangeForm):
|
||||
class Meta:
|
||||
model = BlogUser
|
||||
fields = '__all__'
|
||||
field_classes = {'username': UsernameField}
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
|
||||
class BlogUserAdmin(UserAdmin):
|
||||
form = BlogUserChangeForm
|
||||
add_form = BlogUserCreationForm
|
||||
list_display = (
|
||||
'id',
|
||||
'nickname',
|
||||
'username',
|
||||
'email',
|
||||
'last_login',
|
||||
'date_joined',
|
||||
'source')
|
||||
list_display_links = ('id', 'username')
|
||||
ordering = ('-id',)
|
||||
@ -0,0 +1,5 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class AccountsConfig(AppConfig):
|
||||
name = 'accounts'
|
||||
@ -0,0 +1,117 @@
|
||||
from django import forms
|
||||
from django.contrib.auth import get_user_model, password_validation
|
||||
from django.contrib.auth.forms import AuthenticationForm, UserCreationForm
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.forms import widgets
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from . import utils
|
||||
from .models import BlogUser
|
||||
|
||||
|
||||
class LoginForm(AuthenticationForm):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(LoginForm, self).__init__(*args, **kwargs)
|
||||
self.fields['username'].widget = widgets.TextInput(
|
||||
attrs={'placeholder': "username", "class": "form-control"})
|
||||
self.fields['password'].widget = widgets.PasswordInput(
|
||||
attrs={'placeholder': "password", "class": "form-control"})
|
||||
|
||||
|
||||
class RegisterForm(UserCreationForm):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(RegisterForm, self).__init__(*args, **kwargs)
|
||||
|
||||
self.fields['username'].widget = widgets.TextInput(
|
||||
attrs={'placeholder': "username", "class": "form-control"})
|
||||
self.fields['email'].widget = widgets.EmailInput(
|
||||
attrs={'placeholder': "email", "class": "form-control"})
|
||||
self.fields['password1'].widget = widgets.PasswordInput(
|
||||
attrs={'placeholder': "password", "class": "form-control"})
|
||||
self.fields['password2'].widget = widgets.PasswordInput(
|
||||
attrs={'placeholder': "repeat password", "class": "form-control"})
|
||||
|
||||
def clean_email(self):
|
||||
email = self.cleaned_data['email']
|
||||
if get_user_model().objects.filter(email=email).exists():
|
||||
raise ValidationError(_("email already exists"))
|
||||
return email
|
||||
|
||||
class Meta:
|
||||
model = get_user_model()
|
||||
fields = ("username", "email")
|
||||
|
||||
|
||||
class ForgetPasswordForm(forms.Form):
|
||||
new_password1 = forms.CharField(
|
||||
label=_("New password"),
|
||||
widget=forms.PasswordInput(
|
||||
attrs={
|
||||
"class": "form-control",
|
||||
'placeholder': _("New password")
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
new_password2 = forms.CharField(
|
||||
label="确认密码",
|
||||
widget=forms.PasswordInput(
|
||||
attrs={
|
||||
"class": "form-control",
|
||||
'placeholder': _("Confirm password")
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
email = forms.EmailField(
|
||||
label='邮箱',
|
||||
widget=forms.TextInput(
|
||||
attrs={
|
||||
'class': 'form-control',
|
||||
'placeholder': _("Email")
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
code = forms.CharField(
|
||||
label=_('Code'),
|
||||
widget=forms.TextInput(
|
||||
attrs={
|
||||
'class': 'form-control',
|
||||
'placeholder': _("Code")
|
||||
}
|
||||
),
|
||||
)
|
||||
|
||||
def clean_new_password2(self):
|
||||
password1 = self.data.get("new_password1")
|
||||
password2 = self.data.get("new_password2")
|
||||
if password1 and password2 and password1 != password2:
|
||||
raise ValidationError(_("passwords do not match"))
|
||||
password_validation.validate_password(password2)
|
||||
|
||||
return password2
|
||||
|
||||
def clean_email(self):
|
||||
user_email = self.cleaned_data.get("email")
|
||||
if not BlogUser.objects.filter(
|
||||
email=user_email
|
||||
).exists():
|
||||
# todo 这里的报错提示可以判断一个邮箱是不是注册过,如果不想暴露可以修改
|
||||
raise ValidationError(_("email does not exist"))
|
||||
return user_email
|
||||
|
||||
def clean_code(self):
|
||||
code = self.cleaned_data.get("code")
|
||||
error = utils.verify(
|
||||
email=self.cleaned_data.get("email"),
|
||||
code=code,
|
||||
)
|
||||
if error:
|
||||
raise ValidationError(error)
|
||||
return code
|
||||
|
||||
|
||||
class ForgetPasswordCodeForm(forms.Form):
|
||||
email = forms.EmailField(
|
||||
label=_('Email'),
|
||||
)
|
||||
@ -0,0 +1,49 @@
|
||||
# Generated by Django 4.1.7 on 2023-03-02 07:14
|
||||
|
||||
import django.contrib.auth.models
|
||||
import django.contrib.auth.validators
|
||||
from django.db import migrations, models
|
||||
import django.utils.timezone
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
initial = True
|
||||
|
||||
dependencies = [
|
||||
('auth', '0012_alter_user_first_name_max_length'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.CreateModel(
|
||||
name='BlogUser',
|
||||
fields=[
|
||||
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
|
||||
('password', models.CharField(max_length=128, verbose_name='password')),
|
||||
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
|
||||
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
|
||||
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
|
||||
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
|
||||
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
|
||||
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
|
||||
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
|
||||
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
|
||||
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
|
||||
('nickname', models.CharField(blank=True, max_length=100, verbose_name='昵称')),
|
||||
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
|
||||
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
|
||||
('source', models.CharField(blank=True, max_length=100, verbose_name='创建来源')),
|
||||
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
|
||||
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
|
||||
],
|
||||
options={
|
||||
'verbose_name': '用户',
|
||||
'verbose_name_plural': '用户',
|
||||
'ordering': ['-id'],
|
||||
'get_latest_by': 'id',
|
||||
},
|
||||
managers=[
|
||||
('objects', django.contrib.auth.models.UserManager()),
|
||||
],
|
||||
),
|
||||
]
|
||||
@ -0,0 +1,46 @@
|
||||
# Generated by Django 4.2.5 on 2023-09-06 13:13
|
||||
|
||||
from django.db import migrations, models
|
||||
import django.utils.timezone
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
('accounts', '0001_initial'),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterModelOptions(
|
||||
name='bloguser',
|
||||
options={'get_latest_by': 'id', 'ordering': ['-id'], 'verbose_name': 'user', 'verbose_name_plural': 'user'},
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='bloguser',
|
||||
name='created_time',
|
||||
),
|
||||
migrations.RemoveField(
|
||||
model_name='bloguser',
|
||||
name='last_mod_time',
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='bloguser',
|
||||
name='creation_time',
|
||||
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
|
||||
),
|
||||
migrations.AddField(
|
||||
model_name='bloguser',
|
||||
name='last_modify_time',
|
||||
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='bloguser',
|
||||
name='nickname',
|
||||
field=models.CharField(blank=True, max_length=100, verbose_name='nick name'),
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name='bloguser',
|
||||
name='source',
|
||||
field=models.CharField(blank=True, max_length=100, verbose_name='create source'),
|
||||
),
|
||||
]
|
||||
@ -0,0 +1,35 @@
|
||||
from django.contrib.auth.models import AbstractUser
|
||||
from django.db import models
|
||||
from django.urls import reverse
|
||||
from django.utils.timezone import now
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from djangoblog.utils import get_current_site
|
||||
|
||||
|
||||
# Create your models here.
|
||||
|
||||
class BlogUser(AbstractUser):
|
||||
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
|
||||
creation_time = models.DateTimeField(_('creation time'), default=now)
|
||||
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
|
||||
source = models.CharField(_('create source'), max_length=100, blank=True)
|
||||
|
||||
def get_absolute_url(self):
|
||||
return reverse(
|
||||
'blog:author_detail', kwargs={
|
||||
'author_name': self.username})
|
||||
|
||||
def __str__(self):
|
||||
return self.email
|
||||
|
||||
def get_full_url(self):
|
||||
site = get_current_site().domain
|
||||
url = "https://{site}{path}".format(site=site,
|
||||
path=self.get_absolute_url())
|
||||
return url
|
||||
|
||||
class Meta:
|
||||
ordering = ['-id']
|
||||
verbose_name = _('user')
|
||||
verbose_name_plural = verbose_name
|
||||
get_latest_by = 'id'
|
||||
@ -0,0 +1,207 @@
|
||||
from django.test import Client, RequestFactory, TestCase
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from accounts.models import BlogUser
|
||||
from blog.models import Article, Category
|
||||
from djangoblog.utils import *
|
||||
from . import utils
|
||||
|
||||
|
||||
# Create your tests here.
|
||||
|
||||
class AccountTest(TestCase):
|
||||
def setUp(self):
|
||||
self.client = Client()
|
||||
self.factory = RequestFactory()
|
||||
self.blog_user = BlogUser.objects.create_user(
|
||||
username="test",
|
||||
email="admin@admin.com",
|
||||
password="12345678"
|
||||
)
|
||||
self.new_test = "xxx123--="
|
||||
|
||||
def test_validate_account(self):
|
||||
site = get_current_site().domain
|
||||
user = BlogUser.objects.create_superuser(
|
||||
email="liangliangyy1@gmail.com",
|
||||
username="liangliangyy1",
|
||||
password="qwer!@#$ggg")
|
||||
testuser = BlogUser.objects.get(username='liangliangyy1')
|
||||
|
||||
loginresult = self.client.login(
|
||||
username='liangliangyy1',
|
||||
password='qwer!@#$ggg')
|
||||
self.assertEqual(loginresult, True)
|
||||
response = self.client.get('/admin/')
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
category = Category()
|
||||
category.name = "categoryaaa"
|
||||
category.creation_time = timezone.now()
|
||||
category.last_modify_time = timezone.now()
|
||||
category.save()
|
||||
|
||||
article = Article()
|
||||
article.title = "nicetitleaaa"
|
||||
article.body = "nicecontentaaa"
|
||||
article.author = user
|
||||
article.category = category
|
||||
article.type = 'a'
|
||||
article.status = 'p'
|
||||
article.save()
|
||||
|
||||
response = self.client.get(article.get_admin_url())
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
def test_validate_register(self):
|
||||
self.assertEquals(
|
||||
0, len(
|
||||
BlogUser.objects.filter(
|
||||
email='user123@user.com')))
|
||||
response = self.client.post(reverse('account:register'), {
|
||||
'username': 'user1233',
|
||||
'email': 'user123@user.com',
|
||||
'password1': 'password123!q@wE#R$T',
|
||||
'password2': 'password123!q@wE#R$T',
|
||||
})
|
||||
self.assertEquals(
|
||||
1, len(
|
||||
BlogUser.objects.filter(
|
||||
email='user123@user.com')))
|
||||
user = BlogUser.objects.filter(email='user123@user.com')[0]
|
||||
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
|
||||
path = reverse('accounts:result')
|
||||
url = '{path}?type=validation&id={id}&sign={sign}'.format(
|
||||
path=path, id=user.id, sign=sign)
|
||||
response = self.client.get(url)
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
self.client.login(username='user1233', password='password123!q@wE#R$T')
|
||||
user = BlogUser.objects.filter(email='user123@user.com')[0]
|
||||
user.is_superuser = True
|
||||
user.is_staff = True
|
||||
user.save()
|
||||
delete_sidebar_cache()
|
||||
category = Category()
|
||||
category.name = "categoryaaa"
|
||||
category.creation_time = timezone.now()
|
||||
category.last_modify_time = timezone.now()
|
||||
category.save()
|
||||
|
||||
article = Article()
|
||||
article.category = category
|
||||
article.title = "nicetitle333"
|
||||
article.body = "nicecontentttt"
|
||||
article.author = user
|
||||
|
||||
article.type = 'a'
|
||||
article.status = 'p'
|
||||
article.save()
|
||||
|
||||
response = self.client.get(article.get_admin_url())
|
||||
self.assertEqual(response.status_code, 200)
|
||||
|
||||
response = self.client.get(reverse('account:logout'))
|
||||
self.assertIn(response.status_code, [301, 302, 200])
|
||||
|
||||
response = self.client.get(article.get_admin_url())
|
||||
self.assertIn(response.status_code, [301, 302, 200])
|
||||
|
||||
response = self.client.post(reverse('account:login'), {
|
||||
'username': 'user1233',
|
||||
'password': 'password123'
|
||||
})
|
||||
self.assertIn(response.status_code, [301, 302, 200])
|
||||
|
||||
response = self.client.get(article.get_admin_url())
|
||||
self.assertIn(response.status_code, [301, 302, 200])
|
||||
|
||||
def test_verify_email_code(self):
|
||||
to_email = "admin@admin.com"
|
||||
code = generate_code()
|
||||
utils.set_code(to_email, code)
|
||||
utils.send_verify_email(to_email, code)
|
||||
|
||||
err = utils.verify("admin@admin.com", code)
|
||||
self.assertEqual(err, None)
|
||||
|
||||
err = utils.verify("admin@123.com", code)
|
||||
self.assertEqual(type(err), str)
|
||||
|
||||
def test_forget_password_email_code_success(self):
|
||||
resp = self.client.post(
|
||||
path=reverse("account:forget_password_code"),
|
||||
data=dict(email="admin@admin.com")
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
self.assertEqual(resp.content.decode("utf-8"), "ok")
|
||||
|
||||
def test_forget_password_email_code_fail(self):
|
||||
resp = self.client.post(
|
||||
path=reverse("account:forget_password_code"),
|
||||
data=dict()
|
||||
)
|
||||
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
|
||||
|
||||
resp = self.client.post(
|
||||
path=reverse("account:forget_password_code"),
|
||||
data=dict(email="admin@com")
|
||||
)
|
||||
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
|
||||
|
||||
def test_forget_password_email_success(self):
|
||||
code = generate_code()
|
||||
utils.set_code(self.blog_user.email, code)
|
||||
data = dict(
|
||||
new_password1=self.new_test,
|
||||
new_password2=self.new_test,
|
||||
email=self.blog_user.email,
|
||||
code=code,
|
||||
)
|
||||
resp = self.client.post(
|
||||
path=reverse("account:forget_password"),
|
||||
data=data
|
||||
)
|
||||
self.assertEqual(resp.status_code, 302)
|
||||
|
||||
# 验证用户密码是否修改成功
|
||||
blog_user = BlogUser.objects.filter(
|
||||
email=self.blog_user.email,
|
||||
).first() # type: BlogUser
|
||||
self.assertNotEqual(blog_user, None)
|
||||
self.assertEqual(blog_user.check_password(data["new_password1"]), True)
|
||||
|
||||
def test_forget_password_email_not_user(self):
|
||||
data = dict(
|
||||
new_password1=self.new_test,
|
||||
new_password2=self.new_test,
|
||||
email="123@123.com",
|
||||
code="123456",
|
||||
)
|
||||
resp = self.client.post(
|
||||
path=reverse("account:forget_password"),
|
||||
data=data
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
|
||||
def test_forget_password_email_code_error(self):
|
||||
code = generate_code()
|
||||
utils.set_code(self.blog_user.email, code)
|
||||
data = dict(
|
||||
new_password1=self.new_test,
|
||||
new_password2=self.new_test,
|
||||
email=self.blog_user.email,
|
||||
code="111111",
|
||||
)
|
||||
resp = self.client.post(
|
||||
path=reverse("account:forget_password"),
|
||||
data=data
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
@ -0,0 +1,28 @@
|
||||
from django.urls import path
|
||||
from django.urls import re_path
|
||||
|
||||
from . import views
|
||||
from .forms import LoginForm
|
||||
|
||||
app_name = "accounts"
|
||||
|
||||
urlpatterns = [re_path(r'^login/$',
|
||||
views.LoginView.as_view(success_url='/'),
|
||||
name='login',
|
||||
kwargs={'authentication_form': LoginForm}),
|
||||
re_path(r'^register/$',
|
||||
views.RegisterView.as_view(success_url="/"),
|
||||
name='register'),
|
||||
re_path(r'^logout/$',
|
||||
views.LogoutView.as_view(),
|
||||
name='logout'),
|
||||
path(r'account/result.html',
|
||||
views.account_result,
|
||||
name='result'),
|
||||
re_path(r'^forget_password/$',
|
||||
views.ForgetPasswordView.as_view(),
|
||||
name='forget_password'),
|
||||
re_path(r'^forget_password_code/$',
|
||||
views.ForgetPasswordEmailCode.as_view(),
|
||||
name='forget_password_code'),
|
||||
]
|
||||
@ -0,0 +1,26 @@
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.backends import ModelBackend
|
||||
|
||||
|
||||
class EmailOrUsernameModelBackend(ModelBackend):
|
||||
"""
|
||||
允许使用用户名或邮箱登录
|
||||
"""
|
||||
|
||||
def authenticate(self, request, username=None, password=None, **kwargs):
|
||||
if '@' in username:
|
||||
kwargs = {'email': username}
|
||||
else:
|
||||
kwargs = {'username': username}
|
||||
try:
|
||||
user = get_user_model().objects.get(**kwargs)
|
||||
if user.check_password(password):
|
||||
return user
|
||||
except get_user_model().DoesNotExist:
|
||||
return None
|
||||
|
||||
def get_user(self, username):
|
||||
try:
|
||||
return get_user_model().objects.get(pk=username)
|
||||
except get_user_model().DoesNotExist:
|
||||
return None
|
||||
@ -0,0 +1,18 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from blog.documents import ElapsedTimeDocument, ArticleDocumentManager, ElaspedTimeDocumentManager, \
|
||||
ELASTICSEARCH_ENABLED
|
||||
|
||||
|
||||
# TODO 参数化
|
||||
class Command(BaseCommand):
|
||||
help = 'build search index'
|
||||
|
||||
def handle(self, *args, **options):
|
||||
if ELASTICSEARCH_ENABLED:
|
||||
ElaspedTimeDocumentManager.build_index()
|
||||
manager = ElapsedTimeDocument()
|
||||
manager.init()
|
||||
manager = ArticleDocumentManager()
|
||||
manager.delete_index()
|
||||
manager.rebuild()
|
||||
@ -0,0 +1,13 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from blog.models import Tag, Category
|
||||
|
||||
|
||||
# TODO 参数化
|
||||
class Command(BaseCommand):
|
||||
help = 'build search words'
|
||||
|
||||
def handle(self, *args, **options):
|
||||
datas = set([t.name for t in Tag.objects.all()] +
|
||||
[t.name for t in Category.objects.all()])
|
||||
print('\n'.join(datas))
|
||||
@ -0,0 +1,11 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from djangoblog.utils import cache
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'clear the whole cache'
|
||||
|
||||
def handle(self, *args, **options):
|
||||
cache.clear()
|
||||
self.stdout.write(self.style.SUCCESS('Cleared cache\n'))
|
||||
@ -0,0 +1,40 @@
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.hashers import make_password
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from blog.models import Article, Tag, Category
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'create test datas'
|
||||
|
||||
def handle(self, *args, **options):
|
||||
user = get_user_model().objects.get_or_create(
|
||||
email='test@test.com', username='测试用户', password=make_password('test!q@w#eTYU'))[0]
|
||||
|
||||
pcategory = Category.objects.get_or_create(
|
||||
name='我是父类目', parent_category=None)[0]
|
||||
|
||||
category = Category.objects.get_or_create(
|
||||
name='子类目', parent_category=pcategory)[0]
|
||||
|
||||
category.save()
|
||||
basetag = Tag()
|
||||
basetag.name = "标签"
|
||||
basetag.save()
|
||||
for i in range(1, 20):
|
||||
article = Article.objects.get_or_create(
|
||||
category=category,
|
||||
title='nice title ' + str(i),
|
||||
body='nice content ' + str(i),
|
||||
author=user)[0]
|
||||
tag = Tag()
|
||||
tag.name = "标签" + str(i)
|
||||
tag.save()
|
||||
article.tags.add(tag)
|
||||
article.tags.add(basetag)
|
||||
article.save()
|
||||
|
||||
from djangoblog.utils import cache
|
||||
cache.clear()
|
||||
self.stdout.write(self.style.SUCCESS('created test datas \n'))
|
||||
@ -0,0 +1,50 @@
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from djangoblog.spider_notify import SpiderNotify
|
||||
from djangoblog.utils import get_current_site
|
||||
from blog.models import Article, Tag, Category
|
||||
|
||||
site = get_current_site().domain
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'notify baidu url'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'data_type',
|
||||
type=str,
|
||||
choices=[
|
||||
'all',
|
||||
'article',
|
||||
'tag',
|
||||
'category'],
|
||||
help='article : all article,tag : all tag,category: all category,all: All of these')
|
||||
|
||||
def get_full_url(self, path):
|
||||
url = "https://{site}{path}".format(site=site, path=path)
|
||||
return url
|
||||
|
||||
def handle(self, *args, **options):
|
||||
type = options['data_type']
|
||||
self.stdout.write('start get %s' % type)
|
||||
|
||||
urls = []
|
||||
if type == 'article' or type == 'all':
|
||||
for article in Article.objects.filter(status='p'):
|
||||
urls.append(article.get_full_url())
|
||||
if type == 'tag' or type == 'all':
|
||||
for tag in Tag.objects.all():
|
||||
url = tag.get_absolute_url()
|
||||
urls.append(self.get_full_url(url))
|
||||
if type == 'category' or type == 'all':
|
||||
for category in Category.objects.all():
|
||||
url = category.get_absolute_url()
|
||||
urls.append(self.get_full_url(url))
|
||||
|
||||
self.stdout.write(
|
||||
self.style.SUCCESS(
|
||||
'start notify %d urls' %
|
||||
len(urls)))
|
||||
SpiderNotify.baidu_notify(urls)
|
||||
self.stdout.write(self.style.SUCCESS('finish notify'))
|
||||
@ -0,0 +1,47 @@
|
||||
import requests
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.templatetags.static import static
|
||||
|
||||
from djangoblog.utils import save_user_avatar
|
||||
from oauth.models import OAuthUser
|
||||
from oauth.oauthmanager import get_manager_by_type
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'sync user avatar'
|
||||
|
||||
def test_picture(self, url):
|
||||
try:
|
||||
if requests.get(url, timeout=2).status_code == 200:
|
||||
return True
|
||||
except:
|
||||
pass
|
||||
|
||||
def handle(self, *args, **options):
|
||||
static_url = static("../")
|
||||
users = OAuthUser.objects.all()
|
||||
self.stdout.write(f'开始同步{len(users)}个用户头像')
|
||||
for u in users:
|
||||
self.stdout.write(f'开始同步:{u.nickname}')
|
||||
url = u.picture
|
||||
if url:
|
||||
if url.startswith(static_url):
|
||||
if self.test_picture(url):
|
||||
continue
|
||||
else:
|
||||
if u.metadata:
|
||||
manage = get_manager_by_type(u.type)
|
||||
url = manage.get_picture(u.metadata)
|
||||
url = save_user_avatar(url)
|
||||
else:
|
||||
url = static('blog/img/avatar.png')
|
||||
else:
|
||||
url = save_user_avatar(url)
|
||||
else:
|
||||
url = static('blog/img/avatar.png')
|
||||
if url:
|
||||
self.stdout.write(
|
||||
f'结束同步:{u.nickname}.url:{url}')
|
||||
u.picture = url
|
||||
u.save()
|
||||
self.stdout.write('结束同步')
|
||||
Binary file not shown.
Loading…
Reference in new issue