龙空技术网

有空就来学Hystrix RPC保护的原理,RPC监控之滑动窗口的实现原理

程序员高级码农II 250

前言:

今天兄弟们对“滑动窗口基本原理”大概比较关切,大家都想要学习一些“滑动窗口基本原理”的相关文章。那么小编在网上搜集了一些有关“滑动窗口基本原理””的相关文章,希望各位老铁们能喜欢,各位老铁们快快来了解一下吧!

RPC监控之滑动窗口的实现原理

Hystrix通过滑动窗口的数据结构来统计调用的指标数据,并且大量使用了RxJava响应式编程操作符。滑动窗口的本质就是不断变换的数据流,因此滑动窗口的实现非常适合使用观察者模式以及响应式编程模式去完成。最终,RxJava便成了Hystrix滑动窗口实现的框架选择。Hystrix滑动窗口的核心实现是使用RxJava的window操作符(算子)来完成的。使用RxJava实现滑动窗口还有一大好处就是可以依赖RxJava的线程模型来保证数据写入和聚合的线程安全。

Hystrix滑动窗口的原理和实现逻辑非常复杂,所以在深入学习之前先看一个Hystrix滑动窗口模拟实现示例。

Hystrix健康统计滑动窗口的模拟实现

下面总体介绍一下Hystrix健康统计滑动窗口的执行流程。

首先,HystrixCommand命令器的执行结果(失败、成功)会以事件的形式通过RxJava事件流弹射出去,形成命令完成事件流。

然后,桶计数流以事件流作为来源,将事件流中的事件按照固定时间长度(桶时间间隔)划分成滚动窗口,并对时间桶滚动窗口内的事件按照类型进行累积,完成之后将桶数据弹射出去,形成桶计数流。

最后,桶滑动统计流以桶计数流作为来源,按照步长为1、长度为设定的桶数(配置的滑动窗口桶数)的规则划分滑动窗口,并对滑动窗口内的所有桶数据按照各事件类型进行汇总,汇总成最终的窗口健康数据,并将其弹射出去,形成最终的桶滑动统计流,作为Hystrix熔断器进行状态转换的数据支撑。

以上介绍的Hystrix健康统计滑动窗口的执行流程如图5-13所示。

图5-13 Hystrix健康统计滑动窗口的执行流程

为了帮助大家学习Hystrix滑动窗口的执行流程,这里设计一个简单的Hystrix滑动窗口模拟实现用例,对Hystrix滑动窗口数据流的处理过程进行简化,只留下核心部分,简化的模拟执行流程如下:

首先,模拟HystrixCommand的事件发送机制,每100毫秒发送一个随机值(0或1),随机值为0代表失败,为1代表成功,模拟命令完成事件流。

其次,模拟HystrixCommand的桶计数流,以事件流作为来源,将事件流中的事件按照固定时间长度(300毫秒)划分成时间桶滚动窗口,并对时间桶滚动窗口内值为0的事件进行累积,完成之后将累积数据弹射出去,形成桶计数流。

最后,模拟桶计数流作为来源,按照步长为1、长度为设定的桶数

(3)的规则划分滑动窗口,并对滑动窗口内的所有桶数据进行汇总,汇总成最终的失败统计数据,并将其弹射出去,形成最终的桶滑动统计流。

以上模拟Hystrix健康统计滑动窗口的执行流程如图5-14所示。

图5-14 模拟的Hystrix健康统计滑动窗口简化版执行流程

简化的模拟Hystrix健康统计滑动窗口执行流程的实现代码如下:

package com.crazymaker.demo.rxJava.basic;//省略import@Slf4jpublic class WindowDemo{ /** *演示模拟Hystrix的健康统计metric */ @Test public void hystrixTimewindowDemo() throws InterruptedException { //创建Random类对象 Random random = new Random(); //模拟Hystrix event事件流,每100毫秒发送一个0或1随机值//随机值为0代表失败,随机值为1代表成功 Observable eventStream = Observable .interval(100, TimeUnit.MILLISECONDS) .map(i -> random.nextInt(2)); /** *完成桶内0值计数的聚合函数 */ Func1 reduceBucketToSummary =new Func1<Observable<Integer>, Observable<Long>>() { @Override public Observable<Long> call(Observable<Integer> eventBucket) { Observable<List<Integer>> olist = eventBucket.toList(); Observable<Long> countValue = olist.map(list -> { long count = list.stream().filter(i -> i == 0).count(); log.info("{} '0 count:{}", list.toString(), count); return count; }); return countValue; } }; /** *桶计数流 */ Observable<Long> bucketedCounterStream = eventStream .window(300, TimeUnit.MILLISECONDS) .flatMap(reduceBucketToSummary); //将时间桶进行聚合,统计事件值为0 的个数 /** *滑动窗口聚合函数 */ Func1 reduceWindowToSummary = new Func1<Observable<Long>, Observable<Long>>() { @Override public Observable<Long> call(Observable<Long> eventBucket) { return eventBucket.reduce(new Func2<Long, Long, Long>() { @Override public Long call(Long bucket1, Long bucket2) { /** *对窗口内的桶进行累加 */ return bucket1 + bucket2; } }); } }; /** *桶滑动统计流 */ Observable bucketedRollingCounterStream = bucketedCounterStream .window(3, 1) .flatMap(reduceWindowToSummary);//将滑动窗口进行聚合 bucketedRollingCounterStream.subscribe(sum -> log.info("滑动窗口的和:{}", sum)); Thread.sleep(Integer.MAX_VALUE); }}

