Compare commits

...

11 Commits

@ -0,0 +1,132 @@
"""
DjangoBlog 站点地图配置模块
功能为搜索引擎提供网站结构地图支持文章分类标签等内容的自动索引
"""
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
"""
静态页面站点地图
用于生成固定页面的站点地图如首页等
"""
# 优先级0.5中等优先级首页等重要页面可以设为1.0
priority = 0.5
# 更新频率:每天检查
changefreq = 'daily'
def items(self):
"""
返回包含在站点地图中的静态页面名称
这些名称需要与 urls.py 中的 URL 名称对应
"""
return ['blog:index', ] # 博客首页
def location(self, item):
"""
根据页面名称生成完整的 URL 地址
"""
return reverse(item)
class ArticleSiteMap(Sitemap):
"""
文章站点地图
自动生成所有已发布文章的站点地图
"""
# 更新频率:每月检查(文章内容相对稳定)
changefreq = "monthly"
# 优先级0.6(文章是核心内容,优先级较高)
priority = "0.6"
def items(self):
"""
返回所有已发布的文章对象
status='p' 表示已发布状态
"""
return Article.objects.filter(status='p')
def lastmod(self, obj):
"""
返回文章的最后修改时间
帮助搜索引擎了解内容更新情况
"""
return obj.last_modify_time
class CategorySiteMap(Sitemap):
"""
分类站点地图
生成文章分类页面的站点地图
"""
# 更新频率:每周检查(分类结构相对稳定)
changefreq = "Weekly"
# 优先级0.6(分类页面重要程度较高)
priority = "0.6"
def items(self):
"""
返回所有分类对象
"""
return Category.objects.all()
def lastmod(self, obj):
"""
返回分类的最后修改时间
当分类下的文章更新时分类页面也需要更新
"""
return obj.last_modify_time
class TagSiteMap(Sitemap):
"""
标签站点地图
生成标签页面的站点地图
"""
# 更新频率:每周检查
changefreq = "Weekly"
# 优先级0.3(标签页面重要性相对较低)
priority = "0.3"
def items(self):
"""
返回所有标签对象
"""
return Tag.objects.all()
def lastmod(self, obj):
"""
返回标签的最后修改时间
当标签关联的文章更新时标签页面也需要更新
"""
return obj.last_modify_time
class UserSiteMap(Sitemap):
"""
用户站点地图
生成用户主页的站点地图
"""
# 更新频率:每周检查
changefreq = "Weekly"
# 优先级0.3(用户页面重要性相对较低)
priority = "0.3"
def items(self):
"""
返回所有发表过文章的用户作者
使用 set 去重确保每个用户只出现一次
"""
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
"""
返回用户的注册时间
这里使用用户注册时间作为最后修改时间
实际可以根据用户最后活动时间优化
"""
return obj.date_joined

