龙空技术网

物联网时代-ThingsBoard源码分析-MQTT设备连接协议-上

物联网技术指南 434

前言:

目前我们对“nettyudp判断启动成功”大概比较着重,兄弟们都需要知道一些“nettyudp判断启动成功”的相关资讯。那么小编在网上收集了一些有关“nettyudp判断启动成功””的相关知识,希望同学们能喜欢,朋友们快快来了解一下吧!

完整项目:

指南logo

系列文章:

高质量的 IOT 技术教程,代码主要源于国外开源物联网平台ThingsBoard和对阿里云物联网平台的感悟

源码解析系列a.『 准备篇 』《物联网时代-Thingsboard源码分析-调试环境调试》《物联网时代-Thingsboard源码分析-项目结构说明》b.『设备连接协议篇 』MQTT

协议 MQTT:

技术框架 Netty:

《MQTT入门篇》《物联网时代-ThingsBoard源码分析-MQTT设备连接协议-上》

MQTT概述

MQTT是一种轻量级的发布-订阅消息传递协议,它可能最适合各种物联网设备。你可以在此处找到有关MQTT的更多信息。

ThingsBoard服务器节点充当支持QoS级别0(最多一次)和QoS级别1(至少一次)以及一组预定义主题的MQTT主题。

ThingsBoard基于MQTT协议提供给设备的API是非常"灵活"的。

例如,目前提供了四种API:

遥测数据上传API属性APIRPC API声明设备所有权API

用上述四种的API,用户可以动态调整设备监控属性,例如原先采集设备的温度,湿度及材料大小调整为温度,湿度和材料是否合格;并可以获取设备历史遥测数据和最新遥测数据;通过RPC API, 设备和服务器可以实时获取对方相应属性的变化和通过API用户可以对设备进行转让,租赁和回收操作。

关联模块一览

和MQTT设备传输协议关联的模块有Thingsboard MQTT Transport Service、Thingsboard MQTT Transport Common和Thingsboard Server Queue components。前面这些名称大家可以看IDEA maven模块名称。

MQTT Transport Service

org.thingsboard.server.mqtt.ThingsboardMqttTransportApplication,MQTT服务启动类,使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行

@SpringBootConfiguration@EnableAsync@EnableScheduling@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"})public class ThingsboardMqttTransportApplication {    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";    public static void main(String[] args) {        SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));    }    private static String[] updateArguments(String[] args) {        if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {            String[] modifiedArgs = new String[args.length + 1];            System.arraycopy(args, 0, modifiedArgs, 0, args.length);            modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;            return modifiedArgs;        }        return args;    }}

第2-3行代码中,@EnableAsync注解使用来开启异步线程,@EnableScheduling注解使用来开启定时任务。

第4行代码@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"}): 扫描这些包下的所有使用@Component 的类,不管自动导入还是导出。

第7-8行代码和updateArguments的作用是:启动时,使用 --spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。

MQTT Transport Common

Netty框架

Thingsboard的Mqtt协议逻辑实现是通过Netty实现的,Netty是一个NIO客户端、服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。

Netty 官网Netty GitHub仓库

引入依赖

MQTT Transport common通过引入Netty 4.x版本的jar包对Mqtt进行协议逻辑实现,Netty4.x和3.x的区别还是挺大的。

<dependency>	<groupId>io.netty</groupId>	<artifactId>netty-all</artifactId></dependency>

参数配置

transport:	 # 本地MQTT传输参数  mqtt:    # 开启/关闭mqtt传输协议.    enabled: "${MQTT_ENABLED:true}"    bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"    bind_port: "${MQTT_BIND_PORT:1883}"    timeout: "${MQTT_TIMEOUT:10000}"    netty:      leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"      boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"      worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"      max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"      so_keep_alive: "${NETTY_SO_KEEPALIVE:false}"    # MQTT SSL配置    ssl:      # 开启/关闭 SSL支持      enabled: "${MQTT_SSL_ENABLED:false}"      # SSL协议: 参阅      protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}"      # Path to the key store that holds the SSL certificate      key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}"      # Password used to access the key store      key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}"      # Password used to access the key      key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"      # Type of the key store      key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"

模块目录结构

首先我们看该模块下的目录结构:

└── main    └── java        └── org            └── thingsboard                └── server                    └── transport                        └── mqtt                            ├── MqttSslHandlerProvider.java	Mqtt Ssl逻辑处理提供类                            ├── MqttTopics.java	Mqtt预定义主题                            ├── MqttTransportContext.java Mqtt传输协议上下文                            ├── MqttTransportHandler.java Mqttt传输协议逻辑处理类                            ├── MqttTransportServerInitializer.java Mqtt传输协议初始化类                            ├── MqttTransportService.java	Mqtt传输协议启动类                            ├── adaptors                            │   ├── JsonMqttAdaptor.java Mqtt传输内容Json适配器                            │   └── MqttTransportAdaptor.java Mqtt协议传输适配器                            ├── session                            │   ├── DeviceSessionCtx.java 设备会话上下文                            │   ├── GatewayDeviceSessionCtx.java 网关设备会话上下文                            │   ├── GatewaySessionHandler.java 网关会话处理类                            │   ├── MqttDeviceAwareSessionContext.java Mqtt设备会话上下文                            │   └── MqttTopicMatcher.java Mqtt主题匹配器                            └── util                                └── SslUtil.java Ssl工具类

