diff --git a/doc/project/03-计划文档/~$开发计划第一稿.docx b/doc/project/03-计划文档/~$开发计划第一稿.docx new file mode 100644 index 0000000..fcc4ec6 Binary files /dev/null and b/doc/project/03-计划文档/~$开发计划第一稿.docx differ diff --git a/src/com/campus/water/ CampusWaterApplication.java b/src/com/campus/water/ CampusWaterApplication.java deleted file mode 100644 index e69de29..0000000 diff --git a/src/main/java/com/campus/water/ CampusWaterApplication.java b/src/main/java/com/campus/water/ CampusWaterApplication.java new file mode 100644 index 0000000..375cd42 --- /dev/null +++ b/src/main/java/com/campus/water/ CampusWaterApplication.java @@ -0,0 +1,23 @@ +package com.campus.water; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.integration.config.EnableIntegration; +import org.springframework.integration.config.EnableIntegrationManagement; +import org.springframework.scheduling.annotation.EnableScheduling; + +/** + * Spring Boot 主启动类 + * 核心注解:开启自动配置、定时任务、Spring Integration + */ +@SpringBootApplication(scanBasePackages = "com.campus.water") // 扫描所有业务组件 +@EnableScheduling // 开启定时任务(支持@Scheduled) +@EnableIntegration // 开启Spring Integration(支持MQTT集成) +@EnableIntegrationManagement // 开启Integration管理(监控消息流转) +public class CampusWaterApplication { + public static void main(String[] args) { + SpringApplication.run(CampusWaterApplication.class, args); + System.out.println("=== 校园直饮矿化水系统(Spring Boot版)启动成功 ==="); + System.out.println("=== MQTT传感器模拟、数据接收、持久化功能已启用 ==="); + } +} \ No newline at end of file diff --git a/src/main/java/com/campus/water/config/MqttConfig.java b/src/main/java/com/campus/water/config/MqttConfig.java new file mode 100644 index 0000000..cac67a8 --- /dev/null +++ b/src/main/java/com/campus/water/config/MqttConfig.java @@ -0,0 +1,80 @@ +package com.campus.water.config; + +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; +import org.springframework.integration.mqtt.core.MqttPahoClientFactory; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.MessageChannel; + +@Configuration +public class MqttConfig { + // MQTT 基础配置(可迁移到 application.yml 中,用 @ConfigurationProperties 绑定) + public static final String BROKER = "tcp://b17be106.ala.cn-hangzhou.emqxsl.cn:8883"; + public static final String USERNAME = "admin"; + public static final String PASSWORD = "12345678"; + public static final int QOS = 1; // 消息质量等级(1=确保送达,不重复) + public static final int CONNECTION_TIMEOUT = 30000; // 连接超时(毫秒) + public static final int KEEP_ALIVE_INTERVAL = 60; // 心跳间隔(秒) + + // MQTT 主题定义(按设备类型+功能分层) + public static final String TOPIC_WATER_MAKER_STATE = "/device/state/water_maker/"; + public static final String TOPIC_WATER_MAKER_WARN = "/device/warn/water_maker/"; + public static final String TOPIC_WATER_SUPPLIER_STATE = "/device/state/water_supply/"; + public static final String TOPIC_WATER_SUPPLIER_WARN = "/device/warn/water_supply/"; + + /** + * MQTT 客户端工厂(Spring 管理,统一创建客户端) + */ + @Bean + public MqttPahoClientFactory mqttClientFactory() { + DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); + MqttConnectOptions options = new MqttConnectOptions(); + + // 配置连接参数 + options.setServerURIs(new String[]{BROKER}); + options.setUserName(USERNAME); + options.setPassword(PASSWORD.toCharArray()); + options.setConnectionTimeout(CONNECTION_TIMEOUT / 1000); // 转换为秒 + options.setKeepAliveInterval(KEEP_ALIVE_INTERVAL); + options.setAutomaticReconnect(true); // 断线自动重连 + options.setCleanSession(true); // 断开后清除会话 + + factory.setConnectionOptions(options); + return factory; + } + + /** + * 发送消息通道(DirectChannel:同步发送,确保消息顺序) + */ + @Bean + public MessageChannel mqttOutboundChannel() { + return new DirectChannel(); + } + + /** + * MQTT 消息发送处理器(封装发送逻辑) + */ + @Bean + public MqttPahoMessageHandler mqttOutbound() { + // 客户端ID:前缀+时间戳,避免重复 + String clientId = "sensor-sender-" + System.currentTimeMillis(); + MqttPahoMessageHandler handler = new MqttPahoMessageHandler(clientId, mqttClientFactory()); + + handler.setAsync(true); // 异步发送(不阻塞主线程) + handler.setDefaultQos(QOS); // 默认QOS等级 + handler.setDefaultTopic(TOPIC_WATER_MAKER_STATE); // 默认主题(可在发送时覆盖) + + return handler; + } + + /** + * 接收消息通道(用于接收端转发消息) + */ + @Bean + public MessageChannel mqttInputChannel() { + return new DirectChannel(); + } +} \ No newline at end of file diff --git a/src/main/java/com/campus/water/config/MqttInboundConfig.java b/src/main/java/com/campus/water/config/MqttInboundConfig.java new file mode 100644 index 0000000..62b7381 --- /dev/null +++ b/src/main/java/com/campus/water/config/MqttInboundConfig.java @@ -0,0 +1,35 @@ +package com.campus.water.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; + +@Configuration +public class MqttInboundConfig { + + /** + * MQTT 接收适配器(监听MQTT主题,接收消息并转发到通道) + */ + @Bean + public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttConfig mqttConfig) { + // 接收端客户端ID(与发送端区分) + String clientId = "sensor-receiver-" + System.currentTimeMillis(); + + // 创建适配器:指定客户端ID、工厂、默认订阅主题(可后续动态添加) + MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( + clientId, + mqttConfig.mqttClientFactory() + ); + + // 配置消息转换器(默认UTF-8编码,支持JSON格式) + adapter.setConverter(new DefaultPahoMessageConverter()); + adapter.setQos(mqttConfig.QOS); // 订阅QOS等级与发送端一致 + adapter.setOutputChannel(mqttConfig.mqttInputChannel()); // 消息转发到接收通道 + + // 开启异常重试(避免网络波动导致消息丢失) + adapter.setRecoveryInterval(5000); // 重试间隔5秒 + + return adapter; + } +} \ No newline at end of file diff --git a/src/com/campus/water/config/先读我.md b/src/main/java/com/campus/water/config/先读我.md similarity index 100% rename from src/com/campus/water/config/先读我.md rename to src/main/java/com/campus/water/config/先读我.md diff --git a/src/com/campus/water/controller/app/先读我.md b/src/main/java/com/campus/water/controller/app/先读我.md similarity index 100% rename from src/com/campus/water/controller/app/先读我.md rename to src/main/java/com/campus/water/controller/app/先读我.md diff --git a/src/com/campus/water/controller/common/先读我.md b/src/main/java/com/campus/water/controller/common/先读我.md similarity index 100% rename from src/com/campus/water/controller/common/先读我.md rename to src/main/java/com/campus/water/controller/common/先读我.md diff --git a/src/com/campus/water/controller/web/先读我.md b/src/main/java/com/campus/water/controller/web/先读我.md similarity index 100% rename from src/com/campus/water/controller/web/先读我.md rename to src/main/java/com/campus/water/controller/web/先读我.md diff --git a/src/com/campus/water/entity/Admin.java b/src/main/java/com/campus/water/entity/Admin.java similarity index 100% rename from src/com/campus/water/entity/Admin.java rename to src/main/java/com/campus/water/entity/Admin.java diff --git a/src/com/campus/water/entity/Alert.java b/src/main/java/com/campus/water/entity/Alert.java similarity index 100% rename from src/com/campus/water/entity/Alert.java rename to src/main/java/com/campus/water/entity/Alert.java diff --git a/src/com/campus/water/entity/Area.java b/src/main/java/com/campus/water/entity/Area.java similarity index 100% rename from src/com/campus/water/entity/Area.java rename to src/main/java/com/campus/water/entity/Area.java diff --git a/src/com/campus/water/entity/Device.java b/src/main/java/com/campus/water/entity/Device.java similarity index 100% rename from src/com/campus/water/entity/Device.java rename to src/main/java/com/campus/water/entity/Device.java diff --git a/src/com/campus/water/entity/DeviceTerminalMapping.java b/src/main/java/com/campus/water/entity/DeviceTerminalMapping.java similarity index 100% rename from src/com/campus/water/entity/DeviceTerminalMapping.java rename to src/main/java/com/campus/water/entity/DeviceTerminalMapping.java diff --git a/src/com/campus/water/entity/DrinkRecommendation.java b/src/main/java/com/campus/water/entity/DrinkRecommendation.java similarity index 100% rename from src/com/campus/water/entity/DrinkRecommendation.java rename to src/main/java/com/campus/water/entity/DrinkRecommendation.java diff --git a/src/com/campus/water/entity/DrinkRecord.java b/src/main/java/com/campus/water/entity/DrinkRecord.java similarity index 100% rename from src/com/campus/water/entity/DrinkRecord.java rename to src/main/java/com/campus/water/entity/DrinkRecord.java diff --git a/src/com/campus/water/entity/InspectionRecord.java b/src/main/java/com/campus/water/entity/InspectionRecord.java similarity index 100% rename from src/com/campus/water/entity/InspectionRecord.java rename to src/main/java/com/campus/water/entity/InspectionRecord.java diff --git a/src/com/campus/water/entity/MaintenancePlan.java b/src/main/java/com/campus/water/entity/MaintenancePlan.java similarity index 100% rename from src/com/campus/water/entity/MaintenancePlan.java rename to src/main/java/com/campus/water/entity/MaintenancePlan.java diff --git a/src/com/campus/water/entity/MessagePush.java b/src/main/java/com/campus/water/entity/MessagePush.java similarity index 100% rename from src/com/campus/water/entity/MessagePush.java rename to src/main/java/com/campus/water/entity/MessagePush.java diff --git a/src/com/campus/water/entity/RepairerAuth.java b/src/main/java/com/campus/water/entity/RepairerAuth.java similarity index 100% rename from src/com/campus/water/entity/RepairerAuth.java rename to src/main/java/com/campus/water/entity/RepairerAuth.java diff --git a/src/com/campus/water/entity/Repairman.java b/src/main/java/com/campus/water/entity/Repairman.java similarity index 100% rename from src/com/campus/water/entity/Repairman.java rename to src/main/java/com/campus/water/entity/Repairman.java diff --git a/src/com/campus/water/entity/TerminalUsageStats.java b/src/main/java/com/campus/water/entity/TerminalUsageStats.java similarity index 100% rename from src/com/campus/water/entity/TerminalUsageStats.java rename to src/main/java/com/campus/water/entity/TerminalUsageStats.java diff --git a/src/com/campus/water/entity/User.java b/src/main/java/com/campus/water/entity/User.java similarity index 100% rename from src/com/campus/water/entity/User.java rename to src/main/java/com/campus/water/entity/User.java diff --git a/src/com/campus/water/entity/WaterMakerRealtimeData.java b/src/main/java/com/campus/water/entity/WaterMakerRealtimeData.java similarity index 100% rename from src/com/campus/water/entity/WaterMakerRealtimeData.java rename to src/main/java/com/campus/water/entity/WaterMakerRealtimeData.java diff --git a/src/com/campus/water/entity/WaterQualityHistory.java b/src/main/java/com/campus/water/entity/WaterQualityHistory.java similarity index 100% rename from src/com/campus/water/entity/WaterQualityHistory.java rename to src/main/java/com/campus/water/entity/WaterQualityHistory.java diff --git a/src/com/campus/water/entity/WaterSupplyRealtimeData.java b/src/main/java/com/campus/water/entity/WaterSupplyRealtimeData.java similarity index 100% rename from src/com/campus/water/entity/WaterSupplyRealtimeData.java rename to src/main/java/com/campus/water/entity/WaterSupplyRealtimeData.java diff --git a/src/com/campus/water/entity/WorkOrder.java b/src/main/java/com/campus/water/entity/WorkOrder.java similarity index 100% rename from src/com/campus/water/entity/WorkOrder.java rename to src/main/java/com/campus/water/entity/WorkOrder.java diff --git a/src/com/campus/water/entity/先读我.md b/src/main/java/com/campus/water/entity/先读我.md similarity index 100% rename from src/com/campus/water/entity/先读我.md rename to src/main/java/com/campus/water/entity/先读我.md diff --git a/src/com/campus/water/mapper/AdminRepository.java b/src/main/java/com/campus/water/mapper/AdminRepository.java similarity index 100% rename from src/com/campus/water/mapper/AdminRepository.java rename to src/main/java/com/campus/water/mapper/AdminRepository.java diff --git a/src/com/campus/water/mapper/AlertRepository.java b/src/main/java/com/campus/water/mapper/AlertRepository.java similarity index 100% rename from src/com/campus/water/mapper/AlertRepository.java rename to src/main/java/com/campus/water/mapper/AlertRepository.java diff --git a/src/com/campus/water/mapper/AreaRepository.java b/src/main/java/com/campus/water/mapper/AreaRepository.java similarity index 100% rename from src/com/campus/water/mapper/AreaRepository.java rename to src/main/java/com/campus/water/mapper/AreaRepository.java diff --git a/src/com/campus/water/mapper/DeviceRepository.java b/src/main/java/com/campus/water/mapper/DeviceRepository.java similarity index 100% rename from src/com/campus/water/mapper/DeviceRepository.java rename to src/main/java/com/campus/water/mapper/DeviceRepository.java diff --git a/src/com/campus/water/mapper/DeviceTerminalMappingRepository.java b/src/main/java/com/campus/water/mapper/DeviceTerminalMappingRepository.java similarity index 100% rename from src/com/campus/water/mapper/DeviceTerminalMappingRepository.java rename to src/main/java/com/campus/water/mapper/DeviceTerminalMappingRepository.java diff --git a/src/com/campus/water/mapper/DrinkRecommendationRepository.java b/src/main/java/com/campus/water/mapper/DrinkRecommendationRepository.java similarity index 100% rename from src/com/campus/water/mapper/DrinkRecommendationRepository.java rename to src/main/java/com/campus/water/mapper/DrinkRecommendationRepository.java diff --git a/src/com/campus/water/mapper/DrinkRecordRepository.java b/src/main/java/com/campus/water/mapper/DrinkRecordRepository.java similarity index 100% rename from src/com/campus/water/mapper/DrinkRecordRepository.java rename to src/main/java/com/campus/water/mapper/DrinkRecordRepository.java diff --git a/src/com/campus/water/mapper/InspectionRecordRepository.java b/src/main/java/com/campus/water/mapper/InspectionRecordRepository.java similarity index 100% rename from src/com/campus/water/mapper/InspectionRecordRepository.java rename to src/main/java/com/campus/water/mapper/InspectionRecordRepository.java diff --git a/src/com/campus/water/mapper/MaintenancePlanRepository.java b/src/main/java/com/campus/water/mapper/MaintenancePlanRepository.java similarity index 100% rename from src/com/campus/water/mapper/MaintenancePlanRepository.java rename to src/main/java/com/campus/water/mapper/MaintenancePlanRepository.java diff --git a/src/com/campus/water/mapper/MessagePushRepository.java b/src/main/java/com/campus/water/mapper/MessagePushRepository.java similarity index 100% rename from src/com/campus/water/mapper/MessagePushRepository.java rename to src/main/java/com/campus/water/mapper/MessagePushRepository.java diff --git a/src/com/campus/water/mapper/RepairerAuthRepository.java b/src/main/java/com/campus/water/mapper/RepairerAuthRepository.java similarity index 100% rename from src/com/campus/water/mapper/RepairerAuthRepository.java rename to src/main/java/com/campus/water/mapper/RepairerAuthRepository.java diff --git a/src/com/campus/water/mapper/RepairmanRepository.java b/src/main/java/com/campus/water/mapper/RepairmanRepository.java similarity index 100% rename from src/com/campus/water/mapper/RepairmanRepository.java rename to src/main/java/com/campus/water/mapper/RepairmanRepository.java diff --git a/src/com/campus/water/mapper/TerminalUsageStatsRepository.java b/src/main/java/com/campus/water/mapper/TerminalUsageStatsRepository.java similarity index 100% rename from src/com/campus/water/mapper/TerminalUsageStatsRepository.java rename to src/main/java/com/campus/water/mapper/TerminalUsageStatsRepository.java diff --git a/src/com/campus/water/mapper/UserRepository.java b/src/main/java/com/campus/water/mapper/UserRepository.java similarity index 100% rename from src/com/campus/water/mapper/UserRepository.java rename to src/main/java/com/campus/water/mapper/UserRepository.java diff --git a/src/com/campus/water/mapper/WaterMakerRealtimeDataRepository.java b/src/main/java/com/campus/water/mapper/WaterMakerRealtimeDataRepository.java similarity index 100% rename from src/com/campus/water/mapper/WaterMakerRealtimeDataRepository.java rename to src/main/java/com/campus/water/mapper/WaterMakerRealtimeDataRepository.java diff --git a/src/com/campus/water/mapper/WaterQualityHistoryRepository.java b/src/main/java/com/campus/water/mapper/WaterQualityHistoryRepository.java similarity index 100% rename from src/com/campus/water/mapper/WaterQualityHistoryRepository.java rename to src/main/java/com/campus/water/mapper/WaterQualityHistoryRepository.java diff --git a/src/com/campus/water/mapper/WaterSupplyRealtimeDataRepository.java b/src/main/java/com/campus/water/mapper/WaterSupplyRealtimeDataRepository.java similarity index 100% rename from src/com/campus/water/mapper/WaterSupplyRealtimeDataRepository.java rename to src/main/java/com/campus/water/mapper/WaterSupplyRealtimeDataRepository.java diff --git a/src/com/campus/water/mapper/WorkOrderRepository.java b/src/main/java/com/campus/water/mapper/WorkOrderRepository.java similarity index 100% rename from src/com/campus/water/mapper/WorkOrderRepository.java rename to src/main/java/com/campus/water/mapper/WorkOrderRepository.java diff --git a/src/com/campus/water/mapper/先读我.md b/src/main/java/com/campus/water/mapper/先读我.md similarity index 100% rename from src/com/campus/water/mapper/先读我.md rename to src/main/java/com/campus/water/mapper/先读我.md diff --git a/src/main/java/com/campus/water/model/WaterMakerSensorData.java b/src/main/java/com/campus/water/model/WaterMakerSensorData.java new file mode 100644 index 0000000..aa4c0d1 --- /dev/null +++ b/src/main/java/com/campus/water/model/WaterMakerSensorData.java @@ -0,0 +1,23 @@ +package com.campus.water.model; + +import lombok.Data; +import java.time.LocalDateTime; + +/** + * 制水机传感器数据模型(与MQTT消息格式完全对齐) + * 用于MQTT消息的序列化/反序列化,不直接持久化 + */ +@Data +public class WaterMakerSensorData { + private String deviceId; // 设备唯一ID(如WM001) + private Double tdsValue; // TDS值(水质指标) + private Double waterFlow; // 水流量(L/min) + private Double waterPressure; // 水压(MPa) + private Integer filterLife; // 滤芯寿命(%) + private Boolean leakage; // 是否漏水(true=漏水,false=正常) + private Double temperature; // 水温(℃) + private Double humidity; // 环境湿度(%RH) + private String waterQuality; // 水质等级(合格/不合格) + private String status; // 设备状态(normal=正常,error=异常) + private LocalDateTime timestamp; // 数据采集时间戳 +} \ No newline at end of file diff --git a/src/main/java/com/campus/water/model/WaterSupplySensorData.java b/src/main/java/com/campus/water/model/WaterSupplySensorData.java new file mode 100644 index 0000000..bf8ec47 --- /dev/null +++ b/src/main/java/com/campus/water/model/WaterSupplySensorData.java @@ -0,0 +1,20 @@ +package com.campus.water.model; + +import lombok.Data; +import java.time.LocalDateTime; + +/** + * 供水机传感器数据模型(与MQTT消息格式完全对齐) + * 用于MQTT消息的序列化/反序列化,不直接持久化 + */ +@Data +public class WaterSupplySensorData { + private String deviceId; // 设备唯一ID(如WS001) + private Double waterFlow; // 水流量(L/min) + private Double waterPressure; // 水压(MPa) + private Double waterLevel; // 水位(%) + private Double temperature; // 水温(℃) + private Double humidity; // 环境湿度(%RH) + private String status; // 设备状态(normal=正常,error=异常) + private LocalDateTime timestamp; // 数据采集时间戳 +} \ No newline at end of file diff --git a/src/com/campus/water/security/先读我.md b/src/main/java/com/campus/water/security/先读我.md similarity index 100% rename from src/com/campus/water/security/先读我.md rename to src/main/java/com/campus/water/security/先读我.md diff --git a/src/main/java/com/campus/water/service/MqttSensorReceiver.java b/src/main/java/com/campus/water/service/MqttSensorReceiver.java new file mode 100644 index 0000000..c8af40e --- /dev/null +++ b/src/main/java/com/campus/water/service/MqttSensorReceiver.java @@ -0,0 +1,183 @@ +package com.campus.water.service; + +import com.campus.water.config.MqttConfig; +import com.campus.water.entity.Alert; +import com.campus.water.entity.WaterMakerRealtimeData; +import com.campus.water.entity.WaterSupplyRealtimeData; +import com.campus.water.mapper.AlertRepository; +import com.campus.water.mapper.WaterMakerRealtimeDataRepository; +import com.campus.water.mapper.WaterSupplyRealtimeDataRepository; +import com.campus.water.model.WaterMakerSensorData; +import com.campus.water.model.WaterSupplySensorData; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.annotation.ServiceActivator; +import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; +import org.springframework.integration.mqtt.support.MqttHeaders; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.time.LocalDateTime; + +@Service +@RequiredArgsConstructor +@Slf4j +public class MqttSensorReceiver { + // JPA Repository(数据持久化接口,Spring自动注入实现) + private final WaterMakerRealtimeDataRepository waterMakerRepo; + private final WaterSupplyRealtimeDataRepository waterSupplyRepo; + private final AlertRepository alertRepo; + private final ObjectMapper objectMapper; + private final MqttPahoMessageDrivenChannelAdapter mqttAdapter; + + /** + * 项目启动后初始化:订阅所有需要的MQTT主题 + * 主题后缀用「+」表示通配符(匹配所有设备ID) + */ + @PostConstruct + public void initMqttSubscription() { + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_STATE + "+"); // 制水机状态(所有设备) + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_MAKER_WARN + "+"); // 制水机告警(所有设备) + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_STATE + "+"); // 供水机状态(所有设备) + mqttAdapter.addTopic(MqttConfig.TOPIC_WATER_SUPPLIER_WARN + "+"); // 供水机告警(所有设备) + log.info("MQTT订阅初始化完成 | 订阅主题:{}+、{}+、{}+、{}+", + MqttConfig.TOPIC_WATER_MAKER_STATE, + MqttConfig.TOPIC_WATER_MAKER_WARN, + MqttConfig.TOPIC_WATER_SUPPLIER_STATE, + MqttConfig.TOPIC_WATER_SUPPLIER_WARN); + } + + /** + * 监听MQTT接收通道,处理所有收到的消息 + * @param payload 消息内容(JSON字符串) + * @param topic 接收主题(用于区分消息类型) + */ + @ServiceActivator(inputChannel = "mqttInputChannel") + public void handleMqttMessage(String payload, @Header(MqttHeaders.RECEIVED_TOPIC) String topic) { + log.info("MQTT消息接收成功 | 主题:{} | 内容:{}", topic, payload); + + try { + // 根据主题分类处理 + if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_STATE)) { + handleWaterMakerState(payload); // 制水机状态数据 + } else if (topic.startsWith(MqttConfig.TOPIC_WATER_MAKER_WARN)) { + handleWaterMakerWarning(payload); // 制水机告警数据 + } else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_STATE)) { + handleWaterSupplyState(payload); // 供水机状态数据 + } else if (topic.startsWith(MqttConfig.TOPIC_WATER_SUPPLIER_WARN)) { + handleWaterSupplyWarning(payload); // 供水机告警数据 + } else { + log.warn("MQTT消息主题未匹配 | 未知主题:{} | 内容:{}", topic, payload); + } + } catch (Exception e) { + log.error("MQTT消息处理失败 | 主题:{} | 内容:{} | 异常:{}", topic, payload, e.getMessage()); + } + } + + /** + * 处理制水机状态数据:转换为JPA实体并持久化 + */ + private void handleWaterMakerState(String payload) throws Exception { + // 1. JSON反序列化为模型对象 + WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class); + + // 2. 模型对象转换为JPA实体(持久化到数据库) + WaterMakerRealtimeData entity = new WaterMakerRealtimeData(); + entity.setDeviceId(sensorData.getDeviceId()); + entity.setTdsValue(sensorData.getTdsValue()); + entity.setWaterFlow(sensorData.getWaterFlow()); + entity.setWaterPressure(sensorData.getWaterPressure()); + entity.setFilterLife(sensorData.getFilterLife()); + entity.setLeakage(sensorData.getLeakage() ? 1 : 0); // 数据库存储:1=漏水,0=正常 + entity.setTemperature(sensorData.getTemperature()); + entity.setHumidity(sensorData.getHumidity()); + entity.setWaterQuality(sensorData.getWaterQuality()); + entity.setStatus(WaterMakerRealtimeData.DeviceStatus.valueOf(sensorData.getStatus().toUpperCase())); + entity.setTimestamp(sensorData.getTimestamp()); + entity.setCreatedTime(LocalDateTime.now()); + + // 3. 持久化到数据库(JPA save() 自动实现CRUD) + waterMakerRepo.save(entity); + log.info("制水机状态数据持久化成功 | 设备ID:{}", sensorData.getDeviceId()); + } + + /** + * 处理制水机告警数据:持久化告警记录+状态数据 + */ + private void handleWaterMakerWarning(String payload) throws Exception { + WaterMakerSensorData sensorData = objectMapper.readValue(payload, WaterMakerSensorData.class); + + // 1. 持久化告警记录 + Alert alert = new Alert(); + alert.setDeviceId(sensorData.getDeviceId()); + alert.setAlertType("WATER_MAKER_ABNORMAL"); // 告警类型(枚举规范) + alert.setAlertLevel(Alert.AlertLevel.CRITICAL); // 告警级别(严重) + alert.setAlertMessage(String.format( + "制水机异常 - 设备ID:%s,TDS值:%.2f,滤芯寿命:%d%%,漏水状态:%s", + sensorData.getDeviceId(), + sensorData.getTdsValue(), + sensorData.getFilterLife(), + sensorData.getLeakage() ? "是" : "否" + )); + alert.setStatus(Alert.AlertStatus.PENDING); // 告警状态(未处理) + alert.setTimestamp(sensorData.getTimestamp()); + alert.setCreateTime(LocalDateTime.now()); + + alertRepo.save(alert); + log.warn("制水机告警记录持久化成功 | 告警ID:{} | 设备ID:{}", alert.getId(), sensorData.getDeviceId()); + + // 2. 同时持久化状态数据(便于后续追溯) + handleWaterMakerState(payload); + } + + /** + * 处理供水机状态数据:转换为JPA实体并持久化 + */ + private void handleWaterSupplyState(String payload) throws Exception { + WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class); + + WaterSupplyRealtimeData entity = new WaterSupplyRealtimeData(); + entity.setDeviceId(sensorData.getDeviceId()); + entity.setWaterFlow(sensorData.getWaterFlow()); + entity.setWaterPressure(sensorData.getWaterPressure()); + entity.setWaterLevel(sensorData.getWaterLevel()); + entity.setTemperature(sensorData.getTemperature()); + entity.setHumidity(sensorData.getHumidity()); + entity.setStatus(WaterSupplyRealtimeData.DeviceStatus.valueOf(sensorData.getStatus().toUpperCase())); + entity.setTimestamp(sensorData.getTimestamp()); + entity.setCreatedTime(LocalDateTime.now()); + + waterSupplyRepo.save(entity); + log.info("供水机状态数据持久化成功 | 设备ID:{}", sensorData.getDeviceId()); + } + + /** + * 处理供水机告警数据:持久化告警记录+状态数据 + */ + private void handleWaterSupplyWarning(String payload) throws Exception { + WaterSupplySensorData sensorData = objectMapper.readValue(payload, WaterSupplySensorData.class); + + // 1. 持久化告警记录 + Alert alert = new Alert(); + alert.setDeviceId(sensorData.getDeviceId()); + alert.setAlertType("WATER_SUPPLY_ABNORMAL"); + alert.setAlertLevel(Alert.AlertLevel.ERROR); + alert.setAlertMessage(String.format( + "供水机异常 - 设备ID:%s,水压:%.2fMPa,水位:%.2f%%", + sensorData.getDeviceId(), + sensorData.getWaterPressure(), + sensorData.getWaterLevel() + )); + alert.setStatus(Alert.AlertStatus.PENDING); + alert.setTimestamp(sensorData.getTimestamp()); + alert.setCreateTime(LocalDateTime.now()); + + alertRepo.save(alert); + log.warn("供水机告警记录持久化成功 | 告警ID:{} | 设备ID:{}", alert.getId(), sensorData.getDeviceId()); + + // 2. 同时持久化状态数据 + handleWaterSupplyState(payload); + } +} \ No newline at end of file diff --git a/src/main/java/com/campus/water/service/MqttSensorSender.java b/src/main/java/com/campus/water/service/MqttSensorSender.java new file mode 100644 index 0000000..e553b73 --- /dev/null +++ b/src/main/java/com/campus/water/service/MqttSensorSender.java @@ -0,0 +1,133 @@ +package com.campus.water.service; + +import com.campus.water.config.MqttConfig; +import com.campus.water.model.WaterMakerSensorData; +import com.campus.water.model.WaterSupplySensorData; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageHeaders; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Service; +import org.springframework.util.MimeTypeUtils; + +import java.time.LocalDateTime; +import java.util.Random; + +@Service +@RequiredArgsConstructor +@Slf4j // 日志注解(替代System.out) +public class MqttSensorSender { + private final MqttPahoMessageHandler mqttMessageHandler; + private final ObjectMapper objectMapper; // JSON序列化工具(Spring默认注入) + private final Random random = new Random(); // 生成模拟数据 + + /** + * 模拟制水机发送「正常状态数据」 + * @param deviceId 设备ID(如WM001) + */ + public void sendWaterMakerState(String deviceId) { + try { + // 1. 构建模拟数据(符合正常业务范围) + WaterMakerSensorData data = new WaterMakerSensorData(); + data.setDeviceId(deviceId); + data.setTdsValue(50 + random.nextDouble() * 30); // 50-80(正常范围) + data.setWaterFlow(random.nextDouble() * 2); // 0-2 L/min + data.setWaterPressure(0.2 + random.nextDouble() * 0.3); // 0.2-0.5 MPa + data.setFilterLife(30 + random.nextInt(70)); // 30-100% + data.setLeakage(random.nextBoolean() && random.nextBoolean()); // 漏水概率较低(25%) + data.setTemperature(20 + random.nextDouble() * 5); // 20-25℃ + data.setHumidity(40 + random.nextDouble() * 20); // 40-60%RH + data.setWaterQuality("合格"); + data.setStatus("normal"); + data.setTimestamp(LocalDateTime.now()); + + // 2. 序列化JSON(MQTT消息 payload 为JSON字符串) + String payload = objectMapper.writeValueAsString(data); + // 3. 构建主题(设备ID作为主题后缀,精准订阅) + String topic = MqttConfig.TOPIC_WATER_MAKER_STATE + deviceId; + + // 4. 发送MQTT消息 + sendMessage(topic, payload); + log.info("制水机状态消息发送成功 | 设备ID:{} | 主题:{} | 数据:{}", deviceId, topic, payload); + } catch (JsonProcessingException e) { + log.error("制水机状态消息发送失败 | 设备ID:{} | 异常:{}", deviceId, e.getMessage()); + } + } + + /** + * 模拟制水机发送「告警数据」 + * @param deviceId 设备ID(如WM001) + */ + public void sendWaterMakerWarning(String deviceId) { + try { + // 1. 构建异常数据(超出正常范围) + WaterMakerSensorData data = new WaterMakerSensorData(); + data.setDeviceId(deviceId); + data.setTdsValue(150 + random.nextDouble() * 50); // 150-200(异常范围) + data.setWaterFlow(0.1 + random.nextDouble() * 0.3); // 流量极低 + data.setWaterPressure(0.1 + random.nextDouble() * 0.1); // 水压过低(0.1-0.2 MPa) + data.setFilterLife(5 + random.nextInt(10)); // 滤芯寿命低(5-15%) + data.setLeakage(true); // 强制漏水 + data.setTemperature(28 + random.nextDouble() * 3); // 水温过高(28-31℃) + data.setStatus("error"); + data.setTimestamp(LocalDateTime.now()); + + // 2. 序列化+发送 + String payload = objectMapper.writeValueAsString(data); + String topic = MqttConfig.TOPIC_WATER_MAKER_WARN + deviceId; + sendMessage(topic, payload); + log.warn("制水机告警消息发送成功 | 设备ID:{} | 主题:{} | 数据:{}", deviceId, topic, payload); + } catch (JsonProcessingException e) { + log.error("制水机告警消息发送失败 | 设备ID:{} | 异常:{}", deviceId, e.getMessage()); + } + } + + /** + * 模拟供水机发送「正常状态数据」 + * @param deviceId 设备ID(如WS001) + */ + public void sendWaterSupplyData(String deviceId) { + try { + // 1. 构建模拟数据 + WaterSupplySensorData data = new WaterSupplySensorData(); + data.setDeviceId(deviceId); + data.setWaterFlow(random.nextDouble() * 3); // 0-3 L/min + data.setWaterPressure(0.1 + random.nextDouble() * 0.2); // 0.1-0.3 MPa + data.setWaterLevel(30 + random.nextDouble() * 50); // 30-80% + data.setTemperature(18 + random.nextDouble() * 4); // 18-22℃ + data.setHumidity(35 + random.nextDouble() * 15); // 35-50%RH + data.setStatus("normal"); + data.setTimestamp(LocalDateTime.now()); + + // 2. 序列化+发送 + String payload = objectMapper.writeValueAsString(data); + String topic = MqttConfig.TOPIC_WATER_SUPPLIER_STATE + deviceId; + sendMessage(topic, payload); + log.info("供水机状态消息发送成功 | 设备ID:{} | 主题:{} | 数据:{}", deviceId, topic, payload); + } catch (JsonProcessingException e) { + log.error("供水机状态消息发送失败 | 设备ID:{} | 异常:{}", deviceId, e.getMessage()); + } + } + + /** + * 通用发送方法(封装MQTT消息构建逻辑) + * @param topic 主题 + * @param payload 消息内容(JSON字符串) + */ + private void sendMessage(String topic, String payload) { + // 构建Spring Messaging消息(指定JSON格式、主题、QOS) + Message message = MessageBuilder + .withPayload(payload) + .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON) + .setHeader("mqtt_topic", topic) + .setHeader("mqtt_qos", MqttConfig.QOS) + .build(); + + // 调用处理器发送消息 + mqttMessageHandler.handleMessage(message); + } +} \ No newline at end of file diff --git a/src/com/campus/water/service/先读我.md b/src/main/java/com/campus/water/service/先读我.md similarity index 100% rename from src/com/campus/water/service/先读我.md rename to src/main/java/com/campus/water/service/先读我.md diff --git a/src/main/java/com/campus/water/task/SensorSimulationTask.java b/src/main/java/com/campus/water/task/SensorSimulationTask.java new file mode 100644 index 0000000..4835f8e --- /dev/null +++ b/src/main/java/com/campus/water/task/SensorSimulationTask.java @@ -0,0 +1,68 @@ +package com.campus.water.task; + +import com.campus.water.service.MqttSensorSender; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; + +@Component +@RequiredArgsConstructor +@Slf4j +public class SensorSimulationTask { + private final MqttSensorSender mqttSensorSender; + + // 模拟已部署的设备列表(实际项目可从数据库查询) + private final List waterMakerDevices = new ArrayList<>() {{ + add("WM001"); // 制水机1 + add("WM002"); // 制水机2 + add("WM003"); // 制水机3 + add("WM004"); // 制水机4 + }}; + + private final List waterSupplyDevices = new ArrayList<>() {{ + add("WS001"); // 供水机1 + add("WS002"); // 供水机2 + add("WS003"); // 供水机3 + }}; + + /** + * 定时发送「正常状态数据」:每30秒执行一次 + * fixedRate:固定间隔(从上一次任务开始时间计算) + */ + @Scheduled(fixedRate = 30000) + public void sendRegularStateData() { + log.info("=== 开始发送设备正常状态数据 ==="); + + // 1. 发送所有制水机状态 + for (String deviceId : waterMakerDevices) { + mqttSensorSender.sendWaterMakerState(deviceId); + } + + // 2. 发送所有供水机状态 + for (String deviceId : waterSupplyDevices) { + mqttSensorSender.sendWaterSupplyData(deviceId); + } + + log.info("=== 设备正常状态数据发送完成 ==="); + } + + /** + * 定时发送「随机告警数据」:每5分钟执行一次 + * fixedRate:固定间隔(从上一次任务开始时间计算) + */ + @Scheduled(fixedRate = 300000) + public void sendRandomWarningData() { + log.info("=== 开始发送随机告警数据 ==="); + + // 随机选择1台制水机发送告警(模拟设备故障) + int randomIndex = (int) (Math.random() * waterMakerDevices.size()); + String targetDevice = waterMakerDevices.get(randomIndex); + mqttSensorSender.sendWaterMakerWarning(targetDevice); + + log.info("=== 随机告警数据发送完成 | 告警设备:{} ===", targetDevice); + } +} \ No newline at end of file diff --git a/src/com/campus/water/util/先读我.md b/src/main/java/com/campus/water/util/先读我.md similarity index 100% rename from src/com/campus/water/util/先读我.md rename to src/main/java/com/campus/water/util/先读我.md diff --git a/src/com/campus/water/view/先读我.md b/src/main/java/com/campus/water/view/先读我.md similarity index 100% rename from src/com/campus/water/view/先读我.md rename to src/main/java/com/campus/water/view/先读我.md diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml new file mode 100644 index 0000000..942fb57 --- /dev/null +++ b/src/main/resources/application.yml @@ -0,0 +1,49 @@ +# Spring Boot 全局配置文件 +spring: + # 数据库配置(JPA自动集成HikariCP连接池) + datasource: + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://localhost:3306/campus_water?useSSL=false&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true + username: root # 你的MySQL用户名 + password: 123456 # 你的MySQL密码 + hikari: + maximum-pool-size: 10 # 连接池最大连接数 + idle-timeout: 300000 # 连接空闲超时时间(5分钟) + + # JPA配置(hibernate 自动建表/更新表结构) + jpa: + hibernate: + ddl-auto: update # 开发环境:update(自动更新表结构);生产环境:none + show-sql: true # 控制台打印SQL语句 + properties: + hibernate: + dialect: org.hibernate.dialect.MySQL8Dialect # MySQL 8.0方言 + format_sql: true # SQL格式化(便于调试) + open-in-view: false # 关闭OpenInView模式(避免懒加载异常) + +# 服务器配置 +server: + port: 8080 # 项目启动端口 + servlet: + context-path: /water # 接口前缀(如http://localhost:8080/water) + tomcat: + uri-encoding: UTF-8 # 编码格式(避免中文乱码) + +# 日志配置(SLF4J + Logback) +logging: + level: + root: INFO # 全局日志级别 + com.campus.water: DEBUG # 项目包日志级别(调试用) + org.springframework.integration.mqtt: INFO # MQTT集成日志级别 + file: + name: logs/water-system.log # 日志文件路径(项目根目录/logs) + max-size: 10MB # 单个日志文件最大10MB + max-history: 7 # 日志保留7天 + total-size-cap: 100MB # 日志总大小限制100MB + +# Jackson配置(JSON序列化/反序列化) +spring.jackson: + date-format: yyyy-MM-dd HH:mm:ss # 日期格式化 + time-zone: GMT+8 # 时区(避免时间偏移) + serialization: + fail-on-empty-beans: false # 允许空对象序列化 \ No newline at end of file