
RocketMQ Source Code Analysis: NameServer Startup Flow (Part 1)



I. Preface

In the previous article we analyzed the core components inside NamesrvController and formed a rough picture of the NameServer. In this article we follow the NameServer startup flow and tie those components together.

II. The start method

org.apache.rocketmq.namesrv.NamesrvStartup#start

This method mainly does two things:

1. Call NamesrvController's initialize method to initialize the controller;
2. Call NamesrvController's start method to start it.

public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    // Initialize first; this mainly sets up a few scheduled tasks
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    // Register a JVM shutdown hook
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            controller.shutdown();
            return null;
        }
    }));
    // Start the NameServer
    controller.start();
    return controller;
}
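The ShutdownHookThread wrapper above simply runs the supplied Callable once inside a JVM shutdown hook. As a side note, the same graceful-shutdown pattern can be written with plain JDK classes; the SimpleServer type below is hypothetical and only stands in for NamesrvController:

import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownHookSketch {

    /** Hypothetical stand-in for NamesrvController. */
    static class SimpleServer {
        void shutdown() {
            System.out.println("releasing network and thread-pool resources");
        }
    }

    public static void main(String[] args) {
        final SimpleServer server = new SimpleServer();
        final AtomicBoolean hasShutdown = new AtomicBoolean(false);

        // Same idea as ShutdownHookThread: run the shutdown logic exactly once
        // when the JVM receives SIGTERM / Ctrl+C or exits normally.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            if (hasShutdown.compareAndSet(false, true)) {
                server.shutdown();
            }
        }, "ShutdownHook"));

        // ... start the server and block serving requests ...
    }
}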
III. NamesrvController initialization flow

1. Call the load method to load disk data into memory;
2. Create a Netty-based remoting server;
3. Build a remoting thread pool;
4. Register the default request-handling component, DefaultRequestProcessor, with the RemotingServer;
5. Start a scheduled task that, after a 5s delay and then every 10s, scans for inactive brokers and removes them;
6. After a 1-minute delay and then every 10 minutes, print all KV configuration;
7. If SSL/TLS encrypted communication has not been disabled, build a file-watch service thread, which is started when NamesrvController's start method is called;

org.apache.rocketmq.namesrv.NamesrvController#initialize

public boolean initialize() {
    // As soon as the NameServer starts, KVConfigManager loads the data on disk into memory.
    // You can write KV config data into it; it updates memory and synchronously writes to disk,
    // with a read-write lock controlling access.
    this.kvConfigManager.load();
    // Create a Netty-based remoting server
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Build a remoting thread pool, sized by the worker thread count
    this.remotingExecutor = Executors.newFixedThreadPool(
            nettyServerConfig.getServerWorkerThreads(),
            new ThreadFactoryImpl("RemotingExecutorThread_")
    );
    // Register the request-handling components
    this.registerProcessor();
    // Scheduled task: after a 5s delay, every 10s, scan for inactive brokers and remove them
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // After a 1-minute delay, every 10 minutes, print all KV configuration
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    // If SSL/TLS encrypted communication has not been disabled
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            // Build a file-watch service.
            // Its job is to watch the certificate, private key and trust-certificate paths
            // used for SSL/TLS; when file content changes, the listener below is called back.
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;

                    @Override
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }

                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}
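The FileWatchService used above watches the certificate, key and trust-certificate files and calls the listener when their content changes. The following is only a conceptual sketch of such a watcher using plain JDK classes, not RocketMQ's actual FileWatchService implementation; the 500 ms polling interval and MD5 hashing are assumptions made for illustration:

import java.nio.file.*;
import java.security.MessageDigest;
import java.util.*;
import java.util.concurrent.*;
import java.util.function.Consumer;

public class FileWatchSketch {

    // Periodically hash the watched files and invoke the callback when content changes.
    public static void watch(List<String> paths, Consumer<String> onChanged) {
        Map<String, String> lastHash = new ConcurrentHashMap<>();
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleWithFixedDelay(() -> {
            for (String path : paths) {
                try {
                    byte[] content = Files.readAllBytes(Paths.get(path));
                    String hash = Base64.getEncoder().encodeToString(
                        MessageDigest.getInstance("MD5").digest(content));
                    // First round only records the hash; later rounds compare against it.
                    String previous = lastHash.put(path, hash);
                    if (previous != null && !previous.equals(hash)) {
                        onChanged.accept(path);   // e.g. reload the SslContext
                    }
                } catch (Exception ignore) {
                    // A missing or unreadable file is simply skipped this round.
                }
            }
        }, 500, 500, TimeUnit.MILLISECONDS);
    }
}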
1. Loading disk data into memory

KVConfigManager loads the data on disk into memory. There is one questionable spot in the KVConfigManager class, which we will come back to later.

org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load

public void load() {
    String content = null;
    try {
        // The file at kvConfigPath stores one big JSON string containing all KV configuration
        content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath());
    } catch (IOException e) {
        log.warn("Load KV config table exception", e);
    }
    if (content != null) {
        // Deserialize the JSON string into a HashMap<String, HashMap<String, String>>
        KVConfigSerializeWrapper kvConfigSerializeWrapper =
            KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class);
        if (null != kvConfigSerializeWrapper) {
            this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable());
            log.info("load KV config table OK");
        }
    }
}
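load() only reads from disk. For the write path described earlier (writes go into memory and are synchronously persisted, guarded by a read-write lock), here is a simplified sketch; the class and method names are illustrative and do not match KVConfigManager's exact code:

import java.util.HashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class KvStoreSketch {

    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String, HashMap<String, String>> configTable = new HashMap<>();

    public void put(String namespace, String key, String value) {
        // Update the in-memory table under the write lock ...
        lock.writeLock().lock();
        try {
            configTable.computeIfAbsent(namespace, ns -> new HashMap<>()).put(key, value);
        } finally {
            lock.writeLock().unlock();
        }
        // ... then synchronously flush the whole table to the kvConfig path on disk.
        persist();
    }

    public String get(String namespace, String key) {
        lock.readLock().lock();
        try {
            HashMap<String, String> table = configTable.get(namespace);
            return table == null ? null : table.get(key);
        } finally {
            lock.readLock().unlock();
        }
    }

    private void persist() {
        // Serialize configTable to JSON and write it to disk; omitted in this sketch.
    }
}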
2. Creating a Netty-based remoting server

NettyRemotingServer extends NettyRemotingAbstract.

// Represents the Netty remoting server; NettyRemotingClient is the client counterpart.
// Both share the same parent class, NettyRemotingAbstract, the base of the netty remoting components.
public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    // netty handshake handler
    private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler";
    // netty SSL/TLS handler
    private static final String TLS_HANDLER_NAME = "sslHandler";
    // file region encoder
    private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";

    // the netty ServerBootstrap
    private final ServerBootstrap serverBootstrap;
    // A netty server needs two event loop groups: one accepts connections,
    // the other handles the read/write I/O of every established connection.
    private final EventLoopGroup eventLoopGroupSelector;
    private final EventLoopGroup eventLoopGroupBoss;
    // core netty server configuration
    private final NettyServerConfig nettyServerConfig;
    // shared public thread pool
    private final ExecutorService publicExecutor;
    // listener for network connection events
    private final ChannelEventListener channelEventListener;
    // timer component
    private final Timer timer = new Timer("ServerHouseKeepingService", true);
    // netty event-executor group used by the pipeline handlers
    private DefaultEventExecutorGroup defaultEventExecutorGroup;
    // port the netty server listens on
    private int port = 0;

    // sharable handlers
    // handshake handler
    private HandshakeHandler handshakeHandler;
    // wire-protocol encoder
    private NettyEncoder encoder;
    // connection management handler
    private NettyConnectManageHandler connectionManageHandler;
    // netty message handler
    private NettyServerHandler serverHandler;

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
        this(nettyServerConfig, null);
    }

    public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
        final ChannelEventListener channelEventListener) {
        // The remoting server both receives RPC requests and can issue them: sync / async / oneway.
        // The oneway and async semaphores passed to the parent presumably limit how many oneway
        // and async RPC invocations can be in flight through this remoting server at the same time.
        super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
        this.serverBootstrap = new ServerBootstrap(); // netty server
        this.nettyServerConfig = nettyServerConfig;
        this.channelEventListener = channelEventListener;

        // Callback executor thread count from the netty server config; if it is 0 it is reset to 4,
        // and that value sizes the public thread pool below.
        int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        // Build the public thread pool
        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        // If epoll is enabled, take this branch; by default epoll is not enabled
        if (useEpoll()) {
            this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
            this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }
        // Otherwise (the default), take the NIO branch below
        else {
            // The boss event loop group accepts connections; a single thread is usually enough.
            // One thread with one selector watches the ServerSocketChannel for incoming connection
            // requests, establishes the connection and binds the handler pipeline to it.
            this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
                }
            });
            // The selector event loop group handles I/O; its thread count is configurable, 3 by default
            this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
                private AtomicInteger threadIndex = new AtomicInteger(0);
                private int threadTotal = nettyServerConfig.getServerSelectorThreads();

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
                }
            });
        }

        // Load the SSL/TLS context
        loadSslContext();
    }
}
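Note that the constructor above only creates the two EventLoopGroup instances; they are wired into the ServerBootstrap later, when the server is started. For readers less familiar with Netty, the following minimal server shows the generic boss/selector split the comments describe. This is plain Netty, not NettyRemotingServer's actual start logic; port 9876, the NameServer default, is used only as an example:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TwoGroupServerSketch {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup boss = new NioEventLoopGroup(1);      // accepts connections
        EventLoopGroup selector = new NioEventLoopGroup(3);  // handles read/write I/O
        try {
            ServerBootstrap bootstrap = new ServerBootstrap()
                .group(boss, selector)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        // encoder / decoder / business handlers go here
                    }
                });
            ChannelFuture future = bootstrap.bind(9876).sync();
            future.channel().closeFuture().sync();
        } finally {
            boss.shutdownGracefully();
            selector.shutdownGracefully();
        }
    }
}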

2.1 The parent class constructor

public abstract class NettyRemotingAbstract {
    /**
     * Remoting logger instance.
     */
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    /**
     * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint.
     */
    protected final Semaphore semaphoreOneway;

    /**
     * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint.
     */
    protected final Semaphore semaphoreAsync;

    /**
     * This map caches all on-going requests.
     * Every request sent out through this remoting component is cached here while it waits for its response.
     */
    protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable =
        new ConcurrentHashMap<Integer, ResponseFuture>(256);

    /**
     * This container holds all processors per request code, aka, for each incoming request, we may look up the
     * responding processor in this map to handle the request.
     * request code -> (request processor, request-processing thread pool)
     */
    protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable =
        new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64);

    /**
     * Executor to feed netty events to user defined {@link ChannelEventListener}.
     * Thread that processes network connection events.
     */
    protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor();

    /**
     * The default request processor to use in case there is no exact match in {@link #processorTable} per request code.
     * Default request processor, backed by 8 threads.
     */
    protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor;

    /**
     * SSL context via which to create {@link SslHandler}.
     * Netty's built-in SSL/TLS component for encrypted communication; SslHandler instances are created from it.
     */
    protected volatile SslContext sslContext;

    /**
     * custom rpc hooks
     * Hooks that are called back around RPC calls, e.g. when an RPC request is received or issued.
     */
    protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();

    static {
        NettyLogger.initNettyLogger();
    }

    /**
     * Constructor, specifying capacity of one-way and asynchronous semaphores.
     *
     * @param permitsOneway Number of permits for one-way requests.
     * @param permitsAsync Number of permits for asynchronous requests.
     */
    public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) {
        this.semaphoreOneway = new Semaphore(permitsOneway, true);
        this.semaphoreAsync = new Semaphore(permitsAsync, true);
    }
}
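The responseTable above is what lets the remoting layer match asynchronous responses to outstanding requests: a future keyed by the request's opaque id is parked in the map before the request is written, and the I/O thread completes it when the response with the same id arrives. A stripped-down sketch of that correlation pattern, using plain JDK types instead of RocketMQ's ResponseFuture:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ResponseTableSketch {

    private final ConcurrentMap<Integer, CompletableFuture<String>> responseTable =
        new ConcurrentHashMap<>();
    private final AtomicInteger opaqueGenerator = new AtomicInteger();

    /** Caller side: register the future, send the request, wait for the reply. */
    public String invokeSync(String request, long timeoutMillis) throws Exception {
        int opaque = opaqueGenerator.incrementAndGet();
        CompletableFuture<String> future = new CompletableFuture<>();
        responseTable.put(opaque, future);
        try {
            send(opaque, request);                       // write to the channel
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            responseTable.remove(opaque);                // always clean up the entry
        }
    }

    /** I/O thread side: called when a response frame arrives. */
    public void onResponse(int opaque, String response) {
        CompletableFuture<String> future = responseTable.get(opaque);
        if (future != null) {
            future.complete(response);
        }
    }

    private void send(int opaque, String request) {
        // network write omitted in this sketch
    }
}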

2.2 Whether to enable epoll

private boolean useEpoll() {
    return RemotingUtil.isLinuxPlatform()                // must be running on Linux
        && nettyServerConfig.isUseEpollNativeSelector()  // false by default
        && Epoll.isAvailable();                          // epoll must be available
}
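Outside of RocketMQ, the same availability check is the usual way to choose between Netty's native epoll transport and the portable NIO transport. A small generic sketch of that fallback (not RocketMQ code):

import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class TransportChoiceSketch {
    // Use the native epoll transport only when it is actually usable, otherwise fall back to NIO.
    public static EventLoopGroup newWorkerGroup(int threads) {
        if (Epoll.isAvailable()) {
            return new EpollEventLoopGroup(threads);
        }
        return new NioEventLoopGroup(threads);
    }
}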

2.3 Loading the SSL/TLS context

public void loadSslContext() {
    // Get the default SSL/TLS mode
    TlsMode tlsMode = TlsSystemConfig.tlsMode;
    log.info("Server is running in TLS {} mode", tlsMode.getName());

    // By default the mode is PERMISSIVE: SSL/TLS encrypted communication is optional.
    // The context is built as long as the mode has not been explicitly set to DISABLED.
    if (tlsMode != TlsMode.DISABLED) {
        try {
            sslContext = TlsHelper.buildSslContext(false);
            log.info("SSLContext created for server");
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext for server", e);
        } catch (IOException e) {
            log.error("Failed to create SSLContext for server", e);
        }
    }
}

2.3.1 Building the SSL/TLS context

public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException {
    // To build the SslContext, the tls.properties config file is loaded from the configured path
    File configFile = new File(TlsSystemConfig.tlsConfigFile);
    // Read the TLS settings out of tls.properties
    extractTlsConfigFromFile(configFile);
    // Log the TLS settings that will actually be used
    logTheFinalUsedTlsConfig();

    SslProvider provider;
    if (OpenSsl.isAvailable()) {
        provider = SslProvider.OPENSSL;
        LOGGER.info("Using OpenSSL provider");
    } else {
        provider = SslProvider.JDK;
        LOGGER.info("Using JDK SSL provider");
    }

    // Building an SslContext for a netty client
    if (forClient) {
        if (tlsTestModeEnable) {
            return SslContextBuilder
                .forClient()
                .sslProvider(SslProvider.JDK)
                .trustManager(InsecureTrustManagerFactory.INSTANCE)
                .build();
        } else {
            SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK);
            if (!tlsClientAuthServer) {
                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
            } else {
                if (!isNullOrEmpty(tlsClientTrustCertPath)) {
                    sslContextBuilder.trustManager(new File(tlsClientTrustCertPath));
                }
            }
            return sslContextBuilder.keyManager(
                !isNullOrEmpty(tlsClientCertPath) ? new FileInputStream(tlsClientCertPath) : null,
                !isNullOrEmpty(tlsClientKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsClientKeyPath, true) : null,
                !isNullOrEmpty(tlsClientKeyPassword) ? tlsClientKeyPassword : null)
                .build();
        }
    }
    // Building an SslContext for a netty server
    else {
        // If TLS test mode is enabled
        if (tlsTestModeEnable) {
            // use a self-signed certificate
            SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
            return SslContextBuilder
                .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey())
                .sslProvider(provider)
                .clientAuth(ClientAuth.OPTIONAL)
                .build();
        } else {
            SslContextBuilder sslContextBuilder = SslContextBuilder.forServer(
                !isNullOrEmpty(tlsServerCertPath) ? new FileInputStream(tlsServerCertPath) : null,
                !isNullOrEmpty(tlsServerKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsServerKeyPath, false) : null,
                !isNullOrEmpty(tlsServerKeyPassword) ? tlsServerKeyPassword : null)
                .sslProvider(provider);

            if (!tlsServerAuthClient) {
                sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE);
            } else {
                if (!isNullOrEmpty(tlsServerTrustCertPath)) {
                    sslContextBuilder.trustManager(new File(tlsServerTrustCertPath));
                }
            }
            sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth));
            return sslContextBuilder.build();
        }
    }
}
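Once built, the SslContext is only used to create one SslHandler per connection and place it at the head of the channel pipeline. A minimal sketch of that wiring on the server side (generic Netty usage, not RocketMQ's actual HandshakeHandler logic):

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;

public class TlsPipelineSketch extends ChannelInitializer<SocketChannel> {

    private final SslContext sslContext;

    public TlsPipelineSketch(SslContext sslContext) {
        this.sslContext = sslContext;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        // SslContext.newHandler() creates a fresh SslHandler bound to this channel.
        ch.pipeline().addFirst("sslHandler", sslContext.newHandler(ch.alloc()));
        // the usual encoder / decoder / business handlers follow
    }
}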
3. Building the remoting thread pool

// worker thread count
private int serverWorkerThreads = 8;

this.remotingExecutor = Executors.newFixedThreadPool(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactoryImpl("RemotingExecutorThread_")
);
4. Registering request processors

This registers DefaultRequestProcessor as the request handler, along with the thread pool that executes it (8 threads by default). DefaultRequestProcessor itself will be dissected later, when we analyze how the NameServer handles requests.

private void registerProcessor() {
    // If cluster test mode is enabled in the config (false by default)
    if (namesrvConfig.isClusterTest()) {
        this.remotingServer.registerDefaultProcessor(
                new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()),
                this.remotingExecutor
        );
    }
    // Normal path
    else {
        // Register a default request-handling component with NettyRemotingServer.
        // A single default processor is enough to handle the requests sent to the NameServer;
        // it is bound to remotingExecutor, which has 8 threads by default.
        this.remotingServer.registerDefaultProcessor(
                new DefaultRequestProcessor(this),
                this.remotingExecutor
        );
    }
}
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) {
    this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);
}
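The Pair stored here matters because, when a request arrives, the remoting layer looks up processorTable by the request code, falls back to defaultRequestProcessor when there is no exact match, and then runs the processor on the executor it was registered with. A simplified sketch of that dispatch rule (the Request/Processor types below are illustrative, not RocketMQ classes):

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;

public class DispatchSketch {

    interface Processor {
        void process(int requestCode, byte[] body);
    }

    static class Registration {
        final Processor processor;
        final ExecutorService executor;
        Registration(Processor processor, ExecutorService executor) {
            this.processor = processor;
            this.executor = executor;
        }
    }

    private final Map<Integer, Registration> processorTable = new HashMap<>();
    private Registration defaultRegistration;

    public void registerProcessor(int code, Processor p, ExecutorService e) {
        processorTable.put(code, new Registration(p, e));
    }

    public void registerDefaultProcessor(Processor p, ExecutorService e) {
        defaultRegistration = new Registration(p, e);
    }

    /** Called from the netty I/O thread for every incoming request. */
    public void dispatch(int requestCode, byte[] body) {
        Registration matched = processorTable.getOrDefault(requestCode, defaultRegistration);
        if (matched == null) {
            return; // no processor registered for this code
        }
        // Hand the request off so the I/O thread is never blocked by business logic.
        matched.executor.submit(() -> matched.processor.process(requestCode, body));
    }
}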
IV. Summary

Taking NamesrvStartup's start method as the entry point, this article analyzed how NamesrvController is initialized; the next article will analyze how NamesrvController's start method starts the server.

The core class of the NameServer is NamesrvController, which contains both the initialization and the startup logic. With the initialization method covered we are already about a third of the way through; what remains is how these components are started, and how requests are sent and responses are parsed.
