Browse Source

feat:完善命令总线的日志和服务段链接配置

tags/freeze
黄翔 3 months ago
parent
commit
746c821d0b
  1. 27
      src/main/java/com/iflytop/gd/infrastructure/drivers/WebSocketCommandBusImpl.java
  2. 3
      src/main/java/com/iflytop/gd/system/drivers/CommandBus.java

27
src/main/java/com/iflytop/gd/infrastructure/drivers/WebSocketCommandBusImpl.java

@ -9,16 +9,20 @@ import com.iflytop.gd.system.models.DataPacket;
import com.iflytop.gd.system.utils.ByteArray; import com.iflytop.gd.system.utils.ByteArray;
import jakarta.websocket.*; import jakarta.websocket.*;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import org.springframework.web.socket.client.WebSocketClient; import org.springframework.web.socket.client.WebSocketClient;
import java.io.IOException; import java.io.IOException;
import java.net.URI;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/**
* 使用WebSocket实现命令总线功能
*/
@Slf4j @Slf4j
@Component @Component
@ClientEndpoint @ClientEndpoint
@ -28,12 +32,18 @@ public class WebSocketCommandBusImpl implements CommandBus {
private CountDownLatch countDownLatch; private CountDownLatch countDownLatch;
private DataPacket lastDataPacket; private DataPacket lastDataPacket;
public static final int PACKET_TYPE_CMD = 0xA0; public static final int PACKET_TYPE_CMD = 0xA0;
public static final int PACKET_TYPE_ACK = 0xA1; public static final int PACKET_TYPE_ACK = 0xA1;
public static final int PACKET_TYPE_ERROR_ACK = 0xA2; public static final int PACKET_TYPE_ERROR_ACK = 0xA2;
public static final int PACKET_TYPE_EVENT = 0xA3; public static final int PACKET_TYPE_EVENT = 0xA3;
//TODO 配置硬件服务段ws链接
public WebSocketCommandBusImpl(@Value("${}") String websocketServerUrl) throws DeploymentException, IOException {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
URI endpointURI = URI.create(websocketServerUrl);
container.connectToServer(this, endpointURI);
}
@Override @Override
public synchronized DataPacket waitForCommandExec(DataPacket commandPacket, Integer timeout, TimeUnit unit) public synchronized DataPacket waitForCommandExec(DataPacket commandPacket, Integer timeout, TimeUnit unit)
throws CommandExecTimeoutException, HardwareErrorException, IOException, InterruptedException { throws CommandExecTimeoutException, HardwareErrorException, IOException, InterruptedException {
@ -43,9 +53,12 @@ public class WebSocketCommandBusImpl implements CommandBus {
boolean isTimeout = this.countDownLatch.await(timeout, unit); boolean isTimeout = this.countDownLatch.await(timeout, unit);
// 命令返回或者超时了 // 命令返回或者超时了
if (isTimeout) { if (isTimeout) {
// TODO处理超时
log.error("Command exec timeout, moduleId={}, commandId={}, timeoutInMilSeconds={}",
commandPacket.getModuleId(), commandPacket.getCmdId(), unit.toMillis(timeout));
throw new CommandExecTimeoutException(); throw new CommandExecTimeoutException();
} }
// 在指定的时间内得到了响应
if (this.lastDataPacket.getPacketType() == PACKET_TYPE_ERROR_ACK) { if (this.lastDataPacket.getPacketType() == PACKET_TYPE_ERROR_ACK) {
log.error("moduleId={}执行command={}发送硬件错误", this.lastDataPacket.getModuleId(), this.lastDataPacket.getCmdId()); log.error("moduleId={}执行command={}发送硬件错误", this.lastDataPacket.getModuleId(), this.lastDataPacket.getCmdId());
throw new HardwareErrorException(); throw new HardwareErrorException();
@ -55,8 +68,7 @@ public class WebSocketCommandBusImpl implements CommandBus {
log.error("发送指令发生异常", e); log.error("发送指令发生异常", e);
throw e; throw e;
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().getName();
log.error("");
log.error("Thread: {}被中断", Thread.currentThread().getName());
throw e; throw e;
} }
} }
@ -78,8 +90,11 @@ public class WebSocketCommandBusImpl implements CommandBus {
public void onMessage(String message) { public void onMessage(String message) {
byte[] bytes = ByteArray.hexStringToBytes(message); byte[] bytes = ByteArray.hexStringToBytes(message);
this.lastDataPacket = new DataPacket(bytes); this.lastDataPacket = new DataPacket(bytes);
if (this.lastDataPacket.getPacketType() == DataPacket.PACKET_TYPE_ACK ||
this.lastDataPacket.getPacketType() == DataPacket.PACKET_TYPE_ERROR_ACK) {
this.countDownLatch.countDown(); this.countDownLatch.countDown();
} }
}
@OnClose @OnClose

3
src/main/java/com/iflytop/gd/system/drivers/CommandBus.java

@ -18,5 +18,6 @@ public interface CommandBus {
* @return 响应数据包 * @return 响应数据包
* @throws Exception * @throws Exception
*/ */
DataPacket waitForCommandExec(DataPacket commandPacket, Integer timeout, TimeUnit unit) throws CommandExecTimeoutException, HardwareErrorException, IOException, InterruptedException;
DataPacket waitForCommandExec(DataPacket commandPacket, Integer timeout, TimeUnit unit)
throws CommandExecTimeoutException, HardwareErrorException, IOException, InterruptedException;
} }
Loading…
Cancel
Save