Browse Source

增加了过来重复指令,小何那里加了重发

master
仁冬 5 months ago
parent
commit
bb721d4316
  1. 49
      src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java

49
src/main/java/com/qyft/ms/device/handler/DeviceMessageHandler.java

@ -12,11 +12,13 @@ import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@ -33,7 +35,9 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
@Setter
private volatile CountDownLatch latch;
private static final long DUPLICATE_CHECK_INTERVAL = 300;//ms
private static final long CLEANUP_INTERVAL = 2000; // 5 秒清理一次
private static final ConcurrentHashMap<Integer, Long> receivedCommands = new ConcurrentHashMap<>();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
@ -44,6 +48,20 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
JSONObject deviceResult = JSONUtil.parseObj(serverMsg);
JSONObject payload = deviceResult.getJSONObject("payload");
String tag = deviceResult.get("tag").toString();
Integer cmdId = payload.getInt("cmdId");
long currentTime = System.currentTimeMillis();
if (receivedCommands.containsKey(payload.getInt("cmdId"))) {
long lastReceivedTime = receivedCommands.get(payload.getInt("cmdId"));
if (currentTime - lastReceivedTime < DUPLICATE_CHECK_INTERVAL) {
return;
}
}
receivedCommands.put(cmdId, currentTime);
if ("event".equals(tag)) { //设备上报事件
String eventType = payload.getJSONObject("result").getStr("event_type");
if (latch != null) {
@ -62,7 +80,7 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
}
}
} else if ("ack".equals(tag)) {//设备指令反馈
Integer cmdId = payload.getInt("cmdId");
// Integer cmdId = payload.getInt("cmdId");
CommandFuture commandFuture = CurrentSendCmdMapInstance.getInstance().getCommand(cmdId);
if (commandFuture != null) {
commandFuture.setReceived(true);
@ -77,4 +95,31 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
buf.release();
}
}
@PostConstruct
public void cleanupOldCommandsPeriodically() {
while (true) {
try {
Thread.sleep(CLEANUP_INTERVAL);
cleanupOldCommands();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
private static void cleanupOldCommands() {
long currentTime = System.currentTimeMillis();
Iterator<Map.Entry<Integer, Long>> iterator = receivedCommands.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, Long> entry = iterator.next();
if (currentTime - entry.getValue() > DUPLICATE_CHECK_INTERVAL) {
iterator.remove();
}
}
System.out.println("清理完成,当前记录数量: " + receivedCommands.size());
}
}
Loading…
Cancel
Save