From 207c4fc4f77183e29d695c3ed887dd8f0d820517 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E5=87=A4=E5=90=89?= Date: Sun, 23 Mar 2025 17:37:18 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E8=A7=A3=E5=86=B3tcp=E5=8F=8D=E9=A6=88?= =?UTF-8?q?=E6=8A=9B=E5=87=BA=E5=BC=82=E5=B8=B8=E4=BA=8B=E4=BB=B6=E6=97=A0?= =?UTF-8?q?=E6=B3=95=E4=BC=A0=E9=80=92=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/device/command/CommandFuture.java | 32 +++++++------ .../ms/system/core/handler/BaseCommandHandler.java | 3 +- .../listener/DeviceTcpMessageEventListener.java | 2 +- .../service/device/DeviceCommandService.java | 56 ++++++++-------------- 4 files changed, 39 insertions(+), 54 deletions(-) diff --git a/src/main/java/com/qyft/ms/system/common/device/command/CommandFuture.java b/src/main/java/com/qyft/ms/system/common/device/command/CommandFuture.java index e13c775..771f903 100644 --- a/src/main/java/com/qyft/ms/system/common/device/command/CommandFuture.java +++ b/src/main/java/com/qyft/ms/system/common/device/command/CommandFuture.java @@ -1,6 +1,7 @@ package com.qyft.ms.system.common.device.command; import cn.hutool.json.JSONObject; +import com.qyft.ms.system.model.bo.DeviceCommand; import lombok.Getter; import lombok.Setter; @@ -11,10 +12,6 @@ import java.util.concurrent.ExecutionException; @Getter public class CommandFuture { /** - * 用于保存ack反馈 - */ -// private final CompletableFuture ackFuture = new CompletableFuture<>(); - /** * 用于保存response反馈 */ private CompletableFuture responseFuture = new CompletableFuture<>(); @@ -23,29 +20,34 @@ public class CommandFuture { private long endSendTime; /** - * 完成ack反馈 + * 业务指令id */ - public void completeAck(JSONObject result) { -// ackFuture.complete(result); - } + private String cmdId; + /** + * 业务指令code + */ + private String cmdCode; + /** + * 设备指令 + */ + private DeviceCommand deviceCommand; /** - * 完成response反馈 + * 完成response */ public void completeResponse(JSONObject result) { responseFuture.complete(result); } - /** - * 获取ack反馈的json + * 异常完成response */ -// public JSONObject getAckResult() throws ExecutionException, InterruptedException { -// return ackFuture.get(); -// } + public void completeResponseExceptionally(Throwable ex) { + responseFuture.completeExceptionally(ex); + } /** - * 获取response反馈的json + * 获取response的json */ public JSONObject getResponseResult() throws ExecutionException, InterruptedException { return responseFuture.get(); diff --git a/src/main/java/com/qyft/ms/system/core/handler/BaseCommandHandler.java b/src/main/java/com/qyft/ms/system/core/handler/BaseCommandHandler.java index 19b18e9..9f06a62 100644 --- a/src/main/java/com/qyft/ms/system/core/handler/BaseCommandHandler.java +++ b/src/main/java/com/qyft/ms/system/core/handler/BaseCommandHandler.java @@ -22,8 +22,7 @@ public abstract class BaseCommandHandler implements CommandHandler { .map(CommandFuture::getResponseFuture) .toArray(CompletableFuture[]::new); CompletableFuture.allOf(responseFutures) - .orTimeout(120, TimeUnit.SECONDS) - .get(); + .get(120, TimeUnit.SECONDS); } } \ No newline at end of file diff --git a/src/main/java/com/qyft/ms/system/core/listener/DeviceTcpMessageEventListener.java b/src/main/java/com/qyft/ms/system/core/listener/DeviceTcpMessageEventListener.java index e04ee60..ad40567 100644 --- a/src/main/java/com/qyft/ms/system/core/listener/DeviceTcpMessageEventListener.java +++ b/src/main/java/com/qyft/ms/system/core/listener/DeviceTcpMessageEventListener.java @@ -27,7 +27,6 @@ public class DeviceTcpMessageEventListener { String tag = deviceResult.getStr("tag"); if ("ACK".equals(tag)) { log.info("ACK {}", JSONUtil.toJsonStr(deviceResult)); - deviceCommandService.completeCommandAck(deviceResult); } else if ("RESPONSE".equals(tag)) { log.info("RESPONSE {}", JSONUtil.toJsonStr(deviceResult)); deviceCommandService.completeCommandResponse(deviceResult); @@ -40,6 +39,7 @@ public class DeviceTcpMessageEventListener { deviceStatus.setPaused(false); deviceStatus.setSuspendable(false); deviceStatus.setStopPressed(true); + deviceCommandService.releaseAllCommandFutures(); } else if ("system_e_stop_released".equals(eventType)) {//系统急停按钮被释放 deviceStatus.setStopPressed(false); } diff --git a/src/main/java/com/qyft/ms/system/service/device/DeviceCommandService.java b/src/main/java/com/qyft/ms/system/service/device/DeviceCommandService.java index 5e68f49..7dfa5c6 100644 --- a/src/main/java/com/qyft/ms/system/service/device/DeviceCommandService.java +++ b/src/main/java/com/qyft/ms/system/service/device/DeviceCommandService.java @@ -1,7 +1,6 @@ package com.qyft.ms.system.service.device; import cn.hutool.json.JSONObject; -import cn.hutool.json.JSONUtil; import com.qyft.ms.system.common.constant.CommandStatus; import com.qyft.ms.system.common.device.command.CommandFuture; import com.qyft.ms.system.common.device.command.CyclicNumberGenerator; @@ -49,22 +48,12 @@ public class DeviceCommandService { public CommandFuture sendCommandNoFront(DeviceCommand deviceCommand) { CommandFuture commandFuture = executeCommand(deviceCommand); -// commandFuture.getAckFuture().thenApply(result -> { -// Boolean status = result.getBool("status"); -// if (!status) { //ack失败 -// String message = deviceCommand.getCmdName() + "指令,设备ack错误"; -// throw new RuntimeException(message); -// } -// return result; -// }); - commandFuture.getResponseFuture().thenApply(result -> { Boolean success = result.getBool("success"); if (!success) { //response失败 String message = deviceCommand.getCmdName() + "指令,设备response错误"; - throw new RuntimeException(message); } - return result.get("data"); + return result; }); return commandFuture; @@ -72,49 +61,44 @@ public class DeviceCommandService { public CommandFuture sendCommand(String cmdId, String cmdCode, DeviceCommand deviceCommand) throws IOException { CommandFuture commandFuture = executeCommand(deviceCommand); -// webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.SEND, "已向设备发送了" + deviceCommand.getCmdName() + "指令", deviceCommand)); -// commandFuture.getAckFuture().thenApply(result -> { -// Boolean success = result.getBool("success"); -// if (success == null || !success) { //ack失败 -// String message = deviceCommand.getCmdName() + "指令,设备ack错误"; -// webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_ERROR, message, result)); -// throw new RuntimeException(message); -// } -// webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.RESULT, deviceCommand.getCmdName() + "指令,设备ack正常", result)); -// return result; -// }); webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_SEND, deviceCommand.getCmdName() + "指令,已发给设备", deviceCommand)); commandFuture.getResponseFuture().thenApply(result -> { Boolean success = result.getBool("success"); if (success == null || !success) { //response失败 String message = deviceCommand.getCmdName() + "指令,设备response错误"; webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_ERROR, message, result)); - throw new RuntimeException(message); } webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_RESULT, deviceCommand.getCmdName() + "指令,设备response正常,耗时:" + (commandFuture.getEndSendTime() - commandFuture.getStartSendTime()), result)); - return result.get("data"); + return result; }); return commandFuture; } - public void completeCommandAck(JSONObject deviceResult) { -// Integer cmdId = deviceResult.getInt("cmdId"); -// if (cmdId != null) { -// CommandFuture future = commandFutureMap.get(cmdId); -// if (future != null) { -// future.completeAck(deviceResult); -// } -// } - } - public void completeCommandResponse(JSONObject deviceResult) { Integer cmdId = deviceResult.getInt("cmdId"); if (cmdId != null) { CommandFuture future = commandFutureMap.get(cmdId); if (future != null) { future.setEndSendTime(System.currentTimeMillis()); - future.completeResponse(deviceResult); + Boolean success = deviceResult.getBool("success"); //数据验证 + if (success == null || !success) { //response失败 + future.completeResponseExceptionally(new RuntimeException("response失败")); + } else { + future.completeResponse(deviceResult); + } + } + } + } + + /** + * 取消等待中的future并从map中移除 + */ + public synchronized void releaseAllCommandFutures() { + for (Integer key : commandFutureMap.keySet()) { + CommandFuture future = commandFutureMap.remove(key); + if (future != null) { + future.getResponseFuture().cancel(true); } } }