diff --git a/src/main/java/com/qyft/gd/device/client/TcpClient.java b/src/main/java/com/qyft/gd/device/client/TcpClient.java index 2a48a01..8043318 100644 --- a/src/main/java/com/qyft/gd/device/client/TcpClient.java +++ b/src/main/java/com/qyft/gd/device/client/TcpClient.java @@ -12,16 +12,15 @@ import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.util.CharsetUtil; +import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; -import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @Slf4j @@ -30,16 +29,16 @@ import java.util.concurrent.TimeUnit; public class TcpClient { private final TcpConfig tcpConfig; - private DeviceMessageHandler deviceMessageHandler; + + private final DeviceMessageHandler deviceMessageHandler; private final EventLoopGroup group = new NioEventLoopGroup(); private Channel channel; private Bootstrap bootstrap; - private final Map> responseMap = new ConcurrentHashMap<>(); - public void setDeviceMessageHandler(DeviceMessageHandler handler) { - this.deviceMessageHandler = handler; + @PostConstruct + public void init() { if (tcpConfig.isEnable()) { connect(); } @@ -104,30 +103,22 @@ public class TcpClient { request.setId(UUID.randomUUID().toString()); } CompletableFuture future = new CompletableFuture<>(); - responseMap.put(request.getId(), future); + deviceMessageHandler.responseMap.put(request.getId(), future); try { - if(this.send(JSONUtil.toJsonStr(request))){ + log.info("发送TCP指令 {}", JSONUtil.toJsonStr(request)); + if (this.send(JSONUtil.toJsonStr(request))) { return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应 - }else{ + } else { return null; } } catch (Exception e) { log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e); } finally { - responseMap.remove(request.getId()); //确保完成后移除 + deviceMessageHandler.responseMap.remove(request.getId()); //确保完成后移除 } return null; } - public void handleTcpResponse(DeviceFeedback deviceFeedback) { - String requestId = deviceFeedback.getId(); - CompletableFuture future = responseMap.remove(requestId); - if (future != null) { - future.complete(deviceFeedback); - } else { - log.error("未找到 requestId: {} 对应的等待请求", requestId); - } - } private class TcpConnectionHandler extends ChannelInboundHandlerAdapter { diff --git a/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java b/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java index 3944dca..a5b82b1 100644 --- a/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java +++ b/src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java @@ -1,7 +1,6 @@ package com.qyft.gd.device.handler; import cn.hutool.json.JSONUtil; -import com.qyft.gd.device.client.TcpClient; import com.qyft.gd.device.common.constant.TcpMessageType; import com.qyft.gd.device.common.jsonrpc.JsonRpcResponse; import com.qyft.gd.device.model.bo.DeviceFeedback; @@ -12,11 +11,14 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; -import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + @Slf4j @Component @ChannelHandler.Sharable @@ -24,14 +26,10 @@ import org.springframework.stereotype.Component; public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { private final DeviceStateService deviceStateService; - private final TcpClient tcpClient; - @PostConstruct - public void init() { - tcpClient.setDeviceMessageHandler(this); - } - @Override + public final Map> responseMap = new ConcurrentHashMap<>(); + @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf buf = (ByteBuf) msg; String serverMsg = buf.toString(CharsetUtil.UTF_8); @@ -45,10 +43,20 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { } else if (TcpMessageType.FEEDBACK.equals(jsonRpcResponse.getType())) {//设备指令反馈 DeviceFeedback deviceFeedback = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceFeedback.class); - tcpClient.handleTcpResponse(deviceFeedback); + this.handleTcpResponse(deviceFeedback); } } catch (Exception e) { - log.error("TCP数据处理错误: {}, error: {}", serverMsg, e.getMessage(), e); + log.error("TCP服务消息处理错误: {}, error: {}", serverMsg, e.getMessage(), e); + } + } + + private void handleTcpResponse(DeviceFeedback deviceFeedback) { + String requestId = deviceFeedback.getId(); + CompletableFuture future = responseMap.remove(requestId); + if (future != null) { + future.complete(deviceFeedback); + } else { + log.error("未找到 requestId: {} 对应的等待请求", requestId); } } } \ No newline at end of file diff --git a/src/main/java/com/qyft/gd/device/service/DeviceFeedbackService.java b/src/main/java/com/qyft/gd/device/service/DeviceFeedbackService.java deleted file mode 100644 index 4668bd0..0000000 --- a/src/main/java/com/qyft/gd/device/service/DeviceFeedbackService.java +++ /dev/null @@ -1,9 +0,0 @@ -package com.qyft.gd.device.service; - -import lombok.RequiredArgsConstructor; -import org.springframework.stereotype.Service; - -@Service -@RequiredArgsConstructor -public class DeviceFeedbackService { -}