龙空技术网

Apache Doris数据湖联邦分析特性揭秘

DataFunTalk 91

前言:

当前你们对“apacheranger”大概比较着重,同学们都需要了解一些“apacheranger”的相关内容。那么小编也在网摘上汇集了一些关于“apacheranger””的相关知识,希望我们能喜欢,姐妹们一起来了解一下吧!

导读 随着数据湖技术的发展,分析性能成为发挥数据湖效用、挖掘数据价值最大的掣肘。基于一款简单易用和高性能的查询分析引擎在数据湖之上构建分析服务,成为新的技术趋势。在过去一年,通过在数据湖上的诸多性能优化,结合自身的高性能执行引擎和查询优化器以及 Apache Doris,实现了数据湖上极速易用的分析体验。本文就将揭秘 Apache Doris 数据湖联邦分析特性。

全文目录:

Apache Doris 湖仓一体的思考Apache Doris 数据湖特性揭秘案例分享社区规划Q&A

分享嘉宾陈明雨 Doris Apache Doris PMC Member

编辑整理|刘步龙 硕磐智能

内容校对|李瑶

出品社区DataFun

01

Apache Doris 湖仓一体的思考

在介绍湖仓一体之前,先来看一些基本概念。首先是数据库,它是一个最基础的概念,主要负责联机事务处理。随着数据量的增长,出现了数据仓库,它存储的是经过清洗、加工以及建模后的高价值的数据,供业务人员进行数据分析。数据湖的出现,主要是为了去满足企业对原始数据的存储、管理和再加工的需求。这里的需求主要包括两部分,首先要有一个低成本的存储,用于存储结构化、半结构化,甚至非结构化的数据;另外,就是希望有一套包括数据处理、数据管理以及数据治理在内的一体化解决方案。

最后来看一下本文要重点介绍的湖仓一体的概念。数据仓库解决了数据快速分析的需求,数据湖解决了数据的存储和管理的需求,而湖仓一体想解决的就是如何让数据能够在数据湖和数据仓库之间进行无缝的集成和自由的流转,从而帮助用户直接利用数据仓库的能力来解决数据湖中的数据分析问题,同时又能充分利用数据湖的数据管理能力来提升数据的价值。

Doris 在设计湖仓一体的方案中提出了四个出发点:

第一点是湖仓查询加速,Doris 作为一个非常高效的 OLAP 查询引擎,有着非常好的 MPP 向量化的分布式的查询层,可以直接利用 Doris 非常高效的查询引擎,对湖上数据进行加速分析。第二点是统一数据分析网关,可以提供各类异构数据源的查询和写入能力,用户利用 Doris,可以把这些外部的数据源,统一到 Doris 的源数据的映射结构上,用户在通过 Doris 去查询这些外部数据源的时候,可以提供一致的查询体验。第三点是统一数据集成,首先通过数据湖的数据源连接能力,能够让多数据源的数据以增量或全量的方式同步到 Doris,并且利用 Doris 的数据处理能力对这些数据进行加工。加工完的数据一方面可以直接通过 Doris 对外提供查询,另一方面也可以通过 Doris 的数据导出能力,继续为下游提供全量或增量数据服务。通过Doris可以减少对外部工具的依赖,可以直接将上下游数据,以及包括同步、加工、处理在内的整条链路打通。最后一点就是更加开放的数据生态,众多数据仓库有着各自的存储格式,用户如果想要使用一个数据仓库,第一步就需要把外部数据通过某种方式导入到数据仓库中才能进行查询。这样就是一个比较封闭的生态,数据仓库中数据除了数仓自己本身可以查询以外,其它外部工具是无法进行直接访问的。一些企业在使用包括 Doris 在内的一些数仓产品的时候就会有一些顾虑,比如数据是否会被锁定到某一个数据仓库里,是否还有便捷的方式进行导出。通过湖仓一体生态的接入,可以用更加开放的数据格式来管理数据,比如可以用 Parquet/ORC 格式来去存储数据,这样开放开源的数据格式可以被很多外部系统去访问。另外,Iceberg, Hudi等都提供了开放式的元数据管理能力,不管元数据是存储在 Doris 本身,还是存储在 Hive Meta store,或者存储在其它统一元数据中心,都可以通过一些对外公开的API 对这些数据进行管理。通过更加开放的数据生态,可以帮助企业更快地接入一个新的数据管理系统,降低企业数据迁移的成本和风险。

02

