龙空技术网

你所不知道的Artemis的启机流程(纯干货)

yongfxu 251

前言:

此时咱们对“hornetq发送机制”可能比较注意,我们都需要分析一些“hornetq发送机制”的相关文章。那么小编也在网络上搜集了一些关于“hornetq发送机制””的相关资讯,希望看官们能喜欢,小伙伴们一起来了解一下吧!

1. Artemis系统简述

Artemis是基于java语言实现并且100%开源的消息中间件,该系统采用netty的非堵塞IO架构,拥有出色的性能。性能方面,消息日志保证持久化消息可以像非持久化消息一样拥有出色的表现。灵活的集群可以通过不可靠的全球网络创建地理上分布的集群。同时, Artemis支持自动故障转移功能,在实时系统服务器故障时会话会自动重新连接到备份服务器。

不同于基于远程过程调用(RPC)模式的消息系统,Artemis是一个异步消息系统,请求和响应之间解耦,消息发送者和消息消费者分离,将整个异构应用系统松散的耦合在一起,同时可以提供可靠性、事务和其他许多功能。同时,异步系统可以最大限度的利用硬件资源,减少IO堵塞线程数量,很好的利用网络带宽资源。

总之,Artemis是一个基于java的开源的消息中间件,是集多消息协议、高性能、高可用、集群可扩展的异步消息系统。

系统主要结构如下:

图表 1 Artemis系统图

接入层:Netty作为artemis系统的接入层,主要用于接受各种连接请求。既支持某个协议单独一个端口,也支持所有协议同一个端口,Netty将解析连接请求的前8bit,与所有支持的协议进行比对,选择匹配的协议管理器,并添加该协议的编码器、解码器和协议处理器等,进行下一步处理。协议层:如上所述,artemis支持amqp、mqtt、stomp、hornetq。消息协议层主要实现对这些协议的支持。包括协议管理器、协议流程处理器、编码与解码器、连接管理器等。服务层:Artemis-server消息分发层是Artemis的核心,主要包括鉴权管理模块、协议管理模块、远程连接管理模块、持久化管理模块、配置管理模块等。持久层:Artemis支持两种持久化方案,一种是使用针对消息持久化高度优化并且拥有出色性能的日志系统Journal;另一种是JDBC存储,使用JDBC连接到数据库。JDBC存储还在开发中,当前使用日志功能2. 启机代码流程

Artemis系统是基于CLI命令行来完成动作的执行,其支持的命令包括create、run和stop等。本章主要介绍基于CLI命令的执行流程。

涉及到Artemis命令执行的组件图如下:

图表 2 执行命令的组件图

其中:

Artemis-Boot组件提供了所有执行命令的入口。Artemis-CLI组件封装了执行CLI命令的启动类,并实现了每个命令的具体执行流程。包括create、run、stop等。Artemis-Server组件封装了鉴权管理、协议管理、远程连接管理、持久化管理和配置管理等模块的实现。Artemis-Protocols组件封装了对各个协议栈的实现。Artemis-Web组件提供界面展示所需要的接口

下面主要对run命令的执行流程进行拆解。

Run命令执行流程

图表 3 Run命令执行流程1

启机步骤描述如下:

1. 执行./artemis run命令后,将启动Artemis-Boot模块中的Artemis类,执行main方法,该方法主要获取artemis安装路径和实例路径,并调用execute方法。

2.Artemis类的excetoer主要是负责获取artemis安装路径和实例路径下的库文件名称,并新建一个类加载器,加载所有库文件,最后加载Artemis-CLI模块中的Artemis类,并反射调用该类的execute方法,执行命令的具体流程。

3. Artemis-CLI模块Artemis类的execute方法将调用internalExecute方法执行命令的具体流程:

3.1 实例化CLI:(每个命令的具体使用方法)

实例化命令名称为artemis,构建包含HelpAction、Producer、Consumer、Browse、Mask等命令类的命令行工具构建器(默认命令为HelpAction)。在命令行工具构建器中构建组为queue所包含的命令,包括CreateQueue、DeleteQueue、UpdateQueue、StatQueue等命令(默认命令类为HelpQueue)在命令行工具构建器中构建组为address所包含的命令,包括CreateAddress、DeleteAddress、UpdateAddress、ShowAddres等命令(默认命令类为HelpAddress)在命令行工具构建器中构建组为data所包含的命令,默认命令类为HelpData在命令行工具构建器中构建组为user所包含的命令,默认命令类为HelpUser在命令行工具构建器中构建Run、Stop、Kill等命令。根据上述Builder构建CLI实例。

