前言:
现在你们对“apache24servername”可能比较关注,咱们都想要知道一些“apache24servername”的相关内容。那么小编也在网摘上收集了一些对于“apache24servername””的相关文章,希望我们能喜欢,兄弟们快快来学习一下吧!NameServerController启动流程总览
启动类:org.apache.rocketmq.namesrv.NamesrvStartup#main
java复制代码public static void main(String[] args) { main0(args);}
java复制代码public static NamesrvController main0(String[] args) { try { //创建NamesrvController NamesrvController controller = createNamesrvController(args); //初始化并启动NamesrvController start(controller); String tip = "The Name Server boot success. serializeType=" + RemotingCommand.getSerializeTypeConfigInThisServer(); log.info(tip); System.out.printf("%s%n", tip); return controller; } catch (Throwable e) { e.printStackTrace(); System.exit(-1); } return null;}1.创建NamesrvController
java复制代码public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { //设置MQ版本号 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //解析启动命令 start mqnamesrv.cmd Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine ("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } //创建NamesrvConfig final NamesrvConfig namesrvConfig = new NamesrvConfig(); //创建NettyServerConfig final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //设置启动端口号9876 nettyServerConfig.setListenPort(9876); //解析启动-c参数 if (commandLine.hasOption('c')) { //-c指定配置文件 String file = commandLine.getOptionValue('c'); if (file != null) { //加载配置文件到流 InputStream in = new BufferedInputStream (new FileInputStream(file)); //加载属性到InputStream properties = new Properties(); properties.load(in); //分别设置属性到namesrvConfig 和 nettyServerConfig MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); //设置配置文件存储地址 namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } //-p来指定是否打印配置项,在指定该选项时,直接退出。 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger (LoggerName.NAMESRV_CONSOLE_NAME); //打印namesrvConfig属性 MixAll.printObjectProperties(console, namesrvConfig); //打印nettyServerConfig 属性 MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } //将启动参数填充到namesrvConfig,nettyServerConfig MixAll.properties2Object (ServerUtil.commandLine2Properties(commandLine), namesrvConfig); //创建NameServerController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); controller.getConfiguration().registerConfig(properties); return controller;}2.初始化NamesrvController3.启动NamesrvController
java复制代码 public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //注册JVM钩子函数代码 //在JVM进程关闭之前,先将线程池关闭,及时释放资源 //可以借鉴的地方 Runtime.getRuntime() .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { //释放资源 controller.shutdown(); return null; } })); controller.start(); return controller; } public void shutdown() { //关闭nettyServer this.remotingServer.shutdown(); //关闭线程池 this.remotingExecutor.shutdown(); //关闭定时任务 this.scheduledExecutorService.shutdown(); //功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。 //原理:注册一个listener,然后新开个线程,定期去扫描文件 //通过对文件内容进行hash来判断文件内容是否发生变化 //如果变化了,则回调监听器的onChange方法。 //看源码主要是监听证书 //关闭fileWatchService if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } }
大致流程如下图
上面对源码做了概览,大致知道了NameServerController启动的流程分为3步,创建-初始化-启动。
下面一步一步看吧。
1.创建nameServerController1.1:解析配置文件,创建NameSrvController
解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController
注意NameServer创建的是是NettyServerConfig,Broker创建的是NettyClientConfig
NamesrvStartup#createNamesrvController
java复制代码public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException { //设置MQ版本号 System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION)); //解析启动命令 start mqnamesrv.cmd Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine ("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } //创建NamesrvConfig final NamesrvConfig namesrvConfig = new NamesrvConfig(); //创建NettyServerConfig final NettyServerConfig nettyServerConfig = new NettyServerConfig(); //设置启动端口号9876 nettyServerConfig.setListenPort(9876); //解析启动-c参数 if (commandLine.hasOption('c')) { //-c指定配置文件 String file = commandLine.getOptionValue('c'); if (file != null) { //加载配置文件到流 InputStream in = new BufferedInputStream (new FileInputStream(file)); //加载属性到InputStream properties = new Properties(); properties.load(in); //分别设置属性到namesrvConfig 和 nettyServerConfig MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig); //设置配置文件存储地址 namesrvConfig.setConfigStorePath(file); System.out.printf("load config properties file OK, %s%n", file); in.close(); } } //-p来指定是否打印配置项,在指定该选项时,直接退出。 if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger (LoggerName.NAMESRV_CONSOLE_NAME); //打印namesrvConfig属性 MixAll.printObjectProperties(console, namesrvConfig); //打印nettyServerConfig 属性 MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } //将启动参数填充到namesrvConfig,nettyServerConfig MixAll.properties2Object (ServerUtil.commandLine2Properties(commandLine), namesrvConfig); //创建NameServerController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig); controller.getConfiguration().registerConfig(properties); return controller;}2.初始化nameServerController2.1:初始化NamesrvController
根据启动属性创建NamesrvController实例,并初始化该实例。
NameServerController实例为NameServer核心控制器。
NamesrvController#initialize
java复制代码public boolean initialize() { //加载KV配置 this.kvConfigManager.load(); /*** 这里需要看NettyRemotingServer构造方法 会把netty的启动辅助类serverBootstrap创建好,这个是重点 保存了channelEventListener。 新建了netty的boss线程。 创建publicExecutor线程池。 ***/ this.remotingServer = new NettyRemotingServer (this.nettyServerConfig, this.brokerHousekeepingService); //创建线程池 默认是8个 this.remotingExecutor = Executors.newFixedThreadPool (nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_")); /*** 创建DefaultRequestProcessor 作为netty server 请求处理器。 org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest 处理所有已知request code类型的请求 **/ this.registerProcessor(); //开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker //如果在2分钟都没有发送心跳 移除不活跃的Broker this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); //开启定时任务:每隔10min打印一次KV配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); return true;}2.2:启动定时任务:每10秒扫描一次所有broker
Broker30秒向NameServer发送一次心跳。
NamesrvController会开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker。
移除broker是根据broker的lastUpdateStamp+2分钟是否小于当前时间,如果小于就移除。
如果某个broker在2分钟内都没有发送心跳 那么就移除该broker 即连续4次没有发送心跳就移除
RouteInfoManager#scanNotActiveBroker
java复制代码//扫描不活跃的brokerpublic void scanNotActiveBroker() {//2分钟private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2; //HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> next = it.next(); //BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。 long last = next.getValue().getLastUpdateTimestamp(); //BrokerLiveInfo中的 //lastUpdateTimestamp+2分钟小于当前时间说明 已经2分钟没有心跳了 if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { //关闭并移除channel RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); //销毁channel工作 this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } }}移除2分钟没心跳的broker的路由元信息:RouteInfoManager#onChannelDestroy
java复制代码 //路由元信息 //类:RouteInfoManager private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
topicQueueTable: Topic消息队列路由信息,消息发送时根据路由表进行负载均衡
brokerAddrTable: Broker基础信息,包括brokerName、所属集群名称、主备Broker地址
clusterAddrTable: Broker集群信息,存储集群中所有Broker名称
brokerLiveTable: Broker状态信息,NameServer每次收到心跳包是会替换该信息
filterServerTable: Broker上的FilterServer列表,用于类模式消息过滤。
一个Topic拥有多个消息队列,一个Broker为每一个主题创建8个读队列和8个写队列。
多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构。
brokerId为0代表Master,大于0为Slave。
BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。
ini复制代码//主要就是移除路由信息表相关信息public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { try { //申请写锁,根据brokerAddress //从brokerLiveTable和filterServerTable移除 this.lock.readLock().lockInterruptibly(); Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable = this.brokerLiveTable.entrySet().iterator(); while (itBrokerLiveTable.hasNext()) { Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next(); if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } else { log.info("the broker's channel destroyed" + "clean it's data structure at once"); } if (brokerAddrFound != null && brokerAddrFound.length() > 0) { try { try { this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); String brokerNameFound = null; boolean removeBrokerName = false; //维护<String/* brokerName */, BrokerData> brokerAddrTable Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); //遍历brokerAddrTable while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { //获取brokerData BrokerData brokerData = itBrokerAddrTable.next().getValue(); //遍历该broker的所有地址 即主从 Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); //循环遍历主从 while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); //根据broker地址移除brokerAddr if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); break; } } //如果移除以后没有其他的BrokerAddr 相当于这个broker已经没有实例了 //那么把brokerData也从BrokerAddrTable 移除 // <String/* brokerName */, BrokerData> brokerAddrTable if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); } } /*** 维护集群信息: key = clusterName value对应的set是 brokerName <String, Set<String>> clusterAddrTable 这里移除的条件是 removeBrokerName=true removeBrokerName 是在移除brokerAddr时 当braokerData中的addrs为空 即该broker的主从都不存在 这个broker已经没有实例了 设置removeBrokerName=true ***/ if (brokerNameFound != null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); //遍历clusterAddrTable while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); //获得集群名称 String clusterName = entry.getKey(); //获得集群中brokerName集合 Set<String> brokerNames = entry.getValue(); //从brokerNames中移除brokerNameFound boolean removed = brokerNames.remove(brokerNameFound); if (removed) { if (brokerNames.isEmpty()) { //如果集群中不包含任何broker,则移除该集群 it.remove(); } break; } } } //<String/* topic */, List<QueueData>> topicQueueTable队列 //这里移除的条件是 removeBrokerName=true //removeBrokerName 是在移除brokerAddr时 当brokerData中的addrs为空 //即该broker的主从都不存在,这个broker已经没有实例了 //设置removeBrokerName=true if (removeBrokerName) { Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator(); //遍历topicQueueTable while (itTopicQueueTable.hasNext()) { Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); //主题名称 String topic = entry.getKey(); //队列集合 List<QueueData> queueDataList = entry.getValue(); //遍历该主题队列 Iterator<QueueData> itQueueData = queueDataList.iterator(); while (itQueueData.hasNext()) { //获取queueData QueueData queueData = itQueueData.next(); //如果queueData中的brokerName等于本次移除的brokerName //那么从队列中移除该queue if (queueData.getBrokerName() .equals(brokerNameFound)) { itQueueData.remove(); } } //如果该topic的队列为空,则移除该topic if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } }}3.注册jvm钩子函数,启动NameServerController3.1注册jvm钩子函数,启动NameSrvCtr
在JVM进程关闭之前,先将线程池关闭,及时释放资源
NamesrvStartup#start
java复制代码 public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } //注册JVM钩子函数代码 //在JVM进程关闭之前,先将线程池关闭,及时释放资源 //可以借鉴的地方 Runtime.getRuntime() .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { //释放资源 controller.shutdown(); return null; } })); controller.start(); return controller; } public void shutdown() { //关闭nettyServer this.remotingServer.shutdown(); //关闭线程池 this.remotingExecutor.shutdown(); //关闭定时任务 this.scheduledExecutorService.shutdown(); //功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。 //原理:注册一个listener,然后新开个线程,定期去扫描文件 //通过对文件内容进行hash来判断文件内容是否发生变化 //如果变化了,则回调监听器的onChange方法。 //看源码主要是监听证书 //关闭fileWatchService if (this.fileWatchService != null) { this.fileWatchService.shutdown(); } }
标签: #apache24servername #java 启动命令的server #apache servername ip