龙空技术网

Apache Beam 大数据处理一站式分析

会飞的鱼go 135

前言:

现在小伙伴们对“apachebeamforbeam”大体比较讲究,朋友们都想要了解一些“apachebeamforbeam”的相关资讯。那么小编也在网摘上网罗了一些关于“apachebeamforbeam””的相关知识,希望咱们能喜欢,你们一起来学习一下吧!

文章来源:加米谷大数据

一. 介绍

大数据处理其实经常被很多人低估,缺乏正确的处理体系,其实,如果没有高质量的数据处理流程,人工智能将只有人工而没有智能。现在的趋势是数据体量不断上涨,团队却低估了规模所带来的复杂度。大数据领域泰斗级人物Jesse Anderson曾做过研究,一个组织架构比较合理的人工智能团队,数据处理工程师需要占团队总人数的4/5,然而很多团队还没有认识到这点。大数据处理涉及大量复杂因素,而Apache Beam恰恰可以降低数据处理的难度,它是一个概念产品,所有使用者都可以根据它的概念继续拓展。

Apache Beam提供了一套统一的API来处理两种数据处理模式(批和流),让我们只需要将注意力专注于数据处理的算法上,而不用再花时间去维护两种数据处理模式上的差异。

公司用Beam的业务场景,做数据引擎服务,其他中台产品,以此为基础做一些其他服务,比如数据交换,计算开发平台,数据分析等等,中台的概念不是本章的重点,不在此展开,大部分所谓的各种各样的中台,其实就是个业务平台而已。

二. 编程模型

现实应用场景中,各种各样的应用需求很复杂,例如:我们假设 Hive 中有两张数据源表,两个表数据格式一样,我们要做的是:按照日期增量,新版本根据字段修改老版本的数据,再增量一部分新的数据,最后生成一张结果表。案例实现链接:

架构流程

这案例下包含多种不同处理模块,最后连接在一起,得出一个有向无环图,称为一个工作流系统(Workflow System),在这种系统下,不可能就简单用数据转换操作,其中涉及到四种常见的涉及模式。

2.1 Workflow

复制模式:

复制模式通常是将单个数据处理模块中的数据,完整地复制到两个或更多的数据处理模块中,然后再由不同的数据处理模块进行处理。

复制模式

例如:结果集合被不同处理流程调用,输出到不同的数据库。

过滤模式:

过滤掉不符合特定条件的数据。

过滤模式

例如:通过一系列规则筛选结果集。

分离模式:

如果你在处理数据集时并不想丢弃里面的任何数据,而是想把数据分类为不同的类别进行处理时,你就需要用到分离式来处理数据。

分离模式

例如:针对全部用户,用户分级,观察不同组用户的行为,用户增长分析。

合并模式:

合并模式会将多个不同的数据转换集中在一起,成为一个总数据集,然后将这个总数据集放在一个工作流中进行处理。

合并模式

例如:数据融合之后,输出一份结果集。

2.2 Lambda架构

Lambda 架构(Lambda Architecture)是由 Twitter 工程师南森·马茨(Nathan Marz)提出的大数据处理架构。这一架构的提出基于马茨在 BackType 和 Twitter 上的分布式数据处理系统的经验。

Lambda 架构使开发人员能够构建大规模分布式数据处理系统。它具有很好的灵活性和可扩展性,也对硬件故障和人为失误有很好的容错性。Lambda 架构总共由三层系统组成:批处理层(Batch Layer),速度处理层(Speed Layer),以及用于响应查询的服务层(Serving Layer)。

这种架构,稳定高,离线计算和实时计算会冗余代码,如果用比较复杂引擎交替执行任务,维护性很高,用实时计算弥补离线计算的不足。

2.3 Kappa架构

Kappa 架构是由 LinkedIn 的前首席工程师杰伊·克雷普斯(Jay Kreps)提出的一种架构思想。克雷普斯是几个著名开源项目(包括 Apache Kafka 和 Apache Samza 这样的流处理系统)的作者之一,也是现在 Confluent 大数据公司的 CEO。

克雷普斯提出了一个改进 Lambda 架构的观点:

我们能不能改进 Lambda 架构中速度层的系统性能,使得它也可以处理好数据的完整性和准确性问题呢?我们能不能改进 Lambda 架构中的速度层,使它既能够进行实时数据处理,同时也有能力在业务逻辑更新的情况下重新处理以前处理过的历史数据呢?

这种架构其实用kafka性能的特点,海量存储来延展出来的架构,既可以存储历史数据,也可以处理实时数据,但是稳定不高,需要维护好kafka,LinkedIn 开源出来计算引擎,也跟这种架构配套使用的。

2.4 小结

