Compare commits

...

25 Commits

@ -47,32 +47,48 @@ DjangoBlog is a high-performance blog platform built with Python 3.10 and Django
Ensure you have Python 3.10+ and MySQL/MariaDB installed on your system.
### 2. Clone & Installation
```bash
# Clone the project to your local machine
```
# 将项目克隆到本地机器
git clone https://github.com/liangliangyy/DjangoBlog.git
# 进入项目目录切换到DjangoBlog文件夹
cd DjangoBlog
# Install dependencies
# 安装项目所需的依赖包
pip install -r requirements.txt
```
### 3. Project Configuration
- **Database**:
Open `djangoblog/settings.py`, locate the `DATABASES` section, and update it with your MySQL connection details.
```python
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'djangoblog',
'USER': 'root',
'PASSWORD': 'your_password',
'HOST': '127.0.0.1',
'PORT': 3306,
}
}
# 数据库配置字典Django通过此配置连接数据库
DATABASES = {
# 默认数据库配置
'default': {
# 数据库引擎这里使用MySQL数据库后端
# 'django.db.backends.mysql'表示使用MySQL数据库
# 其他可选值包括sqlite3、postgresql、oracle等
'ENGINE': 'django.db.backends.mysql',
# 数据库名称需要提前在MySQL中创建该数据库
'NAME': 'djangoblog',
# 数据库登录用户名
'USER': 'root',
# 数据库登录密码,需要替换为你的实际密码
'PASSWORD': 'your_password',
# 数据库主机地址127.0.0.1表示本地数据库
# 若数据库在远程服务器可改为对应IP地址或域名
'HOST': '127.0.0.1',
# 数据库端口MySQL默认端口为3306
'PORT': 3306,
}
}
```
Create the database in MySQL:
```sql
@ -82,27 +98,38 @@ pip install -r requirements.txt
- **More Configurations**:
For advanced settings such as email, OAuth, caching, and more, please refer to our [Detailed Configuration Guide](/docs/config-en.md).
```
### 4. Database Initialization
```bash
# 生成数据库迁移文件
# 根据模型的变更创建迁移脚本,记录数据模型的修改
python manage.py makemigrations
# 执行数据库迁移
# 将迁移文件中定义的变更应用到实际数据库中,创建或修改表结构
python manage.py migrate
# Create a superuser account
# Create a superuser account 创建超级用户账号
# 生成可以登录Django管理后台的超级用户管理员用于管理网站内容
python manage.py createsuperuser
```
### 5. Running the Project
```bash
# (Optional) Generate some test data
```
# (可选)生成一些测试数据
# 为项目创建示例数据(如测试文章、用户等),方便开发和测试时查看效果
python manage.py create_testdata
# (Optional) Collect and compress static files
# (可选)收集并压缩静态文件
# 收集项目中所有静态文件CSS、JS、图片等到指定目录--noinput参数表示无需手动确认
python manage.py collectstatic --noinput
# 压缩静态文件如CSS、JS以减小文件体积提高加载速度--force参数强制覆盖已有文件
python manage.py compress --force
# Start the development server
# 启动开发服务器
# 启动Django内置的开发用Web服务器默认地址为http://127.0.0.1:8000/
# 开发过程中修改代码后服务器会自动重启,方便调试
python manage.py runserver
```

@ -2,22 +2,32 @@
## Cache:
Cache using `memcache` for default. If you don't have `memcache` environment, you can remove the `default` setting in `CACHES` and change `locmemcache` to `default`.
```python
```
# Django 缓存配置字典,用于定义不同的缓存后端及相关参数
CACHES = {
# 默认缓存配置,键名为'default'
'default': {
# 缓存后端类型使用Memcached作为缓存服务器
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
# Memcached服务器地址和端口这里使用本地的11211端口Memcached默认端口
'LOCATION': '127.0.0.1:11211',
# 缓存键的前缀如果处于测试环境TESTING为True则使用'django_test',否则使用'djangoblog'
# 用于区分不同环境或项目的缓存键,避免冲突
'KEY_PREFIX': 'django_test' if TESTING else 'djangoblog',
# 缓存超时时间单位这里设置为10小时60秒*60分*10小时
'TIMEOUT': 60 * 60 * 10
},
# 本地内存缓存配置,键名为'locmemcache'
'locmemcache': {
# 缓存后端类型:使用本地内存作为缓存(仅当前进程有效,多进程不共享)
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
# 缓存超时时间单位这里设置为3小时10800秒 = 60*60*3
'TIMEOUT': 10800,
# 本地内存缓存的唯一标识,用于在同一进程中区分不同的本地缓存实例
'LOCATION': 'unique-snowflake',
}
}
```
## OAuth Login:
QQ, Weibo, Google, GitHub and Facebook are now supported for OAuth login. Fetch OAuth login permissions from the corresponding open platform, and save them with `appkey`, `appsecret` and callback address in **Backend->OAuth** configuration.
@ -32,11 +42,25 @@ owntracks is a location tracking application. It will send your locaiton to the
## Email feature:
Same as before, Configure your own error msg recvie email information with`ADMINS = [('liangliang', 'liangliangyy@gmail.com')]` in `settings.py`. And modify:
```python
# 邮件发送服务器的SMTP地址这里使用Zoho的SMTP服务器
EMAIL_HOST = 'smtp.zoho.com'
# 邮件发送服务器的端口号587是TLS加密的常用端口
EMAIL_PORT = 587
# 发送邮件的账号用户名,从环境变量中获取(避免硬编码敏感信息)
# 环境变量键为'DJANGO_EMAIL_USER'
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
# 发送邮件的账号密码(或授权码),从环境变量中获取(安全存储敏感信息)
# 环境变量键为'DJANGO_EMAIL_PASSWORD'
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
# 默认的发件人邮箱地址,这里直接使用上面配置的邮件账号
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 服务器错误通知的发件人邮箱例如500错误时发送给管理员的邮件
# 这里同样使用环境变量中配置的邮件账号
SERVER_EMAIL = os.environ.get('DJANGO_EMAIL_USER')
```
with your email account information.

@ -21,11 +21,25 @@ owntracks是一个位置追踪软件可以定时的将你的坐标提交到
## 邮件功能:
同样,将`settings.py`中的`ADMINS = [('liangliang', 'liangliangyy@gmail.com')]`配置为你自己的错误接收邮箱,另外修改:
```python
# 配置邮件发送所使用的SMTP服务器地址此处使用Zoho的SMTP服务器
EMAIL_HOST = 'smtp.zoho.com'
# 配置SMTP服务器的端口号587是用于TLS加密连接的标准端口
EMAIL_PORT = 587
# 发送邮件的账号用户名,通过环境变量获取(避免将敏感信息硬编码在代码中)
# 对应的环境变量为'DJANGO_EMAIL_USER'
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
# 发送邮件的账号密码(或授权码),同样通过环境变量获取(增强安全性)
# 对应的环境变量为'DJANGO_EMAIL_PASSWORD'
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
# 设置默认的发件人邮箱地址,这里直接使用上面配置的邮件账号作为发件人
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 配置服务器发送错误通知时使用的发件人邮箱(如系统发生错误时向管理员发送通知)
# 此处与发送邮件的账号保持一致
SERVER_EMAIL = os.environ.get('DJANGO_EMAIL_USER')
```
为你自己的邮箱配置。

@ -21,7 +21,10 @@ This is the simplest and most recommended way to deploy. It automatically create
From the project's root directory, run the following command:
```bash
# Build and start the containers in detached mode (includes Django app and MySQL)
# 构建并以分离模式(后台运行)启动容器
# 包含Django应用和MySQL等服务具体服务取决于docker-compose配置文件
# -d: 表示detached mode分离模式容器将在后台运行不占用当前终端
# --build: 表示在启动前重新构建所有服务的镜像(确保使用最新的代码和配置)
docker-compose up -d --build
```
@ -35,7 +38,13 @@ docker-compose up -d --build
If you want to use Elasticsearch for more powerful full-text search capabilities, you can include the `docker-compose.es.yml` configuration file:
```bash
# Build and start all services in detached mode (Django, MySQL, Elasticsearch)
# 启动所有服务Build and start all services in detached mode (Django, MySQL, Elasticsearch)构建并以分离模式(后台运行)
# 包括Django应用、MySQL数据库、Elasticsearch搜索引擎等具体服务取决于配置文件
# -f docker-compose.yml: 指定第一个配置文件基础服务配置如Django、MySQL等
# -f deploy/docker-compose/docker-compose.es.yml: 指定第二个配置文件Elasticsearch相关服务配置
# 多个-f参数可合并配置后者配置会覆盖前者中重复的部分
# -d: 表示detached mode分离模式容器在后台运行不占用当前终端
# --build: 启动前重新构建所有服务的镜像,确保使用最新代码和配置
docker-compose -f docker-compose.yml -f deploy/docker-compose/docker-compose.es.yml up -d --build
```
- **Data Persistence**: Elasticsearch data will be stored in the `data/elasticsearch` directory.
@ -45,20 +54,24 @@ docker-compose -f docker-compose.yml -f deploy/docker-compose/docker-compose.es.
After the containers start for the first time, you'll need to execute some initialization commands inside the application container.
```bash
# Get a shell inside the djangoblog application container (named 'web')
# 进入名为'web'的djangoblog应用容器内部的shell环境
docker-compose exec web bash
# Inside the container, run the following commands:
# Create a superuser account (follow the prompts to set username, email, and password)
# 在容器内部,运行以下命令:
# 创建超级用户账号(按照提示设置用户名、邮箱和密码)
# 超级用户拥有管理后台的全部权限
python manage.py createsuperuser
# (Optional) Create some test data
# (可选)创建一些测试数据
# 用于开发或测试时快速填充数据库,方便功能验证
python manage.py create_testdata
# (Optional, if ES is enabled) Create the search index
# 可选如果启用了Elasticsearch创建搜索索引
# 用于初始化或更新Elasticsearch的搜索索引确保搜索功能正常工作
python manage.py rebuild_index
# Exit the container
# 退出容器的shell环境回到宿主机终端
exit
```
@ -67,10 +80,20 @@ exit
If you already have an external MySQL database running, you can run the DjangoBlog application image by itself.
```bash
# Pull the latest image from Docker Hub
# 从Docker Hub拉取最新版本的djangoblog镜像
docker pull liangliangyy/djangoblog:latest
# Run the container and connect it to your external database
# 运行容器并将其连接到外部数据库
# -d: 以分离模式(后台)运行容器
# -p 8000:8000: 将容器的8000端口映射到宿主机的8000端口宿主机内部端口服务端口:宿主机访问端口)
# -e: 设置环境变量(向容器传递配置参数,避免硬编码)
# DJANGO_SECRET_KEY: Django应用的密钥需使用高强度密钥用于加密等安全操作
# DJANGO_MYSQL_HOST: 外部MySQL数据库的主机地址
# DJANGO_MYSQL_USER: 访问MySQL数据库的用户名
# DJANGO_MYSQL_PASSWORD: 访问MySQL数据库的密码
# DJANGO_MYSQL_DATABASE: 要连接的MySQL数据库名称这里指定为'djangoblog'
# --name djangoblog: 为容器指定名称为'djangoblog',方便后续管理(如启动、停止、进入等)
# liangliangyy/djangoblog:latest: 基于该镜像运行容器
docker run -d \
-p 8000:8000 \
-e DJANGO_SECRET_KEY='your-strong-secret-key' \

