diff --git a/.mvn/wrapper/maven-wrapper.properties b/.mvn/wrapper/maven-wrapper.properties
index e70e7bc..dfaa181 100644
--- a/.mvn/wrapper/maven-wrapper.properties
+++ b/.mvn/wrapper/maven-wrapper.properties
@@ -1,2 +1,5 @@
-distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip
-wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar
+#distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip
+#wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar
+# ????????
+distributionUrl=https://mirrors.cloud.tencent.com/apache/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.zip
+wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.1/maven-wrapper-3.1.1.jar
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 6fb942c..bc6f326 100644
--- a/pom.xml
+++ b/pom.xml
@@ -115,6 +115,12 @@
spring-boot-starter-thymeleaf
+
+ io.netty
+ netty-all
+ 4.1.94.Final
+
+
@@ -126,4 +132,18 @@
+
+
+
+
+ tencent
+ https://mirrors.cloud.tencent.com/nexus/repository/maven-public/
+
+
+
+
+ tencent
+ https://mirrors.cloud.tencent.com/nexus/repository/maven-public/
+
+
diff --git a/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusConnection.java b/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusConnection.java
index 135bc4c..76db7a9 100644
--- a/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusConnection.java
+++ b/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusConnection.java
@@ -8,7 +8,7 @@ import a8k.app.type.error.AEHardwareError;
import a8k.app.type.exception.AppException;
import a8k.app.factory.A8kPacketFactory;
import a8k.app.service.background.AppEventBusService;
-import a8k.app.utils.ByteArray;
+import a8k.app.utils.ByteArrayUtils;
import a8k.app.utils.ZList;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -283,7 +283,7 @@ public class A8kCanBusConnection extends WebSocketClient {
* 处理接收到的消息
*/
private void processCanRxMessage(String s) {
- byte[] rx = ByteArray.hexStringToBytes(s);
+ byte[] rx = ByteArrayUtils.hexStringToBytes(s);
if (rx == null || rx.length == 0) {
log.warn("rx is empty");
return;
diff --git a/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusService.java b/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusService.java
index 246ef73..9ee95d4 100644
--- a/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusService.java
+++ b/src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusService.java
@@ -4,7 +4,7 @@ import a8k.app.channel.iflytophald.type.protocol.*;
import a8k.app.type.error.AEHardwareError;
import a8k.app.type.exception.AppException;
import a8k.OS;
-import a8k.app.utils.ByteArray;
+import a8k.app.utils.ByteArrayUtils;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
@@ -108,7 +108,7 @@ public class A8kCanBusService {
if (rxPacket.getCmdContent().length == 0) {
break;
}
- result = ByteArray.concat(result, rxPacket.getCmdContent());
+ result = ByteArrayUtils.concat(result, rxPacket.getCmdContent());
}
return result;
}
diff --git a/src/main/java/a8k/app/channel/iflytophald/driver/OptModuleDriver.java b/src/main/java/a8k/app/channel/iflytophald/driver/OptModuleDriver.java
index fb76814..ee8d96a 100644
--- a/src/main/java/a8k/app/channel/iflytophald/driver/OptModuleDriver.java
+++ b/src/main/java/a8k/app/channel/iflytophald/driver/OptModuleDriver.java
@@ -10,7 +10,7 @@ import a8k.app.channel.iflytophald.type.protocol.ModuleStatus;
import a8k.app.channel.iflytophald.type.protocol.OptModuleRegIndex;
import a8k.app.type.error.AEHardwareError;
import a8k.app.type.exception.AppException;
-import a8k.app.utils.ByteArray;
+import a8k.app.utils.ByteArrayUtils;
import a8k.app.utils.ZJsonNode;
import a8k.app.utils.ZList;
import com.fasterxml.jackson.databind.JsonNode;
@@ -96,7 +96,7 @@ public class OptModuleDriver {
if (rxPacket.getCmdContent().length == 0) {
break;
}
- var rawdata = ByteArray.readU16bitArray(rxPacket.getCmdContent());
+ var rawdata = ByteArrayUtils.readU16bitArray(rxPacket.getCmdContent());
result.addAll(Arrays.asList(rawdata));
}
return result.toArray(new Integer[0]);
diff --git a/src/main/java/a8k/app/channel/iflytophald/type/protocol/A8kPacket.java b/src/main/java/a8k/app/channel/iflytophald/type/protocol/A8kPacket.java
index 81a7d65..302d90e 100644
--- a/src/main/java/a8k/app/channel/iflytophald/type/protocol/A8kPacket.java
+++ b/src/main/java/a8k/app/channel/iflytophald/type/protocol/A8kPacket.java
@@ -1,6 +1,6 @@
package a8k.app.channel.iflytophald.type.protocol;
-import a8k.app.utils.ByteArray;
+import a8k.app.utils.ByteArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.Assert;
@@ -64,30 +64,30 @@ public class A8kPacket {
}
public void setPacketIndex(int packetIndex) {
- ByteArray.setU16bit(raw, INDEX_OFFSET, packetIndex);
+ ByteArrayUtils.setU16bit(raw, INDEX_OFFSET, packetIndex);
int checkcode = computeCheckcode();
- ByteArray.setU8(raw, raw.length - 1, checkcode);
+ ByteArrayUtils.setU8(raw, raw.length - 1, checkcode);
}
public int getPacketIndex() {
- return ByteArray.readU16bit(raw, INDEX_OFFSET);
+ return ByteArrayUtils.readU16bit(raw, INDEX_OFFSET);
}
public int getCmdId() {
- return ByteArray.readU16bit(raw, CMDID_OFFSET);
+ return ByteArrayUtils.readU16bit(raw, CMDID_OFFSET);
}
public int getPacketType() {
- return ByteArray.readU8bit(raw, PACKET_TYPE_OFFSET);
+ return ByteArrayUtils.readU8bit(raw, PACKET_TYPE_OFFSET);
}
public int getModuleId() {
- return ByteArray.readU8bit(raw, MODULE_ID_OFFSET);
+ return ByteArrayUtils.readU8bit(raw, MODULE_ID_OFFSET);
}
public int getDataLen() {
- return ByteArray.readU8bit(raw, DATA_LEN_OFFSET);
+ return ByteArrayUtils.readU8bit(raw, DATA_LEN_OFFSET);
}
public byte[] getCmdContent() {
@@ -102,18 +102,18 @@ public class A8kPacket {
public int[] getContentI16Array() {
int[] ret = new int[getDataLen() / 2];
for (int i = 0; i < ret.length; i++) {
- ret[i] = ByteArray.readS16bit(raw, DATA_OFFSET + i * 2);
+ ret[i] = ByteArrayUtils.readS16bit(raw, DATA_OFFSET + i * 2);
}
return ret;
}
public int getContentI32(int index) {
- return ByteArray.read32bit(raw, DATA_OFFSET + index * 4);
+ return ByteArrayUtils.read32bit(raw, DATA_OFFSET + index * 4);
}
public int getCheckcode() {
- return ByteArray.readU8bit(raw, raw.length - 1);
+ return ByteArrayUtils.readU8bit(raw, raw.length - 1);
}
public int computeCheckcode() {
@@ -125,7 +125,7 @@ public class A8kPacket {
}
public String toByteString() {
- return ByteArray.toByteString(raw);
+ return ByteArrayUtils.toByteString(raw);
}
public String toString() {
@@ -151,7 +151,7 @@ public class A8kPacket {
MId.valueOf(getModuleId()), getModuleId(), formatInt32ATTACH(getCmdContent()));
} else {
ret = String.format("[CMD ] index:[%d] (%s %s(%d) :param:[%s])", getPacketIndex(), cmdId,
- MId.valueOf(getModuleId()), getModuleId(), ByteArray.toByteString(getCmdContent()));
+ MId.valueOf(getModuleId()), getModuleId(), ByteArrayUtils.toByteString(getCmdContent()));
}
}
@@ -161,7 +161,7 @@ public class A8kPacket {
formatInt32ATTACH(getCmdContent()));
} else {
ret = String.format("[ACK ] index:[%d] (%s :[%s])", getPacketIndex(), cmdId,
- ByteArray.toByteString(getCmdContent()));
+ ByteArrayUtils.toByteString(getCmdContent()));
}
} else if (packetType == PACKET_TYPE_ERROR_ACK) {
ret = String.format("[EACK ] index:[%d] (%s :[%s])", getPacketIndex(), cmdId, (getContentI32(0)));
@@ -171,7 +171,7 @@ public class A8kPacket {
MId.valueOf(getModuleId()), getModuleId(), formatInt32ATTACH(getCmdContent()));
} else {
ret = String.format("[EVENT] index:[%d] (%s %s(%d) :[%s])", getPacketIndex(), cmdId,
- MId.valueOf(getModuleId()), getModuleId(), ByteArray.toByteString(getCmdContent()));
+ MId.valueOf(getModuleId()), getModuleId(), ByteArrayUtils.toByteString(getCmdContent()));
}
} else {
ret = String.format("Unknown packet type: %d", packetType);
@@ -186,7 +186,7 @@ public class A8kPacket {
break;
if (i != 0)
ret.append(",");
- ret.append(String.format("%d", ByteArray.read32bit(attach, i)));
+ ret.append(String.format("%d", ByteArrayUtils.read32bit(attach, i)));
}
return ret.toString();
}
diff --git a/src/main/java/a8k/app/channel/net/BoditechLisDoubleTrackTcpClient.java b/src/main/java/a8k/app/channel/net/BoditechLisDoubleTrackTcpClient.java
new file mode 100644
index 0000000..e272973
--- /dev/null
+++ b/src/main/java/a8k/app/channel/net/BoditechLisDoubleTrackTcpClient.java
@@ -0,0 +1,139 @@
+package a8k.app.channel.net;
+
+import a8k.app.utils.ByteArrayUtils;
+import a8k.app.utils.NettyUtils;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@ChannelHandler.Sharable
+public class BoditechLisDoubleTrackTcpClient extends ChannelInboundHandlerAdapter {
+
+ private final EventLoopGroup group = NettyUtils.buildEventLoopGroup("BoditechLisSingleTrackTcpClient", 1);
+ private final Bootstrap bootstrap = new Bootstrap();
+ private final Integer reconnectDelay = 5; // 重连延迟时间,单位秒
+ private String serverIp;
+ private Integer port;
+ private volatile Channel channel;
+ private volatile boolean isShuttingDown = false;
+
+
+ public void start(String serverIp, Integer port) {
+ this.serverIp = serverIp;
+ this.port = port;
+
+ bootstrap.group(group)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
+ .handler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ch.pipeline()
+ .addLast(BoditechLisDoubleTrackTcpClient.this);
+ }
+ });
+ doConnect();
+ }
+ public synchronized void shutdown() {
+ isShuttingDown = true;
+ if (channel != null) {
+ channel.close();
+ }
+ group.shutdownGracefully();
+ }
+
+ public synchronized void tx(byte[] data) {
+ if (channel != null && channel.isActive()) {
+ ByteBuf buf = Unpooled.wrappedBuffer(data);
+
+ channel.writeAndFlush(buf).addListener(future -> {
+ if (!future.isSuccess()) {
+ log.error("Failed to send data tail,{}", future.cause().getMessage());
+ }
+ });
+ } else {
+ log.warn("Channel is not active, cannot send data: {}", data);
+ // 可以考虑在这里实现消息队列,等连接恢复后重发
+ }
+ }
+
+ // 处理接收到的消息
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ // 直接接收原始ByteBuf
+ if (msg instanceof ByteBuf) {
+ ByteBuf buf = (ByteBuf) msg;
+ try {
+ byte[] bytes = new byte[buf.readableBytes()];
+ buf.readBytes(bytes);
+ log.info("Received raw data ({} bytes): {}", bytes.length, ByteArrayUtils.toByteString(bytes));
+ //
+ // PROCESS THE RECEIVED DATA HERE
+ //
+ } finally {
+ buf.release(); // 重要:释放ByteBuf
+ }
+ } else {
+ log.warn("Unexpected message type: {}", msg.getClass());
+ }
+ }
+ //
+ // PRIVATE
+ //
+
+ private void doConnect() {
+ if (isShuttingDown) {
+ return;
+ }
+ log.info("try connect to {}:{}", serverIp, port);
+ bootstrap.connect(serverIp, port).addListener((ChannelFuture future) -> {
+ if (future.isSuccess()) {
+ log.info("successfully connected to {}:{}", serverIp, port);
+ channel = future.channel();
+ } else {
+ log.error("failed,{}", future.cause().getMessage());
+ if (!isShuttingDown)
+ group.schedule(this::doConnect, 5, TimeUnit.SECONDS);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ log.warn("Connection lost, trying to reconnect...");
+ if (channel != null && channel.equals(ctx.channel())) {
+ channel = null;
+ }
+ if (!isShuttingDown)
+ group.schedule(this::doConnect, reconnectDelay, TimeUnit.SECONDS);
+ }
+
+ public static void main(String[] args) {
+ BoditechLisDoubleTrackTcpClient client = new BoditechLisDoubleTrackTcpClient();
+ client.start("0.0.0.0", 777);
+
+ int count = 0;
+ while (true) {
+ try {
+ byte[] data = {0x01, 0x02, 0x03, 0x04, 0x05};
+ client.tx(data);
+ Thread.sleep(1000); // 主线程保持运行状态
+ } catch (InterruptedException e) {
+ log.error("Main thread interrupted: {}", e.getMessage());
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/main/java/a8k/app/channel/net/BoditechLisSingleTrackTcpClient.java b/src/main/java/a8k/app/channel/net/BoditechLisSingleTrackTcpClient.java
new file mode 100644
index 0000000..8693e56
--- /dev/null
+++ b/src/main/java/a8k/app/channel/net/BoditechLisSingleTrackTcpClient.java
@@ -0,0 +1,122 @@
+package a8k.app.channel.net;
+
+import a8k.app.utils.NettyUtils;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.*;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.LineBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@ChannelHandler.Sharable
+public class BoditechLisSingleTrackTcpClient extends SimpleChannelInboundHandler {
+
+ private final EventLoopGroup group = NettyUtils.buildEventLoopGroup("BoditechLisSingleTrackTcpClient", 1);
+ private final Bootstrap bootstrap = new Bootstrap();
+ private final Integer reconnectDelay = 5; // 重连延迟时间,单位秒
+ private String serverIp;
+ private Integer port;
+ private volatile Channel channel;
+ private volatile boolean isShuttingDown = false;
+
+
+ public void start(String serverIp, Integer port) {
+ this.serverIp = serverIp;
+ this.port = port;
+
+ bootstrap.group(group)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
+ .handler(new ChannelInitializer() {
+ @Override
+ protected void initChannel(SocketChannel ch) {
+ ch.pipeline()
+ .addLast(new LineBasedFrameDecoder(1024))//以/r/n为结束符的解码器
+ .addLast(new StringEncoder())
+ .addLast(new StringDecoder())
+ .addLast(BoditechLisSingleTrackTcpClient.this);
+ }
+ });
+
+ doConnect();
+ }
+
+ public synchronized void shutdown() {
+ isShuttingDown = true;
+ if (channel != null) {
+ channel.close();
+ }
+ group.shutdownGracefully();
+ }
+
+ public synchronized void tx(String data) {
+ if (channel != null && channel.isActive()) {
+ channel.writeAndFlush(data).addListener(future -> {
+ if (!future.isSuccess()) {
+ log.error("Failed to send data tail");
+ }
+ });
+ } else {
+ log.warn("Channel is not active, cannot send data: {}", data);
+ // 可以考虑在这里实现消息队列,等连接恢复后重发
+ }
+ }
+
+ private void doConnect() {
+ if (isShuttingDown) {
+ return;
+ }
+ log.info("try connect to {}:{}", serverIp, port);
+ bootstrap.connect(serverIp, port).addListener((ChannelFuture future) -> {
+ if (future.isSuccess()) {
+ log.info("successfully connected to {}:{}", serverIp, port);
+ channel = future.channel();
+ } else {
+ log.error("failed,{}", future.cause().getMessage());
+ if (!isShuttingDown)
+ group.schedule(this::doConnect, 5, TimeUnit.SECONDS);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ log.warn("Connection lost, trying to reconnect...");
+ if (channel != null && channel.equals(ctx.channel())) {
+ channel = null;
+ }
+ // 重连逻辑
+ if (!isShuttingDown)
+ group.schedule(this::doConnect, reconnectDelay, TimeUnit.SECONDS);
+ }
+
+
+ // 处理接收到的消息
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
+ log.info("Received message from server: {}({})", msg,msg.length());
+ }
+
+ public static void main(String[] args) {
+ BoditechLisSingleTrackTcpClient client = new BoditechLisSingleTrackTcpClient();
+ client.start("0.0.0.0", 777);
+
+ int count = 0;
+ while (true) {
+ try {
+ client.tx("Test message " + (++count));
+ Thread.sleep(1000); // 主线程保持运行状态
+ } catch (InterruptedException e) {
+ log.error("Main thread interrupted: {}", e.getMessage());
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/a8k/app/utils/ByteArray.java b/src/main/java/a8k/app/utils/ByteArrayUtils.java
similarity index 98%
rename from src/main/java/a8k/app/utils/ByteArray.java
rename to src/main/java/a8k/app/utils/ByteArrayUtils.java
index 9ed891f..d0a6f15 100644
--- a/src/main/java/a8k/app/utils/ByteArray.java
+++ b/src/main/java/a8k/app/utils/ByteArrayUtils.java
@@ -3,8 +3,8 @@ package a8k.app.utils;
import org.springframework.lang.NonNull;
-public class ByteArray {
- public ByteArray() {
+public class ByteArrayUtils {
+ public ByteArrayUtils() {
}
public static int readU8bit(byte[] code, int index) {
diff --git a/src/main/java/a8k/app/utils/NettyUtils.java b/src/main/java/a8k/app/utils/NettyUtils.java
new file mode 100644
index 0000000..5872413
--- /dev/null
+++ b/src/main/java/a8k/app/utils/NettyUtils.java
@@ -0,0 +1,17 @@
+package a8k.app.utils;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import io.netty.util.concurrent.ThreadPerTaskExecutor;
+
+import java.util.concurrent.Executor;
+
+public class NettyUtils {
+ public static EventLoopGroup buildEventLoopGroup(String name, int nThread) {
+ DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory(name, Thread.MAX_PRIORITY);
+ Executor executor = new ThreadPerTaskExecutor(defaultThreadFactory);
+ return new NioEventLoopGroup(nThread, executor);
+ }
+
+}