Browse Source

fix:解决tcp反馈抛出异常事件无法传递的问题

tags/1.0
白凤吉 4 months ago
parent
commit
207c4fc4f7
  1. 32
      src/main/java/com/qyft/ms/system/common/device/command/CommandFuture.java
  2. 3
      src/main/java/com/qyft/ms/system/core/handler/BaseCommandHandler.java
  3. 2
      src/main/java/com/qyft/ms/system/core/listener/DeviceTcpMessageEventListener.java
  4. 56
      src/main/java/com/qyft/ms/system/service/device/DeviceCommandService.java

32
src/main/java/com/qyft/ms/system/common/device/command/CommandFuture.java

@ -1,6 +1,7 @@
package com.qyft.ms.system.common.device.command; package com.qyft.ms.system.common.device.command;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import com.qyft.ms.system.model.bo.DeviceCommand;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
@ -11,10 +12,6 @@ import java.util.concurrent.ExecutionException;
@Getter @Getter
public class CommandFuture { public class CommandFuture {
/** /**
* 用于保存ack反馈
*/
// private final CompletableFuture<JSONObject> ackFuture = new CompletableFuture<>();
/**
* 用于保存response反馈 * 用于保存response反馈
*/ */
private CompletableFuture<JSONObject> responseFuture = new CompletableFuture<>(); private CompletableFuture<JSONObject> responseFuture = new CompletableFuture<>();
@ -23,29 +20,34 @@ public class CommandFuture {
private long endSendTime; private long endSendTime;
/** /**
* 完成ack反馈
* 业务指令id
*/ */
public void completeAck(JSONObject result) {
// ackFuture.complete(result);
}
private String cmdId;
/**
* 业务指令code
*/
private String cmdCode;
/**
* 设备指令
*/
private DeviceCommand deviceCommand;
/** /**
* 完成response反馈
* 完成response
*/ */
public void completeResponse(JSONObject result) { public void completeResponse(JSONObject result) {
responseFuture.complete(result); responseFuture.complete(result);
} }
/** /**
* 获取ack反馈的json
* 异常完成response
*/ */
// public JSONObject getAckResult() throws ExecutionException, InterruptedException {
// return ackFuture.get();
// }
public void completeResponseExceptionally(Throwable ex) {
responseFuture.completeExceptionally(ex);
}
/** /**
* 获取response反馈的json
* 获取response的json
*/ */
public JSONObject getResponseResult() throws ExecutionException, InterruptedException { public JSONObject getResponseResult() throws ExecutionException, InterruptedException {
return responseFuture.get(); return responseFuture.get();

3
src/main/java/com/qyft/ms/system/core/handler/BaseCommandHandler.java

@ -22,8 +22,7 @@ public abstract class BaseCommandHandler implements CommandHandler {
.map(CommandFuture::getResponseFuture) .map(CommandFuture::getResponseFuture)
.toArray(CompletableFuture[]::new); .toArray(CompletableFuture[]::new);
CompletableFuture.allOf(responseFutures) CompletableFuture.allOf(responseFutures)
.orTimeout(120, TimeUnit.SECONDS)
.get();
.get(120, TimeUnit.SECONDS);
} }
} }

2
src/main/java/com/qyft/ms/system/core/listener/DeviceTcpMessageEventListener.java

@ -27,7 +27,6 @@ public class DeviceTcpMessageEventListener {
String tag = deviceResult.getStr("tag"); String tag = deviceResult.getStr("tag");
if ("ACK".equals(tag)) { if ("ACK".equals(tag)) {
log.info("ACK {}", JSONUtil.toJsonStr(deviceResult)); log.info("ACK {}", JSONUtil.toJsonStr(deviceResult));
deviceCommandService.completeCommandAck(deviceResult);
} else if ("RESPONSE".equals(tag)) { } else if ("RESPONSE".equals(tag)) {
log.info("RESPONSE {}", JSONUtil.toJsonStr(deviceResult)); log.info("RESPONSE {}", JSONUtil.toJsonStr(deviceResult));
deviceCommandService.completeCommandResponse(deviceResult); deviceCommandService.completeCommandResponse(deviceResult);
@ -40,6 +39,7 @@ public class DeviceTcpMessageEventListener {
deviceStatus.setPaused(false); deviceStatus.setPaused(false);
deviceStatus.setSuspendable(false); deviceStatus.setSuspendable(false);
deviceStatus.setStopPressed(true); deviceStatus.setStopPressed(true);
deviceCommandService.releaseAllCommandFutures();
} else if ("system_e_stop_released".equals(eventType)) {//系统急停按钮被释放 } else if ("system_e_stop_released".equals(eventType)) {//系统急停按钮被释放
deviceStatus.setStopPressed(false); deviceStatus.setStopPressed(false);
} }

56
src/main/java/com/qyft/ms/system/service/device/DeviceCommandService.java

@ -1,7 +1,6 @@
package com.qyft.ms.system.service.device; package com.qyft.ms.system.service.device;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qyft.ms.system.common.constant.CommandStatus; import com.qyft.ms.system.common.constant.CommandStatus;
import com.qyft.ms.system.common.device.command.CommandFuture; import com.qyft.ms.system.common.device.command.CommandFuture;
import com.qyft.ms.system.common.device.command.CyclicNumberGenerator; import com.qyft.ms.system.common.device.command.CyclicNumberGenerator;
@ -49,22 +48,12 @@ public class DeviceCommandService {
public CommandFuture sendCommandNoFront(DeviceCommand deviceCommand) { public CommandFuture sendCommandNoFront(DeviceCommand deviceCommand) {
CommandFuture commandFuture = executeCommand(deviceCommand); CommandFuture commandFuture = executeCommand(deviceCommand);
// commandFuture.getAckFuture().thenApply(result -> {
// Boolean status = result.getBool("status");
// if (!status) { //ack失败
// String message = deviceCommand.getCmdName() + "指令,设备ack错误";
// throw new RuntimeException(message);
// }
// return result;
// });
commandFuture.getResponseFuture().thenApply(result -> { commandFuture.getResponseFuture().thenApply(result -> {
Boolean success = result.getBool("success"); Boolean success = result.getBool("success");
if (!success) { //response失败 if (!success) { //response失败
String message = deviceCommand.getCmdName() + "指令,设备response错误"; String message = deviceCommand.getCmdName() + "指令,设备response错误";
throw new RuntimeException(message);
} }
return result.get("data");
return result;
}); });
return commandFuture; return commandFuture;
@ -72,49 +61,44 @@ public class DeviceCommandService {
public CommandFuture sendCommand(String cmdId, String cmdCode, DeviceCommand deviceCommand) throws IOException { public CommandFuture sendCommand(String cmdId, String cmdCode, DeviceCommand deviceCommand) throws IOException {
CommandFuture commandFuture = executeCommand(deviceCommand); CommandFuture commandFuture = executeCommand(deviceCommand);
// webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.SEND, "已向设备发送了" + deviceCommand.getCmdName() + "指令", deviceCommand));
// commandFuture.getAckFuture().thenApply(result -> {
// Boolean success = result.getBool("success");
// if (success == null || !success) { //ack失败
// String message = deviceCommand.getCmdName() + "指令,设备ack错误";
// webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_ERROR, message, result));
// throw new RuntimeException(message);
// }
// webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.RESULT, deviceCommand.getCmdName() + "指令,设备ack正常", result));
// return result;
// });
webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_SEND, deviceCommand.getCmdName() + "指令,已发给设备", deviceCommand)); webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_SEND, deviceCommand.getCmdName() + "指令,已发给设备", deviceCommand));
commandFuture.getResponseFuture().thenApply(result -> { commandFuture.getResponseFuture().thenApply(result -> {
Boolean success = result.getBool("success"); Boolean success = result.getBool("success");
if (success == null || !success) { //response失败 if (success == null || !success) { //response失败
String message = deviceCommand.getCmdName() + "指令,设备response错误"; String message = deviceCommand.getCmdName() + "指令,设备response错误";
webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_ERROR, message, result)); webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_ERROR, message, result));
throw new RuntimeException(message);
} }
webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_RESULT, deviceCommand.getCmdName() + "指令,设备response正常,耗时:" + (commandFuture.getEndSendTime() - commandFuture.getStartSendTime()), result)); webSocketService.pushDebugMsg(FrontResponseGenerator.generateJson(cmdId, cmdCode, CommandStatus.DEVICE_RESULT, deviceCommand.getCmdName() + "指令,设备response正常,耗时:" + (commandFuture.getEndSendTime() - commandFuture.getStartSendTime()), result));
return result.get("data");
return result;
}); });
return commandFuture; return commandFuture;
} }
public void completeCommandAck(JSONObject deviceResult) {
// Integer cmdId = deviceResult.getInt("cmdId");
// if (cmdId != null) {
// CommandFuture future = commandFutureMap.get(cmdId);
// if (future != null) {
// future.completeAck(deviceResult);
// }
// }
}
public void completeCommandResponse(JSONObject deviceResult) { public void completeCommandResponse(JSONObject deviceResult) {
Integer cmdId = deviceResult.getInt("cmdId"); Integer cmdId = deviceResult.getInt("cmdId");
if (cmdId != null) { if (cmdId != null) {
CommandFuture future = commandFutureMap.get(cmdId); CommandFuture future = commandFutureMap.get(cmdId);
if (future != null) { if (future != null) {
future.setEndSendTime(System.currentTimeMillis()); future.setEndSendTime(System.currentTimeMillis());
future.completeResponse(deviceResult);
Boolean success = deviceResult.getBool("success"); //数据验证
if (success == null || !success) { //response失败
future.completeResponseExceptionally(new RuntimeException("response失败"));
} else {
future.completeResponse(deviceResult);
}
}
}
}
/**
* 取消等待中的future并从map中移除
*/
public synchronized void releaseAllCommandFutures() {
for (Integer key : commandFutureMap.keySet()) {
CommandFuture future = commandFutureMap.remove(key);
if (future != null) {
future.getResponseFuture().cancel(true);
} }
} }
} }

Loading…
Cancel
Save