龙空技术网

线程池之tomcat的ThreadPoolExecutor

最柔的光 132

前言:

现在你们对“apachethreadpool”大体比较重视,小伙伴们都想要知道一些“apachethreadpool”的相关文章。那么小编同时在网络上搜集了一些关于“apachethreadpool””的相关文章,希望我们能喜欢,姐妹们一起来了解一下吧!

tomcat里面有很多源码很优秀,这里笔者介绍一种不同于jdk的线程池.

org.apache.tomcat.util.threads.ThreadPoolExecutor

该类没有引用第三方jar包,里面的StringManager类(用于错误日志的输出)可以直接使用“+”来替换.

所以可以直接copy到自己的项目.

注意:高版本的tomcat源码中是ThreadPoolExecutor继承java.util.concurrent.AbstractExecutorService,低版本的是继承java.util.concurrent.ThreadPoolExecutor.

笔者的版本是10.1.x.

/**从线程池中选择一个线程来执行指定的任务,如果线程池已满且无法继续添加线程,则等待。如果在等待过程中线程池被关闭,则该任务将被拒绝。@param command 要执行的任务@throws RejectedExecutionException 如果无法将任务提交给线程池进行执行,则抛出此异常@throws NullPointerException 如果任务为 null,则抛出此异常*/public void execute(Runnable command) {  // submittedCount:已提交但尚未完成的任务数(线程中执行的任务+队列等待的任务) +1,afterExecutef任务正常执行完后中会减1    submittedCount.incrementAndGet();    try {      //从线程池中选择一个线程来执行指定的任务,如果线程池已满且无法继续添加线程,则等待。如果在等待过程中线程池被关闭,则该任务将被拒绝。        executeInternal(command);    } catch (RejectedExecutionException rx) {        if (getQueue() instanceof TaskQueue) {            /**            如果 Executor 接近最大池大小,并发          调用 execute() 可能会导致(由于 Tomcat 使用                TaskQueue) 在某些任务中被拒绝而不是排队。                  如果发生这种情况,请将它们添加到队列中。*/            final TaskQueue queue = (TaskQueue) getQueue();            if (!queue.force(command)) {              /**queue.force这个特殊处理可以保证在Tomcat的异步处理中,              提交的任务能够被尽可能多地执行,而不会因为TaskQueue的限制而被拒绝。              同时,这种处理方式也能帮助开发人员更好地控制线程池的大小和队列的长度,              以达到最优的性能和吞吐量。*/                submittedCount.decrementAndGet();                throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));            }        } else {            submittedCount.decrementAndGet();            throw rx;        }    }}/**从线程池中选择一个线程来执行指定的任务,如果线程池已满且无法继续添加线程,则等待。如果在等待过程中线程池被关闭,则该任务将被拒绝。@param command 要执行的任务@throws RejectedExecutionException 如果无法将任务提交给线程池进行执行,则抛出此异常@throws NullPointerException 如果任务为 null,则抛出此异常*/    private void executeInternal(Runnable command) {      /*首先检查任务是否为 null,如果是,则抛出 NullPointerException 异常。      注意,这里对于空任务的处理并不是由 RejectedExecutionHandler 来进行的,      而是直接在此方法中抛出异常。上面的execute方法catch不到的      */        if (command == null) {            throw new NullPointerException();        }        /*         * Proceed in 3 steps:         *         * 1.如果正在运行的线程数少于corePoolSize,         尝试启动一个新的线程,并将给定的任务作为其第一个任务。         addWorker方法原子性地检查runState和workerCount,         因此可以防止错误警报,如果没有必要添加线程,则返回false。         *         * 2. (机器翻译看不懂,笔者手翻下,欢迎指正,轻喷)如果这个任务被成功的放到队列,那么我们仍然需要再次检查是否我们需要新增一个线程(因为我们第一次检查后,存在一种可能有线程消亡         ,或者在我们进入这个方法以后线程池被关闭了),看后面的代码,我这里实在翻译不下去了.         *         * 3. 如果无法排队任务,则尝试添加新线程。如果失败,则知道我们线程已关闭或任务已饱和,因此拒绝任务。         */      /*      使用原子操作来增加正在运行的线程数,并检查当前线程池是否已满。      如果已满,则根据配置的拒绝策略来处理该任务。			*/        int c = ctl.get();        if (workerCountOf(c) < corePoolSize) {          /*1.如果正在运行的线程数少于corePoolSize,         尝试启动一个新的线程,并将给定的任务作为其第一个任务。         addWorker方法原子性地检查runState和workerCount,         因此可以防止错误警报,如果没有必要添加线程,则返回false         */            if (addWorker(command, true)) {                return;            }            c = ctl.get();        }      // 与jdk的线程池最大的不同就是tomcat的TaskQueue的offer.当前的线程数量小于最大线程数量的时候返回false,需要创建新线程        if (isRunning(c) && workQueue.offer(command)) {          //这个任务被成功的放到队列            int recheck = ctl.get();            if (! isRunning(recheck) && remove(command)) {              // 再次检查如果线程池关闭了并且能移除改任务,那就使用拒绝策略拒绝该任务                reject(command);            } else if (workerCountOf(recheck) == 0) {              // workerCountOf(recheck)获取工作线程数是否0,执行到这后,              //因为上面没有拒绝这个任务,所以要新增一个线程执行完这个任务                addWorker(null, false);            }        }        else if (!addWorker(command, false)) {          // 如果无法排队任务,则尝试添加新线程。如果失败,则知道我们线程已关闭或任务已饱和,因此拒绝任务。            reject(command);        }    }

最大不同点:

TaskQueue.java// 高版本getPoolSizeNoLock这个方法改进了,在操作频繁的地方,无锁方式可以大大减少不必要的性能开销@Overridepublic boolean offer(Runnable o) {  //we can't do any checks    if (parent==null) {        return super.offer(o);    }    //we are maxed out on threads, simply queue the object    if (parent.getPoolSizeNoLock() == parent.getMaximumPoolSize()) {      // 工作线程数量等于最大现场数量,入队列        return super.offer(o);    }    //we have idle threads, just add it to the queue    //我们有空闲线程,只需将其添加到队列中    if (parent.getSubmittedCount() <= parent.getPoolSizeNoLock()) {      //已提交但尚未完成的任务数量(线程中执行的任务+队列等待的任务)少于工作线程数量,只需将其添加到队列中        return super.offer(o);    }    //if we have less threads than maximum force creation of a new thread    //如果我们的线程少于创建新线程的最大力量    if (parent.getPoolSizeNoLock() < parent.getMaximumPoolSize()) {      // 工作线程数量少于最大数量的时候返回false        return false;    }    //if we reached here, we need to add it to the queue    return super.offer(o);}
使用示例:import org.apache.tomcat.util.threads.TaskQueue;import org.apache.tomcat.util.threads.ThreadPoolExecutor;TaskQueue queue = new TaskQueue(5);ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 9, 50, TimeUnit.SECONDS, queue);queue.setParent(executor);

tomcat提供的ThreadPoolExecutor相比jdk的线程池

当你的任务(正在执行的+队列等待)>你的工作线程 && 你的工作线程<最大线程数量

就会直接创建新线程.

总结:

由于org.apache.tomcat.util.threads.TaskQueue的特殊性

jdk的线程池是核心线程满了后,后续任务添加队列,队列满了后如果当前现场没有到达最大核心线程数量,就会创建新线程.

tomcat的线程池是任务数量(正在执行的数量+队列的数量)>你的工作线程 && 你的工作线程<最大线程数量 会创建新线程.

如果jdk的队列也使用org.apache.tomcat.util.threads.TaskQueue,但是queue.setParent(executor);这个不支持jdk线程池,所以jdk的队列用org.apache.tomcat.util.threads.TaskQueue也达不到tomcat线程池的效果.

同时tomcat提供的ThreadPoolExecutor会queue.force(command)个特殊处理可以保证在Tomcat的异步处理中,提交的任务能够被尽可能多地执行,而不会因为TaskQueue的限制而被拒绝。

如果你的线程优先响应,不妨试试tomcat线程池加org.apache.tomcat.util.threads.TaskQueue队列

标签: #apachethreadpool