龙空技术网

「Spark从入门到精通系列」3. Apache Spark结构化API(上)

数据与智能 263

前言:

目前同学们对“apacheapi”大概比较注意,兄弟们都想要剖析一些“apacheapi”的相关文章。那么小编在网络上汇集了一些对于“apacheapi””的相关知识,希望我们能喜欢,兄弟们快快来学习一下吧!

来源 | Learning Spark Lightning-Fast Data Analytics,Second Edition

作者 | Damji,et al.

翻译 | 吴邪 大数据4年从业经验,目前就职于广州一家互联网公司,负责大数据基础平台自研、离线计算&实时计算研究

校对 | gongyouliu

编辑 | auroral-L

第三章 Apache Spark结构化API

1. Spark:什么是RDD?

2. 结构化Spark

2.1 主要的优点和优势

在本章中,我们将探讨Apache Spark添加结构化背后的主要动机,包括这些动机是如何引导高级API(DataFrame和DataSet)的创建,以及它们在Spark2.x中不同组件之间的一致性介绍。我们还将研究支撑这些结构化高级API的Spark SQL引擎。

当Spark SQL首次在早期的Spark1.x中被引入,接着是DataFrame作为Spark1.3中SchemaRDD的继承者,我们第一次看到了Spark完整的结构。Spark SQL引入了高级表达式操作函数,模拟了类似SQL的语法,DataFrame为后续版本中的更多结构奠定了基础,为Spark计算查询中的性能操作铺平了道路。

但在我们讨论较新的结构化API之前,让我们先看一下简单的RDD编程API模型,以简要了解一下Spark中没有结构的感觉。

1. Spark:什么是RDD?

RDD是Spark最基本的抽象,与RDD相关的三个重要特性:

依赖关系:宽依赖和窄依赖数据分区(Partitions):数据集组成单位,带有位置信息计算函数:Partition => Iterator[T]

这三个特性都是RDD编程API模型最基本的组成部分,基于RDD模型构建所有更高级别的功能。首先,需要一个依赖关系列表,该依赖关系指示Spark如何使用其输入构造RDD。必要时,Spark可以根据这些依赖关系重新创建RDD并对其进行复制操作。这一特性使得RDD具有弹性。

其次,分区使得Spark能够对数据进行拆分,以便跨Executor的分区进行并行计算。在某些情况下,例如从HDFS读取,Spark将使用位置信息将工作发送给接近数据的Executor。这样,通过网络传输的数据就会更少,减少网络IO。

最后,RDD具有计算功能,它可以将存储在RDD中的数据生成一个Iterator[T]。

简单而优雅!然而,这个原始的模型存在几个问题。首先,计算函数(或计算)对Spark是不透明的。也就是说,Spark不知道你在计算函数中在做什么。无论是执行connect、filter、select还是aggregate,Spark都只将其视为lambda表达式。另一个问题是Iterator[T]数据类型对于Python RDD来说也不透明;Spark只知道它是Python中的通用对象。

此外,由于无法检查函数中的计算或表达式,因此Spark无法优化该表达式——无法理解其中的意图。最后,Spark不了解T中的特定数据类型。Spark是一个不透明的对象,它不知道你是否访问对象中特定类型的列。因此,Spark所能做的就是将不透明对象序列化为一系列字节,而不使用任何数据压缩技术。

这种不透明性明显阻碍了Spark将计算重排为高效的查询计划的能力。那么解决方案是什么呢?

2. 结构化Spark

Spark2.x引入了一些构建Spark的关键方案。一种是使用数据分析中常见的模式来表达计算。这些模式表示为高级操作,如过滤、选择、计数、聚合、平均和分组,这提供了更多的清晰度和简单性。

通过在DSL中使用一组通用运算符,可以进一步缩小了这种特异性。通过DSL中的一组操作(如Spark支持的lan参数(Java、Python、Spark、R、和SQL)中的操作),这些运算符可以让你告诉Spark你希望对数据进行什么计算,因此,它可以构建一个可执行的有效的查询计划。

最终的顺序和结构方案是允许你以表格的形式排列数据,如SQL表或电子表格,并使用受支持的结构化数据类型(稍后将介绍)。

但是,这种结构到底有什么好处呢?

2.1 主要的优点和优势

结构带来许多好处,包括跨Spark组件提供性能和空间效率。在简要讨论DataFrame和Dataset API的使用时,我们将进一步探讨这些优势,但现在我们将集中讨论其他优势:表达性、简单性、可组合性和统一性。

