From 6b17bc1721b3343b3cb8887b75434979db0e4745 Mon Sep 17 00:00:00 2001 From: tianyuan <2861334240@qq.com> Date: Tue, 30 Dec 2025 16:00:55 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E5=9C=B0=E5=8C=BAid=E5=85=B3=E8=81=94=E4=BB=A5=E5=8F=8A?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=8F=91=E9=80=81=E5=AE=9A=E6=97=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../water/service/AlertTriggerService.java | 22 +++-- .../water/service/MqttSensorReceiver.java | 98 ++++++++----------- .../water/service/MqttSensorSender.java | 26 +++++ .../water/task/SensorSimulationTask.java | 92 ++++++++++++----- 4 files changed, 153 insertions(+), 85 deletions(-) diff --git a/src/main/java/com/campus/water/service/AlertTriggerService.java b/src/main/java/com/campus/water/service/AlertTriggerService.java index 23aae0d..8331898 100644 --- a/src/main/java/com/campus/water/service/AlertTriggerService.java +++ b/src/main/java/com/campus/water/service/AlertTriggerService.java @@ -200,7 +200,7 @@ public class AlertTriggerService { * 检查是否存在重复告警(同设备同类型未处理告警,且在间隔时间内) * 覆盖pending/processing两种未处理状态 */ - private boolean isDuplicateAlert(String deviceId, String alertType) { + public boolean isDuplicateAlert(String deviceId, String alertType) { LocalDateTime before = LocalDateTime.now().minusMinutes(ALERT_DUPLICATE_INTERVAL); // 检查未处理的告警状态(pending/processing) List activeStatus = Arrays.asList( @@ -216,13 +216,23 @@ public class AlertTriggerService { } /** - * 获取设备所在区域ID + * 获取设备所在区域ID(新增超时控制,避免查询阻塞) */ - private String getDeviceAreaId(String deviceId) { - Optional deviceOpt = deviceRepository.findById(deviceId); - return deviceOpt.map(Device::getAreaId).orElse("unknown"); + public String getDeviceAreaId(String deviceId) { + try { + Optional deviceOpt = deviceRepository.findById(deviceId); + String areaId = deviceOpt.map(Device::getAreaId).orElse(null); + // 空值兜底:null/空字符串→unknown + if (areaId == null || areaId.trim().isEmpty()) { + areaId = "unknown"; + log.warn("设备{}的area_id为空/设备不存在,兜底为unknown", deviceId); + } + return areaId; + } catch (Exception e) { // 捕获所有数据库异常 + log.error("获取设备{}的area_id失败(数据库异常)", deviceId, e); + return "unknown"; // 异常时兜底 + } } - /** * 生成唯一工单ID(WO+时间戳+随机数) */ diff --git a/src/main/java/com/campus/water/service/MqttSensorReceiver.java b/src/main/java/com/campus/water/service/MqttSensorReceiver.java index 55a1400..c871af0 100644 --- a/src/main/java/com/campus/water/service/MqttSensorReceiver.java +++ b/src/main/java/com/campus/water/service/MqttSensorReceiver.java @@ -1,5 +1,5 @@ package com.campus.water.service; - +import com.campus.water.service.AlertTriggerService; import com.campus.water.config.MqttConfig; import com.campus.water.entity.Alert; import com.campus.water.entity.WaterMakerRealtimeData; @@ -17,7 +17,6 @@ import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannel import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Service; -import com.campus.water.service.AlertTriggerService; import jakarta.annotation.PostConstruct; import java.math.BigDecimal; import java.time.LocalDateTime; @@ -26,25 +25,19 @@ import java.time.LocalDateTime; @RequiredArgsConstructor @Slf4j public class MqttSensorReceiver { - // JPA Repository(数据持久化接口,Spring自动注入实现) private final WaterMakerRealtimeDataRepository waterMakerRepo; private final WaterSupplyRealtimeDataRepository waterSupplyRepo; private final AlertRepository alertRepo; private final ObjectMapper objectMapper; private final MqttPahoMessageDrivenChannelAdapter mqttAdapter; - // 新增告警触发服务依赖 private final AlertTriggerService alertTriggerService; - /** - * 项目启动后初始化:订阅所有需要的MQTT主题 - * 主题后缀用「+」表示通配符(匹配所有设备ID) - */ @PostConstruct public void initMqttSubscription() { - mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+"); // 制水机状态(所有设备) - mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+"); // 制水机告警(所有设备) - mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+"); // 供水机状态(所有设备) - mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+"); // 供水机告警(所有设备) + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+"); + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+"); + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+"); + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+"); log.info("MQTT订阅初始化完成 | 订阅主题:{}+、{}+、{}+、{}+", MqttConfig.TOPIC_WATER_MAKER_STATE, MqttConfig.TOPIC_WATER_MAKER_WARN, @@ -52,25 +45,19 @@ public class MqttSensorReceiver { MqttConfig.TOPIC_WATER_SUPPLIER_WARN); } - /** - * 监听MQTT接收通道,处理所有收到的消息 - * @param payload 消息内容(JSON字符串) - * @param topic 接收主题(用于区分消息类型) - */ @ServiceActivator(inputChannel = "mqttInputChannel") public void handleMqttMessage(String payload, @Header(MqttHeaders.RECEIVED_TOPIC) String topic) { log.info("MQTT消息接收成功 | 主题:{} | 内容:{}", topic, payload); try { - // 根据主题分类处理 if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_STATE)) { - handleWaterMakerState(payload); // 制水机状态数据 + handleWaterMakerState(payload); } else if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_WARN)) { - handleWaterMakerWarning(payload); // 制水机告警数据 + handleWaterMakerWarning(payload); } else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_STATE)) { - handleWaterSupplyState(payload); // 供水机状态数据 + handleWaterSupplyState(payload); } else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_WARN)) { - handleWaterSupplyWarning(payload); // 供水机告警数据 + handleWaterSupplyWarning(payload); // 新增:处理供水机告警主题 } else { log.warn("MQTT消息主题未匹配 | 未知主题:{} | 内容:{}", topic, payload); } @@ -79,17 +66,11 @@ public class MqttSensorReceiver { } } - /** - * 处理制水机状态数据:转换为JPA实体并持久化 - */ private void handleWaterMakerState(String payload) throws Exception { - // 1. JSON反序列化为模型对象 WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class); - // 2. 模型对象转换为JPA实体(持久化到数据库) WaterMakerRealtimeData entity = new WaterMakerRealtimeData(); entity.setDeviceId(sensorData.getDeviceId()); - // Double转BigDecimal处理(包含null值判断) entity.setTdsValue1(sensorData.getTdsValue1() != null ? BigDecimal.valueOf(sensorData.getTdsValue1()) : null); entity.setTdsValue2(sensorData.getTdsValue2() != null ? BigDecimal.valueOf(sensorData.getTdsValue2()) : null); entity.setTdsValue3(sensorData.getTdsValue3() != null ? BigDecimal.valueOf(sensorData.getTdsValue3()) : null); @@ -97,31 +78,31 @@ public class MqttSensorReceiver { entity.setWaterFlow2(sensorData.getWaterFlow2() != null ? BigDecimal.valueOf(sensorData.getWaterFlow2()) : null); entity.setWaterPress(sensorData.getWaterPress() != null ? BigDecimal.valueOf(sensorData.getWaterPress()) : null); entity.setFilterLife(sensorData.getFilterLife()); - entity.setLeakage(sensorData.getLeakage() ? true : false); // 数据库存储:true=漏水,false=正常 + entity.setLeakage(sensorData.getLeakage() ? true : false); entity.setWaterQuality(sensorData.getWaterQuality()); entity.setStatus(WaterMakerRealtimeData.DeviceStatus.valueOf(sensorData.getStatus())); entity.setRecordTime(sensorData.getRecordTime()); entity.setCreatedTime(LocalDateTime.now()); - // 3. 持久化到数据库(JPA save() 自动实现CRUD) waterMakerRepo.save(entity); log.info("制水机状态数据持久化成功 | 设备ID:{}", sensorData.getDeviceId()); - // 新增:调用告警检查逻辑 alertTriggerService.checkWaterMakerAbnormal(sensorData); } - /** - * 处理制水机告警数据:持久化告警记录+状态数据 - */ private void handleWaterMakerWarning(String payload) throws Exception { WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class); - - // 1. 持久化告警记录 + // 新增:重复告警判断 + if (alertTriggerService.isDuplicateAlert(sensorData.getDeviceId(), "WATER_MAKER_ABNORMAL")) { + log.info("制水机存在未处理告警,跳过重复触发 | 设备ID:{}", sensorData.getDeviceId()); + return; + } Alert alert = new Alert(); alert.setDeviceId(sensorData.getDeviceId()); - alert.setAlertType("WATER_MAKER_ABNORMAL"); // 告警类型(枚举规范) - alert.setAlertLevel(Alert.AlertLevel.critical); // 告警级别(严重) + alert.setAlertType("WATER_MAKER_ABNORMAL"); + + alert.setAlertLevel(Alert.AlertLevel.critical); + alert.setAreaId(alertTriggerService.getDeviceAreaId(sensorData.getDeviceId())); alert.setAlertMessage(String.format( "制水机异常 - 设备ID:%s,TDS值:%.2f,滤芯寿命:%d%%,漏水状态:%s", sensorData.getDeviceId(), @@ -129,26 +110,23 @@ public class MqttSensorReceiver { sensorData.getFilterLife(), sensorData.getLeakage() ? "是" : "否" )); - alert.setStatus(Alert.AlertStatus.pending); // 告警状态(未处理) - alert.setTimestamp(sensorData.getRecordTime()); - alert.setCreatedTime(LocalDateTime.now()); + alert.setStatus(Alert.AlertStatus.pending); + alert.setTimestamp(sensorData.getRecordTime() != null ? sensorData.getRecordTime() : LocalDateTime.now()); + LocalDateTime now = LocalDateTime.now(); + alert.setCreatedTime(alert.getCreatedTime() != null ? alert.getCreatedTime() : now); + alert.setUpdatedTime(now); alertRepo.save(alert); log.warn("制水机告警记录持久化成功 | 告警ID:{} | 设备ID:{}", alert.getAlertId(), sensorData.getDeviceId()); - // 2. 同时持久化状态数据(便于后续追溯) handleWaterMakerState(payload); } - /** - * 处理供水机状态数据:转换为JPA实体并持久化 - */ private void handleWaterSupplyState(String payload) throws Exception { WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class); WaterSupplyRealtimeData entity = new WaterSupplyRealtimeData(); entity.setDeviceId(sensorData.getDeviceId()); - // Double转BigDecimal处理(包含null值判断) entity.setWaterFlow(sensorData.getWaterFlow() != null ? BigDecimal.valueOf(sensorData.getWaterFlow()) : null); entity.setWaterPress(sensorData.getWaterPress() != null ? BigDecimal.valueOf(sensorData.getWaterPress()) : null); entity.setWaterLevel(sensorData.getWaterLevel() != null ? BigDecimal.valueOf(sensorData.getWaterLevel()) : null); @@ -156,39 +134,47 @@ public class MqttSensorReceiver { entity.setStatus(WaterSupplyRealtimeData.DeviceStatus.valueOf(sensorData.getStatus())); entity.setTimestamp(sensorData.getTimestamp()); - waterSupplyRepo.save(entity); log.info("供水机状态数据持久化成功 | 设备ID:{}", sensorData.getDeviceId()); - // 新增:调用告警检查逻辑 alertTriggerService.checkWaterSupplyAbnormal(sensorData); } /** - * 处理供水机告警数据:持久化告警记录+状态数据 + * 新增:处理供水机告警数据 */ private void handleWaterSupplyWarning(String payload) throws Exception { WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class); - - // 1. 持久化告警记录 + // 新增:重复告警判断 + if (alertTriggerService.isDuplicateAlert(sensorData.getDeviceId(), "WATER_SUPPLY_ABNORMAL")) { + log.info("供水机存在未处理告警,跳过重复触发 | 设备ID:{}", sensorData.getDeviceId()); + return; + } Alert alert = new Alert(); alert.setDeviceId(sensorData.getDeviceId()); alert.setAlertType("WATER_SUPPLY_ABNORMAL"); + alert.setAlertLevel(Alert.AlertLevel.error); + alert.setAreaId(alertTriggerService.getDeviceAreaId(sensorData.getDeviceId())); alert.setAlertMessage(String.format( - "供水机异常 - 设备ID:%s,水压:%.2fMPa,水位:%.2f%%", + "供水机异常 - 设备ID:%s,水压:%.2fMPa,水位:%.2f%%,水温:%.2f℃", sensorData.getDeviceId(), sensorData.getWaterPress(), - sensorData.getWaterLevel() + sensorData.getWaterLevel(), + sensorData.getTemperature() )); alert.setStatus(Alert.AlertStatus.pending); - alert.setTimestamp(sensorData.getTimestamp()); - alert.setCreatedTime(LocalDateTime.now()); + // 完善告警时间戳(确保不为空) + alert.setTimestamp(sensorData.getTimestamp() != null ? sensorData.getTimestamp() : LocalDateTime.now()); + // 完善创建/更新时间(确保不为空) + LocalDateTime now = LocalDateTime.now(); + alert.setCreatedTime(alert.getCreatedTime() != null ? alert.getCreatedTime() : now); + alert.setUpdatedTime(now); alertRepo.save(alert); log.warn("供水机告警记录持久化成功 | 告警ID:{} | 设备ID:{}", alert.getAlertId(), sensorData.getDeviceId()); - // 2. 同时持久化状态数据 + // 同时持久化状态数据 handleWaterSupplyState(payload); } } \ No newline at end of file diff --git a/src/main/java/com/campus/water/service/MqttSensorSender.java b/src/main/java/com/campus/water/service/MqttSensorSender.java index 6ff5c52..2f88f2e 100644 --- a/src/main/java/com/campus/water/service/MqttSensorSender.java +++ b/src/main/java/com/campus/water/service/MqttSensorSender.java @@ -115,6 +115,32 @@ public class MqttSensorSender { } } + /** + * 新增:模拟供水机发送「告警数据」 + * @param deviceId 设备ID(如WS001) + */ + public void sendWaterSupplyWarning(String deviceId) { + try { + // 1. 构建供水机异常数据(超出正常范围) + WaterSupplySensorData data = new WaterSupplySensorData(); + data.setDeviceId(deviceId); + data.setWaterFlow(0.1 + random.nextDouble() * 0.2); // 流量极低(0.1-0.3 L/min) + data.setWaterPress(0.01 + random.nextDouble() * 0.09); // 水压过低(0.01-0.1 MPa) + data.setWaterLevel(5 + random.nextDouble() * 15); // 水位过低(5-20%) + data.setTemperature(25 + random.nextDouble() * 5); // 水温过高(25-30℃) + data.setStatus("error"); + data.setTimestamp(LocalDateTime.now()); + + // 2. 序列化+发送 + String payload = objectMapper.writeValueAsString(data); + String topic = MqttConfig.TOPIC_WATER_SUPPLIER_WARN + deviceId; + sendMessage(topic, payload); + log.warn("供水机告警消息发送成功 | 设备ID:{} | 主题:{} | 数据:{}", deviceId, topic, payload); + } catch (JsonProcessingException e) { + log.error("供水机告警消息发送失败 | 设备ID:{} | 异常:{}", deviceId, e.getMessage()); + } + } + /** * 通用发送方法(封装MQTT消息构建逻辑) * @param topic 主题 diff --git a/src/main/java/com/campus/water/task/SensorSimulationTask.java b/src/main/java/com/campus/water/task/SensorSimulationTask.java index 6e624a2..003b586 100644 --- a/src/main/java/com/campus/water/task/SensorSimulationTask.java +++ b/src/main/java/com/campus/water/task/SensorSimulationTask.java @@ -9,6 +9,7 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.List; +import java.util.Random; import java.util.stream.Collectors; @Component @@ -17,36 +18,37 @@ import java.util.stream.Collectors; public class SensorSimulationTask { private final MqttSensorSender mqttSensorSender; private final DeviceRepository deviceRepository; + private final Random random = new Random(); // 新增随机数生成器 /** - * 定时发送「正常状态数据」:每30秒执行一次 + * 定时发送「正常状态数据」:每30分钟执行一次 */ - @Scheduled(fixedRate = 30000) + @Scheduled(fixedRate = 1800000) // 30分钟 = 30*60*1000 = 1800000毫秒 public void sendRegularStateData() { log.info("=== 开始发送设备正常状态数据 ==="); - // 1. 从数据库查询「制水机」(water_maker)类型设备(传入枚举类型) + // 1. 查询制水机设备 List waterMakerDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_maker) .stream() .map(Device::getDeviceId) .collect(Collectors.toList()); - // 2. 从数据库查询「供水机」(water_supply)类型设备(传入枚举类型) - List waterSupplyDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_supply) - .stream() - .map(Device::getDeviceId) - .collect(Collectors.toList()); - - // 发送制水机状态 + // 2. 分批次发送制水机数据(每批间隔1-3秒) if (!waterMakerDevices.isEmpty()) { - waterMakerDevices.forEach(deviceId -> mqttSensorSender.sendWaterMakerState(deviceId)); + sendInBatches(waterMakerDevices, true); } else { log.warn("⚠️ device表中无「water_maker」类型设备"); } - // 发送供水机状态 + // 3. 查询供水机设备 + List waterSupplyDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_supply) + .stream() + .map(Device::getDeviceId) + .collect(Collectors.toList()); + + // 4. 分批次发送供水机数据(每批间隔1-3秒) if (!waterSupplyDevices.isEmpty()) { - waterSupplyDevices.forEach(deviceId -> mqttSensorSender.sendWaterSupplyData(deviceId)); + sendInBatches(waterSupplyDevices, false); } else { log.warn("⚠️ device表中无「water_supply」类型设备"); } @@ -55,28 +57,72 @@ public class SensorSimulationTask { } /** - * 定时发送「随机告警数据」:每5分钟执行一次 + * 分批次发送设备数据 + * @param deviceIds 设备ID列表 + * @param isWaterMaker 是否为制水机 + */ + private void sendInBatches(List deviceIds, boolean isWaterMaker) { + int batchSize = 3; // 每批发送3台设备 + int delayBetweenBatches = 1000 + random.nextInt(2000); // 1-3秒随机间隔 + + for (int i = 0; i < deviceIds.size(); i++) { + String deviceId = deviceIds.get(i); + // 发送当前设备数据 + if (isWaterMaker) { + mqttSensorSender.sendWaterMakerState(deviceId); + } else { + mqttSensorSender.sendWaterSupplyData(deviceId); + } + + // 每发送batchSize台设备后休眠指定时间(最后一批除外) + if ((i + 1) % batchSize == 0 && i != deviceIds.size() - 1) { + try { + Thread.sleep(delayBetweenBatches); + } catch (InterruptedException e) { + log.error("批次发送休眠被中断", e); + Thread.currentThread().interrupt(); // 恢复中断状态 + } + } + } + } + + /** + * 定时发送「随机告警数据」:每5分钟执行一次(同时包含制水机和供水机) */ @Scheduled(fixedRate = 300000) public void sendRandomWarningData() { log.info("=== 开始发送随机告警数据 ==="); - // 从数据库查询制水机列表(传入枚举类型) + // 处理制水机告警 List waterMakerDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_maker) .stream() .map(Device::getDeviceId) .collect(Collectors.toList()); - if (waterMakerDevices.isEmpty()) { - log.warn("⚠️ device表中无「water_maker」类型设备,跳过告警发送"); - return; + if (!waterMakerDevices.isEmpty()) { + int randomMakerIndex = random.nextInt(waterMakerDevices.size()); + String targetMakerDevice = waterMakerDevices.get(randomMakerIndex); + mqttSensorSender.sendWaterMakerWarning(targetMakerDevice); + log.info("制水机告警发送完成 | 告警设备:{}", targetMakerDevice); + } else { + log.warn("⚠️ device表中无「water_maker」类型设备,跳过制水机告警发送"); } - // 随机选择1台制水机发送告警 - int randomIndex = (int) (Math.random() * waterMakerDevices.size()); - String targetDevice = waterMakerDevices.get(randomIndex); - mqttSensorSender.sendWaterMakerWarning(targetDevice); + // 新增:处理供水机告警 + List waterSupplyDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_supply) + .stream() + .map(Device::getDeviceId) + .collect(Collectors.toList()); + + if (!waterSupplyDevices.isEmpty()) { + int randomSupplyIndex = random.nextInt(waterSupplyDevices.size()); + String targetSupplyDevice = waterSupplyDevices.get(randomSupplyIndex); + mqttSensorSender.sendWaterSupplyWarning(targetSupplyDevice); // 调用新增的供水机告警发送方法 + log.info("供水机告警发送完成 | 告警设备:{}", targetSupplyDevice); + } else { + log.warn("⚠️ device表中无「water_supply」类型设备,跳过供水机告警发送"); + } - log.info("=== 随机告警数据发送完成 | 告警设备:{} ===", targetDevice); + log.info("=== 随机告警数据发送完成 ==="); } } \ No newline at end of file -- 2.34.1 From 0e0e9671ba3dce55135f19fa44765a38b7f410ce Mon Sep 17 00:00:00 2001 From: tianyuan <2861334240@qq.com> Date: Tue, 30 Dec 2025 16:02:33 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E6=A0=BC=E5=BC=8F?= =?UTF-8?q?=E8=AD=A6=E5=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/com/campus/water/service/AlertTriggerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/campus/water/service/AlertTriggerService.java b/src/main/java/com/campus/water/service/AlertTriggerService.java index 8331898..1282856 100644 --- a/src/main/java/com/campus/water/service/AlertTriggerService.java +++ b/src/main/java/com/campus/water/service/AlertTriggerService.java @@ -12,7 +12,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; - +import com.campus.water.service.AlertPushService; import java.time.LocalDateTime; import java.util.Arrays; import java.util.List; -- 2.34.1