@ -7,16 +7,27 @@
接下来在`settings.py`做如下改动即可:
- 增加es链接如下所示
```python
# Elasticsearch DSL领域特定语言的配置字典用于Django与Elasticsearch交互
ELASTICSEARCH_DSL = {
# 默认的Elasticsearch连接配置键名为'default'
'default': {
# Elasticsearch服务的地址和端口这里指向本地的9200端口Elasticsearch默认端口
# 应用将通过此地址与Elasticsearch服务建立连接用于执行搜索、索引等操作
'hosts': '127.0.0.1:9200'
},
}
```
- 修改`HAYSTACK`配置:
```python
# Django Haystack搜索引擎框架的连接配置字典
# Haystack用于统一管理不同的搜索引擎后端此处配置默认搜索引擎连接
HAYSTACK_CONNECTIONS = {
# 默认的搜索引擎连接配置,键名为'default'
'default': {
# 指定搜索引擎引擎类使用自定义的ElasticSearchEngine后端
# 路径'djangoblog.elasticsearch_backend.ElasticSearchEngine'表示该类位于
# djangoblog应用的elasticsearch_backend模块中
# 此配置用于将Haystack与Elasticsearch搜索引擎关联实现搜索功能
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
},
}

@ -28,7 +28,10 @@ Before you begin, please ensure you have the following:
We recommend deploying all DjangoBlog-related resources in a dedicated namespace for better management.
```bash
# Create a namespace named 'djangoblog'
# 创建一个名为'djangoblog'的命名空间namespace
# 命名空间用于在 Kubernetes 集群中隔离不同的应用或环境(如开发、测试、生产)
# 此处创建'djangoblog'命名空间,通常用于部署与 djangoblog 应用相关的资源
# 后续部署的 Pod、Service 等资源可指定在此命名空间下,避免与其他应用资源冲突
kubectl create namespace djangoblog
```
@ -37,16 +40,26 @@ kubectl create namespace djangoblog
This setup uses Local Persistent Volumes. You need to create the data storage directories on a node within your cluster (the default is the `master` node in `pv.yaml`).
```bash
# Log in to your master node
# 通过SSH登录到主节点master node
# user为登录用户名master-node为主节点的地址可是IP或域名
ssh user@master-node
# Create the required storage directories
# 创建所需的存储目录使用sudo获取管理员权限
# -p 选项确保在父目录不存在时自动创建,避免报错
# 用于数据库如MySQL的本地存储目录
sudo mkdir -p /mnt/local-storage-db
# 用于djangoblog应用的本地存储目录
sudo mkdir -p /mnt/local-storage-djangoblog
# 通用资源存储目录
sudo mkdir -p /mnt/resource/
# 用于Elasticsearch的本地存储目录
sudo mkdir -p /mnt/local-storage-elasticsearch
# Log out from the node
# 从节点退出,返回本地终端
exit
```
**Note**: If you wish to store data on a different node or use different paths, you must modify the `nodeAffinity` and `local.path` settings in the `deploy/k8s/pv.yaml` file.
@ -54,13 +67,16 @@ exit
After creating the directories, apply the storage-related configurations:
```bash
# Apply the StorageClass
# 应用StorageClass配置存储类用于定义持久化存储的类型和属性
# StorageClass用于动态供应持久卷PV简化存储管理
kubectl apply -f deploy/k8s/storageclass.yaml
# Apply the PersistentVolumes (PVs)
# 应用持久卷PersistentVolumes, PVs配置
# PV是集群中的一块存储资源由管理员预先创建或通过StorageClass动态生成
kubectl apply -f deploy/k8s/pv.yaml
# Apply the PersistentVolumeClaims (PVCs)
# 应用持久卷声明PersistentVolumeClaims, PVCs配置
# PVC是用户对存储资源的请求用于绑定PV并为Pod提供持久化存储
kubectl apply -f deploy/k8s/pvc.yaml
```
@ -73,10 +89,12 @@ Before deploying the application, you need to edit the `deploy/k8s/configmap.yam
- `DJANGO_MYSQL_PASSWORD` and `MYSQL_ROOT_PASSWORD`: Change to your own secure database password.
```bash
# Edit the ConfigMap file
# 使用vim编辑器编辑ConfigMap配置文件
# ConfigMap用于存储非敏感的配置数据供Pod中的容器使用
vim deploy/k8s/configmap.yaml
# Apply the configuration
# 应用ConfigMap配置到Kubernetes集群
# 执行后配置数据将被创建或更新供相关资源如Pod引用
kubectl apply -f deploy/k8s/configmap.yaml
```
@ -85,10 +103,12 @@ kubectl apply -f deploy/k8s/configmap.yaml
Now, we can deploy all the core services.
```bash
# Deploy the Deployments (DjangoBlog, MySQL, Redis, Nginx, ES)
# 部署 Deployment 资源包含DjangoBlog应用、MySQL数据库、Redis缓存、Nginx服务器、Elasticsearch搜索引擎
# Deployment 用于定义Pod的期望状态负责创建和管理Pod支持滚动更新等功能
kubectl apply -f deploy/k8s/deployment.yaml
# Deploy the Services (to create internal endpoints for the Deployments)
# 部署 Service 资源为上述Deployment创建内部访问端点
# Service 提供固定的访问地址实现Pod的负载均衡和服务发现即使Pod重建IP变化也能通过Service稳定访问
kubectl apply -f deploy/k8s/service.yaml
```
@ -103,7 +123,10 @@ kubectl get pods -n djangoblog -w
Finally, expose the Nginx service to external traffic by applying the `Ingress` rule.
```bash
# Apply the Ingress rule
# 应用Ingress规则配置
# Ingress用于管理Kubernetes集群外部访问集群内部服务的规则如HTTP/HTTPS路由
# 该配置文件deploy/k8s/gateway.yaml定义了外部请求如何映射到集群内的服务如通过域名路由到对应的Service
# 执行后Ingress控制器将根据规则处理外部流量实现对集群内服务的访问
kubectl apply -f deploy/k8s/gateway.yaml
```
@ -118,23 +141,32 @@ kubectl get ingress -n djangoblog
Similar to the Docker deployment, you need to get a shell into the DjangoBlog application Pod to perform database initialization and create a superuser on the first run.
```bash
# First, get the name of a djangoblog pod
# 首先获取djangoblog相关Pod的名称
# -n djangoblog指定在'djangoblog'命名空间中查询
# grep djangoblog筛选出包含'djangoblog'关键词的Pod即目标应用的Pod
kubectl get pods -n djangoblog | grep djangoblog
# Exec into one of the Pods (replace [pod-name] with the name from the previous step)
# 进入其中一个Pod的交互式终端将[pod-name]替换为上一步获取的Pod名称
# -it以交互式终端模式进入Pod
# -n djangoblog指定目标Pod所在的命名空间
# -- bash在Pod内启动bash shell
kubectl exec -it [pod-name] -n djangoblog -- bash
# Inside the Pod, run the following commands:
# Create a superuser account (follow the prompts)
# 在Pod内部运行以下命令
# 创建超级用户账号(按照提示输入用户名、邮箱和密码)
# 超级用户用于登录Django管理后台拥有最高权限
python manage.py createsuperuser
# (Optional) Create some test data
# (可选)创建一些测试数据
# 用于快速填充数据库,方便测试应用功能
python manage.py create_testdata
# (Optional, if ES is enabled) Create the search index
# 可选如果启用了Elasticsearch创建搜索索引
# 初始化或更新Elasticsearch的搜索索引确保搜索功能可用
python manage.py rebuild_index
# Exit the Pod
# 退出Pod的终端返回本地命令行
exit
```