Apache Doris 数据湖特性揭秘

1. 特性一览

Doris 当前版本已经支持了一些湖仓一体和数据湖加速的功能。

首先是对不同数据源的支持,已经支持了包括 Hive、Iceberg、Hudi 等数据仓库和数据湖的连接。

针对 Hive 支持了包括 ExternalTable、ManagedTable,通过内部代码改造兼容了不同版本的 Hive Metastore,同时也支持针对 Hive Metastore 中存储的元数据进行手动或自动地同步。

Iceberg 方面支持了 V1 和 V2 格式,也支持通过 snapshot 查询实现TimeTravel,同时也支持了 Iceberg 不同的元数据中心,包括 HMSCatalog,RestCatlog、Aws Glue,以及阿里云的 Data Lake Formation 等。

Hudi 也是支持了不同的元数据存储,还支持了 copy-on-write 和 merge-on-read。

ES 方面, Doris 在之前版本中就已经支持了 Doris on ES 这样一个功能,可以基于Doris 分布式的 SQL 查询能力,直接去查询 ES 上的数据。第一是提供很好的性能支持,第二是在 SQL 中扩展了一些 Doris 查询语法,用户可以通过 SQL 直接去查询 ES 中存储的日志数据等。在新的连接框架下,也支持了 ES 的 catalog,在原有的基础功能之上还支持了对 ES 元数据的自动全量 index 映射,用户不用再一张表一张表地去建立映射关系,直接通过一个连接命令就可以将 ES 所有的 index 全都同步过来,然后开始进行查询。

JDBC catalog,可以通过 JDBC 协议去连接几乎所有的可以通过 JDBC 连接的数据源,包括 MySQL、PG、Oracle 等。连接以后可以自动地把这些数据源中的库、表等元信息同步过来,然后就可以开始进行查询。通过 JDBC 连接的好处包括:第一是可以进行快速的对上游数据库的数据同步,比如可以通过 insert into select 语句,直接把上游数据库中的数据导入到 Doris 中来;第二,可以把这些外部数据源当做维表,和 Doris 中存储的主数据进行关联查询。

Doris 还支持了对文件的直接分析,用户可以直接把单个或者一批 parquet 文件、 ORC 文件,或者 text 格式的文件存储在远端存储上,比如对象存储、HDFS 等,可以通过的 table value function 直接去对文件进行分析。同时也支持对文件 schema 的自动推导,也就是用户在分析文件的时候,可以直接把文件当作一张二维表去分析,这样可以充分利用 SQL 能力,对文件不入库地直接分析。

Doris 在数据湖联邦分析下主要分为两种场景:元数据连接和数据访问。接下来将分别具体介绍。

2.元数据连接

元数据连接要解决的问题可以总结为四方面:

首先是统一的元数据结构,主要是为了屏蔽不同数据源中元数据的差异,因为不同数据源中元数据的层级结构、定义、schema、列类型可能都是不一样的,需要一个统一的接口把这些异构数据源中的元数据的差异屏蔽掉。在 Doris 中都是统一的元数据层级和结构。第二是可扩展的源数据连接框架。因为外部的数据源非常多,需要通过一个可扩展的连接框架来帮助用户以更低的成本快速接入新的数据源。所以在整个数据连接框架上进行了一些抽象,把对外的 API 通过接口的方式暴露出去,这样开发者在接入一个新的数据源时,可以直接通过接口快速接入。第三是高效的元数据访问能力,针对联邦分析,去查询外部数据源的时候,访问性能和稳定性都是不可控的。Doris 的元数据连接框架就是为了在这样的场景下去提供一个可靠的、高效的元数据访问性能。同时也要支持包括元数据的实时同步,来帮助用户能够更合理有效地去访问外部的元信息。最后一点是自定义的鉴权服务, Doris 本身有一套自己的鉴权管理能力,包括对库表列行的权限管理,一个外部数据源也会有自己的权限管理系统,在接入外部数据源的时候,如果只能用 Doris 内部的权限管理系统,那么一些企业用户可能需要切换或重构其权限管理系统。因此实现了自定义的鉴权服务,能够更灵活对接外部权限管理系统包,比如 Apache Ranger 这样一个统一的权限管理系统,从而降低业务迁移的成本。

在Doris新版本中引入了新的一个层级,catalog 层级,这样就组成了一个完整的三层的元数据结构,即 catalog 、database、table。

