Browse Source

流式接口

master
白凤吉 5 months ago
parent
commit
ee18077d5b
  1. 32
      src/main/java/com/qyft/ms/app/common/command/CommandFuture.java
  2. 39
      src/main/java/com/qyft/ms/app/common/command/CommandWaitControl.java
  3. 32
      src/main/java/com/qyft/ms/app/common/command/CurrentSendCmdMapInstance.java
  4. 39
      src/main/java/com/qyft/ms/app/common/command/CyclicNumberGenerator.java
  5. 25
      src/main/java/com/qyft/ms/app/common/command/FrontCommandAck.java
  6. 109
      src/main/java/com/qyft/ms/app/common/command/FrontCommandGenerator.java
  7. 12
      src/main/java/com/qyft/ms/app/common/constant/CommandStatus.java
  8. 190
      src/main/java/com/qyft/ms/app/controller/FrontCmdController.java
  9. 30
      src/main/java/com/qyft/ms/app/model/bo/CMDToDevice.java
  10. 24
      src/main/java/com/qyft/ms/app/model/form/CMDFormV2.java
  11. 60
      src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java
  12. 25
      src/main/java/com/qyft/ms/device/service/DeviceTcpCMDServiceV2.java
  13. 2
      src/main/java/com/qyft/ms/system/config/WebConfig.java
  14. 6
      src/main/resources/application.yml

32
src/main/java/com/qyft/ms/app/common/command/CommandFuture.java

@ -0,0 +1,32 @@
package com.qyft.ms.app.common.command;
import cn.hutool.json.JSONObject;
import com.qyft.ms.app.model.bo.CMDToDevice;
import lombok.Data;
@Data
public class CommandFuture {
private CommandWaitControl commandWaitController = new CommandWaitControl();
/**
* 发送给设备的指令
*/
private CMDToDevice cmdToDevice;
/**
* 设备返回的结果
*/
private JSONObject callbackResult;
/**
* 已收到
*/
private boolean isReceived = false;
public void waitForContinue() {
commandWaitController.commandWait();
}
public void commandContinue() {
commandWaitController.commandContinue();
}
}

39
src/main/java/com/qyft/ms/app/common/command/CommandWaitControl.java

