龙空技术网

Android进阶必备:RxJava全面学习(3)——变换、组合操作符详解

IT工匠 246

前言:

现在咱们对“rxjava合并请求”大约比较关心,同学们都想要知道一些“rxjava合并请求”的相关资讯。那么小编在网络上网罗了一些关于“rxjava合并请求””的相关内容,希望咱们能喜欢,小伙伴们快快来了解一下吧!

关于RxJava我会持续更新,从原理到使用一步步由浅至深带大家对RxJava有一个深入的学习,欢迎大家持续关注。如果觉得这里代码排版不是很舒服的读者可以关注我的微信公众号“IT工匠”,我会同步更新,另外微信公众号上还有很多互联网必备资源(涉及算法、数据结构、java、深度学习、计算机网络、python、Android等互联网技术资料),欢迎大家关注、交流。

RxJava之所以强大的一大原因是其提供了功能完备的各类操作符,有了这些操作符,我们可以完成很多功能。所谓的操作符其实核心功能只有一个,那就是创建被观察者以及发送事件,如果看过我写的《RxJava全面学习(1)——原理介绍&入门使用》一文,一定会对Observable.create()方法有很深的印象,当时我们是使用Observable.create()方法去创建被观察者以及发送事件的,其实Observable.create()方法就是RxJava为我们提供的一个最基本的操作符,RxJava为我们提供的操作符大致有以下这些:

本文主主要介绍RxJava中的变换操作符和组合/合并操作符,本文目录如下:

变换操作符

变换操作符的作用是将原来的事件进行一定的变化之后再进行发送,下面我们结合实例来分别讲解RxJava中的变换操作符。

Map()

作用:map()操作符的作用很简单,就是将被观察者发送的每一个事件都经过指定函数进行处理,从而转化为另一个事件。

具体使用:

public void mapRun() { //在这里创建了一个Function对象,用来将Integer类型的事件转化为String类型的事件 Observable.just(1, 2, 3, 4).map(new Function<Integer, String>() { //被观察者发送的每一个事件都会经过这里的apply()方法进行转化 @Override public String apply(Integer integer) throws Exception { return "这是变换后的事件,类型为String:integer"; } //观察者处理的事件类型为经过map()操作符转化后的事件类型,这里是String,而不是Integer }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { print("开始采用subscribe连接"); } @Override public void onNext(String value) { print("对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } });}

必要的部分代码中都有注释,运行效果如下:

FlatMap()

作用:将被观察者发送的每一个事件进行拆分,然后将所有的这些拆分结果合并为一连串的事件进行发送,注意,合并已拆分的事件这个过程是没有顺序的,即不能保证观察者最后接收到的事件顺序和被观察者发送的事件顺序一致。

具体使用:

public void flatMapRun() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).flatMap(new Function<Integer, ObservableSource<?>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { ArrayList<String> events = new ArrayList<>(); events.add("事件" + integer + "拆分的子事件1"); events.add("事件" + integer + "拆分的子事件2"); events.add("事件" + integer + "拆分的子事件3"); return Observable.fromIterable(events); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { print("开始采用subscribe连接"); } @Override public void onNext(Object value) { print("对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } });}

运行效果:

ConcatMap()

作用:ConcatMap()操作符的作用和FlatMap()操作符的作用差不多,唯一的不同是ConcatMap()操作符可以保证观察者接收到的事件顺序和被观察者发送的事件顺序一致。

用法:

public void concatMapRun() { Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); } }).concatMap(new Function<Integer, ObservableSource<?>>() { @Override public ObservableSource<String> apply(Integer integer) throws Exception { ArrayList<String> events = new ArrayList<>(); events.add("事件" + integer + "拆分的子事件1"); events.add("事件" + integer + "拆分的子事件2"); events.add("事件" + integer + "拆分的子事件3"); return Observable.fromIterable(events); } }).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { print("开始采用subscribe连接"); } @Override public void onNext(Object value) { print("对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } });}

运行效果:

Buffer()

作用:定期收集被观察者发送的事件并放进缓冲区,然后一次性发送缓冲区中所有的事件,而不是一次发射一个事件。这种模式有点类似于生产者消费者,被观察者不断发送事件,Buffer定期从被观察者发送的事件中获取一定数量的事件并发送给观察者。如果被观察者发送了一个Error事件,buffer会立即传递这个事件,而不是发射缓存的数据,即使已经缓存了数据。buffer有很多的变体,有多个重载方法,这里选择一个作为示例:

 Observable.just("事件1", "事件2", "事件3", "事件4", "事件5") .buffer(3, 2) .subscribe(new Observer<List<String>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(List<String> stringList) { print(" 缓存区里的事件数量 = " + stringList.size()); for (String value : stringList) { print(" 事件 = " + value); } } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } });}

运行结果:

组合/合并操作符

组合多个被观察者