@ -1,59 +1,107 @@
from django import forms from django import forms
from django.contrib.auth.admin import UserAdmin from django.contrib.auth.admin import UserAdmin # Django 自带的用户管理后台基类
from django.contrib.auth.forms import UserChangeForm from django.contrib.auth.forms import UserChangeForm # Django 默认的用户信息修改表单
from django.contrib.auth.forms import UsernameField from django.contrib.auth.forms import UsernameField # Django 用于用户名字段的专用表单字段
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _ # 用于支持多语言翻译的辅助函数
# Register your models here. # 从当前 app 的 models 导入自定义的用户模型 BlogUser
from .models import BlogUser from .models import BlogUser
# ======================
# 自定义用户创建表单(用于后台添加用户时使用)
# ======================
class BlogUserCreationForm(forms.ModelForm): class BlogUserCreationForm(forms.ModelForm):
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput) # 添加两个密码字段,用于用户注册时输入和确认密码
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput) password1 = forms.CharField(
label=_('password'), # 字段显示名称可翻译这里是“password”
widget=forms.PasswordInput # 使用密码输入框,输入内容会被隐藏
)
password2 = forms.CharField(
label=_('Enter password again'), # 确认密码的标签
widget=forms.PasswordInput # 同样是密码输入框
)
class Meta: class Meta:
model = BlogUser model = BlogUser # 指定该表单关联的模型是 BlogUser
fields = ('email',) fields = ('email',) # 在创建用户时,只显示 email 字段(可以从后台选择的字段)
def clean_password2(self): def clean_password2(self):
# Check that the two password entries match """
校验两次输入的密码是否一致
"""
# 从 cleaned_data 中获取用户输入的两个密码
password1 = self.cleaned_data.get("password1") password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2") password2 = self.cleaned_data.get("password2")
# 如果两个密码都有值,但它们不相等,则抛出验证错误
if password1 and password2 and password1 != password2: if password1 and password2 and password1 != password2:
raise forms.ValidationError(_("passwords do not match")) raise forms.ValidationError(_("passwords do not match")) # 提示“密码不匹配”
# 验证通过,返回 password2通常返回确认密码字段的值
return password2 return password2
def save(self, commit=True): def save(self, commit=True):
# Save the provided password in hashed format """
保存用户对象并对密码进行哈希处理
"""
# 调用父类的 save 方法但不立即提交到数据库commit=False
user = super().save(commit=False) user = super().save(commit=False)
# 对用户输入的密码password1进行哈希处理并设置到 user 对象上
user.set_password(self.cleaned_data["password1"]) user.set_password(self.cleaned_data["password1"])
if commit: if commit:
# 如果 commit=True默认则保存到数据库
# 同时,给用户添加一个来源标识 source = 'adminsite',表示是通过后台添加的
user.source = 'adminsite' user.source = 'adminsite'
user.save() user.save()
# 返回保存后的用户对象
return user return user
# ======================
# 自定义用户修改表单(用于后台编辑用户信息时使用)
# ======================
class BlogUserChangeForm(UserChangeForm): class BlogUserChangeForm(UserChangeForm):
class Meta: class Meta:
model = BlogUser model = BlogUser # 指定关联的模型是 BlogUser
fields = '__all__' fields = '__all__' # 表单中包含模型的所有字段
# 指定 username 字段使用 Django 提供的 UsernameField它对用户名有特殊处理如唯一性等
field_classes = {'username': UsernameField} field_classes = {'username': UsernameField}
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs) """
初始化方法这里暂时没有额外逻辑只是调用了父类的初始化
"""
super().__init__(*args, **kwargs)
# ======================
# 自定义用户管理后台类(用于在 Django Admin 中管理 BlogUser 模型)
# ======================
class BlogUserAdmin(UserAdmin): class BlogUserAdmin(UserAdmin):
# 指定用户修改时使用的表单类(编辑用户信息时)
form = BlogUserChangeForm form = BlogUserChangeForm
# 指定用户创建时使用的表单类(添加新用户时)
add_form = BlogUserCreationForm add_form = BlogUserCreationForm
# 定义在用户列表页显示哪些字段
list_display = ( list_display = (
'id', 'id', # 用户 ID
'nickname', 'nickname', # 昵称(假设你的 BlogUser 模型中有这个字段)
'username', 'username', # 用户名
'email', 'email', # 邮箱
'last_login', 'last_login', # 上次登录时间
'date_joined', 'date_joined', # 注册时间
'source') 'source' # 用户来源(比如 adminsite 表示后台添加)
)
# 定义哪些字段可以作为链接,点击后可以进入编辑页面
# 这里 id 和 username 都可以作为链接
list_display_links = ('id', 'username') list_display_links = ('id', 'username')
ordering = ('-id',)
# 定义默认排序方式,这里是按照 id 降序(最新的用户在前面)
ordering = ('-id',)

@ -1,31 +1,50 @@
#!/usr/bin/env bash #!/usr/bin/env bash
# 指定脚本解释器为bash
# 定义应用名称为djangoblog
NAME="djangoblog" NAME="djangoblog"
# 定义Django项目根目录路径
DJANGODIR=/code/djangoblog DJANGODIR=/code/djangoblog
# 定义运行应用的用户
USER=root USER=root
# 定义运行应用的用户组
GROUP=root GROUP=root
# 定义Gunicorn工作进程数量
NUM_WORKERS=1 NUM_WORKERS=1
# 定义Django的WSGI模块路径
DJANGO_WSGI_MODULE=djangoblog.wsgi DJANGO_WSGI_MODULE=djangoblog.wsgi
# 输出启动信息,显示当前启动的应用名称和执行用户
echo "Starting $NAME as `whoami`" echo "Starting $NAME as `whoami`"
# 进入Django项目根目录
cd $DJANGODIR cd $DJANGODIR
# 将项目目录添加到Python路径中确保Python能正确导入项目模块
export PYTHONPATH=$DJANGODIR:$PYTHONPATH export PYTHONPATH=$DJANGODIR:$PYTHONPATH
# 执行Django项目初始化命令序列若任何一步失败则退出脚本
# 1. 生成数据库迁移文件
python manage.py makemigrations && \ python manage.py makemigrations && \
# 2. 应用数据库迁移
python manage.py migrate && \ python manage.py migrate && \
# 3. 收集静态文件(无交互模式)
python manage.py collectstatic --noinput && \ python manage.py collectstatic --noinput && \
# 4. 强制压缩静态文件通常用于CSS/JS压缩
python manage.py compress --force && \ python manage.py compress --force && \
# 5. 构建搜索索引(如果项目使用了全文搜索功能)
python manage.py build_index && \ python manage.py build_index && \
# 6. 编译翻译文件(用于国际化支持)
python manage.py compilemessages || exit 1 python manage.py compilemessages || exit 1
# 启动Gunicorn作为WSGI服务器替换当前进程exec命令特性
exec gunicorn ${DJANGO_WSGI_MODULE}:application \ exec gunicorn ${DJANGO_WSGI_MODULE}:application \
--name $NAME \ --name $NAME \ # 指定应用名称
--workers $NUM_WORKERS \ --workers $NUM_WORKERS \ # 指定工作进程数量
--user=$USER --group=$GROUP \ --user=$USER --group=$GROUP \ # 指定运行的用户和用户组
--bind 0.0.0.0:8000 \ --bind 0.0.0.0:8000 \ # 绑定监听地址和端口0.0.0.0表示允许所有网络访问)
--log-level=debug \ --log-level=debug \ # 设置日志级别为debug
--log-file=- \ --log-file=- \ # 日志输出到标准输出(-表示stdout
--worker-class gevent \ --worker-class gevent \ # 使用gevent工作类支持异步IO提高并发性能
--threads 4 --threads 4 # 每个工作进程的线程数量

@ -1,34 +1,47 @@
<!DOCTYPE html> <!DOCTYPE html>
<html> <html>
<head> <head>
<!-- 加载static静态文件标签用于后面引用静态资源 -->
{% load static %} {% load static %}
<meta charset="utf-8"> <meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge"> <meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1"> <meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> <!-- 以上3个meta标签必须放在head的最前面 -->
<meta name="description" content=""> <meta name="description" content="">
<meta name="author" content=""> <meta name="author" content="">
<link rel="icon" href="../../favicon.ico"> <link rel="icon" href="../../favicon.ico">
<!-- 禁止搜索引擎索引此页面 -->
<meta name="robots" content="noindex"> <meta name="robots" content="noindex">
<!-- 动态设置页面标题使用Django模板变量 -->
<title>{{ SITE_NAME }} | {{ SITE_DESCRIPTION }}</title> <title>{{ SITE_NAME }} | {{ SITE_DESCRIPTION }}</title>
<!-- 加载账户相关的CSS文件 -->
<link href="{% static 'account/css/account.css' %}" rel="stylesheet"> <link href="{% static 'account/css/account.css' %}" rel="stylesheet">
<!-- 使用Django压缩工具压缩CSS文件 -->
{% load compress %} {% load compress %}
{% compress css %} {% compress css %}
<!-- Bootstrap core CSS --> <!-- Bootstrap核心CSS文件 -->
<link href="{% static 'assets/css/bootstrap.min.css' %}" rel="stylesheet"> <link href="{% static 'assets/css/bootstrap.min.css' %}" rel="stylesheet">
<!-- OAuth认证样式 -->
<link href="{% static 'blog/css/oauth_style.css' %}" rel="stylesheet"> <link href="{% static 'blog/css/oauth_style.css' %}" rel="stylesheet">
<!-- IE10 viewport hack for Surface/desktop Windows 8 bug --> <!-- IE10视口bug修复 -->
<link href="{% static 'assets/css/ie10-viewport-bug-workaround.css' %}" rel="stylesheet"> <link href="{% static 'assets/css/ie10-viewport-bug-workaround.css' %}" rel="stylesheet">
<!-- TODC Bootstrap core CSS --> <!-- TODC Bootstrap样式 -->
<link href="{% static 'assets/css/todc-bootstrap.min.css' %}" rel="stylesheet"> <link href="{% static 'assets/css/todc-bootstrap.min.css' %}" rel="stylesheet">
<!-- Custom styles for this template --> <!-- 登录页面自定义样式 -->
<link href="{% static 'assets/css/signin.css' %}" rel="stylesheet"> <link href="{% static 'assets/css/signin.css' %}" rel="stylesheet">
{% endcompress %} {% endcompress %}
<!-- 压缩JavaScript文件 -->
{% compress js %} {% compress js %}
<!-- IE10视口bug修复脚本 -->
<script src="{% static 'assets/js/ie10-viewport-bug-workaround.js' %}"></script> <script src="{% static 'assets/js/ie10-viewport-bug-workaround.js' %}"></script>
<!-- IE浏览器仿真模式警告 -->
<script src="{% static 'assets/js/ie-emulation-modes-warning.js' %}"></script> <script src="{% static 'assets/js/ie-emulation-modes-warning.js' %}"></script>
{% endcompress %} {% endcompress %}
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- HTML5 shim和Respond.js用于IE8支持HTML5元素和媒体查询 -->
<!--[if lt IE 9]> <!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script> <script src="https://oss.maxcdn.com/html5shiv/3.7.3/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
@ -36,12 +49,15 @@
</head> </head>
<body> <body>
{% block content %} <!-- 定义内容块,子模板可以在此处插入具体内容 -->
{% endblock %} {% block content %}
{% endblock %}
<!-- IE10 viewport hack for Surface/desktop Windows 8 bug -->
<!-- IE10视口hack用于Surface/桌面Windows 8 bug -->
</body> </body>
<!-- 引入jQuery库 -->
<script type="text/javascript" src="{% static 'blog/js/jquery-3.6.0.min.js' %}"></script> <script type="text/javascript" src="{% static 'blog/js/jquery-3.6.0.min.js' %}"></script>
<!-- 引入账户相关的JavaScript文件 -->
<script src="{% static 'account/js/account.js' %}" type="text/javascript"></script> <script src="{% static 'account/js/account.js' %}" type="text/javascript"></script>
</html> </html>

@ -0,0 +1,442 @@
"""
OAuth认证模块
提供第三方登录功能支持微信微博GitHubGoogleQQFacebook等平台
"""
import logging
import json
import urllib.parse
from abc import ABC, abstractmethod
from django.urls import reverse
from django.shortcuts import get_object_or_404, render
from django.http import HttpResponseRedirect, HttpResponseForbidden
from django.contrib import auth
from django.utils.translation import gettext_lazy as _
from .models import OAuthUser, OAuthConfig
# 获取日志器
logger = logging.getLogger(__name__)
class BaseOauthManager(ABC):
"""OAuth认证管理器基类"""
# 授权URL和API端点
AUTH_URL = ""
TOKEN_URL = ""
OPEN_ID_URL = ""
USER_INFO_URL = ""
ICON_NAME = "" # 平台图标名称
def __init__(self):
"""初始化OAuth管理器"""
self.access_token = None
self.openid = None
self.client_id = None
self.client_secret = None
self.callback_url = None
@abstractmethod
def get_authorization_url(self, next_url='/'):
"""获取授权URL"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""获取用户信息"""
pass
def get_picture(self, metadata):
"""获取用户头像(可选实现)"""
return ""
def do_get(self, url, params, headers=None):
"""
执行GET请求
Args:
url: 请求URL
params: 请求参数
headers: 请求头
Returns:
str: 响应内容
"""
try:
response = requests.get(url=url, params=params, headers=headers)
logger.info(f"GET Response: {response.text}")
return response.text
except Exception as e:
logger.error(f"GET request failed: {e}")
raise
def do_post(self, url, params, headers=None):
"""
执行POST请求
Args:
url: 请求URL
params: 请求参数
headers: 请求头
Returns:
str: 响应内容
"""
try:
response = requests.post(url, data=params, headers=headers)
logger.info(f"POST Response: {response.text}")
return response.text
except Exception as e:
logger.error(f"POST request failed: {e}")
raise
@property
def is_access_token_set(self):
"""检查访问令牌是否已设置"""
return self.access_token is not None
def get_config(self):
"""获取OAuth配置"""
try:
config = OAuthConfig.objects.filter(
type=self.ICON_NAME.lower(),
is_enable=True
).first()
if config:
self.client_id = config.appkey
self.client_secret = config.appsecret
self.callback_url = config.callback_url
return config
except Exception as e:
logger.error(f"Get OAuth config failed: {e}")
return None
class WeiboOauthManager(BaseOauthManager):
"""微博OAuth认证管理器"""
AUTH_URL = "https://api.weibo.com/oauth2/authorize"
TOKEN_URL = "https://api.weibo.com/oauth2/access_token"
USER_INFO_URL = "https://api.weibo.com/2/users/show.json"
ICON_NAME = "weibo"
def get_authorization_url(self, next_url='/'):
"""
获取微博授权URL
Args:
next_url: 授权成功后跳转的URL
Returns:
str: 完整的授权URL
"""
if not self.get_config():
return ""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取访问令牌
Args:
code: 授权码
Returns:
tuple: (access_token, 响应数据)
"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
try:
response_text = self.do_post(self.TOKEN_URL, params)
token_data = json.loads(response_text)
if 'access_token' in token_data:
self.access_token = token_data['access_token']
return self.access_token, token_data
else:
logger.error(f"Failed to get access token: {token_data}")
raise OAuthAccessTokenException(token_data)
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
raise OAuthAccessTokenException("Invalid response format")
except Exception as e:
logger.error(f"Get access token failed: {e}")
raise OAuthAccessTokenException(str(e))
def get_oauth_userinfo(self):
"""
获取微博用户信息
Returns:
OAuthUser: 用户信息对象
"""
if not self.is_access_token_set:
raise ValueError("Access token not set")
params = {'access_token': self.access_token}
response_text = self.do_get(self.USER_INFO_URL, params)
user_data = json.loads(response_text)
# 创建OAuth用户对象
oauth_user = OAuthUser()
oauth_user.nickname = user_data.get('screen_name', '')
oauth_user.picture = user_data.get('profile_image_url', '')
oauth_user.token = self.access_token
oauth_user.type = 'weibo'
oauth_user.email = user_data.get('email', '')
oauth_user.metadata = json.dumps(user_data)
return oauth_user
class GitHubOauthManager(BaseOauthManager):
"""GitHub OAuth认证管理器"""
AUTH_URL = "https://github.com/login/oauth/authorize"
TOKEN_URL = "https://github.com/login/oauth/access_token"
USER_INFO_URL = "https://api.github.com/user"
ICON_NAME = "github"
def get_authorization_url(self, next_url='/'):
"""获取GitHub授权URL"""
if not self.get_config():
return ""
params = {
'client_id': self.client_id,
'scope': 'user:email',
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取GitHub访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
}
headers = {'Accept': 'application/json'}
response_text = self.do_post(self.TOKEN_URL, params, headers)
token_data = json.loads(response_text)
if 'access_token' in token_data:
self.access_token = token_data['access_token']
return self.access_token, token_data
else:
raise OAuthAccessTokenException(token_data)
def get_oauth_userinfo(self):
"""获取GitHub用户信息"""
if not self.is_access_token_set:
raise ValueError("Access token not set")
headers = {'Authorization': f'token {self.access_token}'}
response_text = self.do_get(self.USER_INFO_URL, headers=headers)
user_data = json.loads(response_text)
oauth_user = OAuthUser()
oauth_user.nickname = user_data.get('login', '')
oauth_user.picture = user_data.get('avatar_url', '')
oauth_user.token = self.access_token
oauth_user.type = 'github'
oauth_user.email = user_data.get('email', '')
oauth_user.metadata = json.dumps(user_data)
return oauth_user
# OAuth异常类
class OAuthException(Exception):
"""OAuth异常基类"""
pass
class OAuthAccessTokenException(OAuthException):
"""访问令牌获取异常"""
pass
class OAuthUserInfoException(OAuthException):
"""用户信息获取异常"""
pass
# 工具函数
def get_oauth_apps():
"""
获取所有可用的OAuth应用
Returns:
list: OAuth管理器实例列表
"""
config_types = OAuthConfig.objects.filter(
is_enable=True
).values_list('type', flat=True)
applications = BaseOauthManager.__subclasses__()
apps = [
app() for app in applications
if app().ICON_NAME.lower() in config_types
]
return apps
def get_manager_by_type(oauth_type):
"""
根据类型获取OAuth管理器
Args:
oauth_type: OAuth类型weibogithub等
Returns:
BaseOauthManager: OAuth管理器实例
"""
applications = get_oauth_apps()
if applications:
finds = list(filter(
lambda x: x.ICON_NAME.lower() == oauth_type.lower(),
applications
))
return finds[0] if finds else None
return None
# 视图函数
def oauth_login(request):
"""
OAuth登录入口
Args:
request: HTTP请求对象
Returns:
HttpResponseRedirect: 重定向到授权页面或首页
"""
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
if not oauth_type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(oauth_type)
if not manager:
return HttpResponseRedirect('/')
next_url = request.GET.get('next_url', '/')
authorize_url = manager.get_authorization_url(next_url)
return HttpResponseRedirect(authorize_url)
def oauth_authorize(request):
"""
OAuth授权回调处理
Args:
request: HTTP请求对象
Returns:
HttpResponse: 授权结果页面或重定向
"""
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
code = request.GET.get('code')
next_url = request.GET.get('next_url', '/')
if not oauth_type or not code:
return HttpResponseRedirect('/')
try:
manager = get_manager_by_type(oauth_type)
if not manager:
return HttpResponseRedirect('/')
# 获取访问令牌
manager.get_access_token_by_code(code)
# 获取用户信息
oauth_user = manager.get_oauth_userinfo()
# 处理用户登录或绑定
user = process_oauth_user(oauth_user, request)
if user:
auth.login(request, user)
return HttpResponseRedirect(next_url)
else:
# 转到绑定页面
return redirect_to_bind_page(oauth_user, request)
except OAuthException as e:
logger.error(f"OAuth authorization failed: {e}")
return render(request, 'oauth/error.html', {
'error_message': _('OAuth authentication failed')
})
def process_oauth_user(oauth_user, request):
"""
处理OAuth用户信息
Args:
oauth_user: OAuth用户对象
request: HTTP请求对象
Returns:
User: 认证用户对象或None
"""
try:
# 查找已存在的OAuth用户
existing_oauth_user = OAuthUser.objects.filter(
type=oauth_user.type,
openid=oauth_user.openid
).first()
if existing_oauth_user:
# 已绑定用户,直接登录
if existing_oauth_user.author:
return existing_oauth_user.author
else:
# 未绑定用户,转到绑定页面
return None
else:
# 新用户保存OAuth信息并转到绑定页面
oauth_user.save()
return None
except Exception as e:
logger.error(f"Process OAuth user failed: {e}")
return None
def redirect_to_bind_page(oauth_user, request):
"""
重定向到用户绑定页面
Args:
oauth_user: OAuth用户对象
request: HTTP请求对象
Returns:
HttpResponseRedirect: 重定向到绑定页面
"""
# 生成绑定URL
bind_url = reverse('oauth:bind') + f'?oauth_id={oauth_user.id}'
return HttpResponseRedirect(bind_url)

