Browse Source

添加巴迪泰NettyClient支持

master
zhaohe 1 month ago
parent
commit
ffc04979de
  1. 7
      .mvn/wrapper/maven-wrapper.properties
  2. 20
      pom.xml
  3. 4
      src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusConnection.java
  4. 4
      src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusService.java
  5. 4
      src/main/java/a8k/app/channel/iflytophald/driver/OptModuleDriver.java
  6. 32
      src/main/java/a8k/app/channel/iflytophald/type/protocol/A8kPacket.java
  7. 139
      src/main/java/a8k/app/channel/net/BoditechLisDoubleTrackTcpClient.java
  8. 122
      src/main/java/a8k/app/channel/net/BoditechLisSingleTrackTcpClient.java
  9. 4
      src/main/java/a8k/app/utils/ByteArrayUtils.java
  10. 17
      src/main/java/a8k/app/utils/NettyUtils.java

7
.mvn/wrapper/maven-wrapper.properties

@ -1,2 +1,5 @@
distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar
#distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.6/apache-maven-3.9.6-bin.zip
#wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar
# ????????
distributionUrl=https://mirrors.cloud.tencent.com/apache/maven/maven-3/3.8.6/binaries/apache-maven-3.8.6-bin.zip
wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.1.1/maven-wrapper-3.1.1.jar

20
pom.xml

@ -115,6 +115,12 @@
<artifactId>spring-boot-starter-thymeleaf</artifactId> <artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency> </dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.94.Final</version>
</dependency>
</dependencies> </dependencies>
<build> <build>
@ -126,4 +132,18 @@
</plugins> </plugins>
</build> </build>
<!-- 配置源,以阿里云源为例 -->
<repositories>
<repository>
<id>tencent</id>
<url>https://mirrors.cloud.tencent.com/nexus/repository/maven-public/</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>tencent</id>
<url>https://mirrors.cloud.tencent.com/nexus/repository/maven-public/</url>
</pluginRepository>
</pluginRepositories>
</project> </project>

4
src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusConnection.java

