前言:
如今看官们对“centos7上rocketmq的启动”可能比较重视,咱们都想要学习一些“centos7上rocketmq的启动”的相关资讯。那么小编在网络上搜集了一些关于“centos7上rocketmq的启动””的相关资讯,希望看官们能喜欢,同学们快快来学习一下吧!一、前言
上一篇我们分析了 NamesrvController 中的核心组件,对NameServer有一个大致的了解,这一篇我们就根据NameServer启动流程将各个组件串起来进行分析;
二、启动方法
org.apache.rocketmq.namesrv.NamesrvStartup#start
这里主要是做了二件事:
调用NamesrvController的initialize方法进行初始化;调用NamesrvController的start方法进行启动;
public static NamesrvController start(final NamesrvController controller) throws Exception { if (null == controller) { throw new IllegalArgumentException("NamesrvController is null"); } // 先去进行初始化,主要是初始化几个定时任务 boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 加入一个jvm关闭钩子 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { controller.shutdown(); return null; } })); // 对NameServer进行一个启动 controller.start(); return controller;}三、NamesrvController初始化流程调用load方法加载磁盘数据至内存;初始化和创建一个基于netty的远程通信服务器;构建了一个网络通信线程池;向RemotingServer注册一个处理请求的默认组件:DefaultRequestProcessor;启动一个定时调度任务,延迟5s,每隔10s,扫描不活跃broker并将其移除;延迟1分钟,每隔10分钟定时打印一下所有的kv配置;如果说没有禁用ssl/tls加密通信模式,则构建一个文件监听服务线程,在调用NamesrvController启动方法时将线程启动;
org.apache.rocketmq.namesrv.NamesrvController#initialize
public boolean initialize() { // nameserver一旦启动,就会让KVConfigManager把磁盘里的数据加载到内存里来 // 你可以往里面写入kv配置数据,他会写入内存,同步写入到磁盘里去,只不过读写锁做一个控制 this.kvConfigManager.load(); // 初始化和创建一个基于netty的远程通信服务器 this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService); // 构建了一个网络通信线程池,是用的worker线程数量 this.remotingExecutor = Executors.newFixedThreadPool( nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_") ); // 注册各种请求处理的组件 this.registerProcessor(); // 启动一个定时调度任务,延迟5s,每隔10s,扫描不活跃broker并将其移除 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 延迟1分钟,每隔10分钟定时打印一下所有的kv配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES); // 如果说没有禁用ssl/tls加密通信模式 if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) { // Register a listener to reload SslContext try { // 会去搞一个文件监听服务 // 核心是要去监听ssl/tls加密通信对应的证书、密钥、信任证书路径里面的变化 // 如果说文件内容有变化,此时要回调的监听器都放在这里了 fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } } return true;}1、加载磁盘数据至内存
将KVConfigManager把磁盘里的数据加载到内存里来,KVConfigManager类中有一个不好的地方,文章结尾会进行说明;
org.apache.rocketmq.namesrv.kvconfig.KVConfigManager#load
public void load() { String content = null; try { // kv config path文件里存放的是json格式的一个大字符串,包含了所有的kv配置 content = MixAll.file2String(this.namesrvController.getNamesrvConfig().getKvConfigPath()); } catch (IOException e) { log.warn("Load KV config table exception", e); } if (content != null) { // 他会基于json序列化格式进行反序列化,从json格式转换为hashmap<string, hashmap<string, string>> KVConfigSerializeWrapper kvConfigSerializeWrapper = KVConfigSerializeWrapper.fromJson(content, KVConfigSerializeWrapper.class); if (null != kvConfigSerializeWrapper) { this.configTable.putAll(kvConfigSerializeWrapper.getConfigTable()); log.info("load KV config table OK"); } }}2、初始化和创建一个基于netty的远程通信服务器
NettyRemotingServer继承至NettyRemotingAbstract
// 代表了netty网络通信服务器,NettyRemotingClient(netty网络通信客户端)// server和client的父类都是NettyRemotingAbstract,netty网络通信组件父类public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); // netty握手handler private static final String HANDSHAKE_HANDLER_NAME = "handshakeHandler"; // netty ssl加密通信handler private static final String TLS_HANDLER_NAME = "sslHandler"; // 文件数据编码器 private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; // NettyServer,netty网络服务器 private final ServerBootstrap serverBootstrap; // 对于nettyserver来说,必须有两个线程池,第一个是监听连接,第二个是处理每个连接读写io请求的 // 必须是有两个event loop group private final EventLoopGroup eventLoopGroupSelector; private final EventLoopGroup eventLoopGroupBoss; // nettyserver核心配置 private final NettyServerConfig nettyServerConfig; // 公共使用的线程池 private final ExecutorService publicExecutor; // 网络连接异常事件监听器 private final ChannelEventListener channelEventListener; // 定时器组件 private final Timer timer = new Timer("ServerHouseKeepingService", true); // 属于netty里面的事件处理线程池组件 private DefaultEventExecutorGroup defaultEventExecutorGroup; // netty网络服务器监听的端口号 private int port = 0; // sharable handlers // 握手处理组件 private HandshakeHandler handshakeHandler; // 网络通信编码器 private NettyEncoder encoder; // 网络连接管理组件 private NettyConnectManageHandler connectionManageHandler; // Netty消息处理组件 private NettyServerHandler serverHandler; public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); } public NettyRemotingServer(final NettyServerConfig nettyServerConfig, final ChannelEventListener channelEventListener) { // remoting server一个是接收rpc调用请求,可以通过他发起rpc请求,同步/异步/oneway // oneway semaphore是否是跟我们的netty server oneway调用请求是有关系的 // async rpc请求的semaphore对象这样子,信号量,猜想,有没有可能说是用来限制通过remoting server同时发起的 // oneway和async rpc的调用数量 super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue()); this.serverBootstrap = new ServerBootstrap(); // netty server this.nettyServerConfig = nettyServerConfig; this.channelEventListener = channelEventListener; // netty服务器配置里,callback处理线程数量,如果默认是0,会重置成默认的4个 // 最终会作为公共线程池里面的线程数量 int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads(); if (publicThreadNums <= 0) { publicThreadNums = 4; } // 会去构建一个公共线程池 this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet()); } }); // 如果说要开启epoll的话就走下面的代码逻辑,但是一般来说是不会去开启一个epoll if (useEpoll()) { this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet())); } }); this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } // 如果说默认不开启epoll,走下面的代码逻辑 else { // boos event loop group,boss的话一般是负责连接监听的,一般来说他就一个线程就够了 // 就一个线程用一个selector多路复用组件,监听ServerSocketChannel看是否有人发起连接请求就可以了 // 如果说有人发起连接就把物理的网络连接建立好,然后绑定一系列的handler pipeline this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet())); } }); // selector event loop group,这个里面是会设置对应的io线程数量,默认是3个 this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = nettyServerConfig.getServerSelectorThreads(); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } // 加载ssl加密通信上下文 loadSslContext(); }}
1.1、父类构造方法
public abstract class NettyRemotingAbstract { /** * Remoting logger instance. */ private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); /** * Semaphore to limit maximum number of on-going one-way requests, which protects system memory footprint. */ protected final Semaphore semaphoreOneway; /** * Semaphore to limit maximum number of on-going asynchronous requests, which protects system memory footprint. */ protected final Semaphore semaphoreAsync; /** * This map caches all on-going requests. * 通过这个remoting server发送出去的所有请求都会缓存在我的response table里,等待请求响应 */ protected final ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable = new ConcurrentHashMap<Integer, ResponseFuture>(256); /** * This container holds all processors per request code, aka, for each incoming request, we may look up the * responding processor in this map to handle the request. * 请求code->请求处理器:请求处理线程池 */ protected final HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable = new HashMap<Integer, Pair<NettyRequestProcessor, ExecutorService>>(64); /** * Executor to feed netty events to user defined {@link ChannelEventListener}. * 网络连接事件处理线程 */ protected final NettyEventExecutor nettyEventExecutor = new NettyEventExecutor(); /** * The default request processor to use in case there is no exact match in {@link #processorTable} per request code. * 默认请求处理器->8个线程 */ protected Pair<NettyRequestProcessor, ExecutorService> defaultRequestProcessor; /** * SSL context via which to create {@link SslHandler}. * 他是用于进行安全网络加密通信的,netty内嵌的ssl/tls安全加密通信组件,通过他可以生成ssl handler */ protected volatile SslContext sslContext; /** * custom rpc hooks * 针对我们的rpc调用可以设置一些回调钩子,比如说收到rpc调用,或者是发起rpc调用,可以去回调我们的钩子 */ protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>(); static { NettyLogger.initNettyLogger(); } /** * Constructor, specifying capacity of one-way and asynchronous semaphores. * * @param permitsOneway Number of permits for one-way requests. * @param permitsAsync Number of permits for asynchronous requests. */ public NettyRemotingAbstract(final int permitsOneway, final int permitsAsync) { this.semaphoreOneway = new Semaphore(permitsOneway, true); this.semaphoreAsync = new Semaphore(permitsAsync, true); }}
1.2、是否开启开启epoll
private boolean useEpoll() { return RemotingUtil.isLinuxPlatform() // 必须是linux操作系统 && nettyServerConfig.isUseEpollNativeSelector() // 默认是false && Epoll.isAvailable(); // epoll是可用的}
1.3、加载ssl加密通信上下文
public void loadSslContext() { // 先去获取默认的ssl/tls加密通信的模式 TlsMode tlsMode = TlsSystemConfig.tlsMode; log.info("Server is running in TLS {} mode", tlsMode.getName()); // 默认情况下,permissive,ssl/tls加密通信是可选的,可以搞加密通信也可以不搞 // 只要你别手动禁用ssl/tls加密通信就可以了 if (tlsMode != TlsMode.DISABLED) { try { sslContext = TlsHelper.buildSslContext(false); log.info("SSLContext created for server"); } catch (CertificateException e) { log.error("Failed to create SSLContext for server", e); } catch (IOException e) { log.error("Failed to create SSLContext for server", e); } }}
1.3.1、构建ssl加密通信上下文
public static SslContext buildSslContext(boolean forClient) throws IOException, CertificateException { // 如果说要构建SslContext,必须去制定的默认目录下面加载tls.properties配置文件 File configFile = new File(TlsSystemConfig.tlsConfigFile); // 从tls.properties里读取出来一大堆的tls配置 extractTlsConfigFromFile(configFile); // 打印出来最终使用的一大堆的tls配置项 logTheFinalUsedTlsConfig(); SslProvider provider; if (OpenSsl.isAvailable()) { provider = SslProvider.OPENSSL; LOGGER.info("Using OpenSSL provider"); } else { provider = SslProvider.JDK; LOGGER.info("Using JDK SSL provider"); } // 如果说是给netty client去开启ssl加密通信,就走下面的代码 if (forClient) { if (tlsTestModeEnable) { return SslContextBuilder .forClient() .sslProvider(SslProvider.JDK) .trustManager(InsecureTrustManagerFactory.INSTANCE) .build(); } else { SslContextBuilder sslContextBuilder = SslContextBuilder.forClient().sslProvider(SslProvider.JDK); if (!tlsClientAuthServer) { sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); } else { if (!isNullOrEmpty(tlsClientTrustCertPath)) { sslContextBuilder.trustManager(new File(tlsClientTrustCertPath)); } } return sslContextBuilder.keyManager( !isNullOrEmpty(tlsClientCertPath) ? new FileInputStream(tlsClientCertPath) : null, !isNullOrEmpty(tlsClientKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsClientKeyPath, true) : null, !isNullOrEmpty(tlsClientKeyPassword) ? tlsClientKeyPassword : null) .build(); } } // 如果是给netty服务端开启一个ssl加密认证通信,就走下面的代码 else { // 如果说启用了tls测试模式 if (tlsTestModeEnable) { // 自己签名的证书 SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(); return SslContextBuilder .forServer(selfSignedCertificate.certificate(), selfSignedCertificate.privateKey()) .sslProvider(provider) .clientAuth(ClientAuth.OPTIONAL) .build(); } else { SslContextBuilder sslContextBuilder = SslContextBuilder.forServer( !isNullOrEmpty(tlsServerCertPath) ? new FileInputStream(tlsServerCertPath) : null, !isNullOrEmpty(tlsServerKeyPath) ? decryptionStrategy.decryptPrivateKey(tlsServerKeyPath, false) : null, !isNullOrEmpty(tlsServerKeyPassword) ? tlsServerKeyPassword : null) .sslProvider(provider); if (!tlsServerAuthClient) { sslContextBuilder.trustManager(InsecureTrustManagerFactory.INSTANCE); } else { if (!isNullOrEmpty(tlsServerTrustCertPath)) { sslContextBuilder.trustManager(new File(tlsServerTrustCertPath)); } } sslContextBuilder.clientAuth(parseClientAuthMode(tlsServerNeedClientAuth)); return sslContextBuilder.build(); } }}3、构建了一个网络通信线程池
// worker线程数量private int serverWorkerThreads = 8;this.remotingExecutor = Executors.newFixedThreadPool( nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));4、注册各种请求处理的组件
这里就是注册了一个处理请求的DefaultRequestProcessor,以及他对应处理请求的线程池,线程池大小为8;后续分析NameServer如何处理请求的时候会进行对DefaultRequestProcessor剖析;
private void registerProcessor() { // 如果说配置里启用了cluster test模式,默认是false if (namesrvConfig.isClusterTest()) { this.remotingServer.registerDefaultProcessor( new ClusterTestRequestProcessor(this, namesrvConfig.getProductEnvName()), this.remotingExecutor ); } // 正常情况下走这里 else { // 找到NettyRemotingServer注册一个默认的请求处理组件 // 在处理别人给我发送过来的请求的时候,是用一个默认的请求处理组件就可以了,另外绑定的是remotingThreadPool,默认8个线程 this.remotingServer.registerDefaultProcessor( new DefaultRequestProcessor(this), this.remotingExecutor ); }}
public void registerDefaultProcessor(NettyRequestProcessor processor, ExecutorService executor) { this.defaultRequestProcessor = new Pair<NettyRequestProcessor, ExecutorService>(processor, executor);}三、总结
本文从NamesrvStartup的start方法作为入口,分析了NamesrvController是如何进行初始化的,后文则会对NamesrvController的start方法进行启动;
其实NameServer的核心类就是NamesrvController,里面包含了初始化方法和启动方法,我们分析完初始化方法其实就已经分析完三分之一了,剩下的就是如何启动这些组件,如果进行请求的发送和响应的解析的;
标签: #centos7上rocketmq的启动