@ -1,54 +1,77 @@
import logging
from django.contrib import admin
# Register your models here.
# 注册模型到admin后台此处注释用于提示后续需注册模型
from django.urls import reverse
from django.utils.html import format_html
# 创建当前模块的日志记录器
logger = logging.getLogger(__name__)
# 定义OAuthUser模型的Admin管理类
class OAuthUserAdmin(admin.ModelAdmin):
# 配置后台搜索字段:支持按昵称和邮箱搜索
search_fields = ('nickname', 'email')
# 每页显示20条记录
list_per_page = 20
# 列表页显示的字段
list_display = (
'id',
'nickname',
'link_to_usermodel',
'show_user_image',
'type',
'email',
'id', # ID
'nickname', # 昵称
'link_to_usermodel', # 关联的本地用户(自定义字段)
'show_user_image', # 用户头像(自定义字段)
'type', # 第三方登录类型
'email', # 邮箱
)
# 列表页中可点击跳转编辑页的字段
list_display_links = ('id', 'nickname')
# 列表页的筛选器:按关联用户和登录类型筛选
list_filter = ('author', 'type',)
# 初始定义的只读字段(空列表,后续动态添加)
readonly_fields = []
# 动态获取只读字段:编辑时所有字段均为只读
def get_readonly_fields(self, request, obj=None):
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
if obj: # 编辑已有对象时
# 原只读字段 + 所有模型字段 + 所有多对多字段
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
return self.readonly_fields # 新增时使用初始只读字段
# 禁用新增权限不允许在后台手动添加OAuthUser记录
def has_add_permission(self, request):
return False
# 自定义字段:显示关联本地用户的链接
def link_to_usermodel(self, obj):
if obj.author:
if obj.author: # 若存在关联用户
# 获取关联用户模型的app标签和模型名
info = (obj.author._meta.app_label, obj.author._meta.model_name)
# 生成关联用户在admin后台的编辑页URL
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
# 返回HTML格式的链接显示用户昵称或邮箱
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
# 自定义字段:显示用户头像
def show_user_image(self, obj):
img = obj.picture
img = obj.picture # 获取头像URL
# 返回HTML格式的图片标签限制宽高为50px
return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' %
(img))
# 自定义字段的显示名称
link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像'
# 定义OAuthConfig模型的Admin管理类
class OAuthConfigAdmin(admin.ModelAdmin):
# 列表页显示的字段
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
list_filter = ('type',)
# 列表页的筛选器:按登录类型筛选
list_filter = ('type',)

@ -1,5 +1,9 @@
# 导入Django的应用配置基类
from django.apps import AppConfig
# 定义OAuth应用的配置类继承自AppConfig
class OauthConfig(AppConfig):
name = 'oauth'
# 应用的名称,用于标识该应用(与应用目录名一致)
# 在Django项目中通过此名称引用该应用如在INSTALLED_APPS中注册
name = 'oauth'

@ -1,12 +1,20 @@
# 导入Django的表单基类和 widgets表单控件
from django.contrib.auth.forms import forms
from django.forms import widgets
# 定义一个要求用户输入邮箱的表单类继承自基础表单类forms.Form
class RequireEmailForm(forms.Form):
# 定义邮箱字段:标签为“电子邮箱”,且为必填项
email = forms.EmailField(label='电子邮箱', required=True)
# 定义oauthid字段使用隐藏输入控件不在页面显式展示非必填
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
# 重写初始化方法,用于自定义表单字段的控件属性
def __init__(self, *args, **kwargs):
# 调用父类的初始化方法,确保基础功能正常
super(RequireEmailForm, self).__init__(*args, **kwargs)
# 为email字段设置自定义控件
# 使用EmailInput控件添加placeholder提示文本和CSS类
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
attrs={'placeholder': "email", "class": "form-control"})

@ -1,5 +1,4 @@
# Generated by Django 4.1.7 on 2023-03-07 09:53
# 由Django 4.1.7生成于2023年3月7日09:53
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
@ -7,51 +6,74 @@ import django.utils.timezone
class Migration(migrations.Migration):
# 标记为初始迁移(首次创建模型时使用)
initial = True
# 依赖关系依赖于Django内置用户模型的可交换依赖支持自定义用户模型
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
# 迁移操作:定义要执行的数据库变更(创建模型)
operations = [
# 创建OAuthConfig模型第三方登录配置表
migrations.CreateModel(
name='OAuthConfig',
fields=[
# 自增主键ID
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 第三方登录类型可选值为微博、谷歌、GitHub、FaceBook、QQ默认值为'a'可能是占位符长度限制10
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
# 第三方应用的AppKeyAPI密钥长度限制200
('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
# 第三方应用的AppSecret密钥长度限制200
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
# 回调地址授权后跳转的URL默认值为百度首页长度限制200
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
# 是否启用该第三方登录方式,默认启用
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
# 创建时间:默认值为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间:默认值为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
],
options={
'verbose_name': 'oauth配置',
'verbose_name_plural': 'oauth配置',
'ordering': ['-created_time'],
'verbose_name': 'oauth配置', # 模型的单数显示名称
'verbose_name_plural': 'oauth配置', # 模型的复数显示名称
'ordering': ['-created_time'], # 默认排序:按创建时间倒序
},
),
# 创建OAuthUser模型第三方登录用户关联表
migrations.CreateModel(
name='OAuthUser',
fields=[
# 自增主键ID
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 第三方平台的唯一标识OpenID长度限制50
('openid', models.CharField(max_length=50)),
# 第三方平台的用户昵称长度限制50
('nickname', models.CharField(max_length=50, verbose_name='昵称')),
# 访问令牌可选可为空或空白长度限制150
('token', models.CharField(blank=True, max_length=150, null=True)),
# 用户头像URL可选可为空或空白长度限制350
('picture', models.CharField(blank=True, max_length=350, null=True)),
# 第三方登录类型(如'weibo'、'github'等长度限制50
('type', models.CharField(max_length=50)),
# 第三方平台的用户邮箱可选可为空或空白长度限制50
('email', models.CharField(blank=True, max_length=50, null=True)),
# 额外元数据可选可为空或空白存储JSON等格式的附加信息
('metadata', models.TextField(blank=True, null=True)),
# 创建时间:默认值为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间:默认值为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 关联的本地用户模型:可为空(未绑定本地用户时),级联删除(本地用户删除时,关联记录也删除)
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
],
options={
'verbose_name': 'oauth用户',
'verbose_name_plural': 'oauth用户',
'ordering': ['-created_time'],
'verbose_name': 'oauth用户', # 模型的单数显示名称
'verbose_name_plural': 'oauth用户', # 模型的复数显示名称
'ordering': ['-created_time'], # 默认排序:按创建时间倒序
},
),
]
]

