mqtt草稿-未完全验证 #6

Merged
p95fco63j merged 1 commits from junmao_branch into develop 3 months ago

@ -0,0 +1,43 @@
package com.campus.water.web.controller;
import com.campus.water.web.entity.SensorDataWebVO;
import com.campus.water.web.service.RealTimeService;
import com.campus.water.web.service.HistoryDataService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* WebVue
*/
@RestController
@RequestMapping("/api/admin/monitor")
public class MonitorController {
@Autowired
private RealTimeService realTimeService;
@Autowired
private HistoryDataService historyDataService;
// 1. 获取单设备实时数据(适配前端设备详情页)
@GetMapping("/realTime/{deviceId}")
public SensorDataWebVO getRealTimeData(@PathVariable String deviceId) {
return realTimeService.getRealTimeData(deviceId);
}
// 2. 获取全量设备实时状态(适配前端监控总览页)
@GetMapping("/realTime/all")
public List<SensorDataWebVO> getAllRealTimeData() {
return realTimeService.getAllRealTimeData();
}
// 3. 获取设备历史数据(适配前端历史曲线,需求中“查看历史水质”)
@GetMapping("/history/{deviceId}")
public List<SensorDataWebVO> getHistoryData(
@PathVariable String deviceId,
@RequestParam String startTime, // 开始时间如2024-05-20 00:00:00
@RequestParam String endTime // 结束时间
) {
return historyDataService.getHistoryData(deviceId, startTime, endTime);
}
}

