前言:
今天同学们对“java 超时机制实现单线程”可能比较注重,兄弟们都想要学习一些“java 超时机制实现单线程”的相关资讯。那么小编同时在网摘上网罗了一些有关“java 超时机制实现单线程””的相关文章,希望姐妹们能喜欢,看官们一起来了解一下吧!异步编排
在业务开发的过程中,我们为了降低接口耗时,经常会用到线程池,书写多线程数据获取、同步阻塞获取结果的业务逻辑。
常见的使用方法如下:
Future
@Slf4j@SpringBootTestpublic class OtherTest { public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); public static void main(String[] args) { Future<Integer> submit1 = executor.submit(() -> { // 业务耗时逻辑1 return 1; }); Future<Integer> submit2 = executor.submit(() -> { // 业务耗时逻辑2 return 2; }); Future<Integer> submit3 = executor.submit(() -> { // 业务耗时逻辑3 return 3; }); try { Integer integer1 = submit1.get(); Integer integer2 = submit2.get(); Integer integer3 = submit3.get(); System.out.println(integer1); System.out.println(integer2); System.out.println(integer3); } catch (Exception e) { e.printStackTrace(); } }}复制代码
假设一个接口涉及到3个业务逻辑,如下:
业务逻辑1耗时: 50ms业务逻辑2耗时: 30ms业务逻辑3耗时: 70ms
那么如果是传统的串行调用,接口总耗时:150ms
但如果是上面的利用线程池的方式进行调用,那么该接口耗时取决于耗时最长的那个业务逻辑,即该接口耗时为: 70ms
可以看到,接口耗时是有明显降低的~
CompletableFuture
当然,上面虽然对接口进行异步编排后,接口耗时有着下降,但是如果说我们的耗时业务逻辑有着十几二十个?且业务逻辑之间存在依赖关系?那么我们怎么办?
很显然,上面的Future就不能满足我们的需求了,所以从JDK8开始,JDK提供了CompletableFuture工具类,为我们异步编排提供了很大的便利~
@Slf4j@SpringBootTestpublic class OtherTest { public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100)); public static void main(String[] args) { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { // 业务耗时逻辑1 return 1; }, executor); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { // 业务耗时逻辑2 return 2; }, executor); CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> { // 业务耗时逻辑3 return 3; }, executor); try { // 等待任务全部执行完毕 CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).get(); System.out.println(completableFuture1.get()); System.out.println(completableFuture2.get()); System.out.println(completableFuture3.get()); } catch (Exception e) { e.printStackTrace(); } }}复制代码
由于案例比较简单,无法突出CompletableFuture编排能力相比于Future的优势所在,这个在以后的文章里专门会为大家讲解,这不是本文的重点。
超时中断
在上面的案例中,细心的小伙伴可以发现,无论是CompletableFuture还是Future,我都是进行阻塞等待任务结束。
这,其实是一个非常危险的行为,如果下游rpc接口出现波动,那么接口耗时会明显提升,而我们却进行阻塞获取,线程会被一直阻塞无法及时释放,那么随着不断的请求进来,线程池线程、队列很快都会被打满,新任务都会被拒绝掉,从而影响用户体验,从而影响你的工资,从而影响你的工作。
所以,为了杜绝这种情况出现,我们在获取任务结果的时候需要设置等待时间~
Future和CompletableFuture的get方法都支持传入等待时间~
Future超时中断机制
Future提供了get方法来供我们阻塞获取任务结果,也支持传入超时时间,下面来了解下源码
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // 参数校验 if (unit == null) throw new NullPointerException(); int s = state; // 阻塞等待,如果超过超时时间任务还未完成,那么抛出超时异常 if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s);}复制代码
阻塞等待,timed为true代表存在超时时间
private int awaitDone(boolean timed, long nanos) throws InterruptedException { long startTime = 0L; WaitNode q = null; boolean queued = false; for (;;) { int s = state; // 任务状态 > COMPLETING说明已经执行完毕 if (s > COMPLETING) { // 当前线程不用等待了,将等待节点里的Thread设置为null if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // COMPLETING是任务执行完毕到真正将任务设置为完成态的一个中间状态 // 当任务的处于COMPLETING时,说明任务已经执行完了,但此时cpu时间不够没有继续执行 // 此时需要yield一下,让其他线程执行,从而将任务正确设置为完成状态 Thread.yield(); else if (Thread.interrupted()) { // 如果当前线程被打断了,则把当前线程从等待该任务完成的阻塞线程链表中删除 removeWaiter(q); // 抛出打断异常 throw new InterruptedException(); } else if (q == null) { // 如果是超时等待,且等待时间<=0,则直接返回当前任务状态 if (timed && nanos <= 0L) return s; // 初始化一个等待当前任务执行完的节点,内部包含 q = new WaitNode(); } else if (!queued) // 将WaitNode排队到线程等待链表中 queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { // 阻塞等待,存在超时时间 final long parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed >= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } if (state < COMPLETING) LockSupport.parkNanos(this, parkNanos); } else // 阻塞等待,没有超时时间 LockSupport.park(this); }}复制代码
上面源码注释已经比较完善了,但我们还是要总结一下
任务COMPLETING状态,是任务执行完毕到真正将任务设置为完成态的一个中间状态(见FutureTask的run方法)get方法无论是否存在超时时间,底层都是通过LockSupport的park、unpark方法来达到阻塞的目的对于每个任务,其内部会维护一个等待当前任务完成的线程链表waitersCompletableFuture超时中断机制
而从JDK 9开始,CompletableFuture 也提供了 orTimeout、completeTimeout 方法,来进行异步超时控制。
CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).orTimeout(1, TimeUnit.SECONDS).get();复制代码
根据上面代码,我们可以理解到,会等待completableFuture1, completableFuture2, completableFuture3三个任务执行1秒钟
如果超过1秒,则会抛出java.util.concurrent.TimeoutException
源码如下:
public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { if (unit == null) throw new NullPointerException(); if (result == null) whenComplete(new Canceller(Delayer.delay(new Timeout(this), timeout, unit))); return this;}public CompletableFuture<T> completeOnTimeout(T value, long timeout, TimeUnit unit) { if (unit == null) throw new NullPointerException(); if (result == null) whenComplete(new Canceller(Delayer.delay( new DelayedCompleter<T>(this, value), timeout, unit))); return this;}复制代码
static final class Delayer { static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) { // 延时任务 return delayer.schedule(command, delay, unit); } static final ScheduledThreadPoolExecutor delayer; static { // 单线程 (delayer = new ScheduledThreadPoolExecutor( 1, new DaemonThreadFactory())). setRemoveOnCancelPolicy(true); }}static final class Timeout implements Runnable { final CompletableFuture<?> f; Timeout(CompletableFuture<?> f) { this.f = f; } public void run() { // 如果CompletableFuture不为null,且定时任务没有被取消 if (f != null && !f.isDone()) // 设置超时异常 f.completeExceptionally(new TimeoutException()); }}static final class DelayedCompleter<U> implements Runnable { final CompletableFuture<U> f; final U u; DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; } public void run() { if (f != null) // 将任务结果设置为我们给定的value f.complete(u); }}复制代码
static final class Canceller implements BiConsumer<Object, Throwable> { final Future<?> f; Canceller(Future<?> f) { this.f = f; } public void accept(Object ignore, Throwable ex) { // 如果没有异常,且超时任务存在且没有被取消,那么则取消超时任务 // 因为此时说明,CompletableFuture的任务在超时时间内完成了,则不需要在监控超时 if (ex == null && f != null && !f.isDone()) f.cancel(false); }}复制代码
通过对上面源码的了解,我们可以知道
CompletableFuture的orTimeout、completeOnTimeout底层其实都是通过ScheduledThreadPoolExecutor来实现的
当我们对一个CompletableFuture设置了超时时间后,底层其实会通过ScheduledThreadPoolExecutor启动一个延时任务,延时时间就是我们设置的超时时间,此时有分为两种情况
任务在超时时间之内完成,那么在任务完成之后,会去通过cancel(false)取消延时任务任务执行时间超过设定的超时时间,则为该任务设置TimeoutException,让主线程感知~
Future cancel原理
另外,我们还能看到,CompletableFuture 的延时任务并没有进行try-catch,此处可以了解下->ScheduledThreadPoolExecutor有坑嗷~
而orTimeout和completeOnTimeout的区别就在于
如果是orTimeout,那么超时后会抛出超时异常如果是completeOnTimeout,不会抛出异常,则是将任务结果设置为我们传入的value扩展知识点
在上面了解CompletableFuture的orTimeout、completeOnTimeout时,我们知道了其底层是通过ScheduledThreadPoolExecutor来实现的,但通过源码发现,ScheduledThreadPoolExecutor只有一个线程去处理
static final ScheduledThreadPoolExecutor delayer;static { (delayer = new ScheduledThreadPoolExecutor( 1, new DaemonThreadFactory())). setRemoveOnCancelPolicy(true);}复制代码
那么,当出现大量设置了超时时间且时间个不一致的CompletableFuture时,由于是单线程处理,可能我们给任务设置的超时时间是1000ms,但实际可能因为队列排队,真正处理超时的超时时间会 > 1000ms
也就是说orTimeout、completeOnTimeout设置的超时时间并不会那么精确
标签: #java 超时机制实现单线程