龙空技术网

A Look at httpclient's CPool

码匠乱炖

This article looks at CPool, the connection pool used inside httpclient.

ConnPool

org/apache/http/pool/ConnPool.java

public interface ConnPool<T, E> {

    /**
     * Attempts to lease a connection for the given route and with the given
     * state from the pool.
     *
     * @param route route of the connection.
     * @param state arbitrary object that represents a particular state
     *  (usually a security principal or a unique token identifying
     *  the user whose credentials have been used while establishing the connection).
     *  May be {@code null}.
     * @param callback operation completion callback.
     *
     * @return future for a leased pool entry.
     */
    Future<E> lease(final T route, final Object state, final FutureCallback<E> callback);

    /**
     * Releases the pool entry back to the pool.
     *
     * @param entry pool entry leased from the pool
     * @param reusable flag indicating whether or not the released connection
     *   is in a consistent state and is safe for further use.
     */
    void release(E entry, boolean reusable);

}

ConnPool defines the lease and release methods and declares two type parameters: T is the route and E is the pool entry.
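The lease/release contract can be illustrated with a toy pool. This is a minimal sketch, not httpcore code: SimplePool, ToyPool and LeaseDemo are hypothetical names, the callback parameter is dropped, and a pool entry is just a String. It only shows the calling pattern: lease, use, then release with a reusable flag in a finally block.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for ConnPool<T, E>; the callback parameter is omitted.
interface SimplePool<T, E> {
    Future<E> lease(T route, Object state);
    void release(E entry, boolean reusable);
}

// Toy pool: an entry is a String of the form "route#n".
class ToyPool implements SimplePool<String, String> {
    private final Map<String, Deque<String>> idle = new HashMap<>();
    int created = 0;

    @Override
    public synchronized Future<String> lease(String route, Object state) {
        Deque<String> free = idle.computeIfAbsent(route, r -> new ArrayDeque<>());
        String entry = free.pollFirst();
        if (entry == null) {
            // Nothing idle for this route: "open" a new connection.
            created++;
            entry = route + "#" + created;
        }
        return CompletableFuture.completedFuture(entry);
    }

    @Override
    public synchronized void release(String entry, boolean reusable) {
        if (reusable) {
            String route = entry.substring(0, entry.indexOf('#'));
            idle.computeIfAbsent(route, r -> new ArrayDeque<>()).addFirst(entry);
        }
        // A non-reusable entry is simply dropped in this sketch.
    }
}

class LeaseDemo {
    // Leases an entry, pretends to use it, and always hands it back.
    static String roundTrip(ToyPool pool, String route) {
        String entry;
        try {
            entry = pool.lease(route, null).get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        }
        boolean reusable = false;
        try {
            // ... use the connection here ...
            reusable = true;
            return entry;
        } finally {
            pool.release(entry, reusable);
        }
    }
}
```

Releasing in a finally block with reusable set to false by default means a connection that failed mid-use is never put back among the idle entries.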

ConnPoolControl

public interface ConnPoolControl<T> {

    void setMaxTotal(int max);

    int getMaxTotal();

    void setDefaultMaxPerRoute(int max);

    int getDefaultMaxPerRoute();

    void setMaxPerRoute(final T route, int max);

    int getMaxPerRoute(final T route);

    PoolStats getTotalStats();

    PoolStats getStats(final T route);

}

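The ConnPoolControl interface defines methods for setting and reading maxTotal, defaultMaxPerRoute and the per-route maximum, and for reading PoolStats, either for the whole pool or for a single route.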
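How a per-route limit and a default limit can coexist is sketched below. RouteLimits is a hypothetical class, and the fallback rule (a route without an explicit limit uses defaultMaxPerRoute) is an assumption suggested by the maxPerRoute map and the defaultMaxPerRoute field of AbstractConnPool shown later; the excerpts in this article do not show the actual lookup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the limits exposed through ConnPoolControl.
class RouteLimits<T> {
    private final Map<T, Integer> maxPerRoute = new HashMap<>();
    private int defaultMaxPerRoute;
    private int maxTotal;

    RouteLimits(int defaultMaxPerRoute, int maxTotal) {
        this.defaultMaxPerRoute = defaultMaxPerRoute;
        this.maxTotal = maxTotal;
    }

    synchronized void setMaxPerRoute(T route, int max) {
        maxPerRoute.put(route, max);
    }

    // Assumption: routes without an explicit limit use the default.
    synchronized int getMaxPerRoute(T route) {
        Integer explicit = maxPerRoute.get(route);
        return explicit != null ? explicit : defaultMaxPerRoute;
    }

    synchronized void setDefaultMaxPerRoute(int max) {
        this.defaultMaxPerRoute = max;
    }

    synchronized int getMaxTotal() {
        return maxTotal;
    }
}
```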

AbstractConnPool

org/apache/http/pool/AbstractConnPool.java

@Contract(threading = ThreadingBehavior.SAFE_CONDITIONAL)
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                                               implements ConnPool<T, E>, ConnPoolControl<T> {

    private final Lock lock;
    private final Condition condition;
    private final ConnFactory<T, C> connFactory;
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    private final Set<E> leased;
    private final LinkedList<E> available;
    private final LinkedList<Future<E>> pending;
    private final Map<T, Integer> maxPerRoute;

    private volatile boolean isShutDown;
    private volatile int defaultMaxPerRoute;
    private volatile int maxTotal;
    private volatile int validateAfterInactivity;

    public AbstractConnPool(
            final ConnFactory<T, C> connFactory,
            final int defaultMaxPerRoute,
            final int maxTotal) {
        super();
        this.connFactory = Args.notNull(connFactory, "Connection factory");
        this.defaultMaxPerRoute = Args.positive(defaultMaxPerRoute, "Max per route value");
        this.maxTotal = Args.positive(maxTotal, "Max total value");
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        this.routeToPool = new HashMap<T, RouteSpecificPool<T, C, E>>();
        this.leased = new HashSet<E>();
        this.available = new LinkedList<E>();
        this.pending = new LinkedList<Future<E>>();
        this.maxPerRoute = new HashMap<T, Integer>();
    }

    /**
     * Creates a new entry for the given connection with the given route.
     */
    protected abstract E createEntry(T route, C conn);

    //......
}

AbstractConnPool implements the ConnPool and ConnPoolControl interfaces. It requires E to extend PoolEntry and adds a third type parameter, C, for the connection type.

shutdown

    public void shutdown() throws IOException {
        if (this.isShutDown) {
            return ;
        }
        this.isShutDown = true;
        this.lock.lock();
        try {
            for (final E entry: this.available) {
                entry.close();
            }
            for (final E entry: this.leased) {
                entry.close();
            }
            for (final RouteSpecificPool<T, C, E> pool: this.routeToPool.values()) {
                pool.shutdown();
            }
            this.routeToPool.clear();
            this.leased.clear();
            this.available.clear();
        } finally {
            this.lock.unlock();
        }
    }

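shutdown first sets the isShutDown flag. Then, holding the lock, it closes every entry in available and leased, calls shutdown on every RouteSpecificPool in routeToPool, and finally clears all three collections.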

lease

    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        Args.notNull(route, "Route");
        Asserts.check(!this.isShutDown, "Connection pool shut down");

        return new Future<E>() {

            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);

            @Override
            public boolean cancel(final boolean mayInterruptIfRunning) {
                if (done.compareAndSet(false, true)) {
                    cancelled.set(true);
                    lock.lock();
                    try {
                        condition.signalAll();
                    } finally {
                        lock.unlock();
                    }
                    if (callback != null) {
                        callback.cancelled();
                    }
                    return true;
                }
                return false;
            }

            @Override
            public boolean isCancelled() {
                return cancelled.get();
            }

            @Override
            public boolean isDone() {
                return done.get();
            }

            @Override
            public E get() throws InterruptedException, ExecutionException {
                try {
                    return get(0L, TimeUnit.MILLISECONDS);
                } catch (final TimeoutException ex) {
                    throw new ExecutionException(ex);
                }
            }

            @Override
            public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                for (;;) {
                    synchronized (this) {
                        try {
                            final E entry = entryRef.get();
                            if (entry != null) {
                                return entry;
                            }
                            if (done.get()) {
                                throw new ExecutionException(operationAborted());
                            }
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                            if (validateAfterInactivity > 0)  {
                                if (leasedEntry.getUpdated() + validateAfterInactivity <= System.currentTimeMillis()) {
                                    if (!validate(leasedEntry)) {
                                        leasedEntry.close();
                                        release(leasedEntry, false);
                                        continue;
                                    }
                                }
                            }
                            if (done.compareAndSet(false, true)) {
                                entryRef.set(leasedEntry);
                                done.set(true);
                                onLease(leasedEntry);
                                if (callback != null) {
                                    callback.completed(leasedEntry);
                                }
                                return leasedEntry;
                            } else {
                                release(leasedEntry, true);
                                throw new ExecutionException(operationAborted());
                            }
                        } catch (final IOException ex) {
                            if (done.compareAndSet(false, true)) {
                                if (callback != null) {
                                    callback.failed(ex);
                                }
                            }
                            throw new ExecutionException(ex);
                        }
                    }
                }
            }

        };
    }

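lease returns an anonymous Future whose key methods are get and cancel. cancel marks the future as done and cancelled, signals the waiting threads and invokes callback.cancelled(). get calls getPoolEntryBlocking in a loop. When validateAfterInactivity is greater than 0 and the leased entry was last updated at least that many milliseconds ago, the entry is validated; if validation fails, get calls leasedEntry.close() and release(leasedEntry, false), then retries with another entry.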
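The inactivity check inside get can be isolated into a small pure function. This is a sketch under the assumption that all three values are epoch-based milliseconds, as the comparison with System.currentTimeMillis() in the code above implies; InactivityCheck and needsValidation are hypothetical names.

```java
// Hypothetical helper mirroring the condition used in get():
// validateAfterInactivity > 0 && updated + validateAfterInactivity <= now
class InactivityCheck {
    static boolean needsValidation(long lastUpdatedMillis,
                                   long validateAfterInactivityMillis,
                                   long nowMillis) {
        if (validateAfterInactivityMillis <= 0) {
            // Validation is switched off entirely.
            return false;
        }
        return lastUpdatedMillis + validateAfterInactivityMillis <= nowMillis;
    }
}
```

With a threshold of 2000 ms, an entry last updated at t=10000 is leased without a check until t=11999 and is validated from t=12000 onwards.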

getPoolEntryBlocking

org/apache/http/pool/AbstractConnPool.java

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit timeUnit,
            final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        this.lock.lock();
        try {
            final RouteSpecificPool<T, C, E> pool = getPool(route);
            E entry;
            for (;;) {
                Asserts.check(!this.isShutDown, "Connection pool shut down");
                if (future.isCancelled()) {
                    throw new ExecutionException(operationAborted());
                }
                for (;;) {
                    entry = pool.getFree(state);
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // New connection is needed
                final int maxPerRoute = getMax(route);
                // Shrink the pool prior to allocating a new connection
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        final C conn = this.connFactory.create(route);
                        entry = pool.add(conn);
                        this.leased.add(entry);
                        return entry;
                    }
                }

                boolean success = false;
                try {
                    pool.queue(future);
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new ExecutionException(operationAborted());
                    }
                } finally {
                    // In case of 'success', we were woken up by the
                    // connection pool and should now have a connection
                    // waiting for us, or else we're shutting down.
                    // Just continue in the loop, both cases are checked.
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }

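getPoolEntryBlocking first looks up the RouteSpecificPool for the route in routeToPool and then calls pool.getFree(state) in a loop. An entry that has expired is closed, and a closed entry is removed from available and freed. An entry that passes both checks is moved from available to leased, onReuse is called, and the entry is returned. If there is no free entry, the method first closes surplus idle entries when the route would exceed its limit. Then, provided both the per-route limit and maxTotal allow it, it creates a new connection with connFactory.create(route), evicting the oldest idle entry if the pool is full. Otherwise the future is queued as pending and the thread waits on the condition until it is signalled or the deadline passes; on timeout a TimeoutException is thrown.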
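The waiting part of getPoolEntryBlocking follows a common Lock/Condition pattern: retry in a loop, wait with awaitUntil when a deadline exists, and tell a timeout apart from a spurious wakeup. The sketch below reduces the pool to a counter of leased slots. BoundedSlots is a hypothetical class, and unlike the original it returns false instead of throwing TimeoutException so that it is easier to try out.

```java
import java.util.Date;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical simplification: the pool is just a bounded counter.
class BoundedSlots {
    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final int max;
    private int leased;

    BoundedSlots(int max) {
        this.max = max;
    }

    // timeoutMillis <= 0 means "wait indefinitely", as in the code above.
    boolean acquire(long timeoutMillis) {
        Date deadline = timeoutMillis > 0
                ? new Date(System.currentTimeMillis() + timeoutMillis)
                : null;
        lock.lock();
        try {
            for (;;) {
                if (leased < max) {
                    leased++;
                    return true;
                }
                boolean success;
                if (deadline != null) {
                    // Returns false once the deadline has elapsed.
                    success = condition.awaitUntil(deadline);
                } else {
                    condition.await();
                    success = true;
                }
                // Timeout rather than spurious wakeup: give up.
                if (!success && deadline.getTime() <= System.currentTimeMillis()) {
                    return false;
                }
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    void release() {
        lock.lock();
        try {
            if (leased > 0) {
                leased--;
            }
            // Wake every waiter; each one re-checks the capacity in its loop.
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Because waiters re-check the capacity after every wakeup, signalAll is safe even when only one slot became free: the threads that lose the race simply wait again.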

release

    @Override
    public void release(final E entry, final boolean reusable) {
        this.lock.lock();
        try {
            if (this.leased.remove(entry)) {
                final RouteSpecificPool<T, C, E> pool = getPool(entry.getRoute());
                pool.free(entry, reusable);
                if (reusable && !this.isShutDown) {
                    this.available.addFirst(entry);
                } else {
                    entry.close();
                }
                onRelease(entry);
                Future<E> future = pool.nextPending();
                if (future != null) {
                    this.pending.remove(future);
                } else {
                    future = this.pending.poll();
                }
                if (future != null) {
                    this.condition.signalAll();
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

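release removes the entry from leased, looks up its RouteSpecificPool and calls pool.free(entry, reusable). A reusable entry is put at the head of available, unless the pool has been shut down; otherwise it is closed. After onRelease, if a pending future exists, either for this route or in the global pending list, all waiting threads are signalled.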
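The ordering of the available list matters: release uses addFirst, while getPoolEntryBlocking evicts with removeLast, so the entry idle for the longest time is the first to be dropped. The sketch below shows that ordering with a plain Deque. IdleList is a hypothetical class, and taking entries for reuse from the head is an assumption, because that logic lives in RouteSpecificPool.getFree, which this article does not show.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the "available" list ordering.
class IdleList<E> {
    private final Deque<E> available = new ArrayDeque<>();

    // Mirrors available.addFirst(entry) in release().
    void release(E entry) {
        available.addFirst(entry);
    }

    // Assumption: reuse prefers the most recently released entry.
    E reuse() {
        return available.pollFirst();
    }

    // Mirrors available.removeLast() when capacity must be freed.
    E evictOldest() {
        return available.pollLast();
    }
}
```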

CPool

org/apache/http/impl/conn/CPool.java

@Contract(threading = ThreadingBehavior.SAFE)
class CPool extends AbstractConnPool<HttpRoute, ManagedHttpClientConnection, CPoolEntry> {

    private static final AtomicLong COUNTER = new AtomicLong();

    private final Log log = LogFactory.getLog(CPool.class);
    private final long timeToLive;
    private final TimeUnit timeUnit;

    public CPool(
            final ConnFactory<HttpRoute, ManagedHttpClientConnection> connFactory,
            final int defaultMaxPerRoute, final int maxTotal,
            final long timeToLive, final TimeUnit timeUnit) {
        super(connFactory, defaultMaxPerRoute, maxTotal);
        this.timeToLive = timeToLive;
        this.timeUnit = timeUnit;
    }

    @Override
    protected CPoolEntry createEntry(final HttpRoute route, final ManagedHttpClientConnection conn) {
        final String id = Long.toString(COUNTER.getAndIncrement());
        return new CPoolEntry(this.log, id, route, conn, this.timeToLive, this.timeUnit);
    }

    @Override
    protected boolean validate(final CPoolEntry entry) {
        return !entry.getConnection().isStale();
    }

    @Override
    protected void enumAvailable(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumAvailable(callback);
    }

    @Override
    protected void enumLeased(final PoolEntryCallback<HttpRoute, ManagedHttpClientConnection> callback) {
        super.enumLeased(callback);
    }

}

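CPool extends AbstractConnPool with T bound to HttpRoute, C to ManagedHttpClientConnection and E to CPoolEntry. Its createEntry method builds a CPoolEntry whose id comes from a static counter and passes along timeToLive and timeUnit; validate checks that the connection is not stale.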
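createEntry hands timeToLive and timeUnit to every entry, and getPoolEntryBlocking later asks entry.isExpired(now). How the two fit together can be sketched as follows. TtlEntry is a hypothetical class; the rule that a non-positive timeToLive means "never expires" and the exact comparison are assumptions, since PoolEntry itself is not shown in this article.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical entry with an id from a shared counter and a fixed expiry.
class TtlEntry {
    private static final AtomicLong COUNTER = new AtomicLong();

    final String id;
    private final long expiryMillis;

    TtlEntry(long createdMillis, long timeToLive, TimeUnit timeUnit) {
        // Same id scheme as CPool.createEntry.
        this.id = Long.toString(COUNTER.getAndIncrement());
        // Assumption: timeToLive <= 0 disables expiry.
        this.expiryMillis = timeToLive > 0
                ? createdMillis + timeUnit.toMillis(timeToLive)
                : Long.MAX_VALUE;
    }

    boolean isExpired(long nowMillis) {
        return nowMillis >= expiryMillis;
    }
}
```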

Summary

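ConnPool defines the lease and release methods and declares two type parameters: T is the route and E is the pool entry. AbstractConnPool implements the ConnPool and ConnPoolControl interfaces, requires E to extend PoolEntry, and adds the type parameter C for the connection type. CPool extends AbstractConnPool with T bound to HttpRoute, C to ManagedHttpClientConnection and E to CPoolEntry.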

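The lease method of AbstractConnPool returns a Future whose key methods are get and cancel. get calls getPoolEntryBlocking; when validateAfterInactivity is greater than 0 and the entry has been idle for at least that long, the entry is validated, and if validation fails get calls leasedEntry.close() and release(leasedEntry, false) before retrying. The release method looks up the RouteSpecificPool, calls pool.free(entry, reusable), either returns the entry to available or closes it, and signals any waiting threads.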
