13 changed files with 541 additions and 100 deletions
-
41src/main/java/com/qyft/ms/system/config/DeviceTcpConfig.java
-
14src/main/java/com/qyft/ms/system/config/WebSocketConfig.java
-
112src/main/java/com/qyft/ms/system/core/client/DeviceTcpClient.java
-
31src/main/java/com/qyft/ms/system/core/handler/DeviceMessageHandler.java
-
34src/main/java/com/qyft/ms/system/core/handler/TcpConnectionHandler.java
-
79src/main/java/com/qyft/ms/system/core/server/WebSocketServer.java
-
2src/main/java/com/qyft/ms/system/service/RoleService.java
-
2src/main/java/com/qyft/ms/system/service/UserService.java
-
6src/main/resources/application.yml
-
80src/main/resources/logback.xml
-
4src/main/resources/mapper/system/RoleMapper.xml
-
4src/main/resources/mapper/system/UserMapper.xml
-
232src/main/resources/sql/init.sql
@ -0,0 +1,41 @@ |
|||
package com.qyft.ms.system.config; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Data |
|||
@Component |
|||
@ConfigurationProperties(prefix = "tcp") |
|||
public class DeviceTcpConfig { |
|||
/** |
|||
* 是否启用 TCP 连接 |
|||
*/ |
|||
private boolean enable; |
|||
/** |
|||
* 是否开启本地tcp服务器 |
|||
*/ |
|||
private boolean serverEnable; |
|||
/** |
|||
* TCP 链接地址 |
|||
*/ |
|||
private String host; |
|||
/** |
|||
* TCP 端口 |
|||
*/ |
|||
private int port; |
|||
/** |
|||
* 断线重连间隔(毫秒) |
|||
*/ |
|||
private int reconnect; |
|||
/** |
|||
* 连接超时时间(毫秒) |
|||
*/ |
|||
private int timeout; |
|||
/** |
|||
* 指令反馈超时时间 |
|||
*/ |
|||
private int feedbackTimeout; |
|||
|
|||
} |
|||
|
@ -0,0 +1,14 @@ |
|||
package com.qyft.ms.system.config; |
|||
|
|||
import org.springframework.context.annotation.Bean; |
|||
import org.springframework.context.annotation.Configuration; |
|||
import org.springframework.web.socket.server.standard.ServerEndpointExporter; |
|||
|
|||
@Configuration |
|||
public class WebSocketConfig { |
|||
|
|||
@Bean |
|||
public ServerEndpointExporter serverEndpointExporter() { |
|||
return new ServerEndpointExporter(); |
|||
} |
|||
} |
@ -0,0 +1,112 @@ |
|||
package com.qyft.ms.system.core.client; |
|||
|
|||
import cn.hutool.json.JSONUtil; |
|||
import com.qyft.ms.system.config.DeviceTcpConfig; |
|||
import com.qyft.ms.system.core.handler.DeviceMessageHandler; |
|||
import com.qyft.ms.system.core.handler.TcpConnectionHandler; |
|||
import io.netty.bootstrap.Bootstrap; |
|||
import io.netty.buffer.Unpooled; |
|||
import io.netty.channel.*; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
import io.netty.channel.socket.nio.NioSocketChannel; |
|||
import io.netty.handler.codec.json.JsonObjectDecoder; |
|||
import io.netty.util.CharsetUtil; |
|||
import jakarta.annotation.PostConstruct; |
|||
import lombok.RequiredArgsConstructor; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.scheduling.annotation.Scheduled; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.net.InetSocketAddress; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
@Slf4j |
|||
@Component |
|||
@RequiredArgsConstructor |
|||
public class DeviceTcpClient { |
|||
|
|||
private final DeviceTcpConfig tcpConfig; |
|||
|
|||
private final EventLoopGroup group = new NioEventLoopGroup(); |
|||
private Channel channel; |
|||
private Bootstrap bootstrap; |
|||
|
|||
@PostConstruct |
|||
private void init() { |
|||
if (tcpConfig.isEnable()) { |
|||
connect(); |
|||
} |
|||
} |
|||
|
|||
// 连接到TCP服务器 |
|||
public void connect() { |
|||
try { |
|||
if (bootstrap == null) { |
|||
bootstrap = new Bootstrap(); |
|||
bootstrap.group(group) |
|||
.channel(NioSocketChannel.class) |
|||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, tcpConfig.getTimeout()) |
|||
.handler(new ChannelInitializer<>() { |
|||
@Override |
|||
protected void initChannel(Channel ch) { |
|||
// 添加JSON解码器 |
|||
ch.pipeline().addLast(new JsonObjectDecoder()); |
|||
// 添加设备消息处理器 |
|||
ch.pipeline().addLast(new DeviceMessageHandler()); |
|||
// 添加外部定义的TCP连接处理器,并传入TcpClient实例 |
|||
ch.pipeline().addLast(new TcpConnectionHandler(DeviceTcpClient.this)); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
log.info("尝试连接到TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort()); |
|||
ChannelFuture future = bootstrap.connect(new InetSocketAddress(tcpConfig.getHost(), tcpConfig.getPort())); |
|||
|
|||
future.addListener((ChannelFutureListener) f -> { |
|||
if (f.isSuccess()) { |
|||
channel = f.channel(); |
|||
log.info("已链接到TCP服务"); |
|||
} else { |
|||
log.error("无法连接到TCP服务. {}ms后重试...", tcpConfig.getReconnect()); |
|||
f.channel().eventLoop().schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS); |
|||
} |
|||
}); |
|||
|
|||
} catch (Exception e) { |
|||
log.error("尝试连接到TCP服务发生意外错误: {}", e.getMessage(), e); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 定时检查TCP连接 |
|||
*/ |
|||
@Scheduled(fixedRateString = "${tcp.reconnect}") |
|||
private void checkConnection() { |
|||
if (channel == null || !channel.isActive()) { |
|||
log.error("TCP服务链接丢失"); |
|||
connect(); |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 将obj转换成json后发送 |
|||
*/ |
|||
public boolean sendToJSON(Object request) { |
|||
String msg = JSONUtil.toJsonStr(request); |
|||
return this.send(msg); |
|||
} |
|||
|
|||
/** |
|||
* 发送字符串请求到TCP服务器 |
|||
*/ |
|||
public boolean send(String request) { |
|||
if (channel != null && channel.isActive()) { |
|||
log.info("发送TCP指令:{}", request); |
|||
channel.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8)); |
|||
return true; |
|||
} else { |
|||
log.error("TCP服务未连接,无法发送请求: {}", request); |
|||
return false; |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,31 @@ |
|||
package com.qyft.ms.system.core.handler; |
|||
|
|||
import cn.hutool.json.JSONObject; |
|||
import cn.hutool.json.JSONUtil; |
|||
import io.netty.buffer.ByteBuf; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.ChannelInboundHandlerAdapter; |
|||
import io.netty.util.CharsetUtil; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
@Slf4j |
|||
public class DeviceMessageHandler extends ChannelInboundHandlerAdapter { |
|||
|
|||
@Override |
|||
public void channelRead(ChannelHandlerContext ctx, Object msg) { |
|||
ByteBuf buf = (ByteBuf) msg; |
|||
String serverMsg = buf.toString(CharsetUtil.UTF_8); |
|||
log.info("serverMsg:{}", serverMsg); |
|||
try { |
|||
JSONObject deviceResult = JSONUtil.parseObj(serverMsg); |
|||
JSONObject payload = deviceResult.getJSONObject("payload"); |
|||
String tag = deviceResult.get("tag").toString(); |
|||
Integer cmdId = payload.getInt("cmdId"); |
|||
// 根据 tag 和 cmdId 进行处理 |
|||
} catch (Exception e) { |
|||
log.error("TCP服务消息处理错误: {}, error: {}", serverMsg, e.getMessage(), e); |
|||
} finally { |
|||
buf.release(); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,34 @@ |
|||
package com.qyft.ms.system.core.handler; |
|||
|
|||
import com.qyft.ms.system.core.client.DeviceTcpClient; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.ChannelInboundHandlerAdapter; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
|
|||
@Slf4j |
|||
public class TcpConnectionHandler extends ChannelInboundHandlerAdapter { |
|||
|
|||
private final DeviceTcpClient tcpClient; |
|||
|
|||
// 通过构造方法注入 TcpClient 引用 |
|||
public TcpConnectionHandler(DeviceTcpClient tcpClient) { |
|||
this.tcpClient = tcpClient; |
|||
} |
|||
|
|||
@Override |
|||
public void channelInactive(ChannelHandlerContext ctx) throws Exception { |
|||
log.error("TCP连接丢失,准备重新连接..."); |
|||
if (ctx.channel() != null && ctx.channel().isOpen()) { |
|||
ctx.channel().close(); |
|||
} |
|||
// 调用 TcpClient 的连接方法 |
|||
tcpClient.connect(); |
|||
super.channelInactive(ctx); |
|||
} |
|||
|
|||
@Override |
|||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
|||
log.error("TCP连接发生异常: {}", cause.getMessage(), cause); |
|||
ctx.close(); |
|||
} |
|||
} |
@ -0,0 +1,79 @@ |
|||
package com.qyft.ms.system.core.server; |
|||
|
|||
import jakarta.websocket.*; |
|||
import jakarta.websocket.server.ServerEndpoint; |
|||
import lombok.extern.slf4j.Slf4j; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.util.Set; |
|||
import java.util.concurrent.CopyOnWriteArraySet; |
|||
|
|||
@Slf4j |
|||
@ServerEndpoint("/ws") |
|||
@Component |
|||
public class WebSocketServer { |
|||
|
|||
private static final Set<Session> sessions = new CopyOnWriteArraySet<>(); |
|||
// 消息按顺序发送,加锁 |
|||
private static final Object sendLock = new Object(); |
|||
|
|||
/** |
|||
* 群发消息给所有客户端 |
|||
*/ |
|||
public static void sendMessageToClients(String message) { |
|||
synchronized (sendLock) { |
|||
for (Session session : sessions) { |
|||
if (session.isOpen()) { |
|||
try { |
|||
session.getBasicRemote().sendText(message); |
|||
log.info("发送消息给客户端(sessionId={}):{}", session.getId(), message); |
|||
} catch (Exception e) { |
|||
log.error("发送消息失败,sessionId={}", session.getId(), e); |
|||
} |
|||
} else { |
|||
log.warn("跳过关闭状态的 session,sessionId={}", session.getId()); |
|||
} |
|||
} |
|||
} |
|||
} |
|||
|
|||
/** |
|||
* 当有新的 WebSocket 连接建立时调用 |
|||
*/ |
|||
@OnOpen |
|||
private void onOpen(Session session) { |
|||
sessions.add(session); |
|||
log.info("WebSocket 客户端已连接,sessionId={},当前在线人数={}", session.getId(), sessions.size()); |
|||
} |
|||
|
|||
/** |
|||
* 当接收到来自客户端的消息时调用 |
|||
*/ |
|||
@OnMessage |
|||
public void onMessage(String message, Session session) { |
|||
log.info("接收到来自客户端(sessionId={})的消息:{}", session.getId(), message); |
|||
// 根据业务需求处理消息,此处可添加进一步的处理逻辑 |
|||
} |
|||
|
|||
/** |
|||
* 当 WebSocket 连接关闭时调用 |
|||
*/ |
|||
@OnClose |
|||
public void onClose(Session session, CloseReason reason) { |
|||
sessions.remove(session); |
|||
log.info("WebSocket 客户端断开连接,sessionId={},原因={},当前在线人数={}", |
|||
session.getId(), reason.getReasonPhrase(), sessions.size()); |
|||
} |
|||
|
|||
/** |
|||
* 当 WebSocket 连接发生错误时调用 |
|||
*/ |
|||
@OnError |
|||
public void onError(Session session, Throwable error) { |
|||
if (session != null) { |
|||
log.error("WebSocket 发生错误,sessionId={}", session.getId(), error); |
|||
} else { |
|||
log.error("WebSocket 发生错误,session 未初始化", error); |
|||
} |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue