You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

160 lines
5.7 KiB

package com.qyft.ms.device.client;
import cn.hutool.json.JSONUtil;
import com.qyft.ms.device.common.jsonrpc.JsonRpcRequest;
import com.qyft.ms.device.config.TcpConfig;
import com.qyft.ms.device.handler.DeviceMessageHandler;
import com.qyft.ms.device.model.bo.DeviceFeedback;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.util.CharsetUtil;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@RequiredArgsConstructor
public class TcpClient {
private final TcpConfig tcpConfig;
private final DeviceMessageHandler deviceMessageHandler;
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
private Bootstrap bootstrap;
@PostConstruct
public void init() {
if (tcpConfig.isEnable()) {
connect();
}
}
public void connect() {
try {
if (bootstrap == null) {
bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, tcpConfig.getTimeout())
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
ch.pipeline().addLast(new JsonObjectDecoder());
ch.pipeline().addLast(deviceMessageHandler);
ch.pipeline().addLast(new TcpConnectionHandler());
}
});
}
log.info("尝试连接到TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort());
ChannelFuture future = bootstrap.connect(new InetSocketAddress(tcpConfig.getHost(), tcpConfig.getPort()));
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
channel = f.channel();
log.info("已链接到TCP服务");
} else {
log.error("无法连接到TCP服务. {}ms后重试...", tcpConfig.getReconnect());
f.channel().eventLoop().schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS);
}
});
} catch (Exception e) {
log.error("尝试连接到TCP服务发生意外错误: {}", e.getMessage(), e);
}
}
@Scheduled(fixedRateString = "${tcp.reconnect}")
public void checkConnection() {
if (channel == null || !channel.isActive()) {
log.error("TCP服务链接丢失");
connect();
}
}
public boolean send(String request) {
if (channel != null && channel.isActive()) {
channel.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8));
return true;
} else {
log.error("TCP服务未连接,无法发送请求: {}", request);
return false;
}
}
public DeviceFeedback sendCommand(String method) {
JsonRpcRequest request = new JsonRpcRequest();
request.setMethod(method);
return this.sendCommand(request);
}
public DeviceFeedback sendCommand(String method, Map<String, Object> params) {
JsonRpcRequest request = new JsonRpcRequest();
request.setMethod(method);
request.setParams(params);
return this.sendCommand(request);
}
public DeviceFeedback sendCommand(JsonRpcRequest request) {
if (request.getId() == null) {
request.setId(UUID.randomUUID().toString());
}
CompletableFuture<DeviceFeedback> future = new CompletableFuture<>();
deviceMessageHandler.responseMap.put(request.getId(), future);
try {
if (request.getParams() == null) {
request.setParams(new HashMap<>());
}
request.getParams().put("class", "test");
String requestJsonStr = JSONUtil.toJsonStr(request);
log.info("发送TCP指令(同步) {}", requestJsonStr);
if (this.send(requestJsonStr)) {
return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应
} else {
return null;
}
} catch (Exception e) {
log.error("发送TCP指令错误(同步) {}", JSONUtil.toJsonStr(request), e);
} finally {
deviceMessageHandler.responseMap.remove(request.getId()); //确保完成后移除
}
return null;
}
private class TcpConnectionHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开时的处理逻辑
log.error("TCP连接丢失,准备重新连接...");
if (channel != null) {
channel.close();
}
connect();
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("TCP连接发生异常: {}", cause.getMessage(), cause);
ctx.close();
}
}
}