@ -0,0 +1,132 @@
"""
DjangoBlog 站点地图配置模块
功能为搜索引擎提供网站结构地图支持文章分类标签等内容的自动索引
"""
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
"""
静态页面站点地图
用于生成固定页面的站点地图如首页等
"""
# 优先级0.5中等优先级首页等重要页面可以设为1.0
priority = 0.5
# 更新频率:每天检查
changefreq = 'daily'
def items(self):
"""
返回包含在站点地图中的静态页面名称
这些名称需要与 urls.py 中的 URL 名称对应
"""
return ['blog:index', ] # 博客首页
def location(self, item):
"""
根据页面名称生成完整的 URL 地址
"""
return reverse(item)
class ArticleSiteMap(Sitemap):
"""
文章站点地图
自动生成所有已发布文章的站点地图
"""
# 更新频率:每月检查(文章内容相对稳定)
changefreq = "monthly"
# 优先级0.6(文章是核心内容,优先级较高)
priority = "0.6"
def items(self):
"""
返回所有已发布的文章对象
status='p' 表示已发布状态
"""
return Article.objects.filter(status='p')
def lastmod(self, obj):
"""
返回文章的最后修改时间
帮助搜索引擎了解内容更新情况
"""
return obj.last_modify_time
class CategorySiteMap(Sitemap):
"""
分类站点地图
生成文章分类页面的站点地图
"""
# 更新频率:每周检查(分类结构相对稳定)
changefreq = "Weekly"
# 优先级0.6(分类页面重要程度较高)
priority = "0.6"
def items(self):
"""
返回所有分类对象
"""
return Category.objects.all()
def lastmod(self, obj):
"""
返回分类的最后修改时间
当分类下的文章更新时分类页面也需要更新
"""
return obj.last_modify_time
class TagSiteMap(Sitemap):
"""
标签站点地图
生成标签页面的站点地图
"""
# 更新频率:每周检查
changefreq = "Weekly"
# 优先级0.3(标签页面重要性相对较低)
priority = "0.3"
def items(self):
"""
返回所有标签对象
"""
return Tag.objects.all()
def lastmod(self, obj):
"""
返回标签的最后修改时间
当标签关联的文章更新时标签页面也需要更新
"""
return obj.last_modify_time
class UserSiteMap(Sitemap):
"""
用户站点地图
生成用户主页的站点地图
"""
# 更新频率:每周检查
changefreq = "Weekly"
# 优先级0.3(用户页面重要性相对较低)
priority = "0.3"
def items(self):
"""
返回所有发表过文章的用户作者
使用 set 去重确保每个用户只出现一次
"""
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
"""
返回用户的注册时间
这里使用用户注册时间作为最后修改时间
实际可以根据用户最后活动时间优化
"""
return obj.date_joined

