龙空技术网

常用MQTT客户端库简介

EMQ映云科技 215

前言:

目前小伙伴们对“mqttjava”可能比较关注,我们都需要分析一些“mqttjava”的相关资讯。那么小编在网上收集了一些关于“mqttjava””的相关内容,希望看官们能喜欢,你们一起来了解一下吧!

前言

MQTT是一个轻量的发布订阅模式消息传输协议,专门针对低带宽和不稳定网络环境的物联网应用设计。MQTT基于发布/订阅范式,工作在TCP/IP协议族上,MQTT协议轻量、简单、开放并易于实现,这些特点使它适用范围非常广泛。

MQTT基于客户端-服务器通信模式,MQTT服务端称为MQTT Broker,目前行业内可选的MQTT Broker较多,其优劣与功能差别比较本文不再赘述。本文以开源社区中最流行的MQTT消息服务器-EMQX为例,使用EMQ提供的公共MQTT服务器,通过一个简单客户端连接Broker并发布、处理消息的例子,整理总结不同编程语言、平台下MQTT客户端库的使用方式与样例。

入选客户端库如下:

Eclipse Paho C与Eclipse Paho Embedded CEclipse Paho Java ClientEclipse Paho MQTT Go clientemqtt:EMQ提供的Erlang MQTT客户端库MQTT.js Web端&Node.js平台MQTT客户端Eclipse Paho Python样例应用介绍

MQTT客户端整个生命周期的行为可以概括为:建立连接、订阅主题、接收消息并处理、向指定主题发布消息、取消订阅、断开连接。

标准的客户端库在每个环节都暴露出相应的方法,不同库在相同环节所需方法参数含义大致相同,具体选用哪些参数、启用哪些功能特性需要用户深入了解MQTT协议特性并结合实际应用场景而定。

本文以一个客户端连接并发布、处理消息为例,给出每个环节大致需要使用的参数:

建立连接:

指定MQTT Broker基本信息接入地址与端口指定传输类型是TCP还是MQTT over WebSocket如果启用TLS需要选择协议版本并携带相应的的证书Broker启用了认证鉴权则客户端需要携带相应的MQTT Username Password信息配置客户端参数如keep alive时长、clean session回话保留标志位、MQTT协议版本、遗嘱消息(LWT)等

订阅主题:连接建立成功后可以订阅主题,需要指定主题信息

指定主题过滤器Topic,订阅的时候支持主题通配符+与#的使用指定QoS,根据客户端库和Broker的实现可选Qos 0 1 2,注意部分Broker与云服务提供商不支持部分QoS级别,如AWS IoT、阿里云IoT套件、Azure IoT Hub均不支持QoS 2级别消息订阅主题可能因为网络问题、Broker端ACL规则限制而失败

接收消息并处理:

一般是在连接时指定处理函数,依据客户端库与平台的网络编程模型不同此部分处理方式略有不同

发布消息:向指定主题发布消息

指定目标主题,注意该主题不能包含通配符+或#,若主题中包含通配符可能会导致消息发布失败、客户端断开等情况(视Broker与客户端库实现方式)指定消息QoS级别,同样存在不同Broker与平台支持的QoS级别不同,如Azure IoT Hub发布QoS 2的消息将断开客户端连接指定消息体内容,消息体内容大小不能超出Broker设置最大消息大小指定消息Retain保留消息标志位

取消订阅:

指定目标主题即可

断开连接:

主动断开连接,将发布遗嘱消息(LWT)Eclipse Paho C与Eclipse Paho Embedded C

Eclipse Paho C与Eclipse Paho Embedded C均为Eclipse Paho项目下的客户端库,均为使用ANSIC编写的功能齐全的MQTT客户端,Eclipse Paho Embedded C可以在桌面操作系统上使用,但主要针对mbed,Arduino和FreeRTOS 等嵌入式环境。

该客户端有同步/异步两种API,分别以MQTT Client和MQTT Async开头:

同步API旨在更简单,更有用,某些调用将阻塞直到操作完成为止,使用编程上更加容易;异步API中只有一个调用块 API-waitForCompletion ,通过回调进行结果通知,更适用于非主线程的环境。

两个库的下载、使用详细说明请移步至项目主页查看,本文使用Eclipse Paho C,直接提供样例代码如下:

