龙空技术网

异步超时中断,知其然,也要知其所以然~

IT知识分享官 97

前言:

今天同学们对“java 超时机制实现单线程”可能比较注重,兄弟们都想要学习一些“java 超时机制实现单线程”的相关资讯。那么小编同时在网摘上网罗了一些有关“java 超时机制实现单线程””的相关文章,希望姐妹们能喜欢,看官们一起来了解一下吧!

异步编排

在业务开发的过程中,我们为了降低接口耗时,经常会用到线程池,书写多线程数据获取、同步阻塞获取结果的业务逻辑。

常见的使用方法如下:

Future

@Slf4j@SpringBootTestpublic class OtherTest {   public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,         new LinkedBlockingQueue<>(100));​   public static void main(String[] args) {​​      Future<Integer> submit1 = executor.submit(() -> {         // 业务耗时逻辑1         return 1;      });​      Future<Integer> submit2 = executor.submit(() -> {         // 业务耗时逻辑2         return 2;      });​      Future<Integer> submit3 = executor.submit(() -> {         // 业务耗时逻辑3         return 3;      });​      try {         Integer integer1 = submit1.get();         Integer integer2 = submit2.get();         Integer integer3 = submit3.get();​         System.out.println(integer1);         System.out.println(integer2);         System.out.println(integer3);      } catch (Exception e) {         e.printStackTrace();      }​   }​}复制代码

假设一个接口涉及到3个业务逻辑,如下:

业务逻辑1耗时: 50ms业务逻辑2耗时: 30ms业务逻辑3耗时: 70ms

那么如果是传统的串行调用,接口总耗时:150ms

但如果是上面的利用线程池的方式进行调用,那么该接口耗时取决于耗时最长的那个业务逻辑,即该接口耗时为: 70ms

可以看到,接口耗时是有明显降低的~

CompletableFuture

当然,上面虽然对接口进行异步编排后,接口耗时有着下降,但是如果说我们的耗时业务逻辑有着十几二十个?且业务逻辑之间存在依赖关系?那么我们怎么办?

很显然,上面的Future就不能满足我们的需求了,所以从JDK8开始,JDK提供了CompletableFuture工具类,为我们异步编排提供了很大的便利~

@Slf4j@SpringBootTestpublic class OtherTest {​   public static final ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS,         new LinkedBlockingQueue<>(100));​   public static void main(String[] args) {​​      CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> {         // 业务耗时逻辑1         return 1;      }, executor);​      CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> {         // 业务耗时逻辑2         return 2;      }, executor);​      CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> {         // 业务耗时逻辑3         return 3;      }, executor);​      try {         // 等待任务全部执行完毕         CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).get();​         System.out.println(completableFuture1.get());         System.out.println(completableFuture2.get());         System.out.println(completableFuture3.get());​      } catch (Exception e) {         e.printStackTrace();      }​   }​}复制代码

由于案例比较简单,无法突出CompletableFuture编排能力相比于Future的优势所在,这个在以后的文章里专门会为大家讲解,这不是本文的重点。

超时中断

在上面的案例中,细心的小伙伴可以发现,无论是CompletableFuture还是Future,我都是进行阻塞等待任务结束。

这,其实是一个非常危险的行为,如果下游rpc接口出现波动,那么接口耗时会明显提升,而我们却进行阻塞获取,线程会被一直阻塞无法及时释放,那么随着不断的请求进来,线程池线程、队列很快都会被打满,新任务都会被拒绝掉,从而影响用户体验,从而影响你的工资,从而影响你的工作。

所以,为了杜绝这种情况出现,我们在获取任务结果的时候需要设置等待时间~

Future和CompletableFuture的get方法都支持传入等待时间~

Future超时中断机制

Future提供了get方法来供我们阻塞获取任务结果,也支持传入超时时间,下面来了解下源码

public V get(long timeout, TimeUnit unit)  throws InterruptedException, ExecutionException, TimeoutException {  // 参数校验  if (unit == null)    throw new NullPointerException();    int s = state;    // 阻塞等待,如果超过超时时间任务还未完成,那么抛出超时异常  if (s <= COMPLETING &&      (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)    throw new TimeoutException();  return report(s);}复制代码

阻塞等待,timed为true代表存在超时时间

private int awaitDone(boolean timed, long nanos)    throws InterruptedException {      long startTime = 0L;    WaitNode q = null;    boolean queued = false;    for (;;) {        int s = state;        // 任务状态 > COMPLETING说明已经执行完毕        if (s > COMPLETING) {            // 当前线程不用等待了,将等待节点里的Thread设置为null            if (q != null)                q.thread = null;            return s;        }        else if (s == COMPLETING)            // COMPLETING是任务执行完毕到真正将任务设置为完成态的一个中间状态            // 当任务的处于COMPLETING时,说明任务已经执行完了,但此时cpu时间不够没有继续执行            // 此时需要yield一下,让其他线程执行,从而将任务正确设置为完成状态            Thread.yield();        else if (Thread.interrupted()) {            // 如果当前线程被打断了,则把当前线程从等待该任务完成的阻塞线程链表中删除            removeWaiter(q);            // 抛出打断异常            throw new InterruptedException();        }        else if (q == null) {            // 如果是超时等待,且等待时间<=0,则直接返回当前任务状态            if (timed && nanos <= 0L)                return s;            // 初始化一个等待当前任务执行完的节点,内部包含            q = new WaitNode();        }        else if (!queued)            // 将WaitNode排队到线程等待链表中            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);        else if (timed) {            // 阻塞等待,存在超时时间            final long parkNanos;            if (startTime == 0L) { // first time                startTime = System.nanoTime();                if (startTime == 0L)                    startTime = 1L;                parkNanos = nanos;            } else {                long elapsed = System.nanoTime() - startTime;                if (elapsed >= nanos) {                    removeWaiter(q);                    return state;                }                parkNanos = nanos - elapsed;            }            if (state < COMPLETING)                LockSupport.parkNanos(this, parkNanos);        }        else            // 阻塞等待,没有超时时间            LockSupport.park(this);    }}复制代码

上面源码注释已经比较完善了,但我们还是要总结一下

任务COMPLETING状态,是任务执行完毕到真正将任务设置为完成态的一个中间状态(见FutureTask的run方法)get方法无论是否存在超时时间,底层都是通过LockSupport的park、unpark方法来达到阻塞的目的对于每个任务,其内部会维护一个等待当前任务完成的线程链表waitersCompletableFuture超时中断机制

而从JDK 9开始,CompletableFuture 也提供了 orTimeout、completeTimeout 方法,来进行异步超时控制。

CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3).orTimeout(1, TimeUnit.SECONDS).get();复制代码

根据上面代码,我们可以理解到,会等待completableFuture1, completableFuture2, completableFuture3三个任务执行1秒钟

如果超过1秒,则会抛出java.util.concurrent.TimeoutException

源码如下:

public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) {    if (unit == null)        throw new NullPointerException();    if (result == null)        whenComplete(new Canceller(Delayer.delay(new Timeout(this),                                                 timeout, unit)));    return this;}​public CompletableFuture<T> completeOnTimeout(T value, long timeout,                                              TimeUnit unit) {  if (unit == null)    throw new NullPointerException();  if (result == null)    whenComplete(new Canceller(Delayer.delay(      new DelayedCompleter<T>(this, value),      timeout, unit)));  return this;}复制代码
static final class Delayer {  static ScheduledFuture<?> delay(Runnable command, long delay,                                  TimeUnit unit) {    // 延时任务    return delayer.schedule(command, delay, unit);  }    static final ScheduledThreadPoolExecutor delayer;  static {    // 单线程    (delayer = new ScheduledThreadPoolExecutor(      1, new DaemonThreadFactory())).      setRemoveOnCancelPolicy(true);  }}​​static final class Timeout implements Runnable {  final CompletableFuture<?> f;  Timeout(CompletableFuture<?> f) { this.f = f; }  public void run() {    // 如果CompletableFuture不为null,且定时任务没有被取消    if (f != null && !f.isDone())      // 设置超时异常      f.completeExceptionally(new TimeoutException());  }}​static final class DelayedCompleter<U> implements Runnable {  final CompletableFuture<U> f;  final U u;  DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; }  public void run() {    if (f != null)      // 将任务结果设置为我们给定的value      f.complete(u);  }}复制代码
static final class Canceller implements BiConsumer<Object, Throwable> {    final Future<?> f;    Canceller(Future<?> f) { this.f = f; }    public void accept(Object ignore, Throwable ex) {        // 如果没有异常,且超时任务存在且没有被取消,那么则取消超时任务        // 因为此时说明,CompletableFuture的任务在超时时间内完成了,则不需要在监控超时        if (ex == null && f != null && !f.isDone())            f.cancel(false);    }}复制代码

通过对上面源码的了解,我们可以知道

CompletableFuture的orTimeout、completeOnTimeout底层其实都是通过ScheduledThreadPoolExecutor来实现的

当我们对一个CompletableFuture设置了超时时间后,底层其实会通过ScheduledThreadPoolExecutor启动一个延时任务,延时时间就是我们设置的超时时间,此时有分为两种情况

任务在超时时间之内完成,那么在任务完成之后,会去通过cancel(false)取消延时任务任务执行时间超过设定的超时时间,则为该任务设置TimeoutException,让主线程感知~

Future cancel原理

另外,我们还能看到,CompletableFuture 的延时任务并没有进行try-catch,此处可以了解下->ScheduledThreadPoolExecutor有坑嗷~

而orTimeout和completeOnTimeout的区别就在于

如果是orTimeout,那么超时后会抛出超时异常如果是completeOnTimeout,不会抛出异常,则是将任务结果设置为我们传入的value扩展知识点

在上面了解CompletableFuture的orTimeout、completeOnTimeout时,我们知道了其底层是通过ScheduledThreadPoolExecutor来实现的,但通过源码发现,ScheduledThreadPoolExecutor只有一个线程去处理

static final ScheduledThreadPoolExecutor delayer;static {    (delayer = new ScheduledThreadPoolExecutor(        1, new DaemonThreadFactory())).        setRemoveOnCancelPolicy(true);}复制代码

那么,当出现大量设置了超时时间且时间个不一致的CompletableFuture时,由于是单线程处理,可能我们给任务设置的超时时间是1000ms,但实际可能因为队列排队,真正处理超时的超时时间会 > 1000ms

也就是说orTimeout、completeOnTimeout设置的超时时间并不会那么精确

标签: #java 超时机制实现单线程