龙空技术网

使用 Bucket Index 加速Apache Hudi 写入

Lakehouse 146

前言:

目前咱们对“apache配置防sql”大致比较关切,咱们都想要分析一些“apache配置防sql”的相关知识。那么小编在网摘上网罗了一些对于“apache配置防sql””的相关内容,希望小伙伴们能喜欢,看官们一起来学习一下吧!

Apache Hudi 在写入路径上使用索引[1]来检测更新与插入,并将更新确定性地路由到同一文件组。Hudi 支持开箱即用的不同索引选项,如 bloom、simple、hbase、bucket、global_bloom、global_simple 等。我们将讨论 Apache Hudi 中的 bucket 索引以及它与其他索引的不同之处。

写入流程

这是一批数据摄取到 Hudi 的关键写入步骤。

关键阶段之一是上述所有阶段中的索引阶段。大多数情况下,索引查找将决定写入延迟,因为每个其他阶段都是合理限制或确定的。但是索引延迟取决于很多因素,例如表中的总数据、正在摄取的数据、分区与非分区、常规索引与全局索引、更新传播、记录关键时间特征等。所以,我们经常看到工程师/开发人员花时间尝试减少索引查找时间。

桶索引(Bucket Index)

与 Hudi 支持的所有其他索引相比,桶索引非常特殊。每个其他索引都有某种索引方式,索引查找涉及查找索引元数据和推断记录位置。然而在桶索引的情况下,它的记录键的哈希值或基于 Hudi 确定记录所在位置的某些列。其实我们可以只命名这个 StaticHashIndex 而不是 BucketIndex。无论如何计算哈希是 O(1) 并没有任何 IO操作,因此节省了写入期间索引所需的时间。

桶索引的唯一缺点是每个分区的桶数必须预先为给定的表定义。例如当启动一个新表时可以为每个分区定义 16 个桶,Hudi 将为表中的每个分区分配 16 个文件组。因此传入的记录通过 16 散列到 mod,然后路由到相应的文件组。每个文件组的写句柄将推断出插入/更新,并将基于此合并记录。

性能对比

进行了一个非常小规模的实验测试 Bloom索引和桶索引的区别。数据集特征:

• 总大小为 7GB,约 1300 万条记录,平均分布 10 个分区。

• 更新插入:抽取总批次的 50% 并尝试更新插入。

• upsert(第二次提交)比较 bloom 索引和桶索引的总写入延迟。

如下是两者的 Spark UI。使用桶索引可以明显地看到索引查找不涉及任何阶段,而对于 Bloom 索引可以看到其中有几个阶段/作业用于索引标记。Bloom索引 Spark UI

桶索引 Spark UI

Bloom索引代码

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._

import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_bloom_index"

val basePath = $TARGET_LOCATION

val inputPath = $INPUT_LOCATION // with parquet dataset as input.

val df = spark.read.format("parquet").load(inputPath)

df.cache

df.write.format("hudi").

option(PRECOMBINE_FIELD_OPT_KEY, "ts").

option(PARTITIONPATH_FIELD_OPT_KEY, "partition").

option(RECORDKEY_FIELD_OPT_KEY, "key").

option("hoodie.table.name", tableName).

option("hoodie.metadata.enable","false").

option("hoodie.datasource.write.operation","insert").

mode(Overwrite).

save(basePath)

// upsert 50% of same batch.

df.sample(0.5).write.format("hudi").

option(PRECOMBINE_FIELD_OPT_KEY, "ts").

option(PARTITIONPATH_FIELD_OPT_KEY, "partition").

option(RECORDKEY_FIELD_OPT_KEY, "key").

option("hoodie.table.name", tableName).

option("hoodie.metadata.enable","false").

mode(Append).

save(basePath)

注意:在 EMR 中 默认使用 Bloom 索引。本地测试默认索引是SIMPLE索引。

桶索引代码

import org.apache.hudi.QuickstartUtils._

import scala.collection.JavaConversions._

import org.apache.spark.sql.SaveMode._

import org.apache.hudi.DataSourceReadOptions._

import org.apache.hudi.DataSourceWriteOptions._

import org.apache.hudi.config.HoodieWriteConfig._

import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_bucket_index"

val basePath = $TARGET_LOCATION

val inputPath = $INPUT_LOCATION // with parquet dataset as input.

val df = spark.read.format("parquet").load(inputPath)

df.cache

df.write.format("hudi").

option(PRECOMBINE_FIELD_OPT_KEY, "ts").

option(PARTITIONPATH_FIELD_OPT_KEY, "partition").

option(RECORDKEY_FIELD_OPT_KEY, "key").

option("hoodie.table.name", tableName).

option("hoodie.metadata.enable","false").

option("hoodie.index.type","BUCKET").

option("hoodie.index.bucket.engine","SIMPLE").

option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").

option("hoodie.bucket.index.num.buckets","12").

option("hoodie.datasource.write.operation","insert").

mode(Overwrite).

save(basePath)

Upsert 50% of records:

df.sample(0.5).write.format("hudi").

option(PRECOMBINE_FIELD_OPT_KEY, "ts").

option(PARTITIONPATH_FIELD_OPT_KEY, "partition").

option(RECORDKEY_FIELD_OPT_KEY, "key").

option("hoodie.table.name", tableName).

option("hoodie.metadata.enable","false").

option("hoodie.index.type","BUCKET").

option("hoodie.index.bucket.engine","SIMPLE").

option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").

option("hoodie.bucket.index.num.buckets","12").

mode(Append).

save(basePath)

如果更喜欢使用桶索引,这些是要设置的配置

option("hoodie.index.type","BUCKET").

option("hoodie.index.bucket.engine","SIMPLE").

option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").

option("hoodie.bucket.index.num.buckets","12")

注意:如果更喜欢对表使用桶索引,则必须重新开始。不能变更索引,如从 Bloom 切换到桶索引。

结论

如果用例非常适合存储桶索引,则可以大大加快写入延迟,可以选择任何列进行散列,如果没有,主键将用于散列。

引用链接

[1] 索引: []()

标签: #apache配置防sql #apacheindexof美化