龙空技术网

物联网宠儿mqtt.js那些事儿

闪念基因 289

前言:

今天各位老铁们对“js物联网”大致比较关怀,我们都想要了解一些“js物联网”的相关知识。那么小编也在网摘上搜集了一些有关“js物联网””的相关资讯,希望兄弟们能喜欢,我们快快来了解一下吧!

常见的mq有Kafka,RocketMQ和RabbitMQ,大家也很常见。 前者很常见,MQTT是什么呢?MQTT属于IoT也就是物联网的概念。

常见的mq有Kafka,RocketMQ和RabbitMQ,大家也很常见。MQTT是什么呢?

Kafka,RocketMQ和RabbitMQ属于微服务间的mq,而MQTT则属于IoT也就是物联网的概念。

mqtt.js是MQTT在nodejs端的实现。vue技术栈下的前端也可用。

mqtt.js官方为微信小程序和支付宝小程序也做了支持。微信小程序的MQTT协议名为 wxs ,支付宝小程序则是 alis 。

如果还是一脸懵逼,那么就跟随我通过mqtt.js去认识一下这个物联网领域的宠儿吧。

什么是微消息队列?MQTT关键名词解释P2P消息和Pub/Sub消息封装的mqtt.js通用class客户端发包函数sendPacket客户端连接 mqtt.connect()订阅topic mqtt.Client#subscribe()发送消息 mqtt.Client#publish()接收消息 mqtt.Client#“message”事件什么是微消息队列?

消息队列一般分为两种:

微服务消息队列(微服务间信息传递,典型代表有RabbitMQ,Kafka,RocketMQ)物联网消息队列(物联网端与云端消息传递,代表有MQTT)

目前我实践过的,也就是我们本篇博文深入分析的,是物联网消息队列的mqtt.js。

传统的消息队列(微服务间信息传递)传统的微服务间(多个子系统服务端间)消息队列是一种非常常见的服务端间消息传递的方式。

典型代表有:RabbitMQ,Kafka,RocketMQ。

阿里云官网拥有AMQP(兼容RabbitMQ),Kafka,和RocketMQ这三种微服务消息队列,对于我们今后在实际项目中落地提供了很大的帮助。

更多微服务消息队列可查看: node-mq-tutorial

使用场景多种多样:

高并发:秒杀、抢票(FIFO)共享型:积分兑换(多子系统共用积分模块)通信型:服务端间消息传递(nodejs,java,python,go等等)MQTT消息队列(物联网端与云间消息传递)

MQTT是一个物联网MQTT协议,主要解决的是物联网IoT网络情况复杂的问题。

阿里云有MQTT消息队列服务。通信协议支持MQTT,STOMP,GB-808等。数据传输层支持TCP长连接、SSL加密、Websocket等。

使用场景主要为数据传输:

车联网(远程控制,汽车数据上传)IM通讯(1对1单聊,1对多朋友圈)视频直播(弹幕通知,聊天互动)智能家居(电器数据上传,遥控指令)

目前我手上负责的运行了2年的聊天系统就是使用的这个服务,我们主要按照 设备<->server<->PC 的方式, MQTT协议,Websocket传输协议 进行设备与PC间的数据通信。

MQTT关键名词解释实例(Instance)

每个MQTT实例都对应一个全局唯一的服务接入点。

肉眼可见的区别就是在通过 mqtt.connect(url) 与server(broker)建立连接时,broker的url都是一致的。

假设有saleman1,salesman2···他们本地的前端与服务端间建立连接的url都是统一的,只是在clientId进行区分即可。

客户端Id(Client ID)

MQTT的Client ID是每个客户端的唯一标识,要求全局都是唯一的,使用同一个Client ID连接会被拒绝。

阿里云的ClientID由两部分组成 <GroupID>@@@<DeviceID> 。

通常情况下Group ID是多前端统一的,比如PC端,安卓移动端,ios移动端,DeviceID也是多前端统一的。

那么如何区分多端呢?可以对Client ID中间的@@@做修改。

比如:

let CID_PC = `<GroupID>@@@-PC<DeviceID>`let CID_Android = `<GroupID>@@@-Android<DeviceID>`let CID_IOS = `<GroupID>@@@-IOS<DeviceID>`
组Id(Group ID)

