Browse Source

fix:添加任务队列1

master
guoapeng 5 months ago
parent
commit
1ce245185e
  1. BIN
      device/sub_spray-mock-server-1.0.1.jar
  2. 158
      src/main/java/com/qyft/ms/device/server/TcpServer.java
  3. 121
      src/main/java/com/qyft/ms/device/service/TaskQueueManager.java

BIN
device/sub_spray-mock-server-1.0.1.jar

158
src/main/java/com/qyft/ms/device/server/TcpServer.java

@ -0,0 +1,158 @@
package com.qyft.ms.device.server;
import com.qyft.ms.device.config.TcpConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.GlobalEventExecutor;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RequiredArgsConstructor
public class TcpServer {
private final TcpConfig tcpConfig;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
* 初始化方法在Spring容器启动后调用
*/
@PostConstruct
public void init() {
if (tcpConfig.isEnable()) {
new Thread(this::start).start(); // 在独立线程中启动TCP服务器
}
}
/**
* 启动TCP服务器的方法
*/
public void start() {
bossGroup = new NioEventLoopGroup(1); // 创建Boss线程组
workerGroup = new NioEventLoopGroup(); // 创建Worker线程组
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 添加JSON对象解码器到ChannelPipeline
ch.pipeline().addLast(new JsonObjectDecoder());
// 添加TCP消息处理器到ChannelPipeline
ch.pipeline().addLast(new TcpMessageHandler());
// 添加客户端到ChannelGroup
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
clients.add(ctx.channel()); // 将新连接的客户端添加到ChannelGroup
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
clients.remove(ctx.channel()); // 从ChannelGroup中移除断开连接的客户端
super.channelInactive(ctx);
}
});
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 绑定并开始接受传入连接
ChannelFuture future = bootstrap.bind(tcpConfig.getPort()).sync();
log.info("TCP服务器已启动,监听端口: {}", tcpConfig.getPort());
// 等待服务器套接字关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("TCP服务器启动过程中发生中断: {}", e.getMessage(), e);
} finally {
// 优雅地关闭EventLoopGroup
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
/**
* 服务器端推送消息的方法
* @param message 要推送的消息内容
*/
public void sendMessage(String message) {
clients.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); // 将消息推送给所有客户端
}
/**
* 停止TCP服务器的方法
*/
public void stop() {
if (workerGroup != null) {
workerGroup.shutdownGracefully(); // 优雅地关闭Worker线程组
}
if (bossGroup != null) {
bossGroup.shutdownGracefully(); // 优雅地关闭Boss线程组
}
}
// 内部类处理TCP消息事件
private static class TcpMessageHandler extends ChannelInboundHandlerAdapter {
/**
* 处理接收到的消息
* @param ctx 通道处理上下文
* @param msg 接收到的消息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 记录接收到的消息
log.info("接收到消息: {}", msg.toString());
// 将ByteBuf转换为字符串
String messageStr = ((io.netty.buffer.ByteBuf) msg).toString(CharsetUtil.UTF_8);
// 解析JSON对象
cn.hutool.json.JSONObject jsonObject = cn.hutool.json.JSONUtil.parseObj(messageStr);
// 可以在这里添加消息处理逻辑
// 例如处理特定字段
String someField = jsonObject.getStr("someField");
log.info("解析后的字段值: {}", someField);
// 新增五秒后推送消息
Runnable task = () -> {
String responseMessage = "服务器推送消息: " + someField;
ctx.writeAndFlush(Unpooled.copiedBuffer(responseMessage, CharsetUtil.UTF_8)); // 推送消息给客户端
log.info("五秒后推送消息: {}", responseMessage);
};
}
/**
* 处理异常事件
* @param ctx 通道处理上下文
* @param cause 异常对象
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 记录异常日志
log.error("TCP连接发生异常: {}", cause.getMessage(), cause);
// 关闭当前channel
ctx.close();
}
}
}

121
src/main/java/com/qyft/ms/device/service/TaskQueueManager.java

@ -0,0 +1,121 @@
package com.qyft.ms.device.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
@Component
public class TaskQueueManager {
// 任务队列用于存储待执行的任务
private final CopyOnWriteArrayList<TaskWrapper> taskQueue;
// 执行任务的线程池
private final ExecutorService executorService;
// 标记任务队列是否暂停
private final AtomicBoolean isPaused;
/**
* @param taskId 唯一标识
*/ // 新增任务包装类
private record TaskWrapper(String taskId, Runnable task) {
}
/**
* 构造函数初始化任务队列管理器
*/
public TaskQueueManager() {
this.taskQueue = new CopyOnWriteArrayList<>();
this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
this.isPaused = new AtomicBoolean(false);
}
public void addTask(Runnable task) {
addTask(UUID.randomUUID().toString(), task); // 自动生成 taskId
}
private void addTask(String taskId, Runnable task) {
taskQueue.add(new TaskWrapper(taskId, task));
log.info("添加到任务队列: taskId={}, task={}", taskId, task);
}
/**
* 在指定位置添加任务到任务队列
* @param index 插入位置
* @param task 待添加的任务
*/
public void addTask(int index, Runnable task) {
addTask(index, UUID.randomUUID().toString(), task); // 自动生成 taskId
}
private void addTask(int index, String taskId, Runnable task) {
taskQueue.add(index, new TaskWrapper(taskId, task));
log.info("在位置 {} 添加到任务队列: taskId={}, task={}", index, taskId, task);
}
/**
* 获取当前任务队列中的任务列表
* @return 任务列表
*/
public List<Runnable> getTasks() {
List<Runnable> tasks = new ArrayList<>();
for (TaskWrapper wrapper : taskQueue) {
tasks.add(wrapper.task());
}
return tasks;
}
/**
* 启动任务队列管理器开始处理任务
*/
public void start() {
executorService.submit(() -> { // 提交一个任务到线程池
while (!executorService.isShutdown()) { // 循环直到线程池关闭
try {
while (isPaused.get()) { // 如果任务队列暂停则休眠
Thread.sleep(100);
}
TaskWrapper taskWrapper = taskQueue.remove(0); // 从队列中取出一个任务如果队列为空则阻塞等待
taskWrapper.task().run(); // 执行任务
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // 恢复中断状态
break; // 如果线程被中断则退出循环
}
}
});
}
/**
* 暂停任务队列
*/
public void pause() {
isPaused.set(true); // 设置暂停标志为true
}
/**
* 恢复任务队列
*/
public void resume() {
isPaused.set(false); // 设置暂停标志为false
}
/**
* 关闭任务队列管理器
*/
public void shutdown() {
executorService.shutdown(); // 关闭线程池
}
/**
* 检查任务队列管理器是否已关闭
* @return 如果已关闭则返回true否则返回false
*/
public boolean isShutdown() {
return executorService.isShutdown();
}
}
Loading…
Cancel
Save