龙空技术网

改善Apache Spark应用程序性能

闻数起舞 389

前言:

此刻朋友们对“apacheerror521”可能比较关切,看官们都需要分析一些“apacheerror521”的相关知识。那么小编也在网络上搜集了一些对于“apacheerror521””的相关内容,希望姐妹们能喜欢,我们一起来学习一下吧!

简单的技巧和窍门,以提高您的Spark应用程序的性能

> Pixabay — Abstract Abstraction Acceleration — link

自从2014年2月成为顶级Apache项目以来,Apache Spark已迅速成为大数据空间中使用最广泛的处理引擎之一。它不仅可以在多种环境中运行(本地,独立Spark集群,Apache Mesos,YARN等),但它还可以提供许多库来帮助您解决Hadoop上的几乎所有问题。 这包括运行SQL查询,流式传输和机器学习等。 所有这些都在优化的执行引擎上运行。

多年来,我们在Clairvoyant使用Apache Spark构建了许多数据管道,包括批处理和流式传输。 您可以在此处找到更多信息。 建立了如此多的管道后,我们发现了一些简单的方法来提高Spark应用程序的性能。 以下是我们发现的一些提示和技巧:

使用DataFrame而不是RDD

代替使用RDD API

val rdd = sc.textFile("/path/to/file.txt")

使用DataFrames API

val df = spark.read.textFile("/path/to/file.txt")

通过使用DataFrame API而不是还原为使用RDD,您可以使Spark能够使用Catalyst Optimizer来改进Spark Job的执行计划。

避免使用正则表达式的

Java Regex是一个以预期结构解析数据的好方法。 不幸的是,正则表达式过程通常是一个缓慢的过程,当您必须处理数百万行时,解析单个行的过程中一点点增加会导致整个作业的处理时间增加。 尽可能避免使用正则表达式,并尝试确保以更结构化的格式加载数据。

优化联接将最大的数据集放在左侧

将两个小于一个的数据集合并在一起时,将较大的数据集放在"左"上:

val joinedDF = largerDF.leftJoin(smallerDF, largerDF("id") === smallerDF("some_id"))

当Spark整理连接数据时,它会将您在左侧指定的数据保持在执行器上的静态位置,并在执行器之间传输您在右侧设计的数据。 如果右边的数据(正在传输的)较大,则数据的序列化和传输将花费更长的时间。

利用广播联接将较小的数据集连接到较大的数据集

在许多情况下,我们将较小的数据集(几十行左右,也许更多)与较大的数据集连接在一起。 在这种情况下,使用广播连接的效果更好:

import org.apache.spark.sql.functions._val

joinedDF = largeDF.join(broadcast(smallDF), largeDF("id") === smallDF("some_id"))

使用缓存

如果发现经常在多个查询上使用相同的DataFrame,建议实施缓存或持久性:

val df = spark.read.textFile("/path/to/file.txt").cache()

注意:避免过度使用它。 由于Spark的缓存策略(先在内存中然后交换到磁盘),缓存最终可能会以稍慢的存储速度结束。 此外,将该存储空间用于缓存目的意味着该存储空间不可用于处理。 最后,缓存可能比读取DataFrame花费更多。¹

处理前表的计算统计

在查询一系列表之前,告诉spark计算这些表的统计信息将很有帮助,以便Catalyst Optimizer可以为如何处理这些表提供更好的计划。

spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS")

在某些情况下,Spark无法从上述广泛的COMPUTE STATISTICS调用中获得所需的一切。 这也有助于告诉Spark检查特定的列,以便Catalyst Optimizer可以更好地检查那些列。 对于任何涉及过滤和联接的列,建议对COMPUTE STATISTICS进行计算。

spark.sql("ANALYZE TABLE dbName.tableName COMPUTE STATISTICS FOR COLUMNS joinColumn, filterColumn")

设置spark.sql.shuffle.partitions配置参数

缺省值为200,对于某些作业而言可能太高。 将此配置设置为所有执行程序中可用的核心数。

spark.conf.set("spark.sql.shuffle.partitions", 10)

参考文献

1-

(本文翻译自Robert Sanders的文章《Improving your Apache Spark Application Performance》,参考:)

标签: #apacheerror521