socket客户端
- pinia 封装全局监听
import { defineStore } from 'pinia';
export const useSocketStore = defineStore(
'socket',
{
state: () => ({
ws: undefined,
}),
actions: {
glMessage(nws) {
try {
console.log(' set msg global ', nws);
this.ws = nws
}
catch (e) {
console.log("catch eeeee=", e)
}
},
}
})
- 其他页面使用
import { useSocketStore } from '@/stores/socketGlobal';
const socketStore = useSocketStore();
socketStore.$subscribe((mutation, state) => {
console.log(mutation,state);
let { msgCtx, msgType, ext, msgTitle } = state.ws;
if (msgType == 5) {
Modal.info({
title: '系统消息',
content: msgCtx, okText: '知道了',
});
}
})
- 首页初始化
onBeforeUnmount(() => {
if (global_skt) {
global_skt.close();
}
});
var globalCount = 0;
var global_skt;
const initWebSocket = () => {
//初始化weosocket
const wsuri = `ws://${glIp.value}:${glPort.value}/${glPrefix.value}/netsock/` + userStore.employeeId;
global_skt = new WebSocket(wsuri);
global_skt.onmessage = websocketonmessage;
global_skt.onopen = websocketonopen;
global_skt.onerror = websocketonerror;
global_skt.onclose = websocketclose;
}
const websocketonopen = () => {
globalCount = 0;
heartCheck.reset().start(); //拿到任何消息都说明当前连接是正常的
fstate.value = '#1afa29'
let actions = { room: "ping", info: userStore.employeeId };
websocketsend(JSON.stringify(actions));
}
const websocketonerror = (e) => {
fstate.value = '#ec0c0c';
console.log('websocketonerror 断开: ' + e.code + ' ' + e.reason + ' ' + e.wasClean)
// 超过三次
if (globalCount > 5) {
showNotification('失去和服务器通信,请联系管理员!')
} else {
setTimeout(() => {
globalCount += 1;
initWebSocket();
}, 2000);
}
// initWebSocket();
};
const websocketonmessage = (messages) => {
try {
heartCheck.reset().start(); //拿到任何消息都说明当前连接是正常的
if (messages.data != 'SUCCESS') {
let msg = messages.data
if (msg.includes("{")) {
msg = JSON.parse(msg)
}
socketStore.glMessage(msg)
}
} catch (error) {
console.log(error);
fstate.value = '#ec0c0c';
}
};
const websocketsend = (Data) => {
global_skt.send(Data);
};
const websocketclose = (e) => {
fstate.value = '#ec0c0c';
console.log('已断开与服务器的连接');
if (userStore.employeeId && userStore.employeeId != '') {
if (globalCount > 3) {
// showNotification('失去和服务器通信,请联系管理员!')
} else {
setTimeout(() => {
globalCount += 1;
initWebSocket();
}, 2000);
}
}
};
//心跳检测
var heartCheck = {
timeout: 58000, //1分钟发一次心跳
timeoutObj: null,
serverTimeoutObj: null,
reset: function () {
clearTimeout(this.timeoutObj);
clearTimeout(this.serverTimeoutObj);
return this;
},
start: function () {
var self = this;
this.timeoutObj = setTimeout(function () {
//这里发送一个心跳,后端收到后,返回一个心跳消息,
//onmessage拿到返回的心跳就说明连接正常
global_skt.send("ping");
self.serverTimeoutObj = setTimeout(function () {//如果超过一定时间还没重置,说明后端主动断开了
global_skt.close(); //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
}, self.timeout)
}, this.timeout)
}
}
function showNotification(msg = '你好世界!!') {
message.error(msg)
};
- 实现WebSocket的“ACK + 超时重传 + 去重”机制,可以有效解决在大量用户在线时的消息丢失问题。以下是每个步骤的实现思路:
1. ACK(确认机制)
ACK机制可以确保消息已经被客户端接收。
- 客户端发送 ACK:当客户端接收到消息后,会立刻发送一个带有消息ID的ACK(确认)响应给服务器,告知消息已被成功接收。
- 服务器等待 ACK:服务器在发送消息后,进入等待状态,等待客户端的ACK。如果在超时时间内收到了ACK,则认为消息已送达,可以将其标记为已送达。
- ACK 标记消息状态:在服务器端为每条消息设置一个“已送达”或“未送达”的状态标识。收到ACK则更新状态为“已送达”。
2. 超时重传
超时重传机制可以解决客户端没有收到消息的情况。
- 设置超时时间:为每条消息设置一个特定的超时时间(例如3秒或5秒),服务器在发送消息后,开始一个超时计时。
- 超时检测和重发:如果超时时间到了,但仍未收到ACK,服务器会再次发送消息给客户端。可以设置重发的最大次数(如3次),超过最大次数仍未收到ACK则认为消息发送失败。
- 退出重传机制:在每次重传后,如果收到了ACK,立刻结束重传过程。
3. 去重机制
去重机制用于防止由于重传或网络抖动造成的消息重复。
- 消息唯一ID:为每条消息生成一个唯一的ID(可以用UUID或时间戳结合其他唯一标识生成),确保每条消息都有独特的标识。
- 客户端去重检测:客户端在收到消息后,将消息ID存入一个去重集合中(例如本地内存或缓存)。当再次接收到同一消息ID的消息时,直接忽略,不进行处理。
- 去重集合过期策略:为避免去重集合过大,可以设置消息ID的过期时间(例如在30秒后删除该ID),保持去重集合中的数据量合理。
实现流程图
- 服务器向客户端发送消息(附带唯一ID)。
- 客户端收到消息后,先检查ID是否已处理过:
- 如果未处理过,处理消息并返回ACK;
- 如果已处理过,直接忽略消息。
- 服务器接收到ACK后,将该消息标记为已送达。
- 如果超时未收到ACK,服务器则重新发送消息,重复此过程直到达到重发上限或收到ACK。
注意事项
- 消息存储策略:可以考虑使用数据库或内存数据库(如Redis)存储消息的状态,以支持大规模的消息管理。
- 网络波动与重传间隔:在不稳定网络环境中,合理的重传间隔可以提高消息的传输成功率。
- ACK发送的可靠性:客户端发送ACK应保证可靠性,如果客户端因网络问题未能发送ACK,可能会导致服务器继续重传。