You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
|
|
import type { Ref } from 'vue' import { ref, watch } from 'vue'
import { FtMessageBox } from './messageBox'
/**
 * WebSocket client with automatic reconnection.
 *
 * - Requests are correlated with responses through `messageId`.
 * - Unsolicited messages are dispatched to listeners registered for the
 *   message's `fromClass`, then to wildcard (`'*'`) listeners.
 * - `isConnected` and `connectionError` are Vue refs so that UI code can react
 *   to connection state changes.
 */
export class WebSocketClient {
  /** How long a request waits for its response before it is rejected (ms). */
  private static readonly REQUEST_TIMEOUT_MS = 10000
  /** Delay between two reconnection attempts (ms). */
  private static readonly RECONNECT_INTERVAL_MS = 3000
  /** Close code for a normal closure; it never triggers a reconnect. */
  private static readonly NORMAL_CLOSURE = 1000

  public socket: WebSocket
  private url: string
  /** True while a connection attempt is in flight or a retry loop is running. */
  private isConnecting = false
  /** True between a call to `close()` and the next explicit reconnect. */
  private manuallyClosed = false
  public readonly isConnected: Ref<boolean> = ref(false)
  public readonly connectionError: Ref<string | null> = ref(null)
  /** Pending request handlers keyed by `messageId`. */
  private responseHandlers = new Map<string, (response: Socket.WebSocketResponse) => void>()
  /** Event listeners keyed by `fromClass` (or `'*'` for every message). */
  private eventListeners = new Map<string, ((response: Socket.WebSocketResponse) => void)[]>()
  private connectCount = 0
  private reconnectTimer: ReturnType<typeof setInterval> | null = null

  constructor(url: string) {
    this.url = url
    this.socket = this.createWebSocket()
  }

  /**
   * Creates a WebSocket for `this.url` and wires up its event handlers.
   *
   * State-changing handlers ignore events coming from a socket that has
   * already been replaced, so a late `close`/`error` from an old socket cannot
   * clobber the state of the current one.
   */
  private createWebSocket(): WebSocket {
    const socket = new WebSocket(this.url)
    this.isConnecting = true

    socket.onopen = () => {
      if (socket !== this.socket) {
        return
      }
      this.isConnecting = false
      this.isConnected.value = true
      this.connectionError.value = null
      this.connectCount = 0
      this.clearReconnectTimer()
    }

    socket.onclose = (event) => {
      if (socket !== this.socket) {
        return
      }
      this.isConnected.value = false
      this.connectionError.value = `连接关闭: ${event.code} ${event.reason}`
      // Reconnect only after an abnormal closure that the user did not ask for.
      if (event.code !== WebSocketClient.NORMAL_CLOSURE && !this.manuallyClosed) {
        this.scheduleReconnect()
      }
    }

    socket.onerror = (error) => {
      if (socket !== this.socket) {
        return
      }
      this.isConnected.value = false
      const detail = (error as Partial<ErrorEvent>).message || '未知错误'
      this.connectionError.value = `连接错误: ${detail}`
      if (!this.manuallyClosed) {
        this.scheduleReconnect()
      }
    }

    socket.onmessage = (event) => {
      try {
        const response: Socket.WebSocketResponse = JSON.parse(event.data)

        // A response to a specific request: deliver it once and stop.
        const handler = this.responseHandlers.get(response.messageId)
        if (handler) {
          handler(response)
          this.responseHandlers.delete(response.messageId)
          return
        }

        // Iterate over snapshots so a listener may unsubscribe itself while
        // being notified without causing its neighbour to be skipped.
        const listeners = this.eventListeners.get(response.fromClass) ?? []
        for (const listener of [...listeners]) {
          listener(response)
        }

        // Wildcard listeners receive every unsolicited message.
        const globalListeners = this.eventListeners.get('*') ?? []
        for (const listener of [...globalListeners]) {
          listener(response)
        }
      }
      catch (parseError) {
        console.error('解析WebSocket消息失败', parseError)
      }
    }

    return socket
  }

  /**
   * Waits until the connection is established, then sends the request.
   *
   * If the client is neither connected nor connecting (for example after
   * `close()` or a normal closure by the server), a reconnect is started so
   * the wait can complete.
   *
   * @param request Request to send; its `messageId` correlates the response.
   * @returns The response, see {@link sendRequest}.
   */
  public async waitAndSend<T = Record<string, any>>(
    request: Socket.WebSocketRequest,
  ): Promise<Socket.WebSocketResponse & { rely: T }> {
    if (!this.isConnected.value) {
      if (!this.isConnecting) {
        this.reconnect()
      }
      await new Promise<void>((resolve) => {
        const stopWatching = watch(this.isConnected, (connected) => {
          if (connected) {
            stopWatching()
            resolve()
          }
        })
      })
    }
    return this.sendRequest<T>(request)
  }

