You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

112 lines
4.5 KiB

package com.qyft.ms.device.handler;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.qyft.ms.app.common.command.CommandFuture;
import com.qyft.ms.app.common.command.CurrentSendCmdMapInstance;
import com.qyft.ms.app.service.WebSocketService;
import com.qyft.ms.device.model.bo.DeviceFeedback;
import com.qyft.ms.device.service.DeviceStatusService;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Netty inbound handler for JSON messages received from the device over TCP.
 *
 * <p>Each inbound {@link ByteBuf} is decoded as UTF-8, parsed as a JSON object and
 * dispatched on its {@code "tag"} field. Only {@code "ack"} messages are processed
 * at the moment: the ack's {@code payload.cmdId} is used to look up the pending
 * {@link CommandFuture}, which is then handed the payload and resumed.
 * {@code "event"} and {@code "status"} messages are recognised by the protocol but
 * are currently ignored.
 *
 * <p>The handler is annotated {@link ChannelHandler.Sharable}; the only state it
 * owns is the concurrent {@link #responseMap}.
 */
@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {

    /**
     * Pending request futures keyed by request id. Entries are removed and completed by
     * {@link #handleTcpResponse(DeviceFeedback)}; who registers them is not visible in
     * this class (presumably the sender of the request — verify against callers).
     */
    public final Map<String, CompletableFuture<DeviceFeedback>> responseMap = new ConcurrentHashMap<>();

    // Injected through the Lombok-generated constructor. Neither service is used by the
    // active code paths of this class; they are kept so the constructor signature is stable.
    private final DeviceStatusService deviceStatusService;
    private final WebSocketService webSocketService;

    /**
     * Decodes one inbound message and dispatches it by its {@code "tag"}.
     *
     * <p>Messages that are not a {@link ByteBuf} are forwarded to the next handler in the
     * pipeline untouched. A {@link ByteBuf} is always released before this method returns,
     * including when decoding or parsing fails. Processing errors are logged and never
     * propagated into the pipeline.
     *
     * @param ctx the channel handler context
     * @param msg the inbound message, expected to be a {@link ByteBuf} holding UTF-8 JSON
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (!(msg instanceof ByteBuf)) {
            // Not a message this handler can consume: pass it on so that the next
            // handler (or the pipeline tail) takes over responsibility for it.
            ctx.fireChannelRead(msg);
            return;
        }

        ByteBuf buf = (ByteBuf) msg;
        String serverMsg = null;
        try {
            // Decode inside the try block so the buffer is released even if decoding fails.
            serverMsg = buf.toString(CharsetUtil.UTF_8);
            JSONObject deviceResult = JSONUtil.parseObj(serverMsg);

            Object tagValue = deviceResult.get("tag");
            if (tagValue == null) {
                log.warn("Ignoring device message without a tag: {}", serverMsg);
                return;
            }

            String tag = tagValue.toString();
            if ("ack".equals(tag)) {
                // Feedback for a command that was previously sent to the device.
                handleAck(deviceResult.getJSONObject("payload"), serverMsg);
            }
            // "event" (device-reported event) and "status" (device-reported status)
            // messages are not processed yet; any other tag is ignored as well.
        } catch (Exception e) {
            log.error("TCP服务消息处理错误: {}, error: {}", serverMsg, e.getMessage(), e);
        } finally {
            buf.release();
        }
    }

    /**
     * Delivers an {@code "ack"} payload to the command that is waiting for it.
     *
     * <p>The pending command is looked up by {@code payload.cmdId}. When found, it is marked
     * as received, given the whole payload as its callback result, and resumed via
     * {@code commandContinue()}, in that order. Acks that cannot be matched are logged
     * and dropped.
     *
     * @param payload    the {@code "payload"} object of the ack message, may be {@code null}
     * @param rawMessage the raw message text, used only for logging
     */
    private void handleAck(JSONObject payload, String rawMessage) {
        if (payload == null) {
            log.warn("Ignoring ack without a payload: {}", rawMessage);
            return;
        }

        Integer cmdId = payload.getInt("cmdId");
        if (cmdId == null) {
            log.warn("Ignoring ack without a cmdId: {}", rawMessage);
            return;
        }

        CommandFuture commandFuture = CurrentSendCmdMapInstance.getInstance().getCommand(cmdId);
        if (commandFuture == null) {
            log.warn("No pending command found for ack cmdId: {}", cmdId);
            return;
        }

        commandFuture.setReceived(true);
        commandFuture.setCallbackResult(payload);
        commandFuture.commandContinue();
    }

    /**
     * Completes the future registered in {@link #responseMap} under the feedback's id.
     *
     * <p>The entry is removed from the map before it is completed. If no future is
     * registered for the id, an error is logged. This method is not invoked by the
     * active code paths of this class.
     *
     * @param deviceFeedback the feedback whose id identifies the waiting request
     */
    private void handleTcpResponse(DeviceFeedback deviceFeedback) {
        String requestId = deviceFeedback.getId();
        CompletableFuture<DeviceFeedback> future = responseMap.remove(requestId);
        if (future != null) {
            future.complete(deviceFeedback);
        } else {
            log.error("未找到 requestId: {} 对应的等待请求", requestId);
        }
    }
}