Merge remote-tracking branch 'origin/develop' into wanglei_branch

# Conflicts:
#	src/main/resources/application.yml
pull/25/head
wanglei 3 months ago
commit 6e2c620fac

@ -0,0 +1,36 @@
# 小组周计划-第10周
## 团队名称和起止时间
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-24
**结束时间:** 2025-11-30
## 本周任务计划安排
| 序号 | 计划内容 | 执行人 | 情况说明 |
|----|-------------------------------------------------|----------------|----------------------------|
| 1 | 确定本周计划分工 | 全体组员 | 2023-11-24 开会确定计划以及团队分工 |
| 2 | 继续完善mqtt数据生成和接收及存储 | 曹峻茂 | 完善mqtt报文与接口 |
| 3 | 开发工单管理接口,扫码用水接口,水质信息查询接口 | 王磊 | 完善相关接口 |
| 4 | 开发学生端扫码模块,水质信息页面,维修端工单抢单 / 拒单功能,工单处理页面,扫码用水触发功能 | 罗月航 | 完善app相关页面 |
| 5 | 开发设备监控页面功能,告警列表页面,开发工单列表页面,与后端联调 | 张红卫 | 开发设备监控页面功能,告警列表页面,开发工单列表页面 |
| 6 |开发告警触发逻辑,完成登录接口|周竞由| 开发告警触发逻辑,完成登录接口,与前端对接 |
| 7 | 前后端联调 | 王磊,周竞由,罗月航,张红卫 | 前后端联调核心流程 |
## 小结
2. **沟通协作:** 小组成员应积极主动沟通,遇到困难及时寻求帮助,也可以主动向指导老师及研究生学长寻求建议。
3. **学习安排:** 小组成员自行学习spring boot相关知识。
4. **项目管理:** PM及时推进项目流程确保项目有条不紊。
---
## 【注】
1. 在小结一栏中写出希望得到如何的帮助,如讲座等;
2. 请将个人计划和总结提前发给负责人;
3. 周任务总结与计划是项目小组评分考核的重要依据,将直接记入平时成绩,请各位同学按要求认真填写并按时提交;
4. PM综合本小组成员工作情况提交小组周计划、周总结报告按时上传至代码托管平台。
---

@ -0,0 +1,33 @@
# 个人周计划-第10周
## 姓名和起止时间
**姓  名:** 曹峻茂
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-24
**结束时间:** 2025-11-30
## 本周任务计划安排
| 序号 | 计划内容 | 协作人 | 情况说明 |
|----|--------------------|-----|------------------------|
| 1 | 确定分工 | 组员 | 2023-11-24 开会确定计划和团队分工 |
| 2 | 继续完善mqtt数据生成和接收及存储 | 个人 | 优化模拟器支持手动触发异常数据如模拟漏水、TDS 超标),增加断网重连功能。 |
## 小结
1. **学习需求:** 希望能有对于mqtt应用的教学
2. **知识储备:** 学习后续需要使用的知识,为后续的开发做准备;
3. **文档撰写:** 完成迭代开发计划撰写。
4. **代码实现** 参与mqtt协议相关设计
---
## 【注】
1. 在小结一栏中写出希望得到如何的帮助,如讲座等;
1. 请将个人计划和总结提前发给负责人;
1. 周任务总结与计划是项目小组评分考核的重要依据,将直接记入平时成绩,请各位同学按要求认真填写并按时提交;
1. 所有组员都需提交个人周计划、周总结文档,按时上传至代码托管平台;
---