  /**
   * Sends a request and resolves with the matching response.
   *
   * The promise resolves for every response, whatever its `ackcode`; when
   * `ackcode` is non-zero and the response carries a `message`, that message
   * is also shown through `FtMessageBox.error`.
   *
   * @param request Request to send; its `messageId` correlates the response.
   * @returns The response received for `request.messageId`.
   * @throws Rejects when the socket is not connected (a reconnect is started
   *   if none is in progress) or when no response arrives within the timeout.
   */
  public sendRequest<T = Record<string, any>>(
    request: Socket.WebSocketRequest,
  ): Promise<Socket.WebSocketResponse & { rely: T }> {
    return new Promise((resolve, reject) => {
      if (!this.isConnected.value) {
        if (!this.isConnecting) {
          this.reconnect()
        }
        // Settle the promise instead of leaving the caller waiting forever.
        reject(new Error(`连接未建立,请求未发送: ${request.messageId}`))
        return
      }

      this.socket.send(JSON.stringify(request))

      const handler = (response: Socket.WebSocketResponse): void => {
        clearTimeout(timer)
        if (response.ackcode !== 0 && response.message) {
          FtMessageBox.error(response.message)
        }
        resolve(response as Socket.WebSocketResponse & { rely: T })
      }

      const timer = setTimeout(() => {
        // Only remove our own handler: the id may have been reused since.
        if (this.responseHandlers.get(request.messageId) === handler) {
          this.responseHandlers.delete(request.messageId)
        }
        reject(new Error(`请求超时: ${request.messageId}`))
      }, WebSocketClient.REQUEST_TIMEOUT_MS)

      this.responseHandlers.set(request.messageId, handler)
    })
  }

  /**
   * Subscribes to unsolicited messages.
   *
   * @param messageType Value matched against the message's `fromClass`, or
   *   `'*'` to receive every unsolicited message.
   * @param callback Invoked with each matching message.
   * @returns A function that removes this subscription.
   */
  public subscribe(
    messageType: string | '*',
    callback: (response: Socket.WebSocketResponse) => void,
  ): () => void {
    let listeners = this.eventListeners.get(messageType)
    if (!listeners) {
      listeners = []
      this.eventListeners.set(messageType, listeners)
    }
    listeners.push(callback)

    return () => {
      const current = this.eventListeners.get(messageType)
      if (!current) {
        return
      }
      const index = current.indexOf(callback)
      if (index !== -1) {
        current.splice(index, 1)
      }
    }
  }

  /**
   * Removes every registration of `callback` for `messageType`.
   *
   * @returns `true` when listeners exist for `messageType`, `false` otherwise.
   */
  public unsubscribe(
    messageType: string | '*',
    callback: (response: Socket.WebSocketResponse) => void,
  ): boolean {
    const listeners = this.eventListeners.get(messageType)
    if (!listeners) {
      return false
    }

    const remaining = listeners.filter(cb => cb !== callback)
    if (remaining.length === 0) {
      this.eventListeners.delete(messageType)
    }
    else {
      this.eventListeners.set(messageType, remaining)
    }
    return true
  }

  /**
   * Replaces the current socket with a fresh connection attempt.
   * There is no cap on the number of attempts.
   */
  private reconnect(): void {
    this.manuallyClosed = false
    const state = this.socket.readyState
    if (state !== WebSocket.CLOSED && state !== WebSocket.CLOSING) {
      this.socket.close()
    }
    this.socket = this.createWebSocket()
  }

  /**
   * Starts the retry loop: one reconnection attempt per interval until a
   * connection opens or `close()` is called. Does nothing when the loop is
   * already running.
   */
  private scheduleReconnect(): void {
    if (this.reconnectTimer !== null) {
      return
    }
    this.isConnecting = true
    this.reconnectTimer = setInterval(() => {
      this.connectCount++
      console.log('重连------------', this.connectCount)
      this.reconnect()
    }, WebSocketClient.RECONNECT_INTERVAL_MS)
  }

  /** Stops the retry loop if it is running. */
  private clearReconnectTimer(): void {
    if (this.reconnectTimer !== null) {
      clearInterval(this.reconnectTimer)
      this.reconnectTimer = null
    }
  }

  /**
   * Closes the connection on purpose: stops any pending retry, drops all
   * handlers and listeners, and closes the socket with the normal-closure
   * code so that no automatic reconnect is triggered.
   */
  public close(): void {
    this.manuallyClosed = true
    this.isConnecting = false
    this.clearReconnectTimer()
    this.responseHandlers.clear()
    this.eventListeners.clear()
    this.socket.close(WebSocketClient.NORMAL_CLOSURE)
  }
}
// Clients created so far, keyed by the URL they connect to.
const urlSocketMap = new Map<string, WebSocketClient>()

/**
 * Factory for WebSocket clients: returns the client already created for the
 * URL when there is one, otherwise creates, caches and returns a new client.
 *
 * @param url Server URL; falls back to `import.meta.env.FT_WS_URL` when omitted or empty.
 * @returns The client associated with the resolved URL.
 */
export const createWebSocket = (url?: string): WebSocketClient => {
  const target = url || import.meta.env.FT_WS_URL

  const cached = urlSocketMap.get(target)
  if (cached) {
    return cached
  }

  const created = new WebSocketClient(target)
  urlSocketMap.set(target, created)
  return created
}
|