Browse Source

优化tcp代码,解决依赖问题

master
白凤吉 6 months ago
parent
commit
006423d3ba
  1. 29
      src/main/java/com/qyft/gd/device/client/TcpClient.java
  2. 28
      src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java
  3. 9
      src/main/java/com/qyft/gd/device/service/DeviceFeedbackService.java

29
src/main/java/com/qyft/gd/device/client/TcpClient.java

@ -12,16 +12,15 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Slf4j @Slf4j
@ -30,16 +29,16 @@ import java.util.concurrent.TimeUnit;
public class TcpClient { public class TcpClient {
private final TcpConfig tcpConfig; private final TcpConfig tcpConfig;
private DeviceMessageHandler deviceMessageHandler;
private final DeviceMessageHandler deviceMessageHandler;
private final EventLoopGroup group = new NioEventLoopGroup(); private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel; private Channel channel;
private Bootstrap bootstrap; private Bootstrap bootstrap;
private final Map<String, CompletableFuture<DeviceFeedback>> responseMap = new ConcurrentHashMap<>();
public void setDeviceMessageHandler(DeviceMessageHandler handler) {
this.deviceMessageHandler = handler;
@PostConstruct
public void init() {
if (tcpConfig.isEnable()) { if (tcpConfig.isEnable()) {
connect(); connect();
} }
@ -104,30 +103,22 @@ public class TcpClient {
request.setId(UUID.randomUUID().toString()); request.setId(UUID.randomUUID().toString());
} }
CompletableFuture<DeviceFeedback> future = new CompletableFuture<>(); CompletableFuture<DeviceFeedback> future = new CompletableFuture<>();
responseMap.put(request.getId(), future);
deviceMessageHandler.responseMap.put(request.getId(), future);
try { try {
if(this.send(JSONUtil.toJsonStr(request))){
log.info("发送TCP指令 {}", JSONUtil.toJsonStr(request));
if (this.send(JSONUtil.toJsonStr(request))) {
return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应 return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应
}else{
} else {
return null; return null;
} }
} catch (Exception e) { } catch (Exception e) {
log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e); log.error("发送TCP指令错误 {}", JSONUtil.toJsonStr(request), e);
} finally { } finally {
responseMap.remove(request.getId()); //确保完成后移除
deviceMessageHandler.responseMap.remove(request.getId()); //确保完成后移除
} }
return null; return null;
} }
public void handleTcpResponse(DeviceFeedback deviceFeedback) {
String requestId = deviceFeedback.getId();
CompletableFuture<DeviceFeedback> future = responseMap.remove(requestId);
if (future != null) {
future.complete(deviceFeedback);
} else {
log.error("未找到 requestId: {} 对应的等待请求", requestId);
}
}
private class TcpConnectionHandler extends ChannelInboundHandlerAdapter { private class TcpConnectionHandler extends ChannelInboundHandlerAdapter {

28
src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java

@ -1,7 +1,6 @@
package com.qyft.gd.device.handler; package com.qyft.gd.device.handler;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import com.qyft.gd.device.client.TcpClient;
import com.qyft.gd.device.common.constant.TcpMessageType; import com.qyft.gd.device.common.constant.TcpMessageType;
import com.qyft.gd.device.common.jsonrpc.JsonRpcResponse; import com.qyft.gd.device.common.jsonrpc.JsonRpcResponse;
import com.qyft.gd.device.model.bo.DeviceFeedback; import com.qyft.gd.device.model.bo.DeviceFeedback;
@ -12,11 +11,14 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil; import io.netty.util.CharsetUtil;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j @Slf4j
@Component @Component
@ChannelHandler.Sharable @ChannelHandler.Sharable
@ -24,14 +26,10 @@ import org.springframework.stereotype.Component;
public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
private final DeviceStateService deviceStateService; private final DeviceStateService deviceStateService;
private final TcpClient tcpClient;
@PostConstruct
public void init() {
tcpClient.setDeviceMessageHandler(this);
}
@Override
public final Map<String, CompletableFuture<DeviceFeedback>> responseMap = new ConcurrentHashMap<>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg; ByteBuf buf = (ByteBuf) msg;
String serverMsg = buf.toString(CharsetUtil.UTF_8); String serverMsg = buf.toString(CharsetUtil.UTF_8);
@ -45,10 +43,20 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
} else if (TcpMessageType.FEEDBACK.equals(jsonRpcResponse.getType())) {//设备指令反馈 } else if (TcpMessageType.FEEDBACK.equals(jsonRpcResponse.getType())) {//设备指令反馈
DeviceFeedback deviceFeedback = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceFeedback.class); DeviceFeedback deviceFeedback = JSONUtil.toBean(jsonRpcResponse.getData(), DeviceFeedback.class);
tcpClient.handleTcpResponse(deviceFeedback);
this.handleTcpResponse(deviceFeedback);
} }
} catch (Exception e) { } catch (Exception e) {
log.error("TCP数据处理错误: {}, error: {}", serverMsg, e.getMessage(), e);
log.error("TCP服务消息处理错误: {}, error: {}", serverMsg, e.getMessage(), e);
}
}
private void handleTcpResponse(DeviceFeedback deviceFeedback) {
String requestId = deviceFeedback.getId();
CompletableFuture<DeviceFeedback> future = responseMap.remove(requestId);
if (future != null) {
future.complete(deviceFeedback);
} else {
log.error("未找到 requestId: {} 对应的等待请求", requestId);
} }
} }
} }

9
src/main/java/com/qyft/gd/device/service/DeviceFeedbackService.java

@ -1,9 +0,0 @@
package com.qyft.gd.device.service;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
@Service
@RequiredArgsConstructor
public class DeviceFeedbackService {
}
Loading…
Cancel
Save