#include "stdio.h"#include "stdlib.h"#include "string.h"#include "MQTTClient.h"#define ADDRESS "tcp://broker.emqx.io:1883"#define CLIENTID "emqx_test"#define TOPIC "testtopic/1"#define PAYLOAD "Hello World!"#define QOS 1#define TIMEOUT 10000Lint main(int argc, char* argv[]){MQTTClient client;MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;MQTTClient_message pubmsg = MQTTClient_message_initializer;MQTTClient_deliveryToken token;int rc;MQTTClient_create(&client, ADDRESS, CLIENTID,MQTTCLIENT_PERSISTENCE_NONE, NULL);// Connection parametersconn_opts.keepAliveInterval = 20;conn_opts.cleansession = 1;if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS){printf("Failed to connect, return code %d\n", rc);exit(-1);}// Publish messagepubmsg.payload = PAYLOAD;pubmsg.payloadlen = strlen(PAYLOAD);pubmsg.qos = QOS;pubmsg.retained = 0;MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);printf("Waiting for up to %d seconds for publication of %s\n""on topic %s for client with ClientID: %s\n",(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);printf("Message with delivery token %d delivered\n", token);// DisconnectMQTTClient_disconnect(client, 10000);MQTTClient_destroy(&client);return rc;}
Eclipse Paho Java Client

Eclipse Paho Java Client是用Java编写的MQTT客户端库,可用于JVM或其他Java兼容平台(例如Android)。

Eclipse Paho Java Client提供了MqttAsyncClient和Mqtt Client异步和同步API。

通过Maven安装:

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.2</version></dependency>

连接样例代码如下:

App.javapackage io.emqx;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class App {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://broker.emqx.io:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// Connection optionsMqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// Retain connectionconnOpts.setCleanSession(true);// Set callbackclient.setCallback(new PushCallback());// Setup connectionSystem.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// Publishclient.subscribe(subTopic);// Required parameters for publishing messageMqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}}

回调消息处理类OnMessageCallback.java

package io.emqx;import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttMessage;public class OnMessageCallback implements MqttCallback {public void connectionLost(Throwable cause) {// Reconnect after lost connection.System.out.println("Connection lost, and re-connect here.");}public void messageArrived(String topic, MqttMessage message) throws Exception {// Message handler after receiving messageSystem.out.println("Topic:" + topic);System.out.println("QoS:" + message.getQos());System.out.println("Payload:" + new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}}
Eclipse Paho MQTT Go client

Eclipse Paho MQTT Go Client为Eclipse Paho项目下的Go语言版客户端库,该库能够连接到MQTT Broker以发布消息,订阅主题并接收已发布的消息,支持完全异步的操作模式。

客户端依赖于Google的proxy和websockets软件包,通过以下命令完成安装:

go get github.com/eclipse/paho.mqtt.golang

连接样例代码如下:

package mainimport ("fmt""log""os""time""github.com/eclipse/paho.mqtt.golang")var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {fmt.Printf("TOPIC: %s\n", msg.Topic())fmt.Printf("MSG: %s\n", msg.Payload())}func main() {mqtt.DEBUG = log.New(os.Stdout, "", 0)mqtt.ERROR = log.New(os.Stdout, "", 0)opts := mqtt.NewClientOptions().AddBroker("tcp://broker.emqx.io:1883").SetClientID("emqx_test_client")opts.SetKeepAlive(60 * time.Second)// Message callback handleropts.SetDefaultPublishHandler(f)opts.SetPingTimeout(1 * time.Second)c := mqtt.NewClient(opts)if token := c.Connect(); token.Wait() && token.Error() != nil {panic(token.Error())}// Subscriptionif token := c.Subscribe("testtopic/#", 0, nil); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// Publish messagetoken := c.Publish("testtopic/1", 0, false, "Hello World")token.Wait()time.Sleep(6 * time.Second)// Cancel subscriptionif token := c.Unsubscribe("testtopic/#"); token.Wait() && token.Error() != nil {fmt.Println(token.Error())os.Exit(1)}// Disconnectc.Disconnect(250)time.Sleep(1 * time.Second)}
emqtt:EMQ提供的Erlang MQTT客户端库

emqtt是开源MQTT Broker EMQX官方EMQ提供的客户端库,适用于Erlang语言。

Erlang生态有多个MQTT Broker实现,如通过插件支持MQTT的RabbitMQ,VerenMQ、EMQX等。但是MQTT客户端库几乎没有选择的余地,MQTT社区收录的Erlang客户端库中emqtt是最佳选择。

emqtt完全由Erlang实现,完成支持MQTT v3.1.1和MQTT v5.0协议版本,支持SSL单双向认证与WebSocket连接。另一款MQTT基准测试工具emqtt_bench就基于该客户端库构建。

emqtt使用方式如下:

ClientId = <<"test">>.{ok, ConnPid} = emqtt:start_link([{clientid, ClientId}]).{ok, _Props} = emqtt:connect(ConnPid).Topic = <<"guide/#">>.QoS = 1.{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {Topic, QoS}).{ok, _PktId} = emqtt:publish(ConnPid, <<"guide/1">>, <<"Hello World!">>, QoS).%% If the qos of publish packet is 0, `publish` function would not return packetid.ok = emqtt:publish(ConnPid, <<"guide/2">>, <<"Hello World!">>, 0).%% Recursively get messages from mail box.Y = fun (Proc) -> ((fun (F) -> F(F) end)((fun(ProcGen) -> Proc(fun() -> (ProcGen(ProcGen))() end) end))) end.Rec = fun(Receive) -> fun()-> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Receive(); _Other -> Receive() after 5 -> ok end end end.(Y(Rec))().%% If you don't like y combinator, you can also try named function to recursively get messages in erlang shell.Receive = fun Rec() -> receive {publish, Msg} -> io:format("Msg: ~p~n", [Msg]), Rec(); _Other -> Rec() after 5 -> ok end end.Receive().{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, <<"guide/#">>).ok = emqtt:disconnect(ConnPid).
MQTT.js Web端&Node.js平台MQTT客户端

MQTT.js是Java Script编写的,实现了MQTT协议客户端功能的模块,可以在Node.js或浏览器环境中使用。在Node.js中使用时,即可以-g全局安装以命令行的形式使用,又可以将其集成到项目中调用。

由于JavaScript单线程特性,MQTT.js是全异步MQTT客户端,MQTT.js支持MQTT与MQTT over WebSocket,在不同运行环境支持程度如下:

浏览器环境:MQTT over WebSocket(包括微信小程序、支付宝小程序等定制浏览器环境)Node.js环境:MQTT、MQTT over WebSocket

不同环境里除了少部分连接参数不同,其他API均是相同的。

使用npm安装:

npm i mqtt

使用CDN安装(浏览器):

<script src=";></script><script>// Initialize a global mqtt variableconsole.log(mqtt)</script>

样例代码:

// const mqtt = require('mqtt')import mqtt from 'mqtt'// Connection optionconst options = {clean: true, // Retain connectionconnectTimeout: 4000, // Timeout// AuthticationclientId: 'emqx_test',username: 'emqx_test',password: 'emqx_test',}// Connection string// ws: unsecured WebSocket// wss: secured WebSocket connection// mqtt: unsecured TCP connection// mqtts: secured TCP connectionconst connectUrl = 'wss://broker.emqx.io:8084/mqtt'const client = mqtt.connect(connectUrl, options)client.on('reconnect', (error) => {console.log('reconnect:', error)})client.on('reconnect', (error) => {console.log('reconnect:', error)})client.on('message', (topic, message) => {console.log('message:', topic, message.toString())})
Eclipse Paho Python

Eclipse Paho Python为Eclipse Paho项目下的Python语言版客户端库,该库能够连接到MQTT Broker以发布消息,订阅主题并接收已发布的消息。

使用PyPi包管理工具安装:

pip install paho-mqtt

代码样例:

import paho.mqtt.client as mqtt# Successful Connection Callbackdef on_connect(client, userdata, flags, rc):print('Connected with result code '+str(rc))client.subscribe('testtopic/#')# Message delivery callbackdef on_message(client, userdata, msg):print(msg.topic+" "+str(msg.payload))client = mqtt.Client()# Set callback handlerclient.on_connect = on_connectclient.on_message = on_message# Set up connectionclient.connect('broker.emqx.io', 1883, 60)# Publish messageclient.publish('emqtt',payload='Hello World',qos=0)client.loop_forever()
总结

关于MQTT协议、MQTT客户端库使用流程、常用MQTT客户端的简介就到这里,欢迎读者通过EMQX进行MQTT学习、项目开发使用。

标签: #mqttjava