@ -0,0 +1,39 @@
package com.qyft.ms.app.common.command;
public class CommandWaitControl {
public String commandCallbackJsonStringHolder = "";
private boolean shouldWait = true;
public synchronized void commandWait() {
while (shouldWait) {
try {
wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 重置标志以便下次可以再次等待
shouldWait = true;
}
public synchronized void commandContinue() {
shouldWait = false;
notify();
}
/**
* 获取命令回调值
* @return
*/
public String getCommandCallbackValue() {
return commandCallbackJsonStringHolder;
}
/**
* 设置命令回调值
* @param commandCallbackJsonString
*/
public void setCommandCallbackValue(String commandCallbackJsonString) {
this.commandCallbackJsonStringHolder = commandCallbackJsonString;
}
}

32
src/main/java/com/qyft/ms/app/common/command/CurrentSendCmdMapInstance.java

@ -0,0 +1,32 @@
package com.qyft.ms.app.common.command;
import io.swagger.v3.oas.models.security.SecurityScheme;
import java.util.concurrent.ConcurrentHashMap;
public class CurrentSendCmdMapInstance {
private static CurrentSendCmdMapInstance instance;
ConcurrentHashMap<Integer, CommandFuture> commandMap = new ConcurrentHashMap<>();
public static synchronized CurrentSendCmdMapInstance getInstance() {
if (instance == null) {
instance = new CurrentSendCmdMapInstance();
}
return instance;
}
public void putCommand(Integer key, CommandFuture commandFuture) {
commandMap.put(key, commandFuture);
}
public CommandFuture removeCommand(Integer key) {
return commandMap.remove(key);
}
public CommandFuture getCommand(Integer key) {
return commandMap.get(key);
}
}

39
src/main/java/com/qyft/ms/app/common/command/CyclicNumberGenerator.java

@ -0,0 +1,39 @@
package com.qyft.ms.app.common.command;
public class CyclicNumberGenerator {
// 饿汉式单例在类加载时就创建实例
private static final CyclicNumberGenerator INSTANCE = new CyclicNumberGenerator();
// 当前生成的数字初始值为 1
private int currentNumber = 1;
// 私有构造函数防止外部实例化
private CyclicNumberGenerator() {}
// 提供全局访问点获取单例实例
public static CyclicNumberGenerator getInstance() {
return INSTANCE;
}
/**
* 生成 1 255 之间的循环整数
* @return 生成的整数
*/
public synchronized int generateNumber() {
int result = currentNumber;
// 每次生成后将当前数字加 1
currentNumber++;
// 如果当前数字超过 255将其重置为 1
if (currentNumber > 255) {
currentNumber = 1;
}
return result;
}
public static void main(String[] args) {
CyclicNumberGenerator generator = CyclicNumberGenerator.getInstance();
// 测试生成 260 个数字
for (int i = 0; i < 260; i++) {
System.out.println(generator.generateNumber());
}
}
}

25
src/main/java/com/qyft/ms/app/common/command/FrontCommandAck.java

@ -0,0 +1,25 @@
package com.qyft.ms.app.common.command;
import cn.hutool.json.JSONObject;
public class FrontCommandAck {
public static String backstageAck(String cmdId, String cmdName,String status,String info) {
JSONObject jsonObject = new JSONObject();
jsonObject.set("cmdId", cmdId);
jsonObject.set("cmdName", cmdName);
jsonObject.set("status", status);
jsonObject.set("info", info);
return jsonObject.toString();
}
public static String deviceAck(String cmdId, String cmdName) {
JSONObject jsonObject = new JSONObject();
jsonObject.set("cmdId", cmdId);
jsonObject.set("cmdName", cmdName);
jsonObject.set("status", "receive");
return jsonObject.toString();
}
}

109
src/main/java/com/qyft/ms/app/common/command/FrontCommandGenerator.java

@ -0,0 +1,109 @@
package com.qyft.ms.app.common.command;
import com.qyft.ms.app.model.bo.CMDToDevice;
/**
* 生成给设备发送的指令
*/
public class FrontCommandGenerator {
/**
* {
* cmdName:'getInfoCmd'
* cmdId:'',
* device:'device',
* action:'get'
* }
*/
public static CMDToDevice device_status_get() {
int cmdId = CyclicNumberGenerator.getInstance().generateNumber();
CMDToDevice cmdToDevice = new CMDToDevice();
cmdToDevice.setCmdId(cmdId);
cmdToDevice.setCmdName("device_status_get");
cmdToDevice.setDevice("device");
cmdToDevice.setAction("get");
return cmdToDevice;
}
/**
* 获取设备当前湿度
* {
* cmdName:'getInfoCmd'
* cmdId:'',
* device:'temperature',
* action:'get'
* }
*/
public static CMDToDevice temperature_get() {
int cmdId = CyclicNumberGenerator.getInstance().generateNumber();
CMDToDevice cmdToDevice = new CMDToDevice();
cmdToDevice.setCmdId(cmdId);
cmdToDevice.setCmdName("getInfoCmd");
cmdToDevice.setDevice("temperature");
cmdToDevice.setAction("get");
return cmdToDevice;
}
/**
* 开启除湿阀
* {
* cmdName:'controlCmd'
* cmdId:'',
* device:'dehumidifier_valve',
* action:'open|close'
* }
*/
public static CMDToDevice dehumidifier_start() {
int cmdId = CyclicNumberGenerator.getInstance().generateNumber();
CMDToDevice cmdToDevice = new CMDToDevice();
cmdToDevice.setCmdId(cmdId);
cmdToDevice.setCmdName("controlCmd");
cmdToDevice.setDevice("dehumidifier_valve");
cmdToDevice.setAction("open");
return cmdToDevice;
}
public static String matrix_prefill() {
return "matrix_prefill";
}
public static String spray_pipeline_wash() {
return "spray_pipeline_wash";
}
public static String syringe_pipeline_wash() {
return "syringe_pipeline_wash";
}
public static String matrix_spray_start() {
return "matrix_spray_start";
}
public static String matrix_spray_change_param() {
return "matrix_spray_change_param";
}
public static String matrix_spray_stop() {
return "matrix_spray_stop";
}
public static String matrix_spray_pause() {
return "matrix_spray_pause";
}
public static String matrix_spray_continue() {
return "matrix_spray_continue";
}
public static String slide_tray_in() {
return "slide_tray_in";
}
public static String slide_tray_out() {
return "slide_tray_out";
}
}

12
src/main/java/com/qyft/ms/app/common/constant/CommandStatus.java

@ -0,0 +1,12 @@
package com.qyft.ms.app.common.constant;
public class CommandStatus {
/**
* 已收到前端指令
*/
public static final String RECEIVE = "receive";
/**
* 指令执行发生错误
*/
public static final String ERROR = "error";
}

190
src/main/java/com/qyft/ms/app/controller/FrontCmdController.java

@ -0,0 +1,190 @@
package com.qyft.ms.app.controller;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qyft.ms.app.common.command.CommandFuture;
import com.qyft.ms.app.common.command.CurrentSendCmdMapInstance;
import com.qyft.ms.app.common.command.FrontCommandAck;
import com.qyft.ms.app.common.command.FrontCommandGenerator;
import com.qyft.ms.app.common.constant.CommandStatus;
import com.qyft.ms.app.model.bo.CMDToDevice;
import com.qyft.ms.app.model.form.CMDFormV2;
import com.qyft.ms.device.service.DeviceTcpCMDServiceV2;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
import java.util.Map;
@Tag(name = "前端调用指令")
@RestController
@RequestMapping("/api/device/front")
@RequiredArgsConstructor
@Slf4j
public class FrontCmdController {
private final DeviceTcpCMDServiceV2 deviceTcpCMDServiceV2;
@Operation(summary = "前端统一调用一个接口")
@PostMapping("/control")
public ResponseBodyEmitter controlMethod(@RequestBody CMDFormV2 cmdForm) {
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
String frontCmdId = cmdForm.getCmdId();
String frontCmdName = cmdForm.getCmdName();
Map<String, Object> param = cmdForm.getParam();
if ("device_status_get".equals(frontCmdName)) {
new Thread(() -> {
try {
//向前端发送接收到指令
emitter.send(FrontCommandAck.backstageAck(frontCmdId, frontCmdName, CommandStatus.RECEIVE, "后台已收到指令"), MediaType.TEXT_PLAIN);
} catch (IOException e) {
emitter.completeWithError(e);
}
//生成发给设备的指令
CMDToDevice cmdToDevice = FrontCommandGenerator.device_status_get();
CommandFuture commandFuture = new CommandFuture();
commandFuture.setCmdToDevice(cmdToDevice);
Integer toDeviceCmdId = cmdToDevice.getCmdId();
//将指令放入map,等待设备返回然后取出在调用commandContinue
CurrentSendCmdMapInstance.getInstance().putCommand(toDeviceCmdId, commandFuture);
//发送指令给设备
deviceTcpCMDServiceV2.send(JSONUtil.toJsonStr(cmdToDevice));
commandFuture.waitForContinue();
if (commandFuture.isReceived()) {
try {
emitter.send(FrontCommandAck.deviceAck(frontCmdId, frontCmdName), MediaType.TEXT_PLAIN);
} catch (IOException e) {
emitter.completeWithError(e);
}
}
commandFuture.waitForContinue();
// JSONObject callbackResult = commandFuture.getCallbackResult();
//对返回结果处理
emitter.complete();
}).start();
} else if ("dehumidifier_start".equals(frontCmdName)) {
/*
除湿该方法接收Double类型参数 humidity
*/
new Thread(() -> {
try {
//向前端发送接收到指令
emitter.send(FrontCommandAck.backstageAck(frontCmdId, frontCmdName, CommandStatus.RECEIVE, "后台已收到指令"), MediaType.TEXT_PLAIN);
//判断参数是否合法
Integer humidity = (Integer) param.get("humidity");
if (humidity == null) {
emitter.send(FrontCommandAck.backstageAck(frontCmdId, frontCmdName, CommandStatus.ERROR, "参数 humidity 必填"), MediaType.TEXT_PLAIN);
emitter.complete();
return;
}
//1先判断当前湿度是否大于设置值
CMDToDevice temperatureCmdToDevice = FrontCommandGenerator.temperature_get();//生成指令 获取当前湿度
CommandFuture temperatureFuture = new CommandFuture();
temperatureFuture.setCmdToDevice(temperatureCmdToDevice);
Integer toDeviceCmdId = temperatureCmdToDevice.getCmdId();
CurrentSendCmdMapInstance.getInstance().putCommand(toDeviceCmdId, temperatureFuture);//将指令放入map
deviceTcpCMDServiceV2.send(temperatureCmdToDevice); //发送指令给设备
emitter.send(FrontCommandAck.backstageAck(frontCmdId, frontCmdName, CommandStatus.RECEIVE, "已向设备发送了指令:" + JSONUtil.toJsonStr(temperatureCmdToDevice)), MediaType.TEXT_PLAIN);
temperatureFuture.waitForContinue();//等待设备的反馈
JSONObject deviceResult = temperatureFuture.getCallbackResult();//拿到设备返回结果
if(temperatureFuture.isReceived()){//直接携带数据的反馈其实不需要判断isReceived直接判断deviceResult == null
CurrentSendCmdMapInstance.getInstance().removeCommand(toDeviceCmdId);//将指令从map中删除
//设备已经收到指令并且执行成功
emitter.send(FrontCommandAck.backstageAck(frontCmdId, frontCmdName, CommandStatus.RECEIVE, "设备指令反馈:" + JSONUtil.toJsonStr(deviceResult)), MediaType.TEXT_PLAIN);
}
if (deviceResult == null) {
emitter.send(FrontCommandAck.backstageAck(frontCmdId, frontCmdName, CommandStatus.ERROR, "未读取到湿度传感器数值"), MediaType.TEXT_PLAIN);
emitter.complete();
return;
}
Double temperature = deviceResult.getDouble("temperature");//拿到设备返回的湿度数值
if (humidity < temperature) {
//2如果小于设置值提醒用户不用除湿
emitter.send(FrontCommandAck.backstageAck(frontCmdId, frontCmdName, CommandStatus.ERROR, "设定湿度小于当前湿度,无需除湿。"), MediaType.TEXT_PLAIN);
emitter.complete();
return;
}
//3如果需要除湿开始调用底层指令"
CMDToDevice cmdToDeviceHumidity = FrontCommandGenerator.dehumidifier_start();//生成指令 开启除湿阀
CommandFuture commandFutureHumidity = new CommandFuture();
//
//
// //生成发给设备的指令
// CMDToDevice cmdToDevice = FrontCommandGenerator.device_status_get();
// CommandFuture commandFuture = new CommandFuture();
// commandFuture.setCmdToDevice(cmdToDevice);
//
// String toDeviceCmdId = cmdToDevice.getCmdId();
// //将指令放入map,等待设备返回然后取出在调用commandContinue
// CurrentSendCmdMapInstance.getInstance().putCommand(toDeviceCmdId, commandFuture);
//
// //发送指令给设备
// deviceTcpCMDServiceV2.send(JSONUtil.toJsonStr(cmdToDevice));
// commandFuture.waitForContinue();
// if (commandFuture.isReceived()) {
//
// emitter.send(FrontCommandAck.deviceAck(frontCmdId, frontCmdName), MediaType.TEXT_PLAIN);
//
// }
// commandFuture.waitForContinue();
// String callbackResult = commandFuture.getCallbackResult();
//对返回结果处理
emitter.complete();
} catch (IOException e) {
emitter.completeWithError(e);
}
}).start();
} else if ("matrix_prefill".equals(frontCmdName)) {
} else if ("spray_pipeline_wash".equals(frontCmdName)) {
} else if ("syringe_pipeline_wash".equals(frontCmdName)) {
} else if ("matrix_spray_start".equals(frontCmdName)) {
} else if ("matrix_spray_change_param".equals(frontCmdName)) {
} else if ("matrix_spray_stop".equals(frontCmdName)) {
} else if ("matrix_spray_pause".equals(frontCmdName)) {
} else if ("matrix_spray_continue".equals(frontCmdName)) {
} else if ("slide_tray_in".equals(frontCmdName)) {
} else if ("slide_tray_out".equals(frontCmdName)) {
}
return emitter;
}
}

30
src/main/java/com/qyft/ms/app/model/bo/CMDToDevice.java

@ -0,0 +1,30 @@
package com.qyft.ms.app.model.bo;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import java.util.Map;
@Schema(description = "指令")
@Data
public class CMDToDevice {
@Schema(description = "指令id,不指定后台会自动生成uuid")
private Integer cmdId;
@NotBlank()
@Schema(description = "指令名字", example = "device_status_get")
private String cmdName;
@NotBlank()
@Schema(description = "子设备", example = "three_way_valve")
private String device;
@NotBlank()
@Schema(description = "行为", example = "open")
private String action;
@Schema(description = "参数")
private Map<String, Object> param;
}

24
src/main/java/com/qyft/ms/app/model/form/CMDFormV2.java

@ -0,0 +1,24 @@
package com.qyft.ms.app.model.form;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotBlank;
import lombok.Data;
import java.util.Map;
@Schema(description = "指令")
@Data
public class CMDFormV2 {
@NotBlank()
@Schema(description = "指令id,前端生成唯一UUID")
private String cmdId;
@NotBlank()
@Schema(description = "指令名字", example = "device_status_get")
private String cmdName;
@Schema(description = "参数")
private Map<String, Object> param;
}

60
src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java

@ -1,14 +1,12 @@
package com.qyft.ms.device.handler;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qyft.ms.app.common.constant.WebSocketMessageType;
import com.qyft.ms.device.common.constant.TcpMessageType;
import com.qyft.ms.device.common.jsonrpc.JsonRpcResponse;
import com.qyft.ms.device.model.bo.DeviceAlarm;
import com.qyft.ms.app.common.command.CommandFuture;
import com.qyft.ms.app.common.command.CurrentSendCmdMapInstance;
import com.qyft.ms.app.service.WebSocketService;
import com.qyft.ms.device.model.bo.DeviceFeedback;
import com.qyft.ms.device.model.bo.DeviceStatus;
import com.qyft.ms.device.service.DeviceStatusService;
import com.qyft.ms.app.service.WebSocketService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@ -37,17 +35,45 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
ByteBuf buf = (ByteBuf) msg;
String serverMsg = buf.toString(CharsetUtil.UTF_8);
try {
JsonRpcResponse jsonRpcResponse = JSONUtil.toBean(serverMsg, JsonRpcResponse.class);
if (TcpMessageType.STATUS.equals(jsonRpcResponse.getType())) {//设备状态
DeviceStatus deviceStatus = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceStatus.class);
deviceStatusService.updateDeviceStatus(deviceStatus); // 更新设备状态
} else if (TcpMessageType.ALARM.equals(jsonRpcResponse.getType())) {//设备报警
log.error("设备报警: {}", serverMsg);
DeviceAlarm deviceAlarm = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceAlarm.class);
webSocketService.pushMsg(WebSocketMessageType.WARN, deviceAlarm);
} else if (TcpMessageType.FEEDBACK.equals(jsonRpcResponse.getType())) {//设备指令反馈
DeviceFeedback deviceFeedback = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceFeedback.class);
this.handleTcpResponse(deviceFeedback);
// JsonRpcResponse jsonRpcResponse = JSONUtil.toBean(serverMsg, JsonRpcResponse.class);
// if (TcpMessageType.STATUS.equals(jsonRpcResponse.getType())) {//设备状态
// DeviceStatus deviceStatus = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceStatus.class);
// deviceStatusService.updateDeviceStatus(deviceStatus); // 更新设备状态
// } else if (TcpMessageType.ALARM.equals(jsonRpcResponse.getType())) {//设备报警
// log.error("设备报警: {}", serverMsg);
// DeviceAlarm deviceAlarm = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceAlarm.class);
// webSocketService.pushMsg(WebSocketMessageType.WARN, deviceAlarm);
// } else if (TcpMessageType.FEEDBACK.equals(jsonRpcResponse.getType())) {//设备指令反馈
// DeviceFeedback deviceFeedback = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceFeedback.class);
// this.handleTcpResponse(deviceFeedback);
// }
JSONObject deviceResult = JSONUtil.parseObj(serverMsg);
JSONObject payload = deviceResult.getJSONObject("payload");
String tag = deviceResult.get("tag").toString();
if ("event".equals(tag)) {
//设备上报事件
} else if ("ack".equals(tag)) {
//设备指令反馈
Integer cmdId = payload.getInt("cmdId");
JSONObject error = payload.getJSONObject("error");
CommandFuture commandFuture = CurrentSendCmdMapInstance.getInstance().getCommand(cmdId);
if (error != null) {
//指令执行错误将错误放到CallbackResult中
commandFuture.setCallbackResult(error);
}
Object result = payload.get("result");
if (result instanceof Boolean) {
//ack 没携带数据
} else if (result instanceof JSONObject) {
//ack 携带了数据将数据放到了CallbackResult中
commandFuture.setCallbackResult((JSONObject) result);
} else {
//TODO 没有定义的类型
}
commandFuture.setReceived(true);//已收到设备反馈
commandFuture.commandContinue();
} else if ("status".equals(tag)) {
//设备上报状态
}
} catch (Exception e) {
log.error("TCP服务消息处理错误: {}, error: {}", serverMsg, e.getMessage(), e);

25
src/main/java/com/qyft/ms/device/service/DeviceTcpCMDServiceV2.java

@ -0,0 +1,25 @@
package com.qyft.ms.device.service;
import cn.hutool.json.JSONUtil;
import com.qyft.ms.device.client.TcpClient;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
* 设备tcp指令发送服务
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class DeviceTcpCMDServiceV2 {
private final TcpClient tcpClient;
public boolean send(Object object) {
return tcpClient.send(JSONUtil.toJsonStr(object));
}
}

2
src/main/java/com/qyft/ms/system/config/WebConfig.java

@ -8,7 +8,7 @@ import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
public class WebConfig implements WebMvcConfigurer {
@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/api/**")
registry.addMapping("/**")
.allowedOrigins("*")
.allowedMethods("GET", "POST", "PUT", "DELETE")
.allowedHeaders("*");

6
src/main/resources/application.yml

@ -1,7 +1,7 @@
server:
servlet:
context-path: /
port: 8090
port: 8080
spring:
application:
@ -36,8 +36,8 @@ jwt:
tcp:
enable: true # 是否开启 TCP 连接
server-enable: true # 是否开启 TCP 连接
host: 127.0.0.1
# host: 192.168.1.140
# host: 127.0.0.1
host: 192.168.1.140
port: 9080
reconnect: 5000 # 断线重连间隔(单位:毫秒)
timeout: 10000 # 连接超时时间(单位:毫秒)

Loading…
Cancel
Save