需求分析文档

pull/42/head
echo 2 months ago
parent 2a1fefc8d4
commit cee4cefc28

@ -0,0 +1,75 @@
## 当前状态
- 本地数据库与所有表已创建完毕(含示例用户与权限)。
- 项目已具备基础后端骨架FastAPI + SQLAlchemy + asyncpg路由与模型已就绪。
## 目标
- 在后端实现登录接口(用户名/邮箱 + 密码),校验 `users.password_hash`bcrypt返回 JWT。
- 在前端原型 `index.html` 的登录区对接 `/api/v1/user/login`,存储 Token 并启用 Axios 拦截器为后续请求附加 `Authorization`
- 保护受限页面与接口(集群管理、故障中心等),未认证/无权限时处理 401/403 与页面跳转。
## 技术选型与依赖
- 密码校验:`passlib[bcrypt]`(验证 `$2b$12$...` 格式的 bcrypt 哈希)
- Token`PyJWT`HS256配置 `SECRET_KEY``ACCESS_TOKEN_EXPIRE_MINUTES`
- 依赖追加到 `src/backend/requirements.txt``passlib[bcrypt]`、`PyJWT`。
## 后端实现
### 配置
- 在 `src/backend/app/config.py` 增加:`SECRET_KEY`、`ALGORITHM='HS256'`、`ACCESS_TOKEN_EXPIRE_MINUTES=60`(从环境变量读取,支持 `.env`)。
### 模型与工具
- 新增 `src/backend/app/models/users.py` 映射 `users` 表(`id, username, email, password_hash, is_active, ...`)。
- 新增 `src/backend/app/security.py`
- 函数:`verify_password(plain, hashed)` 使用 `passlib` 验证;
- 函数:`create_access_token(data, expires_minutes)` 使用 `PyJWT` 生成 JWT
- 依赖:`get_current_user(token)` 验证 Bearer Token 并加载用户。
### 路由
- 新增 `src/backend/app/schemas/auth.py``LoginRequest {username_or_email, password}`、`TokenResponse {access_token, token_type}`。
- 新增 `src/backend/app/routers/auth.py`
- `POST /api/v1/user/login`
1) 接受 `LoginRequest`
2) 支持用户名或邮箱匹配用户
3) 验证 `password_hash`
4) 生成 `JWT` 并返回 `TokenResponse`
- 失败返回:`401`(统一错误包络:`{code,message,detail,traceId}`)。
- 将 `auth` 路由挂载到 `app/main.py``/api/v1`)。
### 受限接口保护
- 在现有路由clusters/faults/logs添加可选依赖 `get_current_user`(至少在需要用户态的写操作时强制验证)。
- 统一异常处理:
- 未认证:返回 `401`,前端跳转登录;
- 无权限:返回 `403`,前端显示无权限提示。
## 前端集成
- 在 `src/fronted/index.html` 的登录页(`#login`)绑定提交事件:
- 调用 `POST /api/v1/user/login`,成功后将 `access_token` 存入 `localStorage`
- 切换到主界面并在 Axios 拦截器中附加 `Authorization: Bearer <token>`
- 在 utils 层(`utils/auth.js`
- `setupAuth()`注册登录表单事件、退出登录、Token 读写;
- `setupAxios()`:请求拦截器注入 `Authorization`,响应拦截器处理 `401/403`(跳转登录/无权限提示)。
## 安全与配置
- `.env`(不提交仓库):
- `DATABASE_URL=postgresql+asyncpg://app_user:<pwd>@localhost:5432/hadoop_fault_db`
- `SECRET_KEY=<随机32字节>`
- `ACCESS_TOKEN_EXPIRE_MINUTES=60`
- 最小权限:后端连接使用 `app_user`,仅业务读写权限(已在教程中设置)。
## 测试与验证
- 用脚本插入一个测试用户(若已有则跳过),并设置 bcrypt 哈希(脚本已包含管理员示例)。
- 后端启动:`uvicorn src.backend.app.main:app --reload`
- 测试:
- `POST /api/v1/user/login` 成功返回 Token
- 携带 Authorization 访问 `GET /api/v1/clusters` 正常;
- 不带 Token 时受限接口返回 401/403。
- 前端登录表单:输入已存在用户与密码,跳转后页面 API 正常附带 Token。
## 交付物
- 后端:`auth` 路由与 `security` 工具、`users` 模型与 `schemas`、主入口注册与依赖更新。
- 前端:`utils/auth.js` 登录逻辑与 Axios 拦截器;在 `index.html` 调用初始化。
## 后续扩展
- 注册与审批流(`POST /api/user/register` → 管理员审批列表);
- 角色与权限在 Token 中下发(`role_key`),前端按角色控制菜单显示;
- 刷新令牌与登出;
- 审计:登录成功/失败写入 `audit_logs`

@ -0,0 +1,199 @@
## 目标
* 基于现有需求文档与数据库建表脚本,生成一份从零开始的 PostgreSQL 部署步骤教程文件Markdown
* 生成与项目技术栈匹配的后端初始代码FastAPI + SQLAlchemy + asyncpg覆盖基础连通与核心模块最小可用接口。
## 参考依据
* 需求规格说明书:`doc/project/需求规格说明书.md`
* 建表脚本:`doc/project/数据库建表脚本_postgres.sql`
* 数据库设计与ER说明`doc/project/数据库设计文档.md`、`doc/project/ER图设计说明.md`
## 交付物
* 新增文档:`doc/project/数据库部署步骤教程.md`
* 新增代码目录:`backend/`
* `backend/requirements.txt`
* `backend/app/main.py`
* `backend/app/config.py`
* `backend/app/db.py`
* `backend/app/models/*.py`核心表模型clusters、nodes、system\_logs、fault\_records、exec\_logs 等)
* `backend/app/schemas/*.py`Pydantic 模型)
* `backend/app/routers/*.py`clusters、faults、logs、health
## 数据库部署教程大纲
* 环境要求与准备
* PostgreSQL 14+Windows 环境PowerShell或 Docker Desktop
* 建表脚本路径与说明(不包含 CREATE DATABASE
* 安装 PostgreSQL两种方式
* 安装器方式:下载安装、设置超级用户、添加 PATH
* Docker 方式:`docker run -d --name pg -e POSTGRES_PASSWORD=... -p 5432:5432 postgres:14`
* 初始化数据库与用户
* 创建数据库:`CREATE DATABASE hadoop_fault_db WITH ENCODING 'UTF8';`
* 可选:创建业务用户与授权
* 本地认证配置(`pg_hba.conf` 的 `host all all 127.0.0.1/32 md5` 提示)
* 执行建表脚本
* Windows`psql -U postgres -h localhost -d hadoop_fault_db -f doc/project/数据库建表脚本_postgres.sql`
* Docker`docker cp` + 容器内 `psql -f` 执行
* 验证与自检
* 表/索引/约束检查示例查询(如 `\dt`、`\di`、`\d+ fault_records`
* 运行脚本内置的示例数据并验证返回提示语句
* 备份与恢复
* 备份:`pg_dump -U postgres -d hadoop_fault_db > backup.sql`
* 恢复:`psql -U postgres -d hadoop_fault_db -f backup.sql`
* 连接字符串与安全建议
* DSN 示例:`postgresql://postgres:<pwd>@localhost:5432/hadoop_fault_db`
* 禁止明文密钥入库、最小权限、定期备份演练
## 后端初始代码计划
* 依赖管理:`requirements.txt`
* `fastapi`, `uvicorn[standard]`, `SQLAlchemy>=2`, `asyncpg`, `pydantic`, `python-dotenv`
* 配置模块:`app/config.py`
* 加载环境变量(数据库 DSN、服务端口、日志级别
* 数据库模块:`app/db.py`
* 创建 `AsyncEngine``async_sessionmaker`,封装获取/关闭会话函数(含函数级注释/Docstring
* SQLAlchemy 模型:`app/models/*.py`
* 映射核心表clusters、nodes、system\_logs、fault\_records、exec\_logs、roles、permissions、user\_role\_mapping、user\_cluster\_mapping、app\_configurations、audit\_logs
* 类型对齐:`JSONB`→`postgresql.JSONB`、`INET`→`postgresql.INET`、时间→`TIMESTAMP(timezone=True)`
* Pydantic 模型:`app/schemas/*.py`
* 输入/输出模型ClusterCreate/ClusterRead、FaultCreate/FaultRead、LogQuery
* 路由模块:`app/routers/*.py`
* `health.py`健康检查DB连通探测
* `clusters.py``GET /api/v1/clusters`、`POST /api/v1/clusters`
* `faults.py``GET /api/v1/faults`、`POST /api/v1/faults`
* `logs.py``GET /api/v1/logs`(分页/过滤)
* 应用入口:`app/main.py`
* 注册路由、CORS允许前端域名、异常处理器、版本前缀 `/api/v1`
* 运行说明
* 创建虚拟环境并安装:`python -m venv venv & venv\Scripts\activate & pip install -r requirements.txt`
* 启动:`uvicorn app.main:app --reload`
* 环境配置:`.env` 中设置 `DATABASE_URL=postgresql+asyncpg://postgres:<pwd>@localhost:5432/hadoop_fault_db`
## 代码片段示例(将用于实际生成)
* `app/db.py`
```python
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
DATABASE_URL = "postgresql+asyncpg://postgres:password@localhost:5432/hadoop_fault_db"
engine = create_async_engine(DATABASE_URL, echo=False, pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async def get_db() -> AsyncSession:
"""获取一个异步数据库会话,用于依赖注入。"""
async with SessionLocal() as session:
yield session
```
* `app/main.py`
```python
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routers import health, clusters, faults, logs
app = FastAPI(title="Hadoop Fault Detecting API", version="v1")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"])
app.include_router(health.router, prefix="/api/v1")
app.include_router(clusters.router, prefix="/api/v1")
app.include_router(faults.router, prefix="/api/v1")
app.include_router(logs.router, prefix="/api/v1")
```
* `app/routers/health.py`
```python
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_db
router = APIRouter()
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""健康检查:测试数据库连通性。"""
await db.execute("SELECT 1")
return {"status": "ok"}
```
* `app/routers/clusters.py`(示例)
```python
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db import get_db
from app.models.clusters import Cluster
router = APIRouter()
@router.get("/clusters")
async def list_clusters(db: AsyncSession = Depends(get_db)):
"""查询集群列表。"""
result = await db.execute(select(Cluster).limit(100))
rows = result.scalars().all()
return {"total": len(rows), "list": [c.to_dict() for c in rows]}
```
## 验证与后续工作
* 我将按上述结构生成教程文件与后端初始代码,并在本地运行基本连通(`/health`、`/clusters`)进行自检。
* 后续可按 SRS 模块逐步扩展接口与实体映射,增加 WebSocket、审计与权限模块。

@ -0,0 +1,9 @@
import os
from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:password@localhost:5432/hadoop_fault_db",
)

@ -0,0 +1,10 @@
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from app.config import DATABASE_URL
engine = create_async_engine(DATABASE_URL, echo=False, pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async def get_db() -> AsyncSession:
"""获取一个异步数据库会话,用于依赖注入。"""
async with SessionLocal() as session:
yield session

@ -0,0 +1,18 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app.routers import health, clusters, faults, logs
app = FastAPI(title="Hadoop Fault Detecting API", version="v1")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(health.router, prefix="/api/v1")
app.include_router(clusters.router, prefix="/api/v1")
app.include_router(faults.router, prefix="/api/v1")
app.include_router(logs.router, prefix="/api/v1")

@ -0,0 +1,4 @@
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass

@ -0,0 +1,34 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Integer
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy import TIMESTAMP
from app.models import Base
class Cluster(Base):
__tablename__ = "clusters"
id: Mapped[int] = mapped_column(primary_key=True)
uuid: Mapped[str] = mapped_column(UUID(as_uuid=False), unique=True)
name: Mapped[str] = mapped_column(String(100), unique=True)
type: Mapped[str] = mapped_column(String(50))
node_count: Mapped[int] = mapped_column(Integer, default=0)
health_status: Mapped[str] = mapped_column(String(20), default="unknown")
description: Mapped[str | None] = mapped_column(String, nullable=True)
config_info: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
updated_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
def to_dict(self) -> dict:
"""将集群对象转换为可序列化字典。"""
return {
"id": self.id,
"uuid": self.uuid,
"name": self.name,
"type": self.type,
"node_count": self.node_count,
"health_status": self.health_status,
"description": self.description,
"config_info": self.config_info,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}

@ -0,0 +1,39 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Integer
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import TIMESTAMP, Text
from app.models import Base
class ExecLog(Base):
__tablename__ = "exec_logs"
id: Mapped[int] = mapped_column(primary_key=True)
exec_id: Mapped[str] = mapped_column(String(32), unique=True)
fault_id: Mapped[str] = mapped_column(String(32))
command_type: Mapped[str] = mapped_column(String(50))
script_path: Mapped[str | None] = mapped_column(String(255), nullable=True)
command_content: Mapped[str] = mapped_column(Text)
target_nodes: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
risk_level: Mapped[str] = mapped_column(String(20), default="medium")
execution_status: Mapped[str] = mapped_column(String(20), default="pending")
start_time: Mapped[str | None] = mapped_column(TIMESTAMP(timezone=True), nullable=True)
end_time: Mapped[str | None] = mapped_column(TIMESTAMP(timezone=True), nullable=True)
duration: Mapped[int | None] = mapped_column(Integer, nullable=True)
stdout_log: Mapped[str | None] = mapped_column(Text, nullable=True)
stderr_log: Mapped[str | None] = mapped_column(Text, nullable=True)
exit_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
operator: Mapped[str] = mapped_column(String(50), default="system")
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
updated_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
def to_dict(self) -> dict:
"""将执行日志转换为可序列化字典。"""
return {
"exec_id": self.exec_id,
"fault_id": self.fault_id,
"command_type": self.command_type,
"execution_status": self.execution_status,
"start_time": self.start_time.isoformat() if self.start_time else None,
"end_time": self.end_time.isoformat() if self.end_time else None,
"exit_code": self.exit_code,
}

@ -0,0 +1,38 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import TIMESTAMP
from app.models import Base
class FaultRecord(Base):
__tablename__ = "fault_records"
id: Mapped[int] = mapped_column(primary_key=True)
fault_id: Mapped[str] = mapped_column(String(32), unique=True)
cluster_id: Mapped[int | None] = mapped_column(nullable=True)
fault_type: Mapped[str] = mapped_column(String(50))
fault_level: Mapped[str] = mapped_column(String(20), default="medium")
title: Mapped[str] = mapped_column(String(200))
description: Mapped[str | None] = mapped_column(String, nullable=True)
affected_nodes: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
affected_clusters: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
root_cause: Mapped[str | None] = mapped_column(String, nullable=True)
repair_suggestion: Mapped[str | None] = mapped_column(String, nullable=True)
status: Mapped[str] = mapped_column(String(20), default="detected")
assignee: Mapped[str | None] = mapped_column(String(50), nullable=True)
reporter: Mapped[str] = mapped_column(String(50), default="system")
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
updated_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
resolved_at: Mapped[str | None] = mapped_column(TIMESTAMP(timezone=True), nullable=True)
def to_dict(self) -> dict:
"""将故障记录转换为可序列化字典。"""
return {
"fault_id": self.fault_id,
"cluster_id": self.cluster_id,
"fault_type": self.fault_type,
"fault_level": self.fault_level,
"title": self.title,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
}

@ -0,0 +1,33 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Boolean
from sqlalchemy import TIMESTAMP, Text
from app.models import Base
class SystemLog(Base):
__tablename__ = "system_logs"
id: Mapped[int] = mapped_column(primary_key=True)
log_id: Mapped[str] = mapped_column(String(32), unique=True)
fault_id: Mapped[str | None] = mapped_column(String(32), nullable=True)
cluster_id: Mapped[int | None] = mapped_column(nullable=True)
timestamp: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
host: Mapped[str] = mapped_column(String(100))
service: Mapped[str] = mapped_column(String(50))
log_level: Mapped[str] = mapped_column(String(10))
message: Mapped[str] = mapped_column(Text)
exception: Mapped[str | None] = mapped_column(Text, nullable=True)
raw_log: Mapped[str | None] = mapped_column(Text, nullable=True)
processed: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
def to_dict(self) -> dict:
"""将系统日志转换为可序列化字典。"""
return {
"log_id": self.log_id,
"cluster_id": self.cluster_id,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"service": self.service,
"log_level": self.log_level,
"message": self.message,
"processed": self.processed,
}

@ -0,0 +1,14 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db import get_db
from app.models.clusters import Cluster
router = APIRouter()
@router.get("/clusters")
async def list_clusters(db: AsyncSession = Depends(get_db)):
"""查询集群列表。"""
result = await db.execute(select(Cluster).limit(100))
rows = result.scalars().all()
return {"total": len(rows), "list": [c.to_dict() for c in rows]}

@ -0,0 +1,14 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db import get_db
from app.models.fault_records import FaultRecord
router = APIRouter()
@router.get("/faults")
async def list_faults(db: AsyncSession = Depends(get_db)):
"""查询故障记录。"""
result = await db.execute(select(FaultRecord).limit(100))
rows = result.scalars().all()
return {"total": len(rows), "list": [f.to_dict() for f in rows]}

@ -0,0 +1,11 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import get_db
router = APIRouter()
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""健康检查:测试数据库连通性。"""
await db.execute("SELECT 1")
return {"status": "ok"}

@ -0,0 +1,23 @@
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.db import get_db
from app.models.system_logs import SystemLog
router = APIRouter()
@router.get("/logs")
async def list_logs(
db: AsyncSession = Depends(get_db),
level: str | None = Query(None),
page: int = Query(1, ge=1),
pageSize: int = Query(10, ge=1, le=100),
):
"""查询系统日志,支持按级别筛选与分页。"""
stmt = select(SystemLog)
if level:
stmt = stmt.where(SystemLog.log_level == level)
stmt = stmt.offset((page - 1) * pageSize).limit(pageSize)
result = await db.execute(stmt)
rows = result.scalars().all()
return {"total": len(rows), "list": [l.to_dict() for l in rows]}

@ -0,0 +1,15 @@
## 第八周周总结2025-11-17
本周主要完成了界面原型设计和数据库设计两项核心工作一方面基于Trae风格完成了故障诊断相关页面的双/三栏布局原型、日志联动预览与基础交互框架;另一方面,整理并输出了数据库表结构与主键、外键、索引设计,明确了用户、角色、集群、节点、故障与执行日志等实体的关系。整体推进较为顺利,但与既定计划存在偏差:原定本周聚焦于“用户注册与登录后端接口及前后端联调”,实际已调整到下周执行。
偏差原因主要有三点:
- 为保证界面原型的统一性与可扩展性,额外投入时间在组件化与交互细节上;
- 数据模型跨模块复用较多,设计与评审比预期更充分;
- 与前端联动时,为后续接口集成预留了更多占位与校验逻辑。
改进措施与下周计划:
- 将认证模块任务拆分为可交付的最小增量注册、登录、权限校验三步走优先落地后端API与基础集成测试
- 在不影响原型演示的前提下,减少视觉微调类工作,把时间向接口联调与数据校验迁移;
- 与前端保持每日15分钟站会及时同步字段与交互变更降低返工。
总体评价:本周产出可为后续开发提供稳定的界面与数据底座,虽与计划有偏差,但风险在可控范围内,已完成任务重排并明确了下周执行路径。

@ -1,3 +1,4 @@
\encoding UTF8
-- =====================================================
-- 故障检测系统数据库建表脚本 (PostgreSQL 优化版)
-- 数据库: PostgreSQL 14+

@ -0,0 +1,174 @@
# PostgreSQL 数据库部署步骤教程(从零开始)
## 环境与准备
- 操作系统WindowsPowerShell或其他平台
- 数据库PostgreSQL 14+(编码 `UTF8`
- 项目脚本:`doc/project/数据库建表脚本_postgres.sql`(不包含 `CREATE DATABASE`
- 注意:请不要以明文形式保存任何数据库账户口令
## 安装 PostgreSQL
- 安装器方式Windows
- 访问 https://www.postgresql.org/download/windows/ 下载 14+ 安装包
- 按向导安装并记住超级用户 `postgres` 的口令
- 将 `psql.exe` 所在目录加入 `PATH`
- Docker 方式:
- `docker run -d --name pg -e POSTGRES_PASSWORD=<安全口令> -p 5432:5432 postgres:14`
## 初始化数据库与用户
- 使用 `psql` 连接到默认数据库并创建业务库:
```sql
CREATE DATABASE hadoop_fault_db WITH ENCODING 'UTF8';
```
- 可选:创建业务用户并授权(按需执行):
```sql
CREATE USER app_user WITH PASSWORD '<安全口令>';
GRANT CONNECT ON DATABASE hadoop_fault_db TO app_user;
GRANT USAGE ON SCHEMA public TO app_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO app_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO app_user;
```
- 本地认证配置(如需):在 `pg_hba.conf` 中确保存在如下规则:
```
host all all 127.0.0.1/32 md5
host all all ::1/128 md5
```
## 执行建表脚本
- Windows 本机执行:
```powershell
psql -U postgres -h localhost -d hadoop_fault_db -f "doc/project/数据库建表脚本_postgres.sql"
```
- Docker 容器执行:
```powershell
docker cp "doc/project/数据库建表脚本_postgres.sql" pg:/tmp/schema.sql
docker exec -it pg psql -U postgres -d hadoop_fault_db -f /tmp/schema.sql
```
## 验证与自检
- 查看表与索引:
```sql
\dt
\di
```
- 查看关键表结构:
```sql
\d+ fault_records
\d+ exec_logs
\d+ system_logs
```
- 验证示例数据:
```sql
SELECT name, uuid FROM clusters;
SELECT role_name, role_key FROM roles;
```
## 备份与恢复
- 备份:
```powershell
pg_dump -U postgres -d hadoop_fault_db > backup_$(Get-Date -Format yyyyMMdd).sql
```
- 恢复:
```powershell
psql -U postgres -d hadoop_fault_db -f backup_20250101.sql
```
## 连接字符串示例
- 本地:`postgresql://postgres:<安全口令>@localhost:5432/hadoop_fault_db`
- 后端asyncpg`postgresql+asyncpg://postgres:<安全口令>@localhost:5432/hadoop_fault_db`
## 运维与安全建议
- 最小权限原则:为应用创建专用账号,仅授予必要权限
- 定期备份与恢复演练:使用 `pg_dump``psql` 验证恢复可用
- 指标与监控:暴露数据库关键指标并纳入监控平台
- 参数与密钥管理:统一置于安全的环境配置管理,避免明文写入代码或库
## 运维与安全操作(详细教程,结合当前项目)
### 1. 为后端应用创建专用账号并授予必要权限
> 目标后端仅能进行业务读写SELECT/INSERT/UPDATE/DELETE不能 `CREATE/DROP/ALTER` 架构对象;新建表默认也继承上述权限。
1) 创建专用账号(禁止超级权限):
```sql
CREATE USER app_user WITH PASSWORD '<安全口令>' NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT LOGIN;
```
2) 授权到业务库与 schema
```sql
GRANT CONNECT ON DATABASE hadoop_fault_db TO app_user;
GRANT USAGE ON SCHEMA public TO app_user;
```
3) 授予现有对象最小权限:
```sql
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO app_user;
GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA public TO app_user;
```
4) 为未来对象设置默认权限(由对象拥有者执行,通常为 postgres 或迁移用户):
```sql
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO app_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE, SELECT ON SEQUENCES TO app_user;
```
5) 测试最小权限(应成功读写但不能删除架构):
```sql
-- 作为 app_user 连接后测试:
SELECT COUNT(*) FROM clusters;
-- 业务写入示例(按需测试):
-- INSERT INTO fault_records (fault_id, fault_type, fault_level, title, status) VALUES ('TEST-0001','demo','low','测试故障','detected');
-- 以下操作应失败:
-- DROP TABLE system_logs; -- 预期报错:权限不足
```
### 2. 创建只读账号(用于报表/监控读取)
> 目标:只读账号仅能 `SELECT`,适用于报表或外部监控。
```sql
CREATE USER report_user WITH PASSWORD '<安全口令>' NOSUPERUSER NOCREATEDB NOCREATEROLE NOINHERIT LOGIN;
GRANT CONNECT ON DATABASE hadoop_fault_db TO report_user;
GRANT USAGE ON SCHEMA public TO report_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO report_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO report_user;
```
### 3. 连接控制pg_hba.conf与网络安全
> 目标:仅允许可信来源访问,使用密码/可选 TLS。
- 本机开发:保证如下规则存在(已在教程示例中给出):
```
host all all 127.0.0.1/32 md5
host all all ::1/128 md5
```
- 生产环境:仅白名单应用服务器 IP例如
```
host hadoop_fault_db app_user <APP_SERVER_IP>/32 md5
```
- 可选启用 TLS建议在服务端启用证书后客户端 DSN 带上 `sslmode=require`
- 例如:`postgresql+asyncpg://app_user:<安全口令>@<host>:5432/hadoop_fault_db?sslmode=require`
### 4. 后端配置与密钥管理
> 目标:避免明文密钥进入代码库,统一通过环境变量或安全配置管理下发。
- 示例:在后端 `.env` 中设置(不要提交到仓库):
```
DATABASE_URL=postgresql+asyncpg://app_user:<安全口令>@localhost:5432/hadoop_fault_db
JWT_SECRET=<随机高强度密钥>
JWT_EXPIRE_MINUTES=60
```
- 代码引用:`src/backend/app/config.py` 通过 `dotenv` 加载 `DATABASE_URL`
- 口令轮换:
```sql
ALTER ROLE app_user WITH PASSWORD '<新口令>'; -- 轮换后更新部署环境中的 DATABASE_URL
```
### 5. SQL 审计与慢查询日志(可选)
> 目标:定位性能问题与风险操作。生产环境谨慎开启,避免过多日志。
- 建议参数(在 `postgresql.conf``ALTER SYSTEM`
```
log_min_duration_statement = 500ms # 仅记录慢于 500ms 的SQL
log_line_prefix = '%m [%p] %u %d %r' # 时间/进程/用户/库/来源
```
- 重载配置:`SELECT pg_reload_conf();`
### 6. 维护例行(每周/每月)
> 目标:保持统计与索引健康,日志/表空间合理。
- 更新统计信息:`VACUUM (ANALYZE);`
- 查看膨胀与重建索引(按需):`REINDEX DATABASE hadoop_fault_db;`
- 日志归档与保留策略:结合 `app_configurations` 中的保留天数统一治理。

@ -0,0 +1,231 @@
# 登录/注册模块前后端联调详细教程(分角色分阶段)
## 目标与范围
- 目标:在开发网络内让前端同学本地运行页面并稳定访问同一套后端与数据库,完成登录/注册模块的联调与测试。
- 范围Windows 开发环境;后端 FastAPI + SQLAlchemy异步`asyncpg`);数据库 PostgreSQL前端为静态站点`src/fronted`)。
## 角色与职责
- 数据库管理员DBA部署 PostgreSQL、建库建表、放行网络、提供连接串。
- 后端负责人BE配置 `.env`、安装依赖、启动后端、开放端口、提供 API。
- 前端开发FE启动静态站点、配置后端基地址、发起接口请求、验证 CORS。
- 联调与测试QA/协同):执行跨角色的验证脚本,记录问题并推动修复。
## 阶段划分与通过标准
- 阶段 0网络与信息准备 → 互相 `ping` 可达,记录 IP/端口/用户信息
- 阶段 1数据库部署与放行DBA`psql SELECT 1` 成功,其他电脑 `Test-NetConnection` 通过
- 阶段 2后端启动与连通性验证BE`/api/v1/health` 返回 `{"status":"ok"}`
- 阶段 3前端启动与联调准备FE → FE 本地页面能请求 `health/clusters/faults/logs`
- 阶段 4登录/注册联调与验收FE+BE+QA → 注册/登录/`/user/me` 流程完整可用
---
## 阶段 0网络与信息准备所有角色
**步骤**
- 查询本机 IP
```powershell
ipconfig
```
- 测试连通:
```powershell
ping <对方IP>
```
- 记录统一信息(示例):
- 后端开发机 IP`192.168.1.50`
- 数据库主机 IP`192.168.1.50:5432`
- 后端服务端口:`8000`
- 数据库:`hadoop_fault_db`,账户:`app_user`
**验证通过标准**
- 前端电脑可以 `ping` 通后端开发机与数据库主机,并记录信息到协同表格。
---
## 阶段 1数据库部署与放行DBA
**方案 A本地安装 PostgreSQL**
1) 配置远程监听:
- 编辑 `postgresql.conf``listen_addresses = '*'`
- 编辑 `pg_hba.conf`:添加网段白名单(示例)
```
host all all 192.168.1.0/24 md5
```
2) 重启服务并开放端口Windows 防火墙):
```powershell
New-NetFirewallRule -DisplayName "Postgres 5432" -Direction Inbound -LocalPort 5432 -Protocol TCP -Action Allow
```
3) 创建数据库与用户(在 `psql` 中执行):
```sql
CREATE DATABASE hadoop_fault_db;
CREATE USER app_user WITH PASSWORD 'StrongPassword123!';
GRANT ALL PRIVILEGES ON DATABASE hadoop_fault_db TO app_user;
```
4) 执行建表脚本:
```powershell
psql -h <DB> -U app_user -d hadoop_fault_db -f "doc/project/数据库建表脚本_postgres.sql"
```
**方案 BDocker 快速部署**
```powershell
docker run --name pg14 -e POSTGRES_PASSWORD=StrongPassword123! -p 5432:5432 -d postgres:14
docker exec -it pg14 psql -U postgres -c "CREATE DATABASE hadoop_fault_db;"
docker exec -it pg14 psql -U postgres -c "CREATE USER app_user WITH PASSWORD 'StrongPassword123!';"
docker exec -it pg14 psql -U postgres -d hadoop_fault_db -c "GRANT ALL PRIVILEGES ON DATABASE hadoop_fault_db TO app_user;"
docker cp "doc/project/数据库建表脚本_postgres.sql" pg14:/tmp/schema.sql
docker exec -it pg14 psql -U app_user -d hadoop_fault_db -f /tmp/schema.sql
```
**验证通过标准**
- 任一电脑执行:
```powershell
psql -h <DB> -U app_user -d hadoop_fault_db -c "SELECT 1"
Test-NetConnection <DB> -Port 5432
```
- 结果显示查询成功,`TcpTestSucceeded : True`。
---
## 阶段 2后端启动与连通性验证BE
**安装依赖与配置环境**
```powershell
cd e:\学习通\download\软件项目管理\故障检测\项目\ErrorDetecting
python -m venv venv
venv\Scripts\activate
pip install -r src\backend\requirements.txt
```
**创建 `.env`(与 `src/backend/app/config.py:6-9` 加载逻辑一致)**
```env
DATABASE_URL=postgresql+asyncpg://app_user:StrongPassword123!@<DB>:5432/hadoop_fault_db
```
**启动后端并监听局域网**
```powershell
uvicorn src.backend.app.main:app --host 0.0.0.0 --port 8000 --reload
```
**开放 8000 端口Windows 防火墙)**
```powershell
New-NetFirewallRule -DisplayName "Uvicorn 8000" -Direction Inbound -LocalPort 8000 -Protocol TCP -Action Allow
```
**可用联调端点**
- 健康检查:`GET http://<后端IP>:8000/api/v1/health``src/backend/app/routers/health.py:7-11`
- 集群列表:`GET http://<后端IP>:8000/api/v1/clusters``src/backend/app/routers/clusters.py:9-14`
- 故障列表:`GET http://<后端IP>:8000/api/v1/faults``src/backend/app/routers/faults.py:9-14`
- 系统日志:`GET http://<后端IP>:8000/api/v1/logs?page=1&pageSize=10``src/backend/app/routers/logs.py:9-23`
**验证通过标准**
- 前端电脑执行:
```powershell
curl http://<后端IP>:8000/api/v1/health
```
- 返回 `{"status":"ok"}`;后端控制台无错误(数据库连通:`health_check` 会执行 `SELECT 1`)。
---
## 阶段 3前端启动与联调准备FE
**启动静态站点服务**
- Node 方案:
```powershell
npx http-server src/fronted -p 5173
```
- Python 方案:
```powershell
cd src\fronted
python -m http.server 5173
```
- 打开浏览器:`http://localhost:5173`
**配置后端基地址(开发)**
- 将前端调用统一指向:`http://<后端IP>:8000/api/v1`
**验证通过标准**
- 浏览器直接访问:`http://<后端IP>:8000/api/v1/health` 返回 `ok`
- 从前端页面发起请求到 `health/clusters/faults/logs` 均正常返回;控制台无 CORS 报错(`src/backend/app/main.py:7-13` 开启任意来源)。
---
## 阶段 4登录/注册联调与验收FE+BE+QA
**后端接口建议(落地后联调)**
- `POST /api/v1/user/register``{ username, password, email }` → 入库 `users`
- `POST /api/v1/user/login``{ username, password }` → 返回 `{ access_token, token_type }`
- `GET /api/v1/user/me``Authorization: Bearer <token>` → 返回当前用户
**前端联调要点**
- 替换本地演示登录逻辑为真实 `fetch/axios` 调用;登录成功后保存 `access_token``localStorage`,在后续请求中注入 `Authorization: Bearer <token>`
**示例验证命令(后端接口就绪后执行)**
```powershell
# 注册
curl -X POST http://<后端IP>:8000/api/v1/user/register -H "Content-Type: application/json" -d "{\"username\":\"admin\",\"password\":\"Admin123!\",\"email\":\"admin@example.com\"}"
# 登录
curl -X POST http://<后端IP>:8000/api/v1/user/login -H "Content-Type: application/json" -d "{\"username\":\"admin\",\"password\":\"Admin123!\"}"
# 获取当前用户(替换 <access_token>
curl http://<后端IP>:8000/api/v1/user/me -H "Authorization: Bearer <access_token>"
```
**验收标准**
- 注册返回成功并在 `users` 表可见记录。
- 登录返回有效 `access_token``/user/me` 返回当前用户信息。
- 前端展示用户菜单、受控页面按角色可见;携带 Token 的后续请求成功。
---
## 故障排查与回滚建议
**端口不可达**
- 后端:确认 `--host 0.0.0.0`、防火墙入站规则(`8000`)、`Test-NetConnection <后端IP> -Port 8000`
- 数据库:检查 `pg_hba.conf` 白名单、`postgresql.conf` 监听、`5432` 防火墙
**数据库认证失败**
- 统一连接串与口令,确保 `.env` 生效;校验 `app_user` 权限与表是否存在。
**CORS 报错**
- 开发阶段允许任意来源;联调稳定后将 `allow_origins` 收敛到 FE 来源(如 `http://localhost:5173`)。
**JWT 校验失败**
- FE 是否正确传入 `Authorization: Bearer <token>`;后端 `JWT_SECRET`/过期时间一致(落地登录后)。
**回滚建议**
- 回到阶段 2/3 先以 `health`/列表接口验证链路,登录/注册接口再增量上线。
---
## 安全与合规
- 仅在受控内网放行 `5432/8000`;使用隧道工具时加 IP 白名单与强密码。
- `.env` 不入库,敏感信息只在环境中配置。
- CORS 在联调后尽快收敛到明确的 FE 开发来源列表。
---
## 关键代码位置(便于对焦)
- CORS 配置:`src/backend/app/main.py:7-13`
- 环境变量加载与默认连接串:`src/backend/app/config.py:6-9`
- 数据库异步会话:`src/backend/app/db.py:1-10`
- 健康检查:`src/backend/app/routers/health.py:7-11`
- 列表接口:
- 集群:`src/backend/app/routers/clusters.py:9-14`
- 故障:`src/backend/app/routers/faults.py:9-14`
- 日志:`src/backend/app/routers/logs.py:9-23`
---
## 常用验证命令清单PowerShell
```powershell
# 网络
ipconfig
ping <对方IP>
Test-NetConnection <主机> -Port <端口>
# 数据库
psql -h <DB> -U app_user -d hadoop_fault_db -c "SELECT 1"
# 后端
uvicorn src.backend.app.main:app --host 0.0.0.0 --port 8000 --reload
curl http://<后端IP>:8000/api/v1/health
# 前端(二选一)
npx http-server src/fronted -p 5173
cd src\fronted; python -m http.server 5173
```

@ -19,11 +19,11 @@
## 3. 功能需求(按模块)
### F-01 身份认证与注册审批
- 说明:登录、注册(审批队列)、用户状态管理。
- 说明:登录、注册(审批队列)、用户状态管理、JWT 认证
- 前端映射:`#login`、`#register`、`#user-management`(审批列表)。
- 接口示例:`POST /api/user/login`、`POST /api/user/register`、`GET /api/admin/approvals`、`POST /api/admin/approvals/{id}/approve`。
- 接口示例:`POST /api/v1/user/login`、`POST /api/v1/user/register`、`GET /api/v1/admin/approvals`、`POST /api/v1/admin/approvals/{id}/approve`、`GET /api/v1/user/me`。
- 数据映射:`users`、`audit_logs`、`roles/user_role_mapping`。
- 验收:登录成功生成会话与角色;注册进入审批队列;操作审计记录完整。
- 验收:登录成功生成 JWT 与角色;注册进入审批队列;操作审计记录完整;受保护接口需携带 `Authorization: Bearer <token>`
### F-02 集群列表与注册
- 说明:查看已注册集群、注册新集群、注销集群。
@ -146,4 +146,4 @@
## 10. 需求追踪矩阵(摘要)
- 功能模块 ↔ 前端页面 ↔ 后端接口 ↔ 数据表 ↔ 指标/验证用例。
- 用于测试与验收阶段交叉核对,确保每项需求均有实现与验证。
- 用于测试与验收阶段交叉核对,确保每项需求均有实现与验证。

@ -27,7 +27,7 @@
- In Scope
- 前端页面(参考 `src/fronted/index.html`
- 集群列表/注册、仪表板监控、日志查询、故障诊断多智能体原型、故障中心CRUD、执行日志CRUD、告警配置规则管理原型、用户管理、角色分配、权限策略、审计日志、登录/注册/审批。
- 后端服务FastAPI日志接收与结构化、集群状态查询、诊断接口、大模型调用、修复执行编排、审计与权限。
- 后端服务FastAPI日志接收与结构化、集群状态查询、诊断接口、大模型调用、修复执行编排、审计与权限登录与认证JWT
- 多智能体编排DiagnosisAgent、RepairAgent、PolicyAgent、LogParserAgent。
- 数据存储PostgreSQLJSONB/UUID/INET/TIMESTAMPTZRedis 缓存与限流。
- 可观测性Prometheus 指标采集、Grafana 展示。
@ -54,4 +54,4 @@
## 9. 成功度量KPI
- 诊断准确率 ≥ 85%;故障闭环时效 ≤ 5 分钟;修复复发率 ≤ 5%。
- 关键页面 P95 响应 ≤ 500ms错误率 ≤ 1%WebSocket 在线率 ≥ 99%。
- 关键页面 P95 响应 ≤ 500ms错误率 ≤ 1%WebSocket 在线率 ≥ 99%。

@ -0,0 +1,87 @@
# 项目需求分析基于Hadoop的故障检测与自动恢复
## 项目概述
- 目标:为 Hadoop 运维提供监控、日志分析、智能诊断与自动修复的一体化平台。
- 范围Web 应用(前端、后端、多智能体编排)、日志采集、数据库与缓存、可观测性。
- 技术栈FastAPI + SQLAlchemy异步`asyncpg`、PostgreSQL 14+`JSONB/UUID/INET/TIMESTAMPTZ`、Flume、前端静态站点原型。
## 背景与问题陈述
- Hadoop 集群在生产中存在多源日志、跨节点事件关联困难、故障定位成本高、修复操作规范性不足等问题。
- 需要一个统一的故障中心:能从日志线索到诊断建议、审批把关与执行记录形成可审计闭环,并提供指标、报表与可视化。
## 目标与成功标准
- 核心闭环:日志→诊断→修复→审计→报表;支持状态机与高风险审批。
- 成功标准:接口与前端原型连通、关键场景可演示、性能与安全指标达基线、文档与数据模型一致。
## 用户角色与场景
- 管理员admin配置与审批、规则管理、用户与权限、全局审计与报表。
- 操作员operator诊断与修复执行、故障中心操作、仪表板监控。
- 观察员observer只读查看集群、日志、执行记录、报表
- 角色可在前端原型中映射,路由控制参考 `src/fronted/utils/auth.js`
## 关键业务流程
- 日志采集Flume 推送结构化日志 → 后端预处理与入库 → 前端查询分页展示。
- 故障诊断:选择上下文日志 → LLM 诊断DiagnosisAgent→ 风险评估PolicyAgent→ 建议与报告生成。
- 修复执行:审批通过 → 执行脚本命令 → 标准输出与错误输出入库 → 结果推送与审计。
- WS 推送:仪表板状态与诊断结果实时更新,断线重连策略。
- 时序参考:`doc/project/diagrams/时序图.puml`。
## 功能需求分解(与前端原型/后端接口/数据表映射)
- 身份认证与注册审批F-01`#login/#register/#user-management``POST /api/v1/user/login`、`POST /api/v1/user/register`、`GET /api/v1/user/me`;数据 `users/audit_logs/roles/user_role_mapping`
- 集群列表与注册F-02`#cluster-list``/api/clusters`;数据 `clusters(uuid,name,type,node_count,health_status,config_info)`
- 仪表板监控F-03`#dashboard``/api/cluster/status`、WS `/ws/status`;数据 `nodes(...)`
- 日志查询F-04`#logs``/api/logs?...`;数据 `system_logs(log_id,timestamp,host,service,log_level,message,...)`
- 故障诊断F-05`#diagnosis``POST /api/llm/diagnose`、`POST /api/llm/report`;数据 `fault_records/exec_logs`
- 故障中心F-06`#fault-center``/api/faults/*`、`POST /api/faults/{id}/transition`;数据 `fault_records(...)`
- 执行日志F-07`#exec-logs``/api/exec-logs/*`;数据 `exec_logs(...)`
- 告警配置F-08`#alert-config``/api/alerts/*`;数据 `app_configurations(config_type='alert_rule',...)`
- 用户管理与角色分配F-09`#user-management/#role-assignment``/api/admin/*`;数据 `users/roles/user_role_mapping`
- 权限策略F-10`#permission-policy``/api/policies/*`;数据 `roles/permissions/role_permission_mapping`
- 审计日志F-11`#audit-logs``/api/audit-logs/*`;数据 `audit_logs(...)`
- WebSocket 推送F-12`#dashboard`、`/ws/status`、`/ws/diagnose`;消息格式统一与重连策略。
- 详细 SRS`doc/project/需求规格说明书.md:19-103`
## 接口约定与版本策略
- API 前缀:`/api/v1`(见 `src/backend/app/main.py:15-18` 路由挂载)。
- 返回结构:列表 `{ total, list }`;错误包络 `{ code, message, detail, traceId }`(见 `src/backend/README.md:47-55`)。
- 统一状态码与业务码映射:参考团队约定(见周报 `doc/process/weekly/week-9/...`)。
- 时间与类型规范:`TIMESTAMPTZ`/`JSONB` 对齐 PostgreSQL 原生类型。
## 非功能需求与指标
- 性能:核心查询 P95 ≤ 500msWS 延迟 ≤ 500ms后端并发 ≥ 1k 连接稳定(见 `doc/project/前后端启动前交流确认清单.md:125`)。
- 安全JWT、CORS 白名单、RBAC、速率限制、审计敏感信息通过环境变量管理`DATABASE_URL/JWT_SECRET`)。
- 可靠性:数据库备份与恢复演练;关键链路重试与降级。
- 可观测性Prometheus 指标、Grafana 看板、日志分级与 TraceID。
- 可维护性与可用性模块化、组件化、ARIA 可访问性与清晰交互反馈。
## 数据模型与约束摘要
- 关键实体:`clusters`、`nodes`、`system_logs`、`fault_records`、`exec_logs`、`app_configurations`、`users`、`audit_logs`、`roles`、`permissions`、`user_role_mapping`、`role_permission_mapping`、`user_cluster_mapping`。
- 约束:唯一键与外键完整;检查约束覆盖状态与枚举值;高频查询索引覆盖。
- 参考:`doc/project/数据库设计文档.md`、`doc/project/数据库建表脚本_postgres.sql`、`doc/project/数据字典.md`。
## 当前实现基线(便于联调)
- 后端入口:`src/backend/app/main.py:1-18`CORS 允许所有来源(开发阶段)。
- 基础路由:`/api/v1/health`、`/api/v1/clusters`、`/api/v1/faults`、`/api/v1/logs`(见路由目录)。
- 环境变量加载:`src/backend/app/config.py:6-9`;数据库会话:`src/backend/app/db.py:1-10`。
- 前端原型:`src/fronted/index.html`;导航与权限:`src/fronted/utils/navigation.js`、`src/fronted/utils/auth.js`。
## 里程碑与交付
- M1接口与数据模型最小可用列表与健康检查可演示
- M2登录/注册与 JWT 完成RBAC 初版;审计入库。
- M3诊断与修复闭环含审批与执行日志报表初版。
- M4性能与监控基线达标安全策略落地文档齐备。
## 验收标准
- 端到端闭环可演示;接口与页面一致;关键指标达基线;审计与安全策略有效。
- 文档一致性SRS、数据字典、建表脚本与接口文档一致代码路径与接口前缀清晰可查。
## 风险与应对
- 并发一致性:采用乐观/幂等策略,后续引入锁与事务细化;测试覆盖关键状态转换。
- 安全复杂度JWT 与角色权限细化,接口白名单与速率控制;日志审计与告警联动。
- 集成复杂度Flume/Hadoop 环节逐步引入,保持后端与前端最小闭环可演示。
## 开放问题与下一步
- 诊断与审批策略边界需与业务方细化;高风险分类与联动规则待完善。
- 报表与指标体系细化准确率、时效性、复发率、KPI 定义)。
- WebSocket 与队列/缓存(如 Redis集成方案与参数调优连接池、超时、重试

@ -0,0 +1,72 @@
# 后端代码结构说明FastAPI + SQLAlchemy + asyncpg
## 目录结构
- `src/backend/requirements.txt`Python 依赖列表
- `src/backend/app/main.py`:应用入口与路由注册
- `src/backend/app/config.py`:配置加载(环境变量、数据库连接串)
- `src/backend/app/db.py`:数据库引擎与会话管理(异步)
- `src/backend/app/models/`SQLAlchemy 模型PostgreSQL 原生类型对齐)
- `src/backend/app/routers/`API 路由(按模块划分)
## 关键模块
- 应用入口:`src/backend/app/main.py`
- 注册中间件CORS与路由前缀 `/api/v1`
- 挂载模块:`health`、`clusters`、`faults`、`logs`
- 配置加载:`src/backend/app/config.py`
- 环境变量 `DATABASE_URL`(默认:`postgresql+asyncpg://postgres:password@localhost:5432/hadoop_fault_db`
- 数据库会话:`src/backend/app/db.py`
- 创建 `AsyncEngine``async_sessionmaker`
- 依赖注入函数 `get_db()` 提供异步会话
- 模型层:`src/backend/app/models/`
- `clusters.py``JSONB`、`UUID`、`TIMESTAMPTZ` 字段对齐
- `fault_records.py`:支持 `affected_nodes`/`affected_clusters`JSONB
- `exec_logs.py`:执行日志(文本/JSONB/时间/状态)
- `system_logs.py`:系统日志(文本/时间/级别/处理标记)
- 路由层:`src/backend/app/routers/`
- `health.py``GET /api/v1/health`(连通性检查)
- `clusters.py``GET /api/v1/clusters`(列表)
- `faults.py``GET /api/v1/faults`(列表)
- `logs.py``GET /api/v1/logs`(按级别筛选与分页)
## 运行方式
- 创建并激活虚拟环境、安装依赖:
```
python -m venv venv
venv\Scripts\activate
pip install -r src/backend/requirements.txt
```
- 配置环境变量(或 `.env`
```
DATABASE_URL=postgresql+asyncpg://postgres:<安全口令>@localhost:5432/hadoop_fault_db
```
- 启动服务Windows PowerShell
```
uvicorn src.backend.app.main:app --reload
```
## API 约定
- 前缀:`/api/v1`
- 返回结构:
- 列表:`{ "total": <int>, "list": [...] }`
- 错误包络(建议):`{ "code": <int>, "message": "<string>", "detail": { ... }, "traceId": "<uuid>" }`
- 常用查询:
- 日志:`GET /api/v1/logs?level=&page=&pageSize=`
## 数据库类型对齐
- JSON`JSONB`
- IP`INET`
- 时间:`TIMESTAMPTZ`(后端使用 `TIMESTAMP(timezone=True)` 映射)
- 主键:`GENERATED ALWAYS AS IDENTITY`
## 后续扩展建议
- 增加 Pydantic `schemas`(输入/输出)与 `POST` 接口(例如新增集群、新增故障)
- 引入 WebSocket `/ws/status``/ws/diagnose` 支持实时推送
- 加入审计与权限模块(`roles`/`permissions`/`user_role_mapping` 等)
- 接入 Prometheus 指标与 Grafana 看板接口耗时、错误率、WS 在线率)
## 登录与认证(实现规划)
- 依赖:`passlib[bcrypt]` 或 `bcrypt`、`python-jose[cryptography]`
- 路由:`POST /api/v1/user/login`(用户名/密码)返回 `access_token``GET /api/v1/user/me` 返回当前用户信息
- 认证JWT Bearer环境变量 `JWT_SECRET``JWT_EXPIRE_MINUTES`
- 审计:登录成功/失败在 `audit_logs` 记录 `action=login/login_failed`
- 前端:登录页调用登录接口,保存 `access_token``localStorage`Axios 拦截器注入 `Authorization: Bearer <token>`

@ -0,0 +1,9 @@
import os
from dotenv import load_dotenv
load_dotenv()
DATABASE_URL = os.getenv(
"DATABASE_URL",
"postgresql+asyncpg://postgres:password@localhost:5432/hadoop_fault_db",
)

@ -0,0 +1,10 @@
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from .config import DATABASE_URL
engine = create_async_engine(DATABASE_URL, echo=False, pool_pre_ping=True)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async def get_db() -> AsyncSession:
"""获取一个异步数据库会话,用于依赖注入。"""
async with SessionLocal() as session:
yield session

@ -0,0 +1,18 @@
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from .routers import health, clusters, faults, logs
app = FastAPI(title="Hadoop Fault Detecting API", version="v1")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(health.router, prefix="/api/v1")
app.include_router(clusters.router, prefix="/api/v1")
app.include_router(faults.router, prefix="/api/v1")
app.include_router(logs.router, prefix="/api/v1")

@ -0,0 +1,4 @@
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
class Base(DeclarativeBase):
pass

@ -0,0 +1,34 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Integer
from sqlalchemy.dialects.postgresql import UUID, JSONB
from sqlalchemy import TIMESTAMP
from . import Base
class Cluster(Base):
__tablename__ = "clusters"
id: Mapped[int] = mapped_column(primary_key=True)
uuid: Mapped[str] = mapped_column(UUID(as_uuid=False), unique=True)
name: Mapped[str] = mapped_column(String(100), unique=True)
type: Mapped[str] = mapped_column(String(50))
node_count: Mapped[int] = mapped_column(Integer, default=0)
health_status: Mapped[str] = mapped_column(String(20), default="unknown")
description: Mapped[str | None] = mapped_column(String, nullable=True)
config_info: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
updated_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
def to_dict(self) -> dict:
"""将集群对象转换为可序列化字典。"""
return {
"id": self.id,
"uuid": self.uuid,
"name": self.name,
"type": self.type,
"node_count": self.node_count,
"health_status": self.health_status,
"description": self.description,
"config_info": self.config_info,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}

@ -0,0 +1,39 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Integer
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import TIMESTAMP, Text
from . import Base
class ExecLog(Base):
__tablename__ = "exec_logs"
id: Mapped[int] = mapped_column(primary_key=True)
exec_id: Mapped[str] = mapped_column(String(32), unique=True)
fault_id: Mapped[str] = mapped_column(String(32))
command_type: Mapped[str] = mapped_column(String(50))
script_path: Mapped[str | None] = mapped_column(String(255), nullable=True)
command_content: Mapped[str] = mapped_column(Text)
target_nodes: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
risk_level: Mapped[str] = mapped_column(String(20), default="medium")
execution_status: Mapped[str] = mapped_column(String(20), default="pending")
start_time: Mapped[str | None] = mapped_column(TIMESTAMP(timezone=True), nullable=True)
end_time: Mapped[str | None] = mapped_column(TIMESTAMP(timezone=True), nullable=True)
duration: Mapped[int | None] = mapped_column(Integer, nullable=True)
stdout_log: Mapped[str | None] = mapped_column(Text, nullable=True)
stderr_log: Mapped[str | None] = mapped_column(Text, nullable=True)
exit_code: Mapped[int | None] = mapped_column(Integer, nullable=True)
operator: Mapped[str] = mapped_column(String(50), default="system")
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
updated_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
def to_dict(self) -> dict:
"""将执行日志转换为可序列化字典。"""
return {
"exec_id": self.exec_id,
"fault_id": self.fault_id,
"command_type": self.command_type,
"execution_status": self.execution_status,
"start_time": self.start_time.isoformat() if self.start_time else None,
"end_time": self.end_time.isoformat() if self.end_time else None,
"exit_code": self.exit_code,
}

@ -0,0 +1,38 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy import TIMESTAMP
from . import Base
class FaultRecord(Base):
__tablename__ = "fault_records"
id: Mapped[int] = mapped_column(primary_key=True)
fault_id: Mapped[str] = mapped_column(String(32), unique=True)
cluster_id: Mapped[int | None] = mapped_column(nullable=True)
fault_type: Mapped[str] = mapped_column(String(50))
fault_level: Mapped[str] = mapped_column(String(20), default="medium")
title: Mapped[str] = mapped_column(String(200))
description: Mapped[str | None] = mapped_column(String, nullable=True)
affected_nodes: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
affected_clusters: Mapped[dict | None] = mapped_column(JSONB, nullable=True)
root_cause: Mapped[str | None] = mapped_column(String, nullable=True)
repair_suggestion: Mapped[str | None] = mapped_column(String, nullable=True)
status: Mapped[str] = mapped_column(String(20), default="detected")
assignee: Mapped[str | None] = mapped_column(String(50), nullable=True)
reporter: Mapped[str] = mapped_column(String(50), default="system")
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
updated_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
resolved_at: Mapped[str | None] = mapped_column(TIMESTAMP(timezone=True), nullable=True)
def to_dict(self) -> dict:
"""将故障记录转换为可序列化字典。"""
return {
"fault_id": self.fault_id,
"cluster_id": self.cluster_id,
"fault_type": self.fault_type,
"fault_level": self.fault_level,
"title": self.title,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
}

@ -0,0 +1,33 @@
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Boolean
from sqlalchemy import TIMESTAMP, Text
from . import Base
class SystemLog(Base):
__tablename__ = "system_logs"
id: Mapped[int] = mapped_column(primary_key=True)
log_id: Mapped[str] = mapped_column(String(32), unique=True)
fault_id: Mapped[str | None] = mapped_column(String(32), nullable=True)
cluster_id: Mapped[int | None] = mapped_column(nullable=True)
timestamp: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
host: Mapped[str] = mapped_column(String(100))
service: Mapped[str] = mapped_column(String(50))
log_level: Mapped[str] = mapped_column(String(10))
message: Mapped[str] = mapped_column(Text)
exception: Mapped[str | None] = mapped_column(Text, nullable=True)
raw_log: Mapped[str | None] = mapped_column(Text, nullable=True)
processed: Mapped[bool] = mapped_column(Boolean, default=False)
created_at: Mapped[str] = mapped_column(TIMESTAMP(timezone=True))
def to_dict(self) -> dict:
"""将系统日志转换为可序列化字典。"""
return {
"log_id": self.log_id,
"cluster_id": self.cluster_id,
"timestamp": self.timestamp.isoformat() if self.timestamp else None,
"service": self.service,
"log_level": self.log_level,
"message": self.message,
"processed": self.processed,
}

@ -0,0 +1,14 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ..db import get_db
from ..models.clusters import Cluster
router = APIRouter()
@router.get("/clusters")
async def list_clusters(db: AsyncSession = Depends(get_db)):
"""查询集群列表。"""
result = await db.execute(select(Cluster).limit(100))
rows = result.scalars().all()
return {"total": len(rows), "list": [c.to_dict() for c in rows]}

@ -0,0 +1,14 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ..db import get_db
from ..models.fault_records import FaultRecord
router = APIRouter()
@router.get("/faults")
async def list_faults(db: AsyncSession = Depends(get_db)):
"""查询故障记录。"""
result = await db.execute(select(FaultRecord).limit(100))
rows = result.scalars().all()
return {"total": len(rows), "list": [f.to_dict() for f in rows]}

@ -0,0 +1,11 @@
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from ..db import get_db
router = APIRouter()
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
"""健康检查:测试数据库连通性。"""
await db.execute("SELECT 1")
return {"status": "ok"}

@ -0,0 +1,23 @@
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from ..db import get_db
from ..models.system_logs import SystemLog
router = APIRouter()
@router.get("/logs")
async def list_logs(
db: AsyncSession = Depends(get_db),
level: str | None = Query(None),
page: int = Query(1, ge=1),
pageSize: int = Query(10, ge=1, le=100),
):
"""查询系统日志,支持按级别筛选与分页。"""
stmt = select(SystemLog)
if level:
stmt = stmt.where(SystemLog.log_level == level)
stmt = stmt.offset((page - 1) * pageSize).limit(pageSize)
result = await db.execute(stmt)
rows = result.scalars().all()
return {"total": len(rows), "list": [l.to_dict() for l in rows]}

@ -1,3 +1,6 @@
fastapi
uvicorn[standard]
psycopg2-binary
SQLAlchemy
asyncpg
python-dotenv
psycopg2-binary

Loading…
Cancel
Save