在之前的架构中,没有 catalog 这一层级的,无法直接去映射一个数据源。比如要连接一个 Hive 集群,在 Doris 中是找不到这样的一个对应关系去映射的,在原先的版本中,查询一个 Hive 表只能是通过一张表一张表地去通过 create external table 的方式,建立元数据的映射。如果Hive 中有 1000 张表的话,需要写 1000 个 create table 语句,是非常困难的。所以在新的架构中引入了 catalog 这样一个层级,可以直接去映射到外部的数据源,映射只后,Doris 会自动地把这些库表的元信息同步过来,用户不需要再手动建立映射关系。这样可以方便地进行数据源的查询。

建立完元素映射关系以后,可以通过 SQL 语句很方便地进行跨数据源的联邦访问。比如在上图的例子中,可以把 Hive 的一张表和Iceberg的一张表进行 join 查询,得到想要的结果。另外,也提供了可扩展的元数据连接框架,上图中给出了一个连接 Hive 的示例,也可以连接亚马逊的 AWS Glue、阿里云的 Data Lake Formation 等统一的元数据服务中心,来方便地接入外部的元数据信息。

对于元数据访问,以 Hive Metastore 为例,最原始的一种实践就是每次的元数据访问都直接访问 Hive Metastore,这样的话开销是非常大的。所以这里做了两件事情,首先是元数据的缓存,从上图左侧可以看到,有不同类型的元数据缓存,包括 schema 的缓存,分区表的 partition 的缓存,以及 partition 之下具体的 HDFS 或 S3 上文件信息的缓存,通过三级缓存可以为 Doris 提供一个非常稳定的元数据访问服务。这样用户在大部分情况下都可以直接命中本地的缓存信息,只有在少部分情况下,比如元数据更新的情况下,才会去从远端拉取最新的元数据更新本地的缓存。

第二点就是元数据的实时同步。也提供了两种方案。第一种方案是针对所有的元数据服务,都提供手动的刷新能力。可以通过 refresh 命令,去刷新catalog 级别、database 级别、 table 级别甚至 partition 级别的元数据。用户可以在外围起一个定时脚本,每 5 分钟或者每 10 分钟进行一次元数据的刷新,实现一个半自动的元数据刷新服务。针对 Hive Metastore,也提供了自动的元数据的同步能力。通过订阅 Hive Metastore 的 Meta change 的 event,来自动地同步 Hive Metastore 中的元数据变更,这样就会达到一种近实时的元数据的自动同步。有些用户可能希望分钟级,甚至秒级去感知到外部数据源的同步,可以通过这种方式来实现。这样对用户的使用以及对数据时效性来讲都是比较友好的。

上图展示了统一鉴权服务的框架。首先,create catalog(数据源连接的时候)可以指定使用哪种鉴权插件, Doris 内部提供了统一的 internal access controller 鉴权插件,通过 Doris 内部的RBAC 机制的权限管理能力,进行库级别、表级别、行列级别的权限管理。用户也可以在 create catalog 的时候指定自定义的鉴定插件。比如在内部已经支持了 Apache Ranger 的 Access Controller,指定以后可以直接通过 Ranger 插件去访问 Ranger 中的权限信息,对整个数据源的访问进行鉴权。这样在 catalog 级别可以定义不同的鉴权服务。用户在连接外部数据源以后,就可以直接利用他自己的鉴权插件去提供包括授权、审计以及数据加密等一系列操作。

3.数据访问

大部分的数仓或者数据湖的数据都是存储在外部统一的共享存储上的,可以简单分为两大类,第一类是 HDFS 以及 HDFS 兼容的一些存储系统,第二类是云上对象存储。数据访问本质上要解决的问题是如何在这些远端存储上进行高效的数据读取。

数据湖有一个初衷,是通过更开放的数据格式来提升数据的开放性,减少用户对数据迁移的顾虑。Parquet 或者 ORC 是非常通用的数据格式。

首先就是要优化 Parquet reader。在以前的实现中,是通过 Apache arrow 内置的 C++ 的 parquet reader 进行数据读取的,但它本身也带来诸多限制。第一点,它会多一层数据的内存格式转换,数据从文件中读取出来以后要先转换成 Arrow 的格式,再转换成 Doris 的格式。并且 Arrow 内置的 parquet reader,其功能不是很全面,比如它无法利用 Page Index,也不支持 Bloom Filter,不支持字典编码。

因此通过 C++ 重写了一个全新的 Parquet reader,具有如下优点:

