龙空技术网

ThreadPoolExecutor 线程池任务队列满了再添加任务会执行拒绝策略

IT技术控 175

前言:

现在各位老铁们对“线程池超过最大线程数后怎么处理请求”大致比较注意,你们都想要分析一些“线程池超过最大线程数后怎么处理请求”的相关内容。那么小编在网络上搜集了一些对于“线程池超过最大线程数后怎么处理请求””的相关知识,希望姐妹们能喜欢,朋友们一起来了解一下吧!

问题

大家平时应该都有自己定义过线程池

下面是线程池ThreadPoolExecutor的构造器

java复制代码public ThreadPoolExecutor(int corePoolSize,                          int maximumPoolSize,                          long keepAliveTime,                          TimeUnit unit,                          BlockingQueue<Runnable> workQueue,                          ThreadFactory threadFactory,                          RejectedExecutionHandler handler) {    //省略}

自定义线程池的时候需要指定一些参数

比较重要的就是corePoolSize,maximumPoolSize和workQueue这几个参数

corePoolSize

核心线程数是指要保持活动状态的最小工作线程数(除非设置了allowCoreThreadTimeOut)

maximumPoolSize

线程池最大工作线程数

workQueue

用于容纳任务并将任务移交给工作线程的队列

那么大家有没有想过这样一个问题

当线程数量等于核心线程数corePoolSize之后

这时调用execute添加一个新的任务是会放到任务队列还是在最大线程数maximumPoolSize的范围内新增一个线程来执行

用代码说话

为了得出问题的答案,我写了一个测试方法

java复制代码public class ThreadPoolExecutorTest {    @Test    public void test() {        int corePoolSize = 1;//核心线程数        int maximumPoolSize = 2;//最大线程数        int workQueueSize = 1;//任务队列大小        long keepAliveTime = 0;        ThreadPoolExecutor executor = new ThreadPoolExecutor(                corePoolSize, maximumPoolSize,                keepAliveTime, TimeUnit.MILLISECONDS,                new ArrayBlockingQueue<>(workQueueSize),                Executors.defaultThreadFactory(),                new RejectedExecutionHandler() {                    @Override                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                        Task task = (Task) r;                        System.out.println("第" + task.index + "个任务被拒绝执行");                        System.out.println("线程池中的线程数量:" + executor.getPoolSize());                        System.out.println("队列中等待的任务数量:" + executor.getQueue().size());                        Task peek = (Task) executor.getQueue().peek();                        System.out.println("队列中等待的任务是第" + peek.index + "个任务");                        System.out.println("------------------------------------\n");                    }                });                //第一个任务        executor.execute(new Task(1, executor));        sleep(100);        //第二个任务        executor.execute(new Task(2, executor));        sleep(100);        //第三个任务        executor.execute(new Task(3, executor));        sleep(100);        //第四个任务        executor.execute(new Task(4, executor));        sleep(100);    }    void sleep(long time) {        try {            Thread.sleep(time);        } catch (InterruptedException e) {            throw new RuntimeException(e);        }    }    //任务    class Task implements Runnable {        int index;//表示这是第几个任务        ThreadPoolExecutor executor;        Task(int index, ThreadPoolExecutor executor) {            this.index = index;            this.executor = executor;        }        @Override        public void run() {            System.out.println("这是第" + index + "个任务");            System.out.println("线程池中的线程数量:" + executor.getPoolSize());            System.out.println("队列中等待的任务数量:" + executor.getQueue().size());            System.out.println("------------------------------------\n");            sleep(100000);//使任务不结束,保证线程占用        }    }}

指定核心线程数为1,最大线程数为2,任务队列大小为1

然后依次执行四个任务,看看最后结果会是什么样

在运行测试代码之前,不妨让我们先来做个假设

因为第一个任务肯定是会执行的,而第四个任务肯定是会被拒绝的(前三个任务会消耗掉最大线程数和任务队列,2 + 1 = 3)

所以不确定的就是第二个任务和第三个任务

如果任务放到队列中就不会被执行(这里我用sleep来保证任务不会结束)

那么我们只需要看任务二和任务三哪个不会被执行就能知道是先放队列还是先加线程

假设一:先放队列

任务二会被放到任务队列,任务三会因为队列满了而新加线程执行

我们可以根据程序预测控制台的输出是

console复制代码这是第1个任务线程池中的线程数量:1队列中等待的任务数量:0------------------------------------这是第3个任务线程池中的线程数量:2队列中等待的任务数量:1------------------------------------第4个任务被拒绝执行线程池中的线程数量:2队列中等待的任务数量:1队列中等待的任务是第2个任务------------------------------------

第一个任务会占用核心线程数,所以线程数量为1,队列为0

第二个任务因为核心线程数已经满了,会被放到队列中,所以不会打印任何信息

第三个任务因为核心线程数和任务队列都满了,会新增一个线程执行,所以线程数为1 + 1 = 2,队列中是任务二,数量为1

第四个任务因为任务一和任务三已经让线程数达到了最大线程数并且任务二让队列满了,所以会被拒绝执行

假设二:先加线程

任务二会新加线程执行,任务三会因为到达最大线程数而被放到任务队列

我们可以根据程序预测控制台的输出是

console复制代码这是第1个任务线程池中的线程数量:1队列中等待的任务数量:0------------------------------------这是第2个任务线程池中的线程数量:2队列中等待的任务数量:0------------------------------------第4个任务被拒绝执行线程池中的线程数量:2队列中等待的任务数量:1队列中等待的任务是第3个任务------------------------------------

第一个任务会占用核心线程数,所以线程数量为1,队列为0

第二个任务因为核心线程数已经满了,会新增一个线程执行,所以线程数为1 + 1 = 2,队列为0

第三个任务因为核心线程数和任务队列都满了,会被放到队列中,所以不会打印任何信息

第四个任务因为任务一和任务二已经让线程数达到了最大线程数并且任务三让队列满了,所以会被拒绝执行

验证

接下来就让我们来运行测试代码

新机子哇伊自摸一刀子

console复制代码这是第1个任务线程池中的线程数量:1队列中等待的任务数量:0------------------------------------这是第3个任务线程池中的线程数量:2队列中等待的任务数量:1------------------------------------第4个任务被拒绝执行线程池中的线程数量:2队列中等待的任务数量:1队列中等待的任务是第2个任务------------------------------------

最终的结果是第二个任务会被放到任务队列

所以也就是说会先放队列再加线程

不知道是不是和大家原来的理解一样呢

源码

最后一步,当然是要窥探一下线程池的源码了

首先需要提前说明

在线程池中有一个概念叫Worker

一个Worker就代表一条线程,Worker中持有Thread对象

java复制代码private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable {    /** Thread this worker is running in.  Null if factory fails. */    final Thread thread;}

话不多说,直接来看execute方法

java复制代码public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    //这里有一大段注释被我删了    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        if (addWorker(command, true))            return;        c = ctl.get();    }    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}

我们先来看workerCountOf这个方法,根据命名就能很好的理解,表示获得当前线程池中有多少线程

接着看if (workerCountOf(c) < corePoolSize)这个条件语句

这个if判断当线程数量如果小于核心线程数,那么就调用addWorker方法

addWorker方法也很好理解,就是添加一个线程,它的定义是这样的

java复制代码boolean addWorker(Runnable firstTask, boolean core) {}

返回值是一个布尔值,返回true表示添加成功,false表示添加失败(比如在这个过程中调用了shutdown或是另一个线程已经抢先添加了一个线程等情况)

第一个参数firstTask是当Worker运行之后需要执行的任务

第二个参数core表示是否占用核心线程数

如果core为true那么会用当前线程数和核心线程数进行比较,如果核心线程数已经到上限就会直接返回false

如果core为false那么会用当前线程数和最大线程数进行比较,如果最大线程数已经到上限就会直接返回false

这里需要注意,核心线程数和最大线程数是数量,线程本身不存在是核心线程或非核心线程的概念

我们回来继续看这段代码(省略不重要的代码)

java复制代码if (workerCountOf(c) < corePoolSize) {    if (addWorker(command, true))        return;}

也就说当线程数量小于核心线程数时,添加一个占用核心线程数的任务

如果成功就返回,如果失败就说明可能核心线程数满了或是已经shutdown了

因为我们的测试代码不存在shutdown的情况

所以只可能是核心线程数到达上限了

代码会走到if (isRunning(c) && workQueue.offer(command))

java复制代码public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    //这里有一大段注释被我删了    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        //核心线程数满了        if (addWorker(command, true))            return;        c = ctl.get();    }    //会执行这个 if    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    else if (!addWorker(command, false))        reject(command);}

isRunning判断当前线程池的状态,我们的测试代码中肯定是返回true

这样我们的任务就通过workQueue.offer(command)被放到了任务队列workQueue中

如果这个时候workQueue.offer(command)返回false说明队列也满了

那么就会到else if分支调用addWorker(command, false)

java复制代码public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    //这里有一大段注释被我删了    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        //核心线程数满了        if (addWorker(command, true))            return;        c = ctl.get();    }    //队列也满了    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    //会执行这个 else if    else if (!addWorker(command, false))        reject(command);}

相当于添加一个占用非核心线程数的任务

如果这个任务也添加失败了,就说明已经到了最大线程数,直接执行拒绝策略

我们可以尝试还原测试代码的执行逻辑

java复制代码public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    //这里有一大段注释被我删了    int c = ctl.get();    if (workerCountOf(c) < corePoolSize) {        //任务一在这里添加成功并执行        if (addWorker(command, true))            return;        c = ctl.get();    }    //任务二在这里被放到了队列中没有执行    if (isRunning(c) && workQueue.offer(command)) {        int recheck = ctl.get();        if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            addWorker(null, false);    }    //任务三在这里添加成功并执行    else if (!addWorker(command, false))        //任务四在这里被拒绝        reject(command);}
结束

其实在我看源码之前,我一直以为会先加线程再放队列

因为在我看来,我们添加的任务一定是想要尽快执行

如果我们设置的是无界队列,那不是基本用不到最大线程

没想到实际的逻辑竟然有点反直觉

标签: #线程池超过最大线程数后怎么处理请求 #线程池如果满了再来一条线程怎么处理 #线程池满了怎么办