龙空技术网

Spark SQL 分桶表在字节跳动的优化

闪念基因 670

前言:

此刻兄弟们对“sparksql操作hive表”大致比较关心,小伙伴们都需要学习一些“sparksql操作hive表”的相关文章。那么小编同时在网上网罗了一些关于“sparksql操作hive表””的相关文章,希望看官们能喜欢,咱们快快来学习一下吧!

本文来源:微信公众号:过往记忆大数据,内容分享者来自字节跳动的郭俊

原文地址:

本文来自 SPARK + AI SUMMIT 2020 北美会议,分享者来自字节跳动的郭俊。Bucket 在 Hive 和 Spark SQL 中普遍使用,用于消除 Join 或者 group-by-aggregate 场景下的 Shuffle 操作。本文主要介绍字节跳动在 Bucket 方面的优化。

本文主要从以下四个方面介绍:

Spark SQL 在字节跳动的应用什么是分桶Spark 分桶的限制字节跳动在分桶方面的优化

下面是 Spark SQL 在字节跳动的应用。

2016年主要是小规模的测试阶段2017年用于处理 Ad-hoc 工作负载2018年在生产环境下处理少量的 ETL 管道工作;2019年在生产环境下全面部署;2020年成为 DW 领域的主要计算引擎。什么是分桶

上面例子展示了创建分桶表的方法。主要关键字是 clustered by (xxx) sorted by (xxx) into N buckets

如果我们往分桶表里面插入数据,可以如下使用

INSERT INTO order SELECT order_id, user_id, product, amount FROM order_staging

可见,这个和正常表的使用并没有什么区别。

如果我们进行一个 ShuffleHashJoin 的时候,首先需要将表的数据按照 on 的条件进行分区,然后才是进行 Join 操作。

但是如果参与 Join 的表已经实现分桶了,那么在执行 ShuffleHashJoin 的时候省去 Shuffle 的操作。比如上面的例子如果 我们 对 order 和 user 表按照 user_id 字段进行分桶,那么在 ShuffleHashJoin 的时候就不需要进行 Exchange 操作了。

对于 SortMergeJoin ,需要对 on 里面的条件字段进行 Exchange 操作,然后再进行 Sort 操作,最后才是执行 SortMergeJoin (更多关于 Join 的策略可以参见过往记忆大数据的《 每个 Spark 工程师都应该知道的五种 Join 策略 》文章)。

如果参与 Join 的表已经分桶了,那么不需要就行 Exchange 和 Sort 操作了。

Spark 分桶的限制小文件问题

执行上面的 SQL ,每个 task 最多可能产生 1024 个文件,其中 1024 是分桶的数量。所以如果我们有 M 个 task ,那么最多产生的文件个数为 M * 1024 。 比如上面的 attempt_20200519145628_0014_m_000014_0 目录下产生了 988 个文件。

解决小文件的问题可以加上 DISTRIBUTE BY ,如下:

INSERT INTO order SELECT order_id, user_id, product, amountFROM order_stagingDISTRIBUTE BY user_id

如果 1024 是 M 的倍数,那么最多会产生 1024 个文件,其中 M = spark.sql.shuffle.partitions;

如果 M 是 1024 的倍数,那么最多会产生 M 个文件, 其中 M = spark.sql.shuffle.partitions 。

Spark 分桶和其他 SQL 引擎不兼容

Spark 的分桶和 Hive 的分桶是不兼容的,同时和 Presto 也是不兼容的;但是 Presto 与 Hive 的分桶是兼容的。

Spark 的分桶和 Hive 不兼容主要原因是以下原因导致的:

Hi ve 在生成分桶的时候会额外进行 一个 Reduce 操作,以保证相同分桶的数据都存储在一个文件中。 而 Spark SQL 在写分桶文件时不需要 Shuffle 操作,这样就会导致每个分桶最多产生 M 个文件,这就导致上面说的小文件问题;Spark 分桶和 Hive 分桶采用不同的 hash 算法。Hive 用的是 HiveHash;而 Spark 用的是 Murmur3, 所以数据的分布是不一样的。