@ -0,0 +1,42 @@
package com.campus.water.web.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* WebMQTTMQTT
*/
@Configuration
public class WebMqttConfig {
// MQTT服务器转发的主题前缀需求中全量设备数据/告警)
private String mqttBroker = "tcp://mqtt-server.campus.com:1883";
private String clientId = "campus-water-admin-web"; // Web端客户端ID
@Bean
public MqttClient mqttClient() throws Exception {
MqttClient client = new MqttClient(mqttBroker, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setKeepAliveInterval(60);
client.connect(options);
System.out.println("管理Web已连接MQTT服务器" + mqttBroker);
return client;
}
// 订阅的主题(全量设备数据+全量告警,适配管理员权限)
@Bean
public String[] subscribeTopics() {
return new String[]{
"forward/web/device/#", // 全量设备实时数据
"forward/web/alert/#" // 全量告警报文
};
}
@Bean
public int[] qosLevels() {
return new int[]{0, 1}; // 告警QoS=1设备数据QoS=0
}
}

@ -0,0 +1,54 @@
package com.campus.water.web.mqtt;
import com.alibaba.fastjson2.JSONObject;
import com.campus.water.web.entity.SensorDataWebVO;
import com.campus.water.web.entity.AlertWebVO;
import com.campus.water.web.service.RealTimeService;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* WebMQTT/
*/
@Component
public class WebMqttSubscriber {
@Autowired
private MqttClient mqttClient;
@Autowired
private String[] subscribeTopics;
@Autowired
private int[] qosLevels;
@Autowired
private RealTimeService realTimeService;
@PostConstruct
public void startSubscribe() throws Exception {
for (int i = 0; i < subscribeTopics.length; i++) {
String topic = subscribeTopics[i];
int qos = qosLevels[i];
mqttClient.subscribe(topic, qos, messageListener());
System.out.println("管理Web已订阅主题" + topic);
}
}
private IMqttMessageListener messageListener() {
return (topic, message) -> {
String payload = new String(message.getPayload());
System.out.println("管理Web接收数据主题=" + topic + ",内容=" + payload);
// 解析数据并缓存到内存(供前端轮询获取实时数据)
if (topic.startsWith("forward/web/device/")) {
SensorDataWebVO sensorVO = JSONObject.parseObject(payload, SensorDataWebVO.class);
realTimeService.updateRealTimeData(sensorVO); // 缓存实时数据
} else if (topic.startsWith("forward/web/alert/")) {
AlertWebVO alertVO = JSONObject.parseObject(payload, AlertWebVO.class);
realTimeService.addAlert(alertVO); // 缓存告警(前端告警列表实时更新)
}
};
}
}

@ -0,0 +1,86 @@
package com.campus.water.mqtt.core;
import com.alibaba.fastjson2.JSONObject;
import com.campus.water.mqtt.entity.SensorData;
import com.campus.water.mqtt.entity.AlertMessage;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* Web/APP/APP
*/
@Component
public class DataForwarder {
@Autowired
private MqttClient mqttClient;
// 1. 转发传感器数据到管理Web全量数据适配管理员监控需求
public void forwardToWeb(SensorData sensorData) throws Exception {
String topic = "forward/web/device/" + sensorData.getDeviceId();
String payload = JSONObject.toJSONString(sensorData);
publish(topic, payload, 0); // Web端QoS=0普通监控数据
}
// 2. 转发告警到管理Web适配管理员查看告警需求
public void forwardToWeb(AlertMessage alert) throws Exception {
String topic = "forward/web/alert/" + alert.getDeviceId();
String payload = JSONObject.toJSONString(alert);
publish(topic, payload, 1); // 告警QoS=1确保管理员看到
}
// 3. 转发制水机水质数据到学生APP仅终端机关联的制水机适配查水质需求
public void forwardToStudentApp(SensorData sensorData) throws Exception {
// 仅制水机数据需转发学生APP查水质
if (sensorData.getDeviceType() != 1) return;
// 按终端机ID转发需求中学生扫码查对应终端机的水质
String terminalId = getRelatedTerminalId(sensorData.getDeviceId());
String topic = "forward/student/terminal/" + terminalId;
String payload = JSONObject.toJSONString(new JSONObject() {{
put("terminalId", terminalId);
put("tdsValue", sensorData.getTdsValue());
put("waterQuality", getWaterQuality(sensorData.getTdsValue())); // 水质等级
put("timestamp", sensorData.getTimestamp());
}});
publish(topic, payload, 0);
}
// 4. 转发供水机水位/告警到维修APP仅本辖区设备适配维修员权限需求
public void forwardToRepairApp(SensorData sensorData) throws Exception {
String areaId = sensorData.getAreaId(); // 设备所属片区如AREA01
String topic = "forward/repair/area/" + areaId + "/device/" + sensorData.getDeviceId();
String payload = JSONObject.toJSONString(sensorData);
publish(topic, payload, 0);
}
public void forwardToRepairApp(AlertMessage alert) throws Exception {
String areaId = alert.getAreaId();
String topic = "forward/repair/area/" + areaId + "/alert/" + alert.getDeviceId();
String payload = JSONObject.toJSONString(alert);
publish(topic, payload, 1); // 告警QoS=1确保维修员抢单
}
// 通用发布方法
private void publish(String topic, String payload, int qos) throws Exception {
MqttMessage message = new MqttMessage(payload.getBytes());
message.setQos(qos);
mqttClient.publish(topic, message);
System.out.println("MQTT服务器转发数据主题=" + topic + ",内容=" + payload);
}
// 工具方法根据制水机ID获取关联的终端机ID需求中终端机关联制水机
private String getRelatedTerminalId(String deviceId) {
// 实际需从MySQL设备表查询此处模拟如制水机WM001关联终端机TERM001/TERM002
return deviceId.equals("WM001") ? "TERM001" : "TERM002";
}
// 工具方法根据TDS值判断水质等级需求中“优质矿化水”标准
private String getWaterQuality(double tds) {
if (tds < 50) return "纯净水(无矿物质)";
else if (tds < 300) return "优质矿化水";
else if (tds < 600) return "合格矿化水";
else return "水质超标(不建议饮用)";
}
}

@ -0,0 +1,53 @@
package com.campus.water.mqtt.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Properties;
/**
* MQTTAir724MQTT Broker
*/
@Configuration
public class MqttConfig {
// 从配置文件读取Broker地址需求中云端Broker如EMQX
private String brokerUrl = "tcp://mqtt-server.campus.com:1883";
private String clientId = "campus-water-mqtt-server"; // 服务器端客户端ID
private int keepAlive = 60; // 心跳间隔适配Air724模块低功耗
@Bean
public MqttClient mqttClient() throws Exception {
// 1. 创建MQTT客户端内存持久化避免服务器存储压力
MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());
// 2. 配置连接参数(确保数据可靠传输,适配需求中告警不丢失)
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false); // 保持会话,重连后恢复订阅
options.setKeepAliveInterval(keepAlive);
options.setAutomaticReconnect(true); // 自动重连,适配网络波动
// 3. 连接Broker
client.connect(options);
System.out.println("MQTT服务器已连接Broker" + brokerUrl);
return client;
}
// 订阅的主题(严格按需求中设备类型划分,便于权限隔离)
@Bean
public String[] subscribeTopics() {
return new String[]{
"sensor/watermaker/#", // 制水机传感器数据TDS/水压/流量需求图3
"sensor/watersupply/#", // 供水机传感器数据(水位/漏水需求图3
"alert/#" // 设备告警报文(滤芯损坏/水位异常,需求告警流程)
};
}
// QoS配置需求中告警用QoS=1确保送达普通传感器数据QoS=0
@Bean
public int[] qosLevels() {
return new int[]{0, 0, 1}; // 告警QoS=1传感器数据QoS=0
}
}

