前言:
今天姐妹们对“javacall”大约比较讲究,大家都想要剖析一些“javacall”的相关文章。那么小编也在网上汇集了一些关于“javacall””的相关文章,希望小伙伴们能喜欢,同学们快快来学习一下吧!一种全新的设计模式,数学美感与工程实用价值兼备,且不限编程语言。本文将以Java为样例,从无到有实现出完整的流式API,引入生成器特性,并介绍诸多应用场景。
作者 | 文镭(依来)
来源 | 阿里开发者公众号
前言
这篇文章不是工具推荐,也不是应用案例分享。其主题思想,是介绍一种全新的设计模式。它既拥有抽象的数学美感,仅仅从一个简单接口出发,就能推演出庞大的特性集合,引出许多全新概念。同时也有扎实的工程实用价值,由其实现的工具,性能均可显著超过同类的头部开源产品。
这一设计模式并非因Java而生,而是诞生于一个十分简陋的脚本语言。它对语言特性的要求非常之低,因而其价值对众多现代编程语言都是普适的。
关于Stream
首先大概回顾下Java里传统的流式API。自Java8引入lambda表达式和Stream以来,Java的开发便捷性有了质的飞跃,Stream在复杂业务逻辑的处理上让人效率倍增,是每一位Java开发者都应该掌握的基础技能。但排除掉parallelStream也即并发流之外,它其实并不是一个好的设计。
第一,封装过重,实现过于复杂,源码极其难读。我能理解这或许是为了兼容并发流所做的妥协,但毕竟耦合太深,显得艰深晦涩。每一位初学者被源码吓到之后,想必都会产生流是一种十分高级且实现复杂的特性的印象。实际上并不是这样,流其实可以用非常简单的方式构建。
第二、API过于冗长。冗长体现在stream.collect这一部分,作为对比,Kotlin提供的toList/toSet/associate(toMap)等等丰富操作是可以直接作用在流上的。Java直到16才抠抠索索加进来一个Stream可以直接调用的toList,他们甚至不肯把toSet/toMap一起加上。
第三、API功能简陋。对于链式操作,在最初的Java8里只有map/filter/skip/limit/peek/distinct/sorted这七个,Java9又加上了takeWhile/dropWhile。然而在Kotlin中,除了这几个之外人还有许多额外的实用功能,例如mapIndexed, mapNotNull, filterIndexed, filterNotNull, onEachIndexed, distinctBy, sortedBy, sortedWith, zip, zipWithNext等等,翻倍了不止。这些东西实现起来并不复杂,就是个顺手的事,但对于用户而言有和没有的体验差异可谓巨大。
在这篇文章里,我将提出一种全新的机制用于构建流。这个机制极其简单,任何能看懂lambda表达式(闭包)的同学都能亲手实现,任何支持闭包的编程语言都能利用该机制实现自己的流。也正是由于这个机制足够简单,所以开发者可以以相当低的成本撸出大量的实用API,使用体验甩开Stream两条街,不是问题。
关于生成器
生成器(Generator)是许多现代编程语言里一个广受好评的重要特性,在Python/Kotlin/C#/Javascript等等语言中均有直接支持。它的核心API就是一个yield关键字(或者方法)。有了生成器之后,无论是iterable/iterator,还是一段乱七八糟的闭包,都可以直接映射为一个流。举个例子,假设你想实现一个下划线字符串转驼峰的方法,在Python里你可以利用生成器这么玩
def underscore_to_camelcase(s): def camelcase(): yield str.lower while True: yield str.capitalize return ''.join(f(sub) for sub, f in zip(s.split('_'), camelcase()))
这短短几行代码可以说处处体现出了Python生成器的巧妙。首先,camelcase方法里出现了yield关键字,解释器就会将其看作是一个生成器,这个生成器会首先提供一个lower函数,然后提供无数的capitalize函数。由于生成器的执行始终是lazy的,所以用while true的方式生成无限流是十分常见的手段,不会有性能或者内存上的浪费。其次,Python里的流是可以和list一起进行zip的,有限的list和无限的流zip到一起,list结束了流自然也会结束。这段代码中,末尾那行join()括号里的东西,Python称之为生成器推导(Generator Comprehension),其本质上依然是一个流,一个zip流被map之后的string流,最终通过join方法聚合为一个string。
以上代码里的操作, 在任何支持生成器的语言里都可以轻易完成,但是在Java里你恐怕连想都不敢想。Java有史以来,无论是历久弥新的Java8,还是最新的引入了Project Loom的OpenJDK19,连协程都有了,依然没有直接支持生成器。
本质上,生成器的实现要依赖于continuation的挂起和恢复,所谓continuation可以直观理解为程序执行到指定位置后的断点,协程就是指在这个函数的断点挂起后跳到另一个函数的某个断点继续执行,而不会阻塞线程,生成器亦如是。Python通过栈帧的保存与恢复实现函数重入以及生成器,Kotlin在编译阶段利用CPS(Continuation Passing Style)技术对字节码进行了变换,从而在JVM上模拟了协程。其他的语言要么大体如此,要么有更直接的支持。
那么,有没有一种办法,可以在没有协程的Java里,实现或者至少模拟出一个yield关键字,从而动态且高性能地创建流呢。答案是,有。
正文
Java里的流叫Stream,Kotlin里的流叫Sequence。我实在想不出更好的名字了,想叫Flow又被用了,简单起见姑且叫Seq。
概念定义
首先给出Seq的接口定义
public interface Seq<T> { void consume(Consumer<T> consumer);}
它本质上就是一个consumer of consumer,其真实含义我后边会讲。这个接口看似抽象,实则非常常见,java.lang.Iterable天然自带了这个接口,那就是大家耳熟能详的forEach。利用方法推导,我们可以写出第一个Seq的实例
List<Integer> list = Arrays.asList(1, 2, 3);Seq<Integer> seq = list::forEach;
可以看到,在这个例子里consume和forEach是完全等价的,事实上这个接口我最早就是用forEach命名的,几轮迭代之后才改成含义更准确的consume。
利用单方法接口在Java里会自动识别为FunctionalInteraface这一伟大特性,我们也可以用一个简单的lambda表达式来构造流,比如只有一个元素的流。
static <T> Seq<T> unit(T t) { return c -> c.accept(t);}
这个方法在数学上很重要(实操上其实用的不多),它定义了Seq这个泛型类型的单位元操作,即T -> Seq的映射。
map与flatMapmap
从forEach的直观角度出发,我们很容易写出map,将类型为T的流,转换为类型为E的流,也即根据函数T -> E得到Seq -> Seq的映射。
default <E> Seq<E> map(Function<T, E> function) { return c -> consume(t -> c.accept(function.apply(t)));}flatMap
同理,可以继续写出flatMap,即将每个元素展开为一个流之后再合并。
default <E> Seq<E> flatMap(Function<T, Seq<E>> function) { return c -> consume(t -> function.apply(t).consume(c));}
大家可以自己在IDEA里写写这两个方法,结合智能提示,写起来其实非常方便。如果你觉得理解起来不太直观,就把Seq看作是List,把consume看作是forEach就好。
filter与take/drop
map与flatMap提供了流的映射与组合能力,流还有几个核心能力:元素过滤与中断控制。
filter
过滤元素,实现起来也很简单
default Seq<T> filter(Predicate<T> predicate) { return c -> consume(t -> { if (predicate.test(t)) { c.accept(t); } });}take
流的中断控制有很多场景,take是最常见的场景之一,即获取前n个元素,后面的不要——等价于Stream.limit。
由于Seq并不依赖iterator,所以必须通过异常实现中断。为此需要构建一个全局单例的专用异常,同时取消这个异常对调用栈的捕获,以减少性能开销(由于是全局单例,不取消也没关系)
public final class StopException extends RuntimeException { public static final StopException INSTANCE = new StopException(); @Override public synchronized Throwable fillInStackTrace() { return this; }}
以及相应的方法
static <T> T stop() { throw StopException.INSTANCE;}default void consumeTillStop(C consumer) { try { consume(consumer); } catch (StopException ignore) {}}
然后就可以实现take了:
default Seq<T> take(int n) { return c -> { int[] i = {n}; consumeTillStop(t -> { if (i[0]-- > 0) { c.accept(t); } else { stop(); } }); };}drop
drop是与take对应的概念,丢弃前n个元素——等价于Stream.skip。它并不涉及流的中断控制,反而更像是filter的变种,一种带有状态的filter。观察它和上面take的实现细节,内部随着流的迭代,存在一个计数器在不断刷新状态,但这个计数器并不能为外界感知。这里其实已经能体现出流的干净特性,它哪怕携带了状态,也丝毫不会外露。
default Seq<T> drop(int n) { return c -> { int[] a = {n - 1}; consume(t -> { if (a[0] < 0) { c.accept(t); } else { a[0]--; } }); };}
其他APIonEach
对流的某个元素添加一个操作consumer,但是不执行流——对应Stream.peek。
default Seq<T> onEach(Consumer<T> consumer) { return c -> consume(consumer.andThen(c));}zip
流与一个iterable元素两两聚合,然后转换为一个新的流——在Stream里没有对应,但在Python里有同名实现。
default <E, R> Seq<R> zip(Iterable<E> iterable, BiFunction<T, E, R> function) { return c -> { Iterator<E> iterator = iterable.iterator(); consumeTillStop(t -> { if (iterator.hasNext()) { c.accept(function.apply(t, iterator.next())); } else { stop(); } }); };}终端操作
上面实现的几个方法都是流的链式API,它们将一个流映射为另一个流,但流本身依然是lazy或者说尚未真正执行的。真正执行这个流需要使用所谓终端操作,对流进行消费或者聚合。在Stream里,消费就是forEach,聚合就是Collector。对于Collector,其实也可以有更好的设计,这里就不展开了。不过为了示例,可以先简单快速实现一个join。
default String join(String sep) { StringJoiner joiner = new StringJoiner(sep); consume(t -> joiner.add(t.toString())); return joiner.toString();}
以及toList。
default List<T> toList() { List<T> list = new ArrayList<>(); consume(list::add); return list;}
至此为止,我们仅仅只用几十行代码,就实现出了一个五脏俱全的流式API。在大部分情况下,这些API已经能覆盖百分之八九十的使用场景。你完全可以依样画葫芦,在其他编程语言里照着玩一玩,比如Go(笑)。
生成器的推导
本文虽然从标题开始就在讲生成器,甚至毫不夸张的说生成器才是最核心的特性,但等到把几个核心的流式API写完了,依然没有解释生成器到底是咋回事——其实倒也不是我在卖关子,你只要仔细观察一下,生成器早在最开始讲到Iterable天生就是Seq的时候,就已经出现了。
List<Integer> list = Arrays.asList(1, 2, 3);Seq<Integer> seq = list::forEach;
没看出来?那把这个方法推导改写为普通lambda函数,有
Seq<Integer> seq = c -> list.forEach(c);
再进一步,把这个forEach替换为更传统的for循环,有
Seq<Integer> seq = c -> { for (Integer i : list) { c.accept(i); }};
由于已知这个list就是[1, 2, 3],所以以上代码可以进一步等价写为
Seq<Integer> seq = c -> { c.accept(1); c.accept(2); c.accept(3);};
是不是有点眼熟?不妨看看Python里类似的东西长啥样:
def seq(): yield 1 yield 2 yield 3
二者相对比,形式几乎可以说一模一样——这其实就已经是生成器了,这段代码里的accept就扮演了yield的角色,consume这个接口之所以取这个名字,含义就是指它是一个消费操作,所有的终端操作都是基于这个消费操作实现的。功能上看,它完全等价于Iterable的forEach,之所以又不直接叫forEach,是因为它的元素并不是本身自带的,而是通过闭包内的代码块临时生成的。
这种生成器,并非传统意义上利用continuation挂起的生成器,而是利用闭包来捕获代码块里临时生成的元素,哪怕没有挂起,也能高度模拟传统生成器的用法和特性。其实上文所有链式API的实现,本质上也都是生成器,只不过生成的元素来自于原始的流罢了。
有了生成器,我们就可以把前文提到的下划线转驼峰的操作用Java也依样画葫芦写出来了。
static String underscoreToCamel(String str) { // Java没有首字母大写方法,随便现写一个 UnaryOperator<String> capitalize = s -> s.substring(0, 1).toUpperCase() + s.substring(1).toLowerCase(); // 利用生成器构造一个方法的流 Seq<UnaryOperator<String>> seq = c -> { // yield第一个小写函数 c.accept(String::toLowerCase); // 这里IDEA会告警,提示死循环风险,无视即可 while (true) { // 按需yield首字母大写函数 c.accept(capitalize); } }; List<String> split = Arrays.asList(str.split("_")); // 这里的zip和join都在上文给出了实现 return seq.zip(split, (f, sub) -> f.apply(sub)).join("");}
大家可以把这几段代码拷下来跑一跑,看它是不是真的实现了其目标功能。
生成器的本质
虽然已经推导出了生成器,但似乎还是有点摸不着头脑,这中间到底发生了什么,死循环是咋跳出的,怎么就能生成元素了。为了进一步解释,这里再举一个大家熟悉的例子。
生产者-消费者模式
生产者与消费者的关系不止出现在多线程或者协程语境下,在单线程里也有一些经典场景。比如A和B两名同学合作一个项目,分别开发两个模块:A负责产出数据,B负责使用数据。A不关心B怎么处理数据,可能要先过滤一些,进行聚合后再做计算,也可能是写到某个本地或者远程的存储;B自然也不关心A的数据是怎么来的。这里边唯一的问题在于,数据条数实在是太多了,内存一次性放不下。在这种情况下,传统的做法是让A提供一个带回调函数consumer的接口,B在调用A的时候传入一个具体的consumer。
public void produce(Consumer<String> callback) { // do something that produce strings // then use the callback consumer to eat them}
这种基于回调函数的交互方式实在是过于经典了,原本没啥可多说的。但是在已经有了生成器之后,我们不妨胆子放大一点稍微做一下改造:仔细观察上面这个produce接口,它输入一个consumer,返回void——咦,所以它其实也是一个Seq嘛!
Seq<String> producer = this::produce;
接下来,我们只需要稍微调整下代码,就能对这个原本基于回调函数的接口进行一次升级,将它变成一个生成器。
public Seq<String> produce() { return c -> { // still do something that produce strings // then use the callback consumer to eat them };}
基于这一层抽象,作为生产者的A和作为消费者的B就真正做到完全的、彻底的解耦了。A只需要把数据生产过程放到生成器的闭包里,期间涉及到的所有副作用,例如IO操作等,都被这个闭包完全隔离了。B则直接拿到一个干干净净的流,他不需要关心流的内部细节,当然想关心也关心不了,他只用专注于自己想做的事情即可。
更重要的是,A和B虽然在操作逻辑上完全解耦,互相不可见,但在CPU调度时间上它们却是彼此交错的,B甚至还能直接阻塞、中断A的生产流程——可以说没有协程,胜似协程。
至此,我们终于成功发现了Seq作为生成器的真正本质:consumer of callback。明明是一个回调函数的消费者,摇身一变就成了生产者,实在是有点奇妙。不过仔细一想倒也合理:能够满足消费者需求(callback)的家伙,不管这需求有多么奇怪,可不就是生产者么。
容易发现,基于callback机制的生成器,其调用开销完全就只有生成器闭包内部那堆代码块的执行开销,加上一点点微不足道的闭包创建开销。在诸多涉及到流式计算与控制的业务场景里,这将带来极为显著的内存与性能优势。后面我会给出展现其性能优势的具体场景实例。
另外,观察这段改造代码,会发现produce输出的东西,根本就还是个函数,没有任何数据被真正执行和产出。这就是生成器作为一个匿名接口的天生优势:惰性计算——消费者看似得到了整个流,实际那只是一张爱的号码牌,可以涂写,可以废弃,但只有在拿着货真价实的callback去兑换的那一刻,才会真正的执行流。
生成器的本质,正是人类本质的反面:鸽子克星——没有任何人可以鸽它
IO隔离与流输出
Haskell发明了所谓IO Monad来将IO操作与纯函数的世界隔离。Java利用Stream,勉强做到了类似的封装效果。以java.io.BufferedReader为例,将本地文件读取为一个Stream<String>,可以这么写:
Stream<String> lines = new BufferedReader(new InputStreamReader(new FileInputStream("file"))).lines();
如果你仔细查看一下这个lines方法的实现,会发现它使用了大段代码去创建了一个iterator,而后才将其转变为stream。暂且不提它的实现有多么繁琐,这里首先应该注意的是BufferedReader是一个Closeable,安全的做法是在使用完毕后close,或者利用try-with-resources语法包一层,实现自动close。但是BufferedReader.lines并没有去关闭这个源,它是一个不那么安全的接口——或者说,它的隔离是不完整的。Java对此也打了个补丁,使用java.nio.file.Files.lines,它会添加加一个onClose的回调handler,确保stream耗尽后执行关闭操作。
那么有没有更普适做法呢,毕竟不是所有人都清楚BufferedReader.lines和Files.lines会有这种安全性上的区别,也不是所有的Closeable都能提供类似的安全关闭的流式接口,甚至大概率压根就没有流式接口。
好在现在我们有了Seq,它的闭包特性自带隔离副作用的先天优势。恰巧在涉及大量数据IO的场景里,利用callback交互又是极为经典的设计方式——这里简直就是它大展拳脚的最佳舞台。
用生成器实现IO的隔离非常简单,只需要整个包住try-with-resources代码即可,它同时就包住了IO的整个生命周期。
Seq<String> seq = c -> { try (BufferedReader reader = Files.newBufferedReader(Paths.get("file"))) { String s; while ((s = reader.readLine()) != null) { c.accept(s); } } catch (Exception e) { throw new RuntimeException(e); }};
核心代码其实就3行,构建数据源,挨个读数据,然后yield(即accept)。后续对流的任何操作看似发生在创建流之后,实际执行起来都被包进了这个IO生命周期的内部,读一个消费一个,彼此交替,随用随走。
换句话讲,生成器的callback机制,保证了哪怕Seq可以作为变量四处传递,但涉及到的任何副作用操作,都是包在同一个代码块里惰性执行的。它不需要像Monad那样,还得定义诸如IOMonad,StateMonad等等花样众多的Monad。
与之类似,这里不妨再举个阿里中间件的例子,利用Tunnel将大家熟悉的ODPS表数据下载为一个流:
public static Seq<Record> downloadRecords(TableTunnel.DownloadSession session) { return c -> { long count = session.getRecordCount(); try (TunnelRecordReader reader = session.openRecordReader(0, count)) { for (long i = 0; i < count; i++) { c.accept(reader.read()); } } catch (Exception e) { throw new RuntimeException(e); } };}
有了Record流之后,如果再能实现出一个map函数,就可以非常方便的将Record流map为带业务语义的DTO流——这其实就等价于一个ODPS Reader。
异步流
基于callback机制的生成器,除了可以在IO领域大展拳脚,它天然也是亲和异步操作的。毕竟一听到回调函数这个词,很多人就能条件反射式的想到异步,想到Future。
一个callback函数,它的命运就决定了它是不会在乎自己被放到哪里、被怎么使用的。比方说,丢给某个暴力的异步逻辑:
public static Seq<Integer> asyncSeq() { return c -> { CompletableFuture.runAsync(() -> c.accept(1)); CompletableFuture.runAsync(() -> c.accept(2)); };}
这就是一个简单而粗暴的异步流生成器。对于外部使用者来说,异步流除了不能保证元素顺序,它和同步流没有任何区别,本质上都是一段可运行的代码,边运行边产生数据。一个callback函数,谁给用不是用呢。
并发流
既然给谁用不是用,那么给ForkJoinPool用如何?——Java大名鼎鼎的parallelStream就是基于ForkJoinPool实现的。我们也可以拿来搞一个自己的并发流。具体做法很简单,把上面异步流示例里的CompletableFuture.runAsync换成ForkJoinPool.submit即可,只是要额外注意一件事:parallelStream最终执行后是要阻塞的(比如最常用的forEach),它并非单纯将任务提交给ForkJoinPool,而是在那之后还要做一遍join。
对此我们不妨采用最为暴力而简单的思路,构造一个ForkJoinTask的list,依次将元素提交forkJoinPool后,产生一个task并添加进这个list,等所有元素全部提交完毕后,再对这个list里的所有task统一join。
default Seq<T> parallel() { ForkJoinPool pool = ForkJoinPool.commonPool(); return c -> map(t -> pool.submit(() -> c.accept(t))).cache().consume(ForkJoinTask::join);}
这就是基于生成器的并发流,它的实现仅仅只需要两行代码——正如本文开篇所说,流可以用非常简单的方式构建。哪怕是Stream费了老大劲的并发流,换一种方式,实现起来可以简单到令人发指。
这里值得再次强调的是,这种机制并非Java限定,而是任何支持闭包的编程语言都能玩。事实上,这种流机制的最早验证和实现,就是我在AutoHotKey_v2这个软件自带的简陋的脚本语言上完成的。
剩余60%,完整内容请点击下方链接查看:
一种新的流:为Java加入生成器(Generator)特性
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
标签: #javacall