用于指定一组逻辑功能完全一致的节点公用的组名,代表的是一类相同功能的设备。

Device ID

每个设备独一无二的标识。这个需要保证全局唯一,可以是每个传感器设备的序列号,可以是登录PC的userId。

父主题(Parent Topic)

MQTT协议基于Pub/Sub模型,任何消息都属于一个Topic。

Topic可以存在多级,第一级为父级Topic。

需要控制台单独创建。

子主题(Subtopic)

MQTT可以有二级Topic,也可以有三级Topic。

无需创建,代码中直接写即可。

P2P消息和Pub/Sub消息

Pub/Sub消息就是订阅和发布的模式,类似事件监听和广播。

如果对发布订阅不理解,可以去看Webhook到底是个啥?

MQTT除了支持Pub/Sub的模式,还支持P2P的模式。

什么是P2P消息?P2P,全称为(Point to Point)。一对一的消息收发模式,只有一个消息发送者和一个消息接收者。P2P模式下,消息发送者明确知道消息的预期接收者,并且这个消息只能被这个特定的 客户端消费 。发送者发送消息时,通过Topic指定接收者,接收者无需订阅即可获得该消息。P2P 模式不仅降低注册订阅的成本,而且因为对链路有优化,所以降低推送延迟。P2P模式和Pub/Sub模式的区别

发送消息时

Pub/Sub模式下,发送者需要按照与接受者约定好的Topic发送消息P2P模式下,发送者无需按照Tpic发送,可以直接按照规范进行发送

接收消息时

Pub/Sub模式下,接收者需要提前订阅topic才能接消息P2P模式下无需订阅即可接收消息nodejs发送P2P消息

