龙空技术网

队列满了、任务还在提交,看看Java线程池的任务饱和策略

毕小宝 585

前言:

现时你们对“线程池满了怎么处理”都比较注重,你们都需要学习一些“线程池满了怎么处理”的相关文章。那么小编在网上汇集了一些对于“线程池满了怎么处理””的相关知识,希望我们能喜欢,同学们一起来学习一下吧!

引言

使用 Java 的任务管理框架执行任务过程中,当任务等待队列被填满时、又有新的任务提交后,饱和策略开始发挥作用。

ThreadPollExecutor 提供了四种饱和策略:

上一节已经看过它们的源码了,本节来验证一下它们的差异。

测试类准备

先定义一个 MyCommand 的任务,接收一个字符串信息:

public class MyCommand implements Runnable {	private String name;	public MyCommand(String name){		this.name = name;	}	@Override	public void run() {		System.out.println(Thread.currentThread().getName()+" ," +				"name: "+name+","+new Date());		try {			Thread.sleep(5000);		} catch (InterruptedException execption) {			execption.printStackTrace();		}	}	@Override	public String toString() {		return "MyCommand [name=" + name + "]";	}}

编写统一的测试类,线程池初始化大小为 2,等待队列大小为 2,当提交任务大于 4 时,第 5 个任务会因饱和策略的不同,而得到不同的执行结果。后文将通过设置不同的饱和策略,来测试它们的行为差异。

