From 0a19e222e6bc0ad0263721268451d421e181ea69 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=99=BD=E5=87=A4=E5=90=89?= Date: Sun, 16 Mar 2025 12:14:16 +0800 Subject: [PATCH] =?UTF-8?q?=E9=A1=B9=E7=9B=AE=E5=88=9D=E5=A7=8B=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../qyft/ms/system/config/MybatisPlusConfig.java | 2 +- .../qyft/ms/system/core/MyMetaObjectHandler.java | 36 ------------ .../ms/system/core/client/DeviceTcpClient.java | 65 ++++++++++++++-------- .../system/core/handler/DeviceMessageHandler.java | 15 +++-- .../system/core/handler/MyMetaObjectHandler.java | 36 ++++++++++++ .../system/core/handler/TcpConnectionHandler.java | 4 +- 6 files changed, 88 insertions(+), 70 deletions(-) delete mode 100644 src/main/java/com/qyft/ms/system/core/MyMetaObjectHandler.java create mode 100644 src/main/java/com/qyft/ms/system/core/handler/MyMetaObjectHandler.java diff --git a/src/main/java/com/qyft/ms/system/config/MybatisPlusConfig.java b/src/main/java/com/qyft/ms/system/config/MybatisPlusConfig.java index 3db265f..f2af2eb 100644 --- a/src/main/java/com/qyft/ms/system/config/MybatisPlusConfig.java +++ b/src/main/java/com/qyft/ms/system/config/MybatisPlusConfig.java @@ -5,7 +5,7 @@ import com.baomidou.mybatisplus.core.config.GlobalConfig; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor; -import com.qyft.ms.system.core.MyMetaObjectHandler; +import com.qyft.ms.system.core.handler.MyMetaObjectHandler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.annotation.EnableTransactionManagement; diff --git a/src/main/java/com/qyft/ms/system/core/MyMetaObjectHandler.java b/src/main/java/com/qyft/ms/system/core/MyMetaObjectHandler.java deleted file mode 100644 index f5523ac..0000000 --- a/src/main/java/com/qyft/ms/system/core/MyMetaObjectHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -package com.qyft.ms.system.core; - -import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; -import org.apache.ibatis.reflection.MetaObject; -import org.springframework.stereotype.Component; - -import java.time.LocalDateTime; - -/** - * mybatis-plus 字段自动填充 - */ -@Component -public class MyMetaObjectHandler implements MetaObjectHandler { - - /** - * 新增填充创建时间 - * - * @param metaObject 元数据 - */ - @Override - public void insertFill(MetaObject metaObject) { - this.strictInsertFill(metaObject, "createTime", LocalDateTime::now, LocalDateTime.class); - this.strictUpdateFill(metaObject, "updateTime", LocalDateTime::now, LocalDateTime.class); - } - - /** - * 更新填充更新时间 - * - * @param metaObject 元数据 - */ - @Override - public void updateFill(MetaObject metaObject) { - this.strictUpdateFill(metaObject, "updateTime", LocalDateTime::now, LocalDateTime.class); - } - -} diff --git a/src/main/java/com/qyft/ms/system/core/client/DeviceTcpClient.java b/src/main/java/com/qyft/ms/system/core/client/DeviceTcpClient.java index 82b9793..d84990e 100644 --- a/src/main/java/com/qyft/ms/system/core/client/DeviceTcpClient.java +++ b/src/main/java/com/qyft/ms/system/core/client/DeviceTcpClient.java @@ -12,6 +12,7 @@ import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.json.JsonObjectDecoder; import io.netty.util.CharsetUtil; import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; @@ -19,6 +20,7 @@ import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @Component @@ -26,10 +28,11 @@ import java.util.concurrent.TimeUnit; public class DeviceTcpClient { private final DeviceTcpConfig tcpConfig; - private final EventLoopGroup group = new NioEventLoopGroup(); - private Channel channel; + private volatile Channel channel; private Bootstrap bootstrap; + // 使用原子变量确保同时只有一个连接尝试在进行 + private final AtomicBoolean connecting = new AtomicBoolean(false); @PostConstruct private void init() { @@ -38,8 +41,16 @@ public class DeviceTcpClient { } } - // 连接到TCP服务器 + /** + * 建立 TCP 连接,确保同时只有一个连接尝试 + */ public void connect() { + if (channel != null && channel.isActive()) { + return; + } + if (!connecting.compareAndSet(false, true)) { + return; + } try { if (bootstrap == null) { bootstrap = new Bootstrap(); @@ -49,64 +60,70 @@ public class DeviceTcpClient { .handler(new ChannelInitializer<>() { @Override protected void initChannel(Channel ch) { - // 添加JSON解码器 - ch.pipeline().addLast(new JsonObjectDecoder()); - // 添加设备消息处理器 - ch.pipeline().addLast(new DeviceMessageHandler()); - // 添加外部定义的TCP连接处理器,并传入TcpClient实例 - ch.pipeline().addLast(new TcpConnectionHandler(DeviceTcpClient.this)); + ch.pipeline().addLast(new JsonObjectDecoder()); // JSON 解码器 + ch.pipeline().addLast(new DeviceMessageHandler()); // 消息处理器 + ch.pipeline().addLast(new TcpConnectionHandler(DeviceTcpClient.this)); // 连接事件处理器 } }); } - - log.info("尝试连接到TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort()); + log.info("尝试连接到设备TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort()); ChannelFuture future = bootstrap.connect(new InetSocketAddress(tcpConfig.getHost(), tcpConfig.getPort())); - future.addListener((ChannelFutureListener) f -> { if (f.isSuccess()) { channel = f.channel(); - log.info("已链接到TCP服务"); + log.info("已链接到设备TCP服务"); } else { - log.error("无法连接到TCP服务. {}ms后重试...", tcpConfig.getReconnect()); - f.channel().eventLoop().schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS); + log.error("无法连接到设备TCP服务. {}ms后重试...", tcpConfig.getReconnect()); + group.schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS); // 使用 group 调度重连,避免 f.channel() 为空的问题 } + connecting.set(false);// 在连接尝试完成后重置 connecting 标志 }); - } catch (Exception e) { - log.error("尝试连接到TCP服务发生意外错误: {}", e.getMessage(), e); + log.error("尝试连接到设备TCP服务发生意外错误: {}", e.getMessage(), e); + connecting.set(false); + group.schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS); } } /** - * 定时检查TCP连接 + * 定时检测连接状态,若连接失效则尝试重新连接 */ @Scheduled(fixedRateString = "${tcp.reconnect}") private void checkConnection() { if (channel == null || !channel.isActive()) { - log.error("TCP服务链接丢失"); + log.error("设备TCP服务链接丢失,重新连接中..."); connect(); } } /** - * 将obj转换成json后发送 + * 将对象转换成 JSON 后发送 */ public boolean sendToJSON(Object request) { String msg = JSONUtil.toJsonStr(request); - return this.send(msg); + return send(msg); } /** - * 发送字符串请求到TCP服务器 + * 发送字符串到 TCP 服务器 */ public boolean send(String request) { if (channel != null && channel.isActive()) { - log.info("发送TCP指令:{}", request); + log.info("发送设备TCP指令:{}", request); channel.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8)); return true; } else { - log.error("TCP服务未连接,无法发送请求: {}", request); + log.error("设备TCP服务未连接,无法发送请求: {}", request); return false; } } + + /** + * 应用退出时优雅关闭 EventLoopGroup + */ + @PreDestroy + private void shutdown() { + log.info("正在关闭 TCP 客户端..."); + group.shutdownGracefully(); + } } diff --git a/src/main/java/com/qyft/ms/system/core/handler/DeviceMessageHandler.java b/src/main/java/com/qyft/ms/system/core/handler/DeviceMessageHandler.java index 62a3de4..32e5d1f 100644 --- a/src/main/java/com/qyft/ms/system/core/handler/DeviceMessageHandler.java +++ b/src/main/java/com/qyft/ms/system/core/handler/DeviceMessageHandler.java @@ -13,17 +13,20 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { - ByteBuf buf = (ByteBuf) msg; - String serverMsg = buf.toString(CharsetUtil.UTF_8); - log.info("serverMsg:{}", serverMsg); + if (!(msg instanceof ByteBuf buf)) { + log.warn("收到未知类型的消息:{}", msg.getClass()); + return; + } try { + String serverMsg = buf.toString(CharsetUtil.UTF_8); + log.info("serverMsg: {}", serverMsg); JSONObject deviceResult = JSONUtil.parseObj(serverMsg); JSONObject payload = deviceResult.getJSONObject("payload"); - String tag = deviceResult.get("tag").toString(); + String tag = deviceResult.getStr("tag"); Integer cmdId = payload.getInt("cmdId"); - // 根据 tag 和 cmdId 进行处理 + } catch (Exception e) { - log.error("TCP服务消息处理错误: {}, error: {}", serverMsg, e.getMessage(), e); + log.error("TCP服务消息处理错误: {}, error: {}", buf.toString(CharsetUtil.UTF_8), e.getMessage(), e); } finally { buf.release(); } diff --git a/src/main/java/com/qyft/ms/system/core/handler/MyMetaObjectHandler.java b/src/main/java/com/qyft/ms/system/core/handler/MyMetaObjectHandler.java new file mode 100644 index 0000000..f5c204a --- /dev/null +++ b/src/main/java/com/qyft/ms/system/core/handler/MyMetaObjectHandler.java @@ -0,0 +1,36 @@ +package com.qyft.ms.system.core.handler; + +import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler; +import org.apache.ibatis.reflection.MetaObject; +import org.springframework.stereotype.Component; + +import java.time.LocalDateTime; + +/** + * mybatis-plus 字段自动填充 + */ +@Component +public class MyMetaObjectHandler implements MetaObjectHandler { + + /** + * 新增填充创建时间 + * + * @param metaObject 元数据 + */ + @Override + public void insertFill(MetaObject metaObject) { + this.strictInsertFill(metaObject, "createTime", LocalDateTime::now, LocalDateTime.class); + this.strictUpdateFill(metaObject, "updateTime", LocalDateTime::now, LocalDateTime.class); + } + + /** + * 更新填充更新时间 + * + * @param metaObject 元数据 + */ + @Override + public void updateFill(MetaObject metaObject) { + this.strictUpdateFill(metaObject, "updateTime", LocalDateTime::now, LocalDateTime.class); + } + +} diff --git a/src/main/java/com/qyft/ms/system/core/handler/TcpConnectionHandler.java b/src/main/java/com/qyft/ms/system/core/handler/TcpConnectionHandler.java index 4145930..74d4932 100644 --- a/src/main/java/com/qyft/ms/system/core/handler/TcpConnectionHandler.java +++ b/src/main/java/com/qyft/ms/system/core/handler/TcpConnectionHandler.java @@ -10,7 +10,6 @@ public class TcpConnectionHandler extends ChannelInboundHandlerAdapter { private final DeviceTcpClient tcpClient; - // 通过构造方法注入 TcpClient 引用 public TcpConnectionHandler(DeviceTcpClient tcpClient) { this.tcpClient = tcpClient; } @@ -21,8 +20,7 @@ public class TcpConnectionHandler extends ChannelInboundHandlerAdapter { if (ctx.channel() != null && ctx.channel().isOpen()) { ctx.channel().close(); } - // 调用 TcpClient 的连接方法 - tcpClient.connect(); + tcpClient.connect();// 触发重连 super.channelInactive(ctx); }