@ -0,0 +1,31 @@
# 个人周计划-第10周
## 姓名和起止时间
**姓  名:** [罗月航]
**团队名称:** 1班-汪汪队
**开始时间:** 2025-12-24
**结束时间:** 2025-12-30
## 本周任务计划安排
| 序号 | 计划内容 | 协作人 | 情况说明 |
| ---- | -------- | ------ | -------- |
| 1 | 完成登录页开发收尾 | 个人 | 完善登录表单验证逻辑,完成上周未完成的登录页面开发工作 |
| 2 | 学生端扫码模块开发 | 个人 | 调用摄像头识别二维码获取设备ID实现扫码功能 |
| 3 | 水质信息页面开发 | 个人 | 调用水质查询接口展示水质数据包括TDS值、水质等级等信息 |
| 4 | 维修端工单抢单/拒单功能 | 个人 | 调用工单管理接口,实现工单抢单和拒单功能 |
| 5 | 工单处理页面开发 | 个人 | 开发维修内容填写页面,支持维修记录提交 |
| 6 | 学生端扫码用水功能 | 个人 | 调用扫码用水接口,实现用水触发功能 |
| 7 | 与后端联调核心流程 | 后端开发 | 完成所有核心功能的接口联调,确保前后端数据交互正常 |
| 8 | 联调报告编写 | 个人 | 整理联调过程中的问题和解决方案,编写联调报告 |
## 小结
1. **接口依赖**:需要后端提供完整的扫码、水质查询、工单管理、用水触发等接口文档;
2. **设备权限**:需要处理摄像头调用权限,确保扫码功能在移动端的正常使用;
3. **联调协调**:需要与后端开发同学协调联调时间,确保问题及时解决;
4. **测试支持**:需要测试同学协助进行功能测试,特别是移动端兼容性测试;
5. **时间安排**:本周开发任务较重,需要合理安排时间,确保核心功能按时完成。

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-10-13
**结束时间:** 2023-10-19
**结束时间:** 2025-10-19
## 本周任务计划安排

@ -3,8 +3,8 @@
## 团队名称和起止时间
**团队名称:** 1班-汪汪队
**开始时间:** 2023-10-13
**结束时间:** 2023-10-19
**开始时间:** 2025-10-13
**结束时间:** 2025-10-19
## 本周任务完成情况

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 软1-汪汪队
**开始时间:** 2023-10-13
**结束时间:** 2023-10-19
**开始时间:** 2025-10-13
**结束时间:** 2025-10-19
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 1班-汪汪队
**开始时间:** 2023-10-13
**结束时间:** 2023-10-19
**开始时间:** 2025-10-13
**结束时间:** 2025-10-19
## 本周任务完成情况

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-10-20
**结束时间:** 2023-10-26
**结束时间:** 2025-10-26
## 本周任务计划安排

@ -4,7 +4,7 @@
**团队名称:** 1班-汪汪队
**开始时间:** 2023-10-19
**结束时间:** 2023-10-26
**结束时间:** 2025-10-26
## 本周任务完成情况

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 软1-汪汪队
**开始时间:** 2023-10-20
**结束时间:** 2023-10-26
**开始时间:** 2025-10-20
**结束时间:** 2025-10-26
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 1班-汪汪队
**开始时间:** 2023-10-20
**结束时间:** 2023-10-26
**开始时间:** 2025-10-20
**结束时间:** 2025-10-26
## 本周任务完成情况

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-10-27
**结束时间:** 2023-11-2
**结束时间:** 2025-11-2
## 本周任务计划安排

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-10-27
**结束时间:** 2023-11-2
**结束时间:** 2025-11-2
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 软1-汪汪队
**开始时间:** 2023-10-27
**结束时间:** 2023-11-2
**开始时间:** 2025-10-27
**结束时间:** 2025-11-2
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 1班-汪汪队
**开始时间:** 2023-10-27
**结束时间:** 2023-11-2
**开始时间:** 2025-10-27
**结束时间:** 2025-11-2
## 本周任务完成情况

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-3
**结束时间:** 2023-11-9
**结束时间:** 2025-11-9
## 本周任务计划安排

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-3
**结束时间:** 2023-11-9
**结束时间:** 2025-11-9
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 软1-汪汪队
**开始时间:** 2023-11-3
**结束时间:** 2023-11-9
**开始时间:** 2025-11-3
**结束时间:** 2025-11-9
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 1班-汪汪队
**开始时间:** 2023-11-3
**结束时间:** 2023-11-9
**开始时间:** 2025-11-3
**结束时间:** 2025-11-9
## 本周任务完成情况
@ -23,7 +23,7 @@
## 小结
1. **设计用例文档:** 完成了用例文档稿;
1. **设计用例文档:** 完成了用例文档最终稿;
2. **技能学习:** 学习了用例图,用例文档相关知识;
3. **项目管理:** 作为PM及时推进项目进度确保工作有条不紊
4. **团队协作**:与团队成员保持良好的沟通协作,确保设计方向与产品需求一致

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-10
**结束时间:** 2023-11-16
**结束时间:** 2025-11-16
## 本周任务计划安排

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-10
**结束时间:** 2023-11-16
**结束时间:** 2025-11-16
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 软1-汪汪队
**开始时间:** 2023-11-3
**结束时间:** 2023-11-9
**开始时间:** 2025-11-3
**结束时间:** 2025-11-9
## 本周任务计划安排

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 1班-汪汪队
**开始时间:** 2023-11-10
**结束时间:** 2023-11-16
**开始时间:** 2025-11-10
**结束时间:** 2025-11-16
## 本周任务完成情况
@ -23,8 +23,7 @@
## 小结
1. **设计用例文档:** 完成了用例文档初稿;
2. **技能学习:** 学习了用例图,用例文档相关知识;
3. **项目管理:** 作为PM及时推进项目进度确保工作有条不紊
4. **团队协作**:与团队成员保持良好的沟通协作,确保设计方向与产品需求一致

