Compare commits

...

28 Commits

Author SHA1 Message Date
pig6z2klp 41bc2df376 ADD file via upload
4 months ago
pig6z2klp 5570c8d063 ADD file via upload
4 months ago
pig6z2klp d52903716e ADD file via upload
4 months ago
pig6z2klp 81f868eda9 ADD file via upload
4 months ago
pig6z2klp 56ddc13bba Update views.py
5 months ago
pig6z2klp 1f81dc6ca5 Update urls.py
5 months ago
pig6z2klp 3fdb2da6ca Update tests.py
5 months ago
pig6z2klp 34edc2603e Update oauthmanager.py
5 months ago
pig6z2klp 8424d77c1a Update models.py
5 months ago
pig6z2klp e9fba9f801 Update forms.py
5 months ago
pig6z2klp 2e26c7eae0 Update apps.py
5 months ago
pig6z2klp fc62a702ee Update admin.py
5 months ago
pig6z2klp 60041ede63 Update oauth_tags.py
5 months ago
pig6z2klp 700ed4dd52 Update 0002_alter_oauthconfig_options_alter_oauthuser_options_and_more.py
5 months ago
pig6z2klp aae731429d Update 0001_initial.py
5 months ago
pig6z2klp 502d9389a6 Update k8s-en.md
5 months ago
pig6z2klp 9b4149c43e Update es.md
5 months ago
pig6z2klp d61d928eef Update docker-en.md
5 months ago
pig6z2klp 9dcc881fd4 Update config.md
5 months ago
pig6z2klp 0f6ec0ee81 Update config-en.md
5 months ago
pig6z2klp 14241705d0 ADD file via upload
5 months ago
pig6z2klp eb777cd625 Update README-en.md
5 months ago
pig6z2klp 9c2ccf8f8a ADD file via upload
5 months ago
pig6z2klp fbb0a01079 ADD file via upload
5 months ago
plhw57tbe 20eea088eb Merge pull request '合并源代码' (#1) from develop into txb_branch
5 months ago
txb 3c06c750d4 在src目录添加main.py文件
5 months ago
smy 192f27dd24 在master分支添加doc目录及README.md文档
5 months ago
smy 3fac0dcd23 删除master分支中的README.md文件
5 months ago

Binary file not shown.

@ -47,32 +47,48 @@ DjangoBlog is a high-performance blog platform built with Python 3.10 and Django
Ensure you have Python 3.10+ and MySQL/MariaDB installed on your system. Ensure you have Python 3.10+ and MySQL/MariaDB installed on your system.
### 2. Clone & Installation ### 2. Clone & Installation
```
```bash # 将项目克隆到本地机器
# Clone the project to your local machine
git clone https://github.com/liangliangyy/DjangoBlog.git git clone https://github.com/liangliangyy/DjangoBlog.git
# 进入项目目录切换到DjangoBlog文件夹
cd DjangoBlog cd DjangoBlog
# Install dependencies # 安装项目所需的依赖包
pip install -r requirements.txt pip install -r requirements.txt
``` ```
### 3. Project Configuration ### 3. Project Configuration
- **Database**: - **Database**:
Open `djangoblog/settings.py`, locate the `DATABASES` section, and update it with your MySQL connection details. Open `djangoblog/settings.py`, locate the `DATABASES` section, and update it with your MySQL connection details.
```python ```python
DATABASES = { # 数据库配置字典Django通过此配置连接数据库
'default': { DATABASES = {
'ENGINE': 'django.db.backends.mysql', # 默认数据库配置
'NAME': 'djangoblog', 'default': {
'USER': 'root', # 数据库引擎这里使用MySQL数据库后端
'PASSWORD': 'your_password', # 'django.db.backends.mysql'表示使用MySQL数据库
'HOST': '127.0.0.1', # 其他可选值包括sqlite3、postgresql、oracle等
'PORT': 3306, 'ENGINE': 'django.db.backends.mysql',
}
} # 数据库名称需要提前在MySQL中创建该数据库
'NAME': 'djangoblog',
# 数据库登录用户名
'USER': 'root',
# 数据库登录密码,需要替换为你的实际密码
'PASSWORD': 'your_password',
# 数据库主机地址127.0.0.1表示本地数据库
# 若数据库在远程服务器可改为对应IP地址或域名
'HOST': '127.0.0.1',
# 数据库端口MySQL默认端口为3306
'PORT': 3306,
}
}
``` ```
Create the database in MySQL: Create the database in MySQL:
```sql ```sql
@ -82,27 +98,38 @@ pip install -r requirements.txt
- **More Configurations**: - **More Configurations**:
For advanced settings such as email, OAuth, caching, and more, please refer to our [Detailed Configuration Guide](/docs/config-en.md). For advanced settings such as email, OAuth, caching, and more, please refer to our [Detailed Configuration Guide](/docs/config-en.md).
```
### 4. Database Initialization ### 4. Database Initialization
```bash # 生成数据库迁移文件
# 根据模型的变更创建迁移脚本,记录数据模型的修改
python manage.py makemigrations python manage.py makemigrations
# 执行数据库迁移
# 将迁移文件中定义的变更应用到实际数据库中,创建或修改表结构
python manage.py migrate python manage.py migrate
# Create a superuser account # Create a superuser account 创建超级用户账号
# 生成可以登录Django管理后台的超级用户管理员用于管理网站内容
python manage.py createsuperuser python manage.py createsuperuser
``` ```
### 5. Running the Project ### 5. Running the Project
```bash ```
# (Optional) Generate some test data # (可选)生成一些测试数据
# 为项目创建示例数据(如测试文章、用户等),方便开发和测试时查看效果
python manage.py create_testdata python manage.py create_testdata
# (Optional) Collect and compress static files # (可选)收集并压缩静态文件
# 收集项目中所有静态文件CSS、JS、图片等到指定目录--noinput参数表示无需手动确认
python manage.py collectstatic --noinput python manage.py collectstatic --noinput
# 压缩静态文件如CSS、JS以减小文件体积提高加载速度--force参数强制覆盖已有文件
python manage.py compress --force python manage.py compress --force
# Start the development server # 启动开发服务器
# 启动Django内置的开发用Web服务器默认地址为http://127.0.0.1:8000/
# 开发过程中修改代码后服务器会自动重启,方便调试
python manage.py runserver python manage.py runserver
``` ```

@ -2,22 +2,32 @@
## Cache: ## Cache:
Cache using `memcache` for default. If you don't have `memcache` environment, you can remove the `default` setting in `CACHES` and change `locmemcache` to `default`. Cache using `memcache` for default. If you don't have `memcache` environment, you can remove the `default` setting in `CACHES` and change `locmemcache` to `default`.
```python ```
# Django 缓存配置字典,用于定义不同的缓存后端及相关参数
CACHES = { CACHES = {
# 默认缓存配置,键名为'default'
'default': { 'default': {
# 缓存后端类型使用Memcached作为缓存服务器
'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache', 'BACKEND': 'django.core.cache.backends.memcached.MemcachedCache',
# Memcached服务器地址和端口这里使用本地的11211端口Memcached默认端口
'LOCATION': '127.0.0.1:11211', 'LOCATION': '127.0.0.1:11211',
# 缓存键的前缀如果处于测试环境TESTING为True则使用'django_test',否则使用'djangoblog'
# 用于区分不同环境或项目的缓存键,避免冲突
'KEY_PREFIX': 'django_test' if TESTING else 'djangoblog', 'KEY_PREFIX': 'django_test' if TESTING else 'djangoblog',
# 缓存超时时间单位这里设置为10小时60秒*60分*10小时
'TIMEOUT': 60 * 60 * 10 'TIMEOUT': 60 * 60 * 10
}, },
# 本地内存缓存配置,键名为'locmemcache'
'locmemcache': { 'locmemcache': {
# 缓存后端类型:使用本地内存作为缓存(仅当前进程有效,多进程不共享)
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
# 缓存超时时间单位这里设置为3小时10800秒 = 60*60*3
'TIMEOUT': 10800, 'TIMEOUT': 10800,
# 本地内存缓存的唯一标识,用于在同一进程中区分不同的本地缓存实例
'LOCATION': 'unique-snowflake', 'LOCATION': 'unique-snowflake',
} }
} }
``` ```
## OAuth Login: ## OAuth Login:
QQ, Weibo, Google, GitHub and Facebook are now supported for OAuth login. Fetch OAuth login permissions from the corresponding open platform, and save them with `appkey`, `appsecret` and callback address in **Backend->OAuth** configuration. QQ, Weibo, Google, GitHub and Facebook are now supported for OAuth login. Fetch OAuth login permissions from the corresponding open platform, and save them with `appkey`, `appsecret` and callback address in **Backend->OAuth** configuration.
@ -32,11 +42,25 @@ owntracks is a location tracking application. It will send your locaiton to the
## Email feature: ## Email feature:
Same as before, Configure your own error msg recvie email information with`ADMINS = [('liangliang', 'liangliangyy@gmail.com')]` in `settings.py`. And modify: Same as before, Configure your own error msg recvie email information with`ADMINS = [('liangliang', 'liangliangyy@gmail.com')]` in `settings.py`. And modify:
```python ```python
# 邮件发送服务器的SMTP地址这里使用Zoho的SMTP服务器
EMAIL_HOST = 'smtp.zoho.com' EMAIL_HOST = 'smtp.zoho.com'
# 邮件发送服务器的端口号587是TLS加密的常用端口
EMAIL_PORT = 587 EMAIL_PORT = 587
# 发送邮件的账号用户名,从环境变量中获取(避免硬编码敏感信息)
# 环境变量键为'DJANGO_EMAIL_USER'
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER') EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
# 发送邮件的账号密码(或授权码),从环境变量中获取(安全存储敏感信息)
# 环境变量键为'DJANGO_EMAIL_PASSWORD'
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD') EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
# 默认的发件人邮箱地址,这里直接使用上面配置的邮件账号
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 服务器错误通知的发件人邮箱例如500错误时发送给管理员的邮件
# 这里同样使用环境变量中配置的邮件账号
SERVER_EMAIL = os.environ.get('DJANGO_EMAIL_USER') SERVER_EMAIL = os.environ.get('DJANGO_EMAIL_USER')
``` ```
with your email account information. with your email account information.

@ -21,11 +21,25 @@ owntracks是一个位置追踪软件可以定时的将你的坐标提交到
## 邮件功能: ## 邮件功能:
同样,将`settings.py`中的`ADMINS = [('liangliang', 'liangliangyy@gmail.com')]`配置为你自己的错误接收邮箱,另外修改: 同样,将`settings.py`中的`ADMINS = [('liangliang', 'liangliangyy@gmail.com')]`配置为你自己的错误接收邮箱,另外修改:
```python ```python
# 配置邮件发送所使用的SMTP服务器地址此处使用Zoho的SMTP服务器
EMAIL_HOST = 'smtp.zoho.com' EMAIL_HOST = 'smtp.zoho.com'
# 配置SMTP服务器的端口号587是用于TLS加密连接的标准端口
EMAIL_PORT = 587 EMAIL_PORT = 587
# 发送邮件的账号用户名,通过环境变量获取(避免将敏感信息硬编码在代码中)
# 对应的环境变量为'DJANGO_EMAIL_USER'
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER') EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
# 发送邮件的账号密码(或授权码),同样通过环境变量获取(增强安全性)
# 对应的环境变量为'DJANGO_EMAIL_PASSWORD'
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD') EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
# 设置默认的发件人邮箱地址,这里直接使用上面配置的邮件账号作为发件人
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
# 配置服务器发送错误通知时使用的发件人邮箱(如系统发生错误时向管理员发送通知)
# 此处与发送邮件的账号保持一致
SERVER_EMAIL = os.environ.get('DJANGO_EMAIL_USER') SERVER_EMAIL = os.environ.get('DJANGO_EMAIL_USER')
``` ```
为你自己的邮箱配置。 为你自己的邮箱配置。

@ -21,7 +21,10 @@ This is the simplest and most recommended way to deploy. It automatically create
From the project's root directory, run the following command: From the project's root directory, run the following command:
```bash ```bash
# Build and start the containers in detached mode (includes Django app and MySQL) # 构建并以分离模式(后台运行)启动容器
# 包含Django应用和MySQL等服务具体服务取决于docker-compose配置文件
# -d: 表示detached mode分离模式容器将在后台运行不占用当前终端
# --build: 表示在启动前重新构建所有服务的镜像(确保使用最新的代码和配置)
docker-compose up -d --build docker-compose up -d --build
``` ```
@ -35,7 +38,13 @@ docker-compose up -d --build
If you want to use Elasticsearch for more powerful full-text search capabilities, you can include the `docker-compose.es.yml` configuration file: If you want to use Elasticsearch for more powerful full-text search capabilities, you can include the `docker-compose.es.yml` configuration file:
```bash ```bash
# Build and start all services in detached mode (Django, MySQL, Elasticsearch) # 启动所有服务Build and start all services in detached mode (Django, MySQL, Elasticsearch)构建并以分离模式(后台运行)
# 包括Django应用、MySQL数据库、Elasticsearch搜索引擎等具体服务取决于配置文件
# -f docker-compose.yml: 指定第一个配置文件基础服务配置如Django、MySQL等
# -f deploy/docker-compose/docker-compose.es.yml: 指定第二个配置文件Elasticsearch相关服务配置
# 多个-f参数可合并配置后者配置会覆盖前者中重复的部分
# -d: 表示detached mode分离模式容器在后台运行不占用当前终端
# --build: 启动前重新构建所有服务的镜像,确保使用最新代码和配置
docker-compose -f docker-compose.yml -f deploy/docker-compose/docker-compose.es.yml up -d --build docker-compose -f docker-compose.yml -f deploy/docker-compose/docker-compose.es.yml up -d --build
``` ```
- **Data Persistence**: Elasticsearch data will be stored in the `data/elasticsearch` directory. - **Data Persistence**: Elasticsearch data will be stored in the `data/elasticsearch` directory.
@ -45,20 +54,24 @@ docker-compose -f docker-compose.yml -f deploy/docker-compose/docker-compose.es.
After the containers start for the first time, you'll need to execute some initialization commands inside the application container. After the containers start for the first time, you'll need to execute some initialization commands inside the application container.
```bash ```bash
# Get a shell inside the djangoblog application container (named 'web') # 进入名为'web'的djangoblog应用容器内部的shell环境
docker-compose exec web bash docker-compose exec web bash
# Inside the container, run the following commands: # 在容器内部,运行以下命令:
# Create a superuser account (follow the prompts to set username, email, and password)
# 创建超级用户账号(按照提示设置用户名、邮箱和密码)
# 超级用户拥有管理后台的全部权限
python manage.py createsuperuser python manage.py createsuperuser
# (Optional) Create some test data # (可选)创建一些测试数据
# 用于开发或测试时快速填充数据库,方便功能验证
python manage.py create_testdata python manage.py create_testdata
# (Optional, if ES is enabled) Create the search index # 可选如果启用了Elasticsearch创建搜索索引
# 用于初始化或更新Elasticsearch的搜索索引确保搜索功能正常工作
python manage.py rebuild_index python manage.py rebuild_index
# Exit the container # 退出容器的shell环境回到宿主机终端
exit exit
``` ```
@ -67,10 +80,20 @@ exit
If you already have an external MySQL database running, you can run the DjangoBlog application image by itself. If you already have an external MySQL database running, you can run the DjangoBlog application image by itself.
```bash ```bash
# Pull the latest image from Docker Hub # 从Docker Hub拉取最新版本的djangoblog镜像
docker pull liangliangyy/djangoblog:latest docker pull liangliangyy/djangoblog:latest
# Run the container and connect it to your external database # 运行容器并将其连接到外部数据库
# -d: 以分离模式(后台)运行容器
# -p 8000:8000: 将容器的8000端口映射到宿主机的8000端口宿主机内部端口服务端口:宿主机访问端口)
# -e: 设置环境变量(向容器传递配置参数,避免硬编码)
# DJANGO_SECRET_KEY: Django应用的密钥需使用高强度密钥用于加密等安全操作
# DJANGO_MYSQL_HOST: 外部MySQL数据库的主机地址
# DJANGO_MYSQL_USER: 访问MySQL数据库的用户名
# DJANGO_MYSQL_PASSWORD: 访问MySQL数据库的密码
# DJANGO_MYSQL_DATABASE: 要连接的MySQL数据库名称这里指定为'djangoblog'
# --name djangoblog: 为容器指定名称为'djangoblog',方便后续管理(如启动、停止、进入等)
# liangliangyy/djangoblog:latest: 基于该镜像运行容器
docker run -d \ docker run -d \
-p 8000:8000 \ -p 8000:8000 \
-e DJANGO_SECRET_KEY='your-strong-secret-key' \ -e DJANGO_SECRET_KEY='your-strong-secret-key' \

@ -7,16 +7,27 @@
接下来在`settings.py`做如下改动即可: 接下来在`settings.py`做如下改动即可:
- 增加es链接如下所示 - 增加es链接如下所示
```python ```python
# Elasticsearch DSL领域特定语言的配置字典用于Django与Elasticsearch交互
ELASTICSEARCH_DSL = { ELASTICSEARCH_DSL = {
# 默认的Elasticsearch连接配置键名为'default'
'default': { 'default': {
# Elasticsearch服务的地址和端口这里指向本地的9200端口Elasticsearch默认端口
# 应用将通过此地址与Elasticsearch服务建立连接用于执行搜索、索引等操作
'hosts': '127.0.0.1:9200' 'hosts': '127.0.0.1:9200'
}, },
} }
``` ```
- 修改`HAYSTACK`配置: - 修改`HAYSTACK`配置:
```python ```python
# Django Haystack搜索引擎框架的连接配置字典
# Haystack用于统一管理不同的搜索引擎后端此处配置默认搜索引擎连接
HAYSTACK_CONNECTIONS = { HAYSTACK_CONNECTIONS = {
# 默认的搜索引擎连接配置,键名为'default'
'default': { 'default': {
# 指定搜索引擎引擎类使用自定义的ElasticSearchEngine后端
# 路径'djangoblog.elasticsearch_backend.ElasticSearchEngine'表示该类位于
# djangoblog应用的elasticsearch_backend模块中
# 此配置用于将Haystack与Elasticsearch搜索引擎关联实现搜索功能
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine', 'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
}, },
} }

@ -28,7 +28,10 @@ Before you begin, please ensure you have the following:
We recommend deploying all DjangoBlog-related resources in a dedicated namespace for better management. We recommend deploying all DjangoBlog-related resources in a dedicated namespace for better management.
```bash ```bash
# Create a namespace named 'djangoblog' # 创建一个名为'djangoblog'的命名空间namespace
# 命名空间用于在 Kubernetes 集群中隔离不同的应用或环境(如开发、测试、生产)
# 此处创建'djangoblog'命名空间,通常用于部署与 djangoblog 应用相关的资源
# 后续部署的 Pod、Service 等资源可指定在此命名空间下,避免与其他应用资源冲突
kubectl create namespace djangoblog kubectl create namespace djangoblog
``` ```
@ -37,16 +40,26 @@ kubectl create namespace djangoblog
This setup uses Local Persistent Volumes. You need to create the data storage directories on a node within your cluster (the default is the `master` node in `pv.yaml`). This setup uses Local Persistent Volumes. You need to create the data storage directories on a node within your cluster (the default is the `master` node in `pv.yaml`).
```bash ```bash
# Log in to your master node # 通过SSH登录到主节点master node
# user为登录用户名master-node为主节点的地址可是IP或域名
ssh user@master-node ssh user@master-node
# Create the required storage directories # 创建所需的存储目录使用sudo获取管理员权限
# -p 选项确保在父目录不存在时自动创建,避免报错
# 用于数据库如MySQL的本地存储目录
sudo mkdir -p /mnt/local-storage-db sudo mkdir -p /mnt/local-storage-db
# 用于djangoblog应用的本地存储目录
sudo mkdir -p /mnt/local-storage-djangoblog sudo mkdir -p /mnt/local-storage-djangoblog
# 通用资源存储目录
sudo mkdir -p /mnt/resource/ sudo mkdir -p /mnt/resource/
# 用于Elasticsearch的本地存储目录
sudo mkdir -p /mnt/local-storage-elasticsearch sudo mkdir -p /mnt/local-storage-elasticsearch
# Log out from the node # 从节点退出,返回本地终端
exit exit
``` ```
**Note**: If you wish to store data on a different node or use different paths, you must modify the `nodeAffinity` and `local.path` settings in the `deploy/k8s/pv.yaml` file. **Note**: If you wish to store data on a different node or use different paths, you must modify the `nodeAffinity` and `local.path` settings in the `deploy/k8s/pv.yaml` file.
@ -54,13 +67,16 @@ exit
After creating the directories, apply the storage-related configurations: After creating the directories, apply the storage-related configurations:
```bash ```bash
# Apply the StorageClass # 应用StorageClass配置存储类用于定义持久化存储的类型和属性
# StorageClass用于动态供应持久卷PV简化存储管理
kubectl apply -f deploy/k8s/storageclass.yaml kubectl apply -f deploy/k8s/storageclass.yaml
# Apply the PersistentVolumes (PVs) # 应用持久卷PersistentVolumes, PVs配置
# PV是集群中的一块存储资源由管理员预先创建或通过StorageClass动态生成
kubectl apply -f deploy/k8s/pv.yaml kubectl apply -f deploy/k8s/pv.yaml
# Apply the PersistentVolumeClaims (PVCs) # 应用持久卷声明PersistentVolumeClaims, PVCs配置
# PVC是用户对存储资源的请求用于绑定PV并为Pod提供持久化存储
kubectl apply -f deploy/k8s/pvc.yaml kubectl apply -f deploy/k8s/pvc.yaml
``` ```
@ -73,10 +89,12 @@ Before deploying the application, you need to edit the `deploy/k8s/configmap.yam
- `DJANGO_MYSQL_PASSWORD` and `MYSQL_ROOT_PASSWORD`: Change to your own secure database password. - `DJANGO_MYSQL_PASSWORD` and `MYSQL_ROOT_PASSWORD`: Change to your own secure database password.
```bash ```bash
# Edit the ConfigMap file # 使用vim编辑器编辑ConfigMap配置文件
# ConfigMap用于存储非敏感的配置数据供Pod中的容器使用
vim deploy/k8s/configmap.yaml vim deploy/k8s/configmap.yaml
# Apply the configuration # 应用ConfigMap配置到Kubernetes集群
# 执行后配置数据将被创建或更新供相关资源如Pod引用
kubectl apply -f deploy/k8s/configmap.yaml kubectl apply -f deploy/k8s/configmap.yaml
``` ```
@ -85,10 +103,12 @@ kubectl apply -f deploy/k8s/configmap.yaml
Now, we can deploy all the core services. Now, we can deploy all the core services.
```bash ```bash
# Deploy the Deployments (DjangoBlog, MySQL, Redis, Nginx, ES) # 部署 Deployment 资源包含DjangoBlog应用、MySQL数据库、Redis缓存、Nginx服务器、Elasticsearch搜索引擎
# Deployment 用于定义Pod的期望状态负责创建和管理Pod支持滚动更新等功能
kubectl apply -f deploy/k8s/deployment.yaml kubectl apply -f deploy/k8s/deployment.yaml
# Deploy the Services (to create internal endpoints for the Deployments) # 部署 Service 资源为上述Deployment创建内部访问端点
# Service 提供固定的访问地址实现Pod的负载均衡和服务发现即使Pod重建IP变化也能通过Service稳定访问
kubectl apply -f deploy/k8s/service.yaml kubectl apply -f deploy/k8s/service.yaml
``` ```
@ -103,7 +123,10 @@ kubectl get pods -n djangoblog -w
Finally, expose the Nginx service to external traffic by applying the `Ingress` rule. Finally, expose the Nginx service to external traffic by applying the `Ingress` rule.
```bash ```bash
# Apply the Ingress rule # 应用Ingress规则配置
# Ingress用于管理Kubernetes集群外部访问集群内部服务的规则如HTTP/HTTPS路由
# 该配置文件deploy/k8s/gateway.yaml定义了外部请求如何映射到集群内的服务如通过域名路由到对应的Service
# 执行后Ingress控制器将根据规则处理外部流量实现对集群内服务的访问
kubectl apply -f deploy/k8s/gateway.yaml kubectl apply -f deploy/k8s/gateway.yaml
``` ```
@ -118,23 +141,32 @@ kubectl get ingress -n djangoblog
Similar to the Docker deployment, you need to get a shell into the DjangoBlog application Pod to perform database initialization and create a superuser on the first run. Similar to the Docker deployment, you need to get a shell into the DjangoBlog application Pod to perform database initialization and create a superuser on the first run.
```bash ```bash
# First, get the name of a djangoblog pod # 首先获取djangoblog相关Pod的名称
# -n djangoblog指定在'djangoblog'命名空间中查询
# grep djangoblog筛选出包含'djangoblog'关键词的Pod即目标应用的Pod
kubectl get pods -n djangoblog | grep djangoblog kubectl get pods -n djangoblog | grep djangoblog
# Exec into one of the Pods (replace [pod-name] with the name from the previous step) # 进入其中一个Pod的交互式终端将[pod-name]替换为上一步获取的Pod名称
# -it以交互式终端模式进入Pod
# -n djangoblog指定目标Pod所在的命名空间
# -- bash在Pod内启动bash shell
kubectl exec -it [pod-name] -n djangoblog -- bash kubectl exec -it [pod-name] -n djangoblog -- bash
# Inside the Pod, run the following commands: # 在Pod内部运行以下命令
# Create a superuser account (follow the prompts)
# 创建超级用户账号(按照提示输入用户名、邮箱和密码)
# 超级用户用于登录Django管理后台拥有最高权限
python manage.py createsuperuser python manage.py createsuperuser
# (Optional) Create some test data # (可选)创建一些测试数据
# 用于快速填充数据库,方便测试应用功能
python manage.py create_testdata python manage.py create_testdata
# (Optional, if ES is enabled) Create the search index # 可选如果启用了Elasticsearch创建搜索索引
# 初始化或更新Elasticsearch的搜索索引确保搜索功能可用
python manage.py rebuild_index python manage.py rebuild_index
# Exit the Pod # 退出Pod的终端返回本地命令行
exit exit
``` ```

@ -1,54 +1,77 @@
import logging import logging
from django.contrib import admin from django.contrib import admin
# Register your models here. # 注册模型到admin后台此处注释用于提示后续需注册模型
from django.urls import reverse from django.urls import reverse
from django.utils.html import format_html from django.utils.html import format_html
# 创建当前模块的日志记录器
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# 定义OAuthUser模型的Admin管理类
class OAuthUserAdmin(admin.ModelAdmin): class OAuthUserAdmin(admin.ModelAdmin):
# 配置后台搜索字段:支持按昵称和邮箱搜索
search_fields = ('nickname', 'email') search_fields = ('nickname', 'email')
# 每页显示20条记录
list_per_page = 20 list_per_page = 20
# 列表页显示的字段
list_display = ( list_display = (
'id', 'id', # ID
'nickname', 'nickname', # 昵称
'link_to_usermodel', 'link_to_usermodel', # 关联的本地用户(自定义字段)
'show_user_image', 'show_user_image', # 用户头像(自定义字段)
'type', 'type', # 第三方登录类型
'email', 'email', # 邮箱
) )
# 列表页中可点击跳转编辑页的字段
list_display_links = ('id', 'nickname') list_display_links = ('id', 'nickname')
# 列表页的筛选器:按关联用户和登录类型筛选
list_filter = ('author', 'type',) list_filter = ('author', 'type',)
# 初始定义的只读字段(空列表,后续动态添加)
readonly_fields = [] readonly_fields = []
# 动态获取只读字段:编辑时所有字段均为只读
def get_readonly_fields(self, request, obj=None): def get_readonly_fields(self, request, obj=None):
return list(self.readonly_fields) + \ if obj: # 编辑已有对象时
[field.name for field in obj._meta.fields] + \ # 原只读字段 + 所有模型字段 + 所有多对多字段
[field.name for field in obj._meta.many_to_many] return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
return self.readonly_fields # 新增时使用初始只读字段
# 禁用新增权限不允许在后台手动添加OAuthUser记录
def has_add_permission(self, request): def has_add_permission(self, request):
return False return False
# 自定义字段:显示关联本地用户的链接
def link_to_usermodel(self, obj): def link_to_usermodel(self, obj):
if obj.author: if obj.author: # 若存在关联用户
# 获取关联用户模型的app标签和模型名
info = (obj.author._meta.app_label, obj.author._meta.model_name) info = (obj.author._meta.app_label, obj.author._meta.model_name)
# 生成关联用户在admin后台的编辑页URL
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,)) link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
# 返回HTML格式的链接显示用户昵称或邮箱
return format_html( return format_html(
u'<a href="%s">%s</a>' % u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email)) (link, obj.author.nickname if obj.author.nickname else obj.author.email))
# 自定义字段:显示用户头像
def show_user_image(self, obj): def show_user_image(self, obj):
img = obj.picture img = obj.picture # 获取头像URL
# 返回HTML格式的图片标签限制宽高为50px
return format_html( return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' % u'<img src="%s" style="width:50px;height:50px"></img>' %
(img)) (img))
# 自定义字段的显示名称
link_to_usermodel.short_description = '用户' link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像' show_user_image.short_description = '用户头像'
# 定义OAuthConfig模型的Admin管理类
class OAuthConfigAdmin(admin.ModelAdmin): class OAuthConfigAdmin(admin.ModelAdmin):
# 列表页显示的字段
list_display = ('type', 'appkey', 'appsecret', 'is_enable') list_display = ('type', 'appkey', 'appsecret', 'is_enable')
# 列表页的筛选器:按登录类型筛选
list_filter = ('type',) list_filter = ('type',)

@ -1,5 +1,9 @@
# 导入Django的应用配置基类
from django.apps import AppConfig from django.apps import AppConfig
# 定义OAuth应用的配置类继承自AppConfig
class OauthConfig(AppConfig): class OauthConfig(AppConfig):
# 应用的名称,用于标识该应用(与应用目录名一致)
# 在Django项目中通过此名称引用该应用如在INSTALLED_APPS中注册
name = 'oauth' name = 'oauth'

@ -1,12 +1,20 @@
# 导入Django的表单基类和 widgets表单控件
from django.contrib.auth.forms import forms from django.contrib.auth.forms import forms
from django.forms import widgets from django.forms import widgets
# 定义一个要求用户输入邮箱的表单类继承自基础表单类forms.Form
class RequireEmailForm(forms.Form): class RequireEmailForm(forms.Form):
# 定义邮箱字段:标签为“电子邮箱”,且为必填项
email = forms.EmailField(label='电子邮箱', required=True) email = forms.EmailField(label='电子邮箱', required=True)
# 定义oauthid字段使用隐藏输入控件不在页面显式展示非必填
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False) oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
# 重写初始化方法,用于自定义表单字段的控件属性
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# 调用父类的初始化方法,确保基础功能正常
super(RequireEmailForm, self).__init__(*args, **kwargs) super(RequireEmailForm, self).__init__(*args, **kwargs)
# 为email字段设置自定义控件
# 使用EmailInput控件添加placeholder提示文本和CSS类
self.fields['email'].widget = widgets.EmailInput( self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"}) attrs={'placeholder': "email", "class": "form-control"})

@ -1,5 +1,4 @@
# Generated by Django 4.1.7 on 2023-03-07 09:53 # 由Django 4.1.7生成于2023年3月7日09:53
from django.conf import settings from django.conf import settings
from django.db import migrations, models from django.db import migrations, models
import django.db.models.deletion import django.db.models.deletion
@ -7,51 +6,74 @@ import django.utils.timezone
class Migration(migrations.Migration): class Migration(migrations.Migration):
# 标记为初始迁移(首次创建模型时使用)
initial = True initial = True
# 依赖关系依赖于Django内置用户模型的可交换依赖支持自定义用户模型
dependencies = [ dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL), migrations.swappable_dependency(settings.AUTH_USER_MODEL),
] ]
# 迁移操作:定义要执行的数据库变更(创建模型)
operations = [ operations = [
# 创建OAuthConfig模型第三方登录配置表
migrations.CreateModel( migrations.CreateModel(
name='OAuthConfig', name='OAuthConfig',
fields=[ fields=[
# 自增主键ID
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 第三方登录类型可选值为微博、谷歌、GitHub、FaceBook、QQ默认值为'a'可能是占位符长度限制10
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')), ('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
# 第三方应用的AppKeyAPI密钥长度限制200
('appkey', models.CharField(max_length=200, verbose_name='AppKey')), ('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
# 第三方应用的AppSecret密钥长度限制200
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')), ('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
# 回调地址授权后跳转的URL默认值为百度首页长度限制200
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')), ('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
# 是否启用该第三方登录方式,默认启用
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')), ('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
# 创建时间:默认值为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), ('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间:默认值为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')), ('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
], ],
options={ options={
'verbose_name': 'oauth配置', 'verbose_name': 'oauth配置', # 模型的单数显示名称
'verbose_name_plural': 'oauth配置', 'verbose_name_plural': 'oauth配置', # 模型的复数显示名称
'ordering': ['-created_time'], 'ordering': ['-created_time'], # 默认排序:按创建时间倒序
}, },
), ),
# 创建OAuthUser模型第三方登录用户关联表
migrations.CreateModel( migrations.CreateModel(
name='OAuthUser', name='OAuthUser',
fields=[ fields=[
# 自增主键ID
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 第三方平台的唯一标识OpenID长度限制50
('openid', models.CharField(max_length=50)), ('openid', models.CharField(max_length=50)),
# 第三方平台的用户昵称长度限制50
('nickname', models.CharField(max_length=50, verbose_name='昵称')), ('nickname', models.CharField(max_length=50, verbose_name='昵称')),
# 访问令牌可选可为空或空白长度限制150
('token', models.CharField(blank=True, max_length=150, null=True)), ('token', models.CharField(blank=True, max_length=150, null=True)),
# 用户头像URL可选可为空或空白长度限制350
('picture', models.CharField(blank=True, max_length=350, null=True)), ('picture', models.CharField(blank=True, max_length=350, null=True)),
# 第三方登录类型(如'weibo'、'github'等长度限制50
('type', models.CharField(max_length=50)), ('type', models.CharField(max_length=50)),
# 第三方平台的用户邮箱可选可为空或空白长度限制50
('email', models.CharField(blank=True, max_length=50, null=True)), ('email', models.CharField(blank=True, max_length=50, null=True)),
# 额外元数据可选可为空或空白存储JSON等格式的附加信息
('metadata', models.TextField(blank=True, null=True)), ('metadata', models.TextField(blank=True, null=True)),
# 创建时间:默认值为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), ('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间:默认值为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')), ('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 关联的本地用户模型:可为空(未绑定本地用户时),级联删除(本地用户删除时,关联记录也删除)
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')), ('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
], ],
options={ options={
'verbose_name': 'oauth用户', 'verbose_name': 'oauth用户', # 模型的单数显示名称
'verbose_name_plural': 'oauth用户', 'verbose_name_plural': 'oauth用户', # 模型的复数显示名称
'ordering': ['-created_time'], 'ordering': ['-created_time'], # 默认排序:按创建时间倒序
}, },
), ),
] ]

@ -1,5 +1,4 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13 # 由Django 4.2.5生成于2023年9月6日13:13
from django.conf import settings from django.conf import settings
from django.db import migrations, models from django.db import migrations, models
import django.db.models.deletion import django.db.models.deletion
@ -7,80 +6,103 @@ import django.utils.timezone
class Migration(migrations.Migration): class Migration(migrations.Migration):
# 依赖关系依赖于Django内置用户模型和oauth应用的初始迁移(0001_initial)
dependencies = [ dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL), migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'), ('oauth', '0001_initial'),
] ]
# 迁移操作:定义对数据库模型的修改
operations = [ operations = [
# 修改OAuthConfig模型的元选项
migrations.AlterModelOptions( migrations.AlterModelOptions(
name='oauthconfig', name='oauthconfig',
# 排序方式改为按creation_time倒序显示名称保持不变
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'}, options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
), ),
# 修改OAuthUser模型的元选项
migrations.AlterModelOptions( migrations.AlterModelOptions(
name='oauthuser', name='oauthuser',
# 排序方式改为按creation_time倒序显示名称改为英文'oauth user'
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'}, options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
), ),
# 移除OAuthConfig模型中的created_time字段
migrations.RemoveField( migrations.RemoveField(
model_name='oauthconfig', model_name='oauthconfig',
name='created_time', name='created_time',
), ),
# 移除OAuthConfig模型中的last_mod_time字段
migrations.RemoveField( migrations.RemoveField(
model_name='oauthconfig', model_name='oauthconfig',
name='last_mod_time', name='last_mod_time',
), ),
# 移除OAuthUser模型中的created_time字段
migrations.RemoveField( migrations.RemoveField(
model_name='oauthuser', model_name='oauthuser',
name='created_time', name='created_time',
), ),
# 移除OAuthUser模型中的last_mod_time字段
migrations.RemoveField( migrations.RemoveField(
model_name='oauthuser', model_name='oauthuser',
name='last_mod_time', name='last_mod_time',
), ),
# 为OAuthConfig模型添加creation_time字段创建时间
migrations.AddField( migrations.AddField(
model_name='oauthconfig', model_name='oauthconfig',
name='creation_time', name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
), ),
# 为OAuthConfig模型添加last_modify_time字段最后修改时间
migrations.AddField( migrations.AddField(
model_name='oauthconfig', model_name='oauthconfig',
name='last_modify_time', name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
), ),
# 为OAuthUser模型添加creation_time字段创建时间
migrations.AddField( migrations.AddField(
model_name='oauthuser', model_name='oauthuser',
name='creation_time', name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
), ),
# 为OAuthUser模型添加last_modify_time字段最后修改时间
migrations.AddField( migrations.AddField(
model_name='oauthuser', model_name='oauthuser',
name='last_modify_time', name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
), ),
# 修改OAuthConfig模型的callback_url字段
migrations.AlterField( migrations.AlterField(
model_name='oauthconfig', model_name='oauthconfig',
name='callback_url', name='callback_url',
# 默认值改为空字符串,显示名称改为英文'callback url'
field=models.CharField(default='', max_length=200, verbose_name='callback url'), field=models.CharField(default='', max_length=200, verbose_name='callback url'),
), ),
# 修改OAuthConfig模型的is_enable字段
migrations.AlterField( migrations.AlterField(
model_name='oauthconfig', model_name='oauthconfig',
name='is_enable', name='is_enable',
# 显示名称改为英文'is enable'
field=models.BooleanField(default=True, verbose_name='is enable'), field=models.BooleanField(default=True, verbose_name='is enable'),
), ),
# 修改OAuthConfig模型的type字段
migrations.AlterField( migrations.AlterField(
model_name='oauthconfig', model_name='oauthconfig',
name='type', name='type',
# 选项中'微博'改为'weibo'、'谷歌'改为'google',显示名称改为英文'type'
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'), field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
), ),
# 修改OAuthUser模型的author字段
migrations.AlterField( migrations.AlterField(
model_name='oauthuser', model_name='oauthuser',
name='author', name='author',
# 显示名称改为英文'author'
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'), field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
), ),
# 修改OAuthUser模型的nickname字段
migrations.AlterField( migrations.AlterField(
model_name='oauthuser', model_name='oauthuser',
name='nickname', name='nickname',
# 显示名称改为英文'nickname'
field=models.CharField(max_length=50, verbose_name='nickname'), field=models.CharField(max_length=50, verbose_name='nickname'),
), ),
] ]

@ -1,67 +1,101 @@
# Create your models here. # 定义该应用的数据库模型
from django.conf import settings from django.conf import settings
from django.core.exceptions import ValidationError from django.core.exceptions import ValidationError # 用于数据验证,抛出验证异常
from django.db import models from django.db import models
from django.utils.timezone import now from django.utils.timezone import now # 获取当前时间,用于时间字段默认值
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _ # 用于国际化翻译,支持多语言
# 定义OAuthUser模型存储第三方登录用户与本地用户的关联信息
class OAuthUser(models.Model): class OAuthUser(models.Model):
# 关联本地Django用户模型允许为空未绑定本地用户时为null
# on_delete=models.CASCADE本地用户删除时关联的OAuthUser记录也随之删除
author = models.ForeignKey( author = models.ForeignKey(
settings.AUTH_USER_MODEL, settings.AUTH_USER_MODEL,
verbose_name=_('author'), verbose_name=_('author'), # 字段显示名(支持国际化)
blank=True, blank=True, # 表单中允许为空
null=True, null=True, # 数据库中允许为null
on_delete=models.CASCADE) on_delete=models.CASCADE
)
# 第三方平台的唯一标识如GitHub的OpenID不可重复
openid = models.CharField(max_length=50) openid = models.CharField(max_length=50)
# 第三方平台的用户昵称(支持国际化显示)
nickname = models.CharField(max_length=50, verbose_name=_('nick name')) nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
# 第三方平台的访问令牌(可选,可为空或空白)
token = models.CharField(max_length=150, null=True, blank=True) token = models.CharField(max_length=150, null=True, blank=True)
# 第三方平台的用户头像URL可选可为空或空白
picture = models.CharField(max_length=350, blank=True, null=True) picture = models.CharField(max_length=350, blank=True, null=True)
# 第三方登录类型(如'weibo'、'github',不可为空)
type = models.CharField(blank=False, null=False, max_length=50) type = models.CharField(blank=False, null=False, max_length=50)
# 第三方平台的用户邮箱(可选,可为空或空白)
email = models.CharField(max_length=50, null=True, blank=True) email = models.CharField(max_length=50, null=True, blank=True)
# 存储第三方平台返回的额外元数据(如用户其他信息,可选)
metadata = models.TextField(null=True, blank=True) metadata = models.TextField(null=True, blank=True)
# 记录创建时间(默认值为当前时间,支持国际化显示)
creation_time = models.DateTimeField(_('creation time'), default=now) creation_time = models.DateTimeField(_('creation time'), default=now)
# 记录最后修改时间(默认值为当前时间,支持国际化显示)
last_modify_time = models.DateTimeField(_('last modify time'), default=now) last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 定义模型实例的字符串表示在Admin后台或打印时显示昵称
def __str__(self): def __str__(self):
return self.nickname return self.nickname
# 模型元数据配置
class Meta: class Meta:
verbose_name = _('oauth user') verbose_name = _('oauth user') # 模型单数显示名(支持国际化)
verbose_name_plural = verbose_name verbose_name_plural = verbose_name # 模型复数显示名(与单数一致)
ordering = ['-creation_time'] ordering = ['-creation_time'] # 默认排序:按创建时间倒序(新记录在前)
# 定义OAuthConfig模型存储第三方登录平台的配置信息如AppKey、AppSecret等
class OAuthConfig(models.Model): class OAuthConfig(models.Model):
# 定义第三方登录平台的可选类型(元组格式:(数据库存储值, 显示值)
TYPE = ( TYPE = (
('weibo', _('weibo')), ('weibo', _('weibo')), # 微博(支持国际化)
('google', _('google')), ('google', _('google')), # 谷歌(支持国际化)
('github', 'GitHub'), ('github', 'GitHub'), # GitHub固定显示名
('facebook', 'FaceBook'), ('facebook', 'FaceBook'), # FaceBook固定显示名
('qq', 'QQ'), ('qq', 'QQ'), # QQ固定显示名
) )
# 第三方平台类型关联TYPE选项默认值为'a',支持国际化显示)
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a') type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
# 第三方平台的AppKeyAPI密钥不可为空
appkey = models.CharField(max_length=200, verbose_name='AppKey') appkey = models.CharField(max_length=200, verbose_name='AppKey')
# 第三方平台的AppSecret密钥不可为空
appsecret = models.CharField(max_length=200, verbose_name='AppSecret') appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
# 第三方登录的回调地址授权后跳转URL不可为空默认值为空字符串
callback_url = models.CharField( callback_url = models.CharField(
max_length=200, max_length=200,
verbose_name=_('callback url'), verbose_name=_('callback url'), # 支持国际化显示
blank=False, blank=False, # 表单中不允许为空
default='') default=''
)
# 是否启用该第三方登录平台(默认启用,不可为空)
is_enable = models.BooleanField( is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False) _('is enable'), # 支持国际化显示
default=True,
blank=False,
null=False
)
# 配置创建时间(默认值为当前时间,支持国际化显示)
creation_time = models.DateTimeField(_('creation time'), default=now) creation_time = models.DateTimeField(_('creation time'), default=now)
# 配置最后修改时间(默认值为当前时间,支持国际化显示)
last_modify_time = models.DateTimeField(_('last modify time'), default=now) last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 自定义数据验证方法(保存前触发,确保同一平台类型只存在一条配置)
def clean(self): def clean(self):
# 筛选条件:平台类型与当前一致,且排除当前记录(编辑时不与自身冲突)
if OAuthConfig.objects.filter( if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count(): type=self.type).exclude(id=self.id).count():
# 抛出验证异常:提示该平台类型已存在配置
raise ValidationError(_(self.type + _('already exists'))) raise ValidationError(_(self.type + _('already exists')))
# 定义模型实例的字符串表示在Admin后台或打印时显示平台类型
def __str__(self): def __str__(self):
return self.type return self.type
# 模型元数据配置
class Meta: class Meta:
verbose_name = 'oauth配置' verbose_name = 'oauth配置' # 模型单数显示名(中文固定)
verbose_name_plural = verbose_name verbose_name_plural = verbose_name # 模型复数显示名(与单数一致)
ordering = ['-creation_time'] ordering = ['-creation_time'] # 默认排序:按创建时间倒序(新配置在前)

@ -1,220 +1,247 @@
import json import json
import logging import logging
import os import os
import urllib.parse import urllib.parse # 用于URL参数编码/解码
from abc import ABCMeta, abstractmethod from abc import ABCMeta, abstractmethod # 用于定义抽象基类
import requests import requests # 用于发送HTTP请求获取授权、token、用户信息
from djangoblog.utils import cache_decorator from djangoblog.utils import cache_decorator # 导入缓存装饰器,优化重复查询
from oauth.models import OAuthUser, OAuthConfig from oauth.models import OAuthUser, OAuthConfig # 导入OAuth相关模型
# 创建当前模块的日志记录器用于记录OAuth流程中的关键信息和错误
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# 自定义异常类用于表示OAuth授权过程中获取token失败的情况
class OAuthAccessTokenException(Exception): class OAuthAccessTokenException(Exception):
''' '''
oauth授权失败异常 oauth授权失败异常
''' '''
# 抽象基类定义所有第三方OAuth管理器的统一接口模板方法模式
class BaseOauthManager(metaclass=ABCMeta): class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权""" """获取用户授权的抽象基类"""
AUTH_URL = None # 子类需重写的常量第三方平台的授权URL、token获取URL、用户信息API URL、图标名称
"""获取token""" AUTH_URL = None # 授权页面URL用户跳转授权的地址
TOKEN_URL = None TOKEN_URL = None # 获取access_token的API URL
"""获取用户信息""" API_URL = None # 获取用户信息的API URL
API_URL = None ICON_NAME = None # 平台图标标识需与OAuthConfig的type字段对应
'''icon图标名'''
ICON_NAME = None
def __init__(self, access_token=None, openid=None): def __init__(self, access_token=None, openid=None):
# 初始化access_token访问令牌和openid第三方平台用户唯一标识
self.access_token = access_token self.access_token = access_token
self.openid = openid self.openid = openid
# 属性判断access_token是否已设置
@property @property
def is_access_token_set(self): def is_access_token_set(self):
return self.access_token is not None return self.access_token is not None
# 属性判断是否已完成授权需同时拥有有效access_token和openid
@property @property
def is_authorized(self): def is_authorized(self):
return self.is_access_token_set and self.access_token is not None and self.openid is not None return self.is_access_token_set and self.access_token is not None and self.openid is not None
# 抽象方法生成授权URL子类需实现返回用户跳转的授权链接
@abstractmethod @abstractmethod
def get_authorization_url(self, nexturl='/'): def get_authorization_url(self, nexturl='/'):
pass pass
# 抽象方法通过授权码code获取access_token子类需实现完成token交换
@abstractmethod @abstractmethod
def get_access_token_by_code(self, code): def get_access_token_by_code(self, code):
pass pass
# 抽象方法通过access_token获取第三方用户信息子类需实现返回OAuthUser对象
@abstractmethod @abstractmethod
def get_oauth_userinfo(self): def get_oauth_userinfo(self):
pass pass
# 抽象方法从用户元数据中提取头像URL子类需实现适配不同平台的字段差异
@abstractmethod @abstractmethod
def get_picture(self, metadata): def get_picture(self, metadata):
pass pass
# 通用HTTP GET请求方法封装请求逻辑打印响应日志
def do_get(self, url, params, headers=None): def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers) rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text) logger.info(rsp.text) # 记录响应内容,便于调试
return rsp.text return rsp.text
# 通用HTTP POST请求方法封装请求逻辑打印响应日志
def do_post(self, url, params, headers=None): def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers) rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text) logger.info(rsp.text) # 记录响应内容,便于调试
return rsp.text return rsp.text
# 获取当前平台的OAuth配置从OAuthConfig模型中查询按ICON_NAME匹配
def get_config(self): def get_config(self):
value = OAuthConfig.objects.filter(type=self.ICON_NAME) value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None return value[0] if value else None # 存在则返回第一条配置否则返回None
# 微博OAuth管理器继承BaseOauthManager实现微博平台的授权逻辑
class WBOauthManager(BaseOauthManager): class WBOauthManager(BaseOauthManager):
AUTH_URL = 'https://api.weibo.com/oauth2/authorize' # 微博平台的固定URL和图标标识
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' AUTH_URL = 'https://api.weibo.com/oauth2/authorize' # 微博授权页URL
API_URL = 'https://api.weibo.com/2/users/show.json' TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' # 微博token获取URL
ICON_NAME = 'weibo' API_URL = 'https://api.weibo.com/2/users/show.json' # 微博用户信息API
ICON_NAME = 'weibo' # 与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None): def __init__(self, access_token=None, openid=None):
# 从配置中获取微博的AppKey、AppSecret、回调地址
config = self.get_config() config = self.get_config()
self.client_id = config.appkey if config else '' self.client_id = config.appkey if config else '' # 微博开放平台AppKey
self.client_secret = config.appsecret if config else '' self.client_secret = config.appsecret if config else '' # 微博开放平台AppSecret
self.callback_url = config.callback_url if config else '' self.callback_url = config.callback_url if config else ''# 微博授权回调地址
super( # 调用父类初始化方法设置access_token和openid
WBOauthManager, super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
self).__init__(
access_token=access_token,
openid=openid)
# 生成微博授权URL拼接client_id、响应类型、回调地址等参数
def get_authorization_url(self, nexturl='/'): def get_authorization_url(self, nexturl='/'):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'response_type': 'code', 'response_type': 'code', # 授权类型为code授权码模式
'redirect_uri': self.callback_url + '&next_url=' + nexturl 'redirect_uri': self.callback_url + '&next_url=' + nexturl # 回调地址+授权后跳转地址
} }
# 拼接URL和参数urllib.parse.urlencode将字典转为URL查询字符串
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url return url
# 通过授权码code获取微博access_token并自动获取用户信息
def get_access_token_by_code(self, code): def get_access_token_by_code(self, code):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'client_secret': self.client_secret, 'client_secret': self.client_secret,
'grant_type': 'authorization_code', 'grant_type': 'authorization_code', # 授权模式为授权码模式
'code': code, 'code': code, # 前端获取的授权码
'redirect_uri': self.callback_url 'redirect_uri': self.callback_url # 回调地址(需与平台配置一致)
} }
# 发送POST请求获取token响应
rsp = self.do_post(self.TOKEN_URL, params) rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp) # 解析JSON响应
obj = json.loads(rsp) # 若响应中包含access_token说明获取成功
if 'access_token' in obj: if 'access_token' in obj:
self.access_token = str(obj['access_token']) self.access_token = str(obj['access_token']) # 保存access_token
self.openid = str(obj['uid']) self.openid = str(obj['uid']) # 微博用户唯一标识为uid
return self.get_oauth_userinfo() return self.get_oauth_userinfo() # 自动获取用户信息并返回
else: else:
# 获取失败,抛出异常(携带响应内容便于排查)
raise OAuthAccessTokenException(rsp) raise OAuthAccessTokenException(rsp)
# 通过access_token获取微博用户信息返回OAuthUser对象
def get_oauth_userinfo(self): def get_oauth_userinfo(self):
if not self.is_authorized: if not self.is_authorized: # 未授权则返回None
return None return None
# 构造用户信息查询参数需uid和access_token
params = { params = {
'uid': self.openid, 'uid': self.openid,
'access_token': self.access_token 'access_token': self.access_token
} }
# 发送GET请求获取用户信息
rsp = self.do_get(self.API_URL, params) rsp = self.do_get(self.API_URL, params)
try: try:
datas = json.loads(rsp) datas = json.loads(rsp) # 解析用户信息JSON
user = OAuthUser() user = OAuthUser() # 创建OAuthUser对象
user.metadata = rsp user.metadata = rsp # 保存原始元数据(便于后续扩展)
user.picture = datas['avatar_large'] user.picture = datas['avatar_large'] # 微博头像URL大尺寸
user.nickname = datas['screen_name'] user.nickname = datas['screen_name'] # 微博昵称
user.openid = datas['id'] user.openid = datas['id'] # 微博用户唯一ID
user.type = 'weibo' user.type = 'weibo' # 平台类型
user.token = self.access_token user.token = self.access_token # 保存access_token
# 若响应中包含邮箱,则赋值(微博需额外申请权限才能获取邮箱)
if 'email' in datas and datas['email']: if 'email' in datas and datas['email']:
user.email = datas['email'] user.email = datas['email']
return user return user
except Exception as e: except Exception as e:
# 捕获异常并记录日志(避免流程崩溃)
logger.error(e) logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp) logger.error('weibo oauth error.rsp:' + rsp)
return None return None
# 从微博用户元数据中提取头像URL
def get_picture(self, metadata): def get_picture(self, metadata):
datas = json.loads(metadata) datas = json.loads(metadata)
return datas['avatar_large'] return datas['avatar_large'] # 微博头像字段为avatar_large
# 代理混合类Mixin为需要代理的OAuth管理器提供HTTP代理支持
class ProxyManagerMixin: class ProxyManagerMixin:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
# 从环境变量中读取HTTP代理配置若存在则设置代理
if os.environ.get("HTTP_PROXY"): if os.environ.get("HTTP_PROXY"):
self.proxies = { self.proxies = {
"http": os.environ.get("HTTP_PROXY"), "http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY") "https": os.environ.get("HTTP_PROXY") # HTTPS也使用相同代理
} }
else: else:
self.proxies = None self.proxies = None # 无代理则为None
# 重写GET方法添加代理参数
def do_get(self, url, params, headers=None): def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies) rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text) logger.info(rsp.text)
return rsp.text return rsp.text
# 重写POST方法添加代理参数
def do_post(self, url, params, headers=None): def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers, proxies=self.proxies) rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text) logger.info(rsp.text)
return rsp.text return rsp.text
# 谷歌OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+谷歌授权)
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' # 谷歌平台的固定URL和图标标识
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' # 谷歌授权页URL
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' # 谷歌token获取URL
ICON_NAME = 'google' API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' # 谷歌用户信息API
ICON_NAME = 'google' # 与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None): def __init__(self, access_token=None, openid=None):
# 从配置中获取谷歌的AppKey、AppSecret、回调地址
config = self.get_config() config = self.get_config()
self.client_id = config.appkey if config else '' self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else '' self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else '' self.callback_url = config.callback_url if config else ''
super( # 调用父类初始化先初始化ProxyManagerMixin再初始化BaseOauthManager
GoogleOauthManager, super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
self).__init__(
access_token=access_token,
openid=openid)
# 生成谷歌授权URL需指定openid和email权限
def get_authorization_url(self, nexturl='/'): def get_authorization_url(self, nexturl='/'):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'response_type': 'code', 'response_type': 'code', # 授权码模式
'redirect_uri': self.callback_url, 'redirect_uri': self.callback_url, # 回调地址
'scope': 'openid email', 'scope': 'openid email' # 申请的权限获取用户ID和邮箱
} }
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url return url
# 通过授权码code获取谷歌access_token
def get_access_token_by_code(self, code): def get_access_token_by_code(self, code):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'client_secret': self.client_secret, 'client_secret': self.client_secret,
'grant_type': 'authorization_code', 'grant_type': 'authorization_code',
'code': code, 'code': code,
'redirect_uri': self.callback_url 'redirect_uri': self.callback_url
} }
rsp = self.do_post(self.TOKEN_URL, params) rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp) obj = json.loads(rsp)
if 'access_token' in obj: if 'access_token' in obj:
self.access_token = str(obj['access_token']) self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token']) self.openid = str(obj['id_token']) # 谷歌用id_token作为用户唯一标识
logger.info(self.ICON_NAME + ' oauth ' + rsp) logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token return self.access_token # 返回access_token后续需手动调用get_oauth_userinfo
else: else:
raise OAuthAccessTokenException(rsp) raise OAuthAccessTokenException(rsp)
# 通过access_token获取谷歌用户信息
def get_oauth_userinfo(self): def get_oauth_userinfo(self):
if not self.is_authorized: if not self.is_authorized:
return None return None
@ -223,16 +250,15 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
} }
rsp = self.do_get(self.API_URL, params) rsp = self.do_get(self.API_URL, params)
try: try:
datas = json.loads(rsp) datas = json.loads(rsp)
user = OAuthUser() user = OAuthUser()
user.metadata = rsp user.metadata = rsp
user.picture = datas['picture'] user.picture = datas['picture'] # 谷歌头像URL
user.nickname = datas['name'] user.nickname = datas['name'] # 谷歌用户名
user.openid = datas['sub'] user.openid = datas['sub'] # 谷歌用户唯一标识sub字段
user.token = self.access_token user.token = self.access_token
user.type = 'google' user.type = 'google'
if datas['email']: if datas['email']: # 谷歌默认返回邮箱(需申请权限)
user.email = datas['email'] user.email = datas['email']
return user return user
except Exception as e: except Exception as e:
@ -240,15 +266,18 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
logger.error('google oauth error.rsp:' + rsp) logger.error('google oauth error.rsp:' + rsp)
return None return None
# 从谷歌元数据中提取头像URL
def get_picture(self, metadata): def get_picture(self, metadata):
datas = json.loads(metadata) datas = json.loads(metadata)
return datas['picture'] return datas['picture']
# GitHub OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+GitHub授权
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://github.com/login/oauth/authorize' # GitHub平台的固定URL和图标标识
TOKEN_URL = 'https://github.com/login/oauth/access_token' AUTH_URL = 'https://github.com/login/oauth/authorize' # GitHub授权页URL
API_URL = 'https://api.github.com/user' TOKEN_URL = 'https://github.com/login/oauth/access_token' # GitHub token URL
API_URL = 'https://api.github.com/user' # GitHub用户信息API
ICON_NAME = 'github' ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None): def __init__(self, access_token=None, openid=None):
@ -256,55 +285,55 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
self.client_id = config.appkey if config else '' self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else '' self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else '' self.callback_url = config.callback_url if config else ''
super( super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
GitHubOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
# 生成GitHub授权URL申请user权限用于获取用户信息
def get_authorization_url(self, next_url='/'): def get_authorization_url(self, next_url='/'):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'response_type': 'code', 'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}', 'redirect_uri': f'{self.callback_url}&next_url={next_url}', # 回调+跳转地址
'scope': 'user' 'scope': 'user' # GitHub权限获取用户基本信息
} }
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url return url
# 通过授权码code获取GitHub access_tokenGitHub返回格式为表单需特殊解析
def get_access_token_by_code(self, code): def get_access_token_by_code(self, code):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'client_secret': self.client_secret, 'client_secret': self.client_secret,
'grant_type': 'authorization_code', 'grant_type': 'authorization_code',
'code': code, 'code': code,
'redirect_uri': self.callback_url 'redirect_uri': self.callback_url
} }
rsp = self.do_post(self.TOKEN_URL, params) rsp = self.do_post(self.TOKEN_URL, params)
# GitHub返回的是表单格式如access_token=xxx&token_type=bearer需用parse_qs解析
from urllib import parse from urllib import parse
r = parse.parse_qs(rsp) r = parse.parse_qs(rsp)
if 'access_token' in r: if 'access_token' in r:
self.access_token = (r['access_token'][0]) self.access_token = (r['access_token'][0]) # parse_qs返回列表取第一个元素
return self.access_token return self.access_token
else: else:
raise OAuthAccessTokenException(rsp) raise OAuthAccessTokenException(rsp)
# 通过access_token获取GitHub用户信息GitHub需在Header中携带token
def get_oauth_userinfo(self): def get_oauth_userinfo(self):
# GitHub API要求在Header中用Authorization: token xxx传递token
rsp = self.do_get(self.API_URL, params={}, headers={ rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token "Authorization": "token " + self.access_token
}) })
try: try:
datas = json.loads(rsp) datas = json.loads(rsp)
user = OAuthUser() user = OAuthUser()
user.picture = datas['avatar_url'] user.picture = datas['avatar_url'] # GitHub头像URL
user.nickname = datas['name'] user.nickname = datas['name'] or datas['login'] # 优先用name无则用login用户名
user.openid = datas['id'] user.openid = datas['id'] # GitHub用户唯一ID数字
user.type = 'github' user.type = 'github'
user.token = self.access_token user.token = self.access_token
user.metadata = rsp user.metadata = rsp
# GitHub邮箱可能为空用户未公开需判断
if 'email' in datas and datas['email']: if 'email' in datas and datas['email']:
user.email = datas['email'] user.email = datas['email']
return user return user
@ -313,15 +342,18 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
logger.error('github oauth error.rsp:' + rsp) logger.error('github oauth error.rsp:' + rsp)
return None return None
# 从GitHub元数据中提取头像URL
def get_picture(self, metadata): def get_picture(self, metadata):
datas = json.loads(metadata) datas = json.loads(metadata)
return datas['avatar_url'] return datas['avatar_url']
# Facebook OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+Facebook授权
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' # Facebook平台的固定URL和图标标识注意API版本号v16.0
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' # Facebook授权页URL
API_URL = 'https://graph.facebook.com/me' TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' # Facebook token URL
API_URL = 'https://graph.facebook.com/me' # Facebook用户信息API
ICON_NAME = 'facebook' ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None): def __init__(self, access_token=None, openid=None):
@ -329,34 +361,31 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
self.client_id = config.appkey if config else '' self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else '' self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else '' self.callback_url = config.callback_url if config else ''
super( super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
FaceBookOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
# 生成Facebook授权URL申请email和public_profile权限
def get_authorization_url(self, next_url='/'): def get_authorization_url(self, next_url='/'):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'response_type': 'code', 'response_type': 'code',
'redirect_uri': self.callback_url, 'redirect_uri': self.callback_url,
'scope': 'email,public_profile' 'scope': 'email,public_profile' # 申请邮箱和公开资料权限
} }
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url return url
# 通过授权码code获取Facebook access_tokenFacebook无需显式grant_type
def get_access_token_by_code(self, code): def get_access_token_by_code(self, code):
params = { params = {
'client_id': self.client_id, 'client_id': self.client_id,
'client_secret': self.client_secret, 'client_secret': self.client_secret,
# 'grant_type': 'authorization_code', # 'grant_type': 'authorization_code', # Facebook可省略该参数
'code': code, 'code': code,
'redirect_uri': self.callback_url 'redirect_uri': self.callback_url
} }
rsp = self.do_post(self.TOKEN_URL, params) rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp) obj = json.loads(rsp)
if 'access_token' in obj: if 'access_token' in obj:
token = str(obj['access_token']) token = str(obj['access_token'])
self.access_token = token self.access_token = token
@ -364,7 +393,9 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
else: else:
raise OAuthAccessTokenException(rsp) raise OAuthAccessTokenException(rsp)
# 通过access_token获取Facebook用户信息需指定fields参数否则返回默认字段
def get_oauth_userinfo(self): def get_oauth_userinfo(self):
# Facebook需显式指定要获取的字段id、name、picture、email
params = { params = {
'access_token': self.access_token, 'access_token': self.access_token,
'fields': 'id,name,picture,email' 'fields': 'id,name,picture,email'
@ -373,13 +404,15 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
rsp = self.do_get(self.API_URL, params) rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp) datas = json.loads(rsp)
user = OAuthUser() user = OAuthUser()
user.nickname = datas['name'] user.nickname = datas['name'] # Facebook用户名
user.openid = datas['id'] user.openid = datas['id'] # Facebook用户唯一ID
user.type = 'facebook' user.type = 'facebook'
user.token = self.access_token user.token = self.access_token
user.metadata = rsp user.metadata = rsp
# 处理邮箱(可能为空,用户未公开)
if 'email' in datas and datas['email']: if 'email' in datas and datas['email']:
user.email = datas['email'] user.email = datas['email']
# 处理头像Facebook头像嵌套在picture.data.url中
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']: if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url']) user.picture = str(datas['picture']['data']['url'])
return user return user
@ -387,29 +420,29 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
logger.error(e) logger.error(e)
return None return None
# 从Facebook元数据中提取头像URL处理嵌套结构
def get_picture(self, metadata): def get_picture(self, metadata):
datas = json.loads(metadata) datas = json.loads(metadata)
return str(datas['picture']['data']['url']) return str(datas['picture']['data']['url'])
# QQ OAuth管理器继承BaseOauthManager实现QQ平台的授权逻辑
class QQOauthManager(BaseOauthManager): class QQOauthManager(BaseOauthManager):
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' # QQ平台的固定URL和图标标识QQ需额外请求openid接口
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' # QQ授权页URL
API_URL = 'https://graph.qq.com/user/get_user_info' TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' # QQ token获取URL
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' API_URL = 'https://graph.qq.com/user/get_user_info' # QQ用户信息API
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ openid获取URLQQ特有
ICON_NAME = 'qq' ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None): def __init__(self, access_token=None, openid=None):
config = self.get_config() config = self.get_config()
self.client_id = config.appkey if config else '' self.client_id = config.appkey if config else '' # QQ的AppID
self.client_secret = config.appsecret if config else '' self.client_secret = config.appsecret if config else '' # QQ的AppKey
self.callback_url = config.callback_url if config else '' self.callback_url = config.callback_url if config else ''
super( super(QQOauthManager, self).__init__(access_token=access_token, openid=openid)
QQOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
# 生成QQ授权URL
def get_authorization_url(self, next_url='/'): def get_authorization_url(self, next_url='/'):
params = { params = {
'response_type': 'code', 'response_type': 'code',
@ -419,6 +452,7 @@ class QQOauthManager(BaseOauthManager):
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url return url
# 通过授权码code获取QQ access_tokenQQ返回格式为表单
def get_access_token_by_code(self, code): def get_access_token_by_code(self, code):
params = { params = {
'grant_type': 'authorization_code', 'grant_type': 'authorization_code',
@ -427,16 +461,18 @@ class QQOauthManager(BaseOauthManager):
'code': code, 'code': code,
'redirect_uri': self.callback_url 'redirect_uri': self.callback_url
} }
rsp = self.do_get(self.TOKEN_URL, params) rsp = self.do_get(self.TOKEN_URL, params) # QQ的token接口用GET请求
if rsp: if rsp:
# QQ返回表单格式如access_token=xxx&expires_in=7776000&refresh_token=xxx
d = urllib.parse.parse_qs(rsp) d = urllib.parse.parse_qs(rsp)
if 'access_token' in d: if 'access_token' in d:
token = d['access_token'] token = d['access_token']
self.access_token = token[0] self.access_token = token[0] # 取列表第一个元素
return token return token
else: else:
raise OAuthAccessTokenException(rsp) raise OAuthAccessTokenException(rsp)
# QQ特有通过access_token获取openidQQ的openid需单独请求接口
def get_open_id(self): def get_open_id(self):
if self.is_access_token_set: if self.is_access_token_set:
params = { params = {
@ -444,18 +480,18 @@ class QQOauthManager(BaseOauthManager):
} }
rsp = self.do_get(self.OPEN_ID_URL, params) rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp: if rsp:
rsp = rsp.replace( # QQ返回格式为callback({"client_id":"xxx","openid":"xxx"}); 需处理格式
'callback(', '').replace( rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
')', '').replace(
';', '')
obj = json.loads(rsp) obj = json.loads(rsp)
openid = str(obj['openid']) openid = str(obj['openid'])
self.openid = openid self.openid = openid
return openid return openid
# 通过access_token和openid获取QQ用户信息
def get_oauth_userinfo(self): def get_oauth_userinfo(self):
openid = self.get_open_id() openid = self.get_open_id() # 先获取openid
if openid: if openid:
# QQ用户信息接口需传递access_token、oauth_consumer_key即AppID、openid
params = { params = {
'access_token': self.access_token, 'access_token': self.access_token,
'oauth_consumer_key': self.client_id, 'oauth_consumer_key': self.client_id,
@ -465,40 +501,47 @@ class QQOauthManager(BaseOauthManager):
logger.info(rsp) logger.info(rsp)
obj = json.loads(rsp) obj = json.loads(rsp)
user = OAuthUser() user = OAuthUser()
user.nickname = obj['nickname'] user.nickname = obj['nickname'] # QQ昵称
user.openid = openid user.openid = openid # QQ openid
user.type = 'qq' user.type = 'qq'
user.token = self.access_token user.token = self.access_token
user.metadata = rsp user.metadata = rsp
# 处理邮箱QQ需额外申请权限可能为空
if 'email' in obj: if 'email' in obj:
user.email = obj['email'] user.email = obj['email']
# 处理头像QQ头像字段为figureurl
if 'figureurl' in obj: if 'figureurl' in obj:
user.picture = str(obj['figureurl']) user.picture = str(obj['figureurl'])
return user return user
# 从QQ元数据中提取头像URL
def get_picture(self, metadata): def get_picture(self, metadata):
datas = json.loads(metadata) datas = json.loads(metadata)
return str(datas['figureurl']) return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60) # 获取所有已启用的OAuth应用带缓存100分钟过期减少数据库查询
@cache_decorator(expiration=100 * 60) # 缓存100分钟100*60秒
def get_oauth_apps(): def get_oauth_apps():
# 1. 查询所有已启用的OAuth配置is_enable=True
configs = OAuthConfig.objects.filter(is_enable=True).all() configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs: if not configs:
return [] return [] # 无配置则返回空列表
# 2. 提取已启用的平台类型(如['weibo', 'github']
configtypes = [x.type for x in configs] configtypes = [x.type for x in configs]
# 3. 获取所有BaseOauthManager的子类即所有平台的管理器
applications = BaseOauthManager.__subclasses__() applications = BaseOauthManager.__subclasses__()
# 4. 筛选出已启用的管理器ICON_NAME在configtypes中
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes] apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps return apps
# 根据平台类型获取对应的OAuth管理器
def get_manager_by_type(type): def get_manager_by_type(type):
applications = get_oauth_apps() applications = get_oauth_apps() # 获取已启用的应用
if applications: if applications:
finds = list( # 筛选出类型匹配的管理器(不区分大小写)
filter( finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
lambda x: x.ICON_NAME.lower() == type.lower(),
applications))
if finds: if finds:
return finds[0] return finds[0] # 返回第一个匹配的管理器
return None return None # 无匹配则返回None

@ -1,22 +1,36 @@
# 导入Django模板相关模块和URL反转工具
from django import template from django import template
from django.urls import reverse from django.urls import reverse
# 导入获取OAuth应用列表的工具函数
from oauth.oauthmanager import get_oauth_apps from oauth.oauthmanager import get_oauth_apps
# 注册一个模板库实例,用于注册自定义模板标签
register = template.Library() register = template.Library()
# 注册一个包含型模板标签inclusion tag指定模板文件为'oauth/oauth_applications.html'
# 该标签会将处理结果传递给指定模板,再将模板渲染后的内容嵌入到调用位置
@register.inclusion_tag('oauth/oauth_applications.html') @register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request): def load_oauth_applications(request):
# 获取所有已配置的OAuth应用如微博、GitHub等
applications = get_oauth_apps() applications = get_oauth_apps()
if applications: if applications:
# 生成OAuth登录的基础URL通过reverse反转'oauth:oauthlogin'命名URL
baseurl = reverse('oauth:oauthlogin') baseurl = reverse('oauth:oauthlogin')
# 获取当前请求的完整路径(用于登录后跳转回原页面)
path = request.get_full_path() path = request.get_full_path()
# 处理应用列表:将每个应用转换为(图标名称, 登录URL)的元组
# 登录URL格式为"基础URL?type=应用图标名&next_url=当前路径"
apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format( apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl, type=x.ICON_NAME, next=path)), applications)) baseurl=baseurl, type=x.ICON_NAME, next=path)), applications))
else: else:
# 若没有配置OAuth应用返回空列表
apps = [] apps = []
# 将处理后的应用列表传递给模板
return { return {
'apps': apps 'apps': apps
} }

@ -1,129 +1,171 @@
import json import json
from unittest.mock import patch from unittest.mock import patch # 用于用于模拟函数/方法的返回值,隔离外部依赖
from django.conf import settings from django.conf import settings
from django.contrib import auth from django.contrib import auth # 用于用户认证相关操作
from django.test import Client, RequestFactory, TestCase from django.test import Client, RequestFactory, TestCase # Django测试工具
from django.urls import reverse from django.urls import reverse # 用于反向解析URL
from djangoblog.utils import get_sha256 from djangoblog.utils import get_sha256 # 导入SHA256加密工具函数
from oauth.models import OAuthConfig from oauth.models import OAuthConfig # 导入OAuth配置模型
from oauth.oauthmanager import BaseOauthManager from oauth.oauthmanager import BaseOauthManager # 导入OAuth管理器基类
# Create your tests here. # 测试用例测试OAuth配置及基础登录流程
class OAuthConfigTest(TestCase): class OAuthConfigTest(TestCase):
def setUp(self): def setUp(self):
# 初始化测试客户端(模拟用户请求)和请求工厂(构建请求对象)
self.client = Client() self.client = Client()
self.factory = RequestFactory() self.factory = RequestFactory()
def test_oauth_login_test(self): def test_oauth_login_test(self):
# 创建一条微博OAuth配置记录
c = OAuthConfig() c = OAuthConfig()
c.type = 'weibo' c.type = 'weibo' # 平台类型为微博
c.appkey = 'appkey' c.appkey = 'appkey' # 模拟AppKey
c.appsecret = 'appsecret' c.appsecret = 'appsecret' # 模拟AppSecret
c.save() c.save() # 保存到数据库
# 测试访问微博OAuth登录URL是否正常跳转
response = self.client.get('/oauth/oauthlogin?type=weibo') response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302) # 期望302重定向
self.assertTrue("api.weibo.com" in response.url) self.assertTrue("api.weibo.com" in response.url) # 重定向地址应包含微博API域名
# 测试授权回调处理是否正常
response = self.client.get('/oauth/authorize?type=weibo&code=code') response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302) # 期望302重定向
self.assertEqual(response.url, '/') self.assertEqual(response.url, '/') # 回调后默认跳转首页
# 测试用例测试各平台OAuth登录流程微博、谷歌、GitHub、Facebook、QQ
class OauthLoginTest(TestCase): class OauthLoginTest(TestCase):
def setUp(self) -> None: def setUp(self) -> None:
# 初始化测试客户端和请求工厂
self.client = Client() self.client = Client()
self.factory = RequestFactory() self.factory = RequestFactory()
# 初始化所有平台的OAuth配置
self.apps = self.init_apps() self.apps = self.init_apps()
def init_apps(self): def init_apps(self):
# 获取所有OAuth管理器子类各平台实现并创建对应配置
applications = [p() for p in BaseOauthManager.__subclasses__()] applications = [p() for p in BaseOauthManager.__subclasses__()]
for application in applications: for application in applications:
c = OAuthConfig() c = OAuthConfig()
c.type = application.ICON_NAME.lower() c.type = application.ICON_NAME.lower() # 平台类型与管理器图标名一致
c.appkey = 'appkey' c.appkey = 'appkey' # 统一模拟AppKey
c.appsecret = 'appsecret' c.appsecret = 'appsecret' # 统一模拟AppSecret
c.save() c.save() # 保存配置
return applications return applications
def get_app_by_type(self, type): def get_app_by_type(self, type):
# 根据平台类型获取对应的OAuth管理器实例
for app in self.apps: for app in self.apps:
if app.ICON_NAME.lower() == type: if app.ICON_NAME.lower() == type:
return app return app
@patch("oauth.oauthmanager.WBOauthManager.do_post") # 测试微博登录流程使用mock模拟HTTP请求返回值
@patch("oauth.oauthmanager.WBOauthManager.do_get") @patch("oauth.oauthmanager.WBOauthManager.do_post") # 模拟POST请求
@patch("oauth.oauthmanager.WBOauthManager.do_get") # 模拟GET请求
def test_weibo_login(self, mock_do_get, mock_do_post): def test_weibo_login(self, mock_do_get, mock_do_post):
# 获取微博管理器实例
weibo_app = self.get_app_by_type('weibo') weibo_app = self.get_app_by_type('weibo')
assert weibo_app assert weibo_app # 确保实例存在
# 生成授权URL测试URL生成逻辑
url = weibo_app.get_authorization_url() url = weibo_app.get_authorization_url()
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid" # 模拟获取token的响应返回access_token和uid
}) mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"uid": "uid"
})
# 模拟获取用户信息的响应
mock_do_get.return_value = json.dumps({ mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large", "avatar_large": "avatar_large", # 头像URL
"screen_name": "screen_name", "screen_name": "screen_name", # 昵称
"id": "id", "id": "id", # 用户ID
"email": "email", "email": "email", # 邮箱
}) })
# 测试通过code获取用户信息的流程
userinfo = weibo_app.get_access_token_by_code('code') userinfo = weibo_app.get_access_token_by_code('code')
self.assertEqual(userinfo.token, 'access_token') # 验证返回的用户信息是否正确
self.assertEqual(userinfo.openid, 'id') self.assertEqual(userinfo.token, 'access_token') # token匹配
self.assertEqual(userinfo.openid, 'id') # openid匹配
# 测试谷歌登录流程
@patch("oauth.oauthmanager.GoogleOauthManager.do_post") @patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get") @patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post): def test_google_login(self, mock_do_get, mock_do_post):
google_app = self.get_app_by_type('google') google_app = self.get_app_by_type('google')
assert google_app assert google_app
# 生成授权URL
url = google_app.get_authorization_url() url = google_app.get_authorization_url()
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({ mock_do_post.return_value = json.dumps({
"access_token": "access_token", "access_token": "access_token",
"id_token": "id_token", "id_token": "id_token" # 谷歌用id_token作为openid
}) })
# 模拟获取用户信息的响应
mock_do_get.return_value = json.dumps({ mock_do_get.return_value = json.dumps({
"picture": "picture", "picture": "picture", # 头像
"name": "name", "name": "name", # 姓名
"sub": "sub", "sub": "sub", # 唯一标识
"email": "email", "email": "email" # 邮箱
}) })
# 测试获取token和用户信息
token = google_app.get_access_token_by_code('code') token = google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo() userinfo = google_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token') self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub') self.assertEqual(userinfo.openid, 'sub')
# 测试GitHub登录流程
@patch("oauth.oauthmanager.GitHubOauthManager.do_post") @patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get") @patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post): def test_github_login(self, mock_do_get, mock_do_post):
github_app = self.get_app_by_type('github') github_app = self.get_app_by_type('github')
assert github_app assert github_app
# 生成授权URL并验证包含GitHub域名和client_id
url = github_app.get_authorization_url() url = github_app.get_authorization_url()
self.assertTrue("github.com" in url) self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url) self.assertTrue("client_id" in url)
# 模拟GitHub返回的token表单格式
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer" mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
# 模拟用户信息响应
mock_do_get.return_value = json.dumps({ mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url", "avatar_url": "avatar_url", # 头像
"name": "name", "name": "name", # 姓名
"id": "id", "id": "id", # 唯一ID
"email": "email", "email": "email" # 邮箱
}) })
# 测试获取token和用户信息
token = github_app.get_access_token_by_code('code') token = github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo() userinfo = github_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a') self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.openid, 'id') self.assertEqual(userinfo.openid, 'id')
# 测试Facebook登录流程
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post") @patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get") @patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post): def test_facebook_login(self, mock_do_get, mock_do_post):
facebook_app = self.get_app_by_type('facebook') facebook_app = self.get_app_by_type('facebook')
assert facebook_app assert facebook_app
# 生成授权URL并验证包含Facebook域名
url = facebook_app.get_authorization_url() url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url) self.assertTrue("facebook.com" in url)
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({ mock_do_post.return_value = json.dumps({
"access_token": "access_token", "access_token": "access_token"
}) })
# 模拟用户信息响应(头像为嵌套结构)
mock_do_get.return_value = json.dumps({ mock_do_get.return_value = json.dumps({
"name": "name", "name": "name",
"id": "id", "id": "id",
@ -134,14 +176,18 @@ class OauthLoginTest(TestCase):
} }
} }
}) })
# 测试获取token和用户信息
token = facebook_app.get_access_token_by_code('code') token = facebook_app.get_access_token_by_code('code')
userinfo = facebook_app.get_oauth_userinfo() userinfo = facebook_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token') self.assertEqual(userinfo.token, 'access_token')
# 测试QQ登录流程QQ需额外获取openid
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[ @patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
'access_token=access_token&expires_in=3600', # 模拟三次GET请求的返回值按调用顺序
'callback({"client_id":"appid","openid":"openid"} );', 'access_token=access_token&expires_in=3600', # 获取token的响应表单格式
json.dumps({ 'callback({"client_id":"appid","openid":"openid"} );', # 获取openid的响应
json.dumps({ # 获取用户信息的响应
"nickname": "nickname", "nickname": "nickname",
"email": "email", "email": "email",
"figureurl": "figureurl", "figureurl": "figureurl",
@ -151,19 +197,26 @@ class OauthLoginTest(TestCase):
def test_qq_login(self, mock_do_get): def test_qq_login(self, mock_do_get):
qq_app = self.get_app_by_type('qq') qq_app = self.get_app_by_type('qq')
assert qq_app assert qq_app
# 生成授权URL并验证包含QQ域名
url = qq_app.get_authorization_url() url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url) self.assertTrue("qq.com" in url)
# 测试获取token和用户信息
token = qq_app.get_access_token_by_code('code') token = qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo() userinfo = qq_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token') self.assertEqual(userinfo.token, 'access_token')
# 测试微博登录(包含邮箱,可直接绑定用户)
@patch("oauth.oauthmanager.WBOauthManager.do_post") @patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get") @patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post): def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({"access_token": "access_token", mock_do_post.return_value = json.dumps({
"uid": "uid" "access_token": "access_token",
}) "uid": "uid"
})
# 模拟包含邮箱的用户信息
mock_user_info = { mock_user_info = {
"avatar_large": "avatar_large", "avatar_large": "avatar_large",
"screen_name": "screen_name1", "screen_name": "screen_name1",
@ -172,38 +225,46 @@ class OauthLoginTest(TestCase):
} }
mock_do_get.return_value = json.dumps(mock_user_info) mock_do_get.return_value = json.dumps(mock_user_info)
# 测试访问登录URL是否重定向到微博
response = self.client.get('/oauth/oauthlogin?type=weibo') response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url) self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调后是否跳转首页
response = self.client.get('/oauth/authorize?type=weibo&code=code') response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/') self.assertEqual(response.url, '/')
# 验证用户已登录且信息正确
user = auth.get_user(self.client) user = auth.get_user(self.client)
assert user.is_authenticated assert user.is_authenticated
self.assertTrue(user.is_authenticated) self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name']) self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email']) self.assertEqual(user.email, mock_user_info['email'])
self.client.logout()
# 测试登出后再次登录是否正常
self.client.logout()
response = self.client.get('/oauth/authorize?type=weibo&code=code') response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/') self.assertEqual(response.url, '/')
# 再次验证用户登录状态
user = auth.get_user(self.client) user = auth.get_user(self.client)
assert user.is_authenticated assert user.is_authenticated
self.assertTrue(user.is_authenticated) self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name']) self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email']) self.assertEqual(user.email, mock_user_info['email'])
# 测试微博登录(无邮箱,需补充邮箱并绑定)
@patch("oauth.oauthmanager.WBOauthManager.do_post") @patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get") @patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post): def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
# 模拟获取token的响应
mock_do_post.return_value = json.dumps({"access_token": "access_token", mock_do_post.return_value = json.dumps({
"uid": "uid" "access_token": "access_token",
}) "uid": "uid"
})
# 模拟不含邮箱的用户信息
mock_user_info = { mock_user_info = {
"avatar_large": "avatar_large", "avatar_large": "avatar_large",
"screen_name": "screen_name1", "screen_name": "screen_name1",
@ -211,39 +272,41 @@ class OauthLoginTest(TestCase):
} }
mock_do_get.return_value = json.dumps(mock_user_info) mock_do_get.return_value = json.dumps(mock_user_info)
# 测试访问登录URL
response = self.client.get('/oauth/oauthlogin?type=weibo') response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url) self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调后是否跳转邮箱补充页面
response = self.client.get('/oauth/authorize?type=weibo&code=code') response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
# 提取OAuth用户ID从跳转URL中解析
oauth_user_id = int(response.url.split('/')[-1].split('.')[0]) oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html') self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
# 测试提交邮箱表单
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id}) response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
sign = get_sha256(settings.SECRET_KEY +
str(oauth_user_id) + settings.SECRET_KEY)
url = reverse('oauth:bindsuccess', kwargs={ # 生成验证签名(用于邮箱验证)
'oauthid': oauth_user_id, sign = get_sha256(settings.SECRET_KEY + str(oauth_user_id) + settings.SECRET_KEY)
})
# 验证跳转URL是否正确绑定成功页
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauth_user_id})
self.assertEqual(response.url, f'{url}?type=email') self.assertEqual(response.url, f'{url}?type=email')
path = reverse('oauth:email_confirm', kwargs={ # 测试邮箱验证链接
'id': oauth_user_id, path = reverse('oauth:email_confirm', kwargs={'id': oauth_user_id, 'sign': sign})
'sign': sign
})
response = self.client.get(path) response = self.client.get(path)
self.assertEqual(response.status_code, 302) self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success') self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
# 验证用户已绑定且信息正确
user = auth.get_user(self.client) user = auth.get_user(self.client)
from oauth.models import OAuthUser from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user) oauth_user = OAuthUser.objects.get(author=user)
self.assertTrue(user.is_authenticated) self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name']) self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, 'test@gmail.com') self.assertEqual(user.email, 'test@gmail.com') # 验证补充的邮箱
self.assertEqual(oauth_user.pk, oauth_user_id) self.assertEqual(oauth_user.pk, oauth_user_id) # 验证关联的OAuth用户ID

@ -1,25 +1,49 @@
# 导入Django的URL路径配置工具和当前应用的视图函数/类
from django.urls import path from django.urls import path
from . import views from . import views
# 定义应用的命名空间为"oauth",用于在模板或视图中通过"命名空间:URL名称"的方式反向解析URL
app_name = "oauth" app_name = "oauth"
# 配置当前应用的URL路由列表每个path对应一个视图处理逻辑
urlpatterns = [ urlpatterns = [
# 1. OAuth授权回调处理URL
# 路径:/oauth/authorize第三方平台授权后会跳转到该地址
# 由views.authorize视图函数处理负责解析授权码、获取用户信息、绑定本地用户等
path( path(
r'oauth/authorize', r'oauth/authorize',
views.authorize), views.authorize),
# 2. 补充邮箱页面URL命名为"require_email"
# 路径:/oauth/requireemail/[oauthid].html[oauthid]为OAuthUser的ID整数类型
# 由views.RequireEmailView类视图的as_view()方法处理(用于第三方登录无邮箱时,让用户补充邮箱)
path( path(
r'oauth/requireemail/<int:oauthid>.html', r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(), views.RequireEmailView.as_view(),
name='require_email'), name='require_email'),
# 3. 邮箱验证URL命名为"email_confirm"
# 路径:/oauth/emailconfirm/[id]/[sign].html[id]为OAuthUser的ID[sign]为验证签名)
# 由views.emailconfirm视图函数处理验证邮箱归属完成本地用户绑定
path( path(
r'oauth/emailconfirm/<int:id>/<sign>.html', r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm, views.emailconfirm,
name='email_confirm'), name='email_confirm'),
# 4. 绑定成功页面URL命名为"bindsuccess"
# 路径:/oauth/bindsuccess/[oauthid].html[oauthid]为OAuthUser的ID
# 由views.bindsuccess视图函数处理展示第三方登录与本地用户绑定成功的结果
path( path(
r'oauth/bindsuccess/<int:oauthid>.html', r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess, views.bindsuccess,
name='bindsuccess'), name='bindsuccess'),
# 5. OAuth登录入口URL命名为"oauthlogin"
# 路径:/oauth/oauthlogin前端点击第三方登录按钮时跳转到该地址
# 由views.oauthlogin视图函数处理生成第三方平台的授权链接引导用户跳转授权
path( path(
r'oauth/oauthlogin', r'oauth/oauthlogin',
views.oauthlogin, views.oauthlogin,
name='oauthlogin')] name='oauthlogin')
]

@ -1,155 +1,216 @@
import logging import logging
# Create your views here. # 定义视图函数(处理用户请求的逻辑)
from urllib.parse import urlparse from urllib.parse import urlparse # 用于解析URL验证跳转地址合法性
from django.conf import settings from django.conf import settings
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model # 获取自定义用户模型
from django.contrib.auth import login from django.contrib.auth import login # 用于用户登录
from django.core.exceptions import ObjectDoesNotExist from django.core.exceptions import ObjectDoesNotExist # 用于捕获对象不存在异常
from django.db import transaction from django.db import transaction # 用于数据库事务,确保操作原子性
from django.http import HttpResponseForbidden from django.http import HttpResponseForbidden # 用于返回403禁止访问响应
from django.http import HttpResponseRedirect from django.http import HttpResponseRedirect # 用于重定向响应
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404 # 获取对象不存在则返回404
from django.shortcuts import render from django.shortcuts import render # 用于渲染模板
from django.urls import reverse from django.urls import reverse # 用于反向解析URL
from django.utils import timezone from django.utils import timezone # 用于获取当前时间
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _ # 用于国际化翻译
from django.views.generic import FormView from django.views.generic import FormView # 基于类的表单视图
from djangoblog.blog_signals import oauth_user_login_signal
from djangoblog.utils import get_current_site
from djangoblog.utils import send_email, get_sha256
from oauth.forms import RequireEmailForm
from .models import OAuthUser
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException
from djangoblog.blog_signals import oauth_user_login_signal # 导入OAuth登录信号
from djangoblog.utils import get_current_site # 获取当前网站域名
from djangoblog.utils import send_email, get_sha256 # 导入发送邮件和SHA256加密工具
from oauth.forms import RequireEmailForm # 导入补充邮箱表单
from .models import OAuthUser # 导入OAuth用户模型
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # 导入OAuth管理器和异常
# 创建当前模块的日志记录器
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def get_redirecturl(request): def get_redirecturl(request):
"""
处理授权后的跳转URL确保安全性只允许本站内跳转
"""
# 从请求参数中获取跳转地址默认值为None
nexturl = request.GET.get('next_url', None) nexturl = request.GET.get('next_url', None)
# 过滤非法跳转地址(如登录页)
if not nexturl or nexturl == '/login/' or nexturl == '/login': if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/' nexturl = '/'
return nexturl return nexturl
# 解析URL检查是否为本站域名
p = urlparse(nexturl) p = urlparse(nexturl)
if p.netloc: if p.netloc: # 若URL包含域名
site = get_current_site().domain site = get_current_site().domain # 获取当前网站域名
# 去除www.前缀后比较,确保跳转地址为本站
if not p.netloc.replace('www.', '') == site.replace('www.', ''): if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl) logger.info('非法url:' + nexturl) # 记录非法URL日志
return "/" return "/" # 非法则跳转到首页
return nexturl return nexturl # 返回合法的跳转地址
def oauthlogin(request): def oauthlogin(request):
"""
OAuth登录入口生成第三方平台的授权链接引导用户跳转
"""
# 获取请求中的平台类型(如'weibo'、'github'
type = request.GET.get('type', None) type = request.GET.get('type', None)
if not type: if not type: # 若未指定平台类型,跳转到首页
return HttpResponseRedirect('/') return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type) manager = get_manager_by_type(type)
if not manager: if not manager: # 若管理器不存在,跳转到首页
return HttpResponseRedirect('/') return HttpResponseRedirect('/')
# 获取安全的跳转地址(授权后返回的页面)
nexturl = get_redirecturl(request) nexturl = get_redirecturl(request)
# 生成第三方平台的授权URL
authorizeurl = manager.get_authorization_url(nexturl) authorizeurl = manager.get_authorization_url(nexturl)
# 重定向到第三方平台的授权页面
return HttpResponseRedirect(authorizeurl) return HttpResponseRedirect(authorizeurl)
def authorize(request): def authorize(request):
"""
OAuth授权回调处理解析授权码获取用户信息绑定本地用户
"""
# 获取平台类型
type = request.GET.get('type', None) type = request.GET.get('type', None)
if not type: if not type:
return HttpResponseRedirect('/') return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type) manager = get_manager_by_type(type)
if not manager: if not manager:
return HttpResponseRedirect('/') return HttpResponseRedirect('/')
# 获取授权码code第三方平台返回
code = request.GET.get('code', None) code = request.GET.get('code', None)
try: try:
# 使用授权码获取access_token
rsp = manager.get_access_token_by_code(code) rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e: except OAuthAccessTokenException as e:
# 捕获token获取失败异常记录日志并跳转首页
logger.warning("OAuthAccessTokenException:" + str(e)) logger.warning("OAuthAccessTokenException:" + str(e))
return HttpResponseRedirect('/') return HttpResponseRedirect('/')
except Exception as e: except Exception as e:
# 捕获其他异常,记录日志
logger.error(e) logger.error(e)
rsp = None rsp = None
# 获取授权后的跳转地址
nexturl = get_redirecturl(request) nexturl = get_redirecturl(request)
if not rsp: if not rsp: # 若获取token失败重新生成授权链接并重定向
return HttpResponseRedirect(manager.get_authorization_url(nexturl)) return HttpResponseRedirect(manager.get_authorization_url(nexturl))
# 通过token获取第三方用户信息
user = manager.get_oauth_userinfo() user = manager.get_oauth_userinfo()
if user: if user: # 若成功获取用户信息
# 处理空昵称生成默认昵称djangoblog+时间戳)
if not user.nickname or not user.nickname.strip(): if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
# 检查该第三方用户是否已存在(避免重复创建)
try: try:
temp = OAuthUser.objects.get(type=type, openid=user.openid) temp = OAuthUser.objects.get(type=type, openid=user.openid)
# 更新已有用户的头像、元数据、昵称
temp.picture = user.picture temp.picture = user.picture
temp.metadata = user.metadata temp.metadata = user.metadata
temp.nickname = user.nickname temp.nickname = user.nickname
user = temp user = temp # 使用已有用户对象
except ObjectDoesNotExist: except ObjectDoesNotExist:
# 若不存在,则使用新创建的用户对象
pass pass
# facebook的token过长
# Facebook的token过长这里清空根据实际需求处理
if type == 'facebook': if type == 'facebook':
user.token = '' user.token = ''
# 若用户信息包含邮箱,直接绑定本地用户
if user.email: if user.email:
with transaction.atomic(): with transaction.atomic(): # 数据库事务:确保操作原子性
author = None author = None
# 尝试获取已关联的本地用户
try: try:
author = get_user_model().objects.get(id=user.author_id) author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist: except ObjectDoesNotExist:
pass pass
# 若未关联本地用户,则创建或获取本地用户
if not author: if not author:
# 按邮箱查询或创建本地用户
result = get_user_model().objects.get_or_create(email=user.email) result = get_user_model().objects.get_or_create(email=user.email)
author = result[0] author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]: if result[1]:
try: try:
# 检查昵称是否已存在
get_user_model().objects.get(username=user.nickname) get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist: except ObjectDoesNotExist:
# 昵称不存在,直接使用
author.username = user.nickname author.username = user.nickname
else: else:
# 昵称已存在,生成默认用户名
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize' author.source = 'authorize' # 标记来源为OAuth授权
author.save() author.save() # 保存用户信息
# 关联本地用户到OAuthUser
user.author = author user.author = author
user.save() user.save()
# 发送OAuth用户登录信号供其他模块监听处理
oauth_user_login_signal.send( oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id) sender=authorize.__class__, id=user.id)
# 登录用户(创建会话)
login(request, author) login(request, author)
# 重定向到授权前的页面
return HttpResponseRedirect(nexturl) return HttpResponseRedirect(nexturl)
else: else:
# 若用户信息不含邮箱保存OAuthUser并跳转到补充邮箱页面
user.save() user.save()
url = reverse('oauth:require_email', kwargs={ url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
'oauthid': user.id
})
return HttpResponseRedirect(url) return HttpResponseRedirect(url)
else: else:
# 若未获取到用户信息,跳转到授权前的页面
return HttpResponseRedirect(nexturl) return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign): def emailconfirm(request, id, sign):
if not sign: """
return HttpResponseForbidden() 邮箱验证验证签名合法性完成本地用户绑定
if not get_sha256(settings.SECRET_KEY + """
str(id) + if not sign: # 签名为空返回403禁止访问
settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden() return HttpResponseForbidden()
# 验证签名使用SECRET_KEY+ID+SECRET_KEY生成SHA256签名
if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden() # 签名不匹配返回403
# 获取对应的OAuthUser对象不存在则返回404
oauthuser = get_object_or_404(OAuthUser, pk=id) oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic():
with transaction.atomic(): # 数据库事务
# 检查是否已关联本地用户
if oauthuser.author: if oauthuser.author:
author = get_user_model().objects.get(pk=oauthuser.author_id) author = get_user_model().objects.get(pk=oauthuser.author_id)
else: else:
# 按邮箱查询或创建本地用户
result = get_user_model().objects.get_or_create(email=oauthuser.email) result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0] author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]: if result[1]:
author.source = 'emailconfirm' author.source = 'emailconfirm' # 标记来源为邮箱验证
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip( # 设置用户名为OAuth用户的昵称为空则生成默认值
) else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip() else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.save() author.save()
# 关联本地用户到OAuthUser
oauthuser.author = author oauthuser.author = author
oauthuser.save() oauthuser.save()
# 发送OAuth用户登录信号
oauth_user_login_signal.send( oauth_user_login_signal.send(
sender=emailconfirm.__class__, sender=emailconfirm.__class__, id=oauthuser.id)
id=oauthuser.id) # 登录用户
login(request, author) login(request, author)
# 发送绑定成功邮件
site = 'http://' + get_current_site().domain site = 'http://' + get_current_site().domain
content = _(''' content = _('''
<p>Congratulations, you have successfully bound your email address. You can use <p>Congratulations, you have successfully bound your email address. You can use
@ -161,59 +222,63 @@ def emailconfirm(request, id, sign):
If the link above cannot be opened, please copy this link to your browser. If the link above cannot be opened, please copy this link to your browser.
%(site)s %(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site} ''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content) send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': id # 重定向到绑定成功页面
}) url = reverse('oauth:bindsuccess', kwargs={'oauthid': id})
url = url + '?type=success' url = url + '?type=success'
return HttpResponseRedirect(url) return HttpResponseRedirect(url)
class RequireEmailView(FormView): class RequireEmailView(FormView):
form_class = RequireEmailForm """
template_name = 'oauth/require_email.html' 补充邮箱视图处理用户补充邮箱的表单提交
"""
form_class = RequireEmailForm # 使用的表单类
template_name = 'oauth/require_email.html' # 渲染的模板
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
oauthid = self.kwargs['oauthid'] """处理GET请求展示补充邮箱表单"""
oauthid = self.kwargs['oauthid'] # 从URL中获取OAuthUser的ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.email: # 若已存在邮箱,可在此处添加逻辑(如跳转到首页)
pass # if oauthuser.email:
# return HttpResponseRedirect('/') # return HttpResponseRedirect('/')
return super(RequireEmailView, self).get(request, *args, **kwargs) return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self): def get_initial(self):
"""设置表单初始值填充oauthid"""
oauthid = self.kwargs['oauthid'] oauthid = self.kwargs['oauthid']
return { return {'email': '', 'oauthid': oauthid}
'email': '',
'oauthid': oauthid
}
def get_context_data(self, **kwargs): def get_context_data(self,** kwargs):
"""向模板传递额外上下文:用户头像"""
oauthid = self.kwargs['oauthid'] oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture: if oauthuser.picture:
kwargs['picture'] = oauthuser.picture kwargs['picture'] = oauthuser.picture # 传递头像URL
return super(RequireEmailView, self).get_context_data(**kwargs) return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form): def form_valid(self, form):
email = form.cleaned_data['email'] """处理表单验证通过的逻辑:保存邮箱并发送验证邮件"""
oauthid = form.cleaned_data['oauthid'] email = form.cleaned_data['email'] # 获取清洗后的邮箱
oauthid = form.cleaned_data['oauthid'] # 获取OAuthUser的ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
# 保存用户邮箱
oauthuser.email = email oauthuser.email = email
oauthuser.save() oauthuser.save()
sign = get_sha256(settings.SECRET_KEY +
str(oauthuser.id) + settings.SECRET_KEY) # 生成邮箱验证签名
sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
# 获取当前网站域名开发环境使用127.0.0.1:8000
site = get_current_site().domain site = get_current_site().domain
if settings.DEBUG: if settings.DEBUG:
site = '127.0.0.1:8000' site = '127.0.0.1:8000'
path = reverse('oauth:email_confirm', kwargs={ # 生成邮箱验证链接
'id': oauthid, path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
'sign': sign
})
url = "http://{site}{path}".format(site=site, path=path) url = "http://{site}{path}".format(site=site, path=path)
# 发送邮箱验证邮件
content = _(""" content = _("""
<p>Please click the link below to bind your email</p> <p>Please click the link below to bind your email</p>
@ -226,16 +291,21 @@ class RequireEmailView(FormView):
%(url)s %(url)s
""") % {'url': url} """) % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content) send_email(emailto=[email, ], title=_('Bind your email'), content=content)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauthid # 重定向到绑定成功页面(提示用户查收邮件)
}) url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid})
url = url + '?type=email' url = url + '?type=email'
return HttpResponseRedirect(url) return HttpResponseRedirect(url)
def bindsuccess(request, oauthid): def bindsuccess(request, oauthid):
type = request.GET.get('type', None) """
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) 绑定成功页面展示绑定结果信息
"""
type = request.GET.get('type', None) # 获取类型email待验证success已验证
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) # 获取对应的OAuthUser
# 根据类型设置页面标题和内容
if type == 'email': if type == 'email':
title = _('Bind your email') title = _('Bind your email')
content = _( content = _(
@ -247,6 +317,8 @@ def bindsuccess(request, oauthid):
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s" "Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % { " to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type}) 'oauthuser_type': oauthuser.type})
# 渲染绑定成功页面
return render(request, 'oauth/bindsuccess.html', { return render(request, 'oauth/bindsuccess.html', {
'title': title, 'title': title,
'content': content 'content': content

@ -1 +0,0 @@
print('hello world')
Loading…
Cancel
Save