@ -8,7 +8,7 @@ import a8k.app.type.error.AEHardwareError;
import a8k.app.type.exception.AppException; import a8k.app.type.exception.AppException;
import a8k.app.factory.A8kPacketFactory; import a8k.app.factory.A8kPacketFactory;
import a8k.app.service.background.AppEventBusService; import a8k.app.service.background.AppEventBusService;
import a8k.app.utils.ByteArray;
import a8k.app.utils.ByteArrayUtils;
import a8k.app.utils.ZList; import a8k.app.utils.ZList;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -283,7 +283,7 @@ public class A8kCanBusConnection extends WebSocketClient {
* 处理接收到的消息 * 处理接收到的消息
*/ */
private void processCanRxMessage(String s) { private void processCanRxMessage(String s) {
byte[] rx = ByteArray.hexStringToBytes(s);
byte[] rx = ByteArrayUtils.hexStringToBytes(s);
if (rx == null || rx.length == 0) { if (rx == null || rx.length == 0) {
log.warn("rx is empty"); log.warn("rx is empty");
return; return;

4
src/main/java/a8k/app/channel/iflytophald/channel/A8kCanBusService.java

@ -4,7 +4,7 @@ import a8k.app.channel.iflytophald.type.protocol.*;
import a8k.app.type.error.AEHardwareError; import a8k.app.type.error.AEHardwareError;
import a8k.app.type.exception.AppException; import a8k.app.type.exception.AppException;
import a8k.OS; import a8k.OS;
import a8k.app.utils.ByteArray;
import a8k.app.utils.ByteArrayUtils;
import jakarta.annotation.PostConstruct; import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
@ -108,7 +108,7 @@ public class A8kCanBusService {
if (rxPacket.getCmdContent().length == 0) { if (rxPacket.getCmdContent().length == 0) {
break; break;
} }
result = ByteArray.concat(result, rxPacket.getCmdContent());
result = ByteArrayUtils.concat(result, rxPacket.getCmdContent());
} }
return result; return result;
} }

4
src/main/java/a8k/app/channel/iflytophald/driver/OptModuleDriver.java

@ -10,7 +10,7 @@ import a8k.app.channel.iflytophald.type.protocol.ModuleStatus;
import a8k.app.channel.iflytophald.type.protocol.OptModuleRegIndex; import a8k.app.channel.iflytophald.type.protocol.OptModuleRegIndex;
import a8k.app.type.error.AEHardwareError; import a8k.app.type.error.AEHardwareError;
import a8k.app.type.exception.AppException; import a8k.app.type.exception.AppException;
import a8k.app.utils.ByteArray;
import a8k.app.utils.ByteArrayUtils;
import a8k.app.utils.ZJsonNode; import a8k.app.utils.ZJsonNode;
import a8k.app.utils.ZList; import a8k.app.utils.ZList;
import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.JsonNode;
@ -96,7 +96,7 @@ public class OptModuleDriver {
if (rxPacket.getCmdContent().length == 0) { if (rxPacket.getCmdContent().length == 0) {
break; break;
} }
var rawdata = ByteArray.readU16bitArray(rxPacket.getCmdContent());
var rawdata = ByteArrayUtils.readU16bitArray(rxPacket.getCmdContent());
result.addAll(Arrays.asList(rawdata)); result.addAll(Arrays.asList(rawdata));
} }
return result.toArray(new Integer[0]); return result.toArray(new Integer[0]);

32
src/main/java/a8k/app/channel/iflytophald/type/protocol/A8kPacket.java

@ -1,6 +1,6 @@
package a8k.app.channel.iflytophald.type.protocol; package a8k.app.channel.iflytophald.type.protocol;
import a8k.app.utils.ByteArray;
import a8k.app.utils.ByteArrayUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.util.Assert; import org.springframework.util.Assert;
@ -64,30 +64,30 @@ public class A8kPacket {
} }
public void setPacketIndex(int packetIndex) { public void setPacketIndex(int packetIndex) {
ByteArray.setU16bit(raw, INDEX_OFFSET, packetIndex);
ByteArrayUtils.setU16bit(raw, INDEX_OFFSET, packetIndex);
int checkcode = computeCheckcode(); int checkcode = computeCheckcode();
ByteArray.setU8(raw, raw.length - 1, checkcode);
ByteArrayUtils.setU8(raw, raw.length - 1, checkcode);
} }
public int getPacketIndex() { public int getPacketIndex() {
return ByteArray.readU16bit(raw, INDEX_OFFSET);
return ByteArrayUtils.readU16bit(raw, INDEX_OFFSET);
} }
public int getCmdId() { public int getCmdId() {
return ByteArray.readU16bit(raw, CMDID_OFFSET);
return ByteArrayUtils.readU16bit(raw, CMDID_OFFSET);
} }
public int getPacketType() { public int getPacketType() {
return ByteArray.readU8bit(raw, PACKET_TYPE_OFFSET);
return ByteArrayUtils.readU8bit(raw, PACKET_TYPE_OFFSET);
} }
public int getModuleId() { public int getModuleId() {
return ByteArray.readU8bit(raw, MODULE_ID_OFFSET);
return ByteArrayUtils.readU8bit(raw, MODULE_ID_OFFSET);
} }
public int getDataLen() { public int getDataLen() {
return ByteArray.readU8bit(raw, DATA_LEN_OFFSET);
return ByteArrayUtils.readU8bit(raw, DATA_LEN_OFFSET);
} }
public byte[] getCmdContent() { public byte[] getCmdContent() {
@ -102,18 +102,18 @@ public class A8kPacket {
public int[] getContentI16Array() { public int[] getContentI16Array() {
int[] ret = new int[getDataLen() / 2]; int[] ret = new int[getDataLen() / 2];
for (int i = 0; i < ret.length; i++) { for (int i = 0; i < ret.length; i++) {
ret[i] = ByteArray.readS16bit(raw, DATA_OFFSET + i * 2);
ret[i] = ByteArrayUtils.readS16bit(raw, DATA_OFFSET + i * 2);
} }
return ret; return ret;
} }
public int getContentI32(int index) { public int getContentI32(int index) {
return ByteArray.read32bit(raw, DATA_OFFSET + index * 4);
return ByteArrayUtils.read32bit(raw, DATA_OFFSET + index * 4);
} }
public int getCheckcode() { public int getCheckcode() {
return ByteArray.readU8bit(raw, raw.length - 1);
return ByteArrayUtils.readU8bit(raw, raw.length - 1);
} }
public int computeCheckcode() { public int computeCheckcode() {
@ -125,7 +125,7 @@ public class A8kPacket {
} }
public String toByteString() { public String toByteString() {
return ByteArray.toByteString(raw);
return ByteArrayUtils.toByteString(raw);
} }
public String toString() { public String toString() {
@ -151,7 +151,7 @@ public class A8kPacket {
MId.valueOf(getModuleId()), getModuleId(), formatInt32ATTACH(getCmdContent())); MId.valueOf(getModuleId()), getModuleId(), formatInt32ATTACH(getCmdContent()));
} else { } else {
ret = String.format("[CMD ] index:[%d] (%s %s(%d) :param:[%s])", getPacketIndex(), cmdId, ret = String.format("[CMD ] index:[%d] (%s %s(%d) :param:[%s])", getPacketIndex(), cmdId,
MId.valueOf(getModuleId()), getModuleId(), ByteArray.toByteString(getCmdContent()));
MId.valueOf(getModuleId()), getModuleId(), ByteArrayUtils.toByteString(getCmdContent()));
} }
} }
@ -161,7 +161,7 @@ public class A8kPacket {
formatInt32ATTACH(getCmdContent())); formatInt32ATTACH(getCmdContent()));
} else { } else {
ret = String.format("[ACK ] index:[%d] (%s :[%s])", getPacketIndex(), cmdId, ret = String.format("[ACK ] index:[%d] (%s :[%s])", getPacketIndex(), cmdId,
ByteArray.toByteString(getCmdContent()));
ByteArrayUtils.toByteString(getCmdContent()));
} }
} else if (packetType == PACKET_TYPE_ERROR_ACK) { } else if (packetType == PACKET_TYPE_ERROR_ACK) {
ret = String.format("[EACK ] index:[%d] (%s :[%s])", getPacketIndex(), cmdId, (getContentI32(0))); ret = String.format("[EACK ] index:[%d] (%s :[%s])", getPacketIndex(), cmdId, (getContentI32(0)));
@ -171,7 +171,7 @@ public class A8kPacket {
MId.valueOf(getModuleId()), getModuleId(), formatInt32ATTACH(getCmdContent())); MId.valueOf(getModuleId()), getModuleId(), formatInt32ATTACH(getCmdContent()));
} else { } else {
ret = String.format("[EVENT] index:[%d] (%s %s(%d) :[%s])", getPacketIndex(), cmdId, ret = String.format("[EVENT] index:[%d] (%s %s(%d) :[%s])", getPacketIndex(), cmdId,
MId.valueOf(getModuleId()), getModuleId(), ByteArray.toByteString(getCmdContent()));
MId.valueOf(getModuleId()), getModuleId(), ByteArrayUtils.toByteString(getCmdContent()));
} }
} else { } else {
ret = String.format("Unknown packet type: %d", packetType); ret = String.format("Unknown packet type: %d", packetType);
@ -186,7 +186,7 @@ public class A8kPacket {
break; break;
if (i != 0) if (i != 0)
ret.append(","); ret.append(",");
ret.append(String.format("%d", ByteArray.read32bit(attach, i)));
ret.append(String.format("%d", ByteArrayUtils.read32bit(attach, i)));
} }
return ret.toString(); return ret.toString();
} }