public class RejectPolicyTest {	private final ThreadPoolExecutor exec ;	public RejectPolicyTest(){		exec = new ThreadPoolExecutor(2,2,0L,TimeUnit.MICROSECONDS,				new LinkedBlockingQueue<Runnable>(2));		//TODO 设置不同的饱和策略		//exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());	}		public static void main(String[] args) {		MyCommand c1 = new MyCommand("c1");		MyCommand c2 = new MyCommand("c2");		MyCommand c3 = new MyCommand("c3");		MyCommand c4 = new MyCommand("c4");		MyCommand c5 = new MyCommand("c5");		RejectPolicyTest c = new RejectPolicyTest();		c.submit(c1);		c.submit(c2);		c.submit(c3);		c.submit(c4);		c.submit(c5);	}	public void submit(Runnable command){		System.out.println(Thread.currentThread().getName()+" submit tast..."+command);		try{			exec.submit(command);		}catch(Exception e){			System.out.println("Exception when submit task:"+e.getMessage());		}	}}
策略一:通知模式抛弃

AbortPolicy 是默认的饱和策略,该策略会抛出未检查异常 RejectedExecutionException,调用者可以捕获这个异常,然后根据自己的需求编写代码。比如,捕获异常并尝试重新提交任务,该策略还算友好,至少抛弃之前会通知任务提交者。

由于是默认策略,直接运行第一部分准备的测试类,结果如下:

main submit tast...MyCommand [name=c1]main submit tast...MyCommand [name=c2]main submit tast...MyCommand [name=c3]main submit tast...MyCommand [name=c4]main submit tast...MyCommand [name=c5]Exception when submit task:Task java.util.concurrent.FutureTask@33909752 rejected from java.util.concurrent.ThreadPoolExecutor@55f96302[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]pool-1-thread-2 ,name: c2,Sun Dec 22 19:10:56 CST 2019pool-1-thread-1 ,name: c1,Sun Dec 22 19:10:56 CST 2019pool-1-thread-2 ,name: c3,Sun Dec 22 19:11:01 CST 2019pool-1-thread-1 ,name: c4,Sun Dec 22 19:11:01 CST 2019

测试结果:主线程提交了 4 个任务后,队列满了,此时提交第 5 个任务时,线程池抛出了 RejectedExecutionException异常,主线程能够捕获并处理该异常。

策略二:静音模式抛弃

DiscardPolicy,默默收下任务,但是啥都不做,连异常都不抛,调用者根本不知道任务的状况。显然,这不利于任务的控制,所以笔者不建议用这种策略。

修改测试类,调整策略:

exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());

测试结果:提交了 5 个线程,只有 4 个任务执行了,最后提交的那个任务被无情抛弃了,而调用者浑然不觉

main submit tast...MyCommand [name=c1]main submit tast...MyCommand [name=c2]main submit tast...MyCommand [name=c3]main submit tast...MyCommand [name=c4]main submit tast...MyCommand [name=c5]pool-1-thread-2 ,name: c2,Sun Dec 22 19:12:38 CST 2019pool-1-thread-1 ,name: c1,Sun Dec 22 19:12:38 CST 2019pool-1-thread-2 ,name: c3,Sun Dec 22 19:12:43 CST 2019pool-1-thread-1 ,name: c4,Sun Dec 22 19:12:43 CST 2019
策略三:抛弃等待最久的任务

DiscardOldestPolicy 策略抛弃掉等待队列中等待最久的任务,将其移除队列,然后执行当前任务,对等待最久的任务不利。

修改测试类的策略:

exec.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

测试结果:提交了 5 个线程,但是只有 4 个任务执行了,等待最久的任务 c3 被抛弃,调用者也无从知晓。

main submit tast...MyCommand [name=c1]main submit tast...MyCommand [name=c2]main submit tast...MyCommand [name=c3]main submit tast...MyCommand [name=c4]main submit tast...MyCommand [name=c5]pool-1-thread-2 ,name: c2,Sun Dec 22 19:13:10 CST 2019pool-1-thread-1 ,name: c1,Sun Dec 22 19:13:10 CST 2019pool-1-thread-2 ,name: c4,Sun Dec 22 19:13:15 CST 2019pool-1-thread-1 ,name: c5,Sun Dec 22 19:13:15 CST 2019
策略四:调用者执行

CallerRunsPolicy 策略提供了一种调节机制,它不抛弃任务,也不抛出异常,而是将任务的运行请求回退到任务调用者,由提交任务的线程去执行自己刚提交的任务。

exec.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

测试结果:提交了 5 个线程,但是只有 4 个由工作线程执行,第 5 个任务由调用者执行。

main submit tast...MyCommand [name=c1]main submit tast...MyCommand [name=c2]main submit tast...MyCommand [name=c3]main submit tast...MyCommand [name=c4]main submit tast...MyCommand [name=c5]pool-1-thread-2 ,name: c2,Sun Dec 22 19:18:28 CST 2019pool-1-thread-1 ,name: c1,Sun Dec 22 19:18:28 CST 2019main ,name: c5,Sun Dec 22 19:18:28 CST 2019pool-1-thread-1 ,name: c3,Sun Dec 22 19:18:33 CST 2019pool-1-thread-2 ,name: c4,Sun Dec 22 19:18:33 CST 2019

结论:调用者运行的饱和策略实现了一种弹性调节机制,当工作队列被填满时,下一个待执行的任务会在提交任务的主线程中执行。

主线程执行任务该期间,线程资源被占用,将不能再提交任务。因此降低了任务的提交速率,为线程池争取了更多的时间来完成正在排队的任务。

策略五:调用者限制提交

前四种都是线程池自己的饱和策略,除此之外,还可以在任务提交方控制任务的提交速率,即限制任务的提交,避免产生任务饱和的情况。比如,借助信号量 Semaphore 来限制任务的到达率,这个同步工具类,可以控制同时访问某个特定资源的操作数量。

可以利用 Semaphoreacquire 获取一个虚拟许可,如果没有可用的许可,则阻塞该方法的调用线程直到有可用许可为止。如果线程池使用无界队列缓冲任务时,且未对任务数量做控制,容易导致内存耗尽。这时,可以和 Semaphore 搭配使用,设置信号量的上界,来控制任务的提交速率。

使用上一章的 MyCommand 任务,结合 Semaphore,实现一个调用者控制任务提交的示例:

/** *  * @title       :BoundedExecutor * @description :使用Semaphore控制任务的提交速率 * @since       :2019-12-22 */public class BoundedExecutor {	private final Executor exec;	private final Semaphore semaphore;		public BoundedExecutor(Executor exec,int bound){		this.exec = exec;		this.semaphore = new Semaphore(bound);	}		public void submitTask(final Runnable command) throws InterruptedException{		try{			semaphore.acquire();			exec.execute(new Runnable(){				@Override				public void run() {					try{						command.run();					}finally{						System.out.println("执行完成 ,release...");						semaphore.release();					}				}			});		}catch(RejectedExecutionException e){			System.out.println("队列已满,拒绝执行");			semaphore.release();		}	}		public static void main(String[] args) {		//虽然线程池大小为4,但是Semaphore限制每次只能有两个任务被执行		Executor exec = Executors.newCachedThreadPool();		BoundedExecutor b = new BoundedExecutor(exec,2);				MyCommand c1 = new MyCommand("c1");		MyCommand c2 = new MyCommand("c2");		MyCommand c3 = new MyCommand("c3");		MyCommand c4 = new MyCommand("c4");		MyCommand c5 = new MyCommand("c5");		try {			b.submitTask(c1);			b.submitTask(c2);			b.submitTask(c3);			b.submitTask(c4);			b.submitTask(c5);		} catch (InterruptedException execption) {			execption.printStackTrace();		}	}}

任务执行结果:

pool-1-thread-2 ,name: c2,Mon Dec 15 16:20:17 CST 2019pool-1-thread-1 ,name: c1,Mon Dec 15 16:20:17 CST 2019执行完成 ,release...执行完成 ,release...pool-1-thread-1 ,name: c4,Mon Dec 15 16:20:22 CST 2019pool-1-thread-3 ,name: c3,Mon Dec 15 16:20:22 CST 2019执行完成 ,release...执行完成 ,release...pool-1-thread-1 ,name: c5,Mon Dec 15 16:20:27 CST 2019执行完成 ,release...

执行结果分析:使用 Semaphore 限制每次只能提交两个任务,任务完成后释放信号量许可,可以有效地控制任务的提交速率。

启示录

有界线程池的四种饱和策略,只有 AbortPolicyCallerRunPolicy 对任务提交者是友好的,其他几种都会导致任务漏执行,对任务提交方是不利的。

折中的方案是,由调用者控制任务提交速率,自己根据线程池的配置大小,利用信号量控制任务的提交,这样就不会产生任务提交过量的情况了。

标签: #线程池满了怎么处理 #线程池如果满了再来一条线程怎么处理 #java主线程等待线程池执行完成 #线程池满了怎么办