前言:
目前大家对“apacheartemis”都比较关注,可能也想了解一些“apacheartemis”的相关知识。小编在网上收集了一些关于“apacheartemis”的相关资讯,希望大家喜欢,一起来了解一下吧!
序
本文主要研究一下artemis的reconnectAttempts
reconnectAttempts
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java
public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener { //...... public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration, int reconnectAttempts) throws Exception { assertOpen(); initialize(); ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors); addToConnecting(factory); try { try { factory.connect(reconnectAttempts); } catch (ActiveMQException e1) { //we need to make sure is closed just for garbage collection factory.close(); throw e1; } addFactory(factory); return factory; } finally { removeFromConnecting(factory); } } //......}ServerLocatorImpl的createSessionFactory方法创建ClientSessionFactoryImpl,然后执行factory.connect(reconnectAttempts)ClientSessionFactoryImpl
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java
public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener { //...... public void connect(final int initialConnectAttempts) throws ActiveMQException { // Get the connection getConnectionWithRetry(initialConnectAttempts, null); if (connection == null) { StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig); if (backupConfig != null) { msg.append(" and backup configuration ").append(backupConfig); } throw new ActiveMQNotConnectedException(msg.toString()); } } private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) { if (!clientProtocolManager.isAlive()) return; if (logger.isTraceEnabled()) { logger.trace("getConnectionWithRetry::" + reconnectAttempts + " with retryInterval = " + retryInterval + " multiplier = " + retryIntervalMultiplier, new Exception("trace")); } long interval = retryInterval; int count = 0; while (clientProtocolManager.isAlive()) { if (logger.isDebugEnabled()) { logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts); } if (getConnection() != null) { if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) { // transferring old connection version into the new connection ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion()); } if (logger.isDebugEnabled()) { logger.debug("Reconnection successful"); } return; } else { // Failed to get connection if (reconnectAttempts != 0) { count++; if (reconnectAttempts != -1 && count == reconnectAttempts) { if (reconnectAttempts != 1) { ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts); } return; } if (ClientSessionFactoryImpl.logger.isTraceEnabled()) { ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. 
RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier); } if (waitForRetry(interval)) return; // Exponential back-off long newInterval = (long) (interval * retryIntervalMultiplier); if (newInterval > maxRetryInterval) { newInterval = maxRetryInterval; } interval = newInterval; } else { logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory"); return; } } } } public boolean waitForRetry(long interval) { try { if (clientProtocolManager.waitOnLatch(interval)) { return true; } } catch (InterruptedException ignore) { throw new ActiveMQInterruptedException(createTrace); } return false; } //......}ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()条件进行while循环执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环,重试的时候通过waitForRetry(interval)进行等待若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待ActiveMQClientProtocolManager
activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java
public class ActiveMQClientProtocolManager implements ClientProtocolManager { //...... private final CountDownLatch waitLatch = new CountDownLatch(1); //...... public boolean waitOnLatch(long milliseconds) throws InterruptedException { return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS); } public void stop() { alive = false; synchronized (inCreateSessionGuard) { if (inCreateSessionLatch != null) inCreateSessionLatch.countDown(); } Channel channel1 = getChannel1(); if (channel1 != null) { channel1.returnBlocking(); } waitLatch.countDown(); } //......}ActiveMQClientProtocolManager有个名为waitLatch的CountDownLatch,waitOnLatch方法通过waitLatch.await(milliseconds, TimeUnit.MILLISECONDS)进行等待,而stop方法则执行waitLatch.countDown()小结
ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()为条件进行while循环,执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环;重试的时候通过waitForRetry(interval)进行等待,若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待。
doc
ClientSessionFactoryImpl
标签: #apacheartemis