龙空技术网

Java线程池

Esgoon 283

前言:

眼前姐妹们对“java线程数据库”大概比较关切,你们都需要分析一些“java线程数据库”的相关文章。那么小编同时在网摘上汇集了一些有关“java线程数据库””的相关知识,希望同学们能喜欢,大家快快来了解一下吧!

Java多线程的实现方式

Java程序中,常见有4种方式实现多线程

①继承Thread类

②实现Runnable接口

③实现Callable接口

④使用Executor框架

在JDK5之前,创建线程有2种方式,一种是继承Thread类,另外一种是实现Runnable接口。这2种方式在执行完任务之后都无法获取执行结果,如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。自Java 5起,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

方式①举例:继承Thread类,实现run()方法,调用start()方法启动线程

public class ThreadSample extends Thread {	@Override	public void run() {		System.out.println(this.getName() + " do some work...");	}	public static void main(String[] args) {		ThreadSample threadSample = new ThreadSample();		threadSample.setName("thread-a");		threadSample.start();	}}start()方法调用后并不是立即执行多线程代码,而是使得该线程变为Ready状态,等待CPU分配执行时间。

方式②举例:实现Runnable接口,实现run()方法,将实例对象传入Thread构造方法

public class ThreadSample implements Runnable {	@Override	public void run() {		System.out.println(Thread.currentThread().getName() + " do some work...");	}	public static void main(String[] args) {		Thread threadSample = new Thread(new ThreadSample(), "thread-b");		threadSample.start();	}}

方式③举例:实现Callable接口和FutureTask对象组合