@ -4,7 +4,7 @@
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-17
**结束时间:** 2023-11-23
**结束时间:** 2025-11-23
## 本周任务计划安排

@ -0,0 +1,36 @@
# 小组周总结-第9周
## 团队名称和起止时间
**团队名称:** 软1-汪汪队
**开始时间:** 2025-11-17
**结束时间:** 2025-11-23
## 本周任务计划安排
| 序号 | 总结内容 | 是否完成 | 情况说明 |
|----|--------------|------|------------------------------|
| 1 | 确定本周计划分工 | 完成 | 2023-11-17 开会确定计划以及团队分工 |
| 2 | 完成mqtt数据生成和接收 | 进行中 | 完成mqtt数据生成和接收功能 |
| 3 | 完成数据库相关基础接口设计和测试 | 进行中 | 完成数据库相关基础接口设计和测试 |
| 4 | 开发维修 / 学生端登录页,完成学生端地图页面布局| 进行中 | 开发维修人员app和学生app登录页完成学生端地图页面布局 |
|5| 完成Web 端基础框架 + 登录页代码 | 进行中 | 完成管理平台基础框架 + 登录页代码 |
|6|提交迭代开发计划第二稿| 完成 |根据反馈修改迭代开发计划|
## 小结
1. **增强自信心:** 小组成员应增强自信心,发挥想象力和创造力,在原型设计环节积极思考。
2. **沟通协作:** 小组成员应积极主动沟通,遇到困难及时寻求帮助,也可以主动向指导老师及研究生学长寻求建议。
3. **学习安排:** 小组成员仍处于软件开发专业知识的初步学习阶段,应合理安排自主学习时间,以便后续开发的顺利进行。
4. **项目管理:** 原代码框架设计不合理,进行了多次修改
---
## 【注】
1. 在小结一栏中写出希望得到如何的帮助,如讲座等;
2. 请将个人计划和总结提前发给负责人;
3. 周任务总结与计划是项目小组评分考核的重要依据,将直接记入平时成绩,请各位同学按要求认真填写并按时提交;
4. PM综合本小组成员工作情况提交小组周计划、周总结报告按时上传至代码托管平台。
---

@ -4,8 +4,8 @@
**姓  名:** 曹峻茂
**团队名称:** 软1-汪汪队
**开始时间:** 2023-11-17
**结束时间:** 2023-11-23
**开始时间:** 2025-11-17
**结束时间:** 2025-11-23
## 本周任务计划安排

@ -0,0 +1,36 @@
# 个人周总结-第9周
## 姓名和起止时间
**姓  名:** 曹峻茂
**团队名称:** 1班-汪汪队
**开始时间:** 2025-11-17
**结束时间:** 2025-11-23
## 本周任务完成情况
| 序号 | 总结内容 | 是否完成 | 情况说明 |
|----| ------------------------------------------------ |------|--------------------------------------------------------------|
| 1 | 确定分工 | 完成 | 2023-11-17 开会确定计划和团队分工 |
| 2 | 完成mqtt数据生成和接收 | 进行中 | 完成mqtt数据生成和接收功能 |
| 3 |提交迭代开发计划第二稿| 完成 |根据反馈修改迭代开发计划|
## 对团队工作的建议
1. **互助学习:** 小组成员应该根据自身的技能长短开展互帮互助的活动,共同努力提高小组成员的专业水平;
2. **进度统一:** 团队成员尽量统一项目进度;
## 小结
1. **设计计划文档:** 完成了迭代开发计划第二稿;
2. **项目管理:** 修改了代码框架符合mvc
3. **团队协作**:与团队成员保持良好的沟通协作,确保设计方向与产品需求一致
---
## 【注】
1. 在小结一栏中写出希望得到如何的帮助,如讲座等;
2. 请将个人计划和总结提前发给负责人;
3. 周任务总结与计划是项目小组评分考核的重要依据,将直接记入平时成绩,请各位同学按要求认真填写并按时提交;
4. 所有组员都需提交个人周计划、周总结文档,上传至代码托管平台;

