8 changed files with 207 additions and 0 deletions
-
1build.gradle
-
87src/main/java/com/qyft/gd/device/client/TcpClient.java
-
7src/main/java/com/qyft/gd/device/common/constant/DeviceCommands.java
-
32src/main/java/com/qyft/gd/device/config/TcpConfig.java
-
37src/main/java/com/qyft/gd/device/handler/DeviceMessageHandler.java
-
10src/main/java/com/qyft/gd/device/model/bo/DeviceStatus.java
-
24src/main/java/com/qyft/gd/device/service/DeviceStateService.java
-
9src/main/resources/application.yml
@ -0,0 +1,87 @@ |
|||
package com.qyft.gd.device.client; |
|||
|
|||
import com.qyft.gd.device.config.TcpConfig; |
|||
import com.qyft.gd.device.handler.DeviceMessageHandler; |
|||
import io.netty.bootstrap.Bootstrap; |
|||
import io.netty.channel.*; |
|||
import io.netty.channel.nio.NioEventLoopGroup; |
|||
import io.netty.channel.socket.nio.NioSocketChannel; |
|||
import jakarta.annotation.PostConstruct; |
|||
import lombok.RequiredArgsConstructor; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.scheduling.annotation.Scheduled; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
import java.net.InetSocketAddress; |
|||
import java.util.concurrent.TimeUnit; |
|||
|
|||
|
|||
@Component |
|||
@RequiredArgsConstructor |
|||
public class TcpClient { |
|||
|
|||
private static final Logger log = LoggerFactory.getLogger(TcpClient.class); |
|||
private final TcpConfig tcpConfig; |
|||
private final DeviceMessageHandler deviceMessageHandler; |
|||
|
|||
private final EventLoopGroup group = new NioEventLoopGroup(); |
|||
private Channel channel; |
|||
private Bootstrap bootstrap; |
|||
|
|||
@PostConstruct |
|||
public void init() { |
|||
if (tcpConfig.isEnable()) { |
|||
connect(); |
|||
} |
|||
} |
|||
|
|||
public void connect() { |
|||
try { |
|||
if (bootstrap == null) { |
|||
bootstrap = new Bootstrap(); |
|||
bootstrap.group(group) |
|||
.channel(NioSocketChannel.class) |
|||
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, tcpConfig.getTimeout()) |
|||
.handler(new ChannelInitializer<>() { |
|||
@Override |
|||
protected void initChannel(Channel ch) { |
|||
ch.pipeline().addLast(deviceMessageHandler); |
|||
} |
|||
}); |
|||
} |
|||
|
|||
log.info("尝试连接到TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort()); |
|||
ChannelFuture future = bootstrap.connect(new InetSocketAddress(tcpConfig.getHost(), tcpConfig.getPort())); |
|||
|
|||
future.addListener((ChannelFutureListener) f -> { |
|||
if (f.isSuccess()) { |
|||
channel = f.channel(); |
|||
log.info("已链接到TCP服务"); |
|||
} else { |
|||
log.error("无法连接到TCP服务. {}ms后重试...", tcpConfig.getReconnect()); |
|||
f.channel().eventLoop().schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS); |
|||
} |
|||
}); |
|||
|
|||
} catch (Exception e) { |
|||
log.error("尝试连接到TCP服务发生意外错误: {}", e.getMessage(), e); |
|||
} |
|||
} |
|||
|
|||
@Scheduled(fixedRateString = "${tcp.reconnect}") |
|||
public void checkConnection() { |
|||
if (channel == null || !channel.isActive()) { |
|||
log.error("TCP服务链接丢失"); |
|||
connect(); |
|||
} |
|||
} |
|||
|
|||
public void sendCommand(String command) { |
|||
if (channel != null && channel.isActive()) { |
|||
channel.writeAndFlush(command); |
|||
} else { |
|||
log.error("TCP服务未连接,无法发送指令: {}", command); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,7 @@ |
|||
package com.qyft.gd.device.common.constant; |
|||
|
|||
/** |
|||
* 设备指令 |
|||
*/ |
|||
public class DeviceCommands { |
|||
} |
@ -0,0 +1,32 @@ |
|||
package com.qyft.gd.device.config; |
|||
|
|||
import lombok.Data; |
|||
import org.springframework.boot.context.properties.ConfigurationProperties; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Data |
|||
@Component |
|||
@ConfigurationProperties(prefix = "tcp") |
|||
public class TcpConfig { |
|||
/** |
|||
* 是否启用 TCP 连接 |
|||
*/ |
|||
private boolean enable; |
|||
/** |
|||
* TCP 链接地址 |
|||
*/ |
|||
private String host; |
|||
/** |
|||
* TCP 端口 |
|||
*/ |
|||
private int port; |
|||
/** |
|||
* 断线重连间隔(毫秒) |
|||
*/ |
|||
private int reconnect; |
|||
/** |
|||
* 连接超时时间(毫秒) |
|||
*/ |
|||
private int timeout; |
|||
} |
|||
|
@ -0,0 +1,37 @@ |
|||
package com.qyft.gd.device.handler; |
|||
|
|||
import cn.hutool.json.JSONObject; |
|||
import cn.hutool.json.JSONUtil; |
|||
import com.qyft.gd.device.model.bo.DeviceStatus; |
|||
import com.qyft.gd.device.service.DeviceStateService; |
|||
import io.netty.channel.ChannelHandler; |
|||
import io.netty.channel.ChannelHandlerContext; |
|||
import io.netty.channel.SimpleChannelInboundHandler; |
|||
import lombok.RequiredArgsConstructor; |
|||
import org.slf4j.Logger; |
|||
import org.slf4j.LoggerFactory; |
|||
import org.springframework.stereotype.Component; |
|||
|
|||
@Component |
|||
@ChannelHandler.Sharable |
|||
@RequiredArgsConstructor |
|||
public class DeviceMessageHandler extends SimpleChannelInboundHandler<String> { |
|||
|
|||
private static final Logger log = LoggerFactory.getLogger(DeviceMessageHandler.class); |
|||
|
|||
private final DeviceStateService deviceStateService; |
|||
@Override |
|||
protected void channelRead0(ChannelHandlerContext ctx, String msg) { |
|||
try { |
|||
// 解析 JSON |
|||
JSONObject json = JSONUtil.parseObj(msg); |
|||
DeviceStatus deviceStatus = json.toBean(DeviceStatus.class); |
|||
// 更新设备状态 |
|||
deviceStateService.updateDeviceStatus(deviceStatus); |
|||
|
|||
log.info("设备状态已更新: {}", json.toStringPretty()); |
|||
} catch (Exception e) { |
|||
log.error("设备状态更新错误: {}, error: {}", msg, e.getMessage(), e); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,10 @@ |
|||
package com.qyft.gd.device.model.bo; |
|||
|
|||
import lombok.Data; |
|||
|
|||
/** |
|||
* 设备当前状态 |
|||
*/ |
|||
@Data |
|||
public class DeviceStatus { |
|||
} |
@ -0,0 +1,24 @@ |
|||
package com.qyft.gd.device.service; |
|||
|
|||
import cn.hutool.core.bean.BeanUtil; |
|||
import com.qyft.gd.device.model.bo.DeviceStatus; |
|||
import lombok.RequiredArgsConstructor; |
|||
import org.springframework.stereotype.Service; |
|||
|
|||
@Service |
|||
@RequiredArgsConstructor |
|||
public class DeviceStateService { |
|||
private final DeviceStatus deviceStatus = new DeviceStatus(); |
|||
|
|||
public void updateDeviceStatus(DeviceStatus newStatus) { |
|||
synchronized (deviceStatus) { |
|||
BeanUtil.copyProperties(newStatus, deviceStatus); |
|||
} |
|||
} |
|||
|
|||
public DeviceStatus getDeviceStatus() { |
|||
synchronized (deviceStatus) { |
|||
return BeanUtil.copyProperties(deviceStatus, DeviceStatus.class); |
|||
} |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue