修改告警地区id关联以及数据发送定时 #154

Merged
p95fco63j merged 2 commits from junmao_branch into develop 2 weeks ago

@ -12,7 +12,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.campus.water.service.AlertPushService;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@ -200,7 +200,7 @@ public class AlertTriggerService {
*
* pending/processing
*/
private boolean isDuplicateAlert(String deviceId, String alertType) {
public boolean isDuplicateAlert(String deviceId, String alertType) {
LocalDateTime before = LocalDateTime.now().minusMinutes(ALERT_DUPLICATE_INTERVAL);
// 检查未处理的告警状态pending/processing
List<Alert.AlertStatus> activeStatus = Arrays.asList(
@ -216,13 +216,23 @@ public class AlertTriggerService {
}
/**
* ID
* ID
*/
private String getDeviceAreaId(String deviceId) {
Optional<Device> deviceOpt = deviceRepository.findById(deviceId);
return deviceOpt.map(Device::getAreaId).orElse("unknown");
public String getDeviceAreaId(String deviceId) {
try {
Optional<Device> deviceOpt = deviceRepository.findById(deviceId);
String areaId = deviceOpt.map(Device::getAreaId).orElse(null);
// 空值兜底null/空字符串→unknown
if (areaId == null || areaId.trim().isEmpty()) {
areaId = "unknown";
log.warn("设备{}的area_id为空/设备不存在兜底为unknown", deviceId);
}
return areaId;
} catch (Exception e) { // 捕获所有数据库异常
log.error("获取设备{}的area_id失败数据库异常", deviceId, e);
return "unknown"; // 异常时兜底
}
}
/**
* IDWO++
*/

@ -1,5 +1,5 @@
package com.campus.water.service;
import com.campus.water.service.AlertTriggerService;
import com.campus.water.config.MqttConfig;
import com.campus.water.entity.Alert;
import com.campus.water.entity.WaterMakerRealtimeData;
@ -17,7 +17,6 @@ import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannel
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import com.campus.water.service.AlertTriggerService;
import jakarta.annotation.PostConstruct;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@ -26,25 +25,19 @@ import java.time.LocalDateTime;
@RequiredArgsConstructor
@Slf4j
public class MqttSensorReceiver {
// JPA Repository数据持久化接口Spring自动注入实现
private final WaterMakerRealtimeDataRepository waterMakerRepo;
private final WaterSupplyRealtimeDataRepository waterSupplyRepo;
private final AlertRepository alertRepo;
private final ObjectMapper objectMapper;
private final MqttPahoMessageDrivenChannelAdapter mqttAdapter;
// 新增告警触发服务依赖
private final AlertTriggerService alertTriggerService;
/**
* MQTT
* +ID
*/
@PostConstruct
public void initMqttSubscription() {
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+"); // 制水机状态(所有设备)
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+"); // 制水机告警(所有设备)
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+"); // 供水机状态(所有设备)
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+"); // 供水机告警(所有设备)
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+");
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+");
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+");
mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+");
log.info("MQTT订阅初始化完成 | 订阅主题:{}+、{}+、{}+、{}+",
MqttConfig.TOPIC_WATER_MAKER_STATE,
MqttConfig.TOPIC_WATER_MAKER_WARN,
@ -52,25 +45,19 @@ public class MqttSensorReceiver {
MqttConfig.TOPIC_WATER_SUPPLIER_WARN);
}
/**
* MQTT
* @param payload JSON
* @param topic
*/
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMqttMessage(String payload, @Header(MqttHeaders.RECEIVED_TOPIC) String topic) {
log.info("MQTT消息接收成功 | 主题:{} | 内容:{}", topic, payload);
try {
// 根据主题分类处理
if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_STATE)) {
handleWaterMakerState(payload); // 制水机状态数据
handleWaterMakerState(payload);
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_WARN)) {
handleWaterMakerWarning(payload); // 制水机告警数据
handleWaterMakerWarning(payload);
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_STATE)) {
handleWaterSupplyState(payload); // 供水机状态数据
handleWaterSupplyState(payload);
} else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_WARN)) {
handleWaterSupplyWarning(payload); // 供水机告警数据
handleWaterSupplyWarning(payload); // 新增:处理供水机告警主题
} else {
log.warn("MQTT消息主题未匹配 | 未知主题:{} | 内容:{}", topic, payload);
}
@ -79,17 +66,11 @@ public class MqttSensorReceiver {
}
}
/**
* JPA
*/
private void handleWaterMakerState(String payload) throws Exception {
// 1. JSON反序列化为模型对象
WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class);
// 2. 模型对象转换为JPA实体持久化到数据库
WaterMakerRealtimeData entity = new WaterMakerRealtimeData();
entity.setDeviceId(sensorData.getDeviceId());
// Double转BigDecimal处理包含null值判断
entity.setTdsValue1(sensorData.getTdsValue1() != null ? BigDecimal.valueOf(sensorData.getTdsValue1()) : null);
entity.setTdsValue2(sensorData.getTdsValue2() != null ? BigDecimal.valueOf(sensorData.getTdsValue2()) : null);
entity.setTdsValue3(sensorData.getTdsValue3() != null ? BigDecimal.valueOf(sensorData.getTdsValue3()) : null);
@ -97,31 +78,31 @@ public class MqttSensorReceiver {
entity.setWaterFlow2(sensorData.getWaterFlow2() != null ? BigDecimal.valueOf(sensorData.getWaterFlow2()) : null);
entity.setWaterPress(sensorData.getWaterPress() != null ? BigDecimal.valueOf(sensorData.getWaterPress()) : null);
entity.setFilterLife(sensorData.getFilterLife());
entity.setLeakage(sensorData.getLeakage() ? true : false); // 数据库存储true=漏水false=正常
entity.setLeakage(sensorData.getLeakage() ? true : false);
entity.setWaterQuality(sensorData.getWaterQuality());
entity.setStatus(WaterMakerRealtimeData.DeviceStatus.valueOf(sensorData.getStatus()));
entity.setRecordTime(sensorData.getRecordTime());
entity.setCreatedTime(LocalDateTime.now());
// 3. 持久化到数据库JPA save() 自动实现CRUD
waterMakerRepo.save(entity);
log.info("制水机状态数据持久化成功 | 设备ID{}", sensorData.getDeviceId());
// 新增:调用告警检查逻辑
alertTriggerService.checkWaterMakerAbnormal(sensorData);
}
/**
* +
*/
private void handleWaterMakerWarning(String payload) throws Exception {
WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class);
// 1. 持久化告警记录
// 新增:重复告警判断
if (alertTriggerService.isDuplicateAlert(sensorData.getDeviceId(), "WATER_MAKER_ABNORMAL")) {
log.info("制水机存在未处理告警,跳过重复触发 | 设备ID{}", sensorData.getDeviceId());
return;
}
Alert alert = new Alert();
alert.setDeviceId(sensorData.getDeviceId());
alert.setAlertType("WATER_MAKER_ABNORMAL"); // 告警类型(枚举规范)
alert.setAlertLevel(Alert.AlertLevel.critical); // 告警级别(严重)
alert.setAlertType("WATER_MAKER_ABNORMAL");
alert.setAlertLevel(Alert.AlertLevel.critical);
alert.setAreaId(alertTriggerService.getDeviceAreaId(sensorData.getDeviceId()));
alert.setAlertMessage(String.format(
"制水机异常 - 设备ID%sTDS值%.2f,滤芯寿命:%d%%,漏水状态:%s",
sensorData.getDeviceId(),
@ -129,26 +110,23 @@ public class MqttSensorReceiver {
sensorData.getFilterLife(),
sensorData.getLeakage() ? "是" : "否"
));
alert.setStatus(Alert.AlertStatus.pending); // 告警状态(未处理)
alert.setTimestamp(sensorData.getRecordTime());
alert.setCreatedTime(LocalDateTime.now());
alert.setStatus(Alert.AlertStatus.pending);
alert.setTimestamp(sensorData.getRecordTime() != null ? sensorData.getRecordTime() : LocalDateTime.now());
LocalDateTime now = LocalDateTime.now();
alert.setCreatedTime(alert.getCreatedTime() != null ? alert.getCreatedTime() : now);
alert.setUpdatedTime(now);
alertRepo.save(alert);
log.warn("制水机告警记录持久化成功 | 告警ID{} | 设备ID{}", alert.getAlertId(), sensorData.getDeviceId());
// 2. 同时持久化状态数据(便于后续追溯)
handleWaterMakerState(payload);
}
/**
* JPA
*/
private void handleWaterSupplyState(String payload) throws Exception {
WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class);
WaterSupplyRealtimeData entity = new WaterSupplyRealtimeData();
entity.setDeviceId(sensorData.getDeviceId());
// Double转BigDecimal处理包含null值判断
entity.setWaterFlow(sensorData.getWaterFlow() != null ? BigDecimal.valueOf(sensorData.getWaterFlow()) : null);
entity.setWaterPress(sensorData.getWaterPress() != null ? BigDecimal.valueOf(sensorData.getWaterPress()) : null);
entity.setWaterLevel(sensorData.getWaterLevel() != null ? BigDecimal.valueOf(sensorData.getWaterLevel()) : null);
@ -156,39 +134,47 @@ public class MqttSensorReceiver {
entity.setStatus(WaterSupplyRealtimeData.DeviceStatus.valueOf(sensorData.getStatus()));
entity.setTimestamp(sensorData.getTimestamp());
waterSupplyRepo.save(entity);
log.info("供水机状态数据持久化成功 | 设备ID{}", sensorData.getDeviceId());
// 新增:调用告警检查逻辑
alertTriggerService.checkWaterSupplyAbnormal(sensorData);
}
/**
* +
*
*/
private void handleWaterSupplyWarning(String payload) throws Exception {
WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class);
// 1. 持久化告警记录
// 新增:重复告警判断
if (alertTriggerService.isDuplicateAlert(sensorData.getDeviceId(), "WATER_SUPPLY_ABNORMAL")) {
log.info("供水机存在未处理告警,跳过重复触发 | 设备ID{}", sensorData.getDeviceId());
return;
}
Alert alert = new Alert();
alert.setDeviceId(sensorData.getDeviceId());
alert.setAlertType("WATER_SUPPLY_ABNORMAL");
alert.setAlertLevel(Alert.AlertLevel.error);
alert.setAreaId(alertTriggerService.getDeviceAreaId(sensorData.getDeviceId()));
alert.setAlertMessage(String.format(
"供水机异常 - 设备ID%s水压%.2fMPa,水位:%.2f%%",
"供水机异常 - 设备ID%s水压%.2fMPa,水位:%.2f%%,水温:%.2f℃",
sensorData.getDeviceId(),
sensorData.getWaterPress(),
sensorData.getWaterLevel()
sensorData.getWaterLevel(),
sensorData.getTemperature()
));
alert.setStatus(Alert.AlertStatus.pending);
alert.setTimestamp(sensorData.getTimestamp());
alert.setCreatedTime(LocalDateTime.now());
// 完善告警时间戳(确保不为空)
alert.setTimestamp(sensorData.getTimestamp() != null ? sensorData.getTimestamp() : LocalDateTime.now());
// 完善创建/更新时间(确保不为空)
LocalDateTime now = LocalDateTime.now();
alert.setCreatedTime(alert.getCreatedTime() != null ? alert.getCreatedTime() : now);
alert.setUpdatedTime(now);
alertRepo.save(alert);
log.warn("供水机告警记录持久化成功 | 告警ID{} | 设备ID{}", alert.getAlertId(), sensorData.getDeviceId());
// 2. 同时持久化状态数据
// 同时持久化状态数据
handleWaterSupplyState(payload);
}
}

