diff --git a/device/sub_spray-mock-server-1.0.1.jar b/device/sub_spray-mock-server-1.0.1.jar new file mode 100644 index 0000000..55f5cd4 Binary files /dev/null and b/device/sub_spray-mock-server-1.0.1.jar differ diff --git a/src/main/java/com/qyft/ms/device/server/TcpServer.java b/src/main/java/com/qyft/ms/device/server/TcpServer.java new file mode 100644 index 0000000..bfa609e --- /dev/null +++ b/src/main/java/com/qyft/ms/device/server/TcpServer.java @@ -0,0 +1,158 @@ +package com.qyft.ms.device.server; + +import com.qyft.ms.device.config.TcpConfig; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.json.JsonObjectDecoder; +import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.GlobalEventExecutor; +import jakarta.annotation.PostConstruct; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +@RequiredArgsConstructor +public class TcpServer { + + private final TcpConfig tcpConfig; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + private final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + /** + * 初始化方法,在Spring容器启动后调用 + */ + @PostConstruct + public void init() { + if (tcpConfig.isEnable()) { + new Thread(this::start).start(); // 在独立线程中启动TCP服务器 + } + } + + /** + * 启动TCP服务器的方法 + */ + public void start() { + bossGroup = new NioEventLoopGroup(1); // 创建Boss线程组 + workerGroup = new NioEventLoopGroup(); // 创建Worker线程组 + try { + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) { + // 添加JSON对象解码器到ChannelPipeline + ch.pipeline().addLast(new JsonObjectDecoder()); + // 添加TCP消息处理器到ChannelPipeline + ch.pipeline().addLast(new TcpMessageHandler()); + // 添加客户端到ChannelGroup + ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + clients.add(ctx.channel()); // 将新连接的客户端添加到ChannelGroup + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + clients.remove(ctx.channel()); // 从ChannelGroup中移除断开连接的客户端 + super.channelInactive(ctx); + } + }); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true); + + // 绑定并开始接受传入连接 + ChannelFuture future = bootstrap.bind(tcpConfig.getPort()).sync(); + log.info("TCP服务器已启动,监听端口: {}", tcpConfig.getPort()); + + // 等待服务器套接字关闭 + future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + log.error("TCP服务器启动过程中发生中断: {}", e.getMessage(), e); + } finally { + // 优雅地关闭EventLoopGroup + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + /** + * 服务器端推送消息的方法 + * @param message 要推送的消息内容 + */ + public void sendMessage(String message) { + clients.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); // 将消息推送给所有客户端 + } + + /** + * 停止TCP服务器的方法 + */ + public void stop() { + if (workerGroup != null) { + workerGroup.shutdownGracefully(); // 优雅地关闭Worker线程组 + } + if (bossGroup != null) { + bossGroup.shutdownGracefully(); // 优雅地关闭Boss线程组 + } + } + + // 内部类,处理TCP消息事件 + private static class TcpMessageHandler extends ChannelInboundHandlerAdapter { + + /** + * 处理接收到的消息 + * @param ctx 通道处理上下文 + * @param msg 接收到的消息 + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + // 记录接收到的消息 + log.info("接收到消息: {}", msg.toString()); + + // 将ByteBuf转换为字符串 + String messageStr = ((io.netty.buffer.ByteBuf) msg).toString(CharsetUtil.UTF_8); + + // 解析JSON对象 + cn.hutool.json.JSONObject jsonObject = cn.hutool.json.JSONUtil.parseObj(messageStr); + + // 可以在这里添加消息处理逻辑 + // 例如:处理特定字段 + String someField = jsonObject.getStr("someField"); + log.info("解析后的字段值: {}", someField); + + // 新增:五秒后推送消息 + Runnable task = () -> { + String responseMessage = "服务器推送消息: " + someField; + ctx.writeAndFlush(Unpooled.copiedBuffer(responseMessage, CharsetUtil.UTF_8)); // 推送消息给客户端 + log.info("五秒后推送消息: {}", responseMessage); + }; + } + + /** + * 处理异常事件 + * @param ctx 通道处理上下文 + * @param cause 异常对象 + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + // 记录异常日志 + log.error("TCP连接发生异常: {}", cause.getMessage(), cause); + // 关闭当前channel + ctx.close(); + } + } +} \ No newline at end of file diff --git a/src/main/java/com/qyft/ms/device/service/TaskQueueManager.java b/src/main/java/com/qyft/ms/device/service/TaskQueueManager.java new file mode 100644 index 0000000..dcd6bac --- /dev/null +++ b/src/main/java/com/qyft/ms/device/service/TaskQueueManager.java @@ -0,0 +1,121 @@ +package com.qyft.ms.device.service; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; + +@Slf4j +@Component +public class TaskQueueManager { + + // 任务队列,用于存储待执行的任务 + private final CopyOnWriteArrayList taskQueue; + // 执行任务的线程池 + private final ExecutorService executorService; + // 标记任务队列是否暂停 + private final AtomicBoolean isPaused; + + /** + * @param taskId 唯一标识 + */ // 新增任务包装类 + private record TaskWrapper(String taskId, Runnable task) { + + } + /** + * 构造函数,初始化任务队列管理器 + */ + public TaskQueueManager() { + this.taskQueue = new CopyOnWriteArrayList<>(); + this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + this.isPaused = new AtomicBoolean(false); + } + + public void addTask(Runnable task) { + addTask(UUID.randomUUID().toString(), task); // 自动生成 taskId + } + + private void addTask(String taskId, Runnable task) { + taskQueue.add(new TaskWrapper(taskId, task)); + log.info("添加到任务队列: taskId={}, task={}", taskId, task); + } + + /** + * 在指定位置添加任务到任务队列 + * @param index 插入位置 + * @param task 待添加的任务 + */ + public void addTask(int index, Runnable task) { + addTask(index, UUID.randomUUID().toString(), task); // 自动生成 taskId + } + + private void addTask(int index, String taskId, Runnable task) { + taskQueue.add(index, new TaskWrapper(taskId, task)); + log.info("在位置 {} 添加到任务队列: taskId={}, task={}", index, taskId, task); + } + + /** + * 获取当前任务队列中的任务列表 + * @return 任务列表 + */ + public List getTasks() { + List tasks = new ArrayList<>(); + for (TaskWrapper wrapper : taskQueue) { + tasks.add(wrapper.task()); + } + return tasks; + } + + /** + * 启动任务队列管理器,开始处理任务 + */ + public void start() { + executorService.submit(() -> { // 提交一个任务到线程池 + while (!executorService.isShutdown()) { // 循环直到线程池关闭 + try { + while (isPaused.get()) { // 如果任务队列暂停,则休眠 + Thread.sleep(100); + } + TaskWrapper taskWrapper = taskQueue.remove(0); // 从队列中取出一个任务,如果队列为空则阻塞等待 + taskWrapper.task().run(); // 执行任务 + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // 恢复中断状态 + break; // 如果线程被中断,则退出循环 + } + } + }); + } + + /** + * 暂停任务队列 + */ + public void pause() { + isPaused.set(true); // 设置暂停标志为true + } + + /** + * 恢复任务队列 + */ + public void resume() { + isPaused.set(false); // 设置暂停标志为false + } + + /** + * 关闭任务队列管理器 + */ + public void shutdown() { + executorService.shutdown(); // 关闭线程池 + } + + /** + * 检查任务队列管理器是否已关闭 + * @return 如果已关闭则返回true,否则返回false + */ + public boolean isShutdown() { + return executorService.isShutdown(); + } +} \ No newline at end of file