龙空技术网

RocketMQ之NameServer启动流程

搬山道猿 68

前言:

现在你们对“apache24servername”可能比较关注,咱们都想要知道一些“apache24servername”的相关内容。那么小编也在网摘上收集了一些对于“apache24servername””的相关文章,希望我们能喜欢,兄弟们快快来学习一下吧!

NameServerController启动流程总览

启动类:org.apache.rocketmq.namesrv.NamesrvStartup#main

java复制代码public static void main(String[] args) {    main0(args);}
java复制代码public static NamesrvController main0(String[] args) {    try {        //创建NamesrvController        NamesrvController controller = createNamesrvController(args);        //初始化并启动NamesrvController        start(controller);        String tip = "The Name Server boot success. serializeType=" +                           RemotingCommand.getSerializeTypeConfigInThisServer();        log.info(tip);        System.out.printf("%s%n", tip);        return controller;    } catch (Throwable e) {        e.printStackTrace();        System.exit(-1);    }    return null;}
1.创建NamesrvController
java复制代码public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {    //设置MQ版本号    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));    //解析启动命令 start mqnamesrv.cmd    Options options = ServerUtil.buildCommandlineOptions(new Options());    commandLine = ServerUtil.parseCmdLine            ("mqnamesrv",             args,              buildCommandlineOptions(options),              new PosixParser());    if (null == commandLine) {        System.exit(-1);        return null;    }    //创建NamesrvConfig    final NamesrvConfig namesrvConfig = new NamesrvConfig();    //创建NettyServerConfig    final NettyServerConfig nettyServerConfig = new NettyServerConfig();    //设置启动端口号9876    nettyServerConfig.setListenPort(9876);    //解析启动-c参数    if (commandLine.hasOption('c')) {        //-c指定配置文件        String file = commandLine.getOptionValue('c');        if (file != null) {            //加载配置文件到流            InputStream in = new BufferedInputStream                                    (new FileInputStream(file));            //加载属性到InputStream            properties = new Properties();            properties.load(in);            //分别设置属性到namesrvConfig 和 nettyServerConfig              MixAll.properties2Object(properties, namesrvConfig);            MixAll.properties2Object(properties, nettyServerConfig);            //设置配置文件存储地址            namesrvConfig.setConfigStorePath(file);            System.out.printf("load config properties file OK, %s%n", file);            in.close();        }    }    //-p来指定是否打印配置项,在指定该选项时,直接退出。    if (commandLine.hasOption('p')) {        InternalLogger console                 = InternalLoggerFactory.getLogger                        (LoggerName.NAMESRV_CONSOLE_NAME);        //打印namesrvConfig属性        MixAll.printObjectProperties(console, namesrvConfig);        //打印nettyServerConfig 属性        MixAll.printObjectProperties(console, nettyServerConfig);        System.exit(0);    }    //将启动参数填充到namesrvConfig,nettyServerConfig    MixAll.properties2Object                (ServerUtil.commandLine2Properties(commandLine),                                                         namesrvConfig);    //创建NameServerController    final NamesrvController controller             = new NamesrvController(namesrvConfig, nettyServerConfig);    controller.getConfiguration().registerConfig(properties);    return controller;}
2.初始化NamesrvController3.启动NamesrvController
java复制代码 public static NamesrvController start(final NamesrvController controller)      															throws Exception {        if (null == controller) {            throw new IllegalArgumentException("NamesrvController is null");        }        boolean initResult = controller.initialize();        if (!initResult) {            controller.shutdown();            System.exit(-3);        }        //注册JVM钩子函数代码     	//在JVM进程关闭之前,先将线程池关闭,及时释放资源     	//可以借鉴的地方        Runtime.getRuntime()            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {            @Override            public Void call() throws Exception {                //释放资源                controller.shutdown();                return null;            }        }));        controller.start();        return controller;    } public void shutdown() {     	//关闭nettyServer        this.remotingServer.shutdown();       	//关闭线程池        this.remotingExecutor.shutdown();     	//关闭定时任务        this.scheduledExecutorService.shutdown();		//功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。     	//原理:注册一个listener,然后新开个线程,定期去扫描文件     	//通过对文件内容进行hash来判断文件内容是否发生变化     	//如果变化了,则回调监听器的onChange方法。     	//看源码主要是监听证书     	//关闭fileWatchService        if (this.fileWatchService != null) {            this.fileWatchService.shutdown();        }    }

大致流程如下图

上面对源码做了概览,大致知道了NameServerController启动的流程分为3步,创建-初始化-启动。

下面一步一步看吧。

1.创建nameServerController1.1:解析配置文件,创建NameSrvController

解析配置文件,填充NameServerConfig、NettyServerConfig属性值,并创建NamesrvController

注意NameServer创建的是是NettyServerConfig,Broker创建的是NettyClientConfig

NamesrvStartup#createNamesrvController

java复制代码public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {    //设置MQ版本号    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));    //解析启动命令 start mqnamesrv.cmd    Options options = ServerUtil.buildCommandlineOptions(new Options());    commandLine = ServerUtil.parseCmdLine            ("mqnamesrv",             args,              buildCommandlineOptions(options),              new PosixParser());    if (null == commandLine) {        System.exit(-1);        return null;    }    //创建NamesrvConfig    final NamesrvConfig namesrvConfig = new NamesrvConfig();    //创建NettyServerConfig    final NettyServerConfig nettyServerConfig = new NettyServerConfig();    //设置启动端口号9876    nettyServerConfig.setListenPort(9876);    //解析启动-c参数    if (commandLine.hasOption('c')) {        //-c指定配置文件        String file = commandLine.getOptionValue('c');        if (file != null) {            //加载配置文件到流            InputStream in = new BufferedInputStream                                    (new FileInputStream(file));            //加载属性到InputStream            properties = new Properties();            properties.load(in);            //分别设置属性到namesrvConfig 和 nettyServerConfig              MixAll.properties2Object(properties, namesrvConfig);            MixAll.properties2Object(properties, nettyServerConfig);            //设置配置文件存储地址            namesrvConfig.setConfigStorePath(file);            System.out.printf("load config properties file OK, %s%n", file);            in.close();        }    }    //-p来指定是否打印配置项,在指定该选项时,直接退出。    if (commandLine.hasOption('p')) {        InternalLogger console                 = InternalLoggerFactory.getLogger                        (LoggerName.NAMESRV_CONSOLE_NAME);        //打印namesrvConfig属性        MixAll.printObjectProperties(console, namesrvConfig);        //打印nettyServerConfig 属性        MixAll.printObjectProperties(console, nettyServerConfig);        System.exit(0);    }    //将启动参数填充到namesrvConfig,nettyServerConfig    MixAll.properties2Object                (ServerUtil.commandLine2Properties(commandLine),                                                         namesrvConfig);    //创建NameServerController    final NamesrvController controller             = new NamesrvController(namesrvConfig, nettyServerConfig);    controller.getConfiguration().registerConfig(properties);    return controller;}
2.初始化nameServerController2.1:初始化NamesrvController

根据启动属性创建NamesrvController实例,并初始化该实例。

NameServerController实例为NameServer核心控制器。

NamesrvController#initialize

java复制代码public boolean initialize() {    //加载KV配置    this.kvConfigManager.load();    /***        这里需要看NettyRemotingServer构造方法        会把netty的启动辅助类serverBootstrap创建好,这个是重点        保存了channelEventListener。        新建了netty的boss线程。        创建publicExecutor线程池。    ***/    this.remotingServer = new NettyRemotingServer                            (this.nettyServerConfig, this.brokerHousekeepingService);    //创建线程池 默认是8个    this.remotingExecutor =        Executors.newFixedThreadPool        (nettyServerConfig.getServerWorkerThreads(),                              new ThreadFactoryImpl("RemotingExecutorThread_"));    /***    创建DefaultRequestProcessor 作为netty server 请求处理器。    org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest     处理所有已知request code类型的请求    **/    this.registerProcessor();    //开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker    //如果在2分钟都没有发送心跳 移除不活跃的Broker    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            NamesrvController.this.routeInfoManager.scanNotActiveBroker();        }    }, 5, 10, TimeUnit.SECONDS);    //开启定时任务:每隔10min打印一次KV配置    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {        @Override        public void run() {            NamesrvController.this.kvConfigManager.printAllPeriodically();        }    }, 1, 10, TimeUnit.MINUTES);    return true;}
2.2:启动定时任务:每10秒扫描一次所有broker

Broker30秒向NameServer发送一次心跳。

NamesrvController会开启定时任务:每隔10s扫描一次Broker,移除不活跃的Broker。

移除broker是根据broker的lastUpdateStamp+2分钟是否小于当前时间,如果小于就移除。

如果某个broker在2分钟内都没有发送心跳 那么就移除该broker 即连续4次没有发送心跳就移除

RouteInfoManager#scanNotActiveBroker

java复制代码//扫描不活跃的brokerpublic void scanNotActiveBroker() {//2分钟private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;    //HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;    Iterator<Entry<String, BrokerLiveInfo>> it                             = this.brokerLiveTable.entrySet().iterator();    while (it.hasNext()) {        Entry<String, BrokerLiveInfo> next = it.next();        //BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。        long last = next.getValue().getLastUpdateTimestamp();        //BrokerLiveInfo中的        //lastUpdateTimestamp+2分钟小于当前时间说明 已经2分钟没有心跳了        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {            //关闭并移除channel            RemotingUtil.closeChannel(next.getValue().getChannel());            it.remove();            //销毁channel工作            this.onChannelDestroy(next.getKey(),                                     next.getValue().getChannel());        }    }}
移除2分钟没心跳的broker的路由元信息:RouteInfoManager#onChannelDestroy
java复制代码    //路由元信息    //类:RouteInfoManager    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;

topicQueueTable: Topic消息队列路由信息,消息发送时根据路由表进行负载均衡

brokerAddrTable: Broker基础信息,包括brokerName、所属集群名称、主备Broker地址

clusterAddrTable: Broker集群信息,存储集群中所有Broker名称

brokerLiveTable: Broker状态信息,NameServer每次收到心跳包是会替换该信息

filterServerTable: Broker上的FilterServer列表,用于类模式消息过滤。

一个Topic拥有多个消息队列,一个Broker为每一个主题创建8个读队列和8个写队列。

多个Broker组成一个集群,集群由相同的多台Broker组成Master-Slave架构。

brokerId为0代表Master,大于0为Slave。

BrokerLiveInfo中的lastUpdateTimestamp存储上次收到Broker心跳包的时间。

ini复制代码//主要就是移除路由信息表相关信息public void onChannelDestroy(String remoteAddr, Channel channel) {    String brokerAddrFound = null;    if (channel != null) {        try {            try {                //申请写锁,根据brokerAddress                //从brokerLiveTable和filterServerTable移除                this.lock.readLock().lockInterruptibly();                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =                    this.brokerLiveTable.entrySet().iterator();                while (itBrokerLiveTable.hasNext()) {                    Entry<String, BrokerLiveInfo> entry                                        = itBrokerLiveTable.next();                    if (entry.getValue().getChannel() == channel) {                        brokerAddrFound = entry.getKey();                        break;                    }                }            } finally {                this.lock.readLock().unlock();            }        } catch (Exception e) {            log.error("onChannelDestroy Exception", e);        }    }    if (null == brokerAddrFound) {        brokerAddrFound = remoteAddr;    } else {        log.info("the broker's channel destroyed" +                             "clean it's data structure at once");    }    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {        try {            try {                this.lock.writeLock().lockInterruptibly();                this.brokerLiveTable.remove(brokerAddrFound);                this.filterServerTable.remove(brokerAddrFound);                String brokerNameFound = null;                boolean removeBrokerName = false;                //维护<String/* brokerName */, BrokerData> brokerAddrTable                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =                    this.brokerAddrTable.entrySet().iterator();                //遍历brokerAddrTable              while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {                    //获取brokerData                    BrokerData brokerData = itBrokerAddrTable.next().getValue();                    //遍历该broker的所有地址 即主从                    Iterator<Entry<Long, String>> it                            = brokerData.getBrokerAddrs().entrySet().iterator();                    //循环遍历主从                    while (it.hasNext()) {                        Entry<Long, String> entry = it.next();                        Long brokerId = entry.getKey();                        String brokerAddr = entry.getValue();                         //根据broker地址移除brokerAddr                        if (brokerAddr.equals(brokerAddrFound)) {                            brokerNameFound = brokerData.getBrokerName();                            it.remove();                            break;                        }                    }                    //如果移除以后没有其他的BrokerAddr 相当于这个broker已经没有实例了                    //那么把brokerData也从BrokerAddrTable 移除                   // <String/* brokerName */, BrokerData> brokerAddrTable                    if (brokerData.getBrokerAddrs().isEmpty()) {                        removeBrokerName = true;                        itBrokerAddrTable.remove();                    }                }                /***                   维护集群信息: key = clusterName value对应的set是 brokerName                    <String, Set<String>> clusterAddrTable                   这里移除的条件是 removeBrokerName=true                   removeBrokerName 是在移除brokerAddr时 当braokerData中的addrs为空                   即该broker的主从都不存在 这个broker已经没有实例了                   设置removeBrokerName=true               ***/                if (brokerNameFound != null && removeBrokerName) {                    Iterator<Entry<String, Set<String>>>                             it = this.clusterAddrTable.entrySet().iterator();                    //遍历clusterAddrTable                    while (it.hasNext()) {                        Entry<String, Set<String>> entry = it.next();                          //获得集群名称                        String clusterName = entry.getKey();                         //获得集群中brokerName集合                        Set<String> brokerNames = entry.getValue();                        //从brokerNames中移除brokerNameFound                        boolean removed = brokerNames.remove(brokerNameFound);                        if (removed) {                            if (brokerNames.isEmpty()) {                                //如果集群中不包含任何broker,则移除该集群                                it.remove();                            }                            break;                        }                    }                }                //<String/* topic */, List<QueueData>> topicQueueTable队列                //这里移除的条件是 removeBrokerName=true                //removeBrokerName 是在移除brokerAddr时 当brokerData中的addrs为空                //即该broker的主从都不存在,这个broker已经没有实例了                //设置removeBrokerName=true                if (removeBrokerName) {                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =                        this.topicQueueTable.entrySet().iterator();                    //遍历topicQueueTable                    while (itTopicQueueTable.hasNext()) {                        Entry<String, List<QueueData>> entry                                            = itTopicQueueTable.next();                        //主题名称                        String topic = entry.getKey();                        //队列集合                        List<QueueData> queueDataList = entry.getValue();                        //遍历该主题队列                        Iterator<QueueData> itQueueData                                        = queueDataList.iterator();                        while (itQueueData.hasNext()) {                            //获取queueData                            QueueData queueData = itQueueData.next();                            //如果queueData中的brokerName等于本次移除的brokerName                            //那么从队列中移除该queue                            if (queueData.getBrokerName()                                        .equals(brokerNameFound)) {                                itQueueData.remove();                            }                        }                        //如果该topic的队列为空,则移除该topic                        if (queueDataList.isEmpty()) {                            itTopicQueueTable.remove();                        }                    }                }            } finally {                this.lock.writeLock().unlock();            }        } catch (Exception e) {            log.error("onChannelDestroy Exception", e);        }    }}
3.注册jvm钩子函数,启动NameServerController3.1注册jvm钩子函数,启动NameSrvCtr

在JVM进程关闭之前,先将线程池关闭,及时释放资源

NamesrvStartup#start

java复制代码 public static NamesrvController start(final NamesrvController controller)                                                                 throws Exception {        if (null == controller) {            throw new IllegalArgumentException("NamesrvController is null");        }        boolean initResult = controller.initialize();        if (!initResult) {            controller.shutdown();            System.exit(-3);        }        //注册JVM钩子函数代码        //在JVM进程关闭之前,先将线程池关闭,及时释放资源        //可以借鉴的地方        Runtime.getRuntime()            .addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {            @Override            public Void call() throws Exception {                //释放资源                controller.shutdown();                return null;            }        }));        controller.start();        return controller;    } public void shutdown() {        //关闭nettyServer        this.remotingServer.shutdown();        //关闭线程池        this.remotingExecutor.shutdown();        //关闭定时任务        this.scheduledExecutorService.shutdown();        //功能实现当文件内容发生变化时,重新加载文件,可用于读取配置类的文件。        //原理:注册一个listener,然后新开个线程,定期去扫描文件        //通过对文件内容进行hash来判断文件内容是否发生变化        //如果变化了,则回调监听器的onChange方法。        //看源码主要是监听证书        //关闭fileWatchService        if (this.fileWatchService != null) {            this.fileWatchService.shutdown();        }    }

标签: #apache24servername #java 启动命令的server #apache servername ip