让我们先用一个简单的代码片段来演示可表达性和可组合性。在下面的示例中,我们要汇总每个名称的所有年龄,按名称分组,然后计算年龄平均值——这是数据分析和发现中的一种常见模式。如果我们使用低级RDD API,代码如下:

# In Python# Create an RDD of tuples (name, age)dataRDD = sc.parallelize([("Brooke", 20), ("Denny", 31), ("Jules", 30),("TD", 35), ("Brooke", 25)])# Use map and reduceByKey transformations with their lambda# expressions to aggregate and then compute averageagesRDD = (dataRDD.map(lambda x: (x[0], (x[1], 1))).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).map(lambda x: (x[0], x[1][0]/x[1][1])))

没有人会质疑这段代码告诉Spark如何聚合键和用一串lambda函数计算平均值,该代码是神秘的且难以阅读的。换句话说,代码正在指示Spark如何计算查询,但对Spark完全不透明,因为它不能传达意图。此外,Scala中的等效RDD代码看起来与这里显示的Python代码完全不同。

相比之下,如果我们用高级DSL运算符和DataFrame API来表达相同的查询,从而指示Spark该怎么办?请看一看下面这段代码:

# In Pythonfrom pyspark.sql import SparkSessionfrom pyspark.sql.functions import avg# Create a DataFrame using SparkSessionspark = (SparkSession.builder.appName("AuthorsAges").getOrCreate())# Create a DataFramedata_df = spark.createDataFrame([("Brooke", 20), ("Denny", 31), ("Jules", 30), ("TD", 35), ("Brooke", 25)], ["name", "age"])# Group the same names together, aggregate their ages, and compute an averageavg_df = data_df.groupBy("name").agg(avg("age"))# Show the results of the final executionavg_df.show()+------+--------+| name|avg(age)|+------+--------+|Brooke| 22.5|| Jules| 30.0|| TD| 35.0|| Denny| 31.0|+------+--------+

这个版本的代码比早期的版本更有表达力,也更简单,因为我们使用高级DSL运算符和API来告诉Spark该做什么。实际上,我们已经使用了这些运算符来构成我们的查询。而且由于Spark可以检查或解析这个查询并理解我们的意图,所以它可以优化或调整操作以高效执行。Spark确切地知道了我们想做什么:按他们的名字分组,年龄汇总,然后计算所有同名的人的平均年龄。我们使用高级运算符作为一个简单的查询来构建一个完整的计算——它的表达能力如何呢?

有些人会认为,仅通过使用映射到通用或重复数据分析模式的高级表达DSL运算符来引入顺序和结构,我们就限制了开发人员指示编译器或控制了应该如何计算其查询的范围,实际上你不会受限于这些结构化模式;你可以随时切换回非结构化的低级RDD API,尽管我们几乎没有必要这样做。

除了更容易阅读之外,Spark的高级API的结构还引入了其组件和语言之间的统一性。例如,此处显示的Scala代码与以前的Python代码具有相同的作用,并且API看起来几乎相同:

// In Scalaimport org.apache.spark.sql.functions.avgimport org.apache.spark.sql.SparkSession// Create a DataFrame using SparkSessionval spark = SparkSession.builder.appName("AuthorsAges").getOrCreate()// Create a DataFrame of names and agesval dataDF = spark.createDataFrame(Seq(("Brooke", 20), ("Brooke", 25),("Denny", 31), ("Jules", 30), ("TD", 35))).toDF("name", "age")// Group the same names together, aggregate their ages, and compute an averageval avgDF = dataDF.groupBy("name").agg(avg("age"))// Show the results of the final execution avgDF.show()+------+--------+| name|avg(age)|+------+--------+|Brooke| 22.5|| Jules| 30.0|| TD| 35.0|| Denny| 31.0|+------+--------+

如果了解SQL操作,其中一些DSL运算符会执行你将熟悉的类似关系的操作,如选择、筛选、分组和聚合。

我们开发人员所珍视的所有这些简单性和表达性都是可能的,因为构建了高级结构化API的Spark SQL引擎。正是因为这个支撑了所有的Spark组件的引擎,我们才能获得统一的API。无论是在结构化流(Structured Streaming)还是MLLib中对DataFrame做查询,你始终都会将DataFrame作为结构化数据进行转换和操作。我们将在这一章后面详细介绍Spark SQL引擎,但现在我们探讨常见操作所用的API和DSL,以及如何将它们用于数据分析。

标签: #apacheapi