Mqtt传输协议逻辑实现

MqttTransportService

@Service("MqttTransportService")@ConditionalOnExpression("'${transport.type:null}'=='null' || ('${transport.type}'=='local' && '${transport.mqtt.enabled}'=='true')")@Slf4jpublic class MqttTransportService {    @Value("${transport.mqtt.bind_address}")    private String host;    @Value("${transport.mqtt.bind_port}")    private Integer port;    @Value("${transport.mqtt.netty.leak_detector_level}")    private String leakDetectorLevel;    @Value("${transport.mqtt.netty.boss_group_thread_count}")    private Integer bossGroupThreadCount;    @Value("${transport.mqtt.netty.worker_group_thread_count}")    private Integer workerGroupThreadCount;    @Value("${transport.mqtt.netty.so_keep_alive}")    private boolean keepAlive;    @Autowired    private MqttTransportContext context;    private Channel serverChannel;    private EventLoopGroup bossGroup;    private EventLoopGroup workerGroup;    @PostConstruct    public void init() throws Exception {        log.info("Setting resource leak detector level to {}", leakDetectorLevel);        ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.valueOf(leakDetectorLevel.toUpperCase()));        log.info("Starting MQTT transport...");        bossGroup = new NioEventLoopGroup(bossGroupThreadCount);        workerGroup = new NioEventLoopGroup(workerGroupThreadCount);        ServerBootstrap b = new ServerBootstrap();        b.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .childHandler(new MqttTransportServerInitializer(context))                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);        serverChannel = b.bind(host, port).sync().channel();        log.info("Mqtt transport started!");    }    @PreDestroy    public void shutdown() throws InterruptedException {        log.info("Stopping MQTT transport!");        try {            serverChannel.close().sync();        } finally {            workerGroup.shutdownGracefully();            bossGroup.shutdownGracefully();        }        log.info("MQTT transport stopped!");    }}

第6行到18行, 通过@value来注入对应的值,直接在字段上添加@value获取application.yml文件中的值。

MQTT服务端参数:

其中第三个内存检测级别是源于: netty中大量使用了池化技术来减缓IO buffer的创建销毁开销。对于这些内存池管理的对象,从netty 4之后使用了引用计数来对它们进行管理。但是JVM GC和netty的内存回收机制是不同的,netty就提供了一个内存泄漏检查机制。

DISABLED: 不进行内存泄露的检测;SIMPLE: 抽样检测,且只对部分方法调用进行记录,消耗较小,有泄漏时可能会延迟报告,默认级别;ADVANCED: 抽样检测,记录对象最近几次的调用记录,在泄漏时可能会延迟报告;PARANOID: 每次创建一个对象时都进行泄露检测,且会记录对象最近的详细调用记录。是比较激进的内存泄露检测级别,消耗最大,建议只在测试时使用。

第四个和第五个参数,boss线程组 用于服务端接受客户端的连接,worker线程组 用于进行SocketChannel的数据读写。

第35行到第41行,通过创建ServerBootstrap对象,设置使用的EventLoopGroup;设置要被实例化的为NioServerSocketChannel类;设置连入服务端的Client的SocketChannel的处理器,绑定端口,并同步等待成功,即启动服务器。

第46行到56行,监听服务端关闭,并阻塞等待,并优雅关闭两个EventLoopGroup对象。

MqttTransportServerInitializer

public class MqttTransportServerInitializer extends ChannelInitializer<SocketChannel> {    private final MqttTransportContext context;    public MqttTransportServerInitializer(MqttTransportContext context) {        this.context = context;    }    @Override    public void initChannel(SocketChannel ch) {        ChannelPipeline pipeline = ch.pipeline();        if (context.getSslHandlerProvider() != null) {            SslHandler sslHandler = context.getSslHandlerProvider().getSslHandler();            pipeline.addLast(sslHandler);            context.setSslHandler(sslHandler);        }        pipeline.addLast("decoder", new MqttDecoder(context.getMaxPayloadSize()));        pipeline.addLast("encoder", MqttEncoder.INSTANCE);        MqttTransportHandler handler = new MqttTransportHandler(context);        pipeline.addLast(handler);        ch.closeFuture().addListener(handler);    }}

第11-16行设置ChannelPipeLine,判断SSL处理器处理类是否为空,如果不为空,将SSL处理器加入到ChannelPipeLine。

第17-23行,添加负载内容的解编码器,Mqtt协议逻辑处理器和异步操作完成时回调。

MqttTransportHandler

总结让大家了解MQTT是什么?MQTT是事实上的物联网准标准协议,且经过多年的发展,在IoT行业已然长成参天大树。基于NIO通讯的Netty框架是实现MQTT broker的不二良选。带大家初步了解怎么用Netty框架搭建一个MQTT服务,Netty的网络参数又是怎么样!

标签: #nettyudp判断启动成功