concat()/concatArray()

作用:组合多个被观察者,并串行顺序发送组合的所有被观察者的事件

特点:concat()最多组合4个被观察者,concatArray()最多可组合的被观察者数量不受限制

具体使用:

//所连接的被观察者不能超过4个Observable.concat(Observable.just(1, 2, 3), Observable.just("一", "二", "三")).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { print("开始采用subscribe连接"); } @Override public void onNext(Object value) { print("对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); }});

运行结果:

public void concatArrayRun() { //所连接的被观察者可以超过4个 Observable.concatArray(Observable.just(1, 2, 3), Observable.just("一", "二", "三"), Observable.just(4, 5, 6), Observable.just("四", "五", "六"), Observable.just(7, 8, 9)).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { print("开始采用subscribe连接"); } @Override public void onNext(Object value) { print("对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } });}

执行结果:

merge()/mergeArray()

作用:合并被观察者,并行、顺序地发送被观察者的事件

特点:merge()可合并的被观察者不能超过4个,mergeArray()可以合并的被观察者可以超过4个。

具体使用:

public void mergeRun(){ //所连接的被观察者不能超过4个 Observable.merge(Observable.intervalRange(1,4,1,1,TimeUnit.SECONDS), Observable.intervalRange(5,4,1,1,TimeUnit.SECONDS)).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { print(df.format(new Date()) + "-开始采用subscribe连接"); } @Override public void onNext(Object value) { print(df.format(new Date()) + "-对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print(df.format(new Date()) + "-对Error事件作出响应"); } @Override public void onComplete() { print(df.format(new Date()) + "-对Complete事件作出响应"); } });}

执行结果:

注意,相比于concat()串行、顺序的特点,merge()的特点是并行、顺序。

mergeArray()的功能和merge()类似,只是其可以连接多余4个被观察者,这里不再演示其效果。

concatDelayError()/mergeDelayError()

当我们使用concat()操作符和merge()操作符连接多个被观察者时,如果被连接的众多被观察者中有一个发送了Error事件,就会立即终止其他被观察者发送事件,如果我们想要在其他观察者顺利发送完毕事件之后再发送Error事件,则可以使用concatDelayError()/mergeDelayError()操作符。

比如,我们使用merge()操作符:

public void mergeDelayRun() { Observable.merge( Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); emitter.onComplete(); } }),  Observable.just(4, 5, 6) ).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { print(df.format(new Date()) + "-开始采用subscribe连接"); } @Override public void onNext(Object value) { print(df.format(new Date()) + "-对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print(df.format(new Date()) + "-对Error事件作出响应"); } @Override public void onComplete() { print(df.format(new Date()) + "-对Complete事件作出响应"); } });}

运行结果是:

可以发现,由于第一个被观察者发送了Error事件,所以第二个被观察者的事件中断了发送。

为了避免上面的问题,我们使用mergeDelayError():

public void mergeDelayRun() { Observable.mergeDelayError(Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onError(new NullPointerException()); emitter.onComplete(); } }), Observable.just(4, 5, 6)).subscribe(new Observer<Object>() { @Override public void onSubscribe(Disposable d) { print(df.format(new Date()) + "-开始采用subscribe连接"); } @Override public void onNext(Object value) { print(df.format(new Date()) + "-对Next事件: " + value + " 作出响应"); } @Override public void onError(Throwable e) { print(df.format(new Date()) + "-对Error事件作出响应"); } @Override public void onComplete() { print(df.format(new Date()) + "-对Complete事件作出响应"); } });}

运行效果如下:

可以发现,使用了mergeDelayError()操作符之后,Error事件将会在所有被观察者的事件被正常发送完毕后再发送。

concatDelayError()的用法和作用于mergeDelayError()类似,这里就不展示具体实例了。

合并多个事件

该类型下的操作符的作用主要是对接收到的多个观察者的事件进行合并处理。

Zip()

作用:合并多个被观察者发送的事件,生成一个新的事件序列(组合起来的事件序列),并最终发送。

具体使用:

public void zipRun() { //创建2个被观察者,每隔1秒种发送一次数据 Observable.zip( Observable.intervalRange(1, 5, 2, 1, TimeUnit.SECONDS), Observable.intervalRange(6, 6, 1, 2, TimeUnit.SECONDS), new BiFunction<Long, Long, String>() { //在applay()方法中执行合并的逻辑并返回 @Override public String apply(Long aLong, Long aLong2) throws Exception { return "这是合并" + aLong + "和" + aLong2 + "的结果"; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { print("onSubscribe"); } @Override public void onNext(String value) { print("最终接收到的事件 : " + value); } @Override public void onError(Throwable e) { print("onError"); } @Override public void onComplete() { print("onComplete"); } });}

运行结果如下:

注意:

最终接收到的合并结果是被合并的各个被观察者的发送事件的位结合,即第i个被观察者发送的第x个事件一定是和第n个被观察者发送的第x个事件进行合并被观察者1发送m个事件,被观察者2发送n个事件,m>n,则最终我们接收到的合并事件的个数为n个,即被观察者1中的最后m-n个事件是不会被合并的(因为没有事件可以和他合并),但是这些事件还是会照常发送zip()中的被观察者可以不同时发送事件,所有的被观察者都发送完毕第i个事件后,才会执行apply()方法对着i个事件进行合并,即apply()方法是在所有被观察者发送完毕第i个事件后才执行zip()操作符支持最多合并9个被观察者的事件,如果多余9个,需要使用zipArray()或者zipIterable()

combineLatest()

作用:将最新到达的被观察者的事件和其他被观察者的事件进行组合

具体使用:

public void combainLastestRun() { print(df.format(new Date()) + "-开始"); Observable.combineLatest( //每隔1s发送一次事件,发送事件的顺序是:1,2,3,4,5 Observable.intervalRange(1, 5, 0, 1, TimeUnit.SECONDS), //每隔1s发送一次事件,发送事件的顺序是:6,7,8,9,10 Observable.intervalRange(6, 5, 0, 2, TimeUnit.SECONDS), //每隔2s发送一次事件,发送事件的顺序是:11,12,13,14,15 Observable.intervalRange(11, 5, 0, 1, TimeUnit.SECONDS), new Function3<Long, Long, Long, String>() { @Override public String apply(Long aLong, Long aLong2, Long aLong3) throws Exception { return "这是合并" + aLong + "-" + aLong2 + "-" + aLong3 + "的结果"; } }).subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { print(df.format(new Date()) + "-onSubscribe"); } @Override public void onNext(String value) { print(df.format(new Date()) + "-接收到事件 : " + value); } @Override public void onError(Throwable e) { print(df.format(new Date()) + "-onError" + e.getMessage()); } @Override public void onComplete() { print(df.format(new Date()) + "-onComplete"); } });}

执行结果如下:

整个流程大致是这样的(由于时间关系我只画出了前几次合并事件的过程,后面的过程类似):

从上面的例子我们可以得出以下两点结论:

当某一个被观察者有新的事件发送时,将会合并所有观察者的最新发送的事件当有多个被观察者同时发送事件时,将会按照combineLatest()中的被观察者的位置顺序进行合并,比如这里2和12是在同一个时间点发送,但是有发送事件2的被观察者的的参数位置比发送事件12的被观察者的位置靠前,所以相当2发送的事件“更早“和Zip()的区别在Zip()是按照事件的个数进行合并的,而combainLastestRun()是按照事件发送的时间进行和合并的

combineLatestDelayError()

作用:和concatDelayError()/mergeDelayError()类似,用来延迟错误事件的发送。

reduce()

作用:将被观察者需要发送的事件聚合成一个事件并发送

具体使用:

public void reduceRun() { Observable.just(1, 2, 3, 4).reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Exception { print("接收到:" + integer); } });}

执行结果:

注意:当有多个事件时,需要两个两个进行合并,最终将所有事件聚合为一个事件进行发送。

collect()

作用:将发送的事件聚合到一个集合中

具体使用:

public void collectRun() { Observable.just(1, 2, 3, 4).collect(new Callable<ArrayList<Integer>>() { @Override public ArrayList<Integer> call() throws Exception { return new ArrayList<>(); } }, new BiConsumer<ArrayList<Integer>, Integer>() { @Override public void accept(ArrayList<Integer> integers, Integer integer) throws Exception { integers.add(integer); } }).subscribe(new Consumer<ArrayList<Integer>>() { @Override public void accept(ArrayList<Integer> integers) throws Exception { print("接收到的数据长度为:" + integers.size()); } });}

运行结果如下:

发送事件前追加事件

startWith()/startWithArray()

作用:在被观察者发送事件之前发送一个新的事件

具体使用:

public void startWithRun() { Observable.just(1, 2, 3, 4).startWith(0).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { print("onSubscribe"); } @Override public void onNext(Integer value) { print("接收到了事件" + value); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } });}

运行结果:

再比如:

public void startWithRun2() { Observable.just(1, 2, 3, 4).startWith(Observable.just(5,6)).subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { print("onSubscribe"); } @Override public void onNext(Integer value) { print("接收到了事件" + value); } @Override public void onError(Throwable e) { print("对Error事件作出响应"); } @Override public void onComplete() { print("对Complete事件作出响应"); } });}

运行结果:

统计发送事件的数量

count()

作用:统计被观察者发送事件的数量

具体使用:

public void countRun(){ Observable.just("事件1","事件2","事件3","事件4").count().subscribe(new Consumer<Long>() { @Override public void accept(Long aLong) throws Exception { print("接收到:"+aLong); } });}

运行结果:

标签: #rxjava合并请求