@ -0,0 +1,30 @@
# 个人周总结-第9周
## 姓名和起止时间
**姓  名:** [罗月航]
**团队名称:** 1班-汪汪队
**开始时间:** 2025-11-17
**结束时间:** 2025-11-23
## 本周任务完成情况
| 序号 | 总结内容 | 是否完成 | 情况说明 |
| ---- | -------- | -------- | -------- |
| 1 | 开发维修/学生端登录页 | 部分完成 | 已完成登录页面的基础布局和样式设计,实现了用户类型切换功能 |
| 2 | 实现登录表单验证 | 部分完成 | 完成了基础的表单结构搭建,验证逻辑正在开发中 |
| 3 | 学生端地图页面布局 | 部分完成 | 完成了地图页面的整体框架布局和主要功能区域划分 |
## 对团队工作的建议
1. **API文档完善**:建议后端团队尽快提供完整的地图数据接口文档;
2. **设计规范统一**:需要建立统一的设计组件库,提高开发效率;
3. **代码审查机制**:建议建立代码审查流程,确保代码质量;
4. **进度同步**:建议每周进行开发进度同步,及时解决遇到的问题。
## 小结
1. **基础框架搭建**:成功搭建了登录页和地图页面的基础框架结构;
2. **界面效果实现**:登录页面的视觉效果基本达到设计预期,用户体验良好;
3. **问题发现**在地图API集成过程中遇到了一些技术难点需要进一步研究解决
4. **进度评估**:整体进度符合预期,但部分功能需要延至下周完成;

@ -0,0 +1,23 @@
package com.campus.water;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.config.EnableIntegrationManagement;
import org.springframework.scheduling.annotation.EnableScheduling;
/**
* Spring Boot
* Spring Integration
*/
@SpringBootApplication(scanBasePackages = "com.campus.water") // 扫描所有业务组件
@EnableScheduling // 开启定时任务(支持@Scheduled
@EnableIntegration // 开启Spring Integration支持MQTT集成
@EnableIntegrationManagement // 开启Integration管理监控消息流转
public class CampusWaterApplication {
public static void main(String[] args) {
SpringApplication.run(CampusWaterApplication.class, args);
System.out.println("=== 校园直饮矿化水系统Spring Boot版启动成功 ===");
System.out.println("=== MQTT传感器模拟、数据接收、持久化功能已启用 ===");
}
}

@ -0,0 +1,80 @@
package com.campus.water.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
@Configuration
public class MqttConfig {
// MQTT 基础配置(可迁移到 application.yml 中,用 @ConfigurationProperties 绑定)
public static final String BROKER = "tcp://b17be106.ala.cn-hangzhou.emqxsl.cn:8883";
public static final String USERNAME = "admin";
public static final String PASSWORD = "12345678";
public static final int QOS = 1; // 消息质量等级1=确保送达,不重复)
public static final int CONNECTION_TIMEOUT = 30000; // 连接超时(毫秒)
public static final int KEEP_ALIVE_INTERVAL = 60; // 心跳间隔(秒)
// MQTT 主题定义(按设备类型+功能分层)
public static final String TOPIC_WATER_MAKER_STATE = "/device/state/water_maker/";
public static final String TOPIC_WATER_MAKER_WARN = "/device/warn/water_maker/";
public static final String TOPIC_WATER_SUPPLIER_STATE = "/device/state/water_supply/";
public static final String TOPIC_WATER_SUPPLIER_WARN = "/device/warn/water_supply/";
/**
* MQTT Spring
*/
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
// 配置连接参数
options.setServerURIs(new String[]{BROKER});
options.setUserName(USERNAME);
options.setPassword(PASSWORD.toCharArray());
options.setConnectionTimeout(CONNECTION_TIMEOUT / 1000); // 转换为秒
options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL);
options.setAutomaticReconnect(true); // 断线自动重连
options.setCleanSession(true); // 断开后清除会话
factory.setConnectionOptions(options);
return factory;
}
/**
* DirectChannel
*/
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
/**
* MQTT
*/
@Bean
public MqttPahoMessageHandler mqttOutbound() {
// 客户端ID前缀+时间戳,避免重复
String clientId = "sensor-sender-" + System.currentTimeMillis();
MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, mqttClientFactory());
handler.setAsync(true); // 异步发送(不阻塞主线程)
handler.setDefaultQos(QOS); // 默认QOS等级
handler.setDefaultTopic(TOPIC_WATER_MAKER_STATE); // 默认主题(可在发送时覆盖)
return handler;
}
/**
*
*/
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
}

