Browse Source

fix:添加任务队列

master
guoapeng 5 months ago
parent
commit
0bce0a5f39
  1. 79
      src/main/java/com/qyft/ms/app/service/CMDService.java
  2. 37
      src/main/java/com/qyft/ms/device/client/TcpClient.java
  3. 6
      src/main/java/com/qyft/ms/device/common/constant/DeviceCommands.java
  4. 5
      src/main/java/com/qyft/ms/device/config/TcpConfig.java
  5. 12
      src/main/java/com/qyft/ms/device/service/DeviceAlarmService.java
  6. 14
      src/main/java/com/qyft/ms/device/service/DeviceService.java
  7. 1
      src/main/resources/application.yml
  8. 40
      src/test/java/com/qyft/ms/QueueTest.java

79
src/main/java/com/qyft/ms/app/service/CMDService.java

@ -171,8 +171,7 @@ public class CMDService {
!params.containsKey("routeType") ||
!params.containsKey("movementSpeed") ||
!params.containsKey("height") ||
!params.containsKey("matrixFlowVelocity") ||
!params.containsKey("nitrogenFlowVelocity")
!params.containsKey("matrixFlowVelocity")
) {
return "参数错误";
}
@ -191,6 +190,10 @@ public class CMDService {
// 设置指定轴的电机的运行速度
int movementSpeed = (Integer) params.get("movementSpeed");
cmdList.add(() -> deviceTcpCMDService.setMotorSpeed("X", movementSpeed));
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("spraying");
return true;
});
cmdList.add(() -> deviceTcpCMDService.setMotorSpeed("Y", movementSpeed));
cmdList.add(() -> deviceTcpCMDService.setMotorSpeed("Z", 10));
@ -228,7 +231,7 @@ public class CMDService {
left, right, top, bottom, space, routeType == 1 ? PathGenerator.MoveMode.HORIZONTAL_ZIGZAG_TOP_DOWN : PathGenerator.MoveMode.VERTICAL_ZIGZAG_LEFT_RIGHT
);
log.info("horizontalPath:{}", JSONUtil.toJsonStr(horizontalPath));
if(horizontalPath.isEmpty()) {
if (horizontalPath.isEmpty()) {
return "路径规划失败";
}
@ -256,7 +259,11 @@ public class CMDService {
cmdList.add(() -> deviceTcpCMDService.controlValve("Nozzle", true));
// 推注射泵
int matrixFlowVelocity = (int) params.get("matrixFlowVelocity");
double matrixFlowVelocity =Optional.ofNullable(params.get("matrixFlowVelocity"))
.filter(Number.class::isInstance)
.map(Number.class::cast)
.map(Number::doubleValue)
.orElse(0.0);
cmdList.add(() -> deviceTcpCMDService.turnOnSyringePump(matrixFlowVelocity));
// 插入日志
@ -264,15 +271,17 @@ public class CMDService {
cmdList.add( () -> {
OperationLog operationLog = new OperationLog();
operationLog.setStatus(0);
operationLog.setMatrixId((Long) params.get("matrixCraftId"));
Long matrixCraftId = Long.valueOf(Optional.ofNullable(params.get("matrixCraftId"))
.filter(Number.class::isInstance)
.map(Number.class::cast)
.map(Number::intValue)
.orElse(0));
operationLog.setMatrixId(matrixCraftId);
operationLog.setMatrixInfo(JSON.toJSONString(params));
operationLogService.add(operationLog);
return true;
});
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("spraying");
return true;
});
}
double currentX = left;
double currentY = top;
@ -301,7 +310,7 @@ public class CMDService {
// 关闭高压
cmdList.add(deviceTcpCMDService::turnOffHighVoltage);
// 回到原点
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Z"));
// cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Z"));
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("X"));
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Y"));
@ -377,9 +386,9 @@ public class CMDService {
// 关闭高压
cmdList.add(deviceTcpCMDService::turnOffHighVoltage);
// 关闭喷嘴阀
cmdList.add(() -> deviceTcpCMDService.controlValve("Nozzle", true));
cmdList.add(() -> deviceTcpCMDService.controlValve("Nozzle", false));
// 回到原点
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Z"));
// cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Z"));
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("X"));
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Y"));
@ -416,6 +425,10 @@ public class CMDService {
String type = (String) params.get("type");
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Z"));
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("washing");
return true;
});
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("X"));
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Y"));
@ -424,19 +437,16 @@ public class CMDService {
cmdList.add(() -> deviceTcpCMDService.moveMotorToPosition("X", wasteLiquorPosition.get("x")));
cmdList.add(() -> deviceTcpCMDService.moveMotorToPosition("Z", wasteLiquorPosition.get("z")));
// type: "injector" | "nozzle"
if(Objects.equals(type, "injector")) {
cmdList.add(() -> deviceTcpCMDService.switchThreeWayValve("clear_spray"));
// type: "injector"注射器 | "nozzle"喷涂
if(Objects.equals(type, "nozzle")) { // ON_C
cmdList.add(() -> deviceTcpCMDService.switchThreeWayValve("clear_nozzle"));
cmdList.add(() -> deviceTcpCMDService.turnOnSyringePump(1000));
} else if (Objects.equals(type, "nozzle")) {
} else if (Objects.equals(type, "injector")) { // OFF_C
cmdList.add(() -> deviceTcpCMDService.switchThreeWayValve("clear_nozzle"));
cmdList.add(() -> deviceTcpCMDService.switchThreeWayValve("clear_spray"));
cmdList.add(() -> deviceTcpCMDService.controlValve("Cleaning", true));
}
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("washing");
return true;
});
initExecutorThread(cmdList, form);
return true;
}
@ -448,6 +458,9 @@ public class CMDService {
cmdList.add(() -> deviceTcpCMDService.controlValve("Cleaning", false));
cmdList.add(() -> deviceTcpCMDService.controlValve("Nozzle", false));
cmdList.add(deviceTcpCMDService::turnOffSyringePump);
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Z"));
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("X"));
cmdList.add(() -> deviceTcpCMDService.motorMoveToHome("Y"));
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("idle");
return true;
@ -465,7 +478,7 @@ public class CMDService {
public boolean turnOnLightPanel(CMDForm form) {
Map<String, Object> params = form.getParams();
List<Supplier<Boolean>> cmdList = new ArrayList<>();
cmdList.add(() -> deviceTcpCMDService.turnOnLightPanel());
cmdList.add(deviceTcpCMDService::turnOnLightPanel);
initExecutorThread(cmdList, form);
return true;
}
@ -484,19 +497,28 @@ public class CMDService {
public boolean startPrefill(CMDForm form) {
List<Supplier<Boolean>> cmdList = new ArrayList<>();
Map<String, Object> params = form.getParams();
double rotationSpeed = Optional.ofNullable(params.get("rotationSpeed"))
Map<String, Double> wasteLiquorPosition = sysSettingsService.getWasteLiquorPosition();
cmdList.add(() -> deviceTcpCMDService.moveMotorToPosition("X", wasteLiquorPosition.get("x")));
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("prefilling");
return true;
});
cmdList.add(() -> deviceTcpCMDService.moveMotorToPosition("Z", wasteLiquorPosition.get("z")));
double rotationSpeed = Optional.ofNullable(params.get("rotationSpeed"))
.filter(Number.class::isInstance)
.map(Number.class::cast)
.map(Number::doubleValue)
.orElse(0.0);
cmdList.add(() -> deviceTcpCMDService.switchThreeWayValve("clear_spray"));
cmdList.add(() -> deviceTcpCMDService.controlValve("Nozzle", true));
cmdList.add(() -> deviceTcpCMDService.turnOnSyringePump(rotationSpeed));
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("prefilling");
return true;
});
initExecutorThread(cmdList, form);
return true;
}
@ -504,7 +526,6 @@ double rotationSpeed = Optional.ofNullable(params.get("rotationSpeed"))
public boolean stopPrefill(CMDForm form) {
List<Supplier<Boolean>> cmdList = new ArrayList<>();
cmdList.add(deviceTcpCMDService::turnOffSyringePump);
cmdList.add(() -> deviceTcpCMDService.controlValve("Nozzle", false));
cmdList.add(() -> {
sysSettingsService.updateWorkStatus("idle");

37
src/main/java/com/qyft/ms/device/client/TcpClient.java

@ -38,7 +38,7 @@ public class TcpClient {
private Channel channel;
private Bootstrap bootstrap;
// 初始化方法在Spring容器启动后调用
@PostConstruct
public void init() {
if (tcpConfig.isEnable()) {
@ -46,8 +46,10 @@ public class TcpClient {
}
}
// 连接到TCP服务器的方法
public void connect() {
try {
// 如果Bootstrap对象为空则初始化它
if (bootstrap == null) {
bootstrap = new Bootstrap();
bootstrap.group(group)
@ -56,55 +58,70 @@ public class TcpClient {
.handler(new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
// 添加JSON对象解码器到ChannelPipeline
ch.pipeline().addLast(new JsonObjectDecoder());
// 添加设备消息处理器到ChannelPipeline
ch.pipeline().addLast(deviceMessageHandler);
// 添加TCP连接处理器到ChannelPipeline
ch.pipeline().addLast(new TcpConnectionHandler());
}
});
}
// 记录尝试连接到TCP服务器的日志
log.info("尝试连接到TCP服务 {}:{}", tcpConfig.getHost(), tcpConfig.getPort());
ChannelFuture future = bootstrap.connect(new InetSocketAddress(tcpConfig.getHost(), tcpConfig.getPort()));
// 添加连接监听器
future.addListener((ChannelFutureListener) f -> {
if (f.isSuccess()) {
// 连接成功记录日志并设置channel
channel = f.channel();
log.info("已链接到TCP服务");
} else {
// 连接失败记录日志并安排重试
log.error("无法连接到TCP服务. {}ms后重试...", tcpConfig.getReconnect());
f.channel().eventLoop().schedule(this::connect, tcpConfig.getReconnect(), TimeUnit.MILLISECONDS);
}
});
} catch (Exception e) {
// 捕获并记录连接过程中发生的异常
log.error("尝试连接到TCP服务发生意外错误: {}", e.getMessage(), e);
}
}
// 定时检查TCP连接的方法
@Scheduled(fixedRateString = "${tcp.reconnect}")
public void checkConnection() {
if (channel == null || !channel.isActive()) {
// 如果连接丢失记录日志并尝试重新连接
log.error("TCP服务链接丢失");
connect();
}
}
// 发送字符串请求到TCP服务器的方法
public boolean send(String request) {
if (channel != null && channel.isActive()) {
// 如果连接有效发送请求并返回true
channel.writeAndFlush(Unpooled.copiedBuffer(request, CharsetUtil.UTF_8));
return true;
} else {
// 如果连接无效记录日志并返回false
log.error("TCP服务未连接,无法发送请求: {}", request);
return false;
}
}
// 发送JSON-RPC命令到TCP服务器的方法仅包含方法名
public DeviceFeedback sendCommand(String method) {
JsonRpcRequest request = new JsonRpcRequest();
request.setMethod(method);
return this.sendCommand(request);
}
// 发送JSON-RPC命令到TCP服务器的方法包含方法名和参数
public DeviceFeedback sendCommand(String method, Map<String, Object> params) {
JsonRpcRequest request = new JsonRpcRequest();
request.setMethod(method);
@ -112,47 +129,61 @@ public class TcpClient {
return this.sendCommand(request);
}
// 发送JSON-RPC命令到TCP服务器的方法包含完整的JsonRpcRequest对象
public DeviceFeedback sendCommand(JsonRpcRequest request) {
if (request.getId() == null) {
// 如果请求ID为空则生成一个新的UUID作为ID
request.setId(UUID.randomUUID().toString());
}
CompletableFuture<DeviceFeedback> future = new CompletableFuture<>();
deviceMessageHandler.responseMap.put(request.getId(), future);
try {
if (request.getParams() == null) {
// 如果请求参数为空则初始化一个空的HashMap
request.setParams(new HashMap<>());
}
// 将请求对象转换为JSON字符串
String requestJsonStr = JSONUtil.toJsonStr(request);
log.info("发送TCP指令(同步) {}", requestJsonStr);
// 发送请求到服务器并等待响应
if (this.send(requestJsonStr)) {
return future.get(tcpConfig.getFeedbackTimeout(), TimeUnit.MILLISECONDS); // 等待 FEEDBACK 响应
} else {
return null;
}
} catch (Exception e) {
// 捕获并记录发送过程中发生的异常
log.error("发送TCP指令错误(同步) {}", JSONUtil.toJsonStr(request), e);
} finally {
deviceMessageHandler.responseMap.remove(request.getId()); //确保完成后移除
// 确保请求完成后从responseMap中移除对应的Future
deviceMessageHandler.responseMap.remove(request.getId());
}
return null;
}
// 内部类处理TCP连接事件
private class TcpConnectionHandler extends ChannelInboundHandlerAdapter {
// 处理连接断开事件
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// 连接断开时的处理逻辑
// 记录连接断开的日志
log.error("TCP连接丢失,准备重新连接...");
if (channel != null) {
// 关闭当前channel
channel.close();
}
// 尝试重新连接
connect();
super.channelInactive(ctx);
}
// 处理异常事件
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 记录异常日志
log.error("TCP连接发生异常: {}", cause.getMessage(), cause);
// 关闭当前channel
ctx.close();
}
}

6
src/main/java/com/qyft/ms/device/common/constant/DeviceCommands.java

@ -28,12 +28,8 @@ public class DeviceCommands {
// 停止注射泵
public static final String TURN_OFF_SYRINGE_PUMP = "turnOffSyringePump";
public static final String SYRINGE_PUMP_MOVE_AT_SPEED = "syringePumpMoveAtSpeed";
public static final String SYRINGE_PUMP_STOP = "syringePumpStop";
public static final String SYRINGE_PUMP_START = "syringePumpStart";
public static final String STOP_MOTOR = "stopMotor";
public static final String START_DEHUMIDIFY = "startDehumidify";
public static final String SWITCH_THREE_WAY_VALVE = "switchThreeWayValve";
}

5
src/main/java/com/qyft/ms/device/config/TcpConfig.java

@ -12,6 +12,11 @@ public class TcpConfig {
* 是否启用 TCP 连接
*/
private boolean enable;
/**
* 是否开启本地tcp服务器
*/
private boolean serverEnable;
/**
* TCP 链接地址
*/

12
src/main/java/com/qyft/ms/device/service/DeviceAlarmService.java

@ -1,12 +0,0 @@
package com.qyft.ms.device.service;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
* 设备报警服务
*/
@Service
@RequiredArgsConstructor
public class DeviceAlarmService {
}

14
src/main/java/com/qyft/ms/device/service/DeviceService.java

@ -1,14 +0,0 @@
package com.qyft.ms.device.service;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
/**
* 设备操作
*/
@Service
@RequiredArgsConstructor
public class DeviceService {
}

1
src/main/resources/application.yml

@ -35,6 +35,7 @@ jwt:
#与设备TCP链接
tcp:
enable: true # 是否开启 TCP 连接
server-enable: true # 是否开启 TCP 连接
host: 127.0.0.1
# host: 192.168.1.140
port: 9080

40
src/test/java/com/qyft/ms/QueueTest.java

@ -0,0 +1,40 @@
package com.qyft.ms;
import com.qyft.ms.device.service.TaskQueueManager;
public class QueueTest {
public static void main(String[] args) {
TaskQueueManager taskQueueManager = new TaskQueueManager();
// 添加几个任务到队列中
taskQueueManager.addTask(() -> test(10000,1));
taskQueueManager.addTask(() -> test(10000,2));
taskQueueManager.addTask(() ->test(10000,3));
// 开始执行任务
taskQueueManager.start();
try {
Thread.sleep(15000);
taskQueueManager.pause();
System.out.println("暂停");
taskQueueManager.addTask(0, () -> test(10000,4));
taskQueueManager.resume();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
public static Runnable test(int time, int num) {
try {
System.out.println("任务"+num+"开始执行");
Thread.sleep(time); // 模拟耗时操作
System.out.println("任务"+num+"执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}
}
Loading…
Cancel
Save