龙空技术网

前端实现与RabbitMQ通讯

程序猿凯撒 37

前言:

目前我们对“stompjs”可能比较讲究,咱们都需要了解一些“stompjs”的相关知识。那么小编同时在网摘上网罗了一些关于“stompjs””的相关知识,希望你们能喜欢,你们快快来了解一下吧!

需求背景

项目需要前端直接与Rabbitmq创建连接,订阅故障上报、发布异常处理等。项目中有三个主题的消息,前端在不同主题的消息中分别需要publish、subscribe、publish&subscribe。 前端采用Vue3框架,Rabbitmq官网提供http/web-stomp协议应用于web端与Rabbitmq通讯,下面前端开发围绕着stomp协议展开。 官网文档参考 RabbitMQ Web STOMP Plugin — RabbitMQ

【注】Rabbitmq服务端配置文件需要添加端口15674.

Vue封装STOMPstomp.js

stomp.js是一个stomp协议的代理库,本项目使用stomp.js创建rabbirmq连接实例,源码地址stomp-js/stompjs: Javascript and Typescript Stomp client for Web browsers and node.js apps (github.com)

配置文件

创建rabbitmq.config.js文件定义全局变量,存储连接rabbitmq的服务地址、交换机等。

javascript复制代码/* rabbitmq 连接配置*/export const RMQ_SERVER =  // mq服务地址export const RMQ_EXCHANGE = ''//交换机export const RMQ_KEY = ''//交换机routing keyexport const RMQ_ROBOT = ''//手动发布异常交换机export const RMQ_ACCOUNT = '' // 用户名export const RMQ_PASSWORD = '' // 密码export const ERROR_KEY = ""//异常队列export const SOLVE_KEY = ""//异常解决队列export const RES_KEY = ""//启停响应队列
stompjs实例化

创建stomp-client.js文件,封装并初始化stomp实例

javascript复制代码/* 初始化stomp client实例* * */import {Client} from "@stomp/stompjs";import {RMQ_ACCOUNT, RMQ_PASSWORD, RMQ_SERVER} from '@/config/stompjs/rabbitmq.config'import {ElMessage} from "element-plus";const conf = {    brokerURL: RMQ_SERVER,    connectHeaders: {        login: RMQ_ACCOUNT,        passcode: RMQ_PASSWORD    },    debug: (str) => {         console.log('STOMP: ' + str);    },    reconnectDelay: 5000,//断开重连};//连接服务端的配置let client = new Client(conf);//创建stomp客户端实例export const connect = (...topics) => {    const callbacks = topics.reduce((obj, topic, index) => {        if (index % 2 === 0) {            obj[topic] = topics[index + 1];        }        return obj;    }, {})    client.onConnect = frame => {        topics.filter((topic, index) => index % 2 === 0).forEach(topic => {            // @ts-ignore            client.subscribe(topic, callbacks[topic], (err) => {                alert(err)            })        })    }    client.activate();}export const publishMsg=(destination, body)=> {    if (!client.connected) {        ElMessage.error("网络连接异常");        return -1;    }    client.publish({destination: destination, body: JSON.stringify(body)})    return 0;}/** * @module: src\api\stomp-client.ts * @desc: 断开连接 */export const disconnect=()=>{    client.deactivate();}
订阅消息

使用subscribe()方法时必须包含两个参数:目的地destination,回调方法callback,还有一个可选参数headers;获取数据类型为String。 client.subscribe({destination:"",body:JSON.stringify(要发布的消息)})

【注】有多设备端访问,需要同时订阅的情况下需要开启fanout模式!

发布消息

使用publish()方法,必须包含参数: 目的地destination(String),消息体body。

在topic模式下destination为topic名称,存在交换机的模式下destination的格式如下拼接 "/exchang/exchange name/topic name"

应用

在入口文件index.vue中,创建连接以便于在启动项目时订阅相关消息

xml复制代码<template>  <el-container>    <el-header style="height: 10vh; border-bottom: 1px solid #CDD0D6">      <MainHeader></MainHeader>    </el-header>    <el-main style="height: 90vh; padding: 0">      <router-view></router-view>    </el-main>  </el-container></template><script setup>import {nextTick, onMounted} from 'vue';import {useHomeStore, useLogStore} from "@/store";import {connect, publishMsg} from "@/config/stompjs/stomp-client.js";import {ERROR_KEY, RES_KEY, RMQ_EXCHANGE, RMQ_KEY, RMQ_ROBOT, SOLVE_KEY} from "@/config/stompjs/rabbitmq.config";import api from "@/api/exceptionLog/api";import {storeToRefs} from "pinia";import {useRoute, useRouter} from "vue-router";const homeStore = useHomeStore();const logStore = useLogStore();const { errorList } = storeToRefs(logStore);const tags = homeStore.tagList;const router = useRouter();const route = useRoute();const getErrorMq = ()=>{  connect(      `/exchange/${ERROR_KEY}`, (x:any)=>{        //订阅异常        logStore.pushError(JSON.parse(x.body));      },      `/exchange/${SOLVE_KEY}`,(x:any)=>{        //异常消除        logStore.solveError(JSON.parse(x.body));      },      `/exchange/${RES_KEY}`, (x:any)=>{        //启停状态        let status = JSON.parse(x.body).systemStatus;        let type = JSON.parse(x.body).type;        console.log(status)        logStore.getRunningStatus(type, status);      },      `/exchange/${RMQ_ROBOT}`, (x:any)=>{        //手动消除        logStore.solveError(JSON.parse(x.body));     }      )}//向rabbitmq发布‘获取当前状态’消息const publishGetStatus =()=>{  const body0 = {operate: 1, type: 0};//消息体  const body1 = {operate: 1, type: 1};  const destination = `/exchange/${RMQ_EXCHANGE}/${RMQ_KEY}`;  publishMsg(destination, body0);  publishMsg(destination, body1);}onMounted(()=>{  userListener();  api.getErrorData().then(res => {    errorList.value = res.resData;  })  //订阅mq队列  getErrorMq();})</script>

标签: #stompjs