From bb721d431639ba551e2730ccc5b271b67c4bc97b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=BB=81=E5=86=AC?= <274403850@qq.com> Date: Fri, 14 Mar 2025 08:56:36 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E4=BA=86=E8=BF=87=E6=9D=A5?= =?UTF-8?q?=E9=87=8D=E5=A4=8D=E6=8C=87=E4=BB=A4=EF=BC=8C=E5=B0=8F=E4=BD=95?= =?UTF-8?q?=E9=82=A3=E9=87=8C=E5=8A=A0=E4=BA=86=E9=87=8D=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ms/device/handler/DeviceMessageHandler.java | 49 +++++++++++++++++++++- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java b/src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java index 7075006..a1c5239 100644 --- a/src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java +++ b/src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java @@ -12,11 +12,13 @@ import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; +import jakarta.annotation.PostConstruct; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.Iterator; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -33,7 +35,9 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { @Setter private volatile CountDownLatch latch; - + private static final long DUPLICATE_CHECK_INTERVAL = 300;//ms + private static final long CLEANUP_INTERVAL = 2000; // 5 秒清理一次 + private static final ConcurrentHashMap receivedCommands = new ConcurrentHashMap<>(); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { @@ -44,6 +48,20 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { JSONObject deviceResult = JSONUtil.parseObj(serverMsg); JSONObject payload = deviceResult.getJSONObject("payload"); String tag = deviceResult.get("tag").toString(); + Integer cmdId = payload.getInt("cmdId"); + + + long currentTime = System.currentTimeMillis(); + if (receivedCommands.containsKey(payload.getInt("cmdId"))) { + long lastReceivedTime = receivedCommands.get(payload.getInt("cmdId")); + if (currentTime - lastReceivedTime < DUPLICATE_CHECK_INTERVAL) { + return; + } + } + + receivedCommands.put(cmdId, currentTime); + + if ("event".equals(tag)) { //设备上报事件 String eventType = payload.getJSONObject("result").getStr("event_type"); if (latch != null) { @@ -62,7 +80,7 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { } } } else if ("ack".equals(tag)) {//设备指令反馈 - Integer cmdId = payload.getInt("cmdId"); +// Integer cmdId = payload.getInt("cmdId"); CommandFuture commandFuture = CurrentSendCmdMapInstance.getInstance().getCommand(cmdId); if (commandFuture != null) { commandFuture.setReceived(true); @@ -77,4 +95,31 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { buf.release(); } } + + @PostConstruct + public void cleanupOldCommandsPeriodically() { + while (true) { + try { + Thread.sleep(CLEANUP_INTERVAL); + cleanupOldCommands(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + } + + + private static void cleanupOldCommands() { + long currentTime = System.currentTimeMillis(); + Iterator> iterator = receivedCommands.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (currentTime - entry.getValue() > DUPLICATE_CHECK_INTERVAL) { + iterator.remove(); + } + } + System.out.println("清理完成,当前记录数量: " + receivedCommands.size()); + } + } \ No newline at end of file