Browse Source

feat:ws链接成功后主动推送一次系统状态

master
白凤吉 4 days ago
parent
commit
fbb38ffdcf
  1. 41
      src/main/java/com/iflytop/handacid/app/websocket/server/MyWebSocketHandler.java
  2. 24
      src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketConfig.java
  3. 17
      src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketConnectedEvent.java
  4. 27
      src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketEventListener.java
  5. 25
      src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketSender.java
  6. 51
      src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketServer.java
  7. 14
      src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketServerConfig.java
  8. 63
      src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketSessionManager.java
  9. 2
      src/main/java/com/iflytop/handacid/app/websocket/server/WebsocketMsg.java

41
src/main/java/com/iflytop/handacid/app/websocket/server/MyWebSocketHandler.java

@ -0,0 +1,41 @@
package com.iflytop.handacid.app.websocket.server;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
@Slf4j
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {
private final WebSocketSessionManager sessionManager;
private final ApplicationEventPublisher eventPublisher;
public MyWebSocketHandler(WebSocketSessionManager sessionManager, ApplicationEventPublisher eventPublisher) {
this.sessionManager = sessionManager;
this.eventPublisher = eventPublisher;
}
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
sessionManager.add(session);
eventPublisher.publishEvent(new WebSocketConnectedEvent(this, session));
}
@Override
public void afterConnectionClosed(WebSocketSession session, org.springframework.web.socket.CloseStatus status) throws Exception {
// 从管理器移除
sessionManager.remove(session);
super.afterConnectionClosed(session, status);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
// 如果需要也可以在这里处理消息
log.info("收到消息,sessionId={},内容={}", session.getId(), message.getPayload());
}
}

24
src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketConfig.java

@ -0,0 +1,24 @@
package com.iflytop.handacid.app.websocket.server;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
private final MyWebSocketHandler myWebSocketHandler;
public WebSocketConfig(MyWebSocketHandler myWebSocketHandler) {
this.myWebSocketHandler = myWebSocketHandler;
}
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(myWebSocketHandler, "/ws")
.setAllowedOrigins("*") // 根据需要限制
.addInterceptors(new org.springframework.web.socket.server.support.HttpSessionHandshakeInterceptor());
}
}

17
src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketConnectedEvent.java

@ -0,0 +1,17 @@
package com.iflytop.handacid.app.websocket.server;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import org.springframework.web.socket.WebSocketSession;
@Getter
public class WebSocketConnectedEvent extends ApplicationEvent {
private final WebSocketSession session;
public WebSocketConnectedEvent(Object source, WebSocketSession session) {
super(source);
this.session = session;
}
}

27
src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketEventListener.java

@ -0,0 +1,27 @@
package com.iflytop.handacid.app.websocket.server;
import com.iflytop.handacid.app.core.state.DeviceState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
@Slf4j
@Component
@RequiredArgsConstructor
public class WebSocketEventListener {
private final WebSocketSender webSocketSender;
private final DeviceState deviceState;
@EventListener
public void handleWebSocketConnected(WebSocketConnectedEvent event) throws IOException {
WebSocketSession session = event.getSession();
log.info("监听到新的 WS 连接,sessionId={}", session.getId());
session.sendMessage(new TextMessage("链接成功"));
webSocketSender.push(WebSocketMessageType.STATUS, deviceState.toJSON());
}
}

25
src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketSender.java

