龙空技术网

一篇图文掌握把Hive表切换到Iceberg表

码上加薪 622

前言:

当前小伙伴们对“sql迁移数据至hive仓库”大体比较关心,姐妹们都想要学习一些“sql迁移数据至hive仓库”的相关知识。那么小编在网络上搜集了一些关于“sql迁移数据至hive仓库””的相关文章,希望同学们能喜欢,姐妹们快快来了解一下吧!

一 数据仓库架构升级的背景1.1 基于Hive的离线数据仓库缺点1.1.1 不支持ACID不支持数据的Upsert场景,也不支持Row-level delete,数据的修正成本很高。同时也不能做增量数据读取,无法实现存储层面的流批统一。1.1.2 时效性能以提升数据难以做到准实时可见,写入Hive数据表时,周期最短也是30分钟也如一次。无法支持分钟级延迟的数据分析场景,现在写入Iceberg可以缩短到5分钟以内。1.1.3 元数据过多将Hive分区改为小时/分钟,虽然提高了数据的准实时性,但是metastore的压力也是显而易见的,元数据过多导致生成查询计划变慢,而且还会影响线上其他业务。随着元数据增加,存储Hive元数据的数据库压力也会增加,一段时间后必须对该数据库进行扩容。1.1.4 Table Evolution写入型 Schema,对 Schema 变更支持不好。Partition Spec 变更支持不友好。1.2 基于Iceberg的准实时数仓收益

在这个架构层面我们的收益还是很大的,可以把原来整体“T+1”的离线数仓,做成准实时数仓(10分钟级别),提升数仓整体的数据时效性,然后更好的支持上下游的业务。

这样的准实时数仓的优势是一次开发、口径统一、统一存储,是真正的流批一体化。不足就是时效性没有Flink+Kafka的高,不能做到秒级、毫秒级数据延迟。

结合Spark3.x SQL,能够支持准实时的多维数据分析。

二 从Hive迁移到Iceberg的收益2.1 Hive表数据能否平滑迁移到Iceberg?

Iceberg提供了Spark存储过程,这个工具完全可以做到不用挪动原来Hive表的ORC/Parquet数据文件,可以直接生成Iceberg的metadata,进而就可以得到一个Iceberg表。原来操作Hive表的Spark、Hive、Presto作业,切换到Iceberg表之后完全兼容,之前的SQL代码逻辑该怎么写还怎么写,也就是说原来的SQL脚本仍然可以复用。

在第三部分讲重点介绍如何迁移Hive表到Iceberg表。

可以看出Iceberg是有非常大的野心的,也就是说Iceberg的目标就是替换掉Hive表,经过netflix、apple、linkedin、adobe这些公司的场景磨练,无缝替换hive应该是iceberg当前release的场景里面最核心的场景。

2.2 从Hive表迁移到Iceberg表的核心收益是什么?2.2.1 准实时入仓

之前写入Hive数据表时,周期最短也是30分钟也如一次,现在写入Iceberg可以缩短到5分钟以内。本质上在于Iceberg把metadata存储在HDFS上,脱离了去中心化的hive-metastore依赖。

此外Iceberg提供了data文件和metadata文件的合并功能,数据可以按照5分钟的频次写入数据湖,上一个小时的data和metadata文件就可以合并了。这样就避免了过多小文件的影响,同学们是不是就不用担心数据分析受小文件过多的影响了。

最主要的是Iceberg提供了ACID功能。我们通常对Hive表都是每次写入一个新的Partition,在Query(查询、分析)的时候指定老的Partition范围。这套机制在天级别的离线系统里面,基本上没啥问题。一旦做到5分钟级别的数据实效性,我们怎么知道现在应该指定什么分区范围呢?有了iceberg的ACID隔离机制,就不存在这个问题,最近5分钟的数据通过txn commit了,查询就能看见,否则就看不见,不存在看一半数据的问题。

2.2.2 数据变更

Hive数据表本身就是为分析静态数据而设计的,而实际上数据变更是业务发展的常见需求。我们常见的场景有:

数据仓库里面某个表增加字段。修改Hive表的分区方式,将按照天基本的分区方式调整到30分钟级别,这在Hive表操作上是非常棘手的,需要把全部的历史数据重新导入一遍。Hive缺少ACID语义,在进行INSERT OVERWRITE时非常容易遇到脏数据问题。

而上面这些问题,在Iceberg中都已经解决的非常好了。

2.2.3 准实时CDC数据读写

对Flink来说,一般常用的有两种场景,第一种场景是上游的Binlog能够很快速的写到Iceberg中,然后供不同的分析引擎做分析使用; 第二种场景是使用Flink做一些聚合操作,输出的流是upsert类型的数据流,也需要能够实时的写到数据湖或者是下游系统中去做分析。

2.2.3.1 Flink CDC2.0

Flink CDC Connectors内部封装了Debezium特性,可以使用Flink CDC的方式替代canal+kafka的方式,直接通过sql的方式来实现对mysql数据的同步。

