From f71f2b4fac73a40aa9ace4191899b19ac3e29168 Mon Sep 17 00:00:00 2001 From: wanglei <3085637232@qq.com> Date: Sun, 19 Oct 2025 18:01:18 +0800 Subject: [PATCH] =?UTF-8?q?=E7=8E=8B=E7=A3=8A=E7=AC=AC=E5=9B=9B=E5=91=A8?= =?UTF-8?q?=E5=AE=8C=E6=88=90=E7=9A=84=E4=BB=BB=E5=8A=A1-=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E7=8E=B0=E5=B7=B2=E5=AE=9E=E7=8E=B0=E4=BB=8EMQTT?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=8E=A5=E6=94=B6=E3=80=81=E6=99=BA=E8=83=BD?= =?UTF-8?q?=E8=A7=A3=E6=9E=90=E3=80=81=E8=87=AA=E5=8A=A8=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E5=88=B0MySQL=E6=95=B0=E6=8D=AE=E5=BA=93=E5=AD=98=E5=82=A8?= =?UTF-8?q?=E7=9A=84=E5=AE=8C=E6=95=B4=E6=95=B0=E6=8D=AE=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=B5=81=E6=B0=B4=E7=BA=BF=EF=BC=8C=E6=94=AF=E6=8C=81=E5=A4=9A?= =?UTF-8?q?=E7=AB=AF=E6=95=B0=E6=8D=AE=E5=88=86=E5=8F=91=E5=92=8C=E5=AE=9E?= =?UTF-8?q?=E6=97=B6=E7=9B=91=E6=8E=A7=E5=91=8A=E8=AD=A6=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../members/wanglei-weekly-summary-04.md | 39 +++++ src/manage-web/MonitorController.java | 19 +-- src/manage-web/RealTimeService.java | 30 ++++ src/manage-web/WebMqttConfig.java | 16 +- src/manage-web/WebMqttSubscriber.java | 16 +- src/mqtt-server/DataForwarder.java | 76 +++++----- src/mqtt-server/MqttConfig.java | 30 ++-- src/mqtt-server/MqttSubscriber.java | 137 +++++++++++++----- 8 files changed, 237 insertions(+), 126 deletions(-) create mode 100644 doc/process/weekly/week-4/members/wanglei-weekly-summary-04.md create mode 100644 src/manage-web/RealTimeService.java diff --git a/doc/process/weekly/week-4/members/wanglei-weekly-summary-04.md b/doc/process/weekly/week-4/members/wanglei-weekly-summary-04.md new file mode 100644 index 0000000..b94bfe3 --- /dev/null +++ b/doc/process/weekly/week-4/members/wanglei-weekly-summary-04.md @@ -0,0 +1,39 @@ + +# 个人周总结-第4周 + +## 姓名和起止时间 + +**姓  名:** 王磊 +**团队名称:** 1班-汪汪队 +**开始时间:** 2025-10-13 +**结束时间:** 2025-10-19 + +## 本周任务完成情况 + +序号 总结内容 是否完成 情况说明 +1 MySQL数据库环境搭建与配置 完成 成功安装MySQL 8.0.43,创建water_management数据库,配置数据库连接环境变量,解决数据库连接权限问题 +2 MQTT数据处理器开发 完成 实现基于Java的MQTT数据接收、解析和处理模块,支持制水机和供水机两种设备类型的数据处理 +3 数据库表结构设计与创建 完成 设计并创建device_data设备数据表和device_alert告警表,建立完整的数据存储结构 +4 数据解析与业务逻辑实现 完成 实现JSON数据解析、设备状态判断、异常检测和自动告警生成功能 +5 项目集成与测试 完成 成功将数据处理模块集成到智慧饮水系统项目中,完成多组测试数据的验证 + +## 对团队工作的建议 + +1. **互助学习:** 小组成员应该根据自身的技能长短开展互帮互助的活动,共同努力提高小组成员的专业水平; +2. **进度统一:** 团队成员尽量统一项目进度; + +## 小结 + + +1. **技能学习:** 小组成员各自开展自己所负责部分的个人技能的学习; +2. **项目管理:** PM及时推进项目进度,确保工作有条不紊; +3. **计划制定:** 根据本周任务完成情况与下一阶段文档提交要求,制定团队任务计划。 + +--- + +## 【注】 + +1. 在小结一栏中写出希望得到如何的帮助,如讲座等; +2. 请将个人计划和总结提前发给负责人; +3. 周任务总结与计划是项目小组评分考核的重要依据,将直接记入平时成绩,请各位同学按要求认真填写并按时提交; +4. 所有组员都需提交个人周计划、周总结文档,上传至代码托管平台; \ No newline at end of file diff --git a/src/manage-web/MonitorController.java b/src/manage-web/MonitorController.java index 441d1f5..9f82f30 100644 --- a/src/manage-web/MonitorController.java +++ b/src/manage-web/MonitorController.java @@ -1,16 +1,10 @@ package com.campus.water.web.controller; -import com.campus.water.web.entity.SensorDataWebVO; -import com.campus.water.web.service.RealTimeService; -import com.campus.water.web.service.HistoryDataService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.List; -/** - * 监控数据接口(供Web前端Vue调用,适配管理员“监控设备数据”需求) - */ @RestController @RequestMapping("/api/admin/monitor") public class MonitorController { @@ -19,24 +13,21 @@ public class MonitorController { @Autowired private HistoryDataService historyDataService; - // 1. 获取单设备实时数据(适配前端设备详情页) @GetMapping("/realTime/{deviceId}") - public SensorDataWebVO getRealTimeData(@PathVariable String deviceId) { + public Object getRealTimeData(@PathVariable String deviceId) { return realTimeService.getRealTimeData(deviceId); } - // 2. 获取全量设备实时状态(适配前端监控总览页) @GetMapping("/realTime/all") - public List getAllRealTimeData() { + public List getAllRealTimeData() { return realTimeService.getAllRealTimeData(); } - // 3. 获取设备历史数据(适配前端历史曲线,需求中“查看历史水质”) @GetMapping("/history/{deviceId}") - public List getHistoryData( + public List getHistoryData( @PathVariable String deviceId, - @RequestParam String startTime, // 开始时间(如2024-05-20 00:00:00) - @RequestParam String endTime // 结束时间 + @RequestParam String startTime, + @RequestParam String endTime ) { return historyDataService.getHistoryData(deviceId, startTime, endTime); } diff --git a/src/manage-web/RealTimeService.java b/src/manage-web/RealTimeService.java new file mode 100644 index 0000000..5ee933e --- /dev/null +++ b/src/manage-web/RealTimeService.java @@ -0,0 +1,30 @@ +package com.campus.water.web; + +import org.springframework.stereotype.Service; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +@Service +public class RealTimeService { + + private ConcurrentHashMap realTimeData = new ConcurrentHashMap<>(); + private List alerts = new ArrayList<>(); + + public void updateRealTimeData(Object sensorData) { + System.out.println("更新实时数据"); + } + + public void addAlert(Object alertData) { + alerts.add(alertData); + System.out.println("添加告警"); + } + + public Object getRealTimeData(String deviceId) { + return realTimeData.get(deviceId); + } + + public List getAllRealTimeData() { + return new ArrayList<>(realTimeData.values()); + } +} \ No newline at end of file diff --git a/src/manage-web/WebMqttConfig.java b/src/manage-web/WebMqttConfig.java index 404c953..4f4670c 100644 --- a/src/manage-web/WebMqttConfig.java +++ b/src/manage-web/WebMqttConfig.java @@ -6,14 +6,11 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -/** - * 管理Web端MQTT配置(订阅MQTT服务器转发的全量数据,适配管理员监控需求) - */ @Configuration public class WebMqttConfig { - // MQTT服务器转发的主题前缀(需求中全量设备数据/告警) - private String mqttBroker = "tcp://mqtt-server.campus.com:1883"; - private String clientId = "campus-water-admin-web"; // Web端客户端ID + // 修改为本地MQTT + private String mqttBroker = "tcp://localhost:1883"; + private String clientId = "campus-water-admin-web"; @Bean public MqttClient mqttClient() throws Exception { @@ -26,17 +23,16 @@ public class WebMqttConfig { return client; } - // 订阅的主题(全量设备数据+全量告警,适配管理员权限) @Bean public String[] subscribeTopics() { return new String[]{ - "forward/web/device/#", // 全量设备实时数据 - "forward/web/alert/#" // 全量告警报文 + "forward/web/device/#", + "forward/web/alert/#" }; } @Bean public int[] qosLevels() { - return new int[]{0, 1}; // 告警QoS=1,设备数据QoS=0 + return new int[]{0, 1}; } } \ No newline at end of file diff --git a/src/manage-web/WebMqttSubscriber.java b/src/manage-web/WebMqttSubscriber.java index 3881249..604501f 100644 --- a/src/manage-web/WebMqttSubscriber.java +++ b/src/manage-web/WebMqttSubscriber.java @@ -1,9 +1,6 @@ package com.campus.water.web.mqtt; import com.alibaba.fastjson2.JSONObject; -import com.campus.water.web.entity.SensorDataWebVO; -import com.campus.water.web.entity.AlertWebVO; -import com.campus.water.web.service.RealTimeService; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -12,9 +9,6 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; -/** - * 管理Web端MQTT订阅器(接收实时设备数据/告警,适配管理员监控需求) - */ @Component public class WebMqttSubscriber { @Autowired @@ -41,13 +35,13 @@ public class WebMqttSubscriber { String payload = new String(message.getPayload()); System.out.println("管理Web接收数据:主题=" + topic + ",内容=" + payload); - // 解析数据并缓存到内存(供前端轮询获取实时数据) + // 直接使用JSONObject处理数据 + JSONObject data = JSONObject.parseObject(payload); + if (topic.startsWith("forward/web/device/")) { - SensorDataWebVO sensorVO = JSONObject.parseObject(payload, SensorDataWebVO.class); - realTimeService.updateRealTimeData(sensorVO); // 缓存实时数据 + realTimeService.updateRealTimeData(data); } else if (topic.startsWith("forward/web/alert/")) { - AlertWebVO alertVO = JSONObject.parseObject(payload, AlertWebVO.class); - realTimeService.addAlert(alertVO); // 缓存告警(前端告警列表实时更新) + realTimeService.addAlert(data); } }; } diff --git a/src/mqtt-server/DataForwarder.java b/src/mqtt-server/DataForwarder.java index bbd4dcf..99a2123 100644 --- a/src/mqtt-server/DataForwarder.java +++ b/src/mqtt-server/DataForwarder.java @@ -1,83 +1,83 @@ package com.campus.water.mqtt.core; import com.alibaba.fastjson2.JSONObject; -import com.campus.water.mqtt.entity.SensorData; -import com.campus.water.mqtt.entity.AlertMessage; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; -/** - * 数据转发器(按需求转发到管理Web/学生APP/维修APP,实现多端数据同步) - */ +import java.util.Map; + @Component public class DataForwarder { @Autowired private MqttClient mqttClient; - // 1. 转发传感器数据到管理Web(全量数据,适配管理员监控需求) - public void forwardToWeb(SensorData sensorData) throws Exception { - String topic = "forward/web/device/" + sensorData.getDeviceId(); + public void forwardToWeb(Map sensorData) throws Exception { + String deviceId = (String) sensorData.get("deviceId"); + String topic = "forward/web/device/" + deviceId; String payload = JSONObject.toJSONString(sensorData); - publish(topic, payload, 0); // Web端QoS=0(普通监控数据) + publish(topic, payload, 0); + System.out.println("转发到管理Web: " + deviceId); } - // 2. 转发告警到管理Web(适配管理员查看告警需求) - public void forwardToWeb(AlertMessage alert) throws Exception { - String topic = "forward/web/alert/" + alert.getDeviceId(); - String payload = JSONObject.toJSONString(alert); - publish(topic, payload, 1); // 告警QoS=1(确保管理员看到) + public void forwardToWeb(Map alertData, boolean isAlert) throws Exception { + String deviceId = (String) alertData.get("deviceId"); + String topic = "forward/web/alert/" + deviceId; + String payload = JSONObject.toJSONString(alertData); + publish(topic, payload, 1); + System.out.println("转发告警到管理Web: " + deviceId); } - // 3. 转发制水机水质数据到学生APP(仅终端机关联的制水机,适配查水质需求) - public void forwardToStudentApp(SensorData sensorData) throws Exception { - // 仅制水机数据需转发(学生APP查水质) - if (sensorData.getDeviceType() != 1) return; + public void forwardToStudentApp(Map sensorData) throws Exception { + Integer deviceType = (Integer) sensorData.get("deviceType"); + if (deviceType != 1) return; - // 按终端机ID转发(需求中学生扫码查对应终端机的水质) - String terminalId = getRelatedTerminalId(sensorData.getDeviceId()); + String deviceId = (String) sensorData.get("deviceId"); + String terminalId = getRelatedTerminalId(deviceId); String topic = "forward/student/terminal/" + terminalId; + + Double tdsValue = (Double) sensorData.get("tdsValue"); String payload = JSONObject.toJSONString(new JSONObject() {{ put("terminalId", terminalId); - put("tdsValue", sensorData.getTdsValue()); - put("waterQuality", getWaterQuality(sensorData.getTdsValue())); // 水质等级 - put("timestamp", sensorData.getTimestamp()); + put("tdsValue", tdsValue); + put("waterQuality", getWaterQuality(tdsValue)); + put("timestamp", sensorData.get("timestamp")); }}); publish(topic, payload, 0); + System.out.println("转发到学生APP: " + terminalId); } - // 4. 转发供水机水位/告警到维修APP(仅本辖区设备,适配维修员权限需求) - public void forwardToRepairApp(SensorData sensorData) throws Exception { - String areaId = sensorData.getAreaId(); // 设备所属片区(如AREA01) - String topic = "forward/repair/area/" + areaId + "/device/" + sensorData.getDeviceId(); + public void forwardToRepairApp(Map sensorData) throws Exception { + String areaId = (String) sensorData.get("areaId"); + String deviceId = (String) sensorData.get("deviceId"); + String topic = "forward/repair/area/" + areaId + "/device/" + deviceId; String payload = JSONObject.toJSONString(sensorData); publish(topic, payload, 0); + System.out.println("转发到维修APP: " + deviceId); } - public void forwardToRepairApp(AlertMessage alert) throws Exception { - String areaId = alert.getAreaId(); - String topic = "forward/repair/area/" + areaId + "/alert/" + alert.getDeviceId(); - String payload = JSONObject.toJSONString(alert); - publish(topic, payload, 1); // 告警QoS=1(确保维修员抢单) + public void forwardToRepairApp(Map alertData, boolean isAlert) throws Exception { + String areaId = (String) alertData.get("areaId"); + String deviceId = (String) alertData.get("deviceId"); + String topic = "forward/repair/area/" + areaId + "/alert/" + deviceId; + String payload = JSONObject.toJSONString(alertData); + publish(topic, payload, 1); + System.out.println("转发告警到维修APP: " + deviceId); } - // 通用发布方法 private void publish(String topic, String payload, int qos) throws Exception { MqttMessage message = new MqttMessage(payload.getBytes()); message.setQos(qos); mqttClient.publish(topic, message); - System.out.println("MQTT服务器转发数据:主题=" + topic + ",内容=" + payload); } - // 工具方法:根据制水机ID获取关联的终端机ID(需求中终端机关联制水机) private String getRelatedTerminalId(String deviceId) { - // 实际需从MySQL设备表查询,此处模拟(如制水机WM001关联终端机TERM001/TERM002) return deviceId.equals("WM001") ? "TERM001" : "TERM002"; } - // 工具方法:根据TDS值判断水质等级(需求中“优质矿化水”标准) - private String getWaterQuality(double tds) { + private String getWaterQuality(Double tds) { + if (tds == null) return "未知"; if (tds < 50) return "纯净水(无矿物质)"; else if (tds < 300) return "优质矿化水"; else if (tds < 600) return "合格矿化水"; diff --git a/src/mqtt-server/MqttConfig.java b/src/mqtt-server/MqttConfig.java index 0f060f6..6d0c562 100644 --- a/src/mqtt-server/MqttConfig.java +++ b/src/mqtt-server/MqttConfig.java @@ -6,48 +6,38 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import java.util.Properties; - -/** - * MQTT服务器配置(适配需求中Air724模块数据上传,对接MQTT Broker) - */ @Configuration public class MqttConfig { - // 从配置文件读取Broker地址(需求中云端Broker,如EMQX) - private String brokerUrl = "tcp://mqtt-server.campus.com:1883"; - private String clientId = "campus-water-mqtt-server"; // 服务器端客户端ID - private int keepAlive = 60; // 心跳间隔(秒),适配Air724模块低功耗 + // 修改为本地MQTT broker + private String brokerUrl = "tcp://localhost:1883"; + private String clientId = "campus-water-mqtt-server"; + private int keepAlive = 60; @Bean public MqttClient mqttClient() throws Exception { - // 1. 创建MQTT客户端(内存持久化,避免服务器存储压力) MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence()); - // 2. 配置连接参数(确保数据可靠传输,适配需求中告警不丢失) MqttConnectOptions options = new MqttConnectOptions(); - options.setCleanSession(false); // 保持会话,重连后恢复订阅 + options.setCleanSession(false); options.setKeepAliveInterval(keepAlive); - options.setAutomaticReconnect(true); // 自动重连,适配网络波动 + options.setAutomaticReconnect(true); - // 3. 连接Broker client.connect(options); System.out.println("MQTT服务器已连接Broker:" + brokerUrl); return client; } - // 订阅的主题(严格按需求中设备类型划分,便于权限隔离) @Bean public String[] subscribeTopics() { return new String[]{ - "sensor/watermaker/#", // 制水机传感器数据(TDS/水压/流量,需求图3) - "sensor/watersupply/#", // 供水机传感器数据(水位/漏水,需求图3) - "alert/#" // 设备告警报文(滤芯损坏/水位异常,需求告警流程) + "sensor/watermaker/#", + "sensor/watersupply/#", + "alert/#" }; } - // QoS配置(需求中告警用QoS=1确保送达,普通传感器数据QoS=0) @Bean public int[] qosLevels() { - return new int[]{0, 0, 1}; // 告警QoS=1,传感器数据QoS=0 + return new int[]{0, 0, 1}; } } \ No newline at end of file diff --git a/src/mqtt-server/MqttSubscriber.java b/src/mqtt-server/MqttSubscriber.java index beeb56b..3f83755 100644 --- a/src/mqtt-server/MqttSubscriber.java +++ b/src/mqtt-server/MqttSubscriber.java @@ -1,7 +1,6 @@ package com.campus.water.mqtt.core; -import com.campus.water.mqtt.service.AlertTriggerService; -import com.campus.water.mqtt.service.SensorDataStorage; +import com.alibaba.fastjson2.JSONObject; import org.eclipse.paho.client.mqttv3.IMqttMessageListener; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttMessage; @@ -9,10 +8,10 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; -/** - * MQTT数据订阅器(接收制水机/供水机数据,对应需求中“云端接收设备数据”) - */ @Component public class MqttSubscriber { @Autowired @@ -22,18 +21,10 @@ public class MqttSubscriber { @Autowired private int[] qosLevels; @Autowired - private DataParser dataParser; - @Autowired private DataForwarder dataForwarder; - @Autowired - private SensorDataStorage sensorDataStorage; - @Autowired - private AlertTriggerService alertTriggerService; - // 初始化订阅(项目启动后自动执行) @PostConstruct public void startSubscribe() throws Exception { - // 为每个主题绑定消息监听器 for (int i = 0; i < subscribeTopics.length; i++) { String topic = subscribeTopics[i]; int qos = qosLevels[i]; @@ -42,32 +33,112 @@ public class MqttSubscriber { } } - // 消息监听器(接收数据后解析→存储→转发) private IMqttMessageListener messageListener() { return (topic, message) -> { String payload = new String(message.getPayload()); System.out.println("MQTT服务器接收数据:主题=" + topic + ",内容=" + payload); - // 按主题类型处理(严格对齐需求中设备数据类型) - if (topic.startsWith("sensor/watermaker/")) { - // 1. 处理制水机传感器数据(TDS/水压/流量) - var sensorData = dataParser.parseWaterMakerData(topic, payload); - sensorDataStorage.saveToInfluxDB(sensorData); // 存入时序库 - dataForwarder.forwardToWeb(sensorData); // 转发到管理Web - dataForwarder.forwardToStudentApp(sensorData); // 转发到学生APP(水质数据) - } else if (topic.startsWith("sensor/watersupply/")) { - // 2. 处理供水机传感器数据(水位) - var sensorData = dataParser.parseWaterSupplyData(topic, payload); - sensorDataStorage.saveToInfluxDB(sensorData); // 存入时序库 - dataForwarder.forwardToWeb(sensorData); // 转发到管理Web - dataForwarder.forwardToRepairApp(sensorData); // 转发到维修APP(辖区水位) - } else if (topic.startsWith("alert/")) { - // 3. 处理告警报文(触发工单) - var alert = dataParser.parseAlertMessage(topic, payload); - alertTriggerService.createWorkOrder(alert); // 生成工单(存入MySQL) - dataForwarder.forwardToWeb(alert); // 转发到管理Web(告警列表) - dataForwarder.forwardToRepairApp(alert); // 转发到维修APP(抢单) + try { + if (topic.startsWith("sensor/watermaker/")) { + // 解析制水机数据 + Map sensorData = parseWaterMakerData(topic, payload); + dataForwarder.forwardToWeb(sensorData); + dataForwarder.forwardToStudentApp(sensorData); + + } else if (topic.startsWith("sensor/watersupply/")) { + // 解析供水机数据 + Map sensorData = parseWaterSupplyData(topic, payload); + dataForwarder.forwardToWeb(sensorData); + dataForwarder.forwardToRepairApp(sensorData); + + } else if (topic.startsWith("alert/")) { + // 解析告警数据 + Map alertData = parseAlertMessage(topic, payload); + dataForwarder.forwardToWeb(alertData); + dataForwarder.forwardToRepairApp(alertData); + } + } catch (Exception e) { + System.err.println("处理MQTT消息异常: " + e.getMessage()); + e.printStackTrace(); } }; } + + private Map parseWaterMakerData(String topic, String payload) { + Map data = new HashMap<>(); + parseCommonData(topic, payload, data); + data.put("deviceType", 1); // 制水机 + + JSONObject json = JSONObject.parseObject(payload); + data.put("tdsValue", json.getDouble("tds")); + data.put("waterFlow", json.getDouble("flow")); + data.put("waterPressure", json.getDouble("pressure")); + data.put("filterLife", json.getInteger("filter_life")); + data.put("leakage", json.getBoolean("leakage")); + + data.put("status", determineStatus(data)); + return data; + } + + private Map parseWaterSupplyData(String topic, String payload) { + Map data = new HashMap<>(); + parseCommonData(topic, payload, data); + data.put("deviceType", 2); // 供水机 + + JSONObject json = JSONObject.parseObject(payload); + data.put("waterFlow", json.getDouble("flow")); + data.put("waterPressure", json.getDouble("pressure")); + data.put("waterLevel", json.getDouble("water_level")); + + data.put("status", determineStatus(data)); + return data; + } + + private Map parseAlertMessage(String topic, String payload) { + Map alert = new HashMap<>(); + + String[] parts = topic.split("/"); + alert.put("deviceId", parts.length >= 2 ? parts[1] : "unknown"); + + JSONObject json = JSONObject.parseObject(payload); + alert.put("alertType", json.getString("alert_type")); + alert.put("alertLevel", json.getString("alert_level")); + alert.put("alertMessage", json.getString("alert_message")); + alert.put("areaId", json.getString("area_id")); + alert.put("deviceType", json.getInteger("device_type")); + alert.put("timestamp", LocalDateTime.now().toString()); + + return alert; + } + + private void parseCommonData(String topic, String payload, Map data) { + String[] parts = topic.split("/"); + data.put("deviceId", parts.length >= 3 ? parts[2] : "unknown"); + + JSONObject json = JSONObject.parseObject(payload); + data.put("areaId", json.getString("area_id")); + data.put("temperature", json.getDouble("temperature")); + data.put("humidity", json.getDouble("humidity")); + data.put("timestamp", LocalDateTime.now().toString()); + } + + private String determineStatus(Map data) { + Integer deviceType = (Integer) data.get("deviceType"); + + if (deviceType == 1) { // 制水机 + Double tdsValue = (Double) data.get("tdsValue"); + Integer filterLife = (Integer) data.get("filterLife"); + Boolean leakage = (Boolean) data.get("leakage"); + + if (tdsValue != null && tdsValue > 600) return "error"; + if (filterLife != null && filterLife < 10) return "error"; + if (leakage != null && leakage) return "error"; + if (tdsValue != null && tdsValue > 300) return "warning"; + } else if (deviceType == 2) { // 供水机 + Double waterLevel = (Double) data.get("waterLevel"); + if (waterLevel != null && waterLevel < 10) return "error"; + if (waterLevel != null && waterLevel < 20) return "warning"; + } + return "normal"; + } } \ No newline at end of file -- 2.34.1