@ -1,6 +1,7 @@
package com.iflytop.handacid.app.websocket.server; package com.iflytop.handacid.app.websocket.server;
import cn.hutool.json.JSONUtil; import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -11,14 +12,18 @@ import java.time.format.DateTimeFormatter;
@Slf4j @Slf4j
@Component @Component
@RequiredArgsConstructor
public class WebSocketSender { public class WebSocketSender {
private final WebSocketSessionManager sessionManager;
public void push(String type, Object data) { public void push(String type, Object data) {
WebsocketResult websocketResult = new WebsocketResult();
websocketResult.setType(type);
websocketResult.setData(data);
websocketResult.setTimestamp(Instant.now().toEpochMilli());
WebSocketServer.sendMessageToClients(JSONUtil.toJsonStr(websocketResult));
WebsocketMsg msg = new WebsocketMsg();
msg.setType(type);
msg.setData(data);
msg.setTimestamp(Instant.now().toEpochMilli());
String json = JSONUtil.toJsonStr(msg);
sessionManager.broadcast(json);
} }
@ -26,16 +31,6 @@ public class WebSocketSender {
push(WebSocketMessageType.CMD_DEBUG, data); push(WebSocketMessageType.CMD_DEBUG, data);
} }
public void pushLog(String code, String type, String content) {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
String formattedDate = LocalDateTime.now().format(formatter);
WebsocketResult websocketResult = new WebsocketResult();
websocketResult.setType(WebSocketMessageType.LOG);
websocketResult.setData(formattedDate + "模块" + StringUtils.substring(code, code.length() - 1) + type + content);
websocketResult.setTimestamp(Instant.now().toEpochMilli());
WebSocketServer.sendMessageToClients(JSONUtil.toJsonStr(websocketResult));
}
public void pushCMDResponse(Object data) { public void pushCMDResponse(Object data) {
push(WebSocketMessageType.CMD_RESPONSE, data); push(WebSocketMessageType.CMD_RESPONSE, data);
} }

51
src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketServer.java

@ -1,51 +0,0 @@
package com.iflytop.handacid.app.websocket.server;
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@Slf4j
@ServerEndpoint("/ws")
@Component
public class WebSocketServer {
private static final Set<Session> sessions = Collections.synchronizedSet(new HashSet<>());
public static void sendMessageToClients(String message) {
synchronized (sessions) {
for (Session session : sessions) {
try {
session.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("WS发送给客户端失败 sessionId={}", session.getId(), e);
}
}
}
}
@OnOpen
public void onOpen(Session session) {
sessions.add(session);
log.info("新连接加入,sessionId={}", session.getId());
}
@OnMessage
public void onMessage(String message, Session session) {
log.info("收到消息 sessionId={},内容:{}", session.getId(), message);
}
@OnClose
public void onClose(Session session) {
sessions.remove(session);
log.info("连接已关闭,sessionId={}", session.getId());
}
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误,sessionId={}", session.getId(), error);
}
}

14
src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketServerConfig.java

@ -1,14 +0,0 @@
package com.iflytop.handacid.app.websocket.server;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketServerConfig {
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

63
src/main/java/com/iflytop/handacid/app/websocket/server/WebSocketSessionManager.java

@ -0,0 +1,63 @@
package com.iflytop.handacid.app.websocket.server;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
/**
* 统一管理所有 WebSocketSession
*/
@Component
public class WebSocketSessionManager {
private final Set<WebSocketSession> sessions = Collections.synchronizedSet(new HashSet<>());
/**
* 连接建立时调用
*/
public void add(WebSocketSession session) {
sessions.add(session);
}
/**
* 连接关闭时调用
*/
public void remove(WebSocketSession session) {
sessions.remove(session);
}
/**
* 广播所有在线客户端
*/
public void broadcast(String payload) {
synchronized (sessions) {
for (WebSocketSession session : sessions) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(payload));
} catch (IOException e) {
// log 或者移除无效 session
}
}
}
}
}
/**
* 定向给某个 session 发消息
*/
public void sendTo(WebSocketSession session, String payload) {
if (session.isOpen()) {
try {
session.sendMessage(new TextMessage(payload));
} catch (IOException e) {
// log
}
}
}
}

2
src/main/java/com/iflytop/handacid/app/websocket/server/WebsocketResult.java → src/main/java/com/iflytop/handacid/app/websocket/server/WebsocketMsg.java

@ -4,7 +4,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data; import lombok.Data;
@Data @Data
public class WebsocketResult {
public class WebsocketMsg {
/** /**
* 推送类型(指令 cmd,报警 warn ,状态 status) * 推送类型(指令 cmd,报警 warn ,状态 status)
*/ */
Loading…
Cancel
Save