@ -0,0 +1,35 @@
package com.campus.water.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
@Configuration
public class MqttInboundConfig {
/**
* MQTT MQTT
*/
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttConfig mqttConfig) {
// 接收端客户端ID与发送端区分
String clientId = "sensor-receiver-" + System.currentTimeMillis();
// 创建适配器指定客户端ID、工厂、默认订阅主题可后续动态添加
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId,
mqttConfig.mqttClientFactory()
);
// 配置消息转换器默认UTF-8编码支持JSON格式
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(mqttConfig.QOS); // 订阅QOS等级与发送端一致
adapter.setOutputChannel(mqttConfig.mqttInputChannel()); // 消息转发到接收通道
// 开启异常重试(避免网络波动导致消息丢失)
adapter.setRecoveryInterval(5000); // 重试间隔5秒
return adapter;
}
}

@ -0,0 +1,23 @@
package com.campus.water.model;
import lombok.Data;
import java.time.LocalDateTime;
/**
* MQTT
* MQTT/
*/
@Data
public class WaterMakerSensorData {
private String deviceId; // 设备唯一ID如WM001
private Double tdsValue; // TDS值水质指标
private Double waterFlow; // 水流量L/min
private Double waterPressure; // 水压MPa
private Integer filterLife; // 滤芯寿命(%
private Boolean leakage; // 是否漏水true=漏水false=正常)
private Double temperature; // 水温(℃)
private Double humidity; // 环境湿度(%RH
private String waterQuality; // 水质等级(合格/不合格)
private String status; // 设备状态normal=正常error=异常)
private LocalDateTime timestamp; // 数据采集时间戳
}

@ -0,0 +1,20 @@
package com.campus.water.model;
import lombok.Data;
import java.time.LocalDateTime;
/**
* MQTT
* MQTT/
*/
@Data
public class WaterSupplySensorData {
private String deviceId; // 设备唯一ID如WS001
private Double waterFlow; // 水流量L/min
private Double waterPressure; // 水压MPa
private Double waterLevel; // 水位(%
private Double temperature; // 水温(℃)
private Double humidity; // 环境湿度(%RH
private String status; // 设备状态normal=正常error=异常)
private LocalDateTime timestamp; // 数据采集时间戳
}

