diff --git a/app.db b/app.db index 6a33a5b..ca1d93e 100644 Binary files a/app.db and b/app.db differ diff --git a/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java b/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java index 12f2583..e39cb55 100644 --- a/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java +++ b/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java @@ -4,16 +4,17 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.iflytop.digester.model.MdbDigestionSolution; import com.iflytop.digester.model.MdbDigestionTask; -import com.iflytop.digester.model.MdbOperationLog; import com.iflytop.digester.model.MdbRuntimeLog; import com.iflytop.digester.underframework.dao.model.UfMdbNotification; import com.iflytop.digester.underframework.dao.model.UfMdbOption; import com.iflytop.digester.underframework.dao.record.UfActiveRecord; +import com.iflytop.digester.underframework.util.UfCommon; import com.iflytop.digester.underframework.util.UfJsonHelper; import jakarta.annotation.PostConstruct; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.beans.factory.annotation.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.HashMap; @@ -21,6 +22,8 @@ import java.util.List; import java.util.Map; @Component public class DigestionTaskTheadManager { + // logger + public static final Logger LOG = LoggerFactory.getLogger(DigestionTaskTheadManager.class); // Singleton instance private static DigestionTaskTheadManager instance = null; // Task list @@ -41,47 +44,58 @@ public class DigestionTaskTheadManager { // setup public void setup() { - this.setupMqttBroker(); + Thread mqttConnectThread = new Thread(DigestionTaskTheadManager.this::setupMqttBroker); + mqttConnectThread.start(); } // setup mqtt broker - private void setupMqttBroker() { - String uri = UfMdbOption.getString("DigestionTaskMqttBrokerUri", ""); - String clientId = UfMdbOption.getString("DigestionTaskMqttClientId", ""); - String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); - - this.client = null; - try { - this.client = new MqttClient(uri, clientId, new MemoryPersistence()); - } catch (MqttException e) { - throw new RuntimeException(e); - } - this.client.setCallback(new MqttCallback() { - public void messageArrived(String topic, MqttMessage message) { - DigestionTaskTheadManager.this.handleOnMessageArrived(topic, message); + public void setupMqttBroker() { + do { + String uri = UfMdbOption.getString("DigestionTaskMqttBrokerUri", ""); + String clientId = UfMdbOption.getString("DigestionTaskMqttClientId", ""); + String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); + + UfCommon.delay(1000); + LOG.info("[MQTT Server] : connecting {}", uri); + try { + this.client = new MqttClient(uri, clientId, new MemoryPersistence()); + } catch (MqttException e) { + throw new RuntimeException(e); } - public void connectionLost(Throwable cause) { - UfMdbNotification.error("消解任务 MQTT 服务器断开连接"); - System.out.println("connectionLost: " + cause.getMessage()); + + this.client.setCallback(new MqttCallback() { + public void messageArrived(String topic, MqttMessage message) { + DigestionTaskTheadManager.this.handleOnMessageArrived(topic, message); + } + public void connectionLost(Throwable cause) { + UfMdbNotification.error("消解任务 MQTT 服务器断开连接"); + System.out.println("connectionLost: " + cause.getMessage()); + } + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("xxxx"); + } + }); + + try { + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + this.client.connect(options); + LOG.info("[MQTT Server] : connected {}", uri); + } catch (MqttException e) { + UfMdbNotification.error("消解任务 MQTT 服务器连接失败 : %s : %s", uri, e.getMessage()); + this.client = null; + continue; } - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("xxxx"); + + try { + client.subscribe(myTopic, 2); + LOG.info("[MQTT Server] : subscribed {}", myTopic); + } catch (MqttException e) { + throw new RuntimeException(e); } - }); - try { - MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - client.connect(options); - } catch (MqttException e) { - UfMdbNotification.error("消解任务 MQTT 服务器连接失败 : %s : %s", uri, e.getMessage()); - throw new RuntimeException(e); - } - try { - client.subscribe(myTopic, 2); - } catch (MqttException e) { - throw new RuntimeException(e); - } + break ; + } while ( true ); } // Handle on message arrived