|
|
package com.iflytop.handacid.app.service;
import cn.hutool.json.JSONObject; import cn.hutool.json.JSONUtil; import com.iflytop.handacid.app.common.constant.CommandStatus; import com.iflytop.handacid.app.core.command.CommandFuture; import com.iflytop.handacid.app.core.command.CyclicNumberGenerator; import com.iflytop.handacid.app.core.command.DeviceCommand; import com.iflytop.handacid.app.core.event.VirtualDeviceCmdResponseEvent; import com.iflytop.handacid.app.core.state.DeviceState; import com.iflytop.handacid.app.websocket.server.DebugGenerator; import com.iflytop.handacid.app.websocket.server.WebSocketSender; import com.iflytop.handacid.hardware.HardwareService; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationEventPublisher; import org.springframework.stereotype.Service;
import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.LinkedBlockingQueue;
@Slf4j @Service @RequiredArgsConstructor public class DeviceCommandService { private final HardwareService hardwareService; private final WebSocketSender webSocketService; private final DeviceState deviceState; private final ApplicationEventPublisher publisher;
/** * 需要等待加液区空闲的龙门架机械臂指令 */ private final ConcurrentMap<Integer, CommandFuture> sendCommandFutureMap = new ConcurrentHashMap<>(); private final BlockingQueue<CommandFuture[]> gantryCommandQueue = new LinkedBlockingQueue<>();
@PostConstruct private void initExecutorThread() { new Thread(this::executeCommands).start(); }
private void executeCommands() { while (true) { try { CommandFuture[] commandFutureArray = gantryCommandQueue.take(); for (CommandFuture commandFuture : commandFutureArray) { executeCommand(commandFuture); } } catch (Exception e) { Thread.currentThread().interrupt(); } } }
public synchronized CommandFuture[] sendCommandGantryQueue(DeviceCommand... deviceCommandBundles) { return sendCommandGantryQueue(null, null, deviceCommandBundles); }
public synchronized CommandFuture[] sendCommandGantryQueue(String cmdId, String cmdCode, DeviceCommand... deviceCommands) { List<CommandFuture> commandFutureList = new ArrayList<>(); for (DeviceCommand deviceCommand : deviceCommands) { commandFutureList.add(createDeviceCommandFuture(cmdId, cmdCode, deviceCommand)); } CommandFuture[] commandFutureArray = commandFutureList.toArray(new CommandFuture[0]); try { gantryCommandQueue.put(commandFutureArray); } catch (Exception e) { log.error("设备指令入队列失败", e); throw new RuntimeException(e); } return commandFutureArray; }
/** * 根据 DeviceCommand 创建 CommandFuture */ private CommandFuture createDeviceCommandFuture(String cmdId, String cmdCode, DeviceCommand deviceCommand) { CommandFuture commandFuture = createDeviceCommandFuture(deviceCommand); commandFuture.setCmdId(cmdId); commandFuture.setCmdCode(cmdCode); return commandFuture; }
/** * 根据 DeviceCommand 创建 CommandFuture */ private CommandFuture createDeviceCommandFuture(DeviceCommand deviceCommand) { CommandFuture commandFuture = new CommandFuture(); commandFuture.setDeviceCommand(deviceCommand); commandFuture.getResponseFuture().whenComplete((result, ex) -> { sendCommandFutureMap.remove(deviceCommand.getCmdId()); }); return commandFuture; }
public void executeCommand(CommandFuture commandFuture) { int cmdId = CyclicNumberGenerator.getInstance().generateNumber(); commandFuture.getDeviceCommand().setCmdId(cmdId); sendCommandFutureMap.put(cmdId, commandFuture); commandFuture.setStartSendTime(System.currentTimeMillis()); if (!deviceState.isVirtual()) { if (!hardwareService.sendCommand(commandFuture.getDeviceCommand())) { sendCommandFutureMap.remove(commandFuture.getDeviceCommand().getCmdId()); throw new RuntimeException("向设备发送指令失败"); } if (commandFuture.getCmdId() != null) { webSocketService.pushDebug(DebugGenerator.generateJson(commandFuture.getCmdId(), commandFuture.getCmdCode(), CommandStatus.DEVICE_SEND, commandFuture.getDeviceCommand().getDevice() + "_" + commandFuture.getDeviceCommand().getAction() + "指令,已发给设备", commandFuture.getDeviceCommand())); } } else { //虚拟模式
log.info("模拟向设备发送TCP指令:{}", JSONUtil.toJsonStr(commandFuture.getDeviceCommand())); //模拟反馈
publisher.publishEvent(new VirtualDeviceCmdResponseEvent(this, commandFuture.getDeviceCommand())); }
}
public CommandFuture sendCommand(DeviceCommand deviceCommand) { CommandFuture commandFuture = createDeviceCommandFuture(deviceCommand); executeCommand(commandFuture); return commandFuture; }
public CommandFuture sendCommand(String cmdId, String cmdCode, DeviceCommand deviceCommand) { CommandFuture commandFuture = createDeviceCommandFuture(cmdId, cmdCode, deviceCommand); executeCommand(commandFuture); return commandFuture; }
public void completeCommandResponse(JSONObject deviceResult) { Integer cmdId = deviceResult.getInt("cmdId"); if (cmdId != null) { CommandFuture commandFuture = sendCommandFutureMap.get(cmdId); if (commandFuture != null) { commandFuture.setEndSendTime(System.currentTimeMillis()); Boolean success = deviceResult.getBool("success"); //数据验证
if (success == null || !success) { //response失败
if (commandFuture.getCmdId() != null) { webSocketService.pushDebug(DebugGenerator.generateJson(commandFuture.getCmdId(), commandFuture.getCmdCode(), CommandStatus.DEVICE_ERROR, commandFuture.getDeviceCommand().getDevice() + "_" + commandFuture.getDeviceCommand().getAction() + "指令,设备response错误,耗时:" + (commandFuture.getEndSendTime() - commandFuture.getStartSendTime()), deviceResult)); } commandFuture.completeResponseExceptionally(new RuntimeException("response失败:" + deviceResult)); } else { if (commandFuture.getCmdId() != null) { webSocketService.pushDebug(DebugGenerator.generateJson(commandFuture.getCmdId(), commandFuture.getCmdCode(), CommandStatus.DEVICE_RESULT, commandFuture.getDeviceCommand().getDevice() + "_" + commandFuture.getDeviceCommand().getAction() + "指令,设备response正常,耗时:" + (commandFuture.getEndSendTime() - commandFuture.getStartSendTime()), deviceResult)); } commandFuture.completeResponse(deviceResult); } } } }
/** * 取消等待中的future并从map中移除 */ public synchronized void releaseAllCommandFutures() { for (Integer key : sendCommandFutureMap.keySet()) { CommandFuture future = sendCommandFutureMap.remove(key); if (future != null) { future.getResponseFuture().cancel(true); } } }
}
|