少了一次内存格式转换,内存从文件中读取出来以后,直接就转换成 Doris 内部的内存格式;充分利用了 Parquet 的一些新特性,比如 Page Index,可以更精确地过滤无用数据;利用 Bloom Filter 进行数据过滤;支持字典编码,一条查询条件可以直接下推到 parquet reader,在 parquet reader 中根据字典编码,把 with 条件转换成字典编码的条件,通过整型的比较来加速字符串比较查询的效率;支持延迟物化,比如有多个查询条件和多个需要读取列的时候,优先读取那些有查询条件的列,先进行数据过滤,在过滤之后,通过过滤后的行号再读取其它的列,这样能够大大减少 IO 的开销。

所以,整个数据访问的一个最基本的初衷,就是尽量减少从远端存储中读取的数据量和读取频率,来加速查询响应。

要减少远端数据访问,自然会想到的一个优化点就是进行本地文件 cache。这块做了两个工作,第一个工作是支持 Local File Cache,可以针对文件进行 block 级别的文件缓存,可以把远端 parquet 文件按照 4K 到 4M 的范围进行文件块的缓存。利用 parquet 列存的存储特性,只在本地缓存需要读取的数据,这样下次再去读取数据的时候就可以直接读取本地缓存的文件。在实际测试中,如果数据完全命中本地文件缓存的话,可以达到几乎和内表一致的查询效率。

第二个工作是在缓存基础上去做了一致性哈希。在做外部数据源的数据查询的时候,不需要利用到 Doris 本身的 BE 节点的存储能力,也就是说在 FE端做查询规划的时候,可以随机地把查询任务发送到任意的 BE 节点,然后去做访问。这样带来的一个问题就是有可能第一次查询落到了 a 节点,进行了本地完全缓存,第二次查询又随机到了 b 节点,而 b 节点没有缓存,那么查询第一次可能很快,第二次又会变慢。所以我们通过一致性哈希来尽量将查询规划到已经存在缓存的节点上,从而保证查询的稳定性。另外,有无状态的计算节点可以进行弹性的节点伸缩,在节点上下线的情况下,通过一致性哈希的算法可以有效地保证在节点变更的情况下,能够尽量少地去对 file cache 进行迁移,从而进一步降低 Cache Miss 的发生率。

针对 BE 上数据的执行和扫描进行了架构的重构。首先,在接入数据源时,不用再去重新写一套数据的查询能力,希望能够最大程度地利用 Doris 本身已有的数据查询优化,来加速外部数据的查询,因此对整个执行节点的执行框架进行了分层的重构。

可以看到在 scan 这一层往下,是针对各个数据源不同的实现,scan 这层往上,所有的查询规划都是完全相同的。这样无论查询内表还是查询外表,都可以充分利用整个 Doris的查询优化的能力,包括查询优化器的能力,谓词下推的能力, runtime filter 能力,以及各个算子的优化能力。在 scan 节点之下,也统一了两个部分,第一个是 scanner scheduler,扫描任务的调度逻辑,以及 scanner thread pool,也就是扫描任务的扫描进程池,通过这两个统一的管理模块,可以去做很多事情。可以针对内表和外表进行资源上的管理,比如内表有一个延迟的要求,那可能分配更多的资源,外表可能是吞吐的要求更高,可能线程数会更少,但利用的带宽会更多。通过统一的全局的调度视角,可以去做一些资源隔离。经过改造,只需要一人/周就可以接入一个新的数据源,整个框架是非常方便的。

上图可以看到在完成了上述优化后,外表查询的性能表现。在 ClickBench 宽表场景,以及 TPCH 标准的测试集上,与 Trino 进行了对比,总体上有 3 到 10 倍的性能提升。体现出了 Doris 对外部数据源及湖上数据查询有着很好的加速能力。

4.负载管理与弹性计算

在查询外部数据源的时候,其实不需要利用到 Doris 本身的存储模块,在丢到存储模块以后,BE节点本质上是一个无状态的节点了,在无状态节点上就可以很好地做到弹性的节点伸缩,因为不管是加节点还是减节点,不再需要关心状态的转移情况,可以很安全地把一个节点删掉或加上来。所以在 BE 节点上目前提供了两种部署模式。第一种是 MIX 模式,就是原有的模式,既支持计算也支持本地存储。第二种是compute node 模式,关闭了本地存储功能,只有计算能力,可以通过无状态的 computer node 来快速地对集群进行弹性伸缩,从而快速承接一些外部数据访问的计算负载。同时也基于一些其它技术,包括 FQDN 功能,来提供 Doris 在 K8S 上的部署支持。