@ -115,6 +115,32 @@ public class MqttSensorSender {
}
}
/**
*
* @param deviceId IDWS001
*/
public void sendWaterSupplyWarning(String deviceId) {
try {
// 1. 构建供水机异常数据(超出正常范围)
WaterSupplySensorData data = new WaterSupplySensorData();
data.setDeviceId(deviceId);
data.setWaterFlow(0.1 + random.nextDouble() * 0.2); // 流量极低0.1-0.3 L/min
data.setWaterPress(0.01 + random.nextDouble() * 0.09); // 水压过低0.01-0.1 MPa
data.setWaterLevel(5 + random.nextDouble() * 15); // 水位过低5-20%
data.setTemperature(25 + random.nextDouble() * 5); // 水温过高25-30℃
data.setStatus("error");
data.setTimestamp(LocalDateTime.now());
// 2. 序列化+发送
String payload = objectMapper.writeValueAsString(data);
String topic = MqttConfig.TOPIC_WATER_SUPPLIER_WARN + deviceId;
sendMessage(topic, payload);
log.warn("供水机告警消息发送成功 | 设备ID{} | 主题:{} | 数据:{}", deviceId, topic, payload);
} catch (JsonProcessingException e) {
log.error("供水机告警消息发送失败 | 设备ID{} | 异常:{}", deviceId, e.getMessage());
}
}
/**
* MQTT
* @param topic

@ -9,6 +9,7 @@ import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
@Component
@ -17,36 +18,37 @@ import java.util.stream.Collectors;
public class SensorSimulationTask {
private final MqttSensorSender mqttSensorSender;
private final DeviceRepository deviceRepository;
private final Random random = new Random(); // 新增随机数生成器
/**
* 30
* 30
*/
@Scheduled(fixedRate = 30000)
@Scheduled(fixedRate = 1800000) // 30分钟 = 30*60*1000 = 1800000毫秒
public void sendRegularStateData() {
log.info("=== 开始发送设备正常状态数据 ===");
// 1. 从数据库查询制水机water_maker类型设备(传入枚举类型)
// 1. 查询制水机设备
List<String> waterMakerDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_maker)
.stream()
.map(Device::getDeviceId)
.collect(Collectors.toList());
// 2. 从数据库查询「供水机」water_supply类型设备传入枚举类型
List<String> waterSupplyDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_supply)
.stream()
.map(Device::getDeviceId)
.collect(Collectors.toList());
// 发送制水机状态
// 2. 分批次发送制水机数据每批间隔1-3秒
if (!waterMakerDevices.isEmpty()) {
waterMakerDevices.forEach(deviceId -> mqttSensorSender.sendWaterMakerState(deviceId));
sendInBatches(waterMakerDevices, true);
} else {
log.warn("⚠️ device表中无「water_maker」类型设备");
}
// 发送供水机状态
// 3. 查询供水机设备
List<String> waterSupplyDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_supply)
.stream()
.map(Device::getDeviceId)
.collect(Collectors.toList());
// 4. 分批次发送供水机数据每批间隔1-3秒
if (!waterSupplyDevices.isEmpty()) {
waterSupplyDevices.forEach(deviceId -> mqttSensorSender.sendWaterSupplyData(deviceId));
sendInBatches(waterSupplyDevices, false);
} else {
log.warn("⚠️ device表中无「water_supply」类型设备");
}
@ -55,28 +57,72 @@ public class SensorSimulationTask {
}
/**
* 5
*
* @param deviceIds ID
* @param isWaterMaker
*/
private void sendInBatches(List<String> deviceIds, boolean isWaterMaker) {
int batchSize = 3; // 每批发送3台设备
int delayBetweenBatches = 1000 + random.nextInt(2000); // 1-3秒随机间隔
for (int i = 0; i < deviceIds.size(); i++) {
String deviceId = deviceIds.get(i);
// 发送当前设备数据
if (isWaterMaker) {
mqttSensorSender.sendWaterMakerState(deviceId);
} else {
mqttSensorSender.sendWaterSupplyData(deviceId);
}
// 每发送batchSize台设备后休眠指定时间最后一批除外
if ((i + 1) % batchSize == 0 && i != deviceIds.size() - 1) {
try {
Thread.sleep(delayBetweenBatches);
} catch (InterruptedException e) {
log.error("批次发送休眠被中断", e);
Thread.currentThread().interrupt(); // 恢复中断状态
}
}
}
}
/**
* 5
*/
@Scheduled(fixedRate = 300000)
public void sendRandomWarningData() {
log.info("=== 开始发送随机告警数据 ===");
// 从数据库查询制水机列表(传入枚举类型)
// 处理制水机告警
List<String> waterMakerDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_maker)
.stream()
.map(Device::getDeviceId)
.collect(Collectors.toList());
if (waterMakerDevices.isEmpty()) {
log.warn("⚠️ device表中无「water_maker」类型设备跳过告警发送");
return;
if (!waterMakerDevices.isEmpty()) {
int randomMakerIndex = random.nextInt(waterMakerDevices.size());
String targetMakerDevice = waterMakerDevices.get(randomMakerIndex);
mqttSensorSender.sendWaterMakerWarning(targetMakerDevice);
log.info("制水机告警发送完成 | 告警设备:{}", targetMakerDevice);
} else {
log.warn("⚠️ device表中无「water_maker」类型设备跳过制水机告警发送");
}
// 随机选择1台制水机发送告警
int randomIndex = (int) (Math.random() * waterMakerDevices.size());
String targetDevice = waterMakerDevices.get(randomIndex);
mqttSensorSender.sendWaterMakerWarning(targetDevice);
// 新增:处理供水机告警
List<String> waterSupplyDevices = deviceRepository.findByDeviceType(Device.DeviceType.water_supply)
.stream()
.map(Device::getDeviceId)
.collect(Collectors.toList());
if (!waterSupplyDevices.isEmpty()) {
int randomSupplyIndex = random.nextInt(waterSupplyDevices.size());
String targetSupplyDevice = waterSupplyDevices.get(randomSupplyIndex);
mqttSensorSender.sendWaterSupplyWarning(targetSupplyDevice); // 调用新增的供水机告警发送方法
log.info("供水机告警发送完成 | 告警设备:{}", targetSupplyDevice);
} else {
log.warn("⚠️ device表中无「water_supply」类型设备跳过供水机告警发送");
}
log.info("=== 随机告警数据发送完成 | 告警设备:{} ===", targetDevice);
log.info("=== 随机告警数据发送完成 ===");
}
}
Loading…
Cancel
Save