前言:
此刻各位老铁们对“多线程juc包”大约比较关怀,同学们都想要分析一些“多线程juc包”的相关文章。那么小编也在网络上收集了一些有关“多线程juc包””的相关资讯,希望你们能喜欢,同学们一起来了解一下吧!线程池ThreadPool是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。使用线程池会给我们带来如下好处:--《Java 并发编程的艺术》
【降低资源消耗】通过重复利用已创建的线程降低线程创建和销毁造成的消耗。【提高响应速度】当任务到达时,任务可以不需要等到线程创建就能立即执行。【提高线程的可管理性】线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。除了线程池,还有其他比较典型的几种使用策略包括:
(1)内存池Memory Pooling:预先申请内存,提升申请内存速度,减少内存碎片。
(2)连接池Connection Pooling:预先申请数据库连接,提升申请连接的速度,降低系统的开销。
(3)实例池Object Pooling:循环使用对象,减少资源在初始化和释放时的昂贵损耗
线程池的使用
创建线程池的两种方式:
【方式一】通过ThreadPoolExecutor构造函数来创建(推荐)
corePoolSize 线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于corePoolSize, 即使有其他空闲线程能够执行新来的任务, 也会继续创建线程;如果当前线程数为corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;如果执行了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。·
大小设置问题:
· 如果我们设置的线程池数量太小的话,如果同一时间有大量任务/请求需要处理,可能会导致大量的请求/任务在任务队列中排队等待执行,甚至会出现任务队列满了之后任务/请求无法处理的情况,或者大量任务堆积在任务队列导致 OOM。这样很明显是有问题的,CPU 根本没有得到充分利用。
· 如果我们设置线程数量太大,大量线程可能会同时在争取 CPU 资源,这样会导致大量的上下文切换,从而增加线程的执行时间,影响了整体执行效率。
一个简单并且适用面比较广的公式:
· CPU 密集型任务(N+1): 这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1。比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
· I/O 密集型任务(2N): 这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
workQueue 用来保存等待被执行的任务的阻塞队列。在JDK中提供了如下阻塞队列: ArrayBlockingQueue: 基于数组结构的有界阻塞队列,按FIFO排序任务;LinkedBlockingQueue: 基于链表结构的阻塞队列,按FIFO排序任务,吞吐量通常要高于ArrayBlockingQueue;SynchronousQueue: 一个不存储元素的阻塞队列,每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue;PriorityBlockingQueue: 具有优先级的无界阻塞队列;
LinkedBlockingQueue比ArrayBlockingQueue在插入删除节点性能方面更优,但是二者在put(), take()任务的时均需要加锁,SynchronousQueue使用无锁算法,根据节点的状态判断执行,而不需要用到锁,其核心是Transfer.transfer()。
maximumPoolSize 线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于maximumPoolSize;当阻塞队列是无界队列, 则maximumPoolSize则不起作用, 因为无法提交至核心线程池的线程会一直持续地放入workQueue.keepAliveTime 线程空闲时的存活时间,即当线程没有任务执行时,该线程继续存活的时间;默认情况下,该参数只在线程数大于corePoolSize时才有用, 超过这个时间的空闲线程将被终止;unit keepAliveTime的单位threadFactory 创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名。默认为DefaultThreadFactoryhandler 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:AbortPolicy: 直接抛出异常,默认策略;CallerRunsPolicy: 用调用者所在的线程来执行任务;DiscardOldestPolicy: 丢弃阻塞队列中靠最前的任务,并执行当前任务;DiscardPolicy: 直接丢弃任务;
当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
【方式二】通过 Executor 框架的工具类 Executors 来创建
利用Executors工具类可以创建多种类型的ThreadPoolExecutor,对应 Executors 工具类中的方法如图所示:
FixedThreadPool:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。SingleThreadExecutor: 该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。CachedThreadPool: 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。ScheduledThreadPool:该返回一个用来在给定的延迟后运行任务或者定期执行任务的线程池。
【Executors 返回线程池对象的弊端如下】(经典面试:为何不允许使用 Executors 去创建线程池)
FixedThreadPool 和 SingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。ScheduledThreadPool 和 SingleThreadScheduledExecutor : 使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
说白了就是:使用有界队列,控制线程创建数量。
除了避免 OOM 的原因之外,不推荐使用 Executors提供的两种快捷的线程池的原因还有:
实际使用中需要根据自己机器的性能、业务场景来手动配置线程池的参数比如核心线程数、使用的任务队列、饱和策略等等。线程池原理分析
我们通常使用 ThreadPoolExecutor的execute()方法来提交一个任务到线程池中去。因此,为了搞懂线程池的原理,我们需要首先分析一下 execute方法。其源码如下:
// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount) private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static int workerCountOf(int c) { return c & CAPACITY; } //任务队列 private final BlockingQueue<Runnable> workQueue; public void execute(Runnable command) { // 如果任务为null,则抛出异常。 if (command == null) throw new NullPointerException(); // ctl 中保存的线程池当前的一些状态信息 int c = ctl.get(); // 下面会涉及到 3 步 操作 // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里,表明创建新的线程失败。 // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。 if (!isRunning(recheck) && remove(command)) reject(command); // 如果当前工作线程数量为0,新创建一个线程并执行。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。 // 传入 false 代表增加线程时判断当前线程数是否少于 maxPoolSize //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。 else if (!addWorker(command, false)) reject(command); }
简单分析一下整个流程:
如果当前工作线程总数小于corePoolSize,则直接创建核心线程执行任务(任务实例会传入直接用于构造工作线程实例)。如果当前工作线程总数大于等于corePoolSize,判断线程池是否处于运行中状态,同时尝试用非阻塞方法向任务队列放入任务,这里会二次检查线程池运行状态,如果当前工作线程数量为0,则创建一个非核心线程并且传入的任务对象为null。如果向任务队列投放任务失败(任务队列已经满了),则会尝试创建非核心线程传入任务实例执行。
为什么需要double check线程池的状态?
在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。
在 execute 方法中,多次调用 addWorker 方法。addWorker 这个方法主要用来创建新的工作线程,如果返回 true 说明创建和启动工作线程成功,否则的话返回的就是 false。
// 全局锁,并发操作必备 private final ReentrantLock mainLock = new ReentrantLock(); // 跟踪线程池的最大大小,只有在持有全局锁mainLock的前提下才能访问此集合 private int largestPoolSize; // 工作线程集合,存放线程池中所有的(活跃的)工作线程,只有在持有全局锁mainLock的前提下才能访问此集合 private final HashSet<Worker> workers = new HashSet<>(); //获取线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } //判断线程池的状态是否为 Running private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * 添加新的工作线程到线程池 * @param firstTask 要执行 * @param core参数为true的话表示使用线程池的基本大小,为false使用线程池最大大小 * @return 添加成功就返回true否则返回false */ private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //这两句用来获取线程池的状态 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { //获取线程池中工作的线程的数量 int wc = workerCountOf(c); // core参数为false的话表明队列也满了,线程池大小变为 maximumPoolSize if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //原子操作将workcount的数量加1 if (compareAndIncrementWorkerCount(c)) break retry; // 如果线程的状态改变了就再次执行上述操作 c = ctl.get(); if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 标记工作线程是否启动成功 boolean workerStarted = false; // 标记工作线程是否创建成功 boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //获取线程池状态 int rs = runStateOf(ctl.get()); //rs < SHUTDOWN 如果线程池状态依然为RUNNING,并且线程的状态是存活的话,就会将工作线程添加到工作线程集合中 //(rs=SHUTDOWN && firstTask == null)如果线程池状态小于STOP,也就是RUNNING或者SHUTDOWN状态下,同时传入的任务实例firstTask为null,则需要添加到工作线程集合和启动新的Worker // firstTask == null证明只新建线程而不执行任务 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); //更新当前工作线程的最大容量 int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; // 工作线程是否启动成功 workerAdded = true; } } finally { // 释放锁 mainLock.unlock(); } //// 如果成功添加工作线程,则调用Worker内部的线程实例t的Thread#start()方法启动真实的线程实例 if (workerAdded) { t.start(); /// 标记线程启动成功 workerStarted = true; } } } finally { // 线程启动失败,需要从工作线程中移除对应的Worker if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
上面的分析逻辑中需要注意一点,Worker实例创建的同时,在其构造函数中会通过ThreadFactory创建一个Java线程Thread实例,后面会加锁后二次检查是否需要把Worker实例添加到工作线程集合workers中和是否需要启动Worker中持有的Thread实例,只有启动了Thread实例实例,Worker才真正开始运作,否则只是一个无用的临时对象。Worker本身也实现了Runnable接口,它可以看成是一个Runnable的适配器。
线程池中的每一个具体的工作线程被包装为内部类Worker实例,Worker继承于AbstractQueuedSynchronizer(AQS),实现了Runnable接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; // 保存ThreadFactory创建的线程实例,如果ThreadFactory创建线程失败则为null final Thread thread; // 保存传入的Runnable任务实例 Runnable firstTask; // 记录每个线程完成的任务总数 volatile long completedTasks; // 唯一的构造函数,传入任务实例firstTask,注意可以为null Worker(Runnable firstTask) { // 禁止线程中断,直到runWorker()方法执行 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 通过ThreadFactory创建线程实例,注意一下Worker实例自身作为Runnable用于创建新的线程实例 this.thread = getThreadFactory().newThread(this); } // 委托到外部的runWorker()方法,注意runWorker()方法是线程池的方法,而不是Worker的方法 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state. // The value 1 represents the locked state. // 是否持有独占锁,state值为1的时候表示持有锁,state值为0的时候表示已经释放锁 protected boolean isHeldExclusively() { return getState() != 0; } // 独占模式下尝试获取资源,这里没有判断传入的变量,直接CAS判断0更新为1是否成功,成功则设置独占线程为当前线程 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 独占模式下尝试是否资源,这里没有判断传入的变量,直接把state设置为0 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 加锁 public void lock() { acquire(1); } // 尝试加锁 public boolean tryLock() { return tryAcquire(1); } // 解锁 public void unlock() { release(1); } // 是否锁定 public boolean isLocked() { return isHeldExclusively(); } // 启动后进行线程中断,注意这里会判断线程实例的中断标志位是否为false,只有中断标志位为false才会中断 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }}
Worker的构造函数里面的逻辑十分重要,通过ThreadFactory创建的Thread实例同时传入Worker实例,因为Worker本身实现了Runnable,所以可以作为任务提交到线程中执行。只要Worker持有的线程实例w调用Thread#start()方法就能在合适时机执行Worker#run()。简化一下逻辑如下:
// addWorker()方法中构造Worker worker = createWorker();// 通过线程池构造时候传入ThreadFactory threadFactory = getThreadFactory();// Worker构造函数中Thread thread = threadFactory.newThread(worker);// addWorker()方法中启动thread.start();
Worker继承自AQS,这里使用了AQS的独占模式,这里有个技巧是构造Worker的时候,把AQS的资源(状态)通过setState(-1)设置为-1,这是因为Worker实例刚创建时AQS中state的默认值为0,此时线程尚未启动,不能在这个时候进行线程中断,见Worker#interruptIfStarted()方法。Worker中两个覆盖AQS的方法tryAcquire()和tryRelease()都没有判断外部传入的变量,前者直接CAS(0,1),后者直接setState(0)。接着看核心方法ThreadPoolExecutor#runWorker():
final void runWorker(Worker w) { // 获取当前线程,实际上和Worker持有的线程实例是相同的 Thread wt = Thread.currentThread(); // 获取Worker中持有的初始化时传入的任务对象,这里注意存放在临时变量task中 Runnable task = w.firstTask; // 设置Worker中持有的初始化时传入的任务对象为null w.firstTask = null; // 由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为0,允许线程中断 w.unlock(); // allow interrupts // 记录线程是否因为用户异常终结,默认是true boolean completedAbruptly = true; try { // 初始化任务对象不为null,或者从任务队列获取任务不为空(从任务队列获取到的任务会更新到临时变量task中) // getTask()由于使用了阻塞队列,这个while循环如果命中后半段会处于阻塞或者超时阻塞状态,getTask()返回为null会导致线程跳出死循环使线程终结 while (task != null || (task = getTask()) != null) { // Worker加锁,本质是AQS获取资源并且尝试CAS更新state由0更变为1 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 如果线程池正在停止(也就是由RUNNING或者SHUTDOWN状态向STOP状态变更),那么要确保当前工作线程是中断状态 // 否则,要保证当前线程不是中断状态 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 钩子方法,任务执行前 beforeExecute(wt, task); try { task.run(); // 钩子方法,任务执行后 - 正常情况 afterExecute(task, null); } catch (Throwable ex) { // 钩子方法,任务执行后 - 异常情况 afterExecute(task, ex); throw ex; } } finally { // 清空task临时变量,这个很重要,否则while会死循环执行同一个task task = null; // 累加Worker完成的任务数 w.completedTasks++; // Worker解锁,本质是AQS释放资源,设置state为0 w.unlock(); } } // 走到这里说明某一次getTask()返回为null,线程正常退出 completedAbruptly = false; } finally { // 处理线程退出,completedAbruptly为true说明由于用户异常导致线程非正常退出 processWorkerExit(w, completedAbruptly); }}
这里重点拆解分析一下判断当前工作线程中断状态的代码:
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt();// 先简化一下判断逻辑,如下// 判断线程池状态是否至少为STOP,rs >= STOP(1)boolean atLeastStop = runStateAtLeast(ctl.get(), STOP);// 判断线程池状态是否至少为STOP,同时判断当前线程的中断状态并且清空当前线程的中断状态boolean interruptedAndAtLeastStop = Thread.interrupted() && runStateAtLeast(ctl.get(), STOP);if (atLeastStop || interruptedAndAtLeastStop && !wt.isInterrupted()){ wt.interrupt();}
Thread.interrupted()方法获取线程的中断状态同时会清空该中断状态,这里之所以会调用这个方法是因为在执行上面这个if逻辑同时外部有可能调用shutdownNow()方法,shutdownNow()方法中也存在中断所有Worker线程的逻辑,但是由于shutdownNow()方法中会遍历所有Worker做线程中断,有可能无法及时在任务提交到Worker执行之前进行中断,所以这个中断逻辑会在Worker内部执行,就是if代码块的逻辑。这里还要注意的是:STOP状态下会拒绝所有新提交的任务,不会再执行任务队列中的任务,同时会中断所有Worker线程。也就是,即使任务Runnable已经runWorker()中前半段逻辑取出,只要还没走到调用其Runnable#run(),都有可能被中断。假设刚好发生了进入if代码块的逻辑同时外部调用了shutdownNow()方法,那么if逻辑内会判断线程中断状态并且重置,那么shutdownNow()方法中调用的interruptWorkers()就不会因为中断状态判断出现问题导致二次中断线程(会导致异常)。
小结一下上面runWorker()方法的核心流程:
Worker先执行一次解锁操作,用于解除不可中断状态。通过while循环调用getTask()方法从任务队列中获取任务(当然,首轮循环也有可能是外部传入的firstTask任务实例)。如果线程池更变为STOP状态,则需要确保工作线程是中断状态并且进行中断处理,否则要保证工作线程必须不是中断状态。执行任务实例Runnale#run()方法,任务实例执行之前和之后(包括正常执行完毕和异常执行情况)分别会调用钩子方法beforeExecute()和afterExecute()。while循环跳出意味着runWorker()方法结束和工作线程生命周期结束(Worker#run()生命周期完结),会调用processWorkerExit()处理工作线程退出的后续工作。
下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法我们可以看出线程池是怎么让超过corePoolSize的那部分worker销毁的。
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}
注意这里一段代码是keepAliveTime起作用的关键:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。
如果线程允许空闲等待而不被销毁timed == false,workQueue.take任务: 如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
如果线程不允许无休止空闲timed == true, workQueue.poll任务: 如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null。
线程池业务选型
快速响应类型: 对于业务的并行操作,服务追求的是响应时间。比如客人信息和机票信息等聚合展示。从客户体验角度而言,响应越快越好,所以不应该设置队列去缓冲并发任务,调高corePoolSize和maxPoolSize去尽可能创造多的线程快速执行任务。
快速处理批量任务: 离线的大量任务,需要快速的执行,比如数据迁移/清洗工作。但与响应速度优先的场景区别在于,这类场景任务量巨大,并不需要瞬时完成,而是关注如何使用有限的资源,在单位时间内处理更多的任务,也就是吞吐量优先的问题。所以应该设置队列去缓冲并发任务,调整合适的corePoolSize,如果设置线程数过多可能引发线程上下文切换频繁的问题,反而降低处理任务的速度和吞掉量。
动态化线程池
线程池执行情况和任务类型相关性较大,IO密集型和CPU密集型的任务运行起来的情况差异非常大。当调用量发生变化时,导致最大核心数设置偏小,大量抛出RejectedExecutorException,触发服务降级条件。或者因为队列设置过长,最大线程数失效,导致执行时间过长,最终导致下游服务的大量调用超时失败等问题。此时,线程池也需要根据实际场景时时调整:
动态调参: 线程池构造参数有8个,但是最核心的是3个:corePoolSize、maximumPoolSize、workQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。并发性的场景主要分为两种:并行执行子任务,提高响应速度和并行执行大批次任务,提升吞吐量,这种情况会使用有界队列去缓冲大批量的任务。所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择等,参数修改后及时生效。任务监控: 支持应用粒度、线程池粒度、任务粒度的Transaction监控,可以看到线程池的任务执行情况、最大任务执行时间、平均任务执行时间、95/99线等。负载告警: 线程池任务队列积压到一定值的时候会通过Trip告知应用开发负责人。操作监控: 创建/修改和删除线程池都会通知到应用开发负责人。操作日志: 可以查看线程池的修改记录,具体的修改时间和修改前后的参数等。权限校验: 只有应用负责人才能够修改应用的线程池配置。参数动态化: JDK原生线程池ThreadPoolExecutor提供了几个setter方法
setCorePoolSize(int): voidsetKeepAliveTime(long, TimeUnit): voidsetMaximumPoolSize(int): voidsetRejectedExecutionHandler(RejectedExecutionHandler): voidsetThreadFactory(ThreadFactory): void
JDK允许通过ThreadPoolExecutor的实例来动态设置线程池的核心策略,当线程池运行期间调用setCorePoolSize方法时,线程池会直接覆盖原来的corePoolSize值,并且基于当前值和原始值的比较结果采取不同的处理策略。
【1】当前值 < 原始值: 说明有多余的worker线程,此时会向当前idle的worker线程发起中断请求,实现回收,多余的worker在下次idel的时候也会被回收。
【2】当前值 > 原始值: 判断阻塞队列中是否有待执行的任务,有则创建新的worker线程来执行任务,没有则暂不创建worker。
线程池内部会处理好当前状态做到平滑修改,基于上述的public方法,我们只需要维护ThreadPoolExecutor的实例,并且在需要修改的时候拿到实例修改其参数即可。
标签: #多线程juc包