@ -0,0 +1,73 @@
package com.campus.water.mqtt.core;
import com.campus.water.mqtt.service.AlertTriggerService;
import com.campus.water.mqtt.service.SensorDataStorage;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
* MQTT/
*/
@Component
public class MqttSubscriber {
@Autowired
private MqttClient mqttClient;
@Autowired
private String[] subscribeTopics;
@Autowired
private int[] qosLevels;
@Autowired
private DataParser dataParser;
@Autowired
private DataForwarder dataForwarder;
@Autowired
private SensorDataStorage sensorDataStorage;
@Autowired
private AlertTriggerService alertTriggerService;
// 初始化订阅(项目启动后自动执行)
@PostConstruct
public void startSubscribe() throws Exception {
// 为每个主题绑定消息监听器
for (int i = 0; i < subscribeTopics.length; i++) {
String topic = subscribeTopics[i];
int qos = qosLevels[i];
mqttClient.subscribe(topic, qos, messageListener());
System.out.println("MQTT服务器已订阅主题" + topic + "QoS=" + qos + "");
}
}
// 消息监听器(接收数据后解析→存储→转发)
private IMqttMessageListener messageListener() {
return (topic, message) -> {
String payload = new String(message.getPayload());
System.out.println("MQTT服务器接收数据主题=" + topic + ",内容=" + payload);
// 按主题类型处理(严格对齐需求中设备数据类型)
if (topic.startsWith("sensor/watermaker/")) {
// 1. 处理制水机传感器数据TDS/水压/流量)
var sensorData = dataParser.parseWaterMakerData(topic, payload);
sensorDataStorage.saveToInfluxDB(sensorData); // 存入时序库
dataForwarder.forwardToWeb(sensorData); // 转发到管理Web
dataForwarder.forwardToStudentApp(sensorData); // 转发到学生APP水质数据
} else if (topic.startsWith("sensor/watersupply/")) {
// 2. 处理供水机传感器数据(水位)
var sensorData = dataParser.parseWaterSupplyData(topic, payload);
sensorDataStorage.saveToInfluxDB(sensorData); // 存入时序库
dataForwarder.forwardToWeb(sensorData); // 转发到管理Web
dataForwarder.forwardToRepairApp(sensorData); // 转发到维修APP辖区水位
} else if (topic.startsWith("alert/")) {
// 3. 处理告警报文(触发工单)
var alert = dataParser.parseAlertMessage(topic, payload);
alertTriggerService.createWorkOrder(alert); // 生成工单存入MySQL
dataForwarder.forwardToWeb(alert); // 转发到管理Web告警列表
dataForwarder.forwardToRepairApp(alert); // 转发到维修APP抢单
}
};
}
}

@ -0,0 +1,62 @@
package com.campus.water.repair.mqtt;
import com.alibaba.fastjson2.JSONObject;
import com.campus.water.repair.config.RepairMqttConfig;
import com.campus.water.repair.entity.AreaDeviceVO;
import com.campus.water.repair.entity.RepairAlertVO;
import com.campus.water.repair.service.AreaDeviceService;
import com.campus.water.repair.service.WorkOrderService;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* APPMQTT//
*/
@Component
public class RepairMqttSubscriber {
@Autowired
private RepairMqttConfig mqttConfig;
@Autowired
private AreaDeviceService areaDeviceService;
@Autowired
private WorkOrderService workOrderService;
// 维修员登录后订阅本辖区数据(先查询维修员所属片区)
public void subscribeAreaData(String repairmanId, String areaId) throws Exception {
MqttClient client = mqttConfig.mqttClient(repairmanId);
// 1. 订阅本辖区设备数据(供水机水位/制水机状态)
String deviceTopic = mqttConfig.getDeviceSubscribeTopic(areaId);
client.subscribe(deviceTopic, 0, deviceMessageListener(areaId));
// 2. 订阅本辖区告警报文(触发抢单)
String alertTopic = mqttConfig.getAlertSubscribeTopic(areaId);
client.subscribe(alertTopic, 1, alertMessageListener(areaId));
System.out.println("维修APPID" + repairmanId + ")已订阅片区" + areaId + "数据");
}
// 设备数据监听器(缓存辖区设备状态,供巡检使用)
private IMqttMessageListener deviceMessageListener(String areaId) {
return (topic, message) -> {
String payload = new String(message.getPayload());
AreaDeviceVO deviceVO = JSONObject.parseObject(payload, AreaDeviceVO.class);
areaDeviceService.updateAreaDeviceStatus(areaId, deviceVO);
System.out.println("维修APP接收片区" + areaId + "设备数据:" + payload);
};
}
// 告警监听器(新告警触发工单,推送抢单提醒)
private IMqttMessageListener alertMessageListener(String areaId) {
return (topic, message) -> {
String payload = new String(message.getPayload());
RepairAlertVO alertVO = JSONObject.parseObject(payload, RepairAlertVO.class);
// 触发工单生成存入MySQL并推送抢单提醒
workOrderService.createWorkOrderFromAlert(alertVO);
System.out.println("维修APP接收片区" + areaId + "告警:" + payload + "(已生成工单)");
};
}
}

@ -0,0 +1,43 @@
package com.campus.water.repair.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* APPMQTT/
*/
@Configuration
public class RepairMqttConfig {
private String mqttBroker = "tcp://mqtt-server.campus.com:1883";
private String clientIdPrefix = "campus-water-repair-"; // 客户端ID前缀拼接维修员ID
// 动态生成客户端ID按维修员ID区分
@Bean
public String clientId(String repairmanId) {
return clientIdPrefix + repairmanId;
}
@Bean
public MqttClient mqttClient(String repairmanId) throws Exception {
String clientId = clientId(repairmanId);
MqttClient client = new MqttClient(mqttBroker, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false); // 重连后恢复订阅,避免错过工单
options.setKeepAliveInterval(60);
client.connect(options);
System.out.println("维修APPID" + repairmanId + "已连接MQTT服务器");
return client;
}
// 订阅主题(仅本辖区设备数据+告警,适配“维修员仅查看本辖区”需求)
public String getDeviceSubscribeTopic(String areaId) {
return "forward/repair/area/" + areaId + "/device/#";
}
public String getAlertSubscribeTopic(String areaId) {
return "forward/repair/area/" + areaId + "/alert/#";
}
}

@ -0,0 +1,53 @@
package com.campus.water.repair.controller;
import com.campus.water.repair.entity.WorkOrderVO;
import com.campus.water.repair.service.WorkOrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* APP/
*/
@RestController
@RequestMapping("/api/repair/workOrder")
public class WorkOrderController {
@Autowired
private WorkOrderService workOrderService;
// 1. 获取本辖区待抢单工单(需求“抢单”功能)
@GetMapping("/pending/{repairmanId}")
public List<WorkOrderVO> getPendingWorkOrder(@PathVariable String repairmanId) {
String areaId = workOrderService.getRepairmanArea(repairmanId); // 获取维修员所属片区
return workOrderService.getPendingWorkOrderByArea(areaId);
}
// 2. 抢单(需求核心功能,需校验辖区权限)
@PostMapping("/grab")
public String grabWorkOrder(
@RequestParam String repairmanId,
@RequestParam String orderId
) {
return workOrderService.grabWorkOrder(repairmanId, orderId) ?
"抢单成功,请及时处理" : "工单已被抢,或您无权限抢此工单";
}
// 3. 提交工单处理结果(需求“处理工单并提交”功能)
@PostMapping("/complete")
public String completeWorkOrder(
@RequestParam String orderId,
@RequestParam String repairmanId,
@RequestParam String dealNote, // 处理备注(如“已更换滤芯”)
@RequestParam String imgUrl // 现场照片URL可选
) {
workOrderService.completeWorkOrder(orderId, repairmanId, dealNote, imgUrl);
return "工单处理完成,已提交审核";
}
// 4. 查看个人处理中的工单(需求“跟踪工单状态”功能)
@GetMapping("/processing/{repairmanId}")
public List<WorkOrderVO> getProcessingWorkOrder(@PathVariable String repairmanId) {
return workOrderService.getProcessingWorkOrderByRepairman(repairmanId);
}
}

@ -0,0 +1,39 @@
package com.campus.water.student.config;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* APPMQTT
*/
@Configuration
public class StudentMqttConfig {
private String mqttBroker = "tcp://mqtt-server.campus.com:1883";
private String clientIdPrefix = "campus-water-student-"; // 客户端ID前缀拼接学生ID
// 动态生成客户端ID按学生ID区分避免冲突
@Bean
public String clientId(String studentId) {
return clientIdPrefix + studentId;
}
@Bean
public MqttClient mqttClient(String studentId) throws Exception {
String clientId = clientId(studentId);
MqttClient client = new MqttClient(mqttBroker, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true); // 学生APP登录后重新订阅无需保留会话
options.setKeepAliveInterval(120); // 移动端心跳间隔 longer适配电池续航
client.connect(options);
System.out.println("学生APPID" + studentId + "已连接MQTT服务器");
return client;
}
// 订阅主题(仅终端机水质数据,适配学生“扫码查水质”需求)
public String getSubscribeTopic(String terminalId) {
return "forward/student/terminal/" + terminalId;
}
}

@ -0,0 +1,49 @@
package com.campus.water.student.controller;
import com.campus.water.student.entity.DrinkRecordVO;
import com.campus.water.student.service.DrinkRecordService;
import com.campus.water.student.service.WaterQualityService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* APP
*/
@RestController
@RequestMapping("/api/student/drink")
public class DrinkController {
@Autowired
private DrinkRecordService drinkRecordService;
@Autowired
private WaterQualityService waterQualityService;
// 1. 扫码用水(生成饮水记录,需求核心功能)
@PostMapping("/scan")
public String scanDrink(
@RequestParam String studentId,
@RequestParam String terminalId,
@RequestParam double volume // 饮水量(终端机流量传感器获取或按时间估算)
) {
// 获取当前终端机水质(用于记录)
String waterQuality = waterQualityService.getWaterQuality(terminalId);
// 生成饮水记录存入MySQL
drinkRecordService.createDrinkRecord(studentId, terminalId, volume, waterQuality);
return "扫码用水成功,饮水量:" + volume + "L水质" + waterQuality;
}
// 2. 查询今日饮水量(需求“查看每日饮水量”功能)
@GetMapping("/today/{studentId}")
public DrinkRecordVO getTodayDrink(@PathVariable String studentId) {
return drinkRecordService.getTodayDrinkRecord(studentId);
}
// 3. 查询历史饮水量(按日/周/月筛选)
@GetMapping("/history/{studentId}")
public List<DrinkRecordVO> getHistoryDrink(
@PathVariable String studentId,
@RequestParam String type, // type: day/week/month
@RequestParam String date // 日期如2024-05-20
) {
return drinkRecordService.getHistoryDrinkRecord(studentId, type, date);
}
}

@ -0,0 +1,45 @@
package com.campus.water.student.mqtt;
import com.alibaba.fastjson2.JSONObject;
import com.campus.water.student.config.StudentMqttConfig;
import com.campus.water.student.entity.WaterQualityVO;
import com.campus.water.student.service.WaterQualityService;
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* APPMQTT
*/
@Component
public class StudentMqttSubscriber {
@Autowired
private StudentMqttConfig mqttConfig;
@Autowired
private WaterQualityService waterQualityService;
// 学生扫码后订阅对应终端机的水质数据
public void subscribeTerminalWaterQuality(String studentId, String terminalId) throws Exception {
// 1. 创建MQTT客户端按学生ID区分
MqttClient client = mqttConfig.mqttClient(studentId);
// 2. 订阅该终端机的水质主题
String topic = mqttConfig.getSubscribeTopic(terminalId);
client.subscribe(topic, 0, messageListener(terminalId));
System.out.println("学生APPID" + studentId + ")已订阅终端机" + terminalId + "水质数据");
}
// 消息监听器缓存水质数据供APP前端展示
private IMqttMessageListener messageListener(String terminalId) {
return (topic, message) -> {
String payload = new String(message.getPayload());
System.out.println("学生APP接收终端机" + terminalId + "水质数据:" + payload);
// 解析水质数据并缓存
WaterQualityVO qualityVO = JSONObject.parseObject(payload, WaterQualityVO.class);
waterQualityService.updateWaterQuality(terminalId, qualityVO);
};
}
}
Loading…
Cancel
Save