3.2 用上述构建的CLI实例,将启动时的命令转换成对应的Action对象,并设置artemis安装实例和创建实例路径(Artemis启动命令对应的Action是Run类)

4. Run命令的具体实现流程

4.1 根据management.xml配置文件中的配置创建对应的实例managementContext

4.2 打印artemis启动时的logo

4.3 根据bootstrap.xml配置文件创建对象实例

4.4 创建通用的鉴权实例对象,原生代码的鉴权对象是“ActiveMQJAASSecurityManager”,根据项目需要,新增加WSRTC鉴权“ActiveMQWSSecurityManager”,该版本兼容旧版本的鉴权

4.5 根据鉴权对象和broker.xml的路径对象,创建FileBroker服务对象

4.6 启动managementContext,事实上并不是通过该实例对象启动JMX功能

4.7 启动FileBroker,详细流程如下:

实例化FileConfiguration和JMSconfiguration对象。将broker.xml中的配置分别加载到FileConfiguration(加载根节点为“core”的配置)和JMSconfiguration(加载根节点为“jms”的配置)中,实际只配置了“core”。创建paging、journal、binding、largeMessage目录FileConfiguration通过buildService方法创建ActiveMQServerImpl服务启动ActiveMQServerImpl服务,ActiveMQServerImpl的启动流程见图4

4.8 创建并启动队列TTl管理服务,用于定时删除消息队列。(项目需求,之所以放在Artemis-CLI模块中实例化,是为了避免循环依赖,有待确认)

4.9 将bootstrap.xml配置中<web>元素的数据数据存储在broker根节点的components节点中,并启动所有components组件。(其中Web对应的服务对象是WebServerComponent)

图表 4 Run命令执行流程2

ActiveMQServerImp初始化时,只需要记录broker.xml配置信息的FileConfiguration、MBeanServer和鉴权管理服务SecurityManager。初始化流程如下:

如果配置中的connection-ttl-override或connection-ttl等于check-period,则打印222202告警信息初始化地址管理仓库addressSettingsRepository,该实例根据正则表达式的映射管理对象,并按一定顺序存储对象。初始化鉴权管理仓库securityRepository该实例根据正则表达式的映射管理对象,并按一定顺序存储对象。初始化服务注册器serviceRegistry。

基于上述初始化,ActiveMQServerImp的启动流程如下:

1. 解析配置参数和运行的环境变量

2. 初始化线程池,包括

通用线程池threadPool(线程名:"ActiveMQ-server-"),并将该线程池封装到OrderedExecutorFactory工厂类,该线程池主要用于服务端的大多数异步操作。异步IO线程池ioExecutorPool(线程名:"ActiveMQ-IO-server-"),并将该线程池封装到OrderedExecutorFactory工厂类,该线程池主要用于持久化文件。调度线程池scheduledPool(线程名"ActiveMQ-scheduled-threads"),并将该线程池封装到OrderedExecutorFactory工厂类,该线程池主要用于执行定时任务。

3. 初始化临界检验服务,用于周期性检测对象的存活情况

声明并启动检验服务对象CriticalAnalyzerImpl,其拥有独立的检测线程,定期执行检测任务。添加检测失败的回调criticalAction,检测失败有4种处理机制:HALT(暂停服务一段时间)、SHUTDOWN(立即停止服务)、LOG(只是打印log)。检测开始后, 一次检测该对象中存储的待检测组件(所有待检测组件均继承自CriticalComponent)。检测原理是:CriticalComponent每次有操作时均会更新原子日期(enter和leave),CriticalAnalyzerImpl根据leave-enter的值是否大于配置值来判断CriticalComponent是否阻塞或者死锁,一旦检测失败则执行回调criticalAction。

4. 初始化系统高可用的服务角色对象,默认采用LiveOnlyPolicy,即当前启动的就是活动服务机,并且没有备机,因此并没有故障转移功能。

5. 操作上下文服务清除所有上下文对象(OperationContextImpl)

6. 消息日志目录创建(配置路径,并且配置持久化),如果以创建则忽略。

7. 创建节点管理器(NodeManager)并启动,如果是非持久化,则是InVmNodeManager,默认采用文件格式也即FileNodeManager。

8. 激活并启动高可用角色服务(具体流程见下图与描述)

9. 创建并启动连接服务(ConnectorService),事实上该步骤没有实现什么操作。

高可用服务liveOnlyActivation的启机流程:

1. 首先调用ActiveMQServerImpl的initialisePart1方法,完成各个管理器的初始化和启动流程

