龙空技术网

java线程池 知多少

肖文英4353 54

前言:

而今各位老铁们对“java线程池shutdownnow”大体比较注重,大家都想要学习一些“java线程池shutdownnow”的相关内容。那么小编在网上收集了一些对于“java线程池shutdownnow””的相关知识,希望朋友们能喜欢,看官们快快来了解一下吧!

线程池的状态

The workerCount is the number of workers that have been permitted to start and not permitted to stop

RUNNING: Accept new tasks and process queued tasks

SHUTDOWN: Don't accept new tasks, but process queued tasks

STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks

TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method

TERMINATED: terminated() has completed

RUNNING -> SHUTDOWN

On invocation of shutdown(), perhaps implicitly in finalize()

(RUNNING or SHUTDOWN) -> STOP

On invocation of shutdownNow()

SHUTDOWN -> TIDYING

When both queue(任务队列) and pool(线程池) are empty

STOP -> TIDYING

When pool is empty

TIDYING -> TERMINATED

When the terminated() hook method has completed

shutdown 方法将线程池状态置为 SHUTDOWN,线程池并不会立即停止,要等正在执行和队列里等待的任务执行完才会停止。

看到调用 shutdownNow 后,第一个任务0正在睡眠(sleep)的时候,触发了 interrupt 中断,之前等待的任务1-5被从队列中清除并返回,之后的任务被拒绝。该方法是通过 interrupt 方法去终止正在运行的任务的,因此无法响应 interrupt 中断的任务可能不会被终止。所以,该方法是无法保证一定能终止任务的。

所以 shutdownNow 方法将线程池状态置为 STOP,试图让线程池立刻停止,但不一定能保证立即停止,要等所有正在执行的任务(不能被 interrupt 中断的任务)执行完才能停止。

awaitTermination 的功能如下:

阻塞当前线程,等已提交和正在执行的任务都执行完,解除阻塞;当等待超过设置的时间,检查线程池是否停止,如果停止返回 true,否则返回 false,并解除阻塞

awaitTermination 和 shutdown 执行时都会申请锁,awaitTermination 需要在 shutdown 调用后调用,否则会知道超时后才会返回

参考 线程池相关-shutdown、shutdownNow和awaitTermination

线程池shutdown与shutdownNow有什么区别?

看代码主要三个区别:

1 shutdown会把线程池的状态改为SHUTDOWN,而shutdownNow把当前线程池状态改为STOP

2 shutdown只会中断所有空闲的线程,而shutdownNow会中断所有的线程。

3 shutdown返回方法为空,会将当前任务队列中的所有任务执行完毕;而shutdownNow把任务队列中的所有任务都取出来返回。

默认线程池可监控属性

taskCount: 线程池待执行的任务数量,从同步队列获取

completedTaskCount: 已经完成的任务数量 不是精确值

largestPoolSize: 线程池中曾经创建的最大的线程数量

getPoolSize: 线程池的线程数量(正在运行+空闲的)

getActiveCount: 活动的线程数(只有正在运行的)

默认扩展方法

protected void beforeExecute(Thread t, Runnable r) { } // task.run方法之前执行

protected void afterExecute(Runnable r, Throwable t) { } // task执行完之后,不管有没有异常都会执行

protected void terminated() { } //线程池终止时运行

在方法中创建并使用线程池

A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically

allowCoreThreadTimeOut

这个参数默认值为:false

设置成true还是false需要根据你具体的业务判断,如果该业务需要执行的次数并不多,采用多线程只是为了缩短执行的时间,那么可以设置成true,毕竟用完后很长时间才会用到,线程干耗着也是耗费资源的。

但是如果是需要较高并发执行的业务,那么可以设置为false,保留着线程,避免每次都得创建线程耗费资源。

创建后的初始线程数为 0

默认情况下,线程池创建后的初始线程数为 0,当有任务到来就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize,就会把到达的任务放到缓存队列当中。

自定义线程名称spring安全关闭线程池

spring线程池参数 setWaitForTasksToCompleteOnShutdown(true)该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

异常处理

当线程池中线程频繁出现未捕获的异常,那线程的复用率就大大降低了,需要不断地创建新线程。

1、线程池中线程中异常尽量手动捕获

2、通过设置ThreadFactory的UncaughtExceptionHandler可以对未捕获的异常做保底处理,通过execute提交任务,线程依然会中断,而通过submit提交任务,不会中断线程,线程异常会在get执行结果时抛出。