@ -1,5 +1,4 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
# 由Django 4.2.5生成于2023年9月6日13:13
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
@ -7,80 +6,103 @@ import django.utils.timezone
class Migration(migrations.Migration):
# 依赖关系依赖于Django内置用户模型和oauth应用的初始迁移(0001_initial)
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'),
]
# 迁移操作:定义对数据库模型的修改
operations = [
# 修改OAuthConfig模型的元选项
migrations.AlterModelOptions(
name='oauthconfig',
# 排序方式改为按creation_time倒序显示名称保持不变
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
),
# 修改OAuthUser模型的元选项
migrations.AlterModelOptions(
name='oauthuser',
# 排序方式改为按creation_time倒序显示名称改为英文'oauth user'
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
),
# 移除OAuthConfig模型中的created_time字段
migrations.RemoveField(
model_name='oauthconfig',
name='created_time',
),
# 移除OAuthConfig模型中的last_mod_time字段
migrations.RemoveField(
model_name='oauthconfig',
name='last_mod_time',
),
# 移除OAuthUser模型中的created_time字段
migrations.RemoveField(
model_name='oauthuser',
name='created_time',
),
# 移除OAuthUser模型中的last_mod_time字段
migrations.RemoveField(
model_name='oauthuser',
name='last_mod_time',
),
# 为OAuthConfig模型添加creation_time字段创建时间
migrations.AddField(
model_name='oauthconfig',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为OAuthConfig模型添加last_modify_time字段最后修改时间
migrations.AddField(
model_name='oauthconfig',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 为OAuthUser模型添加creation_time字段创建时间
migrations.AddField(
model_name='oauthuser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为OAuthUser模型添加last_modify_time字段最后修改时间
migrations.AddField(
model_name='oauthuser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 修改OAuthConfig模型的callback_url字段
migrations.AlterField(
model_name='oauthconfig',
name='callback_url',
# 默认值改为空字符串,显示名称改为英文'callback url'
field=models.CharField(default='', max_length=200, verbose_name='callback url'),
),
# 修改OAuthConfig模型的is_enable字段
migrations.AlterField(
model_name='oauthconfig',
name='is_enable',
# 显示名称改为英文'is enable'
field=models.BooleanField(default=True, verbose_name='is enable'),
),
# 修改OAuthConfig模型的type字段
migrations.AlterField(
model_name='oauthconfig',
name='type',
# 选项中'微博'改为'weibo'、'谷歌'改为'google',显示名称改为英文'type'
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
),
# 修改OAuthUser模型的author字段
migrations.AlterField(
model_name='oauthuser',
name='author',
# 显示名称改为英文'author'
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
),
# 修改OAuthUser模型的nickname字段
migrations.AlterField(
model_name='oauthuser',
name='nickname',
# 显示名称改为英文'nickname'
field=models.CharField(max_length=50, verbose_name='nickname'),
),
]
]

@ -1,67 +1,101 @@
# Create your models here.
# 定义该应用的数据库模型
from django.conf import settings
from django.core.exceptions import ValidationError
from django.core.exceptions import ValidationError # 用于数据验证,抛出验证异常
from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from django.utils.timezone import now # 获取当前时间,用于时间字段默认值
from django.utils.translation import gettext_lazy as _ # 用于国际化翻译,支持多语言
# 定义OAuthUser模型存储第三方登录用户与本地用户的关联信息
class OAuthUser(models.Model):
# 关联本地Django用户模型允许为空未绑定本地用户时为null
# on_delete=models.CASCADE本地用户删除时关联的OAuthUser记录也随之删除
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=True,
null=True,
on_delete=models.CASCADE)
verbose_name=_('author'), # 字段显示名(支持国际化)
blank=True, # 表单中允许为空
null=True, # 数据库中允许为null
on_delete=models.CASCADE
)
# 第三方平台的唯一标识如GitHub的OpenID不可重复
openid = models.CharField(max_length=50)
# 第三方平台的用户昵称(支持国际化显示)
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
# 第三方平台的访问令牌(可选,可为空或空白)
token = models.CharField(max_length=150, null=True, blank=True)
# 第三方平台的用户头像URL可选可为空或空白
picture = models.CharField(max_length=350, blank=True, null=True)
# 第三方登录类型(如'weibo'、'github',不可为空)
type = models.CharField(blank=False, null=False, max_length=50)
# 第三方平台的用户邮箱(可选,可为空或空白)
email = models.CharField(max_length=50, null=True, blank=True)
# 存储第三方平台返回的额外元数据(如用户其他信息,可选)
metadata = models.TextField(null=True, blank=True)
# 记录创建时间(默认值为当前时间,支持国际化显示)
creation_time = models.DateTimeField(_('creation time'), default=now)
# 记录最后修改时间(默认值为当前时间,支持国际化显示)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 定义模型实例的字符串表示在Admin后台或打印时显示昵称
def __str__(self):
return self.nickname
# 模型元数据配置
class Meta:
verbose_name = _('oauth user')
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = _('oauth user') # 模型单数显示名(支持国际化)
verbose_name_plural = verbose_name # 模型复数显示名(与单数一致)
ordering = ['-creation_time'] # 默认排序:按创建时间倒序(新记录在前)
# 定义OAuthConfig模型存储第三方登录平台的配置信息如AppKey、AppSecret等
class OAuthConfig(models.Model):
# 定义第三方登录平台的可选类型(元组格式:(数据库存储值, 显示值)
TYPE = (
('weibo', _('weibo')),
('google', _('google')),
('github', 'GitHub'),
('facebook', 'FaceBook'),
('qq', 'QQ'),
('weibo', _('weibo')), # 微博(支持国际化)
('google', _('google')), # 谷歌(支持国际化)
('github', 'GitHub'), # GitHub固定显示名
('facebook', 'FaceBook'), # FaceBook固定显示名
('qq', 'QQ'), # QQ固定显示名
)
# 第三方平台类型关联TYPE选项默认值为'a',支持国际化显示)
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
# 第三方平台的AppKeyAPI密钥不可为空
appkey = models.CharField(max_length=200, verbose_name='AppKey')
# 第三方平台的AppSecret密钥不可为空
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
# 第三方登录的回调地址授权后跳转URL不可为空默认值为空字符串
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False,
default='')
verbose_name=_('callback url'), # 支持国际化显示
blank=False, # 表单中不允许为空
default=''
)
# 是否启用该第三方登录平台(默认启用,不可为空)
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
_('is enable'), # 支持国际化显示
default=True,
blank=False,
null=False
)
# 配置创建时间(默认值为当前时间,支持国际化显示)
creation_time = models.DateTimeField(_('creation time'), default=now)
# 配置最后修改时间(默认值为当前时间,支持国际化显示)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 自定义数据验证方法(保存前触发,确保同一平台类型只存在一条配置)
def clean(self):
# 筛选条件:平台类型与当前一致,且排除当前记录(编辑时不与自身冲突)
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
# 抛出验证异常:提示该平台类型已存在配置
raise ValidationError(_(self.type + _('already exists')))
# 定义模型实例的字符串表示在Admin后台或打印时显示平台类型
def __str__(self):
return self.type
# 模型元数据配置
class Meta:
verbose_name = 'oauth配置'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = 'oauth配置' # 模型单数显示名(中文固定)
verbose_name_plural = verbose_name # 模型复数显示名(与单数一致)
ordering = ['-creation_time'] # 默认排序:按创建时间倒序(新配置在前)

