龙空技术网

Java线程池的正确创建方式

观止 95

前言:

当前小伙伴们对“java线程绑定cpu内核”大概比较珍视,小伙伴们都想要学习一些“java线程绑定cpu内核”的相关内容。那么小编同时在网摘上汇集了一些对于“java线程绑定cpu内核””的相关内容,希望兄弟们能喜欢,咱们快快来了解一下吧!

在阿里Java开发手册里边,关于线程池创建有一条强制规则,如下图,里边也列出了相应的弊端,但是我觉得最大的弊端还是使用Executors之后,开发人员就会忽略掉线程池内部的实现。

ThreadPoolExecutor构造方法

创建线程池使用ThreadPoolExecutor,有7个参数:

corePoolSize - 线程池核心线程数。当线程池中的线程数量为 corePoolSize 时,即使这些线程处于空闲状态,也不会销毁(除非设置 allowCoreThreadTimeOut)。maximumPoolSize - 线程池的最大线程数,线程池中允许的线程数量的最大值。keepAliveTime - 当线程数大于核心线程数时,多余的空闲线程将在销毁之前等待新任务的最长时间。unit - keepAliveTime 的时间单位。workQueue - 用来储存等待执行任务的队列。threadFactory - 线程工厂,线程池创建线程时使用的工厂。handler - 拒绝策略,也就是达到线程边界和任务队列满时,针对新任务的处理方法。

关于线程数

我们的应用一般分为IO密集型和CPU密集型两类,两种类型需要的线程池数量有个简单的计算规则,如下:

IO密集型应用,则线程池大小设置为 2N+1;CPU密集型应用,则线程池大小设置为 N+1;

这里边的N表示核心线程数。这个值呢,可以通过下边的方法获取到:

// 这里获取计算资源(核心线程数)int poolSize = Runtime.getRuntime().availableProcessors();

当然,上边这个规则也只是适用于大部分应用,也不是绝对的,有些也需要根据实际情况以及实际业务来调整。

此外,在《Java虚拟机并发编程》一书中,对于IO密集型应用,还提出了另外一套公式:

线程池大小 = CPU核心数/(1-阻塞系数)

这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。8核CPU就是8/(1-0.9)=80。但是这个阻塞系数在树上也没给出明确定义,是个经验值,所以有时候还是要按照实际业务来优化调整。

关于拒绝策略

内置有四种拒绝策略:

CallerRunsPolicy,由调用线程处理该任务。AbortPolicy,丢弃任务并抛出RejectedExecutionException异常。 这个也是默认策略。DiscardPolicy,丢弃任务,但是不抛出异常。DiscardOldestPolicy,丢弃队列最前面的任务,然后重新尝试执行任务。

当然,这些策略你也可以扩展或者自定义,像dubbo就扩展了AbortPolicy策略,AbortPolicyWithReport实现了原有策略功能后,添加了记录日志和执行堆栈功能。

关于阻塞队列

线程池使用的都是阻塞队列,JDK8内置有7种:

ArrayBlockingQueue:一个基于数组创建的阻塞队列,并且是有界的,所以必须制定大小且不可变,同时在并发实现上采用了ReentrantLock来解决并发问题。需要注意的是ArrayBlockingQueue只有一把锁,put和take操作会相互阻塞。DelayQueue:延迟队列,使得插入其中的元素在延迟一定时间后,才能获取到,插入其中的元素需要实现java.util.concurrent.Delayed接口。该接口需要实现getDelay()和compareTo()方法。getDealy()返回0或者小于0的值时,delayedQueue通过其take()方法就可以获得此元素。compareTo()方法用于实现内部元素的排序,一般情况,按元素过期时间的优先级进行排序是比较好的选择。LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移除元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。LinkedBlockingDeque是可选容量的,在初始化时可以设置容量防止其过度膨胀,如果不设置,默认容量大小为Integer.MAX_VALUE。LinkedBlockingQueue:一个基于链表实现的阻塞队列,它和ArrayBlockingQueue十分相似,但和ArrayBlockingQueue有个很大的差别就是它拥有两个锁,所以在put和take的操作可以同时进行。LinkedTransferQueue:是一个由链表结构组成的无界阻塞TransferQueue队列。PriorityBlockingQueue:顾名思义,按照优先级排序且无边界的队列。插入其中的元素必须实现java.lang.Comparable接口。其排序规则就按此实现。SynchronousQueue:是无界的,是一种无缓冲的等待队列,但是由于该Queue本身的特性,在某次添加元素后必须等待其他线程取走后才能继续添加;可以认为SynchronousQueue是一个缓存值为1的阻塞队列,但是 isEmpty()方法永远返回是true,remainingCapacity() 方法永远返回是0,remove()和removeAll() 方法永远返回是false,iterator()方法永远返回空,peek()方法永远返回null。而且声明SynchronousQueue需要指定公平模式。如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(默认),SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者。而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

阿里手册上罗列的弊端,就有队列长度问题。所以自定义线程池,队列的选择和定义也十分重要,尤其是队列长度问题。

关于ThreadFactory

ThreadFactory用默认的也没什么问题,但是建议还是自定义,至少给自己创建的线程取个不一样的名字,在排查问题、检查性能时候好有个明确的标识。

总结

Executors框架封装了线程池的操作,用起来简单,但代价还是有的,这个还是要在实际开发过程中去多用、多想,避免掉进坑里。

阿里手册中强制使用ThreadPoolExecutor创建线程池,个人觉得也没有必要完全依照去做。你完全可以用Executors框架去建线程池,前提是你明白线程池的创建,确认用这个框架建线程池之后,业务中不会出现OOM或者其他的问题。

标签: #java线程绑定cpu内核