Flink在1.11版本开始引入Flink CDC功能,并且同时支持Table和SQL两种形式,Flink SQL CDC基于Debezium实现的,能够对CDC数据进行实时解析同步。当启动MySQL CDC源时,它将获取一个全局读取锁(FLUSH TABLES WITH READ LOCK),该锁将阻止其他数据库的写入,然后读取当前binlog位置以及数据库和表的schema,之后将释放全局读取锁。然后它扫描数据库表并从先前记录的位置读取binlog,Flink将定期执行checkpoints以记录binlog位置。如果发生故障,作业将重新启动并从checkpoint完成的binlog位置恢复,因此它保证了仅一次的语义。

Dynamic Table是Flink内部定义的表,它和流式可以相互转化的。可以简单的理解为:每张MySQL表对应一个Binlog日志,Binlog日志随着MySQL表的变化而变化,Dynamic Table相当于Binlog日志流在某一时刻的物化结果。在Flink中,数据从一个算子流向另外一个算子的时候,都是以Changelog Stream的格式发送到下游,在下游我们可以将其翻译成一张表或者一条流进行操作。

2.2.3.2 准实时操作CDC数据

Iceberg 是统一的数据湖存储,支持多样化的计算模型,也支持各种引擎(包括 Spark、Presto、hive)来进行分析;产生的 file 都是纯列存的,对于后面的分析是非常快的;Iceberg 作为数据湖基于 snapshot 的设计,支持增量读取;Iceberg 架构足够简洁,没有在线服务节点,纯 table format 的,这给了上游平台方足够的能力来定制自己的逻辑和服务化。

将数据连同 CDC flag 直接 append 到 Iceberg 当中,在 merge 的时候,把这些增量的数据按照一定的组织格式、一定高效的计算方式与全量的上一次数据进行一次 merge。这样的好处是支持近实时的导入和实时数据读取;这套计算方案的 Flink SQL 原生支持 CDC 的摄入,不需要额外的业务字段设计。

2.2.4 准实时数据仓库分析系统

我们知道Iceberg支持读写分离,又支持并发读、增量读、合并小文件,而且还能做到秒级/分钟级的数据延迟。我们基于Iceberg这些优势,采用Flink+Iceberg的方式构建了流批一体化的实时数据仓库。

在数据仓库处理层,可以用 presto 进行一些简单的查询,因为 Iceberg 支持 Streaming read,所以在系统的中间层也可以直接接入 Flink,直接在中间层用 Flink 做一些批处理或者流式计算的任务,把中间结果做进一步计算后输出到下游。

三 Spark迁移工具

Spark存储过程仅可以在Iceberg SQL extensions中使用Spark 3.x进行操作。

3.1 用法

所有的存储过程都在system命名空间中,都需要通过CALL来调用。有两种执行存储过程的方式:

按参数名称调用存储过程。按参数位置调用存储过程。

注意:绝对不能混合运用这两种方式调用存储过程

3.1.1 按参数名称调用存储过程

按名称传递参数时,参数可以按任何顺序排列,并且可以省略任何可选参数。

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)
3.1.2 按位置参数调用存储过程

按位置传递参数时,如果它们是可选的,只能省略结束参数。

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)
3.2 快照管理

这是一种闪回操作,将表回退到历史的某一快照ID,同样支持两种调用方式。

rollback_to_snapshot:回退到指定的快照ID。rollback_to_timestamp:回退到指定的时间。

注意:这个过程会使所有引用受影响表的Spark缓存计划失效

3.2.1 rollback_to_snapshot3.2.1.1 用法

参数名称

字段类型

说明

table_name

string

准备回退的表名称

snapshot_id

long

准备回退到的snapshot_id

3.2.1.2 输出

输出名称

字段类型

说明

previous_snapshot_id

long

回退前的snapshot_id

current_snapshot_id

long

新的snapshot_id

3.2.1.3 样例

首先查一下hive_prod.iceberg_db.ods_test_table.snapshots,看看有哪些snapshot_id。

select committed_at, snapshot_id from hive_prod.iceberg_db.ods_test_table.snapshots;

通过查询,我们发现现在有9条数据。

现在把hive_prod.iceberg_db.ods_test_table回退到snapshot_id=8860098474832620457

CALL hive_prod.system.rollback_to_snapshot('iceberg_db.ods_test_table', 8860098474832620457);

验证一下数据,证明回退snapshot_id成功。

SELECT * FROM ods_test_table;

3.3 迁移Hive表到Iceberg

使用Spark存储过程migrate可以把Hive表迁移到Iceberg中去,表结构、分区和位置信息将从原表中复制,目前支持的数据格式有Avro、Parquet和ORC。

3.3.1 用法

参数名称

类型

说明

table_name

String

要迁移的表名称

properties

map

iceberg表描述信息

3.3.2 输出

输出名称

类型

说明

migrated_files_count

long

迁移到Iceberg表的文件数

3.3.3 例子

CALL hive_prod.system.migrate('hive_db.ods_test_table');
四 总结

欢迎大家评论、转发。

标签: #sql迁移数据至hive仓库