diff --git a/app.db b/app.db index fe9d44e..19ff680 100644 Binary files a/app.db and b/app.db differ diff --git a/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java b/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java index 5b72641..2edbbdc 100644 --- a/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java +++ b/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java @@ -2,8 +2,8 @@ package com.iflytop.digester; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.iflytop.digester.deviceinstance.HeatingTurntableSlotTube; import com.iflytop.digester.model.MdbDigestionTask; +import com.iflytop.digester.underframework.util.UfJsonHelper; import jakarta.annotation.PostConstruct; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; @@ -13,7 +13,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; - @Component public class DigestionTaskTheadManager { // Singleton instance @@ -107,7 +106,7 @@ public class DigestionTaskTheadManager { DigestionTaskThread task = null; for ( var t : this.tasks ) { - if ( t.getTaskId().equals(taskId) ) { + if ( t.getOutTaskId().equals(taskId) ) { task = t; break; } @@ -120,9 +119,15 @@ public class DigestionTaskTheadManager { } // Send message to trans bot - public void sendMessageToTransBot(String message) { + public void sendMessageToTransBot(String action, Map params) { try { - MqttMessage mqttMessage = new MqttMessage(message.getBytes()); + var message = Map.of( + "src", "stw-a80", + "action", action, + "params", params + ); + var messageJson = UfJsonHelper.objectToJson(message); + MqttMessage mqttMessage = new MqttMessage(messageJson.getBytes()); mqttMessage.setQos(2); this.client.publish(this.mqttBrokerTransbotTopic, mqttMessage); } catch (MqttException e) { @@ -134,6 +139,7 @@ public class DigestionTaskTheadManager { public void startTask( MdbDigestionTask taskModel ) { var task = new DigestionTaskThread(taskModel); task.setFinishCallback(this::handleOnTaskFinished); + task.setManager(this); this.tasks.add(task); task.start(); } @@ -142,4 +148,14 @@ public class DigestionTaskTheadManager { private void handleOnTaskFinished( DigestionTaskThread task ) { this.tasks.remove(task); } + + // get task by task id + public DigestionTaskThread getTaskByOutTaskId( String taskId ) { + for ( var task : this.tasks ) { + if ( task.getOutTaskId().equals(taskId) ) { + return task; + } + } + return null; + } } diff --git a/src/main/java/com/iflytop/digester/DigestionTaskThread.java b/src/main/java/com/iflytop/digester/DigestionTaskThread.java index eec3710..de45f04 100644 --- a/src/main/java/com/iflytop/digester/DigestionTaskThread.java +++ b/src/main/java/com/iflytop/digester/DigestionTaskThread.java @@ -2,14 +2,13 @@ package com.iflytop.digester; import com.iflytop.digester.deviceinstance.*; import com.iflytop.digester.model.MdbDigestionTask; import com.iflytop.digester.model.MdbDigestionSolution; +import com.iflytop.digester.underframework.dao.model.UfMdbNotification; import com.iflytop.digester.underframework.dao.record.UfActiveRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.util.ArrayList; import java.util.List; import java.util.Map; - public class DigestionTaskThread extends Thread { // 任务完成回调 public interface FinishCallback { @@ -25,15 +24,19 @@ public class DigestionTaskThread extends Thread { // 加热转盘位置 private HeatingTurntableSlot heatingSlot; // 异常试管索引列表 - private List errorTubeIndexes; - // 任务完成回调 - private FinishCallback finishCallback; + private final List errorTubeIndexes = new ArrayList<>(); + // 已完成异常试管索引列表 + private final List finishedErrorTubeIndexes = new ArrayList<>(); // 试管架放入等待锁 private final Object tubeRackPutInWaitLock = new Object(); // 试管架取出等待锁 private final Object tubeRackTakeOutWaitLock = new Object(); // 异常处理线程 private Thread errorProcessThread; + // 任务完成回调 + private FinishCallback finishCallback; + // 消解任务管理器 + private DigestionTaskTheadManager manager; // 消解任务 public DigestionTaskThread(MdbDigestionTask taskModel) { @@ -41,14 +44,19 @@ public class DigestionTaskThread extends Thread { this.solution = UfActiveRecord.findOne(MdbDigestionSolution.class, taskModel.digestionId); } + // 设置消解任务管理器 + public void setManager(DigestionTaskTheadManager manager) { + this.manager = manager; + } + // 设置任务完成回调 public void setFinishCallback(FinishCallback finishCallback) { this.finishCallback = finishCallback; } // 获取任务ID - public String getTaskId() { - return this.taskModel.taskId; + public String getOutTaskId() { + return this.taskModel.outTaskId; } // 执行动作 @@ -61,6 +69,20 @@ public class DigestionTaskThread extends Thread { synchronized ( this.tubeRackTakeOutWaitLock ) { this.tubeRackTakeOutWaitLock.notifyAll(); } + } else if ( "ErrorTubeSelected".equals(name) ) { + this.errorTubeIndexes.clear(); + var tubeIndexes = (List) params.get("tubeIndexes"); + this.errorTubeIndexes.addAll(tubeIndexes); + synchronized ( this.errorTubeIndexes ) { + this.errorTubeIndexes.notifyAll(); + } + } else if ( "FinishedErrorTubeSelected".equals(name) ) { + this.finishedErrorTubeIndexes.clear(); + var tubeIndexes = (List) params.get("tubeIndexes"); + this.finishedErrorTubeIndexes.addAll(tubeIndexes); + synchronized ( this.finishedErrorTubeIndexes ) { + this.finishedErrorTubeIndexes.notifyAll(); + } } } @@ -86,8 +108,13 @@ public class DigestionTaskThread extends Thread { this.startErrorProcessThread(); // 消解结束 this.finish(); + + this.errorProcessThread.join(); + this.updateTaskStatus("Finish", "消解任务结束"); + this.finishCallback.callback(this); } catch (Exception e) { this.updateTaskStatus("Error", e.getMessage()); + UfMdbNotification.error("消解任务执行失败 : " + e.getMessage()); } } @@ -98,7 +125,7 @@ public class DigestionTaskThread extends Thread { var device = Device.getInstance(); // 分配加热位 - this.heatingSlot = device.heatingTurntable.allocSlot(this.taskModel.tubeRackNo); + this.heatingSlot = device.heatingTurntable.allocSlot(this.taskModel.batchNo); this.heatingSlot.setTubes(this.taskModel.getTubes()); this.taskModel.heatingSlotIndex = this.heatingSlot.index; this.taskModel.save(); @@ -106,7 +133,7 @@ public class DigestionTaskThread extends Thread { // 打开门 device.door.open(); // 等待放入试管架 - this.waitForTubeRackPutIn(); + this.waitForTubeRackPutIn(false); // 关闭门 device.door.close(); } @@ -127,12 +154,6 @@ public class DigestionTaskThread extends Thread { device.door.close(); // 释放加热位 this.heatingSlot.setTubeRackNo(null); - - // 更新任务状态 - this.updateTaskStatus("Finish", "消解任务结束"); - - // 任务完成回调 - this.finishCallback.callback(this); } // 检查试管 @@ -141,7 +162,7 @@ public class DigestionTaskThread extends Thread { // 移动到加液盘 device.transferArm.moveTubeRackToLiquidPlate(this.heatingSlot.index); // 拍照检查异常试管 - this.errorTubeIndexes = this.takeShotAndCheckErrorTubes(); + this.takeShotAndCheckErrorTubes(); // 将异常试管放入异常处理区域 device.transferArm.moveTubesToErrorSlot(this.errorTubeIndexes); // 将正常试管放入加热转盘 @@ -151,6 +172,11 @@ public class DigestionTaskThread extends Thread { // 启动异常处理线程 private void startErrorProcessThread() { this.errorProcessThread = new Thread(() -> { + if ( this.errorTubeIndexes.isEmpty() ) { + // 无异常,直接返回 + return ; + } + try { // 异常处理执行 var errorRound = this.solution.getErrorRounds(); @@ -160,8 +186,20 @@ public class DigestionTaskThread extends Thread { break; } } + + // 如果还存在异常, 则直接取出 + if ( !this.errorTubeIndexes.isEmpty() ) { + var device = Device.getInstance(); + // 等待放入空试管架 + this.waitForTubeRackPutIn(true); + // 取出剩余异常试管 + device.transferArm.takeOutTubesFromErrorSlot(this.errorTubeIndexes); + // 等待取出试管架 + this.waitForTubeRackTakeOut(); + } } catch (InterruptedException e) { - throw new RuntimeException(e); + UfMdbNotification.error("消解异常处理失败 : " + e.getMessage()); + this.updateTaskStatus("Error", String.format("消解异常处理失败 : %s", e.getMessage())); } }); this.errorProcessThread.start(); @@ -218,17 +256,27 @@ public class DigestionTaskThread extends Thread { // 移至加液区 device.transferArm.moveTubeRackToLiquidPlate(this.heatingSlot.index); // 拍照检查是否存在消解完成的试管 - var tubes = this.takeShotAndCheckFinishedTubes(); + this.takeShotAndCheckFinishedTubes(); // 将试管架放入异常处理区域 device.transferArm.moveTubeRackToHeatingTurntable(this.heatingSlot.index); + // 等待放入空试管架 + this.waitForTubeRackPutIn(true); // 取出消解完成的试管 - device.transferArm.takeOutTubesFromErrorSlot(tubes); + device.transferArm.takeOutTubesFromErrorSlot(this.finishedErrorTubeIndexes); + // 等待取出试管架 + this.waitForTubeRackTakeOut(); // 更新异常试管索引列表 - this.errorTubeIndexes.removeAll(tubes); + this.errorTubeIndexes.removeAll(this.finishedErrorTubeIndexes); } // 等待放入试管架 - private void waitForTubeRackPutIn() { + private void waitForTubeRackPutIn( Boolean requestEmptyTubeRack ) { + if ( requestEmptyTubeRack ) { + this.manager.sendMessageToTransBot("EmptyTubeRackPutIn", Map.of("batchNo", this.taskModel.batchNo)); + } else { + this.manager.sendMessageToTransBot("TubeRackPutIn", Map.of("batchNo", this.taskModel.batchNo)); + } + this.updateTaskStatus("TubeRackPutInWait", "等待放入试管架"); synchronized ( this.tubeRackPutInWaitLock ) { try { @@ -242,6 +290,7 @@ public class DigestionTaskThread extends Thread { // 等待取出试管架 private void waitForTubeRackTakeOut() { + this.manager.sendMessageToTransBot("TubeRackTakeOut", Map.of("batchNo", this.taskModel.batchNo)); this.updateTaskStatus("TubeRackTakeOutWait", "等待取出试管架"); synchronized ( this.tubeRackTakeOutWaitLock ) { try { @@ -254,13 +303,31 @@ public class DigestionTaskThread extends Thread { } // 拍照检查试管 - private List takeShotAndCheckErrorTubes() { - return new ArrayList<>(); + private void takeShotAndCheckErrorTubes() { + this.updateTaskStatus("TubeCheck", "拍照检查试管,等待确认异常试管"); + this.errorTubeIndexes.clear(); + synchronized ( this.errorTubeIndexes ) { + try { + this.errorTubeIndexes.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + this.updateTaskStatus("TubeCheck", "异常试管已确认"); } // 拍照检查试管 - private List takeShotAndCheckFinishedTubes() { - return new ArrayList<>(); + private void takeShotAndCheckFinishedTubes() { + this.updateTaskStatus("ErrorTubeCheck", "拍照检查异常试管,等待确认完成试管"); + this.finishedErrorTubeIndexes.clear(); + synchronized ( this.finishedErrorTubeIndexes ) { + try { + this.finishedErrorTubeIndexes.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + this.updateTaskStatus("ErrorTubeCheck", "完成试管已确认"); } // 更新任务状态 diff --git a/src/main/java/com/iflytop/digester/deviceinstance/Camera.java b/src/main/java/com/iflytop/digester/deviceinstance/Camera.java new file mode 100644 index 0000000..5abc97c --- /dev/null +++ b/src/main/java/com/iflytop/digester/deviceinstance/Camera.java @@ -0,0 +1,6 @@ +package com.iflytop.digester.deviceinstance; +import org.springframework.stereotype.Component; +@Component +public class Camera { + +} diff --git a/src/main/java/com/iflytop/digester/deviceinstance/Device.java b/src/main/java/com/iflytop/digester/deviceinstance/Device.java index ebc2a1c..e4f37b6 100644 --- a/src/main/java/com/iflytop/digester/deviceinstance/Device.java +++ b/src/main/java/com/iflytop/digester/deviceinstance/Device.java @@ -19,6 +19,9 @@ public class Device { @Resource public LiquidAdditionInstance liquidAddition; + @Resource + public Camera camera; + // get device instance public static Device getInstance() { return instance; diff --git a/src/main/java/com/iflytop/digester/model/MdbDigestionTask.java b/src/main/java/com/iflytop/digester/model/MdbDigestionTask.java index aca7ca4..a0144ce 100644 --- a/src/main/java/com/iflytop/digester/model/MdbDigestionTask.java +++ b/src/main/java/com/iflytop/digester/model/MdbDigestionTask.java @@ -12,10 +12,10 @@ public class MdbDigestionTask extends UfActiveRecord { public String digestionId; @UfActiveRecordField - public String taskId; + public String outTaskId; @UfActiveRecordField - public String tubeRackNo; + public String batchNo; @UfActiveRecordField public String tubes; diff --git a/src/main/java/com/iflytop/digester/underframework/dao/model/UfMdbNotification.java b/src/main/java/com/iflytop/digester/underframework/dao/model/UfMdbNotification.java new file mode 100644 index 0000000..01e0040 --- /dev/null +++ b/src/main/java/com/iflytop/digester/underframework/dao/model/UfMdbNotification.java @@ -0,0 +1,62 @@ +package com.iflytop.digester.underframework.dao.model; +import com.iflytop.digester.underframework.dao.record.UfActiveRecord; +import com.iflytop.digester.underframework.dao.record.UfActiveRecordField; +public class UfMdbNotification extends UfActiveRecord { + @UfActiveRecordField + public String type; + @UfActiveRecordField + public String data; + @UfActiveRecordField + public String status; + + // get table name + public static String getTableName() { + return "app_notifications"; + } +// +// // notify action +// public static void taskAction(DiTask task, String action ) { +// DiMdbNotification.taskAction(task, action, ""); +// } +// +// // notify action +// public static void taskAction( DiTask task, String action, Object data ) { +// Map notifyData = Map.of( +// "action", action, +// "data", data, +// "task", task.getUUID() +// ); +// var notification = new DiMdbNotification(); +// notification.type = "task-action"; +// notification.data = DiJsonHelper.objectToJson(notifyData); +// notification.status = "new"; +// notification.save(); +// } +// +// // notify error +// public static void taskError( DiTask task, String error ) { +// var notification = new DiMdbNotification(); +// notification.type = "task-error"; +// notification.data = error; +// notification.status = "new"; +// notification.save(); +// } +// +// // notify warning +// public static void warning( String message ) { +// var notification = new DiMdbNotification(); +// notification.type = "warning"; +// notification.data = message; +// notification.status = "new"; +// notification.save(); +// } + + // notify error + public static void error( String message, Object ... args ) { + var notification = new UfMdbNotification(); + notification.type = "error"; + notification.data = String.format(message, args); + notification.status = "new"; + notification.save(); + } +} diff --git a/src/main/java/com/iflytop/digester/web/DigestionTaskController.java b/src/main/java/com/iflytop/digester/web/DigestionTaskController.java index 5b90155..eaa95ed 100644 --- a/src/main/java/com/iflytop/digester/web/DigestionTaskController.java +++ b/src/main/java/com/iflytop/digester/web/DigestionTaskController.java @@ -23,7 +23,7 @@ public class DigestionTaskController extends UfApiControllerBase { public UfApiResponse start(@RequestBody Map params) { String taskId = (String)params.get("taskId"); String name = (String)params.get("name"); - String tubeRackNo = (String)params.get("tubeRack"); + String batchNo = (String)params.get("batchNo"); List> tubes = (List>)params.get("tubes"); var digestion = UfActiveRecord.findOne(MdbDigestionSolution.class, Map.of("name", name)); @@ -33,8 +33,8 @@ public class DigestionTaskController extends UfApiControllerBase { var task = new MdbDigestionTask(); task.digestionId = digestion.id; - task.taskId = taskId; - task.tubeRackNo = tubeRackNo; + task.outTaskId = taskId; + task.batchNo = batchNo; task.tubes = UfJsonHelper.objectToJson(tubes); task.status = "pending"; task.message = "等待中"; @@ -45,4 +45,23 @@ public class DigestionTaskController extends UfApiControllerBase { this.taskManager.startTask(task); return this.success(); } + + @ResponseBody + @PostMapping("/api/digestion-task/exec-action") + public UfApiResponse executeAction( @RequestBody Map params ) { + String taskId = (String)params.get("taskId"); + var task = this.taskManager.getTaskByOutTaskId(taskId); + if ( null == task ) { + return this.error("无效的任务编号:" + taskId); + } + + String action = (String)params.get("action"); + Map actionParams = (Map)params.get("params"); + try { + task.executeAction(action, actionParams); + return this.success(); + } catch ( Exception e ) { + return this.error(e.getMessage()); + } + } }