运行这个示例程序,输出的结果部分节选如下:

[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 0, 0] '0 count:3[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 1] '0 count:1[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 0, 1] '0 count:1[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:5[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 0] '0 count:2[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:4[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 0] '0 count:2[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:5[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 1] '0 count:0[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:4[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [0, 1, 1] '0 count:1[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 0, 0] '0 count:2[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 1] '0 count:0[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 0] '0 count:1[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:3[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - [1, 1, 1] '0 count:0[RxComputationScheduler-1] INFO c.c.d.rxJava.basic.WindowDemo - 滑动窗口的和:1

在这个示例程序的代码中,eventStream流通过interval操作符每100毫秒发送一个随机值(0或1),随机值为0代表失败,为1代表成功,模拟HystrixCommand的事件发送机制。

桶计数流bucketedCounterStream使用window操作符以300毫秒为一个时间桶窗口,将原始的事件流进行拆分,每个时间桶窗口的3事件聚合起来,输出一个新的Observable(子流)。然后,bucketedCounterStream通过flapMap操作将每一个Observable进行扁平化。

桶计数流bucketedCounterStream的处理过程如图5-15所示。

图5-15 模拟的桶计数流bucketedCounterStream的处理过程

bucketedCounterStream的flapMap扁平化操作是通过调用reduceBucketToSummary方法完成的,该方法首先将每一个时间桶窗口内的Observable子流内的元素序列转成一个列表(List),然后进行过滤(留下值为0事件)和统计,返回值为0的元素统计数量(失败数)。

接下来,需要对bucketedCounterStream桶计数进行汇总统计,形成滑动窗口的统计数据,这个工作由bucketedRollingCounterStream桶滑动统计流完成。

桶滑动统计流仍然使用window和flatMap两个操作符,先在输入流中通过window操作符按照步长为1、长度为3的规则划分滑动窗口,每个滑动窗口的3统计数据被聚集起来,输出一个新的Observable。然后通过flatMap扁平化操作符对每一个Observable进行聚合,计算出各元素的累加值。

模拟的桶滑动统计流bucketedRollingCounterStream的处理过程如图5-16所示。

图5-16 桶滑动统计流bucketedRollingCounterStream的处理过程

bucketedRollingCounterStream的flapMap扁平化操作是通过调用reduceWindowToSummary方法完成的,该方法通过RxJava的reduce操作符进行“聚合”操作,将Observable子流中的3事件的累加结果计算出来。

Hystrix滑动窗口的核心实现原理

在Hystrix中,业务逻辑以命令模式封装成了一个个命令(HystrixCommand),每个命令执行完成后都会发送命令完成事件(HystrixCommandCompletion)到HystrixCommandCompletion Stream命令完成事件流。HystrixCommandCompletion是Hystrix中核心的事件,它可以代表某个命令执行成功、超时、异常等各种状态,与Hystrix熔断器的状态转换息息相关。

桶计数流BucketedCounterStream是一个抽象类,提供了基本的桶计数器实现。用户在使用Hystrix的时候一般都要配置两个值:timeInMilliseconds(滑动窗口的长度,时间间隔)和numBuckets(滑动窗口中的桶数),每个桶对应的时间长度就是bucketSizeInMs=timeInMilliseconds/numBuckets,该时间长度可以记为一个时间桶窗口BucketedCounterStream每隔一个时间桶窗口就把这段时间内的所有调用事件聚合到一个累积桶内。下面来看一下它的实现。

protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInM this.numBuckets = numBuckets; this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() { @Override public Observable<Bucket> call(Observable<Event> eventBucket) { return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket); } }; ... this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() { @Override public Observable<Bucket> call() { return inputEventStream .observe() .window(bucketSizeInMs, TimeUnit.MILLISECONDS) .flatMap(reduceBucketToSummary) .startWith(emptyEventCountsToStart); } });}

BucketedCounterStream的构造函数里接收4个参数:第一个参数inputEventStream是一个HystrixCommandCompletionStream命令完成事件流,每个HystrixCommand命令执行完成后,将发送的命令完成事件最终都通过inputEventStream弹射出来;第二个参数numBuckets为设置的滑动窗口中的桶数量;

第三个参数bucketSizeInMs为每个桶对应的时间长度;第四个参数为将原始事件统计到累积桶(Bucket)的回调函数。

BucketedCounterStream的核心是window操作符,它可以将原始的完成事件流按照时间桶的长度bucketSizeInMs进行拆分,并将这个时间段内的事件聚集起来,输出一个Observable,然后通过flapMap操作将每一个Observable进行扁平化。

具体的flapMap扁平化操作是通过调用reduceBucketToSummary方法完成的,该方法通过RxJava的reduce操作符进行“聚合”操作,将Observable中的一串事件归纳成一个累积桶。

桶计数流BucketedCounterStream的处理过程如图5-17所示。

图5-17 桶计数流BucketedCounterStream的处理过程

什么是累积桶呢?它是一个整型数组,数组的每一个元素用于存放相对应类型的事件的总数,如图5-18所示。

图5-18 累积桶示意图

累积桶的数组元素所保存的各类事件总数是通过聚合函数appendRawEventToBucket进行累加得到的。

累加的方式是:将数组元素的位置与事件类型相对应,将相同类型的事件总数累加到对应的数组位置上,从而统计出一个累积桶内的SUCCESS总数、FAILURE总数等。

原始的累积桶是一个空桶,每一个元素的值为0。获取原始桶的方法与具体的统计流子类相关,子类HealthCountsStream健康统计流获取原始空桶的函数如下:

public class HealthCountsStream ...{//获取初始桶,返回一个全零数组,长度为事件类型总数//数组的每一个元素用于存放对应类型的事件数量 @Override long[] getEmptyBucketSummary() { return new long[HystrixEventType.values().length]; }}

桶计数流BucketedCounterStream将时间桶类的同类型事件总数(如FAILURE、SUCCESS总数)聚合到累积桶Bucket中,处理的最终结果是,源源不断的汇总数据组成了最终的桶计数流。

接下来,需要对熔断器的滑动窗口内的所有累积桶进行汇总统计,形成滑动窗口的统计数据,作为熔断器状态转换的依据,这个工作由BucketedRollingCounterStream桶滑动统计流完成。

BucketedRollingCounterStream桶滑动统计流的数据来源正好是BucketedCounterStream桶计数流。

桶滑动统计流仍然使用window和flatMap两个操作符,先在数据流中通过滑动窗口将一定数量的数据聚集成一个集合流,然后对每一个集合流进行聚合,如图5-19所示。

图5-19 桶滑动统计流BucketedRollingCounterStream的处理过程

桶滑动统计流BucketedRollingCounterStream的核心源码如下:

public abstract class BucketedRollingCounterStream...{ private Observable<Output> sourceStream; private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false); protected BucketedRollingCounterStream( HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket) { super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket); Func1<Observable<Bucket>, Observable<Output>>reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() { @Override public Observable<Output> call(Observable<Bucket> window) { return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets); } }; this.sourceStream = bucketedStream.window(numBuckets, 1) .flatMap(reduceWindowToSummary) .doOnSubscribe(new Action0() {...}) .share() .onBackpressureDrop(); }...}

