
Not sure how to delete data in Apache Hudi? Here are several ways to get it done



1. Ways to delete data

Add a '_hoodie_is_deleted' column with the value true to the records to be deleted

Use the partition-level delete API

Use the record-level delete API

Use DeltaStreamer to delete data

2. Core configuration

hoodie.datasource.write.operation = "delete_partition"spark dataSoruce 如果使用分区级别的删除,需要设置此配置

hoodie.datasource.write.partitions.to.delete = "partitionValue_1,partitionValue_2,partitionValue_3": with this option you only need to pass the partitions to delete, and no DataFrame has to be built; without it, you must build a DataFrame containing the record key and the partition field.

For record-level deletes through the Spark DataSource, set hoodie.datasource.write.operation = "delete".

3. Examples

3.1 Partition-level deletion

Partition-level deletion comes in two flavors: one does not depend on DataFrame data, and the other does.

3.1.1 Without DataFrame data

The DataFrame-free variant only needs the following options added on top of the regular write configuration:

hoodie.datasource.write.operation = delete_partition

hoodie.datasource.write.partitions.to.delete = the specific partition value(s) to drop

import org.apache.spark.sql.SaveMode.Append

val df = spark.emptyDataFrame
df.write.format("org.apache.hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.delete.shuffle.parallelism", "2").
  option("hoodie.table.name", "tableName").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.datasource.write.operation", "delete_partition").
  option("hoodie.datasource.write.partitions.to.delete", "partitionValue_1,partitionValue_2").
  mode(Append).
  save(tablePath)
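
To sanity-check the result, you can read the table back and list the partitions that remain. This is only a minimal sketch: '_hoodie_partition_path' is Hudi's partition metadata column, and on older Hudi versions the load path may need a glob such as tablePath + "/*/*":

// Sketch: re-read the table and show which partition values are still present
val remaining = spark.read.format("org.apache.hudi").load(tablePath)
remaining.select("_hoodie_partition_path").distinct().show(false)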

3.1.2 With DataFrame data

The DataFrame-based variant requires building a DataFrame that contains the record key and the partition field, together with the following option:

hoodie.datasource.write.operation = delete_partition

// Build a DataFrame containing the partition field and the record key
val df = spark.sql("select uuid, partitionpath from hudi_table")
df.write.format("org.apache.hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.delete.shuffle.parallelism", "2").
  option("hoodie.table.name", "tableName").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.datasource.write.operation", "delete_partition").
  mode(Append).
  save(tablePath)

3.2 Record-level deletion

Record-level deletion also comes in two flavors: one prepares the set of records to delete ahead of time, and the other adds a '_hoodie_is_deleted' column with the value true directly in the data.

3.2.1 With DataFrame data

The first record-level method is configured much like the DataFrame-based partition-level method: build a DataFrame containing the record key and the partition field, and use the following option:

hoodie.datasource.write.operation = delete

val df = spark.sql("select uuid, partitionpath from hudi_table")
df.write.format("org.apache.hudi").
  option("hoodie.insert.shuffle.parallelism", "2").
  option("hoodie.upsert.shuffle.parallelism", "2").
  option("hoodie.bulkinsert.shuffle.parallelism", "2").
  option("hoodie.delete.shuffle.parallelism", "2").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.table.name", "tableName").
  option("hoodie.datasource.write.operation", "delete").
  mode(Append).
  save(tablePath)
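
As a quick check, you can re-read the table once the delete commit completes and look for one of the deleted keys. A sketch, assuming the table lives at tablePath; the uuid value is only an illustration:

// Sketch: verify a deleted key no longer exists in the table
val remaining = spark.read.format("org.apache.hudi").load(tablePath)
println(remaining.filter("uuid = '69cdb048'").count()) // expect 0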

3.2.2 Schema-based method

The second record-level method requires adding a '_hoodie_is_deleted' column with the value true to the data:

// Add this column to the DataFrame: if the value is false or the column is absent,
// the record is written as a regular record; if the value is true, it is treated as a delete
StructField("_hoodie_is_deleted", DataTypes.BooleanType, true, Metadata.empty())

dataFrame.write.format("org.apache.hudi").
  option("hoodie.table.name", "test123").
  option("hoodie.datasource.write.operation", "upsert").
  option("hoodie.datasource.write.recordkey.field", "uuid").
  option("hoodie.datasource.write.partitionpath.field", "partitionpath").
  option("hoodie.datasource.write.storage.type", "COPY_ON_WRITE").
  option("hoodie.datasource.write.precombine.field", "ts").
  mode(Append).
  save(basePath)
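
The snippet above assumes the column is already present. One common way to populate it is to select the records you want removed and flag them with lit(true); a sketch, where the filter condition is only an illustration:

import org.apache.spark.sql.functions.lit

// Mark the selected records as deletes; writing this DataFrame
// with the upsert shown above makes Hudi drop them
val deleteDf = spark.sql("select * from hudi_table where uuid = '69cdb048'").
  withColumn("_hoodie_is_deleted", lit(true))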

3.2.3 DeltaStreamer method

Deleting records with DeltaStreamer is essentially the same as the schema-based deletion in 3.2.2.

The schema file used by DeltaStreamer must include the '_hoodie_is_deleted' field, and the incoming records to be deleted must carry the value true:

schema:

{ "type":"record", "name":"schema", "fields":[{ "name": "uuid", "type": "String"}, { "name": "ts", "type": "string"}, { "name": "partitionPath", "type": "string"}, { "name" : "_hoodie_is_deleted", "type" : "boolean", "default" : false}]}

data:

{"ts": 0.0, "uuid": "69cdb048", "partitionpath": "americas/brazil/sao_paulo", "_hoodie_is_deleted" : true}
