socket客户端

  • pinia 封装全局监听
import { defineStore } from 'pinia';
export const useSocketStore = defineStore(
    'socket',
    {
        state: () => ({
            ws: undefined,
        }),
        actions: {
            glMessage(nws) {
                try {
                    console.log(' set msg global ', nws);
                    this.ws = nws
                }
                catch (e) {
                    console.log("catch eeeee=", e)
                }
            },
        }
})

  • 其他页面使用
import { useSocketStore } from '@/stores/socketGlobal';
const socketStore = useSocketStore();

socketStore.$subscribe((mutation, state) => {

    console.log(mutation,state);
    
    let { msgCtx, msgType, ext, msgTitle } = state.ws;

    if (msgType == 5) {
        Modal.info({
            title: '系统消息',
            content: msgCtx, okText: '知道了',
        });
    }
})
  • 首页初始化

onBeforeUnmount(() => {

    if (global_skt) {
        global_skt.close();
    }
});


var globalCount = 0;
var global_skt;

const initWebSocket = () => {
    //初始化weosocket
    const wsuri = `ws://${glIp.value}:${glPort.value}/${glPrefix.value}/netsock/` + userStore.employeeId;
    global_skt = new WebSocket(wsuri);
    global_skt.onmessage = websocketonmessage;
    global_skt.onopen = websocketonopen;
    global_skt.onerror = websocketonerror;
    global_skt.onclose = websocketclose;

}

const websocketonopen = () => {
    globalCount = 0;
    heartCheck.reset().start();      //拿到任何消息都说明当前连接是正常的
    fstate.value = '#1afa29'
    let actions = { room: "ping", info: userStore.employeeId };
    websocketsend(JSON.stringify(actions));
}

const websocketonerror = (e) => {
    fstate.value = '#ec0c0c';
    console.log('websocketonerror 断开: ' + e.code + ' ' + e.reason + ' ' + e.wasClean)
    // 超过三次
    if (globalCount > 5) {
        showNotification('失去和服务器通信,请联系管理员!')
    } else {
        setTimeout(() => {
            globalCount += 1;
            initWebSocket();
        }, 2000);
    }

    // initWebSocket();
};
const websocketonmessage = (messages) => {
    try {

        heartCheck.reset().start();      //拿到任何消息都说明当前连接是正常的
        if (messages.data != 'SUCCESS') {
            let msg = messages.data
            if (msg.includes("{")) {
                msg = JSON.parse(msg)
            }
            socketStore.glMessage(msg)
        }
    } catch (error) {
        console.log(error);
        fstate.value = '#ec0c0c';
    }

};
const websocketsend = (Data) => {
    global_skt.send(Data);
};


const websocketclose = (e) => {

    fstate.value = '#ec0c0c';

    console.log('已断开与服务器的连接');

    if (userStore.employeeId && userStore.employeeId != '') {
        if (globalCount > 3) {
            // showNotification('失去和服务器通信,请联系管理员!')
        } else {
            setTimeout(() => {
                globalCount += 1;
                initWebSocket();
            }, 2000);
        }
    }
};


//心跳检测
var heartCheck = {
    timeout: 58000,        //1分钟发一次心跳
    timeoutObj: null,
    serverTimeoutObj: null,
    reset: function () {
        clearTimeout(this.timeoutObj);
        clearTimeout(this.serverTimeoutObj);
        return this;
    },
    start: function () {
        var self = this;
        this.timeoutObj = setTimeout(function () {
            //这里发送一个心跳,后端收到后,返回一个心跳消息,
            //onmessage拿到返回的心跳就说明连接正常
            global_skt.send("ping");
            self.serverTimeoutObj = setTimeout(function () {//如果超过一定时间还没重置,说明后端主动断开了
                global_skt.close();     //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
            }, self.timeout)
        }, this.timeout)
    }
}

function showNotification(msg = '你好世界!!') {

    message.error(msg)
    
};
  • 实现WebSocket的“ACK + 超时重传 + 去重”机制,可以有效解决在大量用户在线时的消息丢失问题。以下是每个步骤的实现思路:

1. ACK(确认机制)

ACK机制可以确保消息已经被客户端接收。

  • 客户端发送 ACK:当客户端接收到消息后,会立刻发送一个带有消息ID的ACK(确认)响应给服务器,告知消息已被成功接收。
  • 服务器等待 ACK:服务器在发送消息后,进入等待状态,等待客户端的ACK。如果在超时时间内收到了ACK,则认为消息已送达,可以将其标记为已送达。
  • ACK 标记消息状态:在服务器端为每条消息设置一个“已送达”或“未送达”的状态标识。收到ACK则更新状态为“已送达”。

2. 超时重传

超时重传机制可以解决客户端没有收到消息的情况。

  • 设置超时时间:为每条消息设置一个特定的超时时间(例如3秒或5秒),服务器在发送消息后,开始一个超时计时。
  • 超时检测和重发:如果超时时间到了,但仍未收到ACK,服务器会再次发送消息给客户端。可以设置重发的最大次数(如3次),超过最大次数仍未收到ACK则认为消息发送失败。
  • 退出重传机制:在每次重传后,如果收到了ACK,立刻结束重传过程。

3. 去重机制

去重机制用于防止由于重传或网络抖动造成的消息重复。

  • 消息唯一ID:为每条消息生成一个唯一的ID(可以用UUID或时间戳结合其他唯一标识生成),确保每条消息都有独特的标识。
  • 客户端去重检测:客户端在收到消息后,将消息ID存入一个去重集合中(例如本地内存或缓存)。当再次接收到同一消息ID的消息时,直接忽略,不进行处理。
  • 去重集合过期策略:为避免去重集合过大,可以设置消息ID的过期时间(例如在30秒后删除该ID),保持去重集合中的数据量合理。

实现流程图

  1. 服务器向客户端发送消息(附带唯一ID)。
  2. 客户端收到消息后,先检查ID是否已处理过:
    • 如果未处理过,处理消息并返回ACK;
    • 如果已处理过,直接忽略消息。
  3. 服务器接收到ACK后,将该消息标记为已送达。
  4. 如果超时未收到ACK,服务器则重新发送消息,重复此过程直到达到重发上限或收到ACK。

注意事项

  • 消息存储策略:可以考虑使用数据库或内存数据库(如Redis)存储消息的状态,以支持大规模的消息管理。
  • 网络波动与重传间隔:在不稳定网络环境中,合理的重传间隔可以提高消息的传输成功率。
  • ACK发送的可靠性:客户端发送ACK应保证可靠性,如果客户端因网络问题未能发送ACK,可能会导致服务器继续重传。