提交任务执行流程

 public void execute(Runnable command) {     if (command == null)         throw new NullPointerException();        int c = ctl.get();     // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务     if (workerCountOf(c) < corePoolSize) {         if (addWorker(command, true))             return;         c = ctl.get();     }     // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中     if (isRunning(c) && workQueue.offer(command)) {      // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态         int recheck = ctl.get();         // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队         if (! isRunning(recheck) && remove(command))             reject(command);         // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务         else if (workerCountOf(recheck) == 0)             addWorker(null, false);     }     // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务     // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略     else if (!addWorker(command, false))         reject(command); }

创建工作线程流程

  private boolean addWorker(Runnable firstTask, boolean core) {        retry:        for (; ; ) {            int c = ctl.get();            int rs = runStateOf(c);            //情况1 rs >= SHUTDOWN && rs != SHUTDOWN            //情况2 rs >= SHUTDOWN && firstTask != null (是正在提交的任务)            //情况3 rs >= SHUTDOWN && workQueue.isEmpty()            if (rs >= SHUTDOWN &&                 !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false;            for (; ; ) {                int wc = workerCountOf(c);                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false;                // 跳出两层for循环,执行后面的语句                if (compareAndIncrementWorkerCount(c)) break retry;                c = ctl.get();  // Re-read ctl                if (runStateOf(c) != rs) continue retry;                // else CAS failed due to workerCount change; retry inner loop            }        }        boolean workerStarted = false;        boolean workerAdded = false;        Worker w = null;        try {            w = new Worker(firstTask);            final Thread t = w.thread;            if (t != null) {                final ReentrantLock mainLock = this.mainLock;                mainLock.lock();                try {                    // Recheck while holding lock.                    // Back out on ThreadFactory failure or if                    // shut down before lock acquired.                    int rs = runStateOf(ctl.get());                    //情况1 处于Running状态                    //情况2 处于Shutdown状态 && 创建的线程用于执行从任务队列获取的任务                    if (rs < SHUTDOWN ||                         (rs == SHUTDOWN && firstTask == null)) {                        if (t.isAlive()) // precheck that t is startable                            throw new IllegalThreadStateException();                        workers.add(w);                        int s = workers.size();                        if (s > largestPoolSize) largestPoolSize = s;                        workerAdded = true;                    }                } finally {                    mainLock.unlock();                }                if (workerAdded) {                    t.start();                    workerStarted = true;                }            }        } finally {            if (!workerStarted) addWorkerFailed(w);        }        return workerStarted;    }
线程执行任务流程
    final void runWorker(Worker w) {        Thread wt = Thread.currentThread();        Runnable task = w.firstTask;        w.firstTask = null;        w.unlock(); // allow interrupts        boolean completedAbruptly = true;        try {            while (task != null || (task = getTask()) != null) {                w.lock();                // If pool is stopping, ensure thread is interrupted;                // if not, ensure thread is not interrupted.  This                // requires a recheck in second case to deal with                // shutdownNow race while clearing interrupt                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))                        && !wt.isInterrupted())                    //先interrupt线程,然后线程执行遇到阻塞方法时会直接抛出InterruptedException                    wt.interrupt();                try {                    beforeExecute(wt, task);//回调方法                    Throwable thrown = null;                    try {                        task.run();                    } catch (RuntimeException x) {                        thrown = x;                        throw x;                    } catch (Error x) {                        thrown = x;                        throw x;                    } catch (Throwable x) {                        thrown = x;                        throw new Error(x);                    } finally {                        afterExecute(task, thrown);//回调方法                    }                } finally {                    task = null;                    w.completedTasks++;                    w.unlock();                }            }            completedAbruptly = false;        } finally {            processWorkerExit(w, completedAbruptly);        }    }
从任务同步队列获取任务
    private Runnable getTask() {        boolean timedOut = false; // Did the last poll() time out?        //for + cas        for (; ; ) {            int c = ctl.get();            int rs = runStateOf(c);            // Check if queue empty only if necessary.            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                //为什么只减少了工作线程个数?因为如果获取任务为null,当前工作线程就会回收(workers.remove(w))                decrementWorkerCount();                return null;            }            int wc = workerCountOf(c);            // Are workers subject to culling?            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;            //情况1 (wc > maximumPoolSize || (timed && timedOut) && wc >1             //情况2 (wc > maximumPoolSize || (timed && timedOut) && 任务队列为空            if ((wc > maximumPoolSize || (timed && timedOut)) &&                 (wc > 1 || workQueue.isEmpty())) {                if (compareAndDecrementWorkerCount(c)) return null;                continue;            }            try {                //大于核心线程数时会在指定时间内获取任务,超过指定时间就会返回空                //正常情况,获取任务会一直阻塞,直到取到任务                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();                if (r != null) return r;                timedOut = true;            } catch (InterruptedException retry) {                timedOut = false;            }        }    }

参考 从源码分析几道必问线程池的面试题?

线程执行结束时处理逻辑

    private void processWorkerExit(Worker w, boolean completedAbruptly) {        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted            decrementWorkerCount();        final ReentrantLock mainLock = this.mainLock;        mainLock.lock();        try {            completedTaskCount += w.completedTasks;            workers.remove(w);        } finally {            mainLock.unlock();        }        tryTerminate();        int c = ctl.get();        if (runStateLessThan(c, STOP)) {            // completedAbruptly=false 表示不是异常退出            if (!completedAbruptly) {                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;                if (min == 0 && !workQueue.isEmpty()) min = 1;                if (workerCountOf(c) >= min) return; // replacement not needed            }            //小于核心线程数时会再创建一个工作线程            addWorker(null, false);        }    }

标签: #java线程池shutdownnow