做技术关注那些不变,才能隔离好那些变化,这些思想下,才能延展出一系列服务,让整个体系蕴含生命,技术在于悟,以靠近前辈的方式,离开前辈。

三. PCollection3.1 Apache Beam 发展史在2003年以前,Google内部其实还没有一个成熟的处理框架来处理大规模数据。在2004年时候,Google 发布的一篇名为“MapReduce: Simplified Data Processing on Large Clusters”的论文,将MapReduce架构思想总结出来。它希望能提供一套简洁的API来表达工程师数据处理的逻辑。另一方面,要在这一套API底层嵌套一套扩展性很强的容错系统,使得工程师能够将心思放在逻辑处理上,而不用过于分心去设计分布式容错系统。在2010年时候,Google公开了FlumeJava架构思想论文。它将所有数据都抽象成名为PCollection的数据结构,无论从内存中读取数据,还是在分布式环境下读取文件。这样的好处其实为了让测试代码即可以在分布式环境下运行,也可以在单机内存下运行。在2013年时候,Google公开Millwheel思想,它的结果整合几个大规模数据处理框架的优点,推出一个统一框架。在2015年的时候,Google公布了Dataflow Model论文,同时也推出了基于 Dataflow Model 思想的平台 Cloud Dataflow,让 Google 以外的工程师们也能够利用这些 SDK 来编写大规模数据处理的逻辑。在2016年的时候,Google基于要在多平台运行程序的契机,联合Talend、Data Artisans、Cloudera 这些大数据公司,基于 Dataflow Model 的思想开发出了一套 SDK,并贡献给了 Apache Software Foundation。而它 Apache Beam 的名字是怎么来的呢?就如文章开篇图片所示,Beam 的含义就是统一了批处理和流处理的一个框架。现阶段Beam支持Java、Python和Golang等等。

通过Apache Beam,最终我们可以用自己喜欢的编程语言,通过一套Beam Model统一地数据处理API,编写数据处理逻辑,放在不同的Runner上运行,可以实现到处运行。

3.2 PCollection特点

PCollection,就是 Parallel Collection,意思是可并行计算的数据集。如果了解Spark的话,就会发现PCollection和RDD相似。在Beam的数据结构体系中,几乎所有数据都能表达成PCollection,例如复杂操作数据导流,就是用它来传递的。

PCollection<String> lines = pipeline.apply(TextIO.read().from("url")                .withHintMatchesManyFiles());

PCollection需要Coders:

因为整个Beam计算流程最终会运行在一个分布式系统。所有的数据都有可能在网络上的节点之间传递。

Coder有两种方式,一.需要注册全局CoderRegistry中,二.每次转换操作后,手动指定Coder。

PipelineOptions options = PipelineOptionsFactory.create();Pipeline p = Pipeline.create(options);//全局CoderRegistry cr = p.getCoderRegistry();cr.registerCoder(Integer.class, BigEndianIntegerCoder.class);//手动指定,可以使用Beam自带的序列化类型,也可以自定义。p.apply(Create.of(list))      .setCoder(KvCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of()))

PCollection是无序:

PCollection的无序特性其实也和分布式本质有关,无序分布PCollection,异步的,保证性能。

PCollection没有固定大小:

批处理和流数据的区别,在于一个是有界数据和无界数据,因为如此PCollection没有限制它的容量。在实现上,Beam是有window来分割持续更新的无界数据,一个流数据可以被持续的拆分成不同的小块。

PCollection不可变性:

PCollection不提供任何修改它所承载的数据方式,如果修改PCollection,只能Transform(转换)操作,生成新的PCollection的。Beam 的 PCollection 都是延迟执行,为了性能,最后生成执行计划,到处运行。

扩展:

其实如果对函数式编程有了解的朋友,PCollection有些特点跟函数式编程特点有相通的地方,因为,PCollection底层就是用这种范式抽象出来的,为了提高性能,不会有大量的变化机制,在整个编译运行中泄漏资源。

四. Pipeline

Beam中,所有数据处理逻辑都被抽象成数据流水线(Pipeline)来运行,简单来说,就是从读取数据集,将数据集转换成想要的结果数据集这样一套流程。

例1

PipelineOptions options = PipelineOptionsFactory.create();//设置执行引擎,DirectRunner为本地引擎,资源受限,最大并发数限制。options.setRunner(DirectRunner.class);Pipeline pipeline = Pipeline.create(options);List<KV<String, String>> kvs = new ArrayList<>();for (int i = 0; i < 10; i++) {    String id="id:"+i;    String name="name:"+i;    kvs.add(KV.of(id,name));}//1.设置数据集,2.Filter.by过滤操作,3.通过JdbcIO输出到数据库中。pipeline.apply(Create.of(kvs)).setCoder(KvCoder.of(StringUtf8Coder.of(),StringUtf8Coder.of()))        .apply(Filter.by( (KV<String, String> kv) ->  kv.getKey() == "id:1"))        .apply(JdbcIO.<KV<String, String>>write().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("数据连接池")).withStatement("sql")                .withPreparedStatementSetter((element, statement) -> {                    statement.setString(1,element.getKey());                    statement.setString(2,element.getValue());                }));//运行pipeline.run().waitUntilFinish(

