Browse Source

TCP断线重连

master
白凤吉 6 months ago
parent
commit
ecbac81d13
  1. 32
      src/main/java/com/qyft/gd/device/client/TcpClient.java
  2. 7
      src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java
  3. 4
      src/main/java/com/qyft/gd/device/service/DeviceService.java

32
src/main/java/com/qyft/gd/device/client/TcpClient.java

@ -56,6 +56,7 @@ public class TcpClient {
@Override @Override
protected void initChannel(Channel ch) { protected void initChannel(Channel ch) {
ch.pipeline().addLast(deviceMessageHandler); ch.pipeline().addLast(deviceMessageHandler);
ch.pipeline().addLast(new TcpConnectionHandler());
} }
}); });
} }
@ -86,12 +87,14 @@ public class TcpClient {
} }
} }
public void send(String request) {
public boolean send(String request) {
if (channel != null && channel.isActive()) { if (channel != null && channel.isActive()) {
ByteBuf byteBuf = Unpooled.copiedBuffer(request, CharsetUtil.UTF_8); ByteBuf byteBuf = Unpooled.copiedBuffer(request, CharsetUtil.UTF_8);
channel.writeAndFlush(byteBuf); channel.writeAndFlush(byteBuf);
return true;
} else { } else {
log.error("TCP服务未连接,无法发送请求: {}", request); log.error("TCP服务未连接,无法发送请求: {}", request);
return false;
} }
} }
@ -103,8 +106,11 @@ public class TcpClient {
CompletableFuture<DeviceFeedback> future = new CompletableFuture<>(); CompletableFuture<DeviceFeedback> future = new CompletableFuture<>();
responseMap.put(request.getId(), future); responseMap.put(request.getId(), future);
try { try {
this.send(JSONUtil.toJsonStr(request));
return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应
if(this.send(JSONUtil.toJsonStr(request))){
return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应
}else{
return null;
}
} catch (Exception e) { } catch (Exception e) {
log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e); log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e);
} finally { } finally {
@ -122,4 +128,24 @@ public class TcpClient {
log.error("未找到 requestId: {} 对应的等待请求", requestId); log.error("未找到 requestId: {} 对应的等待请求", requestId);
} }
} }
private class TcpConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开时的处理逻辑
log.error("TCP连接丢失,准备重新连接...");
if (channel != null) {
channel.close();
}
connect();
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("TCP连接发生异常: {}", cause.getMessage(), cause);
ctx.close();
}
}
} }

7
src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java

@ -8,7 +8,6 @@ import com.qyft.gd.device.model.bo.DeviceFeedback;
import com.qyft.gd.device.model.bo.DeviceStatus; import com.qyft.gd.device.model.bo.DeviceStatus;
import com.qyft.gd.device.service.DeviceStateService; import com.qyft.gd.device.service.DeviceStateService;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
@ -26,19 +25,13 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
private final DeviceStateService deviceStateService; private final DeviceStateService deviceStateService;
private final TcpClient tcpClient; private final TcpClient tcpClient;
@PostConstruct @PostConstruct
public void init() { public void init() {
tcpClient.setDeviceMessageHandler(this); tcpClient.setDeviceMessageHandler(this);
} }
@Override @Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("client ctx =" + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server!", CharsetUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
String serverMsg = buf.toString(CharsetUtil.UTF_8); String serverMsg = buf.toString(CharsetUtil.UTF_8);

4
src/main/java/com/qyft/gd/device/service/DeviceService.java

@ -27,8 +27,6 @@ public class DeviceService {
return true; return true;
} }
// 移动导轨机械臂的关节
/** /**
* 移动导轨机械臂的关节 * 移动导轨机械臂的关节
* *
@ -72,7 +70,7 @@ public class DeviceService {
list.add("3"); list.add("3");
request.setParams(list); request.setParams(list);
DeviceFeedback deviceFeedback = tcpClient.sendCommand(request); DeviceFeedback deviceFeedback = tcpClient.sendCommand(request);
if (deviceFeedback.getError() != null) {
if (deviceFeedback == null || deviceFeedback.getError() != null) {
log.info("TCP openDoor指令执行错误 {}", JSONUtil.toJsonStr(deviceFeedback)); log.info("TCP openDoor指令执行错误 {}", JSONUtil.toJsonStr(deviceFeedback));
return false; return false;
} }

Loading…
Cancel
Save