diff --git a/src/main/java/a8k/appbean/AppEvent.java b/src/main/java/a8k/appbean/AppEvent.java deleted file mode 100644 index a624f89..0000000 --- a/src/main/java/a8k/appbean/AppEvent.java +++ /dev/null @@ -1,4 +0,0 @@ -package a8k.appbean; - -public class AppEvent { -} diff --git a/src/main/java/a8k/appbean/AppEventBus.java b/src/main/java/a8k/appbean/AppEventBus.java deleted file mode 100644 index 344ef90..0000000 --- a/src/main/java/a8k/appbean/AppEventBus.java +++ /dev/null @@ -1,4 +0,0 @@ -package a8k.appbean; - -public class AppEventBus { -} diff --git a/src/main/java/a8k/appbean/AppEventListener.java b/src/main/java/a8k/appbean/AppEventListener.java new file mode 100644 index 0000000..b28c6e7 --- /dev/null +++ b/src/main/java/a8k/appbean/AppEventListener.java @@ -0,0 +1,7 @@ +package a8k.appbean; + +import a8k.appbean.appevent.AppEvent; + +public interface AppEventListener { + public void onAppEvent(AppEvent event); +} diff --git a/src/main/java/a8k/appbean/AppException.java b/src/main/java/a8k/appbean/AppException.java deleted file mode 100644 index 18bf9ec..0000000 --- a/src/main/java/a8k/appbean/AppException.java +++ /dev/null @@ -1,4 +0,0 @@ -package a8k.appbean; - -public class AppException { -} diff --git a/src/main/java/a8k/appbean/appevent/A8kHardwareReport.java b/src/main/java/a8k/appbean/appevent/A8kHardwareReport.java new file mode 100644 index 0000000..c5fc697 --- /dev/null +++ b/src/main/java/a8k/appbean/appevent/A8kHardwareReport.java @@ -0,0 +1,21 @@ +package a8k.appbean.appevent; + +import a8k.a8k_can_protocol.A8kPacket; +import org.springframework.lang.NonNull; + +/** + * A8k底层硬件上报事件 + */ +public class A8kHardwareReport extends AppEvent{ + + A8kPacket reportPacket; + public A8kHardwareReport(@NonNull A8kPacket packet) {} + + A8kPacket getReportPacket(){ + return reportPacket; + } + + public String toString(){ + return String.format("|Event A8kHardwareReport :%s|", reportPacket.toString()); + } +} diff --git a/src/main/java/a8k/appbean/appevent/AppEvent.java b/src/main/java/a8k/appbean/appevent/AppEvent.java new file mode 100644 index 0000000..80d77f1 --- /dev/null +++ b/src/main/java/a8k/appbean/appevent/AppEvent.java @@ -0,0 +1,6 @@ +package a8k.appbean.appevent; + +public class AppEvent { + + +} diff --git a/src/main/java/a8k/base_hardware/A8kCanBusService.java b/src/main/java/a8k/base_hardware/A8kCanBusService.java index dddf881..a0bfee6 100644 --- a/src/main/java/a8k/base_hardware/A8kCanBusService.java +++ b/src/main/java/a8k/base_hardware/A8kCanBusService.java @@ -4,13 +4,17 @@ import a8k.a8k_can_protocol.A8kPacket; import a8k.a8k_can_protocol.CmdId; import a8k.a8k_can_protocol.Errorcode; import a8k.appbean.HardwareException; +import a8k.appbean.appevent.A8kHardwareReport; +import a8k.service.AppEventBusService; import a8k.utils.ByteArray; +import ch.qos.logback.core.model.processor.AppenderModelHandler; import jakarta.annotation.PostConstruct; import org.java_websocket.client.WebSocketClient; import org.java_websocket.enums.ReadyState; import org.java_websocket.handshake.ServerHandshake; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.net.URI; @@ -45,17 +49,25 @@ public class A8kCanBusService { * } * */ - public String uri; //assign by application.yml - private WebSocketClient client; - public BlockingQueue receiptQueue; - public BlockingQueue eventQueue; - boolean isWaitingReceipt = false; - int waitingReceiptIndex = 0; - int packetIndex; - boolean debugFlag = false; + @Autowired + private AppEventBusService eventBusService; - Timer autoConnectTimer = new Timer(); + public String uri; //assign by application.yml + private WebSocketClient client; + + //接收回执上下文 + public BlockingQueue receiptQueue = new LinkedBlockingQueue(); // + boolean isWaitingReceipt = false; // + int waitingReceiptIndex = 0;// + + //发送包的packetIndex + int packetIndex = 0;// + + //调试标志位 + boolean debugFlag = false;// + //websocket自动重连时间 + Timer autoConnectTimer = new Timer();// @PostConstruct @@ -68,9 +80,6 @@ public class A8kCanBusService { //this.uri = "ws://127.0.0.1:19005"; this.uri = "ws://192.168.8.10:19005"; } - receiptQueue = new LinkedBlockingQueue(); - eventQueue = new LinkedBlockingQueue<>(); - URI uri = new URI(this.uri); client = new WebSocketClient(uri) { @@ -121,15 +130,16 @@ public class A8kCanBusService { } //TODO - public void callblockcmd(Integer moduleId, Integer cmdId, Integer[] params,int acitionOvertime) { + public void callblockcmd(Integer moduleId, Integer cmdId, Integer[] params, int acitionOvertime) { //调用sendCmdAutoResend // 通过kmodule_get_status判断指令是否完成,超时未完成则报错,则调用module_stop,同时抛出异常 // return; } + //TODO - public A8kPacket sendCmdAutoResend(A8kPacket pack, int overtime) throws HardwareException { + public A8kPacket sendCmdAutoResend(A8kPacket pack, int overtime) throws HardwareException { //调用sendCmd, // 如果捕获到超时异常,则重发,最多重发三次 // 如果是其他异常,则直接抛出异常。 @@ -137,7 +147,7 @@ public class A8kCanBusService { } - private A8kPacket sendCmd(A8kPacket pack, int overtime) throws HardwareException { + private A8kPacket sendCmd(A8kPacket pack, int overtime) throws HardwareException { pack.setPacketIndex(packetIndex); waitingReceiptIndex = packetIndex; packetIndex = packetIndex + 1; @@ -204,7 +214,9 @@ public class A8kCanBusService { } if (packet.getPacketType() == A8kPacket.PACKET_TYPE_EVENT) { - eventQueue.add(packet); + if (eventBusService != null) { + eventBusService.pushEvent(new A8kHardwareReport(packet)); + } } } @@ -228,7 +240,7 @@ public class A8kCanBusService { } } }); - thread.run(); + thread.start(); } } diff --git a/src/main/java/a8k/service/AppEventBusService.java b/src/main/java/a8k/service/AppEventBusService.java new file mode 100644 index 0000000..bb25ec9 --- /dev/null +++ b/src/main/java/a8k/service/AppEventBusService.java @@ -0,0 +1,56 @@ +package a8k.service; + +import a8k.appbean.AppEventListener; +import a8k.appbean.appevent.AppEvent; +import jakarta.annotation.PostConstruct; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +@Component +public class AppEventBusService { + public static final Logger logger = LoggerFactory.getLogger(AppEventBusService.class); + + Thread eventProcessorThread; + BlockingQueue eventQueue; + List listeners; + + @PostConstruct + public void init() { + eventQueue = new LinkedBlockingQueue(); + eventProcessorThread = new Thread(new Runnable() { + public void run() { + eventBusSchedule(); + } + }); + } + + public void regListener(AppEventListener listener) { + listeners.add(listener); + } + + public void pushEvent(AppEvent event) { + logger.info("pushEvent: {}", event); + eventQueue.add(event); + } + + private void eventBusSchedule() { + while (!Thread.currentThread().isInterrupted()) { + try { + AppEvent event = eventQueue.take(); + } catch (InterruptedException ignored) { + } + } + } + + private void callOnEvent(AppEvent event) { + logger.info("Processing event: {}", event); + for (AppEventListener listener : listeners) { + listener.onAppEvent(event); + } + } +}