|
|
package com.iflytop.gd.infrastructure.drivers;
import cn.hutool.core.util.ObjectUtil; import com.iflytop.gd.system.drivers.CommandBus; import com.iflytop.gd.system.exceptions.CommandExecTimeoutException; import com.iflytop.gd.system.exceptions.HardwareErrorException; import com.iflytop.gd.system.models.DataPacket; import com.iflytop.gd.system.utils.ByteArray; import jakarta.annotation.PostConstruct; import jakarta.websocket.*; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component;
import java.io.IOException; import java.net.URI; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit;
/** * 使用WebSocket实现命令总线功能 */ @Slf4j @Component @ClientEndpoint @EnableScheduling public class WebSocketCommandBusImpl implements CommandBus { private Session session; private CountDownLatch countDownLatch; private DataPacket lastDataPacket; private Integer packetIndex = 0;
private final String COMMAND_BUS_WEBSOCKET_URL;
public static final int PACKET_TYPE_CMD = 0xA0; public static final int PACKET_TYPE_ACK = 0xA1; public static final int PACKET_TYPE_ERROR_ACK = 0xA2; public static final int PACKET_TYPE_EVENT = 0xA3;
//TODO 配置硬件服务段ws链接
public WebSocketCommandBusImpl(@Value("${command_bus.websocket_server_url}") String websocketServerUrl) throws DeploymentException, IOException { this.COMMAND_BUS_WEBSOCKET_URL = websocketServerUrl; }
@Scheduled(fixedRate = 5000) @PostConstruct public void connectToCommandBusWebSocketServer() throws DeploymentException, IOException { if (this.session == null || !this.session.isOpen()) { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); URI endpointURI = URI.create(COMMAND_BUS_WEBSOCKET_URL); try { this.session = container.connectToServer(this, endpointURI); } catch (Exception e) { // log.error(e.getMessage(), e);
} }
}
@Override public synchronized DataPacket waitForCommandExec(DataPacket commandPacket, Integer timeout, TimeUnit unit) throws CommandExecTimeoutException, HardwareErrorException, IOException, InterruptedException { try { if (!this.session.isOpen()) { log.error("Session state={}", this.session.isOpen()); throw new IOException("Session is not open"); } packetIndex = packetIndex + 1; if (packetIndex > 30000) { packetIndex = 1; } commandPacket.setPacketIndex(packetIndex); this.countDownLatch = new CountDownLatch(1); String byteString = commandPacket.toByteString(); log.info("ModuleId={}, CommandId={}, 数据包二进制字符内容={}", commandPacket.getModuleId(), commandPacket.getCmdId(), byteString); this.session.getBasicRemote().sendText(byteString); boolean isTimeout = !this.countDownLatch.await(timeout, unit); // 命令返回或者超时了
if (isTimeout) { log.error("Command exec timeout, moduleId={}, commandId={}, timeoutInMilSeconds={}", commandPacket.getModuleId(), commandPacket.getCmdId(), unit.toMillis(timeout)); throw new CommandExecTimeoutException(); } log.debug("收到数据包{}", this.lastDataPacket); // 在指定的时间内得到了响应
if (this.lastDataPacket.getPacketType() == PACKET_TYPE_ERROR_ACK) { log.error("moduleId={}执行command={}发送硬件错误", this.lastDataPacket.getModuleId(), this.lastDataPacket.getCmdId()); throw new HardwareErrorException(); } return ObjectUtil.cloneByStream(this.lastDataPacket); } catch (IOException e) { log.error("发送指令发生异常", e); throw e; } catch (InterruptedException e) { log.error("Thread: {}被中断", Thread.currentThread().getName()); throw e; } }
@OnOpen public void onOpen(Session session) { this.session = session; log.info("WebSocket connection established"); }
@OnMessage public void onMessage(String message) { byte[] bytes = ByteArray.hexStringToBytes(message); log.info("New packet arrived: {}", message); this.lastDataPacket = new DataPacket(bytes); if (this.lastDataPacket.getPacketType() == DataPacket.PACKET_TYPE_ACK || this.lastDataPacket.getPacketType() == DataPacket.PACKET_TYPE_ERROR_ACK) { this.countDownLatch.countDown(); } }
@OnClose public void onClose() { if (this.countDownLatch != null) { this.countDownLatch.countDown(); } }
@OnError public void onError(Session session, Throwable throwable) { if (this.countDownLatch != null) { this.countDownLatch.countDown(); } } }
|