龙空技术网

聊聊artemis的reconnectAttempts

码匠乱炖 56

前言:

目前大家对“apacheartemis”大致比较珍视,你们都需要分析一些“apacheartemis”的相关知识。那么小编在网上收集了一些对于“apacheartemis””的相关资讯,希望看官们能喜欢,大家一起来了解一下吧!

本文主要研究一下artemis的reconnectAttempts



reconnectAttempts

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java

public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {​   //......​   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,                                                    int reconnectAttempts) throws Exception {      assertOpen();​      initialize();​      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);​      addToConnecting(factory);      try {         try {            factory.connect(reconnectAttempts);         } catch (ActiveMQException e1) {            //we need to make sure is closed just for garbage collection            factory.close();            throw e1;         }         addFactory(factory);         return factory;      } finally {         removeFromConnecting(factory);      }   }​   //......}
ServerLocatorImpl的createSessionFactory方法创建ClientSessionFactoryImpl,然后执行factory.connect(reconnectAttempts)ClientSessionFactoryImpl

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionFactoryImpl.java

public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {​   //......​   public void connect(final int initialConnectAttempts) throws ActiveMQException {      // Get the connection      getConnectionWithRetry(initialConnectAttempts, null);​      if (connection == null) {         StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);         if (backupConfig != null) {            msg.append(" and backup configuration ").append(backupConfig);         }         throw new ActiveMQNotConnectedException(msg.toString());      }​   }​   private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {      if (!clientProtocolManager.isAlive())         return;      if (logger.isTraceEnabled()) {         logger.trace("getConnectionWithRetry::" + reconnectAttempts +                         " with retryInterval = " +                         retryInterval +                         " multiplier = " +                         retryIntervalMultiplier, new Exception("trace"));      }​      long interval = retryInterval;​      int count = 0;​      while (clientProtocolManager.isAlive()) {         if (logger.isDebugEnabled()) {            logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);         }​         if (getConnection() != null) {            if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {               // transferring old connection version into the new connection               ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());            }            if (logger.isDebugEnabled()) {               logger.debug("Reconnection successful");            }            return;         } else {            // Failed to get connection​            if (reconnectAttempts != 0) {               count++;​               if (reconnectAttempts != -1 && count == reconnectAttempts) {                  if (reconnectAttempts != 1) {                     ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);                  }​                  return;               }​               if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {                  ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);               }​               if (waitForRetry(interval))                  return;​               // Exponential back-off               long newInterval = (long) (interval * retryIntervalMultiplier);​               if (newInterval > maxRetryInterval) {                  newInterval = maxRetryInterval;               }​               interval = newInterval;            } else {               logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");               return;            }         }      }   }​   public boolean waitForRetry(long interval) {      try {         if (clientProtocolManager.waitOnLatch(interval)) {            return true;         }      } catch (InterruptedException ignore) {         throw new ActiveMQInterruptedException(createTrace);      }      return false;   }​   //......}   
ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()条件进行while循环执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环,重试的时候通过waitForRetry(interval)进行等待若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待ActiveMQClientProtocolManager

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQClientProtocolManager.java

public class ActiveMQClientProtocolManager implements ClientProtocolManager {​   //......​   private final CountDownLatch waitLatch = new CountDownLatch(1);​   //......​   public boolean waitOnLatch(long milliseconds) throws InterruptedException {      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);   }​   public void stop() {      alive = false;​      synchronized (inCreateSessionGuard) {         if (inCreateSessionLatch != null)            inCreateSessionLatch.countDown();      }​      Channel channel1 = getChannel1();      if (channel1 != null) {         channel1.returnBlocking();      }​      waitLatch.countDown();​   }​   //......}
ActiveMQClientProtocolManager有个名为waitLatch的CountDownLatch,waitOnLatch方法通过waitLatch.await(milliseconds, TimeUnit.MILLISECONDS)进行等待,而stop方法则执行waitLatch.countDown()小结

ClientSessionFactoryImpl的connect方法主要是执行getConnectionWithRetry;而getConnectionWithRetry方法以clientProtocolManager.isAlive()条件进行while循环执行getConnection(),如果为null且reconnectAttempts不为0则进行重试,递增count,当reconnectAttempts不为-1且reconnectAttempts等于count时跳出循环,重试的时候通过waitForRetry(interval)进行等待若返回true则提前return,否则更新interval进行下一轮循环;waitForRetry则通过clientProtocolManager.waitOnLatch(interval)进行等待

docClientSessionFactoryImpl

标签: #apacheartemis