const p2pTopic =topic+"/p2p/GID_xxxx@@@DEVICEID_001";mqtt.client.publish(p2pTopic);
封装的mqtt.js通用class客户端连接 initClient(config)订阅topic subscribeTopic(topic, config)发送消息 publishMessage(message)接收消息 handleMessage(callback)
import mqtt from 'mqtt';import config from '@/config';export default class MQTT {  constructor(options) {    this.name = options.name;    this.connecting = false;  }  /**   * 客户端连接   */  initClient(config) {    const { url, groupId, key, password, topic: { publish: publishTopic }} = config;    return new Promise((resolve) => {      this.client = mqtt.connect(        {          url,          clientId: `${groupId}@@@${deviceId}`,          username: key,          password,        }      );      this.client.on('connect', () => {        this.connecting = true;        resolve(this);      });    });  }  /**   * 订阅topic   */  subscribeTopic(topic, config) {    if (this.connecting) {      this.client.subscribe(topic, config);    }    return this;  }  /**   * 发送消息   */  publishMessage(message) {    this.client.publish(publishTopic, message, { qos: 1 });  }  /**   * 接收消息   */  handleMessage(callback) {    if (!this.client._events.message) {      this.client.on('message', callback);    }  }}
客户端发包函数sendPacketmqtt-packet生成一个可传输buffer
var mqtt = require('mqtt-packet')var object = {  cmd: 'publish',  retain: false,  qos: 0,  dup: false,  length: 10,  topic: 'test',  payload: 'test' // Can also be a Buffer}var opts = { protocolVersion: 4 } // default is 4. Usually, opts is a connect packetconsole.log(mqtt.generate(object))// Prints://// <Buffer 30 0a 00 04 74 65 73 74 74 65 73 74>//// Which is the same as://// new Buffer([//   48, 10, // Header (publish)//   0, 4, // Topic length//   116, 101, 115, 116, // Topic (test)//   116, 101, 115, 116 // Payload (test)// ])
sendPacket函数

发出packetsend事件并且通过mqtt.writeToStream将packet写入client的stream中。

var mqttPacket = require('mqtt-packet')function sendPacket (client, packet) {  client.emit('packetsend', packet)  mqttPacket.writeToStream(packet, client.stream, client.options)}
_sendPack方法
MqttClient.prototype._sendPacket = function (packet) {     sendPacket(this, packet);}
客户端连接 mqtt.connect()

mqtt client建立与mqtt server(broker)的连接,通常是通过给定一个'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'为协议的url进行连接。

mqtt.connect([url], options)

官方说明:

通过给定的url和配置连接到一个broker,并且返回一个Client。url可以遵循以下协议:'mqtt', 'mqtts', 'tcp', 'tls', 'ws', 'wss', 'wxs' , 'alis'。( mqtt.js支持微信小程序和支付宝小程序,协议分别为wxs和alis。 )url也可以是通过URL.parse()返回的对象。可以传入一个单对象,既包含url又包含选项。

再来看一下我手上项目的连接配置,连接结果。

敏感信息已通过foo,bar,baz或者xxxx的组合进行数据脱敏处理。

连接配置

{    key: 'xxxxxxxx',    secret: 'xxxxxxxx',    url: 'wss://foo-bar.mqtt.baz.com/mqtt',    groupId: 'FOO_BAR_BAZ_GID',    topic: {      publish: 'PUBLISH_TOPIC',      subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'],      unsubscribe: 'PUBLISH_TOPIC/noticeMobile/',    },}
key 账号secret 密码url 用于建立client与server(broker)mqtt连接的链接groupId 组idtopic 发送消息的topic,订阅的topic,取消订阅的topic连接结果

包括总览,响应头和请求头。

General

Request URL: wss://foo-bar.mqtt.baz.comRequest Method: GETStatus Code: 101 Switching Protocols
Response Header
HTTP/1.1 101 Switching Protocolsupgrade: websocketconnection: upgradesec-websocket-accept: xxxxxxxsec-websocket-protocol: mqtt
Request Header
GET wss://foo-bar.mqtt.baz.com/ HTTP/1.1Host: foo-bar.mqtt.baz.comConnection: UpgradePragma: no-cacheCache-Control: no-cacheUser-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.149 Safari/537.36Upgrade: websocketOrigin: : 13Accept-Encoding: gzip, deflate, brAccept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-TW;q=0.6Sec-WebSocket-Key: xxxxxxxxxSec-WebSocket-Extensions: permessage-deflate; client_max_window_bitsSec-WebSocket-Protocol: mqtt
源码分析

下面来看这段mqtt连接的代码。

this.client = mqtt.connect(  {    url,    clientId: `${groupId}@@@${deviceId}`,    username: key,    password,  });
function parseAuthOptions (opts) {  var matches  if (opts.auth) {    matches = opts.auth.match(/^(.+):(.+)$/)    if (matches) {      opts.username = matches[1]      opts.password = matches[2]    } else {      opts.username = opts.auth    }  }}/** * connect - connect to an MQTT broker. * * @param {String} [brokerUrl] - url of the broker, optional * @param {Object} opts - see MqttClient#constructor */function connect (brokerUrl, opts) {  if ((typeof brokerUrl === 'object') && !opts) {    //  可以传入一个单对象,既包含url又包含选项    opts = brokerUrl    brokerUrl = null  }  opts = opts || {}  // 设置username和password  parseAuthOptions(opts)  if (opts.query && typeof opts.query.clientId === 'string') {    // 设置Client Id    opts.clientId = opts.query.clientId  }  function wrapper (client) {   ...    return protocols[opts.protocol](client, opts)  }  // 最终返回一个mqtt client实例  return new MqttClient(wrapper, opts)}
订阅topic mqtt.Client#subscribe()实际代码
const topic =  {      subscribe: ['PUBLISH_TOPIC/noticePC/', 'PUBLISH_TOPIC/p2p'],      unsubscribe: 'PUBLISH_TOPIC/noticeMobile/',};const config = { qos:1 };this.client.subscribe(topic.subscribe, config)
源码分析
MqttClient.prototype.subscribe = function () {  var packet  var args = new Array(arguments.length)  for (var i = 0; i < arguments.length; i++) {    args[i] = arguments[i]  }  var subs = []   // obj为订阅的topic列表  var obj = args.shift()  // qos等配置  var opts = args.pop()  var defaultOpts = {    qos: 0  }  opts = xtend(defaultOpts, opts)  // 数组类型的订阅的topic列表    if (Array.isArray(obj)) {    obj.forEach(function (topic) {      if (!that._resubscribeTopics.hasOwnProperty(topic) ||        that._resubscribeTopics[topic].qos < opts.qos ||          resubscribe) {        var currentOpts = {          topic: topic,          qos: opts.qos        }        // subs是最终的订阅的topic列表        subs.push(currentOpts)      }    })  }  // 这个packet很重要  packet = {    // 发出订阅命令    cmd: 'subscribe',    subscriptions: subs,    qos: 1,    retain: false,    dup: false,    messageId: this._nextId()  }  // 发出订阅包  this._sendPacket(packet)  return this}
发送消息 mqtt.Client#publish()实际代码
const topic = {      publish: 'PUBLISH_TOPIC',};const messge = {   foo: '',   bar: '',   baz: '',   ...}const msgStr = JSON.stringify(message);this.client.publish(topic.publish, msgStr);

注意publish的消息需要使用JSON.stringify进行序列化,然后再发到指定的topic。

源码分析

MqttClient.prototype.publish = function (topic, message, opts, callback) {  var packet  var options = this.options  var defaultOpts = {qos: 0, retain: false, dup: false}  opts = xtend(defaultOpts, opts)  // 将消息传入packet的payload  packet = {    cmd: 'publish',    topic: topic,    payload: message,    qos: opts.qos,    retain: opts.retain,    messageId: this._nextId(),    dup: opts.dup  }  // 处理不同qos  switch (opts.qos) {    case 1:    case 2:       // 发出publish packet       this._sendPacketI(packet);        ...    default:       this._sendPacket(packet);        ...  }  return this}
接收消息 mqtt.Client “message”事件实际代码
this.client.on('message', callback);

数据以callback的方式接收。

function (topic, message, packet) {}

topic代表接收到的topic,buffer则是具体的数据。

message是接收到的数据,谨记通过JSON.parse()对buffer做解析。

handleMessage(callback) {    this.client.on('message', callback);}this.client.handleMessage((topic, buffer) => {  let receiveMsg = null;  try {   receiveMsg = JSON.parse(buffer.toString());  } catch (e) {   receiveMsg = null;  }  if (!receiveMsg) {    return;  }  ...do something with receiveMsg...});
源码分析

MqttClient继承了EventEmitter。

从而进行可以使用on监听“message”事件。

inherits(MqttClient, EventEmitter)

那么到底是在哪里间发出message事件的呢?>emit the message event

基于websocket-stream建立websocket连接使用pipe连接基于readable-stream.Writable创建的可写流nextTick调用_handlePacket在handlePacket中调用handlePublish发出message事件1.基于websocket-stream建立websocket连接

this.stream = this.streamBuilder(this)function streamBuilder (client, opts) {  return createWebSocket(client, opts)}var websocket = require('websocket-stream')function createWebSocket (client, opts) {  var websocketSubProtocol =    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)      ? 'mqttv3.1'      : 'mqtt'  setDefaultOpts(opts)  var url = buildUrl(opts, client)  return websocket(url, [websocketSubProtocol], opts.wsOptions)}
2. 使用pipe连接基于readable-stream.Writable创建的可写流
var Writable = require('readable-stream').Writablevar writable = new Writable();this.stream.pipe(writable);
3.nextTick调用_handlePacket
writable._write = function (buf, enc, done) {    completeParse = done    parser.parse(buf)    work()}function work () {    var packet = packets.shift()    if (packet) {      that._handlePacket(packet, nextTickWork)    }}function nextTickWork () {    if (packets.length) {      process.nextTick(work)    } else {      var done = completeParse      completeParse = null      done()    }}
4. 在handlePacket中调用handlePublish发出message事件
MqttClient.prototype._handlePacket = function (packet, done) {  switch (packet.cmd) {    case 'publish':      this._handlePublish(packet, done)      break   ...}// emit the message eventMqttClient.prototype._handlePublish = function (packet, done) {  switch (qos) {    case 1: {      // emit the message event        if (!code) { that.emit('message', topic, message, packet) }    }}

标签: #js物联网 #rabbitmq连接拒绝 #jsmq