龙空技术网

线程池中的线程是如何回收的

乐天派爱思考的考拉 455

前言:

今天同学们对“java线程回收”大体比较注意,小伙伴们都想要剖析一些“java线程回收”的相关知识。那么小编也在网摘上收集了一些对于“java线程回收””的相关文章,希望朋友们能喜欢,大家快快来学习一下吧!

在并发编程中,线程池使用得非常多,他可以提升我们程序的性能,优化我们的业务逻辑

线程在什么情况下可以回收最大线程数>核心线程数

ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,        new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());

这段代码设置的核心线程数是4个,非核心线程数是8个,当线程空闲60s时,线程便会被回收

设置允许回收核心线程

ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 4, 60, TimeUnit.SECONDS,        new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());// 允许回收核心线程executor.allowCoreThreadTimeOut(true);

核心线程数=最大线程数,线程池中的线程都是核心线程,因此只有设置允许回收核心线程,线程池中的线程才会回收

线程池中的线程是如何回收的

要想知道这个问题,我们需要看一下源码

execute

execute是线程池执行任务的入口因此我们可以从这个地方开始分析

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    /*     * Proceed in 3 steps:     *     * 1. If fewer than corePoolSize threads are running, try to     * start a new thread with the given command as its first     * task.  The call to addWorker atomically checks runState and     * workerCount, and so prevents false alarms that would add     * threads when it shouldn't, by returning false.     *     * 2. If a task can be successfully queued, then we still need     * to double-check whether we should have added a thread     * (because existing ones died since last checking) or that     * the pool shut down since entry into this method. So we     * recheck state and if necessary roll back the enqueuing if     * stopped, or start a new thread if there are none.     *     * 3. If we cannot queue task, then we try to add a new     * thread.  If it fails, we know we are shut down or saturated     * and so reject the task.     */    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        // 将任务添加进线程池,我们可以接着分析这个方法        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // Check if queue empty only if necessary.        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            return false;        for (;;) {            int wc = workerCountOf(c);            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            if (compareAndIncrementWorkerCount(c))                break retry;            c = ctl.get();  // Re-read ctl            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        w = new Worker(firstTask);        final Thread t = w.thread;        if (t != null) {            final ReentrantLock mainLock = this.mainLock;            mainLock.lock();            try {                // Recheck while holding lock.                // Back out on ThreadFactory failure or if                // shut down before lock acquired.                int rs = runStateOf(ctl.get());                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    workers.add(w);                    int s = workers.size();                    if (s > largestPoolSize)                        largestPoolSize = s;                    workerAdded = true;                }            } finally {                mainLock.unlock();            }            if (workerAdded) {                t.start();                workerStarted = true;            }        }    } finally {        if (! workerStarted)            addWorkerFailed(w);    }    return workerStarted;}

这段代码比较长,但是,我们可以关注一下firstTask,w = new Worker(firstTask);,而Worker就是线程池中的线程,final Thread t = w.thread; t.start();这就是启动线程池中线程的方法,因此,我们可以详细分析一下Worker的run方法

Worker.run

public void run() {    runWorker(this);}final void runWorker(Worker w) {    Thread wt = Thread.currentThread();    Runnable task = w.firstTask;    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        while (task != null || (task = getTask()) != null) {            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                wt.interrupt();            try {                beforeExecute(wt, task);                Throwable thrown = null;                try {                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        processWorkerExit(w, completedAbruptly);    }}

看到这里,我们终于找到了执行任务的方法,当没有任务时,线程便会从阻塞对垒获取任务,我们可以看一下getTask方法

getTask

private Runnable getTask() {    boolean timedOut = false; // Did the last poll() time out?    for (;;) {        int c = ctl.get();        int rs = runStateOf(c);        // Check if queue empty only if necessary.        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {            decrementWorkerCount();            return null;        }        int wc = workerCountOf(c);        // Are workers subject to culling?        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;        if ((wc > maximumPoolSize || (timed && timedOut))            && (wc > 1 || workQueue.isEmpty())) {            if (compareAndDecrementWorkerCount(c))                return null;            continue;        }        try {            Runnable r = timed ?                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                workQueue.take();            if (r != null)                return r;            timedOut = true;        } catch (InterruptedException retry) {            timedOut = false;        }    }}

看到这里,我们又看到了线程池是如何判断线程是否可以超时的,boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;这段代码的意思就是是否允许核心线程回收,或者最大线程数是否大于核心线程数

Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();

当线程允许回收时,便执行可以超时的阻塞方法从阻塞队列中获取任务,否则,便执行take方法一直阻塞,直到有任务位置,我们在回到之前的代码while (task != null || (task = getTask()) != null) {},不允许回收线程池,线程便会一直阻塞在while循环这里,直到线程获取到任务,而允许超时时,当达到超时时间,线程还没有获取到任务时,线程便会跳出while循环,这个时候,我们可以看一下跳出for循环的代码了

processWorkerExit

private void processWorkerExit(Worker w, boolean completedAbruptly) {    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted        decrementWorkerCount();    final ReentrantLock mainLock = this.mainLock;    mainLock.lock();    try {        completedTaskCount += w.completedTasks;        workers.remove(w);    } finally {        mainLock.unlock();    }    tryTerminate();    int c = ctl.get();    if (runStateLessThan(c, STOP)) {        if (!completedAbruptly) {            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;            if (min == 0 && ! workQueue.isEmpty())                min = 1;            if (workerCountOf(c) >= min)                return; // replacement not needed        }        addWorker(null, false);    }}

看到这里,我们终于找到了线程回收的代码,当线程跳出while后,便会被回收到 workers.remove(w);

总结

当线程池中的线程没有任务执行时,便会执行阻塞队列的take方法阻塞线程,当take方法超时时,线程退出while循环,然后被回收掉

标签: #java线程回收 #java线程池线程回收 #线程池如何回收线程 #线程怎么回收 #线程资源回收