@ -1,220 +1,247 @@
import json
import logging
import os
import urllib.parse
from abc import ABCMeta, abstractmethod
import urllib.parse # 用于URL参数编码/解码
from abc import ABCMeta, abstractmethod # 用于定义抽象基类
import requests
import requests # 用于发送HTTP请求获取授权、token、用户信息
from djangoblog.utils import cache_decorator
from oauth.models import OAuthUser, OAuthConfig
from djangoblog.utils import cache_decorator # 导入缓存装饰器,优化重复查询
from oauth.models import OAuthUser, OAuthConfig # 导入OAuth相关模型
# 创建当前模块的日志记录器用于记录OAuth流程中的关键信息和错误
logger = logging.getLogger(__name__)
# 自定义异常类用于表示OAuth授权过程中获取token失败的情况
class OAuthAccessTokenException(Exception):
'''
oauth授权失败异常
'''
# 抽象基类定义所有第三方OAuth管理器的统一接口模板方法模式
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权"""
AUTH_URL = None
"""获取token"""
TOKEN_URL = None
"""获取用户信息"""
API_URL = None
'''icon图标名'''
ICON_NAME = None
"""获取用户授权的抽象基类"""
# 子类需重写的常量第三方平台的授权URL、token获取URL、用户信息API URL、图标名称
AUTH_URL = None # 授权页面URL用户跳转授权的地址
TOKEN_URL = None # 获取access_token的API URL
API_URL = None # 获取用户信息的API URL
ICON_NAME = None # 平台图标标识需与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None):
# 初始化access_token访问令牌和openid第三方平台用户唯一标识
self.access_token = access_token
self.openid = openid
# 属性判断access_token是否已设置
@property
def is_access_token_set(self):
return self.access_token is not None
# 属性判断是否已完成授权需同时拥有有效access_token和openid
@property
def is_authorized(self):
return self.is_access_token_set and self.access_token is not None and self.openid is not None
# 抽象方法生成授权URL子类需实现返回用户跳转的授权链接
@abstractmethod
def get_authorization_url(self, nexturl='/'):
pass
# 抽象方法通过授权码code获取access_token子类需实现完成token交换
@abstractmethod
def get_access_token_by_code(self, code):
pass
# 抽象方法通过access_token获取第三方用户信息子类需实现返回OAuthUser对象
@abstractmethod
def get_oauth_userinfo(self):
pass
# 抽象方法从用户元数据中提取头像URL子类需实现适配不同平台的字段差异
@abstractmethod
def get_picture(self, metadata):
pass
# 通用HTTP GET请求方法封装请求逻辑打印响应日志
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
logger.info(rsp.text) # 记录响应内容,便于调试
return rsp.text
# 通用HTTP POST请求方法封装请求逻辑打印响应日志
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
logger.info(rsp.text) # 记录响应内容,便于调试
return rsp.text
# 获取当前平台的OAuth配置从OAuthConfig模型中查询按ICON_NAME匹配
def get_config(self):
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
return value[0] if value else None # 存在则返回第一条配置否则返回None
# 微博OAuth管理器继承BaseOauthManager实现微博平台的授权逻辑
class WBOauthManager(BaseOauthManager):
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
# 微博平台的固定URL和图标标识
AUTH_URL = 'https://api.weibo.com/oauth2/authorize' # 微博授权页URL
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' # 微博token获取URL
API_URL = 'https://api.weibo.com/2/users/show.json' # 微博用户信息API
ICON_NAME = 'weibo' # 与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None):
# 从配置中获取微博的AppKey、AppSecret、回调地址
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
WBOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
self.client_id = config.appkey if config else '' # 微博开放平台AppKey
self.client_secret = config.appsecret if config else '' # 微博开放平台AppSecret
self.callback_url = config.callback_url if config else ''# 微博授权回调地址
# 调用父类初始化方法设置access_token和openid
super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成微博授权URL拼接client_id、响应类型、回调地址等参数
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + nexturl
'response_type': 'code', # 授权类型为code授权码模式
'redirect_uri': self.callback_url + '&next_url=' + nexturl # 回调地址+授权后跳转地址
}
# 拼接URL和参数urllib.parse.urlencode将字典转为URL查询字符串
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取微博access_token并自动获取用户信息
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
'grant_type': 'authorization_code', # 授权模式为授权码模式
'code': code, # 前端获取的授权码
'redirect_uri': self.callback_url # 回调地址(需与平台配置一致)
}
# 发送POST请求获取token响应
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp) # 解析JSON响应
obj = json.loads(rsp)
# 若响应中包含access_token说明获取成功
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['uid'])
return self.get_oauth_userinfo()
self.access_token = str(obj['access_token']) # 保存access_token
self.openid = str(obj['uid']) # 微博用户唯一标识为uid
return self.get_oauth_userinfo() # 自动获取用户信息并返回
else:
# 获取失败,抛出异常(携带响应内容便于排查)
raise OAuthAccessTokenException(rsp)
# 通过access_token获取微博用户信息返回OAuthUser对象
def get_oauth_userinfo(self):
if not self.is_authorized:
if not self.is_authorized: # 未授权则返回None
return None
# 构造用户信息查询参数需uid和access_token
params = {
'uid': self.openid,
'access_token': self.access_token
}
# 发送GET请求获取用户信息
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['avatar_large']
user.nickname = datas['screen_name']
user.openid = datas['id']
user.type = 'weibo'
user.token = self.access_token
datas = json.loads(rsp) # 解析用户信息JSON
user = OAuthUser() # 创建OAuthUser对象
user.metadata = rsp # 保存原始元数据(便于后续扩展)
user.picture = datas['avatar_large'] # 微博头像URL大尺寸
user.nickname = datas['screen_name'] # 微博昵称
user.openid = datas['id'] # 微博用户唯一ID
user.type = 'weibo' # 平台类型
user.token = self.access_token # 保存access_token
# 若响应中包含邮箱,则赋值(微博需额外申请权限才能获取邮箱)
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
# 捕获异常并记录日志(避免流程崩溃)
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
return None
# 从微博用户元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_large']
return datas['avatar_large'] # 微博头像字段为avatar_large
# 代理混合类Mixin为需要代理的OAuth管理器提供HTTP代理支持
class ProxyManagerMixin:
def __init__(self, *args, **kwargs):
# 从环境变量中读取HTTP代理配置若存在则设置代理
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY")
"https": os.environ.get("HTTP_PROXY") # HTTPS也使用相同代理
}
else:
self.proxies = None
self.proxies = None # 无代理则为None
# 重写GET方法添加代理参数
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
# 重写POST方法添加代理参数
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
# 谷歌OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+谷歌授权)
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
# 谷歌平台的固定URL和图标标识
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' # 谷歌授权页URL
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' # 谷歌token获取URL
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' # 谷歌用户信息API
ICON_NAME = 'google' # 与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None):
# 从配置中获取谷歌的AppKey、AppSecret、回调地址
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GoogleOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
# 调用父类初始化先初始化ProxyManagerMixin再初始化BaseOauthManager
super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成谷歌授权URL需指定openid和email权限
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email',
'response_type': 'code', # 授权码模式
'redirect_uri': self.callback_url, # 回调地址
'scope': 'openid email' # 申请的权限获取用户ID和邮箱
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取谷歌access_token
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token'])
self.openid = str(obj['id_token']) # 谷歌用id_token作为用户唯一标识
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
return self.access_token # 返回access_token后续需手动调用get_oauth_userinfo
else:
raise OAuthAccessTokenException(rsp)
# 通过access_token获取谷歌用户信息
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
@ -223,16 +250,15 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture']
user.nickname = datas['name']
user.openid = datas['sub']
user.picture = datas['picture'] # 谷歌头像URL
user.nickname = datas['name'] # 谷歌用户名
user.openid = datas['sub'] # 谷歌用户唯一标识sub字段
user.token = self.access_token
user.type = 'google'
if datas['email']:
if datas['email']: # 谷歌默认返回邮箱(需申请权限)
user.email = datas['email']
return user
except Exception as e:
@ -240,15 +266,18 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
logger.error('google oauth error.rsp:' + rsp)
return None
# 从谷歌元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['picture']
# GitHub OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+GitHub授权
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
# GitHub平台的固定URL和图标标识
AUTH_URL = 'https://github.com/login/oauth/authorize' # GitHub授权页URL
TOKEN_URL = 'https://github.com/login/oauth/access_token' # GitHub token URL
API_URL = 'https://api.github.com/user' # GitHub用户信息API
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
@ -256,55 +285,55 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GitHubOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成GitHub授权URL申请user权限用于获取用户信息
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user'
'redirect_uri': f'{self.callback_url}&next_url={next_url}', # 回调+跳转地址
'scope': 'user' # GitHub权限获取用户基本信息
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取GitHub access_tokenGitHub返回格式为表单需特殊解析
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
# GitHub返回的是表单格式如access_token=xxx&token_type=bearer需用parse_qs解析
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0])
self.access_token = (r['access_token'][0]) # parse_qs返回列表取第一个元素
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
# 通过access_token获取GitHub用户信息GitHub需在Header中携带token
def get_oauth_userinfo(self):
# GitHub API要求在Header中用Authorization: token xxx传递token
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url']
user.nickname = datas['name']
user.openid = datas['id']
user.picture = datas['avatar_url'] # GitHub头像URL
user.nickname = datas['name'] or datas['login'] # 优先用name无则用login用户名
user.openid = datas['id'] # GitHub用户唯一ID数字
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
# GitHub邮箱可能为空用户未公开需判断
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
@ -313,15 +342,18 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
logger.error('github oauth error.rsp:' + rsp)
return None
# 从GitHub元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_url']
# Facebook OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+Facebook授权
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
# Facebook平台的固定URL和图标标识注意API版本号v16.0
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' # Facebook授权页URL
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' # Facebook token URL
API_URL = 'https://graph.facebook.com/me' # Facebook用户信息API
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
@ -329,34 +361,31 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
FaceBookOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成Facebook授权URL申请email和public_profile权限
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile'
'scope': 'email,public_profile' # 申请邮箱和公开资料权限
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取Facebook access_tokenFacebook无需显式grant_type
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code',
# 'grant_type': 'authorization_code', # Facebook可省略该参数
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
@ -364,7 +393,9 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
else:
raise OAuthAccessTokenException(rsp)
# 通过access_token获取Facebook用户信息需指定fields参数否则返回默认字段
def get_oauth_userinfo(self):
# Facebook需显式指定要获取的字段id、name、picture、email
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
@ -373,13 +404,15 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name']
user.openid = datas['id']
user.nickname = datas['name'] # Facebook用户名
user.openid = datas['id'] # Facebook用户唯一ID
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
# 处理邮箱(可能为空,用户未公开)
if 'email' in datas and datas['email']:
user.email = datas['email']
# 处理头像Facebook头像嵌套在picture.data.url中
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
@ -387,29 +420,29 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
logger.error(e)
return None
# 从Facebook元数据中提取头像URL处理嵌套结构
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
# QQ OAuth管理器继承BaseOauthManager实现QQ平台的授权逻辑
class QQOauthManager(BaseOauthManager):
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me'
# QQ平台的固定URL和图标标识QQ需额外请求openid接口
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' # QQ授权页URL
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' # QQ token获取URL
API_URL = 'https://graph.qq.com/user/get_user_info' # QQ用户信息API
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ openid获取URLQQ特有
ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.client_id = config.appkey if config else '' # QQ的AppID
self.client_secret = config.appsecret if config else '' # QQ的AppKey
self.callback_url = config.callback_url if config else ''
super(
QQOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(QQOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成QQ授权URL
def get_authorization_url(self, next_url='/'):
params = {
'response_type': 'code',
@ -419,6 +452,7 @@ class QQOauthManager(BaseOauthManager):
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取QQ access_tokenQQ返回格式为表单
def get_access_token_by_code(self, code):
params = {
'grant_type': 'authorization_code',
@ -427,16 +461,18 @@ class QQOauthManager(BaseOauthManager):
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_get(self.TOKEN_URL, params)
rsp = self.do_get(self.TOKEN_URL, params) # QQ的token接口用GET请求
if rsp:
# QQ返回表单格式如access_token=xxx&expires_in=7776000&refresh_token=xxx
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0]
self.access_token = token[0] # 取列表第一个元素
return token
else:
raise OAuthAccessTokenException(rsp)
# QQ特有通过access_token获取openidQQ的openid需单独请求接口
def get_open_id(self):
if self.is_access_token_set:
params = {
@ -444,18 +480,18 @@ class QQOauthManager(BaseOauthManager):
}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
rsp = rsp.replace(
'callback(', '').replace(
')', '').replace(
';', '')
# QQ返回格式为callback({"client_id":"xxx","openid":"xxx"}); 需处理格式
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
# 通过access_token和openid获取QQ用户信息
def get_oauth_userinfo(self):
openid = self.get_open_id()
openid = self.get_open_id() # 先获取openid
if openid:
# QQ用户信息接口需传递access_token、oauth_consumer_key即AppID、openid
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id,
@ -465,40 +501,47 @@ class QQOauthManager(BaseOauthManager):
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname']
user.openid = openid
user.nickname = obj['nickname'] # QQ昵称
user.openid = openid # QQ openid
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
# 处理邮箱QQ需额外申请权限可能为空
if 'email' in obj:
user.email = obj['email']
# 处理头像QQ头像字段为figureurl
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
return user
# 从QQ元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60)
# 获取所有已启用的OAuth应用带缓存100分钟过期减少数据库查询
@cache_decorator(expiration=100 * 60) # 缓存100分钟100*60秒
def get_oauth_apps():
# 1. 查询所有已启用的OAuth配置is_enable=True
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return []
return [] # 无配置则返回空列表
# 2. 提取已启用的平台类型(如['weibo', 'github']
configtypes = [x.type for x in configs]
# 3. 获取所有BaseOauthManager的子类即所有平台的管理器
applications = BaseOauthManager.__subclasses__()
# 4. 筛选出已启用的管理器ICON_NAME在configtypes中
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
# 根据平台类型获取对应的OAuth管理器
def get_manager_by_type(type):
applications = get_oauth_apps()
applications = get_oauth_apps() # 获取已启用的应用
if applications:
finds = list(
filter(
lambda x: x.ICON_NAME.lower() == type.lower(),
applications))
# 筛选出类型匹配的管理器(不区分大小写)
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0]
return None
return finds[0] # 返回第一个匹配的管理器
return None # 无匹配则返回None

@ -1,22 +1,36 @@
# 导入Django模板相关模块和URL反转工具
from django import template
from django.urls import reverse
# 导入获取OAuth应用列表的工具函数
from oauth.oauthmanager import get_oauth_apps
# 注册一个模板库实例,用于注册自定义模板标签
register = template.Library()
# 注册一个包含型模板标签inclusion tag指定模板文件为'oauth/oauth_applications.html'
# 该标签会将处理结果传递给指定模板,再将模板渲染后的内容嵌入到调用位置
@register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request):
# 获取所有已配置的OAuth应用如微博、GitHub等
applications = get_oauth_apps()
if applications:
# 生成OAuth登录的基础URL通过reverse反转'oauth:oauthlogin'命名URL
baseurl = reverse('oauth:oauthlogin')
# 获取当前请求的完整路径(用于登录后跳转回原页面)
path = request.get_full_path()
# 处理应用列表:将每个应用转换为(图标名称, 登录URL)的元组
# 登录URL格式为"基础URL?type=应用图标名&next_url=当前路径"
apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl, type=x.ICON_NAME, next=path)), applications))
else:
# 若没有配置OAuth应用返回空列表
apps = []
# 将处理后的应用列表传递给模板
return {
'apps': apps
}
}

@ -1,129 +1,171 @@
import json
from unittest.mock import patch
from unittest.mock import patch # 用于用于模拟函数/方法的返回值,隔离外部依赖
from django.conf import settings
from django.contrib import auth
from django.test import Client, RequestFactory, TestCase
from django.urls import reverse
from django.contrib import auth # 用于用户认证相关操作
from django.test import Client, RequestFactory, TestCase # Django测试工具
from django.urls import reverse # 用于反向解析URL
from djangoblog.utils import get_sha256
from oauth.models import OAuthConfig
from oauth.oauthmanager import BaseOauthManager
from djangoblog.utils import get_sha256 # 导入SHA256加密工具函数
from oauth.models import OAuthConfig # 导入OAuth配置模型
from oauth.oauthmanager import BaseOauthManager # 导入OAuth管理器基类
# Create your tests here.
# 测试用例测试OAuth配置及基础登录流程
class OAuthConfigTest(TestCase):
def setUp(self):
# 初始化测试客户端(模拟用户请求)和请求工厂(构建请求对象)
self.client = Client()
self.factory = RequestFactory()
def test_oauth_login_test(self):
# 创建一条微博OAuth配置记录
c = OAuthConfig()
c.type = 'weibo'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
c.type = 'weibo' # 平台类型为微博
c.appkey = 'appkey' # 模拟AppKey
c.appsecret = 'appsecret' # 模拟AppSecret
c.save() # 保存到数据库
# 测试访问微博OAuth登录URL是否正常跳转
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
self.assertEqual(response.status_code, 302) # 期望302重定向
self.assertTrue("api.weibo.com" in response.url) # 重定向地址应包含微博API域名
# 测试授权回调处理是否正常
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
self.assertEqual(response.status_code, 302) # 期望302重定向
self.assertEqual(response.url, '/') # 回调后默认跳转首页
# 测试用例测试各平台OAuth登录流程微博、谷歌、GitHub、Facebook、QQ
class OauthLoginTest(TestCase):
def setUp(self) -> None:
# 初始化测试客户端和请求工厂
self.client = Client()
self.factory = RequestFactory()
# 初始化所有平台的OAuth配置
self.apps = self.init_apps()
def init_apps(self):
# 获取所有OAuth管理器子类各平台实现并创建对应配置
applications = [p() for p in BaseOauthManager.__subclasses__()]
for application in applications:
c = OAuthConfig()
c.type = application.ICON_NAME.lower()
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
c.type = application.ICON_NAME.lower() # 平台类型与管理器图标名一致
c.appkey = 'appkey' # 统一模拟AppKey
c.appsecret = 'appsecret' # 统一模拟AppSecret
c.save() # 保存配置
return applications
def get_app_by_type(self, type):
# 根据平台类型获取对应的OAuth管理器实例
for app in self.apps:
if app.ICON_NAME.lower() == type:
return app
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
# 测试微博登录流程使用mock模拟HTTP请求返回值
@patch("oauth.oauthmanager.WBOauthManager.do_post") # 模拟POST请求
@patch("oauth.oauthmanager.WBOauthManager.do_get") # 模拟GET请求
def test_weibo_login(self, mock_do_get, mock_do_post):
# 获取微博管理器实例
weibo_app = self.get_app_by_type('weibo')
assert weibo_app
assert weibo_app # 确保实例存在
# 生成授权URL测试URL生成逻辑
url = weibo_app.get_authorization_url()
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
# 模拟获取token的响应返回access_token和uid
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"uid": "uid"
})
# 模拟获取用户信息的响应
mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large",
"screen_name": "screen_name",
"id": "id",
"email": "email",
"avatar_large": "avatar_large", # 头像URL
"screen_name": "screen_name", # 昵称
"id": "id", # 用户ID
"email": "email", # 邮箱
})
# 测试通过code获取用户信息的流程
userinfo = weibo_app.get_access_token_by_code('code')
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'id')
# 验证返回的用户信息是否正确
self.assertEqual(userinfo.token, 'access_token') # token匹配
self.assertEqual(userinfo.openid, 'id') # openid匹配
# 测试谷歌登录流程
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post):
google_app = self.get_app_by_type('google')
assert google_app
# 生成授权URL
url = google_app.get_authorization_url()
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
"id_token": "id_token" # 谷歌用id_token作为openid
})
# 模拟获取用户信息的响应
mock_do_get.return_value = json.dumps({
"picture": "picture",
"name": "name",
"sub": "sub",
"email": "email",
"picture": "picture", # 头像
"name": "name", # 姓名
"sub": "sub", # 唯一标识
"email": "email" # 邮箱
})
# 测试获取token和用户信息
token = google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub')
# 测试GitHub登录流程
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post):
github_app = self.get_app_by_type('github')
assert github_app
# 生成授权URL并验证包含GitHub域名和client_id
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url)
# 模拟GitHub返回的token表单格式
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
# 模拟用户信息响应
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
"name": "name",
"id": "id",
"email": "email",
"avatar_url": "avatar_url", # 头像
"name": "name", # 姓名
"id": "id", # 唯一ID
"email": "email" # 邮箱
})
# 测试获取token和用户信息
token = github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.openid, 'id')
# 测试Facebook登录流程
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post):
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
# 生成授权URL并验证包含Facebook域名
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"access_token": "access_token"
})
# 模拟用户信息响应(头像为嵌套结构)
mock_do_get.return_value = json.dumps({
"name": "name",
"id": "id",
@ -134,14 +176,18 @@ class OauthLoginTest(TestCase):
}
}
})
# 测试获取token和用户信息
token = facebook_app.get_access_token_by_code('code')
userinfo = facebook_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
# 测试QQ登录流程QQ需额外获取openid
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
'access_token=access_token&expires_in=3600',
'callback({"client_id":"appid","openid":"openid"} );',
json.dumps({
# 模拟三次GET请求的返回值按调用顺序
'access_token=access_token&expires_in=3600', # 获取token的响应表单格式
'callback({"client_id":"appid","openid":"openid"} );', # 获取openid的响应
json.dumps({ # 获取用户信息的响应
"nickname": "nickname",
"email": "email",
"figureurl": "figureurl",
@ -151,19 +197,26 @@ class OauthLoginTest(TestCase):
def test_qq_login(self, mock_do_get):
qq_app = self.get_app_by_type('qq')
assert qq_app
# 生成授权URL并验证包含QQ域名
url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url)
# 测试获取token和用户信息
token = qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
# 测试微博登录(包含邮箱,可直接绑定用户)
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"uid": "uid"
})
# 模拟包含邮箱的用户信息
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
@ -172,38 +225,46 @@ class OauthLoginTest(TestCase):
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 测试访问登录URL是否重定向到微博
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调后是否跳转首页
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
# 验证用户已登录且信息正确
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
self.client.logout()
# 测试登出后再次登录是否正常
self.client.logout()
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
# 再次验证用户登录状态
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
# 测试微博登录(无邮箱,需补充邮箱并绑定)
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"uid": "uid"
})
# 模拟不含邮箱的用户信息
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
@ -211,39 +272,41 @@ class OauthLoginTest(TestCase):
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 测试访问登录URL
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调后是否跳转邮箱补充页面
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
# 提取OAuth用户ID从跳转URL中解析
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
# 测试提交邮箱表单
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302)
sign = get_sha256(settings.SECRET_KEY +
str(oauth_user_id) + settings.SECRET_KEY)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauth_user_id,
})
# 生成验证签名(用于邮箱验证)
sign = get_sha256(settings.SECRET_KEY + str(oauth_user_id) + settings.SECRET_KEY)
# 验证跳转URL是否正确绑定成功页
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauth_user_id})
self.assertEqual(response.url, f'{url}?type=email')
path = reverse('oauth:email_confirm', kwargs={
'id': oauth_user_id,
'sign': sign
})
# 测试邮箱验证链接
path = reverse('oauth:email_confirm', kwargs={'id': oauth_user_id, 'sign': sign})
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
# 验证用户已绑定且信息正确
user = auth.get_user(self.client)
from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user)
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, 'test@gmail.com')
self.assertEqual(oauth_user.pk, oauth_user_id)
self.assertEqual(user.email, 'test@gmail.com') # 验证补充的邮箱
self.assertEqual(oauth_user.pk, oauth_user_id) # 验证关联的OAuth用户ID

@ -1,25 +1,49 @@
# 导入Django的URL路径配置工具和当前应用的视图函数/类
from django.urls import path
from . import views
# 定义应用的命名空间为"oauth",用于在模板或视图中通过"命名空间:URL名称"的方式反向解析URL
app_name = "oauth"
# 配置当前应用的URL路由列表每个path对应一个视图处理逻辑
urlpatterns = [
# 1. OAuth授权回调处理URL
# 路径:/oauth/authorize第三方平台授权后会跳转到该地址
# 由views.authorize视图函数处理负责解析授权码、获取用户信息、绑定本地用户等
path(
r'oauth/authorize',
views.authorize),
# 2. 补充邮箱页面URL命名为"require_email"
# 路径:/oauth/requireemail/[oauthid].html[oauthid]为OAuthUser的ID整数类型
# 由views.RequireEmailView类视图的as_view()方法处理(用于第三方登录无邮箱时,让用户补充邮箱)
path(
r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
# 3. 邮箱验证URL命名为"email_confirm"
# 路径:/oauth/emailconfirm/[id]/[sign].html[id]为OAuthUser的ID[sign]为验证签名)
# 由views.emailconfirm视图函数处理验证邮箱归属完成本地用户绑定
path(
r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm,
name='email_confirm'),
# 4. 绑定成功页面URL命名为"bindsuccess"
# 路径:/oauth/bindsuccess/[oauthid].html[oauthid]为OAuthUser的ID
# 由views.bindsuccess视图函数处理展示第三方登录与本地用户绑定成功的结果
path(
r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
# 5. OAuth登录入口URL命名为"oauthlogin"
# 路径:/oauth/oauthlogin前端点击第三方登录按钮时跳转到该地址
# 由views.oauthlogin视图函数处理生成第三方平台的授权链接引导用户跳转授权
path(
r'oauth/oauthlogin',
views.oauthlogin,
name='oauthlogin')]
name='oauthlogin')
]

@ -1,155 +1,216 @@
import logging
# Create your views here.
from urllib.parse import urlparse
# 定义视图函数(处理用户请求的逻辑)
from urllib.parse import urlparse # 用于解析URL验证跳转地址合法性
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.http import HttpResponseForbidden
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.shortcuts import render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView
from djangoblog.blog_signals import oauth_user_login_signal
from djangoblog.utils import get_current_site
from djangoblog.utils import send_email, get_sha256
from oauth.forms import RequireEmailForm
from .models import OAuthUser
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException
from django.contrib.auth import get_user_model # 获取自定义用户模型
from django.contrib.auth import login # 用于用户登录
from django.core.exceptions import ObjectDoesNotExist # 用于捕获对象不存在异常
from django.db import transaction # 用于数据库事务,确保操作原子性
from django.http import HttpResponseForbidden # 用于返回403禁止访问响应
from django.http import HttpResponseRedirect # 用于重定向响应
from django.shortcuts import get_object_or_404 # 获取对象不存在则返回404
from django.shortcuts import render # 用于渲染模板
from django.urls import reverse # 用于反向解析URL
from django.utils import timezone # 用于获取当前时间
from django.utils.translation import gettext_lazy as _ # 用于国际化翻译
from django.views.generic import FormView # 基于类的表单视图
from djangoblog.blog_signals import oauth_user_login_signal # 导入OAuth登录信号
from djangoblog.utils import get_current_site # 获取当前网站域名
from djangoblog.utils import send_email, get_sha256 # 导入发送邮件和SHA256加密工具
from oauth.forms import RequireEmailForm # 导入补充邮箱表单
from .models import OAuthUser # 导入OAuth用户模型
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # 导入OAuth管理器和异常
# 创建当前模块的日志记录器
logger = logging.getLogger(__name__)
def get_redirecturl(request):
"""
处理授权后的跳转URL确保安全性只允许本站内跳转
"""
# 从请求参数中获取跳转地址默认值为None
nexturl = request.GET.get('next_url', None)
# 过滤非法跳转地址(如登录页)
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
return nexturl
# 解析URL检查是否为本站域名
p = urlparse(nexturl)
if p.netloc:
site = get_current_site().domain
if p.netloc: # 若URL包含域名
site = get_current_site().domain # 获取当前网站域名
# 去除www.前缀后比较,确保跳转地址为本站
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl)
return "/"
return nexturl
logger.info('非法url:' + nexturl) # 记录非法URL日志
return "/" # 非法则跳转到首页
return nexturl # 返回合法的跳转地址
def oauthlogin(request):
"""
OAuth登录入口生成第三方平台的授权链接引导用户跳转
"""
# 获取请求中的平台类型(如'weibo'、'github'
type = request.GET.get('type', None)
if not type:
if not type: # 若未指定平台类型,跳转到首页
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager:
if not manager: # 若管理器不存在,跳转到首页
return HttpResponseRedirect('/')
# 获取安全的跳转地址(授权后返回的页面)
nexturl = get_redirecturl(request)
# 生成第三方平台的授权URL
authorizeurl = manager.get_authorization_url(nexturl)
# 重定向到第三方平台的授权页面
return HttpResponseRedirect(authorizeurl)
def authorize(request):
"""
OAuth授权回调处理解析授权码获取用户信息绑定本地用户
"""
# 获取平台类型
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
# 获取授权码code第三方平台返回
code = request.GET.get('code', None)
try:
# 使用授权码获取access_token
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e:
# 捕获token获取失败异常记录日志并跳转首页
logger.warning("OAuthAccessTokenException:" + str(e))
return HttpResponseRedirect('/')
except Exception as e:
# 捕获其他异常,记录日志
logger.error(e)
rsp = None
# 获取授权后的跳转地址
nexturl = get_redirecturl(request)
if not rsp:
if not rsp: # 若获取token失败重新生成授权链接并重定向
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
# 通过token获取第三方用户信息
user = manager.get_oauth_userinfo()
if user:
if user: # 若成功获取用户信息
# 处理空昵称生成默认昵称djangoblog+时间戳)
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
# 检查该第三方用户是否已存在(避免重复创建)
try:
temp = OAuthUser.objects.get(type=type, openid=user.openid)
# 更新已有用户的头像、元数据、昵称
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp
user = temp # 使用已有用户对象
except ObjectDoesNotExist:
# 若不存在,则使用新创建的用户对象
pass
# facebook的token过长
# Facebook的token过长这里清空根据实际需求处理
if type == 'facebook':
user.token = ''
# 若用户信息包含邮箱,直接绑定本地用户
if user.email:
with transaction.atomic():
with transaction.atomic(): # 数据库事务:确保操作原子性
author = None
# 尝试获取已关联的本地用户
try:
author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist:
pass
# 若未关联本地用户,则创建或获取本地用户
if not author:
# 按邮箱查询或创建本地用户
result = get_user_model().objects.get_or_create(email=user.email)
author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]:
try:
# 检查昵称是否已存在
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
# 昵称不存在,直接使用
author.username = user.nickname
else:
# 昵称已存在,生成默认用户名
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize'
author.save()
author.source = 'authorize' # 标记来源为OAuth授权
author.save() # 保存用户信息
# 关联本地用户到OAuthUser
user.author = author
user.save()
# 发送OAuth用户登录信号供其他模块监听处理
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
# 登录用户(创建会话)
login(request, author)
# 重定向到授权前的页面
return HttpResponseRedirect(nexturl)
else:
# 若用户信息不含邮箱保存OAuthUser并跳转到补充邮箱页面
user.save()
url = reverse('oauth:require_email', kwargs={
'oauthid': user.id
})
url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
return HttpResponseRedirect(url)
else:
# 若未获取到用户信息,跳转到授权前的页面
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
if not sign:
return HttpResponseForbidden()
if not get_sha256(settings.SECRET_KEY +
str(id) +
settings.SECRET_KEY).upper() == sign.upper():
"""
邮箱验证验证签名合法性完成本地用户绑定
"""
if not sign: # 签名为空返回403禁止访问
return HttpResponseForbidden()
# 验证签名使用SECRET_KEY+ID+SECRET_KEY生成SHA256签名
if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden() # 签名不匹配返回403
# 获取对应的OAuthUser对象不存在则返回404
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic():
with transaction.atomic(): # 数据库事务
# 检查是否已关联本地用户
if oauthuser.author:
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
# 按邮箱查询或创建本地用户
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]:
author.source = 'emailconfirm'
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip(
) else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'emailconfirm' # 标记来源为邮箱验证
# 设置用户名为OAuth用户的昵称为空则生成默认值
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip() else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.save()
# 关联本地用户到OAuthUser
oauthuser.author = author
oauthuser.save()
# 发送OAuth用户登录信号
oauth_user_login_signal.send(
sender=emailconfirm.__class__,
id=oauthuser.id)
sender=emailconfirm.__class__, id=oauthuser.id)
# 登录用户
login(request, author)
# 发送绑定成功邮件
site = 'http://' + get_current_site().domain
content = _('''
<p>Congratulations, you have successfully bound your email address. You can use
@ -161,59 +222,63 @@ def emailconfirm(request, id, sign):
If the link above cannot be opened, please copy this link to your browser.
%(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': id
})
# 重定向到绑定成功页面
url = reverse('oauth:bindsuccess', kwargs={'oauthid': id})
url = url + '?type=success'
return HttpResponseRedirect(url)
class RequireEmailView(FormView):
form_class = RequireEmailForm
template_name = 'oauth/require_email.html'
"""
补充邮箱视图处理用户补充邮箱的表单提交
"""
form_class = RequireEmailForm # 使用的表单类
template_name = 'oauth/require_email.html' # 渲染的模板
def get(self, request, *args, **kwargs):
oauthid = self.kwargs['oauthid']
"""处理GET请求展示补充邮箱表单"""
oauthid = self.kwargs['oauthid'] # 从URL中获取OAuthUser的ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.email:
pass
# return HttpResponseRedirect('/')
# 若已存在邮箱,可在此处添加逻辑(如跳转到首页)
# if oauthuser.email:
# return HttpResponseRedirect('/')
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
"""设置表单初始值填充oauthid"""
oauthid = self.kwargs['oauthid']
return {
'email': '',
'oauthid': oauthid
}
return {'email': '', 'oauthid': oauthid}
def get_context_data(self, **kwargs):
def get_context_data(self,** kwargs):
"""向模板传递额外上下文:用户头像"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
kwargs['picture'] = oauthuser.picture
kwargs['picture'] = oauthuser.picture # 传递头像URL
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
email = form.cleaned_data['email']
oauthid = form.cleaned_data['oauthid']
"""处理表单验证通过的逻辑:保存邮箱并发送验证邮件"""
email = form.cleaned_data['email'] # 获取清洗后的邮箱
oauthid = form.cleaned_data['oauthid'] # 获取OAuthUser的ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
# 保存用户邮箱
oauthuser.email = email
oauthuser.save()
sign = get_sha256(settings.SECRET_KEY +
str(oauthuser.id) + settings.SECRET_KEY)
# 生成邮箱验证签名
sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
# 获取当前网站域名开发环境使用127.0.0.1:8000
site = get_current_site().domain
if settings.DEBUG:
site = '127.0.0.1:8000'
path = reverse('oauth:email_confirm', kwargs={
'id': oauthid,
'sign': sign
})
# 生成邮箱验证链接
path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
url = "http://{site}{path}".format(site=site, path=path)
# 发送邮箱验证邮件
content = _("""
<p>Please click the link below to bind your email</p>
@ -226,16 +291,21 @@ class RequireEmailView(FormView):
%(url)s
""") % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauthid
})
# 重定向到绑定成功页面(提示用户查收邮件)
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid})
url = url + '?type=email'
return HttpResponseRedirect(url)
def bindsuccess(request, oauthid):
type = request.GET.get('type', None)
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
"""
绑定成功页面展示绑定结果信息
"""
type = request.GET.get('type', None) # 获取类型email待验证success已验证
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) # 获取对应的OAuthUser
# 根据类型设置页面标题和内容
if type == 'email':
title = _('Bind your email')
content = _(
@ -247,7 +317,9 @@ def bindsuccess(request, oauthid):
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
# 渲染绑定成功页面
return render(request, 'oauth/bindsuccess.html', {
'title': title,
'content': content
})
})

@ -1 +0,0 @@
print('hello world')
Loading…
Cancel
Save