@ -0,0 +1,183 @@
package com.campus.water.service;
import com.campus.water.config.MqttConfig;
import com.campus.water.entity.Alert;
import com.campus.water.entity.WaterMakerRealtimeData;
import com.campus.water.entity.WaterSupplyRealtimeData;
import com.campus.water.mapper.AlertRepository;
import com.campus.water.mapper.WaterMakerRealtimeDataRepository;
import com.campus.water.mapper.WaterSupplyRealtimeDataRepository;
import com.campus.water.model.WaterMakerSensorData;
import com.campus.water.model.WaterSupplySensorData;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
@Service
@RequiredArgsConstructor
@Slf4j
public class MqttSensorReceiver {
// JPA Repository数据持久化接口Spring自动注入实现
private final WaterMakerRealtimeDataRepository waterMakerRepo;
private final WaterSupplyRealtimeDataRepository waterSupplyRepo;
private final AlertRepository alertRepo;
private final ObjectMapper objectMapper;
private final MqttPahoMessageDrivenChannelAdapter mqttAdapter;
/**
* MQTT
* +ID
*/
@PostConstruct
public void initMqttSubscription() {
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+"); // 制水机状态(所有设备)
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+"); // 制水机告警(所有设备)
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+"); // 供水机状态(所有设备)
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+"); // 供水机告警(所有设备)
log.info("MQTT订阅初始化完成 | 订阅主题:{}+、{}+、{}+、{}+",
MqttConfig.TOPIC_WATER_MAKER_STATE,
MqttConfig.TOPIC_WATER_MAKER_WARN,
MqttConfig.TOPIC_WATER_SUPPLIER_STATE,
MqttConfig.TOPIC_WATER_SUPPLIER_WARN);
}
/**
* MQTT
* @param payload JSON
* @param topic
*/
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMqttMessage(String payload, @Header(MqttHeaders.RECEIVED_TOPIC) String topic) {
log.info("MQTT消息接收成功 | 主题:{} | 内容:{}", topic, payload);
try {
// 根据主题分类处理
if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_STATE)) {
handleWaterMakerState(payload); // 制水机状态数据
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_WARN)) {
handleWaterMakerWarning(payload); // 制水机告警数据
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_STATE)) {
handleWaterSupplyState(payload); // 供水机状态数据
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_WARN)) {
handleWaterSupplyWarning(payload); // 供水机告警数据
} else {
log.warn("MQTT消息主题未匹配 | 未知主题:{} | 内容:{}", topic, payload);
}
} catch (Exception e) {
log.error("MQTT消息处理失败 | 主题:{} | 内容:{} | 异常:{}", topic, payload, e.getMessage());
}
}
/**
* JPA
*/
private void handleWaterMakerState(String payload) throws Exception {
// 1. JSON反序列化为模型对象
WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class);
// 2. 模型对象转换为JPA实体持久化到数据库
WaterMakerRealtimeData entity = new WaterMakerRealtimeData();
entity.setDeviceId(sensorData.getDeviceId());
entity.setTdsValue(sensorData.getTdsValue());
entity.setWaterFlow(sensorData.getWaterFlow());
entity.setWaterPressure(sensorData.getWaterPressure());
entity.setFilterLife(sensorData.getFilterLife());
entity.setLeakage(sensorData.getLeakage() ? 1 : 0); // 数据库存储1=漏水0=正常
entity.setTemperature(sensorData.getTemperature());
entity.setHumidity(sensorData.getHumidity());
entity.setWaterQuality(sensorData.getWaterQuality());
entity.setStatus(WaterMakerRealtimeData.DeviceStatus.valueOf(sensorData.getStatus().toUpperCase()));
entity.setTimestamp(sensorData.getTimestamp());
entity.setCreatedTime(LocalDateTime.now());
// 3. 持久化到数据库JPA save() 自动实现CRUD
waterMakerRepo.save(entity);
log.info("制水机状态数据持久化成功 | 设备ID{}", sensorData.getDeviceId());
}
/**
* +
*/
private void handleWaterMakerWarning(String payload) throws Exception {
WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class);
// 1. 持久化告警记录
Alert alert = new Alert();
alert.setDeviceId(sensorData.getDeviceId());
alert.setAlertType("WATER_MAKER_ABNORMAL"); // 告警类型(枚举规范)
alert.setAlertLevel(Alert.AlertLevel.CRITICAL); // 告警级别(严重)
alert.setAlertMessage(String.format(
"制水机异常 - 设备ID%sTDS值%.2f,滤芯寿命:%d%%,漏水状态:%s",
sensorData.getDeviceId(),
sensorData.getTdsValue(),
sensorData.getFilterLife(),
sensorData.getLeakage() ? "是" : "否"
));
alert.setStatus(Alert.AlertStatus.PENDING); // 告警状态(未处理)
alert.setTimestamp(sensorData.getTimestamp());
alert.setCreateTime(LocalDateTime.now());
alertRepo.save(alert);
log.warn("制水机告警记录持久化成功 | 告警ID{} | 设备ID{}", alert.getId(), sensorData.getDeviceId());
// 2. 同时持久化状态数据(便于后续追溯)
handleWaterMakerState(payload);
}
/**
* JPA
*/
private void handleWaterSupplyState(String payload) throws Exception {
WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class);
WaterSupplyRealtimeData entity = new WaterSupplyRealtimeData();
entity.setDeviceId(sensorData.getDeviceId());
entity.setWaterFlow(sensorData.getWaterFlow());
entity.setWaterPressure(sensorData.getWaterPressure());
entity.setWaterLevel(sensorData.getWaterLevel());
entity.setTemperature(sensorData.getTemperature());
entity.setHumidity(sensorData.getHumidity());
entity.setStatus(WaterSupplyRealtimeData.DeviceStatus.valueOf(sensorData.getStatus().toUpperCase()));
entity.setTimestamp(sensorData.getTimestamp());
entity.setCreatedTime(LocalDateTime.now());
waterSupplyRepo.save(entity);
log.info("供水机状态数据持久化成功 | 设备ID{}", sensorData.getDeviceId());
}
/**
* +
*/
private void handleWaterSupplyWarning(String payload) throws Exception {
WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class);
// 1. 持久化告警记录
Alert alert = new Alert();
alert.setDeviceId(sensorData.getDeviceId());
alert.setAlertType("WATER_SUPPLY_ABNORMAL");
alert.setAlertLevel(Alert.AlertLevel.ERROR);
alert.setAlertMessage(String.format(
"供水机异常 - 设备ID%s水压%.2fMPa,水位:%.2f%%",
sensorData.getDeviceId(),
sensorData.getWaterPressure(),
sensorData.getWaterLevel()
));
alert.setStatus(Alert.AlertStatus.PENDING);
alert.setTimestamp(sensorData.getTimestamp());
alert.setCreateTime(LocalDateTime.now());
alertRepo.save(alert);
log.warn("供水机告警记录持久化成功 | 告警ID{} | 设备ID{}", alert.getId(), sensorData.getDeviceId());
// 2. 同时持久化状态数据
handleWaterSupplyState(payload);
}
}