五. Transform

Beam 中数据处理的最基本单元是 Transform。Beam 提供了最常见的 Transform 接口,比如 ParDo、GroupByKey,其中 ParDo 更为常用。

ParDo 就是 Parallel Do 的意思,表达的是很通用的数据处理操作;GroupByKey 的意思是把一个 Key/Value 的数据集按 Key 归并。

注意:

可以用 ParDo 来实现 GroupByKey,一种简单的方法就是放一个全局的哈希表,然后 ParDo 里把一个一个元素插进这个哈希表里。但这样的实现方法其实无法使用,因为你的数据量可能完全无法放进一个内存哈希表。

使用 ParDo 时,需要继承它提供 DoFn 类,可以把 DoFn 看作 ParDo 的一部分, Transform 是一个概念方法,里面包含一些转换操作。

//DoFn 的模板static class DoFnTest<T> extends DoFn<T,T>{  @Setup  public void setUp(){...}  @StartBundle  public void startBundle(){...}  @ProcessElement  public void processElement( ProcessContext c) {...}    @FinishBundle  public void finishBundle(){...}  @Teardown	  public void teardown(){...}	

处理某个 Transform 的时候,数据是序列化的(PCollection),Pipeline 注册的流水线会将这个 Transform 的输入数据集 PCollection 里面元素分割成不同的 Bundle,再将这些 Bundle 分发给不同的 Worker 来处理。Beam 数据流水线具体会分配多少个 Worker,以及将一个 PCollection 分割成多少个 Bundle 都是随机的,具体跟执行引擎有关,设计到不同引擎的动态资源分配,可以自行查阅资料。

Transform 调用 DoFn 时,@Setup 初始化资源,@Teardown 处理实例调用完以后,清除资源,防止泄露。@StartBundle 方法跟 Bundle 有关,在 Bundle 中每个输入元素上调用 @ProcessElement(类似 map 输入每行数据),如果提供 DoFn 的 @FinishBundle 调用它,(Bundle 中数据流完)调用完成 @FinishBundle 之后,下次调用 @StartBundle 之前,框架不会再次调用 @ProcessElement 或 @FinishBundle。

如果处理 Bundle 的中间出现错误,一个 Bundle 里面的元素因为任意原因导致处理失败了,则这整个 Bundle 里面都必须重新处理。

在多步骤 Transform 中,如果一个 Bundle 元素发生错误了,则这个元素所在的整个 Bundle 以及与这个 Bundle 有关联的所有 Bundle 都必须重新处理。

这个就是Beam数据流水线处理模型。

六. Pipeline I/O

读取数据集用Pipeline I/O来实现。

读取数据集:

一个输入数据集的读取通常是通过 Read Transform 来完成的。Read Transform 从外部源 (External Source) 中读取数据,这个外部源可以是本地机器上的文件,可以是数据库中的数据,也可以是云存储上面的文件对象,甚至可以是数据流上的消息数据。Read Transform 的返回值是一个 PCollection,这个 PCollection 就可以作为输入数据集,应用在各种 Transform 上。Beam 数据流水线对于用户什么时候去调用 Read Transform 是没有限制的,我们可以在数据流水线的最开始调用它,当然也可以在经过了 N 个步骤的 Transforms 后再调用它来读取另外的输入数据集。

//文件PCollection<String> inputs = p.apply(TextIO.read().from(filepath));//在Beam的io包下有很多关于读取数据的流,大约有34种,也可以自定义io。//jdbc io流PCollection<Row> t1 = pipeline.apply(JdbcIO.<Row>read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(cpds)).withCoder(SchemaCoder.of(type)).withQuery("select * from template1").withRowMapper(new JdbcIO.RowMapper<Row>() {...}));

输出数据集:

将结果数据集输出到目的地址的操作是通过 Write Transform 来完成的。Write Transform 会将结果数据集输出到外部源中。与 Read Transform 相对应,只要 Read Transform 能够支持的外部源,Write Transform 都是支持的。在 Beam 数据流水线中,Write Transform 可以在任意的一个步骤上将结果数据集输出。所以,用户能够将多步骤的 Transforms 中产生的任何中间结果输出。

p.apply(TextIO.write().to("url").withSuffix("文件后缀"));

标签: #apachebeamforbeam