因为 Spark 和 Hive 分桶不兼容,所以当 Spark 的 分桶 表和 Hive 的分桶表进行 SortMergeJoin 的时候是需要进行 Sort 和 Exchange 操作的。

额外的排序操作

因为 Spark SQL 表中的每个分桶里面最少包含一个文件,所以在进行 Join 之前需要进行额外的排序操作。

分桶数不对齐

如果参与 Join 的表分桶数不一致,那么其中一张表需要进行额外的 Exchange 操作。

参与 Join 的 key 和分桶列不一样需要额外操作

当参与 Join 的 key 和分桶的列不一样时,需要额外的 Exchange 操作。

上面的例子尽管参与 Join 的表都是对 user_id 字段进行分桶,并且分桶数一样,但是还是需要额外的 Exchange 操作。

字节跳动在分桶方面的优化Spark 分桶和 Hive 分桶对齐

前面介绍了 Spark 和 Hive 分桶不兼容,对于这方面,字节跳动将 Hive 分桶表和 Spark 分桶表进行了对齐,主要包括:

Spark SQL 写 Hive 分桶表的逻辑和 Hive 一致。 重写了 InsertIntoHiveTable#requiredOrdering 和 InsertIntoHiveTable#requiredDistribution,并且也使用了 HiveHash 算法。

对于读方面,重写了 HiveTableScanExec#outputPartitioning 和 HiveTableScanExec#outputOrdering, 使用了 Hiv eHash 算法,并且使用了 Hive 的分桶元数据。

上面是 Spark 读取 Hive 分桶表改进前和改进后的区别。可以看到,改进后,outputPartitioning 为 HashPartitioning,并且 outputOrdering 为 SortOrder,满足了 requireChildDistribution 为 HashClusteredDistribution的要求以及requireChildOrdering 为 SortOrder,从而在进行 SortMergeJoin 的时候省去了 Exchange 和 Sort 操作。

One to Mange Bucket Join

另一个改进是 One to Merge Bucket Join,比如下面例子 A 表有三个分桶,B 表有六个分桶。

如果我们在 Spark 对上面两张表进行 Join 操作,B 表需要额外的 Sort 操作,因为上面两张表的分桶数不一样。但是在字节公司,由于对性能的要求,需要避免 Sort 操作。

一种方法是将 A 表的分桶 0 和 B 表的分桶 0 、分桶 3 进行关联; 将 A 表的 分桶 1 和 B 表的分桶 1 、分桶 4 进行关联 ; 将 A 表的 分桶 2 和 B 表的分桶 2 、分桶 5 进行关联 。我们只需要将 A 表复制一份,这样 A 表也满足 6 个分桶。将 A 表和 A 表进行 Union 可以产生 到 6 个分桶的新表,但是 Spark 自带的 Union 操作之后 outputPartitioning 和 outputOrdering 将被删除,所以字节自己开发出 bucket union,使得 outputPartitioning 和 outputOrdering 被保留,这样就可以省去 Sort 和 Exchange 操作。

不过上面的方面在 B left join A 、 B left semi join A 、 B anti join A 、 B inner join A 可以正常工作,但是在 B right join A、 B full outer join A、 B cross join A 的时候结果有重复,因为 A 表的数据被扫描了两次。

为了解决这个问题,在 TableScan 后面加上了 hash(10) % buckets = bucket id 的过滤条件,比如 bucket 0 将会把 3、9、15 过滤掉,通过这种办法将会消除重复数据。

字节的另外一个优化是如果 Join 的 Key 不仅仅是分桶的 Key,原生的 Spark 会产生额外的 Exchange 和 Sort 操作。

通过优化后,Exchange 将消除。

本文来源:微信公众号:过往记忆大数据

原文地址:

标签: #sparksql操作hive表