@ -0,0 +1,133 @@
package com.campus.water.service;
import com.campus.water.config.MqttConfig;
import com.campus.water.model.WaterMakerSensorData;
import com.campus.water.model.WaterSupplySensorData;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;
import java.time.LocalDateTime;
import java.util.Random;
@Service
@RequiredArgsConstructor
@Slf4j // 日志注解替代System.out
public class MqttSensorSender {
private final MqttPahoMessageHandler mqttMessageHandler;
private final ObjectMapper objectMapper; // JSON序列化工具Spring默认注入
private final Random random = new Random(); // 生成模拟数据
/**
*
* @param deviceId IDWM001
*/
public void sendWaterMakerState(String deviceId) {
try {
// 1. 构建模拟数据(符合正常业务范围)
WaterMakerSensorData data = new WaterMakerSensorData();
data.setDeviceId(deviceId);
data.setTdsValue(50 + random.nextDouble() * 30); // 50-80正常范围
data.setWaterFlow(random.nextDouble() * 2); // 0-2 L/min
data.setWaterPressure(0.2 + random.nextDouble() * 0.3); // 0.2-0.5 MPa
data.setFilterLife(30 + random.nextInt(70)); // 30-100%
data.setLeakage(random.nextBoolean() && random.nextBoolean()); // 漏水概率较低25%
data.setTemperature(20 + random.nextDouble() * 5); // 20-25℃
data.setHumidity(40 + random.nextDouble() * 20); // 40-60%RH
data.setWaterQuality("合格");
data.setStatus("normal");
data.setTimestamp(LocalDateTime.now());
// 2. 序列化JSONMQTT消息 payload 为JSON字符串
String payload = objectMapper.writeValueAsString(data);
// 3. 构建主题设备ID作为主题后缀精准订阅
String topic = MqttConfig.TOPIC_WATER_MAKER_STATE + deviceId;
// 4. 发送MQTT消息
sendMessage(topic, payload);
log.info("制水机状态消息发送成功 | 设备ID{} | 主题:{} | 数据:{}", deviceId, topic, payload);
} catch (JsonProcessingException e) {
log.error("制水机状态消息发送失败 | 设备ID{} | 异常:{}", deviceId, e.getMessage());
}
}
/**
*
* @param deviceId IDWM001
*/
public void sendWaterMakerWarning(String deviceId) {
try {
// 1. 构建异常数据(超出正常范围)
WaterMakerSensorData data = new WaterMakerSensorData();
data.setDeviceId(deviceId);
data.setTdsValue(150 + random.nextDouble() * 50); // 150-200异常范围
data.setWaterFlow(0.1 + random.nextDouble() * 0.3); // 流量极低
data.setWaterPressure(0.1 + random.nextDouble() * 0.1); // 水压过低0.1-0.2 MPa
data.setFilterLife(5 + random.nextInt(10)); // 滤芯寿命低5-15%
data.setLeakage(true); // 强制漏水
data.setTemperature(28 + random.nextDouble() * 3); // 水温过高28-31℃
data.setStatus("error");
data.setTimestamp(LocalDateTime.now());
// 2. 序列化+发送
String payload = objectMapper.writeValueAsString(data);
String topic = MqttConfig.TOPIC_WATER_MAKER_WARN + deviceId;
sendMessage(topic, payload);
log.warn("制水机告警消息发送成功 | 设备ID{} | 主题:{} | 数据:{}", deviceId, topic, payload);
} catch (JsonProcessingException e) {
log.error("制水机告警消息发送失败 | 设备ID{} | 异常:{}", deviceId, e.getMessage());
}
}
/**
*
* @param deviceId IDWS001
*/
public void sendWaterSupplyData(String deviceId) {
try {
// 1. 构建模拟数据
WaterSupplySensorData data = new WaterSupplySensorData();
data.setDeviceId(deviceId);
data.setWaterFlow(random.nextDouble() * 3); // 0-3 L/min
data.setWaterPressure(0.1 + random.nextDouble() * 0.2); // 0.1-0.3 MPa
data.setWaterLevel(30 + random.nextDouble() * 50); // 30-80%
data.setTemperature(18 + random.nextDouble() * 4); // 18-22℃
data.setHumidity(35 + random.nextDouble() * 15); // 35-50%RH
data.setStatus("normal");
data.setTimestamp(LocalDateTime.now());
// 2. 序列化+发送
String payload = objectMapper.writeValueAsString(data);
String topic = MqttConfig.TOPIC_WATER_SUPPLIER_STATE + deviceId;
sendMessage(topic, payload);
log.info("供水机状态消息发送成功 | 设备ID{} | 主题:{} | 数据:{}", deviceId, topic, payload);
} catch (JsonProcessingException e) {
log.error("供水机状态消息发送失败 | 设备ID{} | 异常:{}", deviceId, e.getMessage());
}
}
/**
* MQTT
* @param topic
* @param payload JSON
*/
private void sendMessage(String topic, String payload) {
// 构建Spring Messaging消息指定JSON格式、主题、QOS
Message<String> message = MessageBuilder
.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
.setHeader("mqtt_topic", topic)
.setHeader("mqtt_qos", MqttConfig.QOS)
.build();
// 调用处理器发送消息
mqttMessageHandler.handleMessage(message);
}
}

