王磊-第四周周总结 #9

Merged
hnu202326010106 merged 1 commits from wanglei_branch into develop 3 months ago

@ -0,0 +1,39 @@
# 个人周总结-第4周
## 姓名和起止时间
**姓  名:** 王磊
**团队名称:** 1班-汪汪队
**开始时间:** 2025-10-13
**结束时间:** 2025-10-19
## 本周任务完成情况
序号 总结内容 是否完成 情况说明
1 MySQL数据库环境搭建与配置 完成 成功安装MySQL 8.0.43创建water_management数据库配置数据库连接环境变量解决数据库连接权限问题
2 MQTT数据处理器开发 完成 实现基于Java的MQTT数据接收、解析和处理模块支持制水机和供水机两种设备类型的数据处理
3 数据库表结构设计与创建 完成 设计并创建device_data设备数据表和device_alert告警表建立完整的数据存储结构
4 数据解析与业务逻辑实现 完成 实现JSON数据解析、设备状态判断、异常检测和自动告警生成功能
5 项目集成与测试 完成 成功将数据处理模块集成到智慧饮水系统项目中,完成多组测试数据的验证
## 对团队工作的建议
1. **互助学习:** 小组成员应该根据自身的技能长短开展互帮互助的活动,共同努力提高小组成员的专业水平;
2. **进度统一:** 团队成员尽量统一项目进度;
## 小结
1. **技能学习:** 小组成员各自开展自己所负责部分的个人技能的学习;
2. **项目管理:** PM及时推进项目进度确保工作有条不紊
3. **计划制定:** 根据本周任务完成情况与下一阶段文档提交要求,制定团队任务计划。
---
## 【注】
1. 在小结一栏中写出希望得到如何的帮助,如讲座等;
2. 请将个人计划和总结提前发给负责人;
3. 周任务总结与计划是项目小组评分考核的重要依据,将直接记入平时成绩,请各位同学按要求认真填写并按时提交;
4. 所有组员都需提交个人周计划、周总结文档,上传至代码托管平台;

