|
|
@ -3,6 +3,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; |
|
|
|
import com.fasterxml.jackson.databind.JsonNode; |
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper; |
|
|
|
import com.iflytop.digester.model.MdbDigestionTask; |
|
|
|
import com.iflytop.digester.model.MdbRuntimeLog; |
|
|
|
import com.iflytop.digester.underframework.dao.model.UfMdbNotification; |
|
|
|
import com.iflytop.digester.underframework.dao.model.UfMdbOption; |
|
|
|
import com.iflytop.digester.underframework.util.UfJsonHelper; |
|
|
|
import jakarta.annotation.PostConstruct; |
|
|
|
import org.eclipse.paho.client.mqttv3.*; |
|
|
@ -22,19 +25,6 @@ public class DigestionTaskTheadManager { |
|
|
|
// Mqtt client |
|
|
|
private MqttClient client; |
|
|
|
|
|
|
|
@Value("${mqtt-broker.uri}") |
|
|
|
private String mqttBrokerUri; |
|
|
|
@Value("${mqtt-broker.username}") |
|
|
|
private String mqttBrokerUsername; |
|
|
|
@Value("${mqtt-broker.password}") |
|
|
|
private String mqttBrokerPassword; |
|
|
|
@Value("${mqtt-broker.clientId}") |
|
|
|
private String mqttBrokerClientId; |
|
|
|
@Value("${mqtt-broker.my-topic}") |
|
|
|
private String mqttBrokerMyTopic; |
|
|
|
@Value("${mqtt-broker.transbot-topic}") |
|
|
|
private String mqttBrokerTransbotTopic; |
|
|
|
|
|
|
|
// Get instance |
|
|
|
public static DigestionTaskTheadManager getInstance() { |
|
|
|
return instance; |
|
|
@ -44,39 +34,47 @@ public class DigestionTaskTheadManager { |
|
|
|
public void init() { |
|
|
|
this.tasks = new ArrayList<>(); |
|
|
|
instance = this; |
|
|
|
} |
|
|
|
|
|
|
|
// setup |
|
|
|
public void setup() { |
|
|
|
this.setupMqttBroker(); |
|
|
|
} |
|
|
|
|
|
|
|
// setup mqtt broker |
|
|
|
private void setupMqttBroker() { |
|
|
|
String uri = UfMdbOption.getString("DigestionTaskMqttBrokerUri", ""); |
|
|
|
String clientId = UfMdbOption.getString("DigestionTaskMqttClientId", ""); |
|
|
|
String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); |
|
|
|
|
|
|
|
this.client = null; |
|
|
|
try { |
|
|
|
this.client = new MqttClient(this.mqttBrokerUri, this.mqttBrokerClientId, new MemoryPersistence()); |
|
|
|
this.client = new MqttClient(uri, clientId, new MemoryPersistence()); |
|
|
|
} catch (MqttException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
this.client.setCallback(new MqttCallback() { |
|
|
|
public void connectionLost(Throwable cause) { |
|
|
|
System.out.println("connectionLost: " + cause.getMessage()); |
|
|
|
} |
|
|
|
public void messageArrived(String topic, MqttMessage message) { |
|
|
|
DigestionTaskTheadManager.this.handleOnMessageArrived(topic, message); |
|
|
|
} |
|
|
|
public void connectionLost(Throwable cause) { |
|
|
|
UfMdbNotification.error("消解任务 MQTT 服务器断开连接"); |
|
|
|
System.out.println("connectionLost: " + cause.getMessage()); |
|
|
|
} |
|
|
|
public void deliveryComplete(IMqttDeliveryToken token) { |
|
|
|
// nothing to do here |
|
|
|
System.out.println("xxxx"); |
|
|
|
} |
|
|
|
}); |
|
|
|
try { |
|
|
|
MqttConnectOptions options = new MqttConnectOptions(); |
|
|
|
// options.setUserName(this.mqttBrokerUsername); |
|
|
|
// options.setPassword(this.mqttBrokerPassword.toCharArray()); |
|
|
|
client.connect(options); |
|
|
|
} catch (MqttException e) { |
|
|
|
UfMdbNotification.error("消解任务 MQTT 服务器连接失败 : %s : %s", uri, e.getMessage()); |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|
|
|
|
|
try { |
|
|
|
client.subscribe(this.mqttBrokerMyTopic, 2); |
|
|
|
client.subscribe(myTopic, 2); |
|
|
|
} catch (MqttException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
@ -84,6 +82,8 @@ public class DigestionTaskTheadManager { |
|
|
|
|
|
|
|
// Handle on message arrived |
|
|
|
private void handleOnMessageArrived(String topic, MqttMessage message) { |
|
|
|
MdbRuntimeLog.log("DigestionTaskMqttMessage", "Topic : %s; \nPayload : \n%s", topic, message.toString()); |
|
|
|
|
|
|
|
String content = new String(message.getPayload()); |
|
|
|
JsonNode actionJsonTree = null; |
|
|
|
try { |
|
|
@ -120,16 +120,18 @@ public class DigestionTaskTheadManager { |
|
|
|
|
|
|
|
// Send message to trans bot |
|
|
|
public void sendMessageToTransBot(String action, Map<String, Object> params) { |
|
|
|
String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); |
|
|
|
String transTopic = UfMdbOption.getString("DigestionTaskMqttTransBotTopic", ""); |
|
|
|
try { |
|
|
|
var message = Map.of( |
|
|
|
"src", "stw-a80", |
|
|
|
"src", myTopic, |
|
|
|
"action", action, |
|
|
|
"params", params |
|
|
|
); |
|
|
|
var messageJson = UfJsonHelper.objectToJson(message); |
|
|
|
MqttMessage mqttMessage = new MqttMessage(messageJson.getBytes()); |
|
|
|
mqttMessage.setQos(2); |
|
|
|
this.client.publish(this.mqttBrokerTransbotTopic, mqttMessage); |
|
|
|
this.client.publish(transTopic, mqttMessage); |
|
|
|
} catch (MqttException e) { |
|
|
|
throw new RuntimeException(e); |
|
|
|
} |
|
|
|