龙空技术网

springboot+websocket+vue网站实时在线人数(02) 实现步骤

Dobbyisfree 373

前言:

现在咱们对“在线人数统计java”都比较注重,大家都需要学习一些“在线人数统计java”的相关资讯。那么小编同时在网络上网罗了一些对于“在线人数统计java””的相关文章,希望兄弟们能喜欢,同学们一起来学习一下吧!

需求捕捉用户登录和关闭页签的动作,用于统计实时在线人数系统管理员可以查看到当前实时在线人数列表效果

实时人数监控

实现模型

实现模型

实现步骤用户前端登录后发起socket连接

/** 获取当前用户信息 **/      getUserInfo () {        this.$http({          url: this.$http.adornUrl('/sys/user/info'),          method: 'get',          params: this.$http.adornParams()        }).then(({data}) => {          if (data && data.code === 0) {            this.loading = false            this.userId = data.user.userId            this.userName = data.user.username            this.name = data.user.name            this.$store.commit('user/updateDept',  data.user.dept)            // 注册websocket事件            this.startSocketBeats(this.userId,data.user.onlineUrl)          }        })      },            /** 创建websocket连接 **/      startSocketBeats(token,url){        if (!window['SocketNotice']) {        // 创建socket实例          window['SocketNotice'] = new SocketNotice(url, token, 5)          // window['SocketNotice'] = new SocketNotice("ws://127.0.0.1:8099/saleapi/notice.ws", '1290904685997350914', 5)        }        if (!window['SocketNotice'].connected) {        // 发起socket连接          window['SocketNotice'].connect()        }      }
socketNotice.js (webSocket相关工具类代码)

注意processMessage方法,这里用于区分接收到的消息类型,根据不同类型去处理不同的页面监听器

import UUID from 'uuid/v4'const socketType = {  // 心跳  HEART: {    msgType: 98  }}export default class SocketNotice {  /**   * @param url   * @param userId   * @param pingIntervalInSeconds 心跳间隔,单位秒   * @param maxConnectionBrokenTimes      最大的断开链接失败次数,超过就重连   */  constructor(url, userId, pingIntervalInSeconds = 5, maxConnectionBrokenTimes = 100) {    this.socket = null    this.connected = false// 是否已经联通了服务器    this.heartHeatTimer = -1// 心跳定时器    this.maxConnBrokenTimes = maxConnectionBrokenTimes// 默认的判断断线的丢包次数    this.connBrokenTimes = 0// 默认的判断断线的丢包次数    this.componentsBinded = new Map() // 存放页面组件的容器    this.heartBeatData = JSON.stringify(socketType.HEART)    this.connectUrl = url    this.pingInterval = pingIntervalInSeconds * 1000    this.wsTokenId = userId    this.stopFlag = false// 是否停止socket    this.defaultEventProcessor = {      onclose: this.onClose.bind(this),      onopen: this.onOpen.bind(this),      onmessage: this.onMessage.bind(this),      onerror: this.onError.bind(this)    }  }  connect() {    if ('WebSocket' in window) {      this.socket = new WebSocket(this.connectUrl + '?onlineTokenId=' + this.wsTokenId)      this.setUpEvents()      this.connected = true    } else {      this.connected = false      console.log('当前浏览器 不支持 websocket')    }  }  isConnected() {    return this.connected  }  /***   * 关闭当前socket   */  closeSocket(msg = '未指明', stop = false) {    this.stopFlag = stop    this.socket && this.socket.readyState === 1 && (this.socket.close() || (this.socket = null))    clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)// 清除定时器    if (stop) {      console.log(`由于 < ${msg} > 的原因客户端主动关闭远程连接,请联系管理员`)    }  }  stop() {    this.stopFlag = true  }  /**   * 注册事件消费者   * UUID(receiver) 生成一个理论上不重复的128位16进制表示的数字   * @param receiver   */  registerReceiver(receiver, copmName) {    console.log(receiver, copmName)    //    if (!receiver || !UUID(receiver)) {      throw new Error('Illegal arguments')    }    if (this.componentsBinded.has(copmName)) {      console.error(`key <${copmName}> 组件已经存在! `)    }    this.componentsBinded.set(copmName, receiver)  }  unRegisterReceiver(copmName) {    copmName && this.componentsBinded.delete(copmName)  }  // 发送消息  send(message) {    if (this.stopFlag) return    this.socket.send(message)  }  processMessage(data) {    // console.log(`接收到 WS 消息... ${JSON.stringify(data)}`)    if (data.msgType === 1) {      this.online(data)    }  }  online(data) {    const comp = this.componentsBinded.get('ONLINE')    if (comp && !comp.compDestory && comp.onSocketMessage) {      comp.onSocketMessage(data)    }  }  beatHeart() {    if (!this.stopFlag) this.send(this.heartBeatData)  }  /**   * 设置心跳时间间隔 同时开启心跳   * @param onConnectionBroken    处理 无回应的次数超过限制的 函数   */  startPing() {    if (this.heartHeatTimer > -1) {      clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)    }    if (this.pingInterval === -1 || !this.connected) {      return    }    this.heartHeatTimer = setInterval(this.beatHeart.bind(this), this.pingInterval)    // console.log('开始发送心跳 , 定时器id -->> ' + this.heartHeatTimer)  }  setUpEvents() {    Object.assign(this.socket, this.defaultEventProcessor)  }  // 连接发生错误的回调方法  onError(error) {    console.log('WebSocket连接发生错误')    console.log(error)  };  // 连接成功建立的回调方法  onOpen() {    this.stopFlag = false    console.log('WebSocket连接成功 ')    this.startPing()  }  // 接收到消息的回调方法  onMessage(event) {    // console.log(`接收到socket消息 --> ${event.data} `)    this.processMessage(JSON.parse(event.data))  }  /**   * 连接关闭的回调方法   * 需要在关闭的时候就重连.   */  onClose() {    this.connected = false    if (this.stopFlag) {      return    }    console.log('WebSocket连接关闭')    // 停止心跳    clearInterval(this.heartHeatTimer) || (this.heartHeatTimer = -1)    // 开始重连    this.socket = null    this.connBrokenTimes = this.connBrokenTimes + 1    setTimeout(function () {      console.log('socket断开链接 一秒钟之后,准备重试链接  ,已经尝试重连的次数:  -->> ' + this.connBrokenTimes + ', 最大次数: -> ' + this.maxConnBrokenTimes)      if (this.connBrokenTimes > this.maxConnBrokenTimes) {        console.log(`多次链接失败,可能是服务器出现故障`)        return      }      if (this.connect()) {        this.startPing()      }    }.bind(this), 1000)// 重连间隔时间是 一秒  }}
后端处理用户发起的socket连接监听onOpen事件,根据token获取当前登录用户详情记录发起连接的是哪个用户,放入缓存
# 这部分根据具体业务自由发挥@Slf4j@Component@ServerEndpoint("/online.ws")public class OnlineWSServer {    private SysUserService userService;    private SaleDeptService deptService;    private DeptUserRelService deptUserRelService;    private static final String ONLINE_TOKEN_ID = "onlineTokenId";    /**     * 用来记录当前在线连接数。     */    private static AtomicInteger onlineCount = new AtomicInteger(0);    private static NutMap onlineUserPool = new NutMap();    @OnOpen    public synchronized void onOpen(Session session) {        this.userService = WsApplicationContextAware.getApplicationContext().getBean(SysUserService.class);        this.deptService = WsApplicationContextAware.getApplicationContext().getBean(SaleDeptService.class);        this.deptUserRelService = WsApplicationContextAware.getApplicationContext().getBean(DeptUserRelService.class);        addOnlineCount();        addOnlineUser(session);    }    @OnClose    public synchronized void onClose(Session session) {        log.error(JSONUtil.toJsonStr(session));        String userId = getSessionToken(session);        SysUserEntity user = userService.getById(userId);        SysUserEntity onlineUser = onlineUserPool.getAs(userId, SysUserEntity.class);        if (onlineUser != null) {            AtomicInteger onlineCount = onlineUser.getOnlineCount();            onlineCount.getAndDecrement();            if (onlineCount.get() == 0) {                onlineUserPool.remove(userId);                WebSocketSessionManager.remove(userId, session);            }        }        reduceOnlineCount();    }    @OnMessage    public void onMessage(String message, Session session) {//        log.info("接收到ws=[{}]的消息:{}", session, message);        JSONObject jsonObject = JSONObject.parseObject(message);        int msgType = jsonObject.getIntValue("msgType");        if (msgType == 98) {            NutMap pong = new NutMap();            pong.put("time", new Date());            pong.put("onlineCount", onlineCount.intValue());            pong.put("msgType", msgType);            try {                // 发送实时人数到当前登录用户                Set<Map.Entry<String, Object>> entries = onlineUserPool.entrySet();                List<SysUserEntity> sysUserEntityList = new ArrayList<>();                for (Map.Entry<String, Object> entry : entries) {                    SysUserEntity user = (SysUserEntity) entry.getValue();                    sysUserEntityList.add(user);                }                pong.put("userList", sysUserEntityList);                pong.put("onlineUserPool", onlineUserPool);                pong.put("msgType", 1);                session.getBasicRemote().sendText(JSON.toJSONString(pong));            } catch (Exception e) {                log.error(e.getMessage(), e);            }        }    }    @OnError    public void onError(Session session, Throwable error) {        log.error("ws= < " + session.getId() + " > 内部错误", error);    }    private String getSessionToken(Session session) {        List<String> id = session.getRequestParameterMap().get(ONLINE_TOKEN_ID);        return id.stream().findFirst().orElse(StrUtil.EMPTY);    }    private String getDeptFullName(String itcode){        List<SaleDeptEntity> deptList = deptUserRelService.getDeptListByUsernameNotExpired(itcode);        if (CollectionUtil.isNotEmpty(deptList)){            List<String> deptNames = deptList.stream().map(SaleDeptEntity::getDeptFullName).filter(Objects::nonNull).collect(Collectors.toList());            return Joiner.on(",").join(deptNames);        }else{            return "";        }    }    public void addOnlineUser(Session session) {        String userId = getSessionToken(session);        SysUserEntity user = userService.getById(userId);        if(user==null){            return;        }        String deptFullName = getDeptFullName(user.getUsername());        if (BeanUtil.isNotEmpty(onlineUserPool.get(userId))) {            SysUserEntity userEntity = (SysUserEntity) onlineUserPool.get(userId);            userEntity.setDeptFullNames(deptFullName);            userEntity.getOnlineCount().incrementAndGet();            onlineUserPool.put(userId, userEntity);        } else {            user.setDeptFullNames(deptFullName);            user.getOnlineCount().incrementAndGet();            onlineUserPool.put(userId, user);        }    }    /**     * 原子性操作,在线连接数加一     */    public void addOnlineCount() {        onlineCount.getAndIncrement();    }    /**     * 原子性操作,在线连接数减一     */    public static void reduceOnlineCount() {        onlineCount.getAndDecrement();    }}
演示连接效果

接收到msgType=98的心跳请求,反馈msgType=1的在线用户信息

下面被划掉的地方其实是不存在的,我测试时多发送了一次

目前已经能实时地拿到用户信息,但是这些数据目前是没有被任何页面处理的,我们的需求是当管理员打开实时监控页面时,才去处理并展示这些数据,接下来要做的就是,在打开实时监控页面时,把该页面的实例添加到socket容器中,也就是注册,在页面关闭时将该实例在容器中移除.

前端注册socket监听

<template>  <div>    <el-table      :data="tableData"      border      style="width: 100%">      <el-table-column        prop=""        align="center"        label="当前登录用户"        width="auto">        <el-table-column          align="center"          type="index"          width="auto">        </el-table-column>        <el-table-column          fixed          align="center"          prop="username"          label="ITCODE"          width="auto">        </el-table-column>        <el-table-column          fixed          align="center"          prop="name"          label="姓名"          width="auto">        </el-table-column>        <el-table-column          fixed          align="center"          prop="deptFullNames"          label="部门"          width="auto">        </el-table-column>        <el-table-column          fixed          align="center"          prop="onlineCount"          label="会话个数"          width="auto">        </el-table-column>      </el-table-column>    </el-table>  </div></template><script>  const COMP_NAME = 'ONLINE'  export default {    name: 'index',    created() {      this.registerWebSocket(true)    },    destroyed() {      this.registerWebSocket(false)    },    data() {      return {        tableData: []      }    },    methods: {      registerWebSocket(open) {        if (open) {          if (window['SocketNotice'] && window['SocketNotice'].registerReceiver) {            window['SocketNotice'].registerReceiver(this, COMP_NAME)          }        } else {          if (window['SocketNotice'] && window['SocketNotice'].unRegisterReceiver) {            window['SocketNotice'].unRegisterReceiver(COMP_NAME)          }        }      },      onSocketMessage(data) {        this.tableData = data.userList;      }    }  }</script><style scoped></style>
最终效果总结

总的流程大概就是以下几点

客户端在用户登录后发起websocket连接 后端监听到有连接加入时就对用户信息进行记录,做相应的处理并进行反馈客户端对指定页面和指定的消息类型进行监听和处理

标签: #在线人数统计java #vue websocket心跳 #vue websocket心跳检测