

码匠乱炖 42






public final class ServerLocatorImpl implements ServerLocatorInternal, DiscoveryListener {​   //......​   public ClientSessionFactory createSessionFactory(final TransportConfiguration transportConfiguration,                                                    int reconnectAttempts) throws Exception {      assertOpen();​      initialize();​      ClientSessionFactoryInternal factory = new ClientSessionFactoryImpl(this, transportConfiguration, callTimeout, callFailoverTimeout, clientFailureCheckPeriod, connectionTTL, retryInterval, retryIntervalMultiplier, maxRetryInterval, reconnectAttempts, threadPool, scheduledThreadPool, incomingInterceptors, outgoingInterceptors);​      addToConnecting(factory);      try {         try {            factory.connect(reconnectAttempts);         } catch (ActiveMQException e1) {            //we need to make sure is closed just for garbage collection            factory.close();            throw e1;         }         addFactory(factory);         return factory;      } finally {         removeFromConnecting(factory);      }   }​   //......}


public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, ClientConnectionLifeCycleListener {​   //......​   public void connect(final int initialConnectAttempts) throws ActiveMQException {      // Get the connection      getConnectionWithRetry(initialConnectAttempts, null);​      if (connection == null) {         StringBuilder msg = new StringBuilder("Unable to connect to server using configuration ").append(currentConnectorConfig);         if (backupConfig != null) {            msg.append(" and backup configuration ").append(backupConfig);         }         throw new ActiveMQNotConnectedException(msg.toString());      }​   }​   private void getConnectionWithRetry(final int reconnectAttempts, RemotingConnection oldConnection) {      if (!clientProtocolManager.isAlive())         return;      if (logger.isTraceEnabled()) {         logger.trace("getConnectionWithRetry::" + reconnectAttempts +                         " with retryInterval = " +                         retryInterval +                         " multiplier = " +                         retryIntervalMultiplier, new Exception("trace"));      }​      long interval = retryInterval;​      int count = 0;​      while (clientProtocolManager.isAlive()) {         if (logger.isDebugEnabled()) {            logger.debug("Trying reconnection attempt " + count + "/" + reconnectAttempts);         }​         if (getConnection() != null) {            if (oldConnection != null && oldConnection instanceof CoreRemotingConnection) {               // transferring old connection version into the new connection               ((CoreRemotingConnection)connection).setChannelVersion(((CoreRemotingConnection)oldConnection).getChannelVersion());            }            if (logger.isDebugEnabled()) {               logger.debug("Reconnection successful");            }            return;         } else {            // Failed to get connection​            if (reconnectAttempts != 0) {               count++;​            
   if (reconnectAttempts != -1 && count == reconnectAttempts) {                  if (reconnectAttempts != 1) {                     ActiveMQClientLogger.LOGGER.failedToConnectToServer(reconnectAttempts);                  }​                  return;               }​               if (ClientSessionFactoryImpl.logger.isTraceEnabled()) {                  ClientSessionFactoryImpl.logger.trace("Waiting " + interval + " milliseconds before next retry. RetryInterval=" + retryInterval + " and multiplier=" + retryIntervalMultiplier);               }​               if (waitForRetry(interval))                  return;​               // Exponential back-off               long newInterval = (long) (interval * retryIntervalMultiplier);​               if (newInterval > maxRetryInterval) {                  newInterval = maxRetryInterval;               }​               interval = newInterval;            } else {               logger.debug("Could not connect to any server. Didn't have reconnection configured on the ClientSessionFactory");               return;            }         }      }   }​   public boolean waitForRetry(long interval) {      try {         if (clientProtocolManager.waitOnLatch(interval)) {            return true;         }      } catch (InterruptedException ignore) {         throw new ActiveMQInterruptedException(createTrace);      }      return false;   }​   //......}   


public class ActiveMQClientProtocolManager implements ClientProtocolManager {​   //......​   private final CountDownLatch waitLatch = new CountDownLatch(1);​   //......​   public boolean waitOnLatch(long milliseconds) throws InterruptedException {      return waitLatch.await(milliseconds, TimeUnit.MILLISECONDS);   }​   public void stop() {      alive = false;​      synchronized (inCreateSessionGuard) {         if (inCreateSessionLatch != null)            inCreateSessionLatch.countDown();      }​      Channel channel1 = getChannel1();      if (channel1 != null) {         channel1.returnBlocking();      }​      waitLatch.countDown();​   }​   //......}
ActiveMQClientProtocolManager有个名为waitLatch的CountDownLatch;waitOnLatch方法通过waitLatch.await(milliseconds, TimeUnit.MILLISECONDS)进行等待,而stop方法则执行waitLatch.countDown()。

小结



标签: #apacheartemis