@ -1,16 +1,10 @@
package com.campus.water.web.controller;
import com.campus.water.web.entity.SensorDataWebVO;
import com.campus.water.web.service.RealTimeService;
import com.campus.water.web.service.HistoryDataService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* WebVue
*/
@RestController
@RequestMapping("/api/admin/monitor")
public class MonitorController {
@ -19,24 +13,21 @@ public class MonitorController {
@Autowired
private HistoryDataService historyDataService;
// 1. 获取单设备实时数据(适配前端设备详情页)
@GetMapping("/realTime/{deviceId}")
public SensorDataWebVO getRealTimeData(@PathVariable String deviceId) {
public Object getRealTimeData(@PathVariable String deviceId) {
return realTimeService.getRealTimeData(deviceId);
}
// 2. 获取全量设备实时状态(适配前端监控总览页)
@GetMapping("/realTime/all")
public List<SensorDataWebVO> getAllRealTimeData() {
public List<Object> getAllRealTimeData() {
return realTimeService.getAllRealTimeData();
}
// 3. 获取设备历史数据(适配前端历史曲线,需求中“查看历史水质”)
@GetMapping("/history/{deviceId}")
public List<SensorDataWebVO> getHistoryData(
public List<Object> getHistoryData(
@PathVariable String deviceId,
@RequestParam String startTime, // 开始时间如2024-05-20 00:00:00
@RequestParam String endTime // 结束时间
@RequestParam String startTime,
@RequestParam String endTime
) {
return historyDataService.getHistoryData(deviceId, startTime, endTime);
}

@ -0,0 +1,30 @@
package com.campus.water.web;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class RealTimeService {
private ConcurrentHashMap<String, Object> realTimeData = new ConcurrentHashMap<>();
private List<Object> alerts = new ArrayList<>();
public void updateRealTimeData(Object sensorData) {
System.out.println("更新实时数据");
}
public void addAlert(Object alertData) {
alerts.add(alertData);
System.out.println("添加告警");
}
public Object getRealTimeData(String deviceId) {
return realTimeData.get(deviceId);
}
public List<Object> getAllRealTimeData() {
return new ArrayList<>(realTimeData.values());
}
}

@ -6,14 +6,11 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* WebMQTTMQTT
*/
@Configuration
public class WebMqttConfig {
// MQTT服务器转发的主题前缀(需求中全量设备数据/告警)
private String mqttBroker = "tcp://mqtt-server.campus.com:1883";
private String clientId = "campus-water-admin-web"; // Web端客户端ID
// 修改为本地MQTT
private String mqttBroker = "tcp://localhost:1883";
private String clientId = "campus-water-admin-web";
@Bean
public MqttClient mqttClient() throws Exception {
@ -26,17 +23,16 @@ public class WebMqttConfig {
return client;
}
// 订阅的主题(全量设备数据+全量告警,适配管理员权限)
@Bean
public String[] subscribeTopics() {
return new String[]{
"forward/web/device/#", // 全量设备实时数据
"forward/web/alert/#" // 全量告警报文
"forward/web/device/#",
"forward/web/alert/#"
};
}
@Bean
public int[] qosLevels() {
return new int[]{0, 1}; // 告警QoS=1设备数据QoS=0
return new int[]{0, 1};
}
}

@ -1,9 +1,6 @@
package com.campus.water.web.mqtt;
import com.alibaba.fastjson2.JSONObject;
import com.campus.water.web.entity.SensorDataWebVO;
import com.campus.water.web.entity.AlertWebVO;
import com.campus.water.web.service.RealTimeService;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@ -12,9 +9,6 @@ import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* WebMQTT/
*/
@Component
public class WebMqttSubscriber {
@Autowired
@ -41,13 +35,13 @@ public class WebMqttSubscriber {
String payload = new String(message.getPayload());
System.out.println("管理Web接收数据主题=" + topic + ",内容=" + payload);
// 解析数据并缓存到内存(供前端轮询获取实时数据)
// 直接使用JSONObject处理数据
JSONObject data = JSONObject.parseObject(payload);
if (topic.startsWith("forward/web/device/")) {
SensorDataWebVO sensorVO = JSONObject.parseObject(payload, SensorDataWebVO.class);
realTimeService.updateRealTimeData(sensorVO); // 缓存实时数据
realTimeService.updateRealTimeData(data);
} else if (topic.startsWith("forward/web/alert/")) {
AlertWebVO alertVO = JSONObject.parseObject(payload, AlertWebVO.class);
realTimeService.addAlert(alertVO); // 缓存告警(前端告警列表实时更新)
realTimeService.addAlert(data);
}
};
}

@ -1,83 +1,83 @@
package com.campus.water.mqtt.core;
import com.alibaba.fastjson2.JSONObject;
import com.campus.water.mqtt.entity.SensorData;
import com.campus.water.mqtt.entity.AlertMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Web/APP/APP
*/
import java.util.Map;
@Component
public class DataForwarder {
@Autowired
private MqttClient mqttClient;
// 1. 转发传感器数据到管理Web全量数据适配管理员监控需求
public void forwardToWeb(SensorData sensorData) throws Exception {
String topic = "forward/web/device/" + sensorData.getDeviceId();
public void forwardToWeb(Map<String, Object> sensorData) throws Exception {
String deviceId = (String) sensorData.get("deviceId");
String topic = "forward/web/device/" + deviceId;
String payload = JSONObject.toJSONString(sensorData);
publish(topic, payload, 0); // Web端QoS=0普通监控数据
publish(topic, payload, 0);
System.out.println("转发到管理Web: " + deviceId);
}
// 2. 转发告警到管理Web适配管理员查看告警需求
public void forwardToWeb(AlertMessage alert) throws Exception {
String topic = "forward/web/alert/" + alert.getDeviceId();
String payload = JSONObject.toJSONString(alert);
publish(topic, payload, 1); // 告警QoS=1确保管理员看到
public void forwardToWeb(Map<String, Object> alertData, boolean isAlert) throws Exception {
String deviceId = (String) alertData.get("deviceId");
String topic = "forward/web/alert/" + deviceId;
String payload = JSONObject.toJSONString(alertData);
publish(topic, payload, 1);
System.out.println("转发告警到管理Web: " + deviceId);
}
// 3. 转发制水机水质数据到学生APP仅终端机关联的制水机适配查水质需求
public void forwardToStudentApp(SensorData sensorData) throws Exception {
// 仅制水机数据需转发学生APP查水质
if (sensorData.getDeviceType() != 1) return;
public void forwardToStudentApp(Map<String, Object> sensorData) throws Exception {
Integer deviceType = (Integer) sensorData.get("deviceType");
if (deviceType != 1) return;
// 按终端机ID转发需求中学生扫码查对应终端机的水质
String terminalId = getRelatedTerminalId(sensorData.getDeviceId());
String deviceId = (String) sensorData.get("deviceId");
String terminalId = getRelatedTerminalId(deviceId);
String topic = "forward/student/terminal/" + terminalId;
Double tdsValue = (Double) sensorData.get("tdsValue");
String payload = JSONObject.toJSONString(new JSONObject() {{
put("terminalId", terminalId);
put("tdsValue", sensorData.getTdsValue());
put("waterQuality", getWaterQuality(sensorData.getTdsValue())); // 水质等级
put("timestamp", sensorData.getTimestamp());
put("tdsValue", tdsValue);
put("waterQuality", getWaterQuality(tdsValue));
put("timestamp", sensorData.get("timestamp"));
}});
publish(topic, payload, 0);
System.out.println("转发到学生APP: " + terminalId);
}
// 4. 转发供水机水位/告警到维修APP仅本辖区设备适配维修员权限需求
public void forwardToRepairApp(SensorData sensorData) throws Exception {
String areaId = sensorData.getAreaId(); // 设备所属片区如AREA01
String topic = "forward/repair/area/" + areaId + "/device/" + sensorData.getDeviceId();
public void forwardToRepairApp(Map<String, Object> sensorData) throws Exception {
String areaId = (String) sensorData.get("areaId");
String deviceId = (String) sensorData.get("deviceId");
String topic = "forward/repair/area/" + areaId + "/device/" + deviceId;
String payload = JSONObject.toJSONString(sensorData);
publish(topic, payload, 0);
System.out.println("转发到维修APP: " + deviceId);
}
public void forwardToRepairApp(AlertMessage alert) throws Exception {
String areaId = alert.getAreaId();
String topic = "forward/repair/area/" + areaId + "/alert/" + alert.getDeviceId();
String payload = JSONObject.toJSONString(alert);
publish(topic, payload, 1); // 告警QoS=1确保维修员抢单
public void forwardToRepairApp(Map<String, Object> alertData, boolean isAlert) throws Exception {
String areaId = (String) alertData.get("areaId");
String deviceId = (String) alertData.get("deviceId");
String topic = "forward/repair/area/" + areaId + "/alert/" + deviceId;
String payload = JSONObject.toJSONString(alertData);
publish(topic, payload, 1);
System.out.println("转发告警到维修APP: " + deviceId);
}
// 通用发布方法
private void publish(String topic, String payload, int qos) throws Exception {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
mqttClient.publish(topic, message);
System.out.println("MQTT服务器转发数据主题=" + topic + ",内容=" + payload);
}
// 工具方法根据制水机ID获取关联的终端机ID需求中终端机关联制水机
private String getRelatedTerminalId(String deviceId) {
// 实际需从MySQL设备表查询此处模拟如制水机WM001关联终端机TERM001/TERM002
return deviceId.equals("WM001") ? "TERM001" : "TERM002";
}
// 工具方法根据TDS值判断水质等级需求中“优质矿化水”标准
private String getWaterQuality(double tds) {
private String getWaterQuality(Double tds) {
if (tds == null) return "未知";
if (tds < 50) return "纯净水(无矿物质)";
else if (tds < 300) return "优质矿化水";
else if (tds < 600) return "合格矿化水";

@ -6,48 +6,38 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* MQTTAir724MQTT Broker
*/
@Configuration
public class MqttConfig {
// 从配置文件读取Broker地址需求中云端Broker如EMQX
private String brokerUrl = "tcp://mqtt-server.campus.com:1883";
private String clientId = "campus-water-mqtt-server"; // 服务器端客户端ID
private int keepAlive = 60; // 心跳间隔适配Air724模块低功耗
// 修改为本地MQTT broker
private String brokerUrl = "tcp://localhost:1883";
private String clientId = "campus-water-mqtt-server";
private int keepAlive = 60;
@Bean
public MqttClient mqttClient() throws Exception {
// 1. 创建MQTT客户端内存持久化避免服务器存储压力
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
// 2. 配置连接参数(确保数据可靠传输,适配需求中告警不丢失)
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false); // 保持会话,重连后恢复订阅
options.setCleanSession(false);
options.setKeepAliveInterval(keepAlive);
options.setAutomaticReconnect(true); // 自动重连,适配网络波动
options.setAutomaticReconnect(true);
// 3. 连接Broker
client.connect(options);
System.out.println("MQTT服务器已连接Broker" + brokerUrl);
return client;
}
// 订阅的主题(严格按需求中设备类型划分,便于权限隔离)
@Bean
public String[] subscribeTopics() {
return new String[]{
"sensor/watermaker/#", // 制水机传感器数据TDS/水压/流量需求图3
"sensor/watersupply/#", // 供水机传感器数据(水位/漏水需求图3
"alert/#" // 设备告警报文(滤芯损坏/水位异常,需求告警流程)
"sensor/watermaker/#",
"sensor/watersupply/#",
"alert/#"
};
}
// QoS配置需求中告警用QoS=1确保送达普通传感器数据QoS=0
@Bean
public int[] qosLevels() {
return new int[]{0, 0, 1}; // 告警QoS=1传感器数据QoS=0
return new int[]{0, 0, 1};
}
}

@ -1,7 +1,6 @@
package com.campus.water.mqtt.core;
import com.campus.water.mqtt.service.AlertTriggerService;
import com.campus.water.mqtt.service.SensorDataStorage;
import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
@ -9,10 +8,10 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;
/**
* MQTT/
*/
@Component
public class MqttSubscriber {
@Autowired
@ -22,18 +21,10 @@ public class MqttSubscriber {
@Autowired
private int[] qosLevels;
@Autowired
private DataParser dataParser;
@Autowired
private DataForwarder dataForwarder;
@Autowired
private SensorDataStorage sensorDataStorage;
@Autowired
private AlertTriggerService alertTriggerService;
// 初始化订阅(项目启动后自动执行)
@PostConstruct
public void startSubscribe() throws Exception {
// 为每个主题绑定消息监听器
for (int i = 0; i < subscribeTopics.length; i++) {
String topic = subscribeTopics[i];
int qos = qosLevels[i];
@ -42,32 +33,112 @@ public class MqttSubscriber {
}
}
// 消息监听器(接收数据后解析→存储→转发)
private IMqttMessageListener messageListener() {
return (topic, message) -> {
String payload = new String(message.getPayload());
System.out.println("MQTT服务器接收数据主题=" + topic + ",内容=" + payload);
// 按主题类型处理(严格对齐需求中设备数据类型)
if (topic.startsWith("sensor/watermaker/")) {
// 1. 处理制水机传感器数据TDS/水压/流量)
var sensorData = dataParser.parseWaterMakerData(topic, payload);
sensorDataStorage.saveToInfluxDB(sensorData); // 存入时序库
dataForwarder.forwardToWeb(sensorData); // 转发到管理Web
dataForwarder.forwardToStudentApp(sensorData); // 转发到学生APP水质数据
} else if (topic.startsWith("sensor/watersupply/")) {
// 2. 处理供水机传感器数据(水位)
var sensorData = dataParser.parseWaterSupplyData(topic, payload);
sensorDataStorage.saveToInfluxDB(sensorData); // 存入时序库
dataForwarder.forwardToWeb(sensorData); // 转发到管理Web
dataForwarder.forwardToRepairApp(sensorData); // 转发到维修APP辖区水位
} else if (topic.startsWith("alert/")) {
// 3. 处理告警报文(触发工单)
var alert = dataParser.parseAlertMessage(topic, payload);
alertTriggerService.createWorkOrder(alert); // 生成工单存入MySQL
dataForwarder.forwardToWeb(alert); // 转发到管理Web告警列表
dataForwarder.forwardToRepairApp(alert); // 转发到维修APP抢单
try {
if (topic.startsWith("sensor/watermaker/")) {
// 解析制水机数据
Map<String, Object> sensorData = parseWaterMakerData(topic, payload);
dataForwarder.forwardToWeb(sensorData);
dataForwarder.forwardToStudentApp(sensorData);
} else if (topic.startsWith("sensor/watersupply/")) {
// 解析供水机数据
Map<String, Object> sensorData = parseWaterSupplyData(topic, payload);
dataForwarder.forwardToWeb(sensorData);
dataForwarder.forwardToRepairApp(sensorData);
} else if (topic.startsWith("alert/")) {
// 解析告警数据
Map<String, Object> alertData = parseAlertMessage(topic, payload);
dataForwarder.forwardToWeb(alertData);
dataForwarder.forwardToRepairApp(alertData);
}
} catch (Exception e) {
System.err.println("处理MQTT消息异常: " + e.getMessage());
e.printStackTrace();
}
};
}
private Map<String, Object> parseWaterMakerData(String topic, String payload) {
Map<String, Object> data = new HashMap<>();
parseCommonData(topic, payload, data);
data.put("deviceType", 1); // 制水机
JSONObject json = JSONObject.parseObject(payload);
data.put("tdsValue", json.getDouble("tds"));
data.put("waterFlow", json.getDouble("flow"));
data.put("waterPressure", json.getDouble("pressure"));
data.put("filterLife", json.getInteger("filter_life"));
data.put("leakage", json.getBoolean("leakage"));
data.put("status", determineStatus(data));
return data;
}
private Map<String, Object> parseWaterSupplyData(String topic, String payload) {
Map<String, Object> data = new HashMap<>();
parseCommonData(topic, payload, data);
data.put("deviceType", 2); // 供水机
JSONObject json = JSONObject.parseObject(payload);
data.put("waterFlow", json.getDouble("flow"));
data.put("waterPressure", json.getDouble("pressure"));
data.put("waterLevel", json.getDouble("water_level"));
data.put("status", determineStatus(data));
return data;
}
private Map<String, Object> parseAlertMessage(String topic, String payload) {
Map<String, Object> alert = new HashMap<>();
String[] parts = topic.split("/");
alert.put("deviceId", parts.length >= 2 ? parts[1] : "unknown");
JSONObject json = JSONObject.parseObject(payload);
alert.put("alertType", json.getString("alert_type"));
alert.put("alertLevel", json.getString("alert_level"));
alert.put("alertMessage", json.getString("alert_message"));
alert.put("areaId", json.getString("area_id"));
alert.put("deviceType", json.getInteger("device_type"));
alert.put("timestamp", LocalDateTime.now().toString());
return alert;
}
private void parseCommonData(String topic, String payload, Map<String, Object> data) {
String[] parts = topic.split("/");
data.put("deviceId", parts.length >= 3 ? parts[2] : "unknown");
JSONObject json = JSONObject.parseObject(payload);
data.put("areaId", json.getString("area_id"));
data.put("temperature", json.getDouble("temperature"));
data.put("humidity", json.getDouble("humidity"));
data.put("timestamp", LocalDateTime.now().toString());
}
private String determineStatus(Map<String, Object> data) {
Integer deviceType = (Integer) data.get("deviceType");
if (deviceType == 1) { // 制水机
Double tdsValue = (Double) data.get("tdsValue");
Integer filterLife = (Integer) data.get("filterLife");
Boolean leakage = (Boolean) data.get("leakage");
if (tdsValue != null && tdsValue > 600) return "error";
if (filterLife != null && filterLife < 10) return "error";
if (leakage != null && leakage) return "error";
if (tdsValue != null && tdsValue > 300) return "warning";
} else if (deviceType == 2) { // 供水机
Double waterLevel = (Double) data.get("waterLevel");
if (waterLevel != null && waterLevel < 10) return "error";
if (waterLevel != null && waterLevel < 20) return "warning";
}
return "normal";
}
}
Loading…
Cancel
Save