03

案例分享

最后,通过一个案例来介绍 Doris 是如何应用中在实际业务场景中的。

在某金融风控场景,数据存储在 ES、Hive、GP 中,无法进行有效的数据关联查询,并且 Hive 本身 T+1的数据处理能力无法满足数据时效性的要求。在引入了 Doris 1.2 版本之后,它可以通过 Doris Multi Catalog(数据源连接能力)来统一多个数据源的查询访问。

由此带来了几点好处:

首先,多个数据源之间可以通过 Doris 进行统一的联邦分析。

另外,不用再去把 Hive 中的数据导入到其它系统中进行加速查询,可以直接使用 Doris 的查询服务能力,对 Hive 中的数据进行查询加速,这样数据时效性有了极大的提升。

第三,为 ES 半结构化数据提供了统一的 SQL 的查询语义,这样通过 Doris,统一了它整个风控数据主题数仓的查询接口,来对外提供服务。

04

社区规划

Doris 社区之后的规划主要围绕更丰富的数据源支持、数据集成、资源隔离与调度三个方面进行。

在数据源支持方面,第一点是支持 Hudi 的 Merge-on-Read,使得在 Doris 中能够直接读取Hudi的增量。第二点是在 Doris 中扩展 Iceberg/Hudi 中新数据格式的数据加速能力。第三点是接入更多的数据源,比如 Delta lake,Paimon。其中 Hudi 的 Merge-On-Read 和 Paimon 已经在 2.0 版本支持,欢迎试用。

在数据集成方面,第一点是通过 CDC 能力和增量物化视图进一步完善复杂数据查询场景下的加速支持。第二点是 Git Like 的数据访问能力,通过该能力可以很方便地对数据进行管理。第三点是数据的写回,目前 Doris 是不支持数据写回操作的,后续在支持了该功能之后,Doris 在绝大部分业务场景下去做完整的外部数据源的管理,包括数据的查询和写入。

在资源隔离与调度方面,在 Doris2.0 会引入 pipeline 的执行框架,在执行框架基础之上,可以进一步地细化资源隔离和任务调度的机制,来保证可以在同一个机器内进行 CPU、内存、IO 的隔离,从而帮助用户更好地去管理这样的一个 Doris 集群。

Apache Doris 现在已是全球最活跃的开源社区之一,提供丰富的社区支持。在阿里云 EMR、百度云 Palo、腾讯云 CDW 都有基于 Apache Doris 的商业化支持。无论是开源的Doris用户,还是想去购买基于Doris的商业化支持,都可以满足需求。

最后,欢迎更多开发者加入到 Apache Doris 社区。

05

Q&A

Q1:Doris 能够无缝替换 Presto 吗?

A1:无缝替换现在还做不到。一个目标就是去替换 Presto 的一些场景。Doris 的性能指标其实是比 Presto 要更强。当然从功能丰富度来讲,与 Presto 还是有一定差距。但是在某些场景下是有优势的,比如对 Hive 的查询,或者对 Iceberg 的查询等等,功能已经相对比较完备了。如果感兴趣的话,可以尝试用 Doris 去替换 Presto 的一些场景。同时我们已经开始支持 Presto 语法的兼容工作,感兴趣的同学可以联系社区试用。

Q2:BE 提供的计算能力除了查询还能写到数仓和 HDFS 对象吗?

A2:目前无法直接去写入,在文中也提到了,这是后续要支持的。当前的做法是把经过加工处理后的数据,通过 select out file 或者 export 的方式,把数据导出成 parquet 文件或者 ORC 文件,存储到对象存储上或者 HDFS 上。

Q3:Doris 有没有和 Spark 进行过性能比较,能否在数仓替换 Spark?

A3:没有和 Spark 进行过直接的性能比较,但是从和 Presto 的比较来看,我们比较有信心会比 Spark 快很多。其次 Doris 的初衷是想去做数据集成和加工,所以也希望能够代替一些在比较少数据量场景下的 Spark 的功能,从而减少一些中小企业对各个大数据组件维护的困难。比如 GB、TB 级别的数据,可以直接通过 Doris 来完成,不需要借助更加复杂的 Spark 或者 Flink。

今天的分享就到这里,谢谢大家。

标签: #apacheranger