139
src/main/java/a8k/app/channel/net/BoditechLisDoubleTrackTcpClient.java

@ -0,0 +1,139 @@
package a8k.app.channel.net;
import a8k.app.utils.ByteArrayUtils;
import a8k.app.utils.NettyUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
@ChannelHandler.Sharable
public class BoditechLisDoubleTrackTcpClient extends ChannelInboundHandlerAdapter {
private final EventLoopGroup group = NettyUtils.buildEventLoopGroup("BoditechLisSingleTrackTcpClient", 1);
private final Bootstrap bootstrap = new Bootstrap();
private final Integer reconnectDelay = 5; // 重连延迟时间单位秒
private String serverIp;
private Integer port;
private volatile Channel channel;
private volatile boolean isShuttingDown = false;
public void start(String serverIp, Integer port) {
this.serverIp = serverIp;
this.port = port;
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(BoditechLisDoubleTrackTcpClient.this);
}
});
doConnect();
}
public synchronized void shutdown() {
isShuttingDown = true;
if (channel != null) {
channel.close();
}
group.shutdownGracefully();
}
public synchronized void tx(byte[] data) {
if (channel != null && channel.isActive()) {
ByteBuf buf = Unpooled.wrappedBuffer(data);
channel.writeAndFlush(buf).addListener(future -> {
if (!future.isSuccess()) {
log.error("Failed to send data tail,{}", future.cause().getMessage());
}
});
} else {
log.warn("Channel is not active, cannot send data: {}", data);
// 可以考虑在这里实现消息队列等连接恢复后重发
}
}
// 处理接收到的消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 直接接收原始ByteBuf
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
try {
byte[] bytes = new byte[buf.readableBytes()];
buf.readBytes(bytes);
log.info("Received raw data ({} bytes): {}", bytes.length, ByteArrayUtils.toByteString(bytes));
//
// PROCESS THE RECEIVED DATA HERE
//
} finally {
buf.release(); // 重要释放ByteBuf
}
} else {
log.warn("Unexpected message type: {}", msg.getClass());
}
}
//
// PRIVATE
//
private void doConnect() {
if (isShuttingDown) {
return;
}
log.info("try connect to {}:{}", serverIp, port);
bootstrap.connect(serverIp, port).addListener((ChannelFuture future) -> {
if (future.isSuccess()) {
log.info("successfully connected to {}:{}", serverIp, port);
channel = future.channel();
} else {
log.error("failed,{}", future.cause().getMessage());
if (!isShuttingDown)
group.schedule(this::doConnect, 5, TimeUnit.SECONDS);
}
});
}
@Override
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn("Connection lost, trying to reconnect...");
if (channel != null && channel.equals(ctx.channel())) {
channel = null;
}
if (!isShuttingDown)
group.schedule(this::doConnect, reconnectDelay, TimeUnit.SECONDS);
}
public static void main(String[] args) {
BoditechLisDoubleTrackTcpClient client = new BoditechLisDoubleTrackTcpClient();
client.start("0.0.0.0", 777);
int count = 0;
while (true) {
try {
byte[] data = {0x01, 0x02, 0x03, 0x04, 0x05};
client.tx(data);
Thread.sleep(1000); // 主线程保持运行状态
} catch (InterruptedException e) {
log.error("Main thread interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
break;
}
}
}
}

