package com.qyft.ms.device.client; import cn.hutool.json.JSONUtil; import com.qyft.ms.device.common.jsonrpc.JsonRpcRequest; import com.qyft.ms.device.config.TcpConfig; import com.qyft.ms.device.handler.DeviceMessageHandler; import com.qyft.ms.device.model.bo.DeviceFeedback; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.json.JsonObjectDecoder; import io.netty.util.CharsetUtil; import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @Slf4j @Component @RequiredArgsConstructor public class TcpClient { private final TcpConfig tcpConfig; private final DeviceMessageHandler deviceMessageHandler; private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; private Bootstrap bootstrap; // 初始化方法,在Spring容器启动后调用 @PostConstruct public void init() { if (tcpConfig.isEnable()) { connect(); } } // 连接到TCP服务器的方法 public void connect() { try { // 如果Bootstrap对象为空,则初始化它 if (bootstrap == null) { bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, tcpConfig.getTimeout()) .handler(new ChannelInitializer<>() { @Override protected void initChannel(Channel ch) { // 添加JSON对象解码器到ChannelPipeline ch.pipeline().addLast(new JsonObjectDecoder()); // 添加设备消息处理器到ChannelPipeline ch.pipeline().addLast(deviceMessageHandler); // 添加TCP连接处理器到ChannelPipeline ch.pipeline().addLast(new TcpConnectionHandler()); } }); } // 记录尝试连接到TCP服务器的日志 log.info("尝试连接到TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort()); ChannelFuture future = bootstrap.connect(new InetSocketAddress(tcpConfig.getHost(), tcpConfig.getPort())); // 添加连接监听器 future.addListener((ChannelFutureListener) f -> { if (f.isSuccess()) { // 连接成功,记录日志并设置channel channel = f.channel(); log.info("已链接到TCP服务"); } else { // 连接失败,记录日志并安排重试 log.error("无法连接到TCP服务. {}ms后重试...", tcpConfig.getReconnect()); f.channel().eventLoop().schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS); } }); } catch (Exception e) { // 捕获并记录连接过程中发生的异常 log.error("尝试连接到TCP服务发生意外错误: {}", e.getMessage(), e); } } // 定时检查TCP连接的方法 @Scheduled(fixedRateString = "${tcp.reconnect}") public void checkConnection() { if (channel == null || !channel.isActive()) { // 如果连接丢失,记录日志并尝试重新连接 log.error("TCP服务链接丢失"); connect(); } } /** * 将obj转换成json后发送 */ public boolean sendToJSON(Object request) { String msg = JSONUtil.toJsonStr(request); return this.send(msg); } // 发送字符串请求到TCP服务器的方法 public boolean send(String request) { if (channel != null && channel.isActive()) { // 如果连接有效,发送请求并返回true channel.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8)); return true; } else { // 如果连接无效,记录日志并返回false log.error("TCP服务未连接,无法发送请求: {}", request); return false; } } // 发送JSON-RPC命令到TCP服务器的方法,仅包含方法名 public DeviceFeedback sendCommand(String method) { JsonRpcRequest request = new JsonRpcRequest(); request.setMethod(method); return this.sendCommand(request); } // 发送JSON-RPC命令到TCP服务器的方法,包含方法名和参数 public DeviceFeedback sendCommand(String method, Map params) { JsonRpcRequest request = new JsonRpcRequest(); request.setMethod(method); request.setParams(params); return this.sendCommand(request); } // 发送JSON-RPC命令到TCP服务器的方法,包含完整的JsonRpcRequest对象 public DeviceFeedback sendCommand(JsonRpcRequest request) { if (request.getId() == null) { // 如果请求ID为空,则生成一个新的UUID作为ID request.setId(UUID.randomUUID().toString()); } CompletableFuture future = new CompletableFuture<>(); deviceMessageHandler.responseMap.put(request.getId(), future); try { if (request.getParams() == null) { // 如果请求参数为空,则初始化一个空的HashMap request.setParams(new HashMap<>()); } // 将请求对象转换为JSON字符串 String requestJsonStr = JSONUtil.toJsonStr(request); log.info("发送TCP指令(同步) {}", requestJsonStr); // 发送请求到服务器,并等待响应 if (this.send(requestJsonStr)) { return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应 } else { return null; } } catch (Exception e) { // 捕获并记录发送过程中发生的异常 log.error("发送TCP指令错误(同步) {}", JSONUtil.toJsonStr(request), e); } finally { // 确保请求完成后从responseMap中移除对应的Future deviceMessageHandler.responseMap.remove(request.getId()); } return null; } // 内部类,处理TCP连接事件 private class TcpConnectionHandler extends ChannelInboundHandlerAdapter { // 处理连接断开事件 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // 记录连接断开的日志 log.error("TCP连接丢失,准备重新连接..."); if (channel != null) { // 关闭当前channel channel.close(); } // 尝试重新连接 connect(); super.channelInactive(ctx); } // 处理异常事件 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // 记录异常日志 log.error("TCP连接发生异常: {}", cause.getMessage(), cause); // 关闭当前channel ctx.close(); } } }