1.1 设置持久化Journal的方式:如果配置是AsynNIO异步方式,但是系统不支持(比如没安装libao),则回退到NIO方式。

1.2 初始化管理服务器(ManagementService)

1.3 启动内存监管理器(MemoryManager),定时采集内存(总内存、可用内存、已使用内存等)并以debug的形式打印到日志,如果可用内存百分率低于配置值,则打印Error日志告警。(该功能生产环境并没有配置)

1.4 激活回调服务,实际上截止目前还没有待回调的服务。

1.5 创建存储管理服务器(StoreManager):

如果是非持久化,则对应NullStoreManager,基本没做什么操作。如果是JDBC的持久化方式,则对应JDBCStoreManager(尚未实现)默认采用基于文件的持久化方式,对应JournalStoreManager,并将该存储管理服务器添加到CriticalAnalyzer中,做实时检测。

1.6 创建认证存储服务器(SecurityStoreImpl),并将该对象注册到SecurityRepository中。该认证服务器封装了鉴权管理服务SecurityManager,实现解耦。

1.7 创建队列工厂服务器(QueueFactoryImpl),用于创建或删除队列。

1.8 创建分页管理服务器(PagingManagerImpl),用于消息的分页管理,并将该对象注册到addressSettingsRepository中。

1.9 创建资源管理服务器(ResourceManagerImpl),用于管理事务资源,定时清理超时的事务

1.10 创建消息路由服务器(PostOfficeImpl),主要用于消息的路由分发

1.11 创建集群管理服务器(ClusterManager),负责集群、广播和网桥的管理和维护等。

1.12 创建备份管理服务器(BackupManager),针对每个集群连接创建,采用备端口更新集群信息。

1.13 部署集群管理服务器deploy():

根据<broadcast-groups>的配置,创建BroadcastGroupImpl对象,并调用ManagementService的registerBroadcastGroup方法,完成注册JMX等操作。根据< cluster-connections>的配置,完成配置过程(略复杂,后续补充)。最后启动集群控制器(ClusterControllerr),该控制器的作用后续补充。

1.14 创建远程连接管理器(RemotingServiceImpl),用于管理客户端连接。

1.15 调用ManagementService的registerServer方法创建消息服务控制管理器(ActiveMQServerControlImp),并将该对象注入到JMX。该消息服务控制管理器用于获取服务器的状态,包括队列、address等信息,并提供查询接口供Console使用

1.16 根据<address-settings>的配置部署地址设置,将其加入addressSettingsRepository。注意:如果配置成非持久化(<persistence-enabled>false</persistence-enabled>),但是<address-settings>中配置了分页(<address-full-policy>PAGE</address-full-policy>),则服务器默认还是为对消息进行持久化分页处理。

1.17 存储管理器的启动流程

检查Bingding、Journal和Page文件夹是否创建,没有创建则先创建文件夹启动bingdingsJournal(JournalImp实例),根据ioExecutorFactory设置filesExecutor等,最后设置状态为started。启动messageJournal(JournalImp实例),根据ioExecutorFactory设置filesExecutor等,最后设置状态为started。最后设置存储管理器的状态为Started

1.18 消息路由组件启动流程

将消息路由组件以lisenter的形式添加到管理服务对象中,监听通知消息。处理的监听消息类型包括:BINDING_ADDED、BINDING_REMOVED、CONSUMER_CREATED、CONSUMER_CLOSED将消息路由组件对象注入到队列工厂服务器中,用于分页控制设置消息路由组件的状态为started

1.19 分页管理服务启动流程

首先获取写文件的锁往分页存储工厂对象(PagingStoreFactoryNIO)中添加该分页管理服务通过分页存储工厂对象的reloadStores方法重新加载分页消息。最后设置分页管理服务的状态为started。并释放写文件锁。

1.20 管理服务对象启动流程

是否配置消息计数管理,如果有,启动消息计数管理,统计每个队列的消息量。(生产环境并没有配置)设置管理服务对象的状态为true;往消息服务器messagingServer(ActiveMQServerImp)上注册一个回调,该回调用于往postoffice中添加一个addressinfo消息。用于确保创建address的通告能正常发送,否则如果auto-create-adress = false,有可能导致集群的网桥连接不了.

1.21 资源管理对象启动流程

开启定时检测事务消息线程,用于检测事务是否超时,如果超时,需要回滚(都有哪些事务??)设置资源管理对象的状态为true。

1.22 将配置文件中<security-settings>的配置部署到securityRepository中,设置角色对应的权限,比如是否有创建地址、删除地址、发送消息等权限

1.23 声明配置文件重载管理服务,并配置回调处理器,每隔一段时间检查配置文件是否有更新,如果有更新,则重新加载。

1.24 注册broker插件,实际上没有配置任何插件

2. 注册激活服务的回调:激活节点服务器的状态,并将回调添加到ActiveMQServerImpl中。

3. 执行ActiveMQServerImpl的initialisePart2方法:

3.1 分页管理服务加载文件,过程同分页管理服务启动流程时一样

3.2 加载journal日志信息

创建Journal加载器(PostOfficeJournalLoader)调用存储管理器(StorageManager)加载bingdingjournal消息PostOfficeJournalLoader初始化address、queues等,并完成address、queues和binding建立联系,将队列注册到JMX等过程

3.3 创建清洁者(ServerInfo),并向调度线程池中添加一个任务,每个一段时间执行ServerInfo的dump方法,打印内存使用情况,前提是配置“server.dump.interval”字段。(生产环境没有配置)

3.4 部署预先定义的address,包括DLQ、expiryqueue等,并绑定订阅的该地址的队列

3.5 部署预先定义的队列queue,实际上broker没有预先配置队列。

3.6 根据config-delete-queues的配置对不在配置文件中的address和queue进行处理。该config-delete-queues的作用是当服务器重新加载配置时如何处理不在配置文件中的address和queue,处理策略有:OFF,在配置重新加载时不会删除;FORCE,在配置重新加载时会删除队列,即使消息仍然存在。默认是OFF

3.7 调用激活回调函数,所有激活服务必须在集群完全启机前执行完(原因待明确)

3.8 检查地址配置中潜在的OOM风险,如果加载完配置的地址和队列,内存超过了“global-max-size”,则打印日志告警。

3.9 部署预先定义的投递器。实际上没有配置。

3.10 启动remotingService。

声明远程连接用的线程池,远程连接有自己的线程池,线程名为“ActiveMQ-remoting-threads-”。根据配置中acceptors的acceptor,依次创建相应消息协议对应的NettyAcceptor对象,但是还未启动,避免混乱。创建并启动连接失败检测和刷新线程,该线程会定时(根据“connection-ttl-check-interval”配置)去检测所有连接,如果该连接空闲时间超过配置时间(“connection-ttl-override”),则服务器主动断开连接设置remotingService的状态为started。

3.11 启动集群管理器clusterService

3.11.1 启动BroadcastGroupImp

调用UDPBroadcastEndpointFactory的openBroadcaster()方法打开广播器,实际上是声明一个DatagramSocket,用于发送广播消息。设置BroadcastGroupImp的状态是started。封装Notification信息,消息类型是BROADCAST_GROUP_STARTED,并通过notificationService(也就是ManagementServiceImpl)发送。具体的发送流程可以查看ManagementServiceImpl的sendNotification方法。激活集群心跳,往调度线程池中加入一个任务,每隔“broadcast-period”广播一个类似心跳的消息。消息内容包括:nodeId、消息唯一Id、集群连接数等。

3.11.2 启动集群连接ClusterConnectionImpl

设置集群连接对象ClusterConnectionImpl的状态为start。创建探测服务器(ServerLocatorImpl),执行集群连接流程。经过前期的各种封装,发送一个消息类型为消息类型是BROADCAST_CONNECTION_STARTED的notification消息。最后将ClusterConnectionImpl作为listener添加到集群拓扑中。

3.11.3 部署配置文件中的网桥信息,实际服务器上并没有该配置

3.11.4 启动StandaloneHAManager,设置状态为started,不过该类没做什么操作,应该是作者预留的类。

3.11.5 设置集群管理器的状态为started。

3.12 启动消息超时检测,向调度线程池中添加一个定时任务(默认是30s),检测流程查看postoffic的startExpiryScanner方法

3.13 向分页管理服务和存储管理服务注入磁盘监控器FileStoreMonitor,并启动FileStoreMonitor,启动流程也是向调度线程池中添加一个定时任务(定时disk-scan-period),如果存储空间大于配置数max-disk-usag,则打印日志告警(“AMQ222210: Storage usage is beyond max-disk-usage. System will start blocking producers.”)

4. 完成上述流程后,执行ActiveMQServerImpl的completeActivation()方法:

4.1 设置ActiveMQServerImpl的状态为started。

4.2 调用remotingService的startAcceptors()方法,启动netty服务端。

4.3 执行所有回调类的activationComplete方法,完成服务启动后的动作。

标签: #hornetq发送机制