122
src/main/java/a8k/app/channel/net/BoditechLisSingleTrackTcpClient.java

@ -0,0 +1,122 @@
package a8k.app.channel.net;
import a8k.app.utils.NettyUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
@Slf4j
@ChannelHandler.Sharable
public class BoditechLisSingleTrackTcpClient extends SimpleChannelInboundHandler<String> {
private final EventLoopGroup group = NettyUtils.buildEventLoopGroup("BoditechLisSingleTrackTcpClient", 1);
private final Bootstrap bootstrap = new Bootstrap();
private final Integer reconnectDelay = 5; // 重连延迟时间单位秒
private String serverIp;
private Integer port;
private volatile Channel channel;
private volatile boolean isShuttingDown = false;
public void start(String serverIp, Integer port) {
this.serverIp = serverIp;
this.port = port;
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new LineBasedFrameDecoder(1024))///r/n为结束符的解码器
.addLast(new StringEncoder())
.addLast(new StringDecoder())
.addLast(BoditechLisSingleTrackTcpClient.this);
}
});
doConnect();
}
public synchronized void shutdown() {
isShuttingDown = true;
if (channel != null) {
channel.close();
}
group.shutdownGracefully();
}
public synchronized void tx(String data) {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(data).addListener(future -> {
if (!future.isSuccess()) {
log.error("Failed to send data tail");
}
});
} else {
log.warn("Channel is not active, cannot send data: {}", data);
// 可以考虑在这里实现消息队列等连接恢复后重发
}
}
private void doConnect() {
if (isShuttingDown) {
return;
}
log.info("try connect to {}:{}", serverIp, port);
bootstrap.connect(serverIp, port).addListener((ChannelFuture future) -> {
if (future.isSuccess()) {
log.info("successfully connected to {}:{}", serverIp, port);
channel = future.channel();
} else {
log.error("failed,{}", future.cause().getMessage());
if (!isShuttingDown)
group.schedule(this::doConnect, 5, TimeUnit.SECONDS);
}
});
}
@Override
public synchronized void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn("Connection lost, trying to reconnect...");
if (channel != null && channel.equals(ctx.channel())) {
channel = null;
}
// 重连逻辑
if (!isShuttingDown)
group.schedule(this::doConnect, reconnectDelay, TimeUnit.SECONDS);
}
// 处理接收到的消息
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
log.info("Received message from server: {}({})", msg,msg.length());
}
public static void main(String[] args) {
BoditechLisSingleTrackTcpClient client = new BoditechLisSingleTrackTcpClient();
client.start("0.0.0.0", 777);
int count = 0;
while (true) {
try {
client.tx("Test message " + (++count));
Thread.sleep(1000); // 主线程保持运行状态
} catch (InterruptedException e) {
log.error("Main thread interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
break;
}
}
}
}

4
src/main/java/a8k/app/utils/ByteArray.java → src/main/java/a8k/app/utils/ByteArrayUtils.java

@ -3,8 +3,8 @@ package a8k.app.utils;
import org.springframework.lang.NonNull; import org.springframework.lang.NonNull;
public class ByteArray {
public ByteArray() {
public class ByteArrayUtils {
public ByteArrayUtils() {
} }
public static int readU8bit(byte[] code, int index) { public static int readU8bit(byte[] code, int index) {

17
src/main/java/a8k/app/utils/NettyUtils.java

@ -0,0 +1,17 @@
package a8k.app.utils;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import java.util.concurrent.Executor;
public class NettyUtils {
public static EventLoopGroup buildEventLoopGroup(String name, int nThread) {
DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory(name, Thread.MAX_PRIORITY);
Executor executor = new ThreadPerTaskExecutor(defaultThreadFactory);
return new NioEventLoopGroup(nThread, executor);
}
}
Loading…
Cancel
Save