桶滑动统计流BucketedRollingCounterStream中的window操作符和BucketedCounterStream中的window操作符在版本上有所不同,它的第二个参数skip=1的意思是按照步长为1的间隔在输入数据流中持续滑动,不断聚集出numBuckets数量的输入对象,输出一个个Observable,这才是滑动窗口的真正含义。

而BucketedCounterStream流所用的window操作符,窗口与窗口之间没有重叠,严格来说,这才叫作滚动窗口操作符。

BucketedRollingCounterStream流通过window操作符滑动生成一个个Observable后,再通过flapMap操作将每一个Observable进行扁平化,具体的flapMap扁平化操作通过调用自定义窗口归约方法reduceWindowToSummary来完成。注意,该窗口归约方法没有用reduce操作符,而是用了scan+skip(numBuckets)的组合。scan和reduce一样都是聚合操作符,但是scan会将所有的中间结果弹出,而reduce操作符仅仅弹出最终结果。在scan弹出所有的中间结果和最终统计结果之后,后面的skip(numBuckets)操作将所有的中间结果跳过,剩下最终结果。这样做的好处是,如果桶里的元素个数不满足numBuckets,就把这个不完整的窗口过滤掉。

最后,总结一下Hystrix各大流之间存在的继承关系,具体如下:

(1)最顶层的BucketedCounterStream桶计数流是一个抽象类,它提供了基本的桶计数器实现,按计算出来的bucketSizeInMs时间间隔将各种类型的事件数量聚合成桶。

(2)BucketedRollingCounterStream抽象类在桶计数流的基础上实现滑动窗口内numBuckets个Bucket(累积桶)的相同类型事件数的汇总,并聚合成指标数据。

(3)最底下一层的类则是各种具体的实现,比如HealthCountsStream最终会聚合成健康检查数据(HystrixCommandMetrics.HealthCounts),比如统计命令执行成功和失败的次数,供熔断器HystrixCircuitBreaker使用。

本文给大家讲解的内容是SpringCloudRPC远程调用核心原理: Hystrix RPC保护的原理,RPC监控之滑动窗口的实现原理下篇文章给大家讲解的是SpringCloudRPC远程调用核心原理:微服务网关与用户身份识别,创建Zuul网关服务;觉得文章不错的朋友可以转发此文关注小编;感谢大家的支持!

标签: #滑动窗口基本原理