From c53547624169324e9db03b519cbfa5806feb02e0 Mon Sep 17 00:00:00 2001 From: sige Date: Sat, 27 Apr 2024 17:01:41 +0800 Subject: [PATCH] 1 --- app.db | Bin 737280 -> 737280 bytes .../digester/DigestionTaskTheadManager.java | 86 ++++++++++++--------- 2 files changed, 50 insertions(+), 36 deletions(-) diff --git a/app.db b/app.db index 6a33a5bd673711cb20005d1d96ad960c8f8dca53..ca1d93e3cde3b8e6e4812f9b6490b9322c9987bf 100644 GIT binary patch delta 535 zcmZo@&~0eYogl?$5X->8umyR zz!$YyP$7(OSp%a$Jr~qA_9z%+OMyYw1K1(UXR_ca7ECCP5L(EB1h-F{z{t(Q!kWjR zIayFam4($6#(2qa7ASI(g{_xi36QaXg}s^K8<6pZiDTJjL4`#e%N8(B05ab4PgYP6 zZgXI2b6{?BU|HtCT7VEez}j|zZP@{K1qG1zfq)H&HvsWQAYKZ@pomlf;uxTw2mEax i*xNpEEc?J205@V;0M`OAXS0IBJ&;wV%LKR;;L-qLDWn(x delta 25146 zcmeHP36Nb?nSQUk-_~oOc_a-9G;JVh%nf%rcR67TO9B{-Nl1hSqMm!txdao^ozO9a zh&LIWsFdvJ1BaOb#OOHSxD}f^MxB`&cguBVs%BhCfj||`xK&xB^Zobs>wev@U%&fW zEUc+OWr05TKmYm9fBy4t-#KtVJ8(d|>!jYZzMIKpK8;`TKX)&8^7ub1-*($46OpxKr%eN%eAt&DJG=39^4^`i@OAkfkGB9ZxY-|zZ<-S>;W z7rVaQB|E>-X_f9P{Pg~fwpJc)@DDN?aGX1|C|23_qajr z$BVTArB5L%5NDscxBo2nGD0g(5yg8jTv0mI8#z{h4O6MGF;D4l?TVNyUtul zA8akVLU7v;m?i?p!3T_KzBC<8+B$QD&n#|L++Eqx4e>OKPNc84lpVor+vbiIIu_TM z?YNp}_^u{(ODM-Q9Nm;67{7jW=l-GO&!JUAYj*D*-Mx8_uP)lAg6);hj%7Ej?JpC- zb!A_-byr%trW>KJG1s>>VF^bIJSzxo;Rnif>j+M#6%Cw_$bjp*&zSCN+!uky9N*GB zY5JP!INUImGIU3;(i#)H#SJ$Ot=c~{E^gd@nFO1g@fXc-4a?Ro!{fT`4y_pS#dxr7 z{V0BXA^mOg=Z@#N=FlxGu*!&*OW$LKn?$gEv)HcQuxEVemRNF&`xAK^jtj!f(70tn zO}{f7X}55& z*&fzPY0&~=WJ}pJtib0sbX|3-!nznDkF39F&k{{KRJx4R8?_+nQFT#3rIz!o342_A(7sj%STVgwT^Qi1f8J!$F^J)N+giB{44v2m{a%UE_{i%je&=C%w(We!6Tav^X;Lb}~>75KuG znxQjGN@+W|>#9d&iUnK%SB#6@i)d8iwIIbpo10QCec6zavo; zPsbTH;Y}DkzWl)i#0cZ>d7chyYa8z5RW!9rRYw96ouMeLG0~nd!64Jn;}fYw)R{ z0o*q}K9dQrr0$xoHS+!0ge_B9aP?pq zMG%sSgvhv$|1{K6k4^$Xs+tQN` zAIL6<+VIzG-*@nP<`aG2X*;{;1MOFL-r4=@jz@ZSbbOgQ|XI+Tf3}| zuV$a`_+{shi@)kE=kMtLdglqbpXbl%eL7nyel5GRcj0-Z2ilG=y^vdyyTALdr}~}xR3@4VMeB5;F5uVORf?V7zxE?OyO{c z+=m-~0gdzxQ}7`0H1Y*tlpUtQ2e9FvhXJl|I0Q0SO#>mV&@^3BQ-*@|_8kOQW}qWB zkq#3$k-o>zr-7kxRA`uyp?M}4V0tJuKQNSr#aJqIOgHfT^J0>&?nws6k_UpenyW|TCpsoV zE(v35z<61l*&(;=vuUIgNQ7G0oFJ4~w_|e653#EfPNZ)7uz{{xMFW+KJq&S==nni% zhOiA*A2RnG;ovf`0s(_a16>A&eF019!uey7Z;|S2czqjj5bZ1m8jfdK}>u!LG*SWpbfea*L=K!eZa2vb*{4_%~z7B@qQC{PQy zhlnl2{Tguy1zQv<+hT?uAhM*9uoV`(XgH=OR?7k$2zYh@oO#-INYZr`qk(?lDtHO- zwqQLVb0{ucNMA$5Wa@CUIh4|(fsTR06e3>r8Js9)fl~p-BF*KH4LB1{;G6MCrVv#z zOLIax5K_SbV2HV}=rDjQo{x1aX=~Cka6~}Egj{GK_F9;p z?&G}J6u>D@6QKi_!Z0EhIHvCiSDr;9rGUe0;R=z=1DDbzKUIfcZ1d2B7v@0dD`_B@ zwpD1;=0G-Z2jQ(bV8z7+XeuXkb?7P$#FWr~oGEZmAqhqI0tSS09iDO+;b3{z3L5Fd z2@Xt~WCwXNoJ*29*Q9Up0J78(1;3UCg1C*X3gIDf7;>R7H#`f?HoO2D#C2GnGie}P z5tkvk09C|{$P2iTIS#QN;&geIfm=GVhH0e0r?{R>O}ZBB8WxW84Yfx|ZQ-70h*7NN zG!U}q4)z2(fkVzL9|4TX1(FIvAc$~DH7hR1_@ z)d)frER6>muzWhwB8DPg9TI``;Z#ffCr+8`f!o*6K*Nq0!gr>iVK&UP3;To9>1xP0 z*nwqZa|Iisk)iI|lFW+B0x(I`1ckUJE?~Hic4)csG#co6zVvM$_Qw@;#taj*VKYNZ zgoedj9da6^fpFraXGJd4q|jjixdp8>ads^@p**w{gPcx{Ng_ED;+oNvj}HE zOQQk$Ynt5FA{v+?rzIBBz%*&B6EQFX>&^31Wg?j-`E&veOp^%er-4Y+L`^d_$J59( zS(D>vV4AGPf|$u9#ej=-jvPZDjZ72u_r`Olh~9f>U`-SrH4MYMX=Ivkx{C&;35z@9 zfzw0RXx7-Z6wjR^!YtCjG_hiV2F?=mMRh@6o<^n#-f}cBO@P&rP@FXgdh@g>sGUYO zi%a03e>|Rg?kg*~u?|0!z3ZGFd~#A|G}HIi-tYFF*YlB{p6;D#lj2B+TgTGj)rdGw ztshH=H%!*oCVG#h!yA<}YNd269p1R^b1WTxEFE4`HLNRJ97~5EONY;1-7)Z;zPwHg~J6c|3v=W+%vfi9nW-}(*EwYm)h26Kbbu-vmfsBx$NK~n{xYG z%Cltl>Jobssr|}7j%81+{QZ~nrKm24Pc0N2iD%Tc9AjEhVMhIEqA!8b?C4-gku9ti z7N$!7e&s7;+5VO}nY~wIz)Dz2laH)trSj#m>4Mj%wF)LpISE8>x(V3bdH$B z&DL^*r02Ap=Oj0kBV*YmXQpQ&cJ_F(6N`*Q51bUa(^@^4*P3V5g?^nlSXWMyl$~61 zQmmer3e;A(2F^-0e~vuwWSVzs?rIlN(%Vt_=2*6OUgDTtA^TS*@<@|Wp5!T3fq64g zBa&@eZp*@bi1yyslDzuMX|mu|c_n{R z2g0=S|Asv`jBU7nOkJVI_lzaGvvabwYqIjZYJbuhu;yH=fX8#$qs&!?NqVO=WsQ4<3JO{adjvl{RuS5DNQs2DZ?AIK(gPGHl zm8Gd=%vjiQw>{kOWLz7sYr>QPY$Pb*FaG%ty<}O*ygZqI~67pV(Lf1UVFJ6)eCrx=Q7EaWX z>=!L2@>|MNMYYB?wW`J?f8yp^Gp)gT;+Cj5 zROMgM!ndLoCJr6R9XgUfe55eAt|#-inI)NvGkv1>pL@^hxpyMAS=M<@>2RsP_*8Lg zv90jo!sYp2V3&ipL^G*CU-iP>(J#i#rAUL-!a8t-4rwS zw{q2srOjRR;mEpVVQu%#XJIQ}hgtukyO`8A{(os#6*kmgYE?7+`^pbL# z&cSAkS4~8r8j+HXh??1m{_~S7F-=Qi)iO`USM}fqrduYOj0~IN1mp~&nc$*$0N`3K z&=0lI+2rM#1VP)*qf@!JJkbC-Xa8fJU}_9oM;x1fYq0cz_*fq7KX+oeDcU8|Emt?t z+!TjJv%p`lT6lxp^xUCK=UVQ-ImFhLJ+QO3z}8D-Rxh>woFakUqP7gUi7)`kYTR(!JJE4F!Poq< z=t(lF4rl1l{9DUu`gp5inKD#y?mAAPo>t>2PC+7T^Z9YgI2YTYLoXK&y<9x}a%n?E zg^1;uZJEBCdjG5U{GN?Hk9GgF`|aJmU4vcwJD={{RQgSRXJKjSqWqUL+w%Q||0vpp zsCZ}TuZz#;p2}^_o|5@e`v=}xuTwjNeOHW+j!nc#@o|{&SV!Z# zjGO372FM1jlIW5|e?EGf(Wk4St=s0Vfo4+OnrT#{X0gP`&WU>xfF5+*Yk?)vMI4}i z9GEZg9->&P(1eA4;F+|WlD;vwB|?w`lpU1MMysZYE@tMVHJqc>TJzDuC^-6zVMJP` zebvpxc0W47EzZ#+Z0RxEfKqWn8c^Rq8h`^gf+25(_D;2TWM3>1=|g2Xp`6eFOWm{R zb#>9P44{@S(W&d`z!A($FV{H^(e%h|WLHcZkRDqB+Cm+ypiTPX`6gfy=uwvz8d-(Z zo70{_dT&fRkm%zE3E(5B={G`KF*Xq(1s`qTKuO~l+J&3)J%jdmOj`l+0}lX=;09WG zp%*|@xafig_CrU{g+iCLH-mK4G?*P3OZWz|!WW=zIs)yzz%)W#6lAV@XvUQ~Q0(Rm z+D%Dc^|o9@T4Dx{%}jI++dkT=8No$-0d$KV20&{Hz|8?+X+b1VlB@O**ZtHa44kW{ zq2C-EDL7iJC61&a(7mjL?nhv}Rnj-bq|q1e1?Wcyd z?QY6+js6kWyJOPWL!|(y(*&~Q*hk>&pm08LAg*PFK!4-qfaav9RD=%A4e>5Y;0thQ z(Cm#rK{0aqW(nHsB8qm#(3?C@DbJXwmlwH{v*eLzs>P zgtDG=RCgk$*p6wd#B7Hau(~DyCMK{3KnTHryRft-YQ|^-H@P>3?@n$hN^do>+HV2d z(KThH%>j@z0lP>H3EJF&t0io->)3!<27rCGERrA=2>=&(?*L$H3w`YbPyr4K7$w*d zAI}y5b>u~8aBr$4Geja1amzq=0UIS)43q?j69Obqi@p>9GXy4HKxj_7M*m`QZ;QnZ zIuck3;H(@8zQYjM4GY*Xi+i{V=uHT_9<0|)++j>w8p1b$+tOT*@QD$ST>!7(#R)F} zFvAFR4+f>q(WrIur_U4n6IWeFBIyTwO_+eO;}r-O_D?tH4x>jWfFrSi$_|u1^GcZ8 z{?U=VJ|=A;`giak2O-38aBzGGVg*RK;dp3u*KHsxnoE0XBZ=~MP>!zYhlOT7R1Clg z!e09Tk^(&d6G+f2?uP=9tX4_;BP9b``J*erYU%K@b2{Aj9im2xf|cqo3kd??BIfH zlf*>jGJHO-?Bp3uG7meh#+ssX<*zWo>#A~?bl7Xp(e%3Y8nQyw5TT}SszwqS{S)Y2 zG$*|&8CS_nd(mXaRS9D+fzZ+_q2$lHB6`eBW&N(~Ks6`)#zgApBcnbKsb4uE^%L>= zyrkYVC%kw|5=K?FVTu*iDqPEDpa5GJSWd7arnqX<|1XICmk`>Bb zLGm%mm1pKFpA(>!uirI3zUHPtjZy8EE|bF>IvTLUA=6~9a=@9IW zWx3Jpk?hXy;r8!Mhhn$iN?`2v$FghE0kYd}&o1uX1g!NJb888hovp;s?1jK)&j!G* z0DyfPobTpV?5%`KNn`%6r-5m%pzC6s3j)#C)kdPT1)?%0+1!fb{gUVJ&T6Npxk|QI z&b}x6#?#aIn7b=a+ykmjooM*bdn(0yvk{`{Ch;D^y_;i!rtqF`idUZ^Hh3Ei1d>*- zg4gBB{gn;(WjDU60Q}tjnP@moN#J5y z9w1q!ju&naKLec%Oh19ap)(RVdaMbkeCfPB7FZepe*TWk*I!`(e*T=yaGGp)3jlm@ zIm}aJl+y$7^FPWA*W_hi2>^cKn#^#cOlEB&^G^Z5FMK~UoF;MirvTs=Z_W&-X?T5A z0r;gCvX{=$m{|kBFO6j$P17#9vh+Y^f11|DR~3L?dLgqZ9RMFAhpWAKbG7272jF); zm3cHxAKV$;UypFU|J(uiUDsxY(=<-KssQ}%M>G4=bQrY&zz50UGz~p}egOQQFX0`a zQ~>%i#-n%m-USGTv9OdxA&aV{j2V;b$_t?=KK@g8@rbmH+21>aBJ@Q zuJ3m}o;%X@&aP|nXXk%d_+FRURqFgv{`kVxolkb&(WyGm$;;0E($7m@DBWLptTb9$ zTUt{5W$~-UM~eGCb=;tri^%T{kj)N5h6h^Ub)iCrRH$z<%GKfd&j8Pz7=itcc%bgP z$Qa`%$Q{Fz#`_}loD%ZArsv@$E{3wkEi^Dqg=rrR#B&?diaN8_boSE7G!>e08knXg zcOwm?QdQKfR<(;p1}Q2yV>B>L9dk5hQY1ymXs)W{P8wNPcbKK@a03lYQ$*N718Y;% zQKKC7ws`L88RuCO&LIuNbAC;W42lNMOzTJWd40SRNhAFfNqRv8(=_D750cGhwxgNP zrN$4Zq=A&gFE!2KAWtHf;s>f4xj{YC9wMqdc5im&(#c1D8uueFuKW<`h4pDV*W#z; zQbk#n+hdx|AD>Fzo|?sW*T0=o`QE)*jZ(J5>r;gYw4tfIvC3k~+N6r$Dn~H1f}Ysn z^{Fg*tbJ)7^{Onp4}DLmBe^sNqUb14fmdcjFwAay5G@u2O++MX4R5)G3+N z=^!e>M$KxlZy}+j4i{6q_0@FHT0F?uuum_3kFmzTZT!B};RRiT3wnC*&vf+sB-3}e z=O^%legQA#+da|9+v6fenoM3R^}e`ndrM3@n9H?^CVZP#Y4=^889^~Ijek|GdrX#f zYHPL8jLxZ@YUCX#35}HhXv#APlR)5~UX6-yt*oO)Y138&LKV>cB|b!hX~&~ JM}ZKe{|~*nZj}H4 diff --git a/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java b/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java index 12f2583..e39cb55 100644 --- a/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java +++ b/src/main/java/com/iflytop/digester/DigestionTaskTheadManager.java @@ -4,16 +4,17 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.iflytop.digester.model.MdbDigestionSolution; import com.iflytop.digester.model.MdbDigestionTask; -import com.iflytop.digester.model.MdbOperationLog; import com.iflytop.digester.model.MdbRuntimeLog; import com.iflytop.digester.underframework.dao.model.UfMdbNotification; import com.iflytop.digester.underframework.dao.model.UfMdbOption; import com.iflytop.digester.underframework.dao.record.UfActiveRecord; +import com.iflytop.digester.underframework.util.UfCommon; import com.iflytop.digester.underframework.util.UfJsonHelper; import jakarta.annotation.PostConstruct; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; -import org.springframework.beans.factory.annotation.Value; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.HashMap; @@ -21,6 +22,8 @@ import java.util.List; import java.util.Map; @Component public class DigestionTaskTheadManager { + // logger + public static final Logger LOG = LoggerFactory.getLogger(DigestionTaskTheadManager.class); // Singleton instance private static DigestionTaskTheadManager instance = null; // Task list @@ -41,47 +44,58 @@ public class DigestionTaskTheadManager { // setup public void setup() { - this.setupMqttBroker(); + Thread mqttConnectThread = new Thread(DigestionTaskTheadManager.this::setupMqttBroker); + mqttConnectThread.start(); } // setup mqtt broker - private void setupMqttBroker() { - String uri = UfMdbOption.getString("DigestionTaskMqttBrokerUri", ""); - String clientId = UfMdbOption.getString("DigestionTaskMqttClientId", ""); - String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); - - this.client = null; - try { - this.client = new MqttClient(uri, clientId, new MemoryPersistence()); - } catch (MqttException e) { - throw new RuntimeException(e); - } - this.client.setCallback(new MqttCallback() { - public void messageArrived(String topic, MqttMessage message) { - DigestionTaskTheadManager.this.handleOnMessageArrived(topic, message); + public void setupMqttBroker() { + do { + String uri = UfMdbOption.getString("DigestionTaskMqttBrokerUri", ""); + String clientId = UfMdbOption.getString("DigestionTaskMqttClientId", ""); + String myTopic = UfMdbOption.getString("DigestionTaskMqttMyTopic", ""); + + UfCommon.delay(1000); + LOG.info("[MQTT Server] : connecting {}", uri); + try { + this.client = new MqttClient(uri, clientId, new MemoryPersistence()); + } catch (MqttException e) { + throw new RuntimeException(e); } - public void connectionLost(Throwable cause) { - UfMdbNotification.error("消解任务 MQTT 服务器断开连接"); - System.out.println("connectionLost: " + cause.getMessage()); + + this.client.setCallback(new MqttCallback() { + public void messageArrived(String topic, MqttMessage message) { + DigestionTaskTheadManager.this.handleOnMessageArrived(topic, message); + } + public void connectionLost(Throwable cause) { + UfMdbNotification.error("消解任务 MQTT 服务器断开连接"); + System.out.println("connectionLost: " + cause.getMessage()); + } + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("xxxx"); + } + }); + + try { + MqttConnectOptions options = new MqttConnectOptions(); + options.setAutomaticReconnect(true); + this.client.connect(options); + LOG.info("[MQTT Server] : connected {}", uri); + } catch (MqttException e) { + UfMdbNotification.error("消解任务 MQTT 服务器连接失败 : %s : %s", uri, e.getMessage()); + this.client = null; + continue; } - public void deliveryComplete(IMqttDeliveryToken token) { - System.out.println("xxxx"); + + try { + client.subscribe(myTopic, 2); + LOG.info("[MQTT Server] : subscribed {}", myTopic); + } catch (MqttException e) { + throw new RuntimeException(e); } - }); - try { - MqttConnectOptions options = new MqttConnectOptions(); - options.setAutomaticReconnect(true); - client.connect(options); - } catch (MqttException e) { - UfMdbNotification.error("消解任务 MQTT 服务器连接失败 : %s : %s", uri, e.getMessage()); - throw new RuntimeException(e); - } - try { - client.subscribe(myTopic, 2); - } catch (MqttException e) { - throw new RuntimeException(e); - } + break ; + } while ( true ); } // Handle on message arrived