|
|
@ -4,16 +4,17 @@ import com.fasterxml.jackson.databind.JsonNode; |
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper; |
|
|
|
import com.iflytop.digester.model.MdbDigestionSolution; |
|
|
|
import com.iflytop.digester.model.MdbDigestionTask; |
|
|
|
import com.iflytop.digester.model.MdbOperationLog; |
|
|
|
import com.iflytop.digester.model.MdbRuntimeLog; |
|
|
|
import com.iflytop.digester.underframework.dao.model.UfMdbNotification; |
|
|
|
import com.iflytop.digester.underframework.dao.model.UfMdbOption; |
|
|
|
import com.iflytop.digester.underframework.dao.record.UfActiveRecord; |
|
|
|
import com.iflytop.digester.underframework.util.UfCommon; |
|
|
|
import com.iflytop.digester.underframework.util.UfJsonHelper; |
|
|
|
import jakarta.annotation.PostConstruct; |
|
|
|
import org.eclipse.paho.client.mqttv3.*; |
|
|
|
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; |
|
|
|
import org.springframework.beans.factory.annotation.Value; |
|
|
|
import org.slf4j.Logger; |
|
|
|
import org.slf4j.LoggerFactory; |
|
|
|
import org.springframework.stereotype.Component; |
|
|
|
import java.util.ArrayList; |
|
|
|
import java.util.HashMap; |
|
|
@ -21,6 +22,8 @@ import java.util.List; |
|
|
|
import java.util.Map; |
|
|
|
@Component |
|
|
|
public class DigestionTaskTheadManager { |
|
|
|
// logger |
|
|
|
public static final Logger LOG = LoggerFactory.getLogger(DigestionTaskTheadManager.class); |
|
|
|
// Singleton instance |
|
|
|
private static DigestionTaskTheadManager instance = null; |
|
|
|
// Task list |
|
|
@ -41,47 +44,58 @@ public class DigestionTaskTheadManager { |
|
|
|
|
|
|
|
// setup |
|
|
|
public void setup() { |
|
|
|
this.setupMqttBroker(); |
|
|
|
Thread mqttConnectThread = new Thread(DigestionTaskTheadManager.this::setupMqttBroker); |
|
|
|
mqttConnectThread.start(); |
|
|
|
} |
|
|
|
|
|
|
|
// setup mqtt broker |
|
|
|
private void setupMqttBroker() { |
|
|
|
String uri = UfMdbOption.getString("DigestionTaskMqttBrokerUri", ""); |
|
|
|
String clientId = UfMdbOption.getString("DigestionTaskMqttClientId", ""); |
|
|
|
String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); |
|
|
|
|
|
|
|
this.client = null; |
|
|
|
try { |
|
|
|
this.client = new MqttClient(uri, clientId, new MemoryPersistence()); |
|
|
|
} catch (MqttException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
this.client.setCallback(new MqttCallback() { |
|
|
|
public void messageArrived(String topic, MqttMessage message) { |
|
|
|
DigestionTaskTheadManager.this.handleOnMessageArrived(topic, message); |
|
|
|
public void setupMqttBroker() { |
|
|
|
do { |
|
|
|
String uri = UfMdbOption.getString("DigestionTaskMqttBrokerUri", ""); |
|
|
|
String clientId = UfMdbOption.getString("DigestionTaskMqttClientId", ""); |
|
|
|
String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); |
|
|
|
|
|
|
|
UfCommon.delay(1000); |
|
|
|
LOG.info("[MQTT Server] : connecting {}", uri); |
|
|
|
try { |
|
|
|
this.client = new MqttClient(uri, clientId, new MemoryPersistence()); |
|
|
|
} catch (MqttException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
public void connectionLost(Throwable cause) { |
|
|
|
UfMdbNotification.error("消解任务 MQTT 服务器断开连接"); |
|
|
|
System.out.println("connectionLost: " + cause.getMessage()); |
|
|
|
|
|
|
|
this.client.setCallback(new MqttCallback() { |
|
|
|
public void messageArrived(String topic, MqttMessage message) { |
|
|
|
DigestionTaskTheadManager.this.handleOnMessageArrived(topic, message); |
|
|
|
} |
|
|
|
public void connectionLost(Throwable cause) { |
|
|
|
UfMdbNotification.error("消解任务 MQTT 服务器断开连接"); |
|
|
|
System.out.println("connectionLost: " + cause.getMessage()); |
|
|
|
} |
|
|
|
public void deliveryComplete(IMqttDeliveryToken token) { |
|
|
|
System.out.println("xxxx"); |
|
|
|
} |
|
|
|
}); |
|
|
|
|
|
|
|
try { |
|
|
|
MqttConnectOptions options = new MqttConnectOptions(); |
|
|
|
options.setAutomaticReconnect(true); |
|
|
|
this.client.connect(options); |
|
|
|
LOG.info("[MQTT Server] : connected {}", uri); |
|
|
|
} catch (MqttException e) { |
|
|
|
UfMdbNotification.error("消解任务 MQTT 服务器连接失败 : %s : %s", uri, e.getMessage()); |
|
|
|
this.client = null; |
|
|
|
continue; |
|
|
|
} |
|
|
|
public void deliveryComplete(IMqttDeliveryToken token) { |
|
|
|
System.out.println("xxxx"); |
|
|
|
|
|
|
|
try { |
|
|
|
client.subscribe(myTopic, 2); |
|
|
|
LOG.info("[MQTT Server] : subscribed {}", myTopic); |
|
|
|
} catch (MqttException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
}); |
|
|
|
try { |
|
|
|
MqttConnectOptions options = new MqttConnectOptions(); |
|
|
|
options.setAutomaticReconnect(true); |
|
|
|
client.connect(options); |
|
|
|
} catch (MqttException e) { |
|
|
|
UfMdbNotification.error("消解任务 MQTT 服务器连接失败 : %s : %s", uri, e.getMessage()); |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
|
|
|
|
try { |
|
|
|
client.subscribe(myTopic, 2); |
|
|
|
} catch (MqttException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
break ; |
|
|
|
} while ( true ); |
|
|
|
} |
|
|
|
|
|
|
|
// Handle on message arrived |
|
|
|