Browse Source

修复部分BUG

master
zhaohe 2 days ago
parent
commit
1df2395081
  1. 78
      src/main/java/a8k/app/iflytophald/channel/A8kCanBusCommunicationChannel.java

78
src/main/java/a8k/app/iflytophald/channel/A8kCanBusCommunicationChannel.java

@ -8,15 +8,11 @@ import a8k.app.type.appevent.A8kCanBusOnConnectEvent;
import a8k.app.type.appevent.A8kHardwareReport;
import a8k.app.type.error.AEHardwareError;
import a8k.app.type.exception.AppException;
import a8k.app.factory.A8kPacketFactory;
import a8k.app.service.background.AppEventBusService;
import a8k.app.utils.ByteArrayUtils;
import a8k.app.utils.ZList;
import jakarta.annotation.Resource;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.enums.ReadyState;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
@ -26,11 +22,12 @@ import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@Slf4j
@ -118,6 +115,8 @@ public class A8kCanBusCommunicationChannel extends WebSocketClient {
ProcessContext context = new ProcessContext();
int packetIndex = 0;//发送包的packetIndex
private final Lock reconectLock = new ReentrantLock();
public A8kCanBusCommunicationChannel(String cmdChannelUrl, String dataChannelUrl) {
super(URI.create(dataChannelUrl));
@ -147,14 +146,6 @@ public class A8kCanBusCommunicationChannel extends WebSocketClient {
}
//
// EXTERN
//
synchronized public A8kPacket send(A8kPacket pack, int overtime) throws AppException {
return priSend(pack, overtime);
}
synchronized public A8kPacket autoReSend(A8kPacket pack, long overtime) throws AppException {
return priAutoReSend(pack, overtime);
}
@ -163,27 +154,44 @@ public class A8kCanBusCommunicationChannel extends WebSocketClient {
return context.getLastTxCmd(mid);
}
public void restartCanif() {
callLocalCmd("restart");
public boolean restartCanif() {
var ret = callLocalCmd("restart");
return ret != null;
}
//
// PRIVATE
//
@Scheduled(fixedRate = 10000)
@Scheduled(fixedRate = 3000)
private void autoConnect() {
if (!iflytophaldEnable)
return;
if (!isOpen()) {
if (getReadyState().equals(ReadyState.NOT_YET_CONNECTED)) {
try {
connect();
} catch (IllegalStateException ignored) {
boolean locked = reconectLock.tryLock();
if (locked) {
try {
if (isClosed()) {
log.info("try reconnect");
this.reconnect();
for (int i = 0; i < 10; i++) {
if (isOpen()) {
break;
}
OS.hsleep(100);
}
if (!isOpen()) {
log.error("reconnect fail, try to close and reconnect");
this.close();
} else {
log.info("reconnect success");
}
// this.connectBlocking(100, TimeUnit.MILLISECONDS);
}
} else if (getReadyState().equals(ReadyState.CLOSED)) {
reconnect();
} catch (Exception e) {
log.error("connect fail");
} finally {
reconectLock.unlock();
}
}
}
@ -194,21 +202,31 @@ public class A8kCanBusCommunicationChannel extends WebSocketClient {
restartCanif();
}
for (int j = 0; j < 2; j++) {
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 5; i++) {
try {
return this.priSend(pack, overtime);
} catch (AppException e) {
if (!e.error.code.equals(A8kEcode.LOW_ERROR_OVERTIME)) {
if (!e.error.eq(A8kEcode.LOW_ERROR_OVERTIME)) {
throw e;
}
} catch (Exception ignored) {
}
log.error("send cmd {} {} fail", pack, pack.toByteString());
OS.hsleep(100);
log.error("retry {}", i);
}
if (!isOpen()) {
log.warn("canbus connection is closed, try to reconnect");
autoConnect();
}
if (!isOpen()) {
OS.hsleep(2000);
}
restartCanif();
}
throw AppException.of(new AEHardwareError(A8kEcode.LOW_ERROR_OVERTIME, MId.valueOf(pack.getModuleId()), CmdId.valueOf(pack.getCmdId())));
@ -242,16 +260,8 @@ public class A8kCanBusCommunicationChannel extends WebSocketClient {
// TX packet
String txpacket = pack.toByteString();
log.debug("TX-RAW: {} | {}", txpacket, pack);
if (!isOpen()) {
try {
connectBlocking();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
send(txpacket);
A8kPacket receipt;
receipt = context.getReceipt(overtime);
if (receipt == null) {

Loading…
Cancel
Save