|
|
|
|
@ -1,5 +1,5 @@
|
|
|
|
|
package com.campus.water.service;
|
|
|
|
|
|
|
|
|
|
import com.campus.water.service.AlertTriggerService;
|
|
|
|
|
import com.campus.water.config.MqttConfig;
|
|
|
|
|
import com.campus.water.entity.Alert;
|
|
|
|
|
import com.campus.water.entity.WaterMakerRealtimeData;
|
|
|
|
|
@ -17,7 +17,6 @@ import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannel
|
|
|
|
|
import org.springframework.integration.mqtt.support.MqttHeaders;
|
|
|
|
|
import org.springframework.messaging.handler.annotation.Header;
|
|
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
import com.campus.water.service.AlertTriggerService;
|
|
|
|
|
import jakarta.annotation.PostConstruct;
|
|
|
|
|
import java.math.BigDecimal;
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
|
@ -26,25 +25,19 @@ import java.time.LocalDateTime;
|
|
|
|
|
@RequiredArgsConstructor
|
|
|
|
|
@Slf4j
|
|
|
|
|
public class MqttSensorReceiver {
|
|
|
|
|
// JPA Repository(数据持久化接口,Spring自动注入实现)
|
|
|
|
|
private final WaterMakerRealtimeDataRepository waterMakerRepo;
|
|
|
|
|
private final WaterSupplyRealtimeDataRepository waterSupplyRepo;
|
|
|
|
|
private final AlertRepository alertRepo;
|
|
|
|
|
private final ObjectMapper objectMapper;
|
|
|
|
|
private final MqttPahoMessageDrivenChannelAdapter mqttAdapter;
|
|
|
|
|
// 新增告警触发服务依赖
|
|
|
|
|
private final AlertTriggerService alertTriggerService;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 项目启动后初始化:订阅所有需要的MQTT主题
|
|
|
|
|
* 主题后缀用「+」表示通配符(匹配所有设备ID)
|
|
|
|
|
*/
|
|
|
|
|
@PostConstruct
|
|
|
|
|
public void initMqttSubscription() {
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+"); // 制水机状态(所有设备)
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+"); // 制水机告警(所有设备)
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+"); // 供水机状态(所有设备)
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+"); // 供水机告警(所有设备)
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+");
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+");
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+");
|
|
|
|
|
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+");
|
|
|
|
|
log.info("MQTT订阅初始化完成 | 订阅主题:{}+、{}+、{}+、{}+",
|
|
|
|
|
MqttConfig.TOPIC_WATER_MAKER_STATE,
|
|
|
|
|
MqttConfig.TOPIC_WATER_MAKER_WARN,
|
|
|
|
|
@ -52,25 +45,19 @@ public class MqttSensorReceiver {
|
|
|
|
|
MqttConfig.TOPIC_WATER_SUPPLIER_WARN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 监听MQTT接收通道,处理所有收到的消息
|
|
|
|
|
* @param payload 消息内容(JSON字符串)
|
|
|
|
|
* @param topic 接收主题(用于区分消息类型)
|
|
|
|
|
*/
|
|
|
|
|
@ServiceActivator(inputChannel = "mqttInputChannel")
|
|
|
|
|
public void handleMqttMessage(String payload, @Header(MqttHeaders.RECEIVED_TOPIC) String topic) {
|
|
|
|
|
log.info("MQTT消息接收成功 | 主题:{} | 内容:{}", topic, payload);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
// 根据主题分类处理
|
|
|
|
|
if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_STATE)) {
|
|
|
|
|
handleWaterMakerState(payload); // 制水机状态数据
|
|
|
|
|
handleWaterMakerState(payload);
|
|
|
|
|
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_WARN)) {
|
|
|
|
|
handleWaterMakerWarning(payload); // 制水机告警数据
|
|
|
|
|
handleWaterMakerWarning(payload);
|
|
|
|
|
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_STATE)) {
|
|
|
|
|
handleWaterSupplyState(payload); // 供水机状态数据
|
|
|
|
|
handleWaterSupplyState(payload);
|
|
|
|
|
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_WARN)) {
|
|
|
|
|
handleWaterSupplyWarning(payload); // 供水机告警数据
|
|
|
|
|
handleWaterSupplyWarning(payload); // 新增:处理供水机告警主题
|
|
|
|
|
} else {
|
|
|
|
|
log.warn("MQTT消息主题未匹配 | 未知主题:{} | 内容:{}", topic, payload);
|
|
|
|
|
}
|
|
|
|
|
@ -79,17 +66,11 @@ public class MqttSensorReceiver {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理制水机状态数据:转换为JPA实体并持久化
|
|
|
|
|
*/
|
|
|
|
|
private void handleWaterMakerState(String payload) throws Exception {
|
|
|
|
|
// 1. JSON反序列化为模型对象
|
|
|
|
|
WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class);
|
|
|
|
|
|
|
|
|
|
// 2. 模型对象转换为JPA实体(持久化到数据库)
|
|
|
|
|
WaterMakerRealtimeData entity = new WaterMakerRealtimeData();
|
|
|
|
|
entity.setDeviceId(sensorData.getDeviceId());
|
|
|
|
|
// Double转BigDecimal处理(包含null值判断)
|
|
|
|
|
entity.setTdsValue1(sensorData.getTdsValue1() != null ? BigDecimal.valueOf(sensorData.getTdsValue1()) : null);
|
|
|
|
|
entity.setTdsValue2(sensorData.getTdsValue2() != null ? BigDecimal.valueOf(sensorData.getTdsValue2()) : null);
|
|
|
|
|
entity.setTdsValue3(sensorData.getTdsValue3() != null ? BigDecimal.valueOf(sensorData.getTdsValue3()) : null);
|
|
|
|
|
@ -97,31 +78,31 @@ public class MqttSensorReceiver {
|
|
|
|
|
entity.setWaterFlow2(sensorData.getWaterFlow2() != null ? BigDecimal.valueOf(sensorData.getWaterFlow2()) : null);
|
|
|
|
|
entity.setWaterPress(sensorData.getWaterPress() != null ? BigDecimal.valueOf(sensorData.getWaterPress()) : null);
|
|
|
|
|
entity.setFilterLife(sensorData.getFilterLife());
|
|
|
|
|
entity.setLeakage(sensorData.getLeakage() ? true : false); // 数据库存储:true=漏水,false=正常
|
|
|
|
|
entity.setLeakage(sensorData.getLeakage() ? true : false);
|
|
|
|
|
entity.setWaterQuality(sensorData.getWaterQuality());
|
|
|
|
|
entity.setStatus(WaterMakerRealtimeData.DeviceStatus.valueOf(sensorData.getStatus()));
|
|
|
|
|
entity.setRecordTime(sensorData.getRecordTime());
|
|
|
|
|
entity.setCreatedTime(LocalDateTime.now());
|
|
|
|
|
|
|
|
|
|
// 3. 持久化到数据库(JPA save() 自动实现CRUD)
|
|
|
|
|
waterMakerRepo.save(entity);
|
|
|
|
|
log.info("制水机状态数据持久化成功 | 设备ID:{}", sensorData.getDeviceId());
|
|
|
|
|
|
|
|
|
|
// 新增:调用告警检查逻辑
|
|
|
|
|
alertTriggerService.checkWaterMakerAbnormal(sensorData);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理制水机告警数据:持久化告警记录+状态数据
|
|
|
|
|
*/
|
|
|
|
|
private void handleWaterMakerWarning(String payload) throws Exception {
|
|
|
|
|
WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class);
|
|
|
|
|
|
|
|
|
|
// 1. 持久化告警记录
|
|
|
|
|
// 新增:重复告警判断
|
|
|
|
|
if (alertTriggerService.isDuplicateAlert(sensorData.getDeviceId(), "WATER_MAKER_ABNORMAL")) {
|
|
|
|
|
log.info("制水机存在未处理告警,跳过重复触发 | 设备ID:{}", sensorData.getDeviceId());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
Alert alert = new Alert();
|
|
|
|
|
alert.setDeviceId(sensorData.getDeviceId());
|
|
|
|
|
alert.setAlertType("WATER_MAKER_ABNORMAL"); // 告警类型(枚举规范)
|
|
|
|
|
alert.setAlertLevel(Alert.AlertLevel.critical); // 告警级别(严重)
|
|
|
|
|
alert.setAlertType("WATER_MAKER_ABNORMAL");
|
|
|
|
|
|
|
|
|
|
alert.setAlertLevel(Alert.AlertLevel.critical);
|
|
|
|
|
alert.setAreaId(alertTriggerService.getDeviceAreaId(sensorData.getDeviceId()));
|
|
|
|
|
alert.setAlertMessage(String.format(
|
|
|
|
|
"制水机异常 - 设备ID:%s,TDS值:%.2f,滤芯寿命:%d%%,漏水状态:%s",
|
|
|
|
|
sensorData.getDeviceId(),
|
|
|
|
|
@ -129,26 +110,23 @@ public class MqttSensorReceiver {
|
|
|
|
|
sensorData.getFilterLife(),
|
|
|
|
|
sensorData.getLeakage() ? "是" : "否"
|
|
|
|
|
));
|
|
|
|
|
alert.setStatus(Alert.AlertStatus.pending); // 告警状态(未处理)
|
|
|
|
|
alert.setTimestamp(sensorData.getRecordTime());
|
|
|
|
|
alert.setCreatedTime(LocalDateTime.now());
|
|
|
|
|
alert.setStatus(Alert.AlertStatus.pending);
|
|
|
|
|
alert.setTimestamp(sensorData.getRecordTime() != null ? sensorData.getRecordTime() : LocalDateTime.now());
|
|
|
|
|
LocalDateTime now = LocalDateTime.now();
|
|
|
|
|
alert.setCreatedTime(alert.getCreatedTime() != null ? alert.getCreatedTime() : now);
|
|
|
|
|
alert.setUpdatedTime(now);
|
|
|
|
|
|
|
|
|
|
alertRepo.save(alert);
|
|
|
|
|
log.warn("制水机告警记录持久化成功 | 告警ID:{} | 设备ID:{}", alert.getAlertId(), sensorData.getDeviceId());
|
|
|
|
|
|
|
|
|
|
// 2. 同时持久化状态数据(便于后续追溯)
|
|
|
|
|
handleWaterMakerState(payload);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理供水机状态数据:转换为JPA实体并持久化
|
|
|
|
|
*/
|
|
|
|
|
private void handleWaterSupplyState(String payload) throws Exception {
|
|
|
|
|
WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class);
|
|
|
|
|
|
|
|
|
|
WaterSupplyRealtimeData entity = new WaterSupplyRealtimeData();
|
|
|
|
|
entity.setDeviceId(sensorData.getDeviceId());
|
|
|
|
|
// Double转BigDecimal处理(包含null值判断)
|
|
|
|
|
entity.setWaterFlow(sensorData.getWaterFlow() != null ? BigDecimal.valueOf(sensorData.getWaterFlow()) : null);
|
|
|
|
|
entity.setWaterPress(sensorData.getWaterPress() != null ? BigDecimal.valueOf(sensorData.getWaterPress()) : null);
|
|
|
|
|
entity.setWaterLevel(sensorData.getWaterLevel() != null ? BigDecimal.valueOf(sensorData.getWaterLevel()) : null);
|
|
|
|
|
@ -156,39 +134,47 @@ public class MqttSensorReceiver {
|
|
|
|
|
entity.setStatus(WaterSupplyRealtimeData.DeviceStatus.valueOf(sensorData.getStatus()));
|
|
|
|
|
entity.setTimestamp(sensorData.getTimestamp());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
waterSupplyRepo.save(entity);
|
|
|
|
|
log.info("供水机状态数据持久化成功 | 设备ID:{}", sensorData.getDeviceId());
|
|
|
|
|
|
|
|
|
|
// 新增:调用告警检查逻辑
|
|
|
|
|
alertTriggerService.checkWaterSupplyAbnormal(sensorData);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 处理供水机告警数据:持久化告警记录+状态数据
|
|
|
|
|
* 新增:处理供水机告警数据
|
|
|
|
|
*/
|
|
|
|
|
private void handleWaterSupplyWarning(String payload) throws Exception {
|
|
|
|
|
WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class);
|
|
|
|
|
|
|
|
|
|
// 1. 持久化告警记录
|
|
|
|
|
// 新增:重复告警判断
|
|
|
|
|
if (alertTriggerService.isDuplicateAlert(sensorData.getDeviceId(), "WATER_SUPPLY_ABNORMAL")) {
|
|
|
|
|
log.info("供水机存在未处理告警,跳过重复触发 | 设备ID:{}", sensorData.getDeviceId());
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
Alert alert = new Alert();
|
|
|
|
|
alert.setDeviceId(sensorData.getDeviceId());
|
|
|
|
|
alert.setAlertType("WATER_SUPPLY_ABNORMAL");
|
|
|
|
|
|
|
|
|
|
alert.setAlertLevel(Alert.AlertLevel.error);
|
|
|
|
|
alert.setAreaId(alertTriggerService.getDeviceAreaId(sensorData.getDeviceId()));
|
|
|
|
|
alert.setAlertMessage(String.format(
|
|
|
|
|
"供水机异常 - 设备ID:%s,水压:%.2fMPa,水位:%.2f%%",
|
|
|
|
|
"供水机异常 - 设备ID:%s,水压:%.2fMPa,水位:%.2f%%,水温:%.2f℃",
|
|
|
|
|
sensorData.getDeviceId(),
|
|
|
|
|
sensorData.getWaterPress(),
|
|
|
|
|
sensorData.getWaterLevel()
|
|
|
|
|
sensorData.getWaterLevel(),
|
|
|
|
|
sensorData.getTemperature()
|
|
|
|
|
));
|
|
|
|
|
alert.setStatus(Alert.AlertStatus.pending);
|
|
|
|
|
alert.setTimestamp(sensorData.getTimestamp());
|
|
|
|
|
alert.setCreatedTime(LocalDateTime.now());
|
|
|
|
|
// 完善告警时间戳(确保不为空)
|
|
|
|
|
alert.setTimestamp(sensorData.getTimestamp() != null ? sensorData.getTimestamp() : LocalDateTime.now());
|
|
|
|
|
// 完善创建/更新时间(确保不为空)
|
|
|
|
|
LocalDateTime now = LocalDateTime.now();
|
|
|
|
|
alert.setCreatedTime(alert.getCreatedTime() != null ? alert.getCreatedTime() : now);
|
|
|
|
|
alert.setUpdatedTime(now);
|
|
|
|
|
|
|
|
|
|
alertRepo.save(alert);
|
|
|
|
|
log.warn("供水机告警记录持久化成功 | 告警ID:{} | 设备ID:{}", alert.getAlertId(), sensorData.getDeviceId());
|
|
|
|
|
|
|
|
|
|
// 2. 同时持久化状态数据
|
|
|
|
|
// 同时持久化状态数据
|
|
|
|
|
handleWaterSupplyState(payload);
|
|
|
|
|
}
|
|
|
|
|
}
|