@ -0,0 +1,68 @@
package com.campus.water.task;
import com.campus.water.service.MqttSensorSender;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
@Component
@RequiredArgsConstructor
@Slf4j
public class SensorSimulationTask {
private final MqttSensorSender mqttSensorSender;
// 模拟已部署的设备列表(实际项目可从数据库查询)
private final List<String> waterMakerDevices = new ArrayList<>() {{
add("WM001"); // 制水机1
add("WM002"); // 制水机2
add("WM003"); // 制水机3
add("WM004"); // 制水机4
}};
private final List<String> waterSupplyDevices = new ArrayList<>() {{
add("WS001"); // 供水机1
add("WS002"); // 供水机2
add("WS003"); // 供水机3
}};
/**
* 30
* fixedRate
*/
@Scheduled(fixedRate = 30000)
public void sendRegularStateData() {
log.info("=== 开始发送设备正常状态数据 ===");
// 1. 发送所有制水机状态
for (String deviceId : waterMakerDevices) {
mqttSensorSender.sendWaterMakerState(deviceId);
}
// 2. 发送所有供水机状态
for (String deviceId : waterSupplyDevices) {
mqttSensorSender.sendWaterSupplyData(deviceId);
}
log.info("=== 设备正常状态数据发送完成 ===");
}
/**
* 5
* fixedRate
*/
@Scheduled(fixedRate = 300000)
public void sendRandomWarningData() {
log.info("=== 开始发送随机告警数据 ===");
// 随机选择1台制水机发送告警模拟设备故障
int randomIndex = (int) (Math.random() * waterMakerDevices.size());
String targetDevice = waterMakerDevices.get(randomIndex);
mqttSensorSender.sendWaterMakerWarning(targetDevice);
log.info("=== 随机告警数据发送完成 | 告警设备:{} ===", targetDevice);
}
}
Loading…
Cancel
Save