@ -0,0 +1,370 @@
#!/usr/bin/env python
# encoding: utf-8
"""
DjangoBlog 通用工具函数模块
提供缓存Markdown处理邮件发送文件操作等通用功能
"""
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
# 获取当前模块的日志器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""
获取最大的文章ID和评论ID
Returns:
tuple: (最大文章ID, 最大评论ID)
"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""
计算字符串的SHA256哈希值
Args:
str (str): 输入字符串
Returns:
str: SHA256哈希值的十六进制表示
"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""
缓存装饰器自动缓存函数结果
Args:
expiration (int): 缓存过期时间默认3分钟
Returns:
function: 装饰器函数
"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从视图类获取缓存键
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
# 如果没有特定缓存键,根据函数参数生成
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
# 尝试从缓存获取结果
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
# 缓存未命中,执行函数并缓存结果
logger.debug('cache_decorator set cache:%s key:%s' % (func.__name__, key))
value = func(*args, **kwargs)
if value is None:
# 对None值进行特殊处理
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
Args:
path (str): URL路径
servername (str): 主机名
serverport (str): 端口号
key_prefix (str): 缓存键前缀
Returns:
bool: 是否成功删除缓存
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
# 创建模拟请求对象
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
# 获取缓存键并删除缓存
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
"""
获取当前站点信息带缓存
Returns:
Site: 当前站点对象
"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""
Markdown处理工具类
提供Markdown到HTML的转换功能
"""
@staticmethod
def _convert_markdown(value):
"""
内部方法执行Markdown转换
Args:
value (str): Markdown文本
Returns:
tuple: (HTML内容, 目录HTML)
"""
# 配置Markdown扩展
md = markdown.Markdown(
extensions=[
'extra', # 额外语法支持
'codehilite', # 代码高亮
'toc', # 目录生成
'tables', # 表格支持
]
)
body = md.convert(value) # 转换Markdown
toc = md.toc # 提取目录
return body, toc
@staticmethod
def get_markdown_with_toc(value):
"""
获取带目录的Markdown转换结果
Args:
value (str): Markdown文本
Returns:
tuple: (HTML内容, 目录HTML)
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""
获取不带目录的Markdown转换结果
Args:
value (str): Markdown文本
Returns:
str: HTML内容
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""
发送邮件通过信号机制
Args:
emailto (str): 收件人邮箱
title (str): 邮件标题
content (str): 邮件内容
"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""生成随机数验证码
Returns:
str: 6位数字验证码
"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""
将字典转换为URL参数字符串
Args:
dict (dict): 参数字典
Returns:
str: URL参数字符串
"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
@cache_decorator()
def get_blog_setting():
"""
获取博客设置带缓存
如果不存在默认设置则创建默认设置
Returns:
BlogSettings: 博客设置对象
"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
# 如果没有设置记录,创建默认设置
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value) # 缓存设置
return value
def save_user_avatar(url):
'''
保存用户头像到本地
Args:
url (str): 头像URL地址
Returns:
str: 本地静态文件路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
# 下载头像文件
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
# 检查文件是否为图片
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
# 生成唯一文件名并保存
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename) # 返回静态文件URL
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png') # 返回默认头像
def delete_sidebar_cache():
"""
删除侧边栏缓存
"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
"""
删除视图缓存
Args:
prefix (str): 缓存前缀
keys (list): 缓存键列表
"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""
获取资源URL
Returns:
str: 静态资源URL
"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
# HTML净化配置
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
"""
净化HTML防止XSS攻击
Args:
html (str): 原始HTML
Returns:
str: 净化后的HTML
"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
Loading…
Cancel
Save