前言:
而今各位老铁们对“java线程池shutdownnow”大体比较注重,大家都想要学习一些“java线程池shutdownnow”的相关内容。那么小编在网上收集了一些对于“java线程池shutdownnow””的相关知识,希望朋友们能喜欢,看官们快快来了解一下吧!线程池的状态
The workerCount is the number of workers that have been permitted to start and not permitted to stop
RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP: Don't accept new tasks, don't process queued tasks,and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED: terminated() has completed
RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()
SHUTDOWN -> TIDYING
When both queue(任务队列) and pool(线程池) are empty
STOP -> TIDYING
When pool is empty
TIDYING -> TERMINATED
When the terminated() hook method has completed
shutdown 方法将线程池状态置为 SHUTDOWN,线程池并不会立即停止,要等正在执行和队列里等待的任务执行完才会停止。
看到调用 shutdownNow 后,第一个任务0正在睡眠(sleep)的时候,触发了 interrupt 中断,之前等待的任务1-5被从队列中清除并返回,之后的任务被拒绝。该方法是通过 interrupt 方法去终止正在运行的任务的,因此无法响应 interrupt 中断的任务可能不会被终止。所以,该方法是无法保证一定能终止任务的。
所以 shutdownNow 方法将线程池状态置为 STOP,试图让线程池立刻停止,但不一定能保证立即停止,要等所有正在执行的任务(不能被 interrupt 中断的任务)执行完才能停止。
awaitTermination 的功能如下:
阻塞当前线程,等已提交和正在执行的任务都执行完,解除阻塞;当等待超过设置的时间,检查线程池是否停止,如果停止返回 true,否则返回 false,并解除阻塞
awaitTermination 和 shutdown 执行时都会申请锁,awaitTermination 需要在 shutdown 调用后调用,否则会知道超时后才会返回
参考 线程池相关-shutdown、shutdownNow和awaitTermination
线程池shutdown与shutdownNow有什么区别?
看代码主要三个区别:
1 shutdown会把线程池的状态改为SHUTDOWN,而shutdownNow把当前线程池状态改为STOP
2 shutdown只会中断所有空闲的线程,而shutdownNow会中断所有的线程。
3 shutdown返回方法为空,会将当前任务队列中的所有任务执行完毕;而shutdownNow把任务队列中的所有任务都取出来返回。
默认线程池可监控属性
taskCount: 线程池待执行的任务数量,从同步队列获取
completedTaskCount: 已经完成的任务数量 不是精确值
largestPoolSize: 线程池中曾经创建的最大的线程数量
getPoolSize: 线程池的线程数量(正在运行+空闲的)
getActiveCount: 活动的线程数(只有正在运行的)
默认扩展方法
protected void beforeExecute(Thread t, Runnable r) { } // task.run方法之前执行
protected void afterExecute(Runnable r, Throwable t) { } // task执行完之后,不管有没有异常都会执行
protected void terminated() { } //线程池终止时运行
在方法中创建并使用线程池
A pool that is no longer referenced in a program AND has no remaining threads will be shutdown automatically
allowCoreThreadTimeOut
这个参数默认值为:false
设置成true还是false需要根据你具体的业务判断,如果该业务需要执行的次数并不多,采用多线程只是为了缩短执行的时间,那么可以设置成true,毕竟用完后很长时间才会用到,线程干耗着也是耗费资源的。
但是如果是需要较高并发执行的业务,那么可以设置为false,保留着线程,避免每次都得创建线程耗费资源。
创建后的初始线程数为 0
默认情况下,线程池创建后的初始线程数为 0,当有任务到来就会创建一个线程去执行任务,当线程池中的线程数目达到 corePoolSize,就会把到达的任务放到缓存队列当中。
自定义线程名称spring安全关闭线程池
spring线程池参数 setWaitForTasksToCompleteOnShutdown(true)该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
异常处理
当线程池中线程频繁出现未捕获的异常,那线程的复用率就大大降低了,需要不断地创建新线程。
1、线程池中线程中异常尽量手动捕获
2、通过设置ThreadFactory的UncaughtExceptionHandler可以对未捕获的异常做保底处理,通过execute提交任务,线程依然会中断,而通过submit提交任务,不会中断线程,线程异常会在get执行结果时抛出。
提交任务执行流程
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 线程池当前线程数小于 corePoolSize 时进入if条件调用 addWorker 创建核心线程来执行任务 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 线程池当前线程数大于或等于 corePoolSize ,就将任务添加到 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { // 获取到当前线程的状态,赋值给 recheck ,是为了重新检查状态 int recheck = ctl.get(); // 如果 isRunning 返回 false ,那就 remove 掉这个任务,然后执行拒绝策略,也就是回滚重新排队 if (! isRunning(recheck) && remove(command)) reject(command); // 线程池处于 running 状态,但是没有线程,那就创建线程执行任务 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果任务放入 workQueue 失败,则尝试通过创建非核心线程来执行任务 // 创建非核心线程失败,则说明线程池已经关闭或者已经饱和,会执行拒绝策略 else if (!addWorker(command, false)) reject(command); }
创建工作线程流程
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); //情况1 rs >= SHUTDOWN && rs != SHUTDOWN //情况2 rs >= SHUTDOWN && firstTask != null (是正在提交的任务) //情况3 rs >= SHUTDOWN && workQueue.isEmpty() if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 跳出两层for循环,执行后面的语句 if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); //情况1 处于Running状态 //情况2 处于Shutdown状态 && 创建的线程用于执行从任务队列获取的任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }线程执行任务流程
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) //先interrupt线程,然后线程执行遇到阻塞方法时会直接抛出InterruptedException wt.interrupt(); try { beforeExecute(wt, task);//回调方法 Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown);//回调方法 } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }从任务同步队列获取任务
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? //for + cas for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { //为什么只减少了工作线程个数?因为如果获取任务为null,当前工作线程就会回收(workers.remove(w)) decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //情况1 (wc > maximumPoolSize || (timed && timedOut) && wc >1 //情况2 (wc > maximumPoolSize || (timed && timedOut) && 任务队列为空 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //大于核心线程数时会在指定时间内获取任务,超过指定时间就会返回空 //正常情况,获取任务会一直阻塞,直到取到任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
参考 从源码分析几道必问线程池的面试题?
线程执行结束时处理逻辑
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } tryTerminate(); int c = ctl.get(); if (runStateLessThan(c, STOP)) { // completedAbruptly=false 表示不是异常退出 if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //小于核心线程数时会再创建一个工作线程 addWorker(null, false); } }
标签: #java线程池shutdownnow