public class ThreadSample implements Callable<Integer> {	@Override	public Integer call() {		int result = 0;		for (int i = 0; i <= 10; i++) {			result++;		}		return result;	}	public static void main(String[] args) throws InterruptedException, ExecutionException {		//1、实例化Callable对象		ThreadSample callableSample = new ThreadSample();		//2、创建装载线程的FutureTask对象		FutureTask<Integer> ft = new FutureTask<Integer>(callableSample);		//3、启动线程		Thread thread = new Thread(ft, "thread-callable");		thread.start();		//4、获取返回结果		Integer result = ft.get();		System.out.println("result = " + result);	}}

与使用Runnable相比,Callable功能更强大

可以有返回值,支持泛型的返回值,借助FutureTask类获取返回值;可以捕获程序执行过程中的异常。

方式④举例:线程池实现

public class ThreadSample implements Callable<Integer> {	@Override	public Integer call() {		int result = 0;		for (int i = 0; i <= 10; i++) {			result++;		}		return result;	}	public static void main(String[] args) throws InterruptedException, ExecutionException {		ExecutorService services = Executors.newSingleThreadExecutor();		Future<Integer> future = services.submit(new ThreadSample());		System.out.println("result = " + future.get());		services.shutdown();	}}

以上继承Thread类、实现Runnable接口、实现Callable接口三种方式中,理论上优先选用Runnable接口和Callable接口,如果有需要返回值则选用Callable接口实现方式。此外,无论何时,当看到这种形式的代码:

new Thread(runnable).start()

并且最终希望有一个更加灵活的执行策略时,都可以认真考虑使用Executor代替Thread。

使用线程池的好处

在线程池中执行任务线程,比起每个任务创建一个线程,有很多优势。

减少系统开销。重用存在的线程,而不是创建新的线程,这可以在处理多请求时抵消线程创建、销毁产生的开销。提升请求响应性。在请求到达时,工作者线程已经存在,可以立即执行,因此提高了响应性。增强线程的可管理性。通过调整线程池的大小,可以充分利用CPU资源,同时可以防止过多的线程相互竞争资源,导致应用程序耗尽内存或者失败。线程池可以对线程资源进行统一分配、调优和监控。线程池的工作流程

Java线程池的核心实现类是ThreadPoolExecutor类,任务提交到线程池时,具体处理由ThreadPoolExecutor类的execute()方法执行。当一个新任务提交到线程池时,线程池的处理流程如下:

①判断核心线程池里的线程是否都在执行任务,如果不是,创建一个新的工作线程来执行任务。如果是,则进行下一步流程。

②判断阻塞队列是否已满,如果没满,则将新提交的任务存储在阻塞队列中。如果满,则进行下一步流程。

③判断线程池中的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果线程池中所有的线程都处于工作状态,则交给饱和策略来处理这个任务。

ThreadPoolExecutor通用的构造函数为:

参数说明如下

corePoolSize

线程池基本大小。提交一个任务到线程池时,线程池会创建一个新的线程来执行任务。新任务提交时,当目前线程数小于corePoolSize,即使有空闲的线程可以执行该任务,也会创建新的线程。如果线程池中的线程数已经大于或等于corePoolSize,则不会创建新的线程。当一个ThreadPoolExecutor被初始创建后,所有核心线程并非立即开始,而是等到有任务提交的时刻。但如果调用了prestartAllCoreThreads()方法,所有核心线程会立即启动。

maximuPoolSize

线程池允许创建的最大线程数。当阻塞队列满,且线程数小于maximumPoolSize时,便可以创建新的线程执行任务。如果使用无界阻塞队列,该参数无效。

keepAliveTime

线程池的工作线程空闲后,保持存活的时间。如果任务多且任务执行时间较短,可以调大该值,提高线程利用率。如果一个线程已经闲置的时间超过了存活时间,它将成为一个被回收的候选者,如果当前的池的大小超过了的corePoolSize,线程池会终止它。

unit

与keepAliveTime相关联的时间单位。可选值有DAYS、HOURS、MINUTES、毫秒、微妙、纳秒。

workQueue

用于保存等待执行的任务的阻塞队列。ThreadPoolExecutor允许提供一个BlockingQueue来持有等待执行的任务。任务排队有3种基本方法:无限队列、有限队列、同步移交。队列的选择和很多其他的配置参数都有关系,比如池的大小等。ThreadPoolExecutor推荐以下几种阻塞队列。LinkedBlockingQueue:线程安全的阻塞队列,先进先出(FIFO)。可以指定容量(有限队列),也可以不指定(无限队列),不指定的话默认最大是Integer.MAX_VALUE。如果所有的工作者线程都处于忙碌状态,新提交的任务将会在队列中等候。如果新的任务持续快速到达,超过了它们被执行的速度,队列会无限制地增加。线程池中能创建的最大线程数为corePoolSize指定的值。ArrayBlockingQueue:数组实现的有界阻塞队列,先进先出(FIFO)。线程池中能创建的最大线程数为maximumPoolSize指定的值。有界队列有助于避免资源耗尽,当队列满时,如果还有新的任务到达,将根据饱和策略(也称拒绝策略)进行处理。对于一个有界队列,队列的长度与池的长度必须一起调节。一个大队列和一个小池,可以控制对内存和CPU的使用,也可以减少上下文切换,但相应的吞吐量也会减小。SynchronousQueue:SynchronousQueue并不是一个真正的队列,而是一种管理直接在线程间移交信息的机制。当新任务到达时,如果所有工作线程都处于忙碌状态,且线程池数量小于maximumPoolSize,就会创建一个新的线程。否则根据饱和策略处理。只有池是无限的,或者可以接受任务被拒绝,SynchronousQueue才是一个有实际价值的选择。PriorityBlokingQueue: 一个支持优先级的无界阻塞队列 。使用该队列,线程池中能创建的最大线程数为corePoolSize。

threadFactory

线程池创建线程时使用的线程工厂,可以不指定该参数,使用默认的线程工厂Executors.defaultThreadFactory()。

handler

饱和策略,也称拒绝策略。当有限队列满且线程池满的情况下,新的任务到达后,饱和策略将进行处理。有以下几种:

ThreadPoolExecutor.AbortPolicy()

抛出RejectedExecutionException异常。默认策略。调用者可以捕获抛出的异常,进行相应的处理。

ThreadPoolExecutor.CallerRunsPolicy()

不会丢弃任务,也不会抛出异常,由向线程池提交任务的线程来执行该任务。

ThreadPoolExecutor.DiscardPolicy()

丢弃当前的任务

ThreadPoolExecutor.DiscardOldestPolicy()

丢弃最旧的任务(最先提交而没有得到执行的任务),并执行当前任务。

ThreadPoolExecutor执行流程如下

①当新任务提交时,如果当前线程池中的线程数小于corePoolSize,则创建新的线程处理。

②如果线程池中的线程大于或等于corePoolSize,且BlockingQueue未满,则将新任务加入BlockingQueue。

③如果BlockingQueue已满,且线程池中的线程数小于maximumPoolSize,则创建新的工作线程来执行任务。

④如果当前运行的线程大于或等于maximumPoolSize,将执行饱和策略。即调用RejectedExecutionHandler.rejectExecution()方法。

几种常见的线程池

Executors提供了一些静态工厂方法创建的常见线程池。

newFixedThreadPool

创建一个固定长度的线程池,每当提交一个任务就创建一个线程,直到达到线程池最大长度,这时线程池长度不再变化。如果一个线程异常退出,线程池会补充一个新的线程。

newCachedThreadPool

创建一个可缓存的线程池,不会对池的长度做限制。如果线程池长度超过需求,它可以灵活地回收空闲的线程;当需求增加时,它可以灵活地添加新的线程。

newSingleThreadExecutor

创建一个单线程的executor,只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有一个新的取代它。

newScheduledThreadPool

创建一个定长的线程池,支持定时执行任务。

这几种线程池中,newFixedThreadPool和newSingleThreadExecutor默认使用无线队列LinkedBlockingQueue。newCachedThreadPool使用了同步移交队列SynchronousQueue。newScheduledThreadPool使用了DelayedWorkQueue阻塞队列。

newCachedThreadPool的corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,其他几种线程池corePoolSize与maximumPoolSize一样大。

线程池的状态与生命周期

线程池有5种状态,在ThreadPoolExecutor 源码中有定义。

RUNNING : 线程池最初创建后的初始状态,该状态的线程池既能接受新提交的任务 ,又能处理阻塞队列中任务。SHUTDOWN: 调用shutdown()方法后进入该状态。该状态的线程池不能接收新提交的任务 ,但是能处理阻塞队列中的任务。STOP: 调用shutdownNow()方法后进入该状态。该状态的线程池不接受新提交的任务 ,也不处理在阻塞队列中的任务 ,还会中断正在执行的任务。TIDYING: 当所有的任务都已终止,工作线程数为0的状态。线程池进入该状态后会调用 terminated() 钩子方法进入TERMINATED 状态。TERMINATED: 在terminated()钩子方法执行完后进入该状态。

调用线程池的shutdown()或者shutdownNow()方法可以关闭线程池,遍历线程池中工作线程,逐个调用interrupt方法来中断线程。

Shutdown()方法与shutdownNow()的特点:

Shutdown()方法将线程池的状态设置为SHUTDOWN状态,只会中断空闲的工作线程。

shutdownNow()方法将线程池的状态设置为STOP状态,会中断所有工作线程,不管工作线程是否空闲。

调用两者中任何一种方法,都会使isShutdown()方法的返回值为true;线程池中所有的任务都关闭后,isTerminated()方法的返回值为true。

通常使用shutdown()方法关闭线程池,如果不要求任务一定要执行完,则可以调用shutdownNow()方法。

确定线程池的大小

线程池合理的长度取决于所要执行的任务特征以及程序所部署的系统环境,一般根据这二者因素使用配置文件提供或者通过CPU核数N:

N = Runtime.getRuntime().availableProcessors();

动态计算而不是硬编码在代码中。主要是避免线程池过大或过小这两种极端情况。如果线程池过大,会导致CPU和内存资源竞争,频繁的上下文切换,任务延迟,甚至资源耗尽。如果线程池过小,会造成CPU和内存资源未充分利用,任务处理的吞吐量减小。

对于任务特征来说,需要分清楚是计算密集型任务,还是IO密集型任务或是混合型任务。

计算密集型也称为CPU密集型,意思就是该任务需要大量运算,而没有阻塞,CPU一直全速运行。CPU密集型任务只有在多核CPU上才可能得到加速,即scale up,通过多线程程序享受到增加CPU核数带来的好处。

IO密集型,即该任务需要大量的IO操作,例如网络连接、数据库连接等等,执行任务过程中会有大量的阻塞。在单线程上运行IO密集型任务会导致浪费大量的CPU运算能力浪费在等待。

对于计算密集型任务,原则是配置尽可能少的线程数,通常建议以下计算方式设置线程池大小来获得最优利用率:

N(线程数) = N(CPU核数)+ 1

对于IO密集型任务,考虑的因素会多一些,原则是因为较多的时间处于IO阻塞,不能处理新的任务,所有线程数尽可能大一些,通常建议是:

N(线程数) = 2 x N(CPU核数) + 1

或者更精确的:

N(线程数) = N(CPU核数) x U x (1 + W/C)

其中U表示CPU使用率,W/C表示IO等待时间与计算时间的比率,这个不需要太精确,只需要一个估算值。例如,4核CPU,CPU使用率80%,IO等待时间1秒,计算时间0.1秒,那么线程数为:4 x 0.8 x 11≈35。一些文章中还提到这种计算方式:

N(线程数) = N(CPU核数) x U / (1 - f)

其中U表示CPU使用率,f表示阻塞系数,即IO等待时间与任务执行总时间的比率:W/(W + C)。根据上面的例子计算出线程数为:4 x 0.8 / 0.09≈35。两种计算方式的结果是很相近的。

以上的计算方式和建议尽可以作为理论参考值,实际业务中可能并不完全按照这个计算值来设置。可以根据对线程池各项参数的监控,来确定一个合理的值。ThreadPoolExecutor提供的一些可用于获取监控的参数方法如下:

getTaskCount():线程池需要执行的任务数量,包括已经执行完的、未执行的和正在执行的。getCompletedTaskCount():线程池在运行过程中已完成的任务数量 ,completedTaskCount <= taskCount。getLargestPoolSize():线程池曾经创建过的最大线程数量 ,通过这个数据可以知道线程池是否满过。如等于线程池的最大大小 ,则表示线程池曾经满了。getPoolSize(): 线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以线程池的线程数量只增不减 。getActiveCount():获取活动的线程数。

此外,可以通过继承ThreadPoolExecutor并重写它的 beforeExecute(),afterExecute() 和 terminated()方法,我们可以在任务执行前,执行后和线程池关闭前做一些统计、日志输出等等操作,以帮助我们更好地监控到线程池的运行状态。

标签: #java线程数据库