Browse Source

项目初始化

tags/1.0
白凤吉 5 months ago
parent
commit
0a19e222e6
  1. 2
      src/main/java/com/qyft/ms/system/config/MybatisPlusConfig.java
  2. 65
      src/main/java/com/qyft/ms/system/core/client/DeviceTcpClient.java
  3. 15
      src/main/java/com/qyft/ms/system/core/handler/DeviceMessageHandler.java
  4. 2
      src/main/java/com/qyft/ms/system/core/handler/MyMetaObjectHandler.java
  5. 4
      src/main/java/com/qyft/ms/system/core/handler/TcpConnectionHandler.java

2
src/main/java/com/qyft/ms/system/config/MybatisPlusConfig.java

@ -5,7 +5,7 @@ import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.OptimisticLockerInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import com.qyft.ms.system.core.MyMetaObjectHandler;
import com.qyft.ms.system.core.handler.MyMetaObjectHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.annotation.EnableTransactionManagement;

65
src/main/java/com/qyft/ms/system/core/client/DeviceTcpClient.java

@ -12,6 +12,7 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.util.CharsetUtil;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
@ -19,6 +20,7 @@ import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
@ -26,10 +28,11 @@ import java.util.concurrent.TimeUnit;
public class DeviceTcpClient {
private final DeviceTcpConfig tcpConfig;
private final EventLoopGroup group = new NioEventLoopGroup();
private Channel channel;
private volatile Channel channel;
private Bootstrap bootstrap;
// 使用原子变量确保同时只有一个连接尝试在进行
private final AtomicBoolean connecting = new AtomicBoolean(false);
@PostConstruct
private void init() {
@ -38,8 +41,16 @@ public class DeviceTcpClient {
}
}
// 连接到TCP服务器
/**
* 建立 TCP 连接确保同时只有一个连接尝试
*/
public void connect() {
if (channel != null && channel.isActive()) {
return;
}
if (!connecting.compareAndSet(false, true)) {
return;
}
try {
if (bootstrap == null) {
bootstrap = new Bootstrap();
@ -49,64 +60,70 @@ public class DeviceTcpClient {
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
// 添加JSON解码器
ch.pipeline().addLast(new JsonObjectDecoder());
// 添加设备消息处理器
ch.pipeline().addLast(new DeviceMessageHandler());
// 添加外部定义的TCP连接处理器并传入TcpClient实例
ch.pipeline().addLast(new TcpConnectionHandler(DeviceTcpClient.this));
ch.pipeline().addLast(new JsonObjectDecoder()); // JSON 解码器
ch.pipeline().addLast(new DeviceMessageHandler()); // 消息处理器
ch.pipeline().addLast(new TcpConnectionHandler(DeviceTcpClient.this)); // 连接事件处理器
}
});
}
log.info("尝试连接到TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort());
log.info("尝试连接到设备TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort());
ChannelFuture future = bootstrap.connect(new InetSocketAddress(tcpConfig.getHost(), tcpConfig.getPort()));
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
channel = f.channel();
log.info("已链接到TCP服务");
log.info("已链接到设备TCP服务");
} else {
log.error("无法连接到TCP服务. {}ms后重试...", tcpConfig.getReconnect());
f.channel().eventLoop().schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS);
log.error("无法连接到设备TCP服务. {}ms后重试...", tcpConfig.getReconnect());
group.schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS); // 使用 group 调度重连避免 f.channel() 为空的问题
}
connecting.set(false);// 在连接尝试完成后重置 connecting 标志
});
} catch (Exception e) {
log.error("尝试连接到TCP服务发生意外错误: {}", e.getMessage(), e);
log.error("尝试连接到设备TCP服务发生意外错误: {}", e.getMessage(), e);
connecting.set(false);
group.schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS);
}
}
/**
* 定时检查TCP连接
* 定时检测连接状态若连接失效则尝试重新连接
*/
@Scheduled(fixedRateString = "${tcp.reconnect}")
private void checkConnection() {
if (channel == null || !channel.isActive()) {
log.error("TCP服务链接丢失");
log.error("设备TCP服务链接丢失,重新连接中...");
connect();
}
}
/**
* obj转换成json后发送
* 对象转换成 JSON 后发送
*/
public boolean sendToJSON(Object request) {
String msg = JSONUtil.toJsonStr(request);
return this.send(msg);
return send(msg);
}
/**
* 发送字符串请求到TCP服务器
* 发送字符串到 TCP 服务器
*/
public boolean send(String request) {
if (channel != null && channel.isActive()) {
log.info("发送TCP指令:{}", request);
log.info("发送设备TCP指令:{}", request);
channel.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8));
return true;
} else {
log.error("TCP服务未连接,无法发送请求: {}", request);
log.error("设备TCP服务未连接,无法发送请求: {}", request);
return false;
}
}
/**
* 应用退出时优雅关闭 EventLoopGroup
*/
@PreDestroy
private void shutdown() {
log.info("正在关闭 TCP 客户端...");
group.shutdownGracefully();
}
}

15
src/main/java/com/qyft/ms/system/core/handler/DeviceMessageHandler.java

@ -13,17 +13,20 @@ public class DeviceMessageHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf buf = (ByteBuf) msg;
String serverMsg = buf.toString(CharsetUtil.UTF_8);
log.info("serverMsg:{}", serverMsg);
if (!(msg instanceof ByteBuf buf)) {
log.warn("收到未知类型的消息:{}", msg.getClass());
return;
}
try {
String serverMsg = buf.toString(CharsetUtil.UTF_8);
log.info("serverMsg: {}", serverMsg);
JSONObject deviceResult = JSONUtil.parseObj(serverMsg);
JSONObject payload = deviceResult.getJSONObject("payload");
String tag = deviceResult.get("tag").toString();
String tag = deviceResult.getStr("tag");
Integer cmdId = payload.getInt("cmdId");
// 根据 tag cmdId 进行处理
} catch (Exception e) {
log.error("TCP服务消息处理错误: {}, error: {}", serverMsg, e.getMessage(), e);
log.error("TCP服务消息处理错误: {}, error: {}", buf.toString(CharsetUtil.UTF_8), e.getMessage(), e);
} finally {
buf.release();
}

2
src/main/java/com/qyft/ms/system/core/MyMetaObjectHandler.java → src/main/java/com/qyft/ms/system/core/handler/MyMetaObjectHandler.java

@ -1,4 +1,4 @@
package com.qyft.ms.system.core;
package com.qyft.ms.system.core.handler;
import com.baomidou.mybatisplus.core.handlers.MetaObjectHandler;
import org.apache.ibatis.reflection.MetaObject;

4
src/main/java/com/qyft/ms/system/core/handler/TcpConnectionHandler.java

@ -10,7 +10,6 @@ public class TcpConnectionHandler extends ChannelInboundHandlerAdapter {
private final DeviceTcpClient tcpClient;
// 通过构造方法注入 TcpClient 引用
public TcpConnectionHandler(DeviceTcpClient tcpClient) {
this.tcpClient = tcpClient;
}
@ -21,8 +20,7 @@ public class TcpConnectionHandler extends ChannelInboundHandlerAdapter {
if (ctx.channel() != null && ctx.channel().isOpen()) {
ctx.channel().close();
}
// 调用 TcpClient 的连接方法
tcpClient.connect();
tcpClient.connect();// 触发重连
super.channelInactive(ctx);
}

Loading…
Cancel
Save