diff --git a/src/main/java/com/qyft/gd/device/client/TcpClient.java b/src/main/java/com/qyft/gd/device/client/TcpClient.java index e343628..2a48a01 100644 --- a/src/main/java/com/qyft/gd/device/client/TcpClient.java +++ b/src/main/java/com/qyft/gd/device/client/TcpClient.java @@ -56,6 +56,7 @@ public class TcpClient { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(deviceMessageHandler); + ch.pipeline().addLast(new TcpConnectionHandler()); } }); } @@ -86,12 +87,14 @@ public class TcpClient { } } - public void send(String request) { + public boolean send(String request) { if (channel != null && channel.isActive()) { ByteBuf byteBuf = Unpooled.copiedBuffer(request, CharsetUtil.UTF_8); channel.writeAndFlush(byteBuf); + return true; } else { log.error("TCP服务未连接,无法发送请求: {}", request); + return false; } } @@ -103,8 +106,11 @@ public class TcpClient { CompletableFuture future = new CompletableFuture<>(); responseMap.put(request.getId(), future); try { - this.send(JSONUtil.toJsonStr(request)); - return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应 + if(this.send(JSONUtil.toJsonStr(request))){ + return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应 + }else{ + return null; + } } catch (Exception e) { log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e); } finally { @@ -122,4 +128,24 @@ public class TcpClient { log.error("未找到 requestId: {} 对应的等待请求", requestId); } } + + private class TcpConnectionHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // 连接断开时的处理逻辑 + log.error("TCP连接丢失,准备重新连接..."); + if (channel != null) { + channel.close(); + } + connect(); + super.channelInactive(ctx); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + log.error("TCP连接发生异常: {}", cause.getMessage(), cause); + ctx.close(); + } + } } diff --git a/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java b/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java index ea89ea5..3944dca 100644 --- a/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java +++ b/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java @@ -8,7 +8,6 @@ import com.qyft.gd.device.model.bo.DeviceFeedback; import com.qyft.gd.device.model.bo.DeviceStatus; import com.qyft.gd.device.service.DeviceStateService; import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -26,19 +25,13 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { private final DeviceStateService deviceStateService; private final TcpClient tcpClient; - @PostConstruct public void init() { tcpClient.setDeviceMessageHandler(this); } @Override - public void channelActive(ChannelHandlerContext ctx) { - System.out.println("client ctx =" + ctx); - ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Server!", CharsetUtil.UTF_8)); - } - @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; String serverMsg = buf.toString(CharsetUtil.UTF_8); diff --git a/src/main/java/com/qyft/gd/device/service/DeviceService.java b/src/main/java/com/qyft/gd/device/service/DeviceService.java index 1e2f013..606d163 100644 --- a/src/main/java/com/qyft/gd/device/service/DeviceService.java +++ b/src/main/java/com/qyft/gd/device/service/DeviceService.java @@ -27,8 +27,6 @@ public class DeviceService { return true; } - // 移动导轨机械臂的关节 - /** * 移动导轨机械臂的关节 * @@ -72,7 +70,7 @@ public class DeviceService { list.add("3"); request.setParams(list); DeviceFeedback deviceFeedback = tcpClient.sendCommand(request); - if (deviceFeedback.getError() != null) { + if (deviceFeedback == null || deviceFeedback.getError() != null) { log.info("TCP openDoor指令执行错误 {}", JSONUtil.toJsonStr(deviceFeedback)); return false; }