Compare commits

..

18 Commits

@ -1 +0,0 @@
undefined

@ -1 +0,0 @@
This is for doc folder

@ -1 +0,0 @@
Subproject commit 18b7cafbdcde59b0292b13179a4d706a9cb6458f

@ -0,0 +1 @@
# 文档目录 - 项目文档将存放于此

@ -0,0 +1,40 @@
#!/bin/bash
# 显示脚本开始执行
echo "脚本开始执行..."
# 从键盘读取文件名
read -p "请输入文件名: " filename
# 检查文件是否存在
if [ ! -e "$filename" ]; then
echo "错误:文件 '$filename' 不存在!"
exit 1
fi
echo "文件 '$filename' 存在"
# 检查文件是否是符号链接
if [ -L "$filename" ]; then
echo "检测到 '$filename' 是一个符号链接文件"
# 获取文件名部分(不包括路径)
base_name=$(basename "$filename")
# 移动文件到/tmp目录
echo "正在将 '$filename' 移动到 /tmp/$base_name"
mv "$filename" "/tmp/$base_name"
# 检查移动是否成功
if [ $? -eq 0 ]; then
echo "移动成功!文件现在位于 /tmp/$base_name"
else
echo "移动失败!"
exit 1
fi
else
echo "'$filename' 不是符号链接文件,不进行任何处理"
fi
echo "脚本执行完毕"
exit 0

Binary file not shown.

@ -1 +0,0 @@
undefined

@ -1,176 +0,0 @@
name: 自动部署到生产环境
on:
workflow_run:
workflows: ["Django CI"]
types:
- completed
branches:
- master
workflow_dispatch:
inputs:
environment:
description: '部署环境'
required: true
default: 'production'
type: choice
options:
- production
- staging
image_tag:
description: '镜像标签 (默认: latest)'
required: false
default: 'latest'
type: string
skip_tests:
description: '跳过测试直接部署'
required: false
default: false
type: boolean
env:
REGISTRY: registry.cn-shenzhen.aliyuncs.com
IMAGE_NAME: liangliangyy/djangoblog
NAMESPACE: djangoblog
jobs:
deploy:
name: 构建镜像并部署到生产环境
runs-on: ubuntu-latest
if: ${{ github.event.workflow_run.conclusion == 'success' || github.event_name == 'workflow_dispatch' }}
steps:
- name: 检出代码
uses: actions/checkout@v4
- name: 设置部署参数
id: deploy-params
run: |
if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then
echo "trigger_type=手动触发" >> $GITHUB_OUTPUT
echo "environment=${{ github.event.inputs.environment }}" >> $GITHUB_OUTPUT
echo "image_tag=${{ github.event.inputs.image_tag }}" >> $GITHUB_OUTPUT
echo "skip_tests=${{ github.event.inputs.skip_tests }}" >> $GITHUB_OUTPUT
else
echo "trigger_type=CI自动触发" >> $GITHUB_OUTPUT
echo "environment=production" >> $GITHUB_OUTPUT
echo "image_tag=latest" >> $GITHUB_OUTPUT
echo "skip_tests=false" >> $GITHUB_OUTPUT
fi
- name: 显示部署信息
run: |
echo "🚀 部署信息:"
echo " 触发方式: ${{ steps.deploy-params.outputs.trigger_type }}"
echo " 部署环境: ${{ steps.deploy-params.outputs.environment }}"
echo " 镜像标签: ${{ steps.deploy-params.outputs.image_tag }}"
echo " 跳过测试: ${{ steps.deploy-params.outputs.skip_tests }}"
- name: 设置Docker Buildx
uses: docker/setup-buildx-action@v3
- name: 登录私有镜像仓库
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ secrets.REGISTRY_USERNAME }}
password: ${{ secrets.REGISTRY_PASSWORD }}
- name: 提取镜像元数据
id: meta
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
tags: |
type=ref,event=branch
type=sha,prefix={{branch}}-
type=raw,value=${{ steps.deploy-params.outputs.image_tag }}
- name: 构建并推送Docker镜像
uses: docker/build-push-action@v5
with:
context: .
file: ./Dockerfile
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
cache-from: type=gha
cache-to: type=gha,mode=max
platforms: linux/amd64
- name: 部署到生产服务器
uses: appleboy/ssh-action@v1.0.3
with:
host: ${{ secrets.PRODUCTION_HOST }}
username: ${{ secrets.PRODUCTION_USER }}
key: ${{ secrets.PRODUCTION_SSH_KEY }}
port: ${{ secrets.PRODUCTION_PORT || 22 }}
script: |
echo "🚀 开始部署 DjangoBlog..."
# 检查kubectl是否可用
if ! command -v kubectl &> /dev/null; then
echo "❌ 错误: kubectl 未安装或不在PATH中"
exit 1
fi
# 检查命名空间是否存在
if ! kubectl get namespace ${{ env.NAMESPACE }} &> /dev/null; then
echo "❌ 错误: 命名空间 ${{ env.NAMESPACE }} 不存在"
exit 1
fi
# 更新deployment镜像
echo "📦 更新deployment镜像为: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.deploy-params.outputs.image_tag }}"
kubectl set image deployment/djangoblog \
djangoblog=${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.deploy-params.outputs.image_tag }} \
-n ${{ env.NAMESPACE }}
# 重启deployment
echo "🔄 重启deployment..."
kubectl -n ${{ env.NAMESPACE }} rollout restart deployment djangoblog
# 等待deployment完成
echo "⏳ 等待deployment完成..."
kubectl rollout status deployment/djangoblog -n ${{ env.NAMESPACE }} --timeout=300s
# 检查deployment状态
echo "✅ 检查deployment状态..."
kubectl get deployment djangoblog -n ${{ env.NAMESPACE }}
kubectl get pods -l app=djangoblog -n ${{ env.NAMESPACE }}
echo "🎉 部署完成!"
- name: 发送部署通知
if: always()
run: |
# 设置通知内容
if [ "${{ job.status }}" = "success" ]; then
TITLE="✅ DjangoBlog部署成功"
STATUS="成功"
else
TITLE="❌ DjangoBlog部署失败"
STATUS="失败"
fi
MESSAGE="部署状态: ${STATUS}
触发方式: ${{ steps.deploy-params.outputs.trigger_type }}
部署环境: ${{ steps.deploy-params.outputs.environment }}
镜像标签: ${{ steps.deploy-params.outputs.image_tag }}
提交者: ${{ github.actor }}
时间: $(date '+%Y-%m-%d %H:%M:%S')
查看详情: https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}"
# 发送Server酱通知
if [ -n "${{ secrets.SERVERCHAN_KEY }}" ]; then
echo "{\"title\": \"${TITLE}\", \"desp\": \"${MESSAGE}\"}" > /tmp/serverchan.json
curl --location "https://sctapi.ftqq.com/${{ secrets.SERVERCHAN_KEY }}.send" \
--header "Content-Type: application/json" \
--data @/tmp/serverchan.json \
--silent > /dev/null
rm -f /tmp/serverchan.json
echo "📱 部署通知已发送"
fi

@ -1,371 +0,0 @@
name: Django CI
on:
push:
branches:
- master
- dev
paths-ignore:
- '**/*.md'
- '**/*.css'
- '**/*.js'
pull_request:
branches:
- master
- dev
paths-ignore:
- '**/*.md'
- '**/*.css'
- '**/*.js'
jobs:
test:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
include:
# 标准测试 - Python 3.10
- python-version: "3.10"
test-type: "standard"
database: "mysql"
elasticsearch: false
coverage: false
# 标准测试 - Python 3.11
- python-version: "3.11"
test-type: "standard"
database: "mysql"
elasticsearch: false
coverage: false
# 完整测试 - 包含ES和覆盖率
- python-version: "3.11"
test-type: "full"
database: "mysql"
elasticsearch: true
coverage: true
# Docker构建测试
- python-version: "3.11"
test-type: "docker"
database: "none"
elasticsearch: false
coverage: false
name: Test (${{ matrix.test-type }}, Python ${{ matrix.python-version }})
steps:
- name: Checkout代码
uses: actions/checkout@v4
- name: 设置测试信息
id: test-info
run: |
echo "test_name=${{ matrix.test-type }}-py${{ matrix.python-version }}" >> $GITHUB_OUTPUT
if [ "${{ matrix.test-type }}" = "docker" ]; then
echo "skip_python_setup=true" >> $GITHUB_OUTPUT
else
echo "skip_python_setup=false" >> $GITHUB_OUTPUT
fi
# MySQL数据库设置 (只有需要数据库的测试才执行)
- name: 启动MySQL数据库
if: matrix.database == 'mysql'
uses: samin/mysql-action@v1.3
with:
host port: 3306
container port: 3306
character set server: utf8mb4
collation server: utf8mb4_general_ci
mysql version: latest
mysql root password: root
mysql database: djangoblog
mysql user: root
mysql password: root
# Elasticsearch设置 (只有完整测试才执行)
- name: 配置系统参数 (ES)
if: matrix.elasticsearch == true
run: |
sudo swapoff -a
sudo sysctl -w vm.swappiness=1
sudo sysctl -w fs.file-max=262144
sudo sysctl -w vm.max_map_count=262144
- name: 启动Elasticsearch
if: matrix.elasticsearch == true
uses: miyataka/elasticsearch-github-actions@1
with:
stack-version: '7.12.1'
plugins: 'https://release.infinilabs.com/analysis-ik/stable/elasticsearch-analysis-ik-7.12.1.zip'
# Python环境设置 (Docker测试跳过)
- name: 设置Python ${{ matrix.python-version }}
if: steps.test-info.outputs.skip_python_setup == 'false'
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
cache-dependency-path: 'requirements.txt'
# 多层缓存策略优化
- name: 缓存Python依赖
if: steps.test-info.outputs.skip_python_setup == 'false'
uses: actions/cache@v4
with:
path: |
~/.cache/pip
.pytest_cache
key: ${{ runner.os }}-python-${{ matrix.python-version }}-${{ hashFiles('requirements.txt') }}-${{ hashFiles('**/pyproject.toml') }}
restore-keys: |
${{ runner.os }}-python-${{ matrix.python-version }}-${{ hashFiles('requirements.txt') }}-
${{ runner.os }}-python-${{ matrix.python-version }}-
${{ runner.os }}-python-
# Django缓存优化 (测试数据库等)
- name: 缓存Django资源
if: matrix.test-type != 'docker'
uses: actions/cache@v4
with:
path: |
.coverage*
htmlcov/
.django_cache/
key: ${{ runner.os }}-django-${{ matrix.test-type }}-${{ github.sha }}
restore-keys: |
${{ runner.os }}-django-${{ matrix.test-type }}-
${{ runner.os }}-django-
- name: 安装Python依赖
if: steps.test-info.outputs.skip_python_setup == 'false'
run: |
echo "📦 安装Python依赖 (Python ${{ matrix.python-version }})"
python -m pip install --upgrade pip setuptools wheel
# 安装基础依赖
pip install -r requirements.txt
# 根据测试类型安装额外依赖
if [ "${{ matrix.coverage }}" = "true" ]; then
echo "📊 安装覆盖率工具"
pip install coverage[toml]
fi
# 验证关键依赖
echo "🔍 验证关键依赖安装"
python -c "import django; print(f'Django version: {django.get_version()}')"
python -c "import MySQLdb; print('MySQL client: OK')" || python -c "import pymysql; print('PyMySQL client: OK')"
if [ "${{ matrix.elasticsearch }}" = "true" ]; then
python -c "import elasticsearch; print('Elasticsearch client: OK')"
fi
# Django环境准备
- name: 准备Django环境
if: matrix.test-type != 'docker'
env:
DJANGO_MYSQL_PASSWORD: root
DJANGO_MYSQL_HOST: 127.0.0.1
DJANGO_ELASTICSEARCH_HOST: ${{ matrix.elasticsearch && '127.0.0.1:9200' || '' }}
run: |
echo "🔧 准备Django测试环境"
# 等待数据库就绪
echo "⏳ 等待MySQL数据库启动..."
for i in {1..30}; do
if python -c "import MySQLdb; MySQLdb.connect(host='127.0.0.1', user='root', passwd='root', db='djangoblog')" 2>/dev/null; then
echo "✅ MySQL数据库连接成功"
break
fi
echo "🔄 等待数据库启动... ($i/30)"
sleep 2
done
# 等待Elasticsearch就绪 (如果启用)
if [ "${{ matrix.elasticsearch }}" = "true" ]; then
echo "⏳ 等待Elasticsearch启动..."
for i in {1..30}; do
if curl -s http://127.0.0.1:9200/_cluster/health | grep -q '"status":"green"\|"status":"yellow"'; then
echo "✅ Elasticsearch连接成功"
break
fi
echo "🔄 等待Elasticsearch启动... ($i/30)"
sleep 2
done
fi
# Django测试执行
- name: 执行数据库迁移
if: matrix.test-type != 'docker'
env:
DJANGO_MYSQL_PASSWORD: root
DJANGO_MYSQL_HOST: 127.0.0.1
DJANGO_ELASTICSEARCH_HOST: ${{ matrix.elasticsearch && '127.0.0.1:9200' || '' }}
run: |
echo "🗄️ 执行数据库迁移"
# 检查迁移文件
echo "📋 检查待应用的迁移..."
python manage.py showmigrations
# 检查是否有未创建的迁移
python manage.py makemigrations --check --verbosity 2
# 执行迁移
python manage.py migrate --verbosity 2
echo "✅ 数据库迁移完成"
- name: 运行Django测试
if: matrix.test-type != 'docker'
env:
DJANGO_MYSQL_PASSWORD: root
DJANGO_MYSQL_HOST: 127.0.0.1
DJANGO_ELASTICSEARCH_HOST: ${{ matrix.elasticsearch && '127.0.0.1:9200' || '' }}
run: |
echo "🧪 开始执行 ${{ matrix.test-type }} 测试 (Python ${{ matrix.python-version }})"
# 显示Django配置信息
python manage.py diffsettings | head -20
# 运行测试
if [ "${{ matrix.coverage }}" = "true" ]; then
echo "📊 运行测试并生成覆盖率报告"
coverage run --source='.' --omit='*/venv/*,*/migrations/*,*/tests/*,manage.py' manage.py test --verbosity=2
echo "📈 生成覆盖率报告"
coverage xml
coverage report --show-missing
coverage html
echo "📋 覆盖率统计:"
coverage report | tail -1
else
echo "🧪 运行标准测试"
python manage.py test --verbosity=2 --failfast
fi
echo "✅ 测试执行完成"
# 覆盖率报告上传 (只有完整测试才执行)
- name: 上传覆盖率到Codecov
if: matrix.coverage == true && success()
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.xml
flags: unittests
name: codecov-${{ steps.test-info.outputs.test_name }}
fail_ci_if_error: false
verbose: true
- name: 上传覆盖率到Codecov (备用)
if: matrix.coverage == true && failure()
uses: codecov/codecov-action@v4
with:
file: ./coverage.xml
flags: unittests
name: codecov-${{ steps.test-info.outputs.test_name }}-fallback
fail_ci_if_error: false
verbose: true
# Docker构建测试
- name: 设置QEMU
if: matrix.test-type == 'docker'
uses: docker/setup-qemu-action@v3
- name: 设置Docker Buildx
if: matrix.test-type == 'docker'
uses: docker/setup-buildx-action@v3
- name: Docker构建测试
if: matrix.test-type == 'docker'
uses: docker/build-push-action@v5
with:
context: .
push: false
tags: djangoblog/djangoblog:test-${{ github.sha }}
cache-from: type=gha
cache-to: type=gha,mode=max
# 收集测试工件 (失败时收集调试信息)
- name: 收集测试工件
if: failure() && matrix.test-type != 'docker'
run: |
echo "🔍 收集测试失败的调试信息"
# 收集Django日志
if [ -d "logs" ]; then
echo "📄 Django日志文件:"
ls -la logs/
if [ -f "logs/djangoblog.log" ]; then
echo "🔍 最新日志内容:"
tail -100 logs/djangoblog.log
fi
fi
# 显示数据库状态
echo "🗄️ 数据库连接状态:"
python -c "
try:
from django.db import connection
cursor = connection.cursor()
cursor.execute('SELECT VERSION()')
print(f'MySQL版本: {cursor.fetchone()[0]}')
cursor.execute('SHOW TABLES')
tables = cursor.fetchall()
print(f'数据库表数量: {len(tables)}')
except Exception as e:
print(f'数据库连接错误: {e}')
" || true
# Elasticsearch状态 (如果启用)
if [ "${{ matrix.elasticsearch }}" = "true" ]; then
echo "🔍 Elasticsearch状态:"
curl -s http://127.0.0.1:9200/_cluster/health?pretty || true
fi
# 上传测试工件
- name: 上传覆盖率HTML报告
if: matrix.coverage == true && always()
uses: actions/upload-artifact@v4
with:
name: coverage-report-${{ steps.test-info.outputs.test_name }}
path: htmlcov/
retention-days: 30
# 性能统计
- name: 测试性能统计
if: always() && matrix.test-type != 'docker'
run: |
echo "⚡ 测试性能统计:"
echo " 开始时间: $(date -d '@${{ job.started_at }}' '+%Y-%m-%d %H:%M:%S' 2>/dev/null || echo '未知')"
echo " 当前时间: $(date '+%Y-%m-%d %H:%M:%S')"
# 系统资源使用情况
echo "💻 系统资源:"
echo " CPU使用: $(top -bn1 | grep "Cpu(s)" | awk '{print $2}' | cut -d'%' -f1)%"
echo " 内存使用: $(free -h | awk '/^Mem:/ {printf "%.1f%%", $3/$2 * 100}')"
echo " 磁盘使用: $(df -h / | awk 'NR==2{printf "%s", $5}')"
# 测试结果汇总
- name: 测试完成总结
if: always()
run: |
echo "📋 ============ 测试执行总结 ============"
echo " 🏷️ 测试类型: ${{ matrix.test-type }}"
echo " 🐍 Python版本: ${{ matrix.python-version }}"
echo " 🗄️ 数据库: ${{ matrix.database }}"
echo " 🔍 Elasticsearch: ${{ matrix.elasticsearch }}"
echo " 📊 覆盖率: ${{ matrix.coverage }}"
echo " ⚡ 状态: ${{ job.status }}"
echo " 📅 完成时间: $(date '+%Y-%m-%d %H:%M:%S')"
echo "============================================"
# 根据测试结果显示不同消息
if [ "${{ job.status }}" = "success" ]; then
echo "🎉 测试执行成功!"
else
echo "❌ 测试执行失败,请检查上面的日志"
fi

@ -0,0 +1,10 @@
[run]
source = .
include = *.py
omit =
*migrations*
*tests*
*.html
*whoosh_cn_backend*
*settings.py*
*venv*

@ -8,5 +8,4 @@ settings_production.py
*.md
docs/
logs/
static/
.github/
static/

@ -38,12 +38,10 @@ jobs:
uses: actions/checkout@v3
- name: Initialize CodeQL
uses: github/codeql-action/init@v3
with:
languages: python
uses: github/codeql-action/init@v2
- name: Autobuild
uses: github/codeql-action/autobuild@v3
uses: github/codeql-action/autobuild@v2
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v3
uses: github/codeql-action/analyze@v2

@ -0,0 +1,136 @@
name: Django CI
on:
push:
branches:
- master
- dev
paths-ignore:
- '**/*.md'
- '**/*.css'
- '**/*.js'
pull_request:
branches:
- master
- dev
paths-ignore:
- '**/*.md'
- '**/*.css'
- '**/*.js'
jobs:
build-normal:
runs-on: ubuntu-latest
strategy:
max-parallel: 4
matrix:
python-version: ["3.10","3.11" ]
steps:
- name: Start MySQL
uses: samin/mysql-action@v1.3
with:
host port: 3306
container port: 3306
character set server: utf8mb4
collation server: utf8mb4_general_ci
mysql version: latest
mysql root password: root
mysql database: djangoblog
mysql user: root
mysql password: root
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run Tests
env:
DJANGO_MYSQL_PASSWORD: root
DJANGO_MYSQL_HOST: 127.0.0.1
run: |
python manage.py makemigrations
python manage.py migrate
python manage.py test
build-with-es:
runs-on: ubuntu-latest
strategy:
max-parallel: 4
matrix:
python-version: ["3.10","3.11" ]
steps:
- name: Start MySQL
uses: samin/mysql-action@v1.3
with:
host port: 3306
container port: 3306
character set server: utf8mb4
collation server: utf8mb4_general_ci
mysql version: latest
mysql root password: root
mysql database: djangoblog
mysql user: root
mysql password: root
- name: Configure sysctl limits
run: |
sudo swapoff -a
sudo sysctl -w vm.swappiness=1
sudo sysctl -w fs.file-max=262144
sudo sysctl -w vm.max_map_count=262144
- uses: miyataka/elasticsearch-github-actions@1
with:
stack-version: '7.12.1'
plugins: 'https://release.infinilabs.com/analysis-ik/stable/elasticsearch-analysis-ik-7.12.1.zip'
- uses: actions/checkout@v3
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'
- name: Install Dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run Tests
env:
DJANGO_MYSQL_PASSWORD: root
DJANGO_MYSQL_HOST: 127.0.0.1
DJANGO_ELASTICSEARCH_HOST: 127.0.0.1:9200
run: |
python manage.py makemigrations
python manage.py migrate
coverage run manage.py test
coverage xml
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v1
docker:
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v2
- name: Set up QEMU
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Build and push
uses: docker/build-push-action@v3
with:
context: .
push: false
tags: djangoblog/djangoblog:dev

@ -22,19 +22,19 @@ jobs:
run: |
echo "DOCKER_TAG=latest" >> $GITHUB_ENV
- name: Checkout
uses: actions/checkout@v4
uses: actions/checkout@v3
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
uses: docker/setup-qemu-action@v2
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
uses: docker/setup-buildx-action@v2
- name: Login to DockerHub
uses: docker/login-action@v3
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push
uses: docker/build-push-action@v5
uses: docker/build-push-action@v3
with:
context: .
push: true

@ -62,6 +62,7 @@ target/
# http://www.jetbrains.com/pycharm/webhelp/project.html
.idea
.iml
static/
# virtualenv
venv/

@ -0,0 +1,158 @@
# DjangoBlog
<p align="center">
<a href="https://github.com/liangliangyy/DjangoBlog/actions/workflows/django.yml"><img src="https://github.com/liangliangyy/DjangoBlog/actions/workflows/django.yml/badge.svg" alt="Django CI"></a>
<a href="https://github.com/liangliangyy/DjangoBlog/actions/workflows/codeql-analysis.yml"><img src="https://github.com/liangliangyy/DjangoBlog/actions/workflows/codeql-analysis.yml/badge.svg" alt="CodeQL"></a>
<a href="https://codecov.io/gh/liangliangyy/DjangoBlog"><img src="https://codecov.io/gh/liangliangyy/DjangoBlog/branch/master/graph/badge.svg" alt="codecov"></a>
<a href="https://github.com/liangliangyy/DjangoBlog/blob/master/LICENSE"><img src="https://img.shields.io/github/license/liangliangyy/djangoblog.svg" alt="license"></a>
</p>
<p align="center">
<b>一款功能强大、设计优雅的现代化博客系统</b>
<br>
<a href="/docs/README-en.md">English</a><b>简体中文</b>
</p>
---
DjangoBlog 是一款基于 Python 3.10 和 Django 4.0 构建的高性能博客平台。它不仅提供了传统博客的所有核心功能还通过一个灵活的插件系统让您可以轻松扩展和定制您的网站。无论您是个人博主、技术爱好者还是内容创作者DjangoBlog 都旨在为您提供一个稳定、高效且易于维护的写作和发布环境。
## ✨ 特性亮点
- **强大的内容管理**: 支持文章、独立页面、分类和标签的完整管理。内置强大的 Markdown 编辑器,支持代码语法高亮。
- **全文搜索**: 集成搜索引擎,提供快速、精准的文章内容搜索。
- **互动评论系统**: 支持回复、邮件提醒等功能,评论内容同样支持 Markdown。
- **灵活的侧边栏**: 可自定义展示最新文章、最多阅读、标签云等模块。
- **社交化登录**: 内置 OAuth 支持,已集成 Google, GitHub, Facebook, 微博, QQ 等主流平台。
- **高性能缓存**: 原生支持 Redis 缓存,并提供自动刷新机制,确保网站高速响应。
- **SEO 友好**: 具备基础 SEO 功能,新内容发布后可自动通知 Google 和百度。
- **便捷的插件系统**: 通过创建独立的插件来扩展博客功能代码解耦易于维护。我们已经通过插件实现了文章浏览计数、SEO 优化等功能!
- **集成图床**: 内置简单的图床功能,方便图片上传和管理。
- **自动化前端**: 集成 `django-compressor`,自动压缩和优化 CSS 及 JavaScript 文件。
- **健壮的运维**: 内置网站异常邮件提醒和微信公众号管理功能。
## 🛠️ 技术栈
- **后端**: Python 3.10, Django 4.0
- **数据库**: MySQL, SQLite (可配置)
- **缓存**: Redis
- **前端**: HTML5, CSS3, JavaScript
- **搜索**: Whoosh, Elasticsearch (可配置)
- **编辑器**: Markdown (mdeditor)
## 🚀 快速开始
### 1. 环境准备
确保您的系统中已安装 Python 3.10+ 和 MySQL/MariaDB。
### 2. 克隆与安装
```bash
# 克隆项目到本地
git clone https://github.com/liangliangyy/DjangoBlog.git
cd DjangoBlog
# 安装依赖
pip install -r requirements.txt
```
### 3. 项目配置
- **数据库**:
打开 `djangoblog/settings.py` 文件,找到 `DATABASES` 配置项,修改为您的 MySQL 连接信息。
```python
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'djangoblog',
'USER': 'root',
'PASSWORD': 'your_password',
'HOST': '127.0.0.1',
'PORT': 3306,
}
}
```
在 MySQL 中创建数据库:
```sql
CREATE DATABASE `djangoblog` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```
- **更多配置**:
关于邮件发送、OAuth 登录、缓存等更多高级配置,请参阅我们的 [详细配置文档](/docs/config.md)。
### 4. 初始化数据库
```bash
python manage.py makemigrations
python manage.py migrate
# 创建一个超级管理员账户
python manage.py createsuperuser
```
### 5. 运行项目
```bash
# (可选) 生成一些测试数据
python manage.py create_testdata
# (可选) 收集和压缩静态文件
python manage.py collectstatic --noinput
python manage.py compress --force
# 启动开发服务器
python manage.py runserver
```
现在,在您的浏览器中访问 `http://127.0.0.1:8000/`,您应该能看到 DjangoBlog 的首页了!
## 部署
- **传统部署**: 我们为您准备了非常详细的 [服务器部署教程](https://www.lylinux.net/article/2019/8/5/58.html)。
- **Docker 部署**: 项目已全面支持 Docker。如果您熟悉容器化技术请参考 [Docker 部署文档](/docs/docker.md) 来快速启动。
- **Kubernetes 部署**: 我们也提供了完整的 [Kubernetes 部署指南](/docs/k8s.md),助您轻松上云。
## 🧩 插件系统
插件系统是 DjangoBlog 的核心特色之一。它允许您在不修改核心代码的情况下,通过编写独立的插件来为您的博客添加新功能。
- **工作原理**: 插件通过在预定义的“钩子”上注册回调函数来工作。例如,当一篇文章被渲染时,`after_article_body_get` 钩子会被触发,所有注册到此钩子的函数都会被执行。
- **现有插件**: `view_count`(浏览计数), `seo_optimizer`SEO优化等都是通过插件系统实现的。
- **开发您自己的插件**: 只需在 `plugins` 目录下创建一个新的文件夹,并编写您的 `plugin.py`。欢迎探索并为 DjangoBlog 社区贡献您的创意!
## 🤝 贡献指南
我们热烈欢迎任何形式的贡献!如果您有好的想法或发现了 Bug请随时提交 Issue 或 Pull Request。
## 📄 许可证
本项目基于 [MIT License](LICENSE) 开源。
---
## ❤️ 支持与赞助
如果您觉得这个项目对您有帮助,并且希望支持我继续维护和开发新功能,欢迎请我喝杯咖啡!您的每一份支持都是我前进的最大动力。
<p align="center">
<img src="/docs/imgs/alipay.jpg" width="150" alt="支付宝赞助">
<img src="/docs/imgs/wechat.jpg" width="150" alt="微信赞助">
</p>
<p align="center">
<i>(左) 支付宝 / (右) 微信</i>
</p>
## 🙏 鸣谢
特别感谢 **JetBrains** 为本项目提供的免费开源许可证。
<p align="center">
<a href="https://www.jetbrains.com/?from=DjangoBlog">
<img src="/docs/imgs/pycharm_logo.png" width="150" alt="JetBrains Logo">
</a>
</p>
---
> 如果本项目帮助到了你,请在[这里](https://github.com/liangliangyy/DjangoBlog/issues/214)留下你的网址,让更多的人看到。您的回复将会是我继续更新维护下去的动力。

@ -1,52 +1,80 @@
# admin.py - Django后台管理配置文件
# 导入Django表单模块
from django import forms
# 导入Django用户管理类
from django.contrib.auth.admin import UserAdmin
# 导入用户修改表单
from django.contrib.auth.forms import UserChangeForm
# 导入用户名字段
from django.contrib.auth.forms import UsernameField
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# Register your models here.
# 在这里注册模型
# 导入自定义的用户模型
from .models import BlogUser
# 自定义用户创建表单
class BlogUserCreationForm(forms.ModelForm):
# 密码字段1 - 用于输入密码
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput)
# 密码字段2 - 用于确认密码
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput)
class Meta:
# 指定关联的模型
model = BlogUser
# 表单包含的字段
fields = ('email',)
# 清理密码确认字段的方法
def clean_password2(self):
# Check that the two password entries match
# 从已清理的数据中获取两个密码字段的值
password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2")
# 检查两个密码是否匹配
if password1 and password2 and password1 != password2:
raise forms.ValidationError(_("passwords do not match"))
return password2
# 保存用户的方法
def save(self, commit=True):
# Save the provided password in hashed format
# 调用父类的save方法但不立即提交到数据库
user = super().save(commit=False)
# 设置哈希后的密码
user.set_password(self.cleaned_data["password1"])
if commit:
# 设置用户来源为管理员站点
user.source = 'adminsite'
# 保存用户到数据库
user.save()
return user
# 自定义用户修改表单
class BlogUserChangeForm(UserChangeForm):
class Meta:
# 指定关联的模型
model = BlogUser
# 包含所有字段
fields = '__all__'
# 字段类型映射
field_classes = {'username': UsernameField}
# 初始化方法
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 自定义用户管理类
class BlogUserAdmin(UserAdmin):
# 指定修改表单
form = BlogUserChangeForm
# 指定创建表单
add_form = BlogUserCreationForm
# 列表页面显示的字段
list_display = (
'id',
'nickname',
@ -55,6 +83,7 @@ class BlogUserAdmin(UserAdmin):
'last_login',
'date_joined',
'source')
# 列表中可点击链接的字段
list_display_links = ('id', 'username')
ordering = ('-id',)
search_fields = ('username', 'nickname', 'email')
# 默认排序字段按ID降序
ordering = ('-id',)

@ -0,0 +1,10 @@
# apps.py - Django应用程序配置文件
# 导入Django应用配置基类
from django.apps import AppConfig
# 定义账户应用的配置类
class AccountsConfig(AppConfig):
# 指定应用程序的完整Python路径
name = 'accounts'

@ -1,47 +1,71 @@
# forms.py - Django表单定义文件
# 导入Django表单模块
from django import forms
# 导入用户模型相关函数和表单类
from django.contrib.auth import get_user_model, password_validation
from django.contrib.auth.forms import AuthenticationForm, UserCreationForm
# 导入验证错误异常
from django.core.exceptions import ValidationError
# 导入表单小部件
from django.forms import widgets
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# 导入工具模块
from . import utils
# 导入用户模型
from .models import BlogUser
# 登录表单类
class LoginForm(AuthenticationForm):
# 初始化方法,设置表单字段的样式和属性
def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
# 设置用户名字段的输入框样式
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# 设置密码字段的输入框样式
self.fields['password'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
# 注册表单类
class RegisterForm(UserCreationForm):
# 初始化方法,设置表单字段的样式和属性
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
# 设置用户名字段的输入框样式
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# 设置邮箱字段的输入框样式
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
# 设置密码字段的输入框样式
self.fields['password1'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
# 设置确认密码字段的输入框样式
self.fields['password2'].widget = widgets.PasswordInput(
attrs={'placeholder': "repeat password", "class": "form-control"})
# 清理邮箱字段的方法,检查邮箱是否已存在
def clean_email(self):
email = self.cleaned_data['email']
# 检查邮箱是否已被注册
if get_user_model().objects.filter(email=email).exists():
raise ValidationError(_("email already exists"))
return email
class Meta:
# 指定关联的用户模型
model = get_user_model()
# 表单包含的字段
fields = ("username", "email")
# 忘记密码表单类
class ForgetPasswordForm(forms.Form):
# 新密码字段
new_password1 = forms.CharField(
label=_("New password"),
widget=forms.PasswordInput(
@ -52,6 +76,7 @@ class ForgetPasswordForm(forms.Form):
),
)
# 确认新密码字段
new_password2 = forms.CharField(
label="确认密码",
widget=forms.PasswordInput(
@ -62,6 +87,7 @@ class ForgetPasswordForm(forms.Form):
),
)
# 邮箱字段
email = forms.EmailField(
label='邮箱',
widget=forms.TextInput(
@ -72,6 +98,7 @@ class ForgetPasswordForm(forms.Form):
),
)
# 验证码字段
code = forms.CharField(
label=_('Code'),
widget=forms.TextInput(
@ -82,17 +109,22 @@ class ForgetPasswordForm(forms.Form):
),
)
# 清理确认密码字段的方法
def clean_new_password2(self):
password1 = self.data.get("new_password1")
password2 = self.data.get("new_password2")
# 检查两个密码是否匹配
if password1 and password2 and password1 != password2:
raise ValidationError(_("passwords do not match"))
# 验证密码强度
password_validation.validate_password(password2)
return password2
# 清理邮箱字段的方法,检查邮箱是否存在
def clean_email(self):
user_email = self.cleaned_data.get("email")
# 检查邮箱是否已注册
if not BlogUser.objects.filter(
email=user_email
).exists():
@ -100,8 +132,10 @@ class ForgetPasswordForm(forms.Form):
raise ValidationError(_("email does not exist"))
return user_email
# 清理验证码字段的方法
def clean_code(self):
code = self.cleaned_data.get("code")
# 验证邮箱和验证码是否匹配
error = utils.verify(
email=self.cleaned_data.get("email"),
code=code,
@ -111,7 +145,9 @@ class ForgetPasswordForm(forms.Form):
return code
# 获取忘记密码验证码表单类
class ForgetPasswordCodeForm(forms.Form):
# 邮箱字段
email = forms.EmailField(
label=_('Email'),
)
)

@ -1,35 +1,58 @@
# models.py - Django数据模型定义文件
# 导入Django内置的用户抽象基类
from django.contrib.auth.models import AbstractUser
# 导入Django数据库模型
from django.db import models
# 导入URL反向解析函数
from django.urls import reverse
# 导入时间相关函数
from django.utils.timezone import now
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# 导入工具函数获取当前站点
from djangoblog.utils import get_current_site
# Create your models here.
# 在这里创建模型
# 博客用户模型继承自Django的AbstractUser
class BlogUser(AbstractUser):
# 昵称字段最大长度100字符允许为空
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
# 创建时间字段,默认值为当前时间
creation_time = models.DateTimeField(_('creation time'), default=now)
# 最后修改时间字段,默认值为当前时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 用户来源字段记录创建来源最大长度100字符允许为空
source = models.CharField(_('create source'), max_length=100, blank=True)
# 获取用户绝对URL的方法
def get_absolute_url(self):
return reverse(
'blog:author_detail', kwargs={
'author_name': self.username})
# 对象的字符串表示方法,返回邮箱地址
def __str__(self):
return self.email
# 获取完整URL的方法包含域名
def get_full_url(self):
# 获取当前站点域名
site = get_current_site().domain
# 构建完整的URL
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
# 模型的元数据配置
class Meta:
# 默认按ID降序排列
ordering = ['-id']
# 单数形式的显示名称
verbose_name = _('user')
# 复数形式的显示名称(与单数相同)
verbose_name_plural = verbose_name
get_latest_by = 'id'
# 指定获取最新对象的字段
get_latest_by = 'id'

@ -1,135 +1,186 @@
# tests.py - Django测试用例文件
# 导入Django测试相关模块
from django.test import Client, RequestFactory, TestCase
# 导入URL反向解析
from django.urls import reverse
# 导入时区相关功能
from django.utils import timezone
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# 导入账户相关模型
from accounts.models import BlogUser
# 导入博客相关模型
from blog.models import Article, Category
# 导入工具函数
from djangoblog.utils import *
# 导入当前应用的工具模块
from . import utils
# Create your tests here.
# 在这里创建测试用例
# 账户测试类
class AccountTest(TestCase):
# 测试前置设置方法
def setUp(self):
# 创建测试客户端
self.client = Client()
# 创建请求工厂
self.factory = RequestFactory()
# 创建测试用户
self.blog_user = BlogUser.objects.create_user(
username="test",
email="admin@admin.com",
password="12345678"
)
# 设置测试用的新密码
self.new_test = "xxx123--="
# 测试账户验证功能
def test_validate_account(self):
# 获取当前站点域名
site = get_current_site().domain
# 创建超级用户
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="qwer!@#$ggg")
# 获取刚创建的用户
testuser = BlogUser.objects.get(username='liangliangyy1')
# 测试登录功能
loginresult = self.client.login(
username='liangliangyy1',
password='qwer!@#$ggg')
# 断言登录成功
self.assertEqual(loginresult, True)
# 测试访问管理员页面
response = self.client.get('/admin/')
self.assertEqual(response.status_code, 200)
# 创建测试分类
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
# 创建测试文章
article = Article()
article.title = "nicetitleaaa"
article.body = "nicecontentaaa"
article.author = user
article.category = category
article.type = 'a'
article.status = 'p'
article.type = 'a' # 文章类型
article.status = 'p' # 发布状态
article.save()
# 测试访问文章管理页面
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
# 测试用户注册功能
def test_validate_register(self):
# 断言邮箱不存在
self.assertEquals(
0, len(
BlogUser.objects.filter(
email='user123@user.com')))
# 发送注册请求
response = self.client.post(reverse('account:register'), {
'username': 'user1233',
'email': 'user123@user.com',
'password1': 'password123!q@wE#R$T',
'password2': 'password123!q@wE#R$T',
})
# 断言用户创建成功
self.assertEquals(
1, len(
BlogUser.objects.filter(
email='user123@user.com')))
# 获取新创建的用户
user = BlogUser.objects.filter(email='user123@user.com')[0]
# 生成验证签名
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
path = reverse('accounts:result')
# 构建验证URL
url = '{path}?type=validation&id={id}&sign={sign}'.format(
path=path, id=user.id, sign=sign)
# 测试验证页面
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
# 测试用户登录
self.client.login(username='user1233', password='password123!q@wE#R$T')
user = BlogUser.objects.filter(email='user123@user.com')[0]
# 设置用户为超级用户和管理员
user.is_superuser = True
user.is_staff = True
user.save()
# 清理侧边栏缓存
delete_sidebar_cache()
# 创建分类
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
# 创建文章
article = Article()
article.category = category
article.title = "nicetitle333"
article.body = "nicecontentttt"
article.author = user
article.type = 'a'
article.status = 'p'
article.save()
# 测试访问文章管理页面
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
# 测试退出登录
response = self.client.get(reverse('account:logout'))
self.assertIn(response.status_code, [301, 302, 200])
# 测试退出后访问管理页面(应该重定向)
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
# 测试错误密码登录
response = self.client.post(reverse('account:login'), {
'username': 'user1233',
'password': 'password123'
})
self.assertIn(response.status_code, [301, 302, 200])
# 测试登录后访问管理页面
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
# 测试邮箱验证码功能
def test_verify_email_code(self):
to_email = "admin@admin.com"
# 生成验证码
code = generate_code()
# 设置验证码
utils.set_code(to_email, code)
# 发送验证邮件
utils.send_verify_email(to_email, code)
# 测试正确验证码
err = utils.verify("admin@admin.com", code)
self.assertEqual(err, None)
# 测试错误邮箱
err = utils.verify("admin@123.com", code)
self.assertEqual(type(err), str)
# 测试成功发送忘记密码验证码
def test_forget_password_email_code_success(self):
resp = self.client.post(
path=reverse("account:forget_password_code"),
@ -139,33 +190,40 @@ class AccountTest(TestCase):
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.content.decode("utf-8"), "ok")
# 测试发送忘记密码验证码失败情况
def test_forget_password_email_code_fail(self):
# 测试空数据
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict()
)
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
# 测试错误邮箱格式
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@com")
)
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
# 测试成功重置密码
def test_forget_password_email_success(self):
# 生成并设置验证码
code = generate_code()
utils.set_code(self.blog_user.email, code)
# 准备重置密码数据
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email=self.blog_user.email,
code=code,
)
# 发送重置密码请求
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 302)
self.assertEqual(resp.status_code, 302) # 重定向响应
# 验证用户密码是否修改成功
blog_user = BlogUser.objects.filter(
@ -174,6 +232,7 @@ class AccountTest(TestCase):
self.assertNotEqual(blog_user, None)
self.assertEqual(blog_user.check_password(data["new_password1"]), True)
# 测试重置密码时邮箱不存在的情况
def test_forget_password_email_not_user(self):
data = dict(
new_password1=self.new_test,
@ -188,7 +247,7 @@ class AccountTest(TestCase):
self.assertEqual(resp.status_code, 200)
# 测试重置密码时验证码错误的情况
def test_forget_password_email_code_error(self):
code = generate_code()
utils.set_code(self.blog_user.email, code)
@ -196,12 +255,11 @@ class AccountTest(TestCase):
new_password1=self.new_test,
new_password2=self.new_test,
email=self.blog_user.email,
code="111111",
code="111111", # 错误的验证码
)
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.status_code, 200)

@ -0,0 +1,54 @@
# urls.py - Django URL路由配置文件
# 导入Django URL路由相关函数
from django.urls import path
from django.urls import re_path
# 导入当前应用的视图模块
from . import views
# 导入登录表单类
from .forms import LoginForm
# 定义应用命名空间用于URL反向解析
app_name = "accounts"
# URL模式列表 - 定义URL路径与视图的映射关系
urlpatterns = [
# 登录URL - 使用正则表达式匹配路径
re_path(r'^login/$',
# 使用类视图,设置登录成功后跳转到首页
views.LoginView.as_view(success_url='/'),
name='login', # URL名称用于反向解析
# 传递额外参数,指定认证表单类
kwargs={'authentication_form': LoginForm}),
# 注册URL - 使用正则表达式匹配路径
re_path(r'^register/$',
# 使用类视图,设置注册成功后跳转到首页
views.RegisterView.as_view(success_url="/"),
name='register'), # URL名称用于反向解析
# 退出登录URL - 使用正则表达式匹配路径
re_path(r'^logout/$',
# 使用类视图
views.LogoutView.as_view(),
name='logout'), # URL名称用于反向解析
# 账户操作结果页面URL - 使用path匹配精确路径
path(r'account/result.html',
# 使用函数视图
views.account_result,
name='result'), # URL名称用于反向解析
# 忘记密码URL - 使用正则表达式匹配路径
re_path(r'^forget_password/$',
# 使用类视图
views.ForgetPasswordView.as_view(),
name='forget_password'), # URL名称用于反向解析
# 获取忘记密码验证码URL - 使用正则表达式匹配路径
re_path(r'^forget_password_code/$',
# 使用类视图
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'), # URL名称用于反向解析
]

@ -0,0 +1,42 @@
# user_login_backend.py - 自定义用户认证后端
# 导入获取用户模型的函数
from django.contrib.auth import get_user_model
# 导入Django模型认证后端基类
from django.contrib.auth.backends import ModelBackend
# 自定义认证后端类,支持邮箱或用户名登录
class EmailOrUsernameModelBackend(ModelBackend):
"""
允许使用用户名或邮箱登录的自定义认证后端
"""
# 用户认证方法
def authenticate(self, request, username=None, password=None, **kwargs):
# 判断输入的是邮箱还是用户名
if '@' in username:
# 如果包含@符号,按邮箱处理
kwargs = {'email': username}
else:
# 否则按用户名处理
kwargs = {'username': username}
try:
# 根据用户名或邮箱获取用户
user = get_user_model().objects.get(**kwargs)
# 验证密码是否正确
if user.check_password(password):
return user # 认证成功,返回用户对象
except get_user_model().DoesNotExist:
# 用户不存在返回None
return None
# 根据用户ID获取用户的方法
def get_user(self, username):
try:
# 根据主键用户ID获取用户
return get_user_model().objects.get(pk=username)
except get_user_model().DoesNotExist:
# 用户不存在返回None
return None

@ -0,0 +1,71 @@
# utils.py - 工具函数模块,处理验证码相关功能
# 导入类型提示模块
import typing
# 导入时间间隔类
from datetime import timedelta
# 导入Django缓存模块
from django.core.cache import cache
# 导入国际化翻译函数
from django.utils.translation import gettext
from django.utils.translation import gettext_lazy as _
# 导入发送邮件工具函数
from djangoblog.utils import send_email
# 验证码有效期设置为5分钟
_code_ttl = timedelta(minutes=5)
def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email")):
"""发送验证邮件
Args:
to_mail: 接收邮箱地址
subject: 邮件主题默认为"验证邮件"
code: 验证码
"""
# 构建邮件HTML内容包含验证码信息
html_content = _(
"You are resetting the password, the verification code is%(code)s, valid within 5 minutes, please keep it "
"properly") % {'code': code}
# 调用发送邮件函数
send_email([to_mail], subject, html_content)
def verify(email: str, code: str) -> typing.Optional[str]:
"""验证验证码是否有效
Args:
email: 请求验证的邮箱地址
code: 用户输入的验证码
Return:
如果验证失败返回错误信息字符串成功返回None
Note:
这里的错误处理不太合理应该采用raise抛出异常
否则调用方也需要对error进行处理
"""
# 从缓存中获取该邮箱对应的验证码
cache_code = get_code(email)
# 比较用户输入的验证码和缓存中的验证码
if cache_code != code:
return gettext("Verification code error")
def set_code(email: str, code: str):
"""设置验证码到缓存中
Args:
email: 邮箱地址作为缓存的key
code: 验证码作为缓存的value
"""
# 将验证码存入缓存设置过期时间为5分钟
cache.set(email, code, _code_ttl.seconds)
def get_code(email: str) -> typing.Optional[str]:
"""从缓存中获取验证码
Args:
email: 邮箱地址作为缓存的key
Return:
返回验证码字符串如果不存在则返回None
"""
return cache.get(email)

@ -1,59 +1,89 @@
# views.py - Django视图文件处理用户账户相关请求
# 导入日志模块
import logging
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# 导入Django设置
from django.conf import settings
# 导入Django认证相关模块
from django.contrib import auth
from django.contrib.auth import REDIRECT_FIELD_NAME
from django.contrib.auth import get_user_model
from django.contrib.auth import logout
from django.contrib.auth.forms import AuthenticationForm
from django.contrib.auth.hashers import make_password
# 导入HTTP响应类
from django.http import HttpResponseRedirect, HttpResponseForbidden
from django.http.request import HttpRequest
from django.http.response import HttpResponse
# 导入快捷函数
from django.shortcuts import get_object_or_404
from django.shortcuts import render
# 导入URL反向解析
from django.urls import reverse
# 导入方法装饰器
from django.utils.decorators import method_decorator
from django.utils.http import url_has_allowed_host_and_scheme
# 导入基于类的视图
from django.views import View
from django.views.decorators.cache import never_cache
from django.views.decorators.csrf import csrf_protect
from django.views.decorators.debug import sensitive_post_parameters
from django.views.generic import FormView, RedirectView
# 导入工具函数
from djangoblog.utils import send_email, get_sha256, get_current_site, generate_code, delete_sidebar_cache
# 导入当前应用的工具模块
from . import utils
# 导入表单类
from .forms import RegisterForm, LoginForm, ForgetPasswordForm, ForgetPasswordCodeForm
# 导入用户模型
from .models import BlogUser
# 获取日志记录器
logger = logging.getLogger(__name__)
# Create your views here.
# 在这里创建视图
# 用户注册视图
class RegisterView(FormView):
# 指定使用的表单类
form_class = RegisterForm
# 指定模板文件
template_name = 'account/registration_form.html'
# 使用CSRF保护装饰器
@method_decorator(csrf_protect)
def dispatch(self, *args, **kwargs):
return super(RegisterView, self).dispatch(*args, **kwargs)
# 表单验证通过后的处理
def form_valid(self, form):
if form.is_valid():
# 保存用户但不提交到数据库
user = form.save(False)
# 设置用户为非激活状态(需要邮箱验证)
user.is_active = False
# 设置用户来源
user.source = 'Register'
# 保存用户到数据库
user.save(True)
# 获取当前站点域名
site = get_current_site().domain
# 生成验证签名
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
# 调试模式下使用本地地址
if settings.DEBUG:
site = '127.0.0.1:8000'
# 构建验证URL
path = reverse('account:result')
url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format(
site=site, path=path, id=user.id, sign=sign)
# 构建邮件内容
content = """
<p>请点击下面链接验证您的邮箱</p>
@ -64,6 +94,7 @@ class RegisterView(FormView):
如果上面链接无法打开请将此链接复制至浏览器
{url}
""".format(url=url)
# 发送验证邮件
send_email(
emailto=[
user.email,
@ -71,43 +102,59 @@ class RegisterView(FormView):
title='验证您的电子邮箱',
content=content)
# 重定向到结果页面
url = reverse('accounts:result') + \
'?type=register&id=' + str(user.id)
return HttpResponseRedirect(url)
else:
# 表单验证失败,重新渲染表单
return self.render_to_response({
'form': form
})
# 用户退出登录视图
class LogoutView(RedirectView):
# 退出后重定向的URL
url = '/login/'
# 禁用缓存
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
return super(LogoutView, self).dispatch(request, *args, **kwargs)
# 处理GET请求
def get(self, request, *args, **kwargs):
# 执行退出登录操作
logout(request)
# 清理侧边栏缓存
delete_sidebar_cache()
return super(LogoutView, self).get(request, *args, **kwargs)
# 用户登录视图
class LoginView(FormView):
# 指定使用的表单类
form_class = LoginForm
# 指定模板文件
template_name = 'account/login.html'
# 登录成功后的默认重定向URL
success_url = '/'
# 重定向字段名
redirect_field_name = REDIRECT_FIELD_NAME
login_ttl = 2626560 # 一个月的时间
# 登录会话有效期(一个月)
login_ttl = 2626560
# 使用多个装饰器保护敏感操作
@method_decorator(sensitive_post_parameters('password'))
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
return super(LoginView, self).dispatch(request, *args, **kwargs)
# 获取上下文数据
def get_context_data(self, **kwargs):
# 获取重定向URL
redirect_to = self.request.GET.get(self.redirect_field_name)
if redirect_to is None:
redirect_to = '/'
@ -115,26 +162,33 @@ class LoginView(FormView):
return super(LoginView, self).get_context_data(**kwargs)
# 表单验证通过后的处理
def form_valid(self, form):
form = AuthenticationForm(data=self.request.POST, request=self.request)
if form.is_valid():
# 清理侧边栏缓存
delete_sidebar_cache()
# 记录日志
logger.info(self.redirect_field_name)
# 执行登录操作
auth.login(self.request, form.get_user())
# 如果用户选择"记住我",设置会话有效期
if self.request.POST.get("remember"):
self.request.session.set_expiry(self.login_ttl)
return super(LoginView, self).form_valid(form)
# return HttpResponseRedirect('/')
else:
# 表单验证失败,重新渲染表单
return self.render_to_response({
'form': form
})
# 获取登录成功后的重定向URL
def get_success_url(self):
# 从POST数据中获取重定向URL
redirect_to = self.request.POST.get(self.redirect_field_name)
# 检查URL是否安全
if not url_has_allowed_host_and_scheme(
url=redirect_to, allowed_hosts=[
self.request.get_host()]):
@ -142,63 +196,91 @@ class LoginView(FormView):
return redirect_to
# 账户操作结果页面视图函数
def account_result(request):
# 获取操作类型和用户ID
type = request.GET.get('type')
id = request.GET.get('id')
# 获取用户对象如果不存在返回404
user = get_object_or_404(get_user_model(), id=id)
logger.info(type)
# 如果用户已激活,重定向到首页
if user.is_active:
return HttpResponseRedirect('/')
# 处理注册和验证类型
if type and type in ['register', 'validation']:
if type == 'register':
# 注册成功页面内容
content = '''
恭喜您注册成功一封验证邮件已经发送到您的邮箱请验证您的邮箱后登录本站
'''
title = '注册成功'
else:
# 验证邮箱签名
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
sign = request.GET.get('sign')
# 签名不匹配返回403禁止访问
if sign != c_sign:
return HttpResponseForbidden()
# 激活用户账户
user.is_active = True
user.save()
content = '''
恭喜您已经成功的完成邮箱验证您现在可以使用您的账号来登录本站
'''
title = '验证成功'
# 渲染结果页面
return render(request, 'account/result.html', {
'title': title,
'content': content
})
else:
# 无效类型,重定向到首页
return HttpResponseRedirect('/')
# 忘记密码视图
class ForgetPasswordView(FormView):
# 指定使用的表单类
form_class = ForgetPasswordForm
# 指定模板文件
template_name = 'account/forget_password.html'
# 表单验证通过后的处理
def form_valid(self, form):
if form.is_valid():
# 根据邮箱获取用户
blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get()
# 设置新密码(自动哈希)
blog_user.password = make_password(form.cleaned_data["new_password2"])
# 保存用户
blog_user.save()
# 重定向到登录页面
return HttpResponseRedirect('/login/')
else:
# 表单验证失败,重新渲染表单
return self.render_to_response({'form': form})
# 忘记密码验证码发送视图
class ForgetPasswordEmailCode(View):
# 处理POST请求
def post(self, request: HttpRequest):
# 初始化表单
form = ForgetPasswordCodeForm(request.POST)
# 表单验证
if not form.is_valid():
return HttpResponse("错误的邮箱")
# 获取邮箱地址
to_email = form.cleaned_data["email"]
# 生成验证码
code = generate_code()
# 发送验证邮件
utils.send_verify_email(to_email, code)
# 保存验证码到缓存
utils.set_code(to_email, code)
return HttpResponse("ok")
return HttpResponse("ok")

@ -6,7 +6,7 @@ from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
# Register your models here.
from .models import Article, Category, Tag, Links, SideBar, BlogSettings
from .models import Article
class ArticleForm(forms.ModelForm):
@ -17,18 +17,22 @@ class ArticleForm(forms.ModelForm):
fields = '__all__'
#xjh管理员动作函数 - 发布选中的文章
def makr_article_publish(modeladmin, request, queryset):
queryset.update(status='p')
#xjh管理员动作函数 - 将选中的文章设为草稿
def draft_article(modeladmin, request, queryset):
queryset.update(status='d')
#xjh管理员动作函数 - 关闭文章评论
def close_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='c')
#xjh管理员动作函数 - 打开文章评论
def open_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='o')
@ -40,6 +44,7 @@ open_article_commentstatus.short_description = _('Open article comments')
class ArticlelAdmin(admin.ModelAdmin):
"""xjh文章模型的后台管理配置"""
list_per_page = 20
search_fields = ('body', 'title')
form = ArticleForm
@ -55,7 +60,6 @@ class ArticlelAdmin(admin.ModelAdmin):
'article_order')
list_display_links = ('id', 'title')
list_filter = ('status', 'type', 'category')
date_hierarchy = 'creation_time'
filter_horizontal = ('tags',)
exclude = ('creation_time', 'last_modify_time')
view_on_site = True
@ -64,9 +68,9 @@ class ArticlelAdmin(admin.ModelAdmin):
draft_article,
close_article_commentstatus,
open_article_commentstatus]
raw_id_fields = ('author', 'category',)
def link_to_category(self, obj):
"""xjh在文章列表显示分类链接"""
info = (obj.category._meta.app_label, obj.category._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.category.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.category.name))
@ -74,15 +78,18 @@ class ArticlelAdmin(admin.ModelAdmin):
link_to_category.short_description = _('category')
def get_form(self, request, obj=None, **kwargs):
"""xjh限制作者字段只能选择超级用户"""
form = super(ArticlelAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model(
).objects.filter(is_superuser=True)
return form
def save_model(self, request, obj, form, change):
"""xjh保存文章模型"""
super(ArticlelAdmin, self).save_model(request, obj, form, change)
def get_view_on_site_url(self, obj=None):
"""xjh获取文章在前台的URL"""
if obj:
url = obj.get_full_url()
return url
@ -93,22 +100,27 @@ class ArticlelAdmin(admin.ModelAdmin):
class TagAdmin(admin.ModelAdmin):
"""xjh标签模型的后台管理配置"""
exclude = ('slug', 'last_mod_time', 'creation_time')
class CategoryAdmin(admin.ModelAdmin):
"""xjh分类模型的后台管理配置"""
list_display = ('name', 'parent_category', 'index')
exclude = ('slug', 'last_mod_time', 'creation_time')
class LinksAdmin(admin.ModelAdmin):
"""xjh友情链接模型的后台管理配置"""
exclude = ('last_mod_time', 'creation_time')
class SideBarAdmin(admin.ModelAdmin):
"""xjh侧边栏模型的后台管理配置"""
list_display = ('name', 'content', 'is_enable', 'sequence')
exclude = ('last_mod_time', 'creation_time')
class BlogSettingsAdmin(admin.ModelAdmin):
pass
"""xjh博客设置模型的后台管理配置"""
pass

@ -2,4 +2,5 @@ from django.apps import AppConfig
class BlogConfig(AppConfig):
name = 'blog'
"""xjh博客应用的配置类"""
name = 'blog'

@ -9,6 +9,7 @@ logger = logging.getLogger(__name__)
def seo_processor(requests):
"""xjhSEO上下文处理器为模板提供SEO相关变量"""
key = 'seo_processor'
value = cache.get(key)
if value:
@ -16,6 +17,7 @@ def seo_processor(requests):
else:
logger.info('set processor cache.')
setting = get_blog_setting()
#xjh构建包含SEO和网站配置的上下文字典
value = {
'SITE_NAME': setting.site_name,
'SHOW_GOOGLE_ADSENSE': setting.show_google_adsense,
@ -39,5 +41,6 @@ def seo_processor(requests):
"GLOBAL_FOOTER": setting.global_footer,
"COMMENT_NEED_REVIEW": setting.comment_need_review,
}
#xjh缓存上下文数据10小时
cache.set(key, value, 60 * 60 * 10)
return value
return value

@ -7,9 +7,11 @@ from elasticsearch_dsl.connections import connections
from blog.models import Article
#xjh检查是否启用了Elasticsearch
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL')
if ELASTICSEARCH_ENABLED:
#xjh创建Elasticsearch连接
connections.create_connection(
hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
from elasticsearch import Elasticsearch
@ -21,6 +23,7 @@ if ELASTICSEARCH_ENABLED:
try:
c.get_pipeline('geoip')
except elasticsearch.exceptions.NotFoundError:
#xjh创建geoip处理管道用于IP地址地理位置解析
c.put_pipeline('geoip', body='''{
"description" : "Add geoip info",
"processors" : [
@ -34,6 +37,7 @@ if ELASTICSEARCH_ENABLED:
class GeoIp(InnerDoc):
"""xjhIP地理位置信息文档结构"""
continent_name = Keyword()
country_iso_code = Keyword()
country_name = Keyword()
@ -41,21 +45,25 @@ class GeoIp(InnerDoc):
class UserAgentBrowser(InnerDoc):
"""xjh用户代理浏览器信息文档结构"""
Family = Keyword()
Version = Keyword()
class UserAgentOS(UserAgentBrowser):
"""xjh用户代理操作系统信息文档结构"""
pass
class UserAgentDevice(InnerDoc):
"""xjh用户代理设备信息文档结构"""
Family = Keyword()
Brand = Keyword()
Model = Keyword()
class UserAgent(InnerDoc):
"""xjh完整的用户代理信息文档结构"""
browser = Object(UserAgentBrowser, required=False)
os = Object(UserAgentOS, required=False)
device = Object(UserAgentDevice, required=False)
@ -64,6 +72,7 @@ class UserAgent(InnerDoc):
class ElapsedTimeDocument(Document):
"""xjh页面加载耗时记录文档"""
url = Keyword()
time_taken = Long()
log_datetime = Date()
@ -83,8 +92,10 @@ class ElapsedTimeDocument(Document):
class ElaspedTimeDocumentManager:
"""xjh耗时文档管理器"""
@staticmethod
def build_index():
"""xjh构建性能索引"""
from elasticsearch import Elasticsearch
client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
res = client.indices.exists(index="performance")
@ -93,12 +104,14 @@ class ElaspedTimeDocumentManager:
@staticmethod
def delete_index():
"""xjh删除性能索引"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='performance', ignore=[400, 404])
@staticmethod
def create(url, time_taken, log_datetime, useragent, ip):
"""xjh创建耗时记录"""
ElaspedTimeDocumentManager.build_index()
ua = UserAgent()
ua.browser = UserAgentBrowser()
@ -116,6 +129,7 @@ class ElaspedTimeDocumentManager:
ua.string = useragent.ua_string
ua.is_bot = useragent.is_bot
#xjh使用当前时间戳作为文档ID
doc = ElapsedTimeDocument(
meta={
'id': int(
@ -127,10 +141,11 @@ class ElaspedTimeDocumentManager:
time_taken=time_taken,
log_datetime=log_datetime,
useragent=ua, ip=ip)
doc.save(pipeline="geoip")
doc.save(pipeline="geoip") #xjh保存文档并通过geoip管道处理
class ArticleDocument(Document):
"""xjh文章搜索文档结构"""
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
author = Object(properties={
@ -165,19 +180,23 @@ class ArticleDocument(Document):
class ArticleDocumentManager():
"""xjh文章文档管理器"""
def __init__(self):
self.create_index()
def create_index(self):
"""xjh创建文章索引"""
ArticleDocument.init()
def delete_index(self):
"""xjh删除文章索引"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='blog', ignore=[400, 404])
def convert_to_doc(self, articles):
"""xjh将文章对象转换为搜索文档"""
return [
ArticleDocument(
meta={
@ -202,6 +221,7 @@ class ArticleDocumentManager():
article_order=article.article_order) for article in articles]
def rebuild(self, articles=None):
"""xjh重建文章索引"""
ArticleDocument.init()
articles = articles if articles else Article.objects.all()
docs = self.convert_to_doc(articles)
@ -209,5 +229,6 @@ class ArticleDocumentManager():
doc.save()
def update_docs(self, docs):
"""xjh更新文档"""
for doc in docs:
doc.save()
doc.save()

@ -7,13 +7,15 @@ logger = logging.getLogger(__name__)
class BlogSearchForm(SearchForm):
"""xjh博客搜索表单"""
querydata = forms.CharField(required=True)
def search(self):
"""xjh执行搜索"""
datas = super(BlogSearchForm, self).search()
if not self.is_valid():
return self.no_query_found()
if self.cleaned_data['querydata']:
logger.info(self.cleaned_data['querydata'])
return datas
return datas

@ -10,14 +10,16 @@ logger = logging.getLogger(__name__)
class OnlineMiddleware(object):
"""xjh在线中间件用于记录页面加载时间和用户访问信息"""
def __init__(self, get_response=None):
self.get_response = get_response
super().__init__()
def __call__(self, request):
''' page render time '''
''' xjh页面渲染时间统计 '''
start_time = time.time()
response = self.get_response(request)
#xjh获取用户IP和User-Agent信息
http_user_agent = request.META.get('HTTP_USER_AGENT', '')
ip, _ = get_client_ip(request)
user_agent = parse(http_user_agent)
@ -25,6 +27,7 @@ class OnlineMiddleware(object):
try:
cast_time = time.time() - start_time
if ELASTICSEARCH_ENABLED:
#xjh记录页面加载耗时到Elasticsearch
time_taken = round((cast_time) * 1000, 2)
url = request.path
from django.utils import timezone
@ -34,9 +37,10 @@ class OnlineMiddleware(object):
log_datetime=timezone.now(),
useragent=user_agent,
ip=ip)
#xjh在页面内容中替换加载时间占位符
response.content = response.content.replace(
b'<!!LOAD_TIMES!!>', str.encode(str(cast_time)[:5]))
except Exception as e:
logger.error("Error OnlineMiddleware: %s" % e)
return response
return response

@ -18,6 +18,7 @@ logger = logging.getLogger(__name__)
class LinkShowType(models.TextChoices):
"""xjh链接显示类型选择"""
I = ('i', _('index'))
L = ('l', _('list'))
P = ('p', _('post'))
@ -26,17 +27,20 @@ class LinkShowType(models.TextChoices):
class BaseModel(models.Model):
"""xjh基础模型类包含公共字段"""
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('modify time'), default=now)
def save(self, *args, **kwargs):
#xjh特殊处理文章浏览量更新
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
if is_update_views:
Article.objects.filter(pk=self.pk).update(views=self.views)
else:
#xjh自动生成slug字段
if 'slug' in self.__dict__:
slug = getattr(
self, 'title') if 'title' in self.__dict__ else getattr(
@ -45,6 +49,7 @@ class BaseModel(models.Model):
super().save(*args, **kwargs)
def get_full_url(self):
"""xjh获取完整URL"""
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
@ -55,11 +60,12 @@ class BaseModel(models.Model):
@abstractmethod
def get_absolute_url(self):
"""xjh抽象方法获取绝对URL"""
pass
class Article(BaseModel):
"""文章"""
"""xjh文章模型"""
STATUS_CHOICES = (
('d', _('Draft')),
('p', _('Published')),
@ -118,6 +124,7 @@ class Article(BaseModel):
get_latest_by = 'id'
def get_absolute_url(self):
"""xjh获取文章绝对URL"""
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
@ -127,6 +134,7 @@ class Article(BaseModel):
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""xjh获取分类树"""
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
@ -136,10 +144,12 @@ class Article(BaseModel):
super().save(*args, **kwargs)
def viewed(self):
"""xjh增加文章浏览量"""
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
"""xjh获取文章评论列表"""
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
@ -152,23 +162,24 @@ class Article(BaseModel):
return comments
def get_admin_url(self):
"""xjh获取文章管理后台URL"""
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100)
def next_article(self):
# 下一篇
"""xjh获取下一篇文章"""
return Article.objects.filter(
id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100)
def prev_article(self):
# 前一篇
"""xjh获取上一篇文章"""
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
Get the first image url from article.body.
xjh从文章内容中提取第一张图片URL
:return:
"""
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
@ -178,7 +189,7 @@ class Article(BaseModel):
class Category(BaseModel):
"""文章分类"""
"""xjh文章分类模型"""
name = models.CharField(_('category name'), max_length=30, unique=True)
parent_category = models.ForeignKey(
'self',
@ -205,7 +216,7 @@ class Category(BaseModel):
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""
递归获得分类目录的父级
xjh递归获得分类目录的父级
:return:
"""
categorys = []
@ -221,7 +232,7 @@ class Category(BaseModel):
@cache_decorator(60 * 60 * 10)
def get_sub_categorys(self):
"""
获得当前分类目录所有子集
xjh获得当前分类目录所有子集
:return:
"""
categorys = []
@ -241,7 +252,7 @@ class Category(BaseModel):
class Tag(BaseModel):
"""文章标签"""
"""xjh文章标签模型"""
name = models.CharField(_('tag name'), max_length=30, unique=True)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
@ -253,6 +264,7 @@ class Tag(BaseModel):
@cache_decorator(60 * 60 * 10)
def get_article_count(self):
"""xjh获取标签下的文章数量"""
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
@ -262,7 +274,7 @@ class Tag(BaseModel):
class Links(models.Model):
"""友情链接"""
"""xjh友情链接模型"""
name = models.CharField(_('link name'), max_length=30, unique=True)
link = models.URLField(_('link'))
@ -287,7 +299,7 @@ class Links(models.Model):
class SideBar(models.Model):
"""侧边栏,可以展示一些html内容"""
"""xjh侧边栏模型可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100)
content = models.TextField(_('content'))
sequence = models.IntegerField(_('order'), unique=True)
@ -305,7 +317,7 @@ class SideBar(models.Model):
class BlogSettings(models.Model):
"""blog的配置"""
"""xjh博客设置模型"""
site_name = models.CharField(
_('site name'),
max_length=200,
@ -367,10 +379,12 @@ class BlogSettings(models.Model):
return self.site_name
def clean(self):
"""xjh验证只能有一个博客配置实例"""
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
def save(self, *args, **kwargs):
"""xjh保存时清除缓存"""
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear()
cache.clear()

@ -4,10 +4,13 @@ from blog.models import Article
class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
"""xjh文章搜索索引配置"""
text = indexes.CharField(document=True, use_template=True)
def get_model(self):
"""xjh指定搜索模型"""
return Article
def index_queryset(self, using=None):
return self.get_model().objects.filter(status='p')
"""xjh指定索引查询集只索引已发布的文章"""
return self.get_model().objects.filter(status='p')

@ -51,75 +51,7 @@ def datetimeformat(data):
@register.filter()
@stringfilter
def custom_markdown(content):
"""
通用markdown过滤器应用文章内容插件
主要用于文章内容处理
"""
html_content = CommonMarkdown.get_markdown(content)
# 然后应用插件过滤器优化HTML
from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
optimized_html = hooks.apply_filters(ARTICLE_CONTENT_HOOK_NAME, html_content)
return mark_safe(optimized_html)
@register.filter()
@stringfilter
def sidebar_markdown(content):
html_content = CommonMarkdown.get_markdown(content)
return mark_safe(html_content)
@register.simple_tag(takes_context=True)
def render_article_content(context, article, is_summary=False):
"""
渲染文章内容包含完整的上下文信息供插件使用
Args:
context: 模板上下文
article: 文章对象
is_summary: 是否为摘要模式首页使用
"""
if not article or not hasattr(article, 'body'):
return ''
# 先转换Markdown为HTML
html_content = CommonMarkdown.get_markdown(article.body)
# 如果是摘要模式,先截断内容再应用插件
if is_summary:
# 截断HTML内容到合适的长度约300字符
from django.utils.html import strip_tags
from django.template.defaultfilters import truncatechars
# 先去除HTML标签截断纯文本然后重新转换为HTML
plain_text = strip_tags(html_content)
truncated_text = truncatechars(plain_text, 300)
# 重新转换截断后的文本为HTML简化版避免复杂的插件处理
html_content = CommonMarkdown.get_markdown(truncated_text)
# 然后应用插件过滤器,传递完整的上下文
from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
# 获取request对象
request = context.get('request')
# 应用所有文章内容相关的插件
# 注意:摘要模式下某些插件(如版权声明)可能不适用
optimized_html = hooks.apply_filters(
ARTICLE_CONTENT_HOOK_NAME,
html_content,
article=article,
request=request,
context=context,
is_summary=is_summary # 传递摘要标志,插件可以据此调整行为
)
return mark_safe(optimized_html)
return mark_safe(CommonMarkdown.get_markdown(content))
@register.simple_tag
@ -360,49 +292,38 @@ def load_article_detail(article, isindex, user):
}
# 返回用户头像URL
# 模板使用方法: {{ email|gravatar_url:150 }}
# return only the URL of the gravatar
# TEMPLATE USE: {{ email|gravatar_url:150 }}
@register.filter
def gravatar_url(email, size=40):
"""获得用户头像 - 优先使用OAuth头像否则使用默认头像"""
cachekey = 'avatar/' + email
"""获得gravatar头像"""
cachekey = 'gravatat/' + email
url = cache.get(cachekey)
if url:
return url
# 检查OAuth用户是否有自定义头像
usermodels = OAuthUser.objects.filter(email=email)
if usermodels:
# 过滤出有头像的用户
users_with_picture = list(filter(lambda x: x.picture is not None, usermodels))
if users_with_picture:
# 获取默认头像路径用于比较
default_avatar_path = static('blog/img/avatar.png')
# 优先选择非默认头像的用户,否则选择第一个
non_default_users = [u for u in users_with_picture if u.picture != default_avatar_path and not u.picture.endswith('/avatar.png')]
selected_user = non_default_users[0] if non_default_users else users_with_picture[0]
url = selected_user.picture
cache.set(cachekey, url, 60 * 60 * 24) # 缓存24小时
avatar_type = 'non-default' if non_default_users else 'default'
logger.info('Using {} OAuth avatar for {} from {}'.format(avatar_type, email, selected_user.type))
return url
# 使用默认头像
url = static('blog/img/avatar.png')
cache.set(cachekey, url, 60 * 60 * 24) # 缓存24小时
logger.info('Using default avatar for {}'.format(email))
return url
else:
usermodels = OAuthUser.objects.filter(email=email)
if usermodels:
o = list(filter(lambda x: x.picture is not None, usermodels))
if o:
return o[0].picture
email = email.encode('utf-8')
default = static('blog/img/avatar.png')
url = "https://www.gravatar.com/avatar/%s?%s" % (hashlib.md5(
email.lower()).hexdigest(), urllib.parse.urlencode({'d': default, 's': str(size)}))
cache.set(cachekey, url, 60 * 60 * 10)
logger.info('set gravatar cache.key:{key}'.format(key=cachekey))
return url
@register.filter
def gravatar(email, size=40):
"""获得用户头像HTML标签"""
"""获得gravatar头像"""
url = gravatar_url(email, size)
return mark_safe(
'<img src="%s" height="%d" width="%d" class="avatar" alt="用户头像">' %
'<img src="%s" height="%d" width="%d">' %
(url, size, size))
@ -421,134 +342,3 @@ def query(qs, **kwargs):
def addstr(arg1, arg2):
"""concatenate arg1 & arg2"""
return str(arg1) + str(arg2)
# === 插件系统模板标签 ===
@register.simple_tag(takes_context=True)
def render_plugin_widgets(context, position, **kwargs):
"""
渲染指定位置的所有插件组件
Args:
context: 模板上下文
position: 位置标识
**kwargs: 传递给插件的额外参数
Returns:
按优先级排序的所有插件HTML内容
"""
from djangoblog.plugin_manage.loader import get_loaded_plugins
widgets = []
for plugin in get_loaded_plugins():
try:
widget_data = plugin.render_position_widget(
position=position,
context=context,
**kwargs
)
if widget_data:
widgets.append(widget_data)
except Exception as e:
logger.error(f"Error rendering widget from plugin {plugin.PLUGIN_NAME}: {e}")
# 按优先级排序(数字越小优先级越高)
widgets.sort(key=lambda x: x['priority'])
# 合并HTML内容
html_parts = [widget['html'] for widget in widgets]
return mark_safe(''.join(html_parts))
@register.simple_tag(takes_context=True)
def plugin_head_resources(context):
"""渲染所有插件的head资源仅自定义HTMLCSS已集成到压缩系统"""
from djangoblog.plugin_manage.loader import get_loaded_plugins
resources = []
for plugin in get_loaded_plugins():
try:
# 只处理自定义head HTMLCSS文件已通过压缩系统处理
head_html = plugin.get_head_html(context)
if head_html:
resources.append(head_html)
except Exception as e:
logger.error(f"Error loading head resources from plugin {plugin.PLUGIN_NAME}: {e}")
return mark_safe('\n'.join(resources))
@register.simple_tag(takes_context=True)
def plugin_body_resources(context):
"""渲染所有插件的body资源仅自定义HTMLJS已集成到压缩系统"""
from djangoblog.plugin_manage.loader import get_loaded_plugins
resources = []
for plugin in get_loaded_plugins():
try:
# 只处理自定义body HTMLJS文件已通过压缩系统处理
body_html = plugin.get_body_html(context)
if body_html:
resources.append(body_html)
except Exception as e:
logger.error(f"Error loading body resources from plugin {plugin.PLUGIN_NAME}: {e}")
return mark_safe('\n'.join(resources))
@register.inclusion_tag('plugins/css_includes.html')
def plugin_compressed_css():
"""插件CSS压缩包含模板"""
from djangoblog.plugin_manage.loader import get_loaded_plugins
css_files = []
for plugin in get_loaded_plugins():
for css_file in plugin.get_css_files():
css_url = plugin.get_static_url(css_file)
css_files.append(css_url)
return {'css_files': css_files}
@register.inclusion_tag('plugins/js_includes.html')
def plugin_compressed_js():
"""插件JS压缩包含模板"""
from djangoblog.plugin_manage.loader import get_loaded_plugins
js_files = []
for plugin in get_loaded_plugins():
for js_file in plugin.get_js_files():
js_url = plugin.get_static_url(js_file)
js_files.append(js_url)
return {'js_files': js_files}
@register.simple_tag(takes_context=True)
def plugin_widget(context, plugin_name, widget_type='default', **kwargs):
"""
渲染指定插件的组件
使用方式
{% plugin_widget 'article_recommendation' 'bottom' article=article count=5 %}
"""
from djangoblog.plugin_manage.loader import get_plugin_by_slug
plugin = get_plugin_by_slug(plugin_name)
if plugin and hasattr(plugin, 'render_template'):
try:
widget_context = {**context.flatten(), **kwargs}
template_name = f"{widget_type}.html"
return mark_safe(plugin.render_template(template_name, widget_context))
except Exception as e:
logger.error(f"Error rendering plugin widget {plugin_name}.{widget_type}: {e}")
return ""

@ -20,11 +20,13 @@ from oauth.models import OAuthUser, OAuthConfig
# Create your tests here.
class ArticleTest(TestCase):
"""xjh文章相关测试用例"""
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
def test_validate_article(self):
"""xjh测试文章相关功能"""
site = get_current_site().domain
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
@ -149,6 +151,7 @@ class ArticleTest(TestCase):
self.client.get('/admin/admin/logentry/1/change/')
def check_pagination(self, p, type, value):
"""xjh检查分页功能"""
for page in range(1, p.num_pages + 1):
s = load_pagination_info(p.page(page), type, value)
self.assertIsNotNone(s)
@ -160,6 +163,7 @@ class ArticleTest(TestCase):
self.assertEqual(response.status_code, 200)
def test_image(self):
"""xjh测试图片上传和处理功能"""
import requests
rsp = requests.get(
'https://www.python.org/static/img/python-logo.png')
@ -183,10 +187,12 @@ class ArticleTest(TestCase):
'https://www.python.org/static/img/python-logo.png')
def test_errorpage(self):
"""xjh测试错误页面"""
rsp = self.client.get('/eee')
self.assertEqual(rsp.status_code, 404)
def test_commands(self):
"""xjh测试管理命令"""
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
@ -229,4 +235,4 @@ class ArticleTest(TestCase):
call_command("create_testdata")
call_command("clear_cache")
call_command("sync_user_avatar")
call_command("build_search_words")
call_command("build_search_words")

@ -5,58 +5,71 @@ from . import views
app_name = "blog"
urlpatterns = [
#xjh首页
path(
r'',
views.IndexView.as_view(),
name='index'),
#xjh首页分页
path(
r'page/<int:page>/',
views.IndexView.as_view(),
name='index_page'),
#xjh文章详情页
path(
r'article/<int:year>/<int:month>/<int:day>/<int:article_id>.html',
views.ArticleDetailView.as_view(),
name='detailbyid'),
#xjh分类目录页
path(
r'category/<slug:category_name>.html',
views.CategoryDetailView.as_view(),
name='category_detail'),
#xjh分类目录分页
path(
r'category/<slug:category_name>/<int:page>.html',
views.CategoryDetailView.as_view(),
name='category_detail_page'),
#xjh作者文章页
path(
r'author/<author_name>.html',
views.AuthorDetailView.as_view(),
name='author_detail'),
#xjh作者文章分页
path(
r'author/<author_name>/<int:page>.html',
views.AuthorDetailView.as_view(),
name='author_detail_page'),
#xjh标签文章页
path(
r'tag/<slug:tag_name>.html',
views.TagDetailView.as_view(),
name='tag_detail'),
#xjh标签文章分页
path(
r'tag/<slug:tag_name>/<int:page>.html',
views.TagDetailView.as_view(),
name='tag_detail_page'),
#xjh文章归档页缓存1小时
path(
'archives.html',
cache_page(
60 * 60)(
views.ArchivesView.as_view()),
name='archives'),
#xjh友情链接页
path(
'links.html',
views.LinkListView.as_view(),
name='links'),
#xjh文件上传接口
path(
r'upload',
views.fileupload,
name='upload'),
#xjh清理缓存接口
path(
r'clean',
views.clean_cache_view,
name='clean'),
]
]

@ -21,48 +21,51 @@ from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
from djangoblog.utils import cache, get_blog_setting, get_sha256
# 获取logger实例
logger = logging.getLogger(__name__)
class ArticleListView(ListView):
"""文章列表基类视图"""
# 指定使用的模板
"""xjh文章列表视图基类"""
# template_name属性用于指定使用哪个模板进行渲染
template_name = 'blog/article_index.html'
# 上下文变量名
# context_object_name属性用于给上下文变量取名在模板中使用该名字
context_object_name = 'article_list'
# 页面类型
# 页面类型,分类目录或标签列表等
page_type = ''
# 分页大小
paginate_by = settings.PAGINATE_BY
# 页码参数名
page_kwarg = 'page'
# 链接显示类型
link_type = LinkShowType.L
def get_view_cache_key(self):
"""获取视图缓存键"""
return self.request.get['pages']
@property
def page_number(self):
"""获取当前页码"""
page_kwarg = self.page_kwarg
page = self.kwargs.get(
page_kwarg) or self.request.GET.get(page_kwarg) or 1
return page
def get_queryset_cache_key(self):
"""获取查询集缓存键 - 子类需重写"""
"""
子类重写.获得queryset的缓存key
"""
raise NotImplementedError()
def get_queryset_data(self):
"""获取查询集数据 - 子类需重写"""
"""
子类重写.获取queryset的数据
"""
raise NotImplementedError()
def get_queryset_from_cache(self, cache_key):
"""从缓存获取查询集数据"""
'''
xjh缓存页面数据
:param cache_key: 缓存key
:return:
'''
value = cache.get(cache_key)
if value:
logger.info('get view cache.key:{key}'.format(key=cache_key))
@ -74,53 +77,50 @@ class ArticleListView(ListView):
return article_list
def get_queryset(self):
"""重写获取查询集方法,使用缓存"""
'''
xjh重写默认从缓存获取数据
:return:
'''
key = self.get_queryset_cache_key()
value = self.get_queryset_from_cache(key)
return value
def get_context_data(self, **kwargs):
"""添加上下文数据"""
kwargs['linktype'] = self.link_type
return super(ArticleListView, self).get_context_data(**kwargs)
class IndexView(ArticleListView):
"""首页视图"""
'''
xjh首页视图
'''
# 友情链接类型
link_type = LinkShowType.I
def get_queryset_data(self):
"""获取首页文章列表数据"""
article_list = Article.objects.filter(type='a', status='p')
return article_list
def get_queryset_cache_key(self):
"""生成首页缓存键"""
cache_key = 'index_{page}'.format(page=self.page_number)
return cache_key
class ArticleDetailView(DetailView):
"""文章详情页视图"""
'''
xjh文章详情页面视图
'''
template_name = 'blog/article_detail.html'
model = Article
pk_url_kwarg = 'article_id'
context_object_name = "article"
def get_context_data(self, **kwargs):
"""获取文章详情页上下文数据"""
# 评论表单
comment_form = CommentForm()
# 获取文章评论
article_comments = self.object.comment_list()
parent_comments = article_comments.filter(parent_comment=None)
blog_setting = get_blog_setting()
# 评论分页
paginator = Paginator(parent_comments, blog_setting.article_comment_count)
page = self.request.GET.get('comment_page', '1')
if not page.isnumeric():
@ -136,15 +136,12 @@ class ArticleDetailView(DetailView):
next_page = p_comments.next_page_number() if p_comments.has_next() else None
prev_page = p_comments.previous_page_number() if p_comments.has_previous() else None
# 评论翻页URL
if next_page:
kwargs[
'comment_next_page_url'] = self.object.get_absolute_url() + f'?comment_page={next_page}#commentlist-container'
if prev_page:
kwargs[
'comment_prev_page_url'] = self.object.get_absolute_url() + f'?comment_page={prev_page}#commentlist-container'
# 添加上下文数据
kwargs['form'] = comment_form
kwargs['article_comments'] = article_comments
kwargs['p_comments'] = p_comments
@ -156,38 +153,34 @@ class ArticleDetailView(DetailView):
context = super(ArticleDetailView, self).get_context_data(**kwargs)
article = self.object
# 触发文章详情加载钩子,允许插件添加额外上下文数据
from djangoblog.plugin_manage.hook_constants import ARTICLE_DETAIL_LOAD
hooks.run_action(ARTICLE_DETAIL_LOAD, article=article, context=context, request=self.request)
# Action Hook, 通知插件"文章详情已获取"
hooks.run_action('after_article_body_get', article=article, request=self.request)
# # Filter Hook, 允许插件修改文章正文
article.body = hooks.apply_filters(ARTICLE_CONTENT_HOOK_NAME, article.body, article=article,
request=self.request)
return context
class CategoryDetailView(ArticleListView):
"""分类目录列表视图"""
'''
xjh分类目录列表视图
'''
page_type = "分类目录归档"
def get_queryset_data(self):
"""获取分类文章数据"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
# 获取所有子分类
categorynames = list(
map(lambda c: c.name, category.get_sub_categorys()))
# 过滤分类文章
article_list = Article.objects.filter(
category__name__in=categorynames, status='p')
return article_list
def get_queryset_cache_key(self):
"""生成分类页缓存键"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
@ -197,7 +190,7 @@ class CategoryDetailView(ArticleListView):
return cache_key
def get_context_data(self, **kwargs):
"""添加上下文数据"""
categoryname = self.categoryname
try:
categoryname = categoryname.split('/')[-1]
@ -209,12 +202,12 @@ class CategoryDetailView(ArticleListView):
class AuthorDetailView(ArticleListView):
"""作者详情页视图"""
'''
xjh作者详情页视图
'''
page_type = '作者文章归档'
def get_queryset_cache_key(self):
"""生成作者页缓存键"""
from uuslug import slugify
author_name = slugify(self.kwargs['author_name'])
cache_key = 'author_{author_name}_{page}'.format(
@ -222,14 +215,12 @@ class AuthorDetailView(ArticleListView):
return cache_key
def get_queryset_data(self):
"""获取作者文章数据"""
author_name = self.kwargs['author_name']
article_list = Article.objects.filter(
author__username=author_name, type='a', status='p')
return article_list
def get_context_data(self, **kwargs):
"""添加上下文数据"""
author_name = self.kwargs['author_name']
kwargs['page_type'] = AuthorDetailView.page_type
kwargs['tag_name'] = author_name
@ -237,12 +228,12 @@ class AuthorDetailView(ArticleListView):
class TagDetailView(ArticleListView):
"""标签列表页面视图"""
'''
xjh标签列表页面视图
'''
page_type = '分类标签归档'
def get_queryset_data(self):
"""获取标签文章数据"""
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
@ -252,7 +243,6 @@ class TagDetailView(ArticleListView):
return article_list
def get_queryset_cache_key(self):
"""生成标签页缓存键"""
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
@ -262,7 +252,7 @@ class TagDetailView(ArticleListView):
return cache_key
def get_context_data(self, **kwargs):
"""添加上下文数据"""
# tag_name = self.kwargs['tag_name']
tag_name = self.name
kwargs['page_type'] = TagDetailView.page_type
kwargs['tag_name'] = tag_name
@ -270,39 +260,34 @@ class TagDetailView(ArticleListView):
class ArchivesView(ArticleListView):
"""文章归档页面视图"""
'''
xjh文章归档页面视图
'''
page_type = '文章归档'
paginate_by = None # 归档页不分页
paginate_by = None
page_kwarg = None
template_name = 'blog/article_archives.html'
def get_queryset_data(self):
"""获取所有已发布文章"""
return Article.objects.filter(status='p').all()
def get_queryset_cache_key(self):
"""生成归档页缓存键"""
cache_key = 'archives'
return cache_key
class LinkListView(ListView):
"""友情链接列表视图"""
"""xjh友情链接列表视图"""
model = Links
template_name = 'blog/links_list.html'
def get_queryset(self):
"""获取已启用的链接"""
return Links.objects.filter(is_enable=True)
class EsSearchView(SearchView):
"""Elasticsearch搜索视图"""
"""xjhElasticsearch搜索视图"""
def get_context(self):
"""获取搜索上下文"""
paginator, page = self.build_page()
context = {
"query": self.query,
@ -311,7 +296,6 @@ class EsSearchView(SearchView):
"paginator": paginator,
"suggestion": None,
}
# 添加拼写建议
if hasattr(self.results, "query") and self.results.query.backend.include_spelling:
context["suggestion"] = self.results.query.get_spelling_suggestion()
context.update(self.extra_context())
@ -319,50 +303,38 @@ class EsSearchView(SearchView):
return context
@csrf_exempt # 免除CSRF验证
@csrf_exempt
def fileupload(request):
"""
文件上传视图 - 提供图床功能
需要调用端自己实现上传逻辑
xjh文件上传接口该方法需自己写调用端来上传图片该方法仅提供图床功能
:param request:
:return:
"""
if request.method == 'POST':
# 验证签名
sign = request.GET.get('sign', None)
if not sign:
return HttpResponseForbidden()
if not sign == get_sha256(get_sha256(settings.SECRET_KEY)):
return HttpResponseForbidden()
response = []
# 处理所有上传的文件
for filename in request.FILES:
timestr = timezone.now().strftime('%Y/%m/%d')
imgextensions = ['jpg', 'png', 'jpeg', 'bmp']
fname = u''.join(str(filename))
# 判断是否为图片
isimage = len([i for i in imgextensions if fname.find(i) >= 0]) > 0
# 构建保存路径
base_dir = os.path.join(settings.STATICFILES, "files" if not isimage else "image", timestr)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
savepath = os.path.normpath(os.path.join(base_dir, f"{uuid.uuid4().hex}{os.path.splitext(filename)[-1]}"))
# 安全检查
if not savepath.startswith(base_dir):
return HttpResponse("only for post")
# 保存文件
with open(savepath, 'wb+') as wfile:
for chunk in request.FILES[filename].chunks():
wfile.write(chunk)
# 如果是图片,进行压缩优化
if isimage:
from PIL import Image
image = Image.open(savepath)
image.save(savepath, quality=20, optimize=True)
# 生成静态URL
url = static(savepath)
response.append(url)
return HttpResponse(response)
@ -375,7 +347,7 @@ def page_not_found_view(
request,
exception,
template_name='blog/error_page.html'):
"""404页面未找到错误处理视图"""
"""xjh404页面处理视图"""
if exception:
logger.error(exception)
url = request.get_full_path()
@ -387,7 +359,7 @@ def page_not_found_view(
def server_error_view(request, template_name='blog/error_page.html'):
"""500服务器错误处理视图"""
"""xjh500页面处理视图"""
return render(request,
template_name,
{'message': _('Sorry, the server is busy, please click the home page to see other?'),
@ -399,7 +371,7 @@ def permission_denied_view(
request,
exception,
template_name='blog/error_page.html'):
"""403权限拒绝错误处理视图"""
"""xjh403页面处理视图"""
if exception:
logger.error(exception)
return render(
@ -409,6 +381,6 @@ def permission_denied_view(
def clean_cache_view(request):
"""清理缓存视图"""
"""xjh清理缓存视图"""
cache.clear()
return HttpResponse('ok')

@ -0,0 +1,47 @@
from django.contrib import admin
from django.urls import reverse
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
def disable_commentstatus(modeladmin, request, queryset):
queryset.update(is_enable=False) # 杨智鑫:批量设置评论为禁用状态
def enable_commentstatus(modeladmin, request, queryset):
queryset.update(is_enable=True) # 杨智鑫:批量设置评论为启用状态
disable_commentstatus.short_description = _('Disable comments') # 杨智鑫:批量禁用评论
enable_commentstatus.short_description = _('Enable comments') # 杨智鑫:批量启用评论
class CommentAdmin(admin.ModelAdmin):
list_per_page = 20
list_display = (
'id',
'body',
'link_to_userinfo',
'link_to_article',
'is_enable',
'creation_time') # 杨智鑫:显示
list_display_links = ('id', 'body', 'is_enable') # 杨智鑫:可点击
list_filter = ('is_enable',) # 杨智鑫:过滤
exclude = ('creation_time', 'last_modify_time') # 杨智鑫:不显示创建时间
actions = [disable_commentstatus, enable_commentstatus] # 杨智鑫:批量操作
def link_to_userinfo(self, obj):
info = (obj.author._meta.app_label, obj.author._meta.model_name) # 杨智鑫:获取用户信息
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,)) # 杨智鑫:获取用户信息
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email)) # 杨智鑫:获取用户信息
def link_to_article(self, obj):
info = (obj.article._meta.app_label, obj.article._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.article.id,)) # 杨智鑫:获取文章信息
return format_html(
u'<a href="%s">%s</a>' % (link, obj.article.title)) # 杨智鑫:获取文章信息
link_to_userinfo.short_description = _('User') # 杨智鑫:用户
link_to_article.short_description = _('Article') # 杨智鑫:文章

@ -2,4 +2,4 @@ from django.apps import AppConfig
class CommentsConfig(AppConfig):
name = 'comments'
name = 'comments' # 杨智鑫:应用名称

@ -0,0 +1,13 @@
from django import forms
from django.forms import ModelForm
from .models import Comment
class CommentForm(ModelForm):
parent_comment_id = forms.IntegerField(
widget=forms.HiddenInput, required=False) # 杨智鑫隐藏字段用于处理回复评论的父评论ID
class Meta:
model = Comment # 杨智鑫:指定表单关联的模型
fields = ['body'] # 杨智鑫:表单只包含评论内容字段

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save