龙空技术网

一键实现 Oracle 数据整库同步至 Apache Doris

灯惉 305

前言:

现在兄弟们对“oracle查看数据链路”大概比较着重,各位老铁们都需要学习一些“oracle查看数据链路”的相关知识。那么小编同时在网摘上网罗了一些关于“oracle查看数据链路””的相关文章,希望朋友们能喜欢,你们快快来了解一下吧!

在实时数据仓库建设或迁移的过程中,用户必须考虑如何高效便捷将关系数据库数据同步到实时数仓中来,Apache Doris 用户也面临这样的挑战。而对于从 Oracle 到 Doris 的数据同步,通常会用到以下两种常见的同步方式:

OGG/XStream/LogMiner 工具:通过该方式先将数据同步到 Kafka 中,然后通过 Routine Load 消费 Kafka 中的数据进行实时同步。这种方式的同步链路相对较长,特别是在上游数据表较多的情况下,需要手动创建大量的 Routine Load 作业,同步流程不仅繁琐,也给用户增加了较大的使用及维护压力。

FlinkCDC:该方式虽然可以直接将上游数据同步到 Doris 中,并在一定程度上缩短了同步链路,实际在使用过程中还会遇到以下问题:

数据同步时,需要在 Flink 中对每张表手动配置参数及字段映射,尤其是在多表或整库同步场景中,不仅带来大量配置工作量,还增加了 FlinkSQL 脚本的维护成本。数据同步时,需要事先在 Doris 中手动逐个创建表,而面对数量庞大的上游表时,手动创建表不仅耗费时间,而且工作效率很低,间接影响数据同步的效率。由于每张 Source 表都会使用同一个链接,因此在整库同步时会给源端造成很大的链接压力。

为了解决上述问题,在新版本的 Doris-Flink-Connector 中,我们实现了 FlinkCDC 的 Datastream API 集成,无需提前在 Doris 中创建表以及映射关系,仅仅通过简单的参数配置就能一键完成从 Oracle 等关系型数据库到 Apache Doris 的整库数据同步。

此外,Doris-Flink-Connector 也可以一键实现万表 MySQL 整库同步至 Apache Doris 中来,具体使用可参考:一键实现万表 MySQL 整库同步至 Apache Doris

同步流程 & 实战演示

在进行整库同步前,我们先了解一下具体同步流程:

在启动 Flink 任务之前,Doris-Flink-Connector 会自动读取需要同步的 Oracle 表的元数据信息,并自动在 Doris 中创建相应的表。通过 FlinkCDC 提供的 OracleSource 功能,能够从 Oracle 数据库中读取数据,并将其传递到下游进行处理。通过 Flink 的侧输出流功能,根据自定义规则将数据分流到不同的 Doris Sink 中,并同步到 Doris 中来。

通过以上简单操作,即可实现上游 Oracle 数据库的整库数据实时数据接入到 Apache Doris 中。接下来我们通过一个实际案例来详细说明具体的操作步骤:

01 Oracle 环境准备

# 拉取镜像docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g# 启动镜像docker run -it -d \--privileged \-p 1521:1521 \--name oracle11g \-e ORACLE_ALLOW_REMOTE=true \-v /mnt/disk1/oracle:/data/oracle \registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g# 进入容器docker exec -it oracle11g bash

Oracle 归档日志(Binlog)配置:启动归档日志时,需对日志大小和存放地址进行设置,设置完成需进行重启。该步骤完成后才可进行后续增量数据的同步。

# 进入SQL命令行[oracle@ef6d9de18e59 ~]$ sqlplus /nologSQL> conn /as sysdbaConnected.SQL> alter system set db_recovery_file_dest_size = 10G;System altered.SQL> alter system set db_recovery_file_dest = '/home/oracle/oracle-data' scope=spfile;System altered.SQL> shutdown immediate;Database closed.Database dismounted.ORACLE instance shut down.SQL> startup mount;ORACLE instance started.Total System Global Area 1603411968 bytesFixed Size                  2213776 bytesVariable Size             402655344 bytesDatabase Buffers         1174405120 bytesRedo Buffers               24137728 bytesDatabase mounted.SQL> alter database archivelog;Database altered.SQL> alter database open;Database altered.# 检查日志归档是否开启SQL> archive log list;Database log mode              Archive ModeAutomatic archival             EnabledArchive destination            USE_DB_RECOVERY_FILE_DESTOldest online log sequence     1Next log sequence to archive   1Current log sequence           1# 启用补充日志记录SQL> ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;Database altered.#创建用户CREATE USER admin IDENTIFIED BY admin123;GRANT dba TO admin;

数据准备

[oracle@ef6d9de18e59 ~]$ sqlplus admin/admin123 SQL> CREATE TABLE PERSONS(      ID NUMBER(10),      NAME VARCHAR2(128) NOT NULL,      PRIMARY KEY(ID)   );Table created.SQL> INSERT INTO "PERSONS" VALUES (1, 'zhangsan');SQL> INSERT INTO "PERSONS" VALUES (2, 'lisi');SQL> INSERT INTO "PERSONS" VALUES (3, 'wangwu');SQL> CREATE TABLE PERSONS_1(      ID NUMBER(10),      NAME VARCHAR2(128) NOT NULL,      PRIMARY KEY(ID)   );Table created.SQL> INSERT INTO "PERSONS_1" VALUES (1, 'zhangsan');SQL> INSERT INTO "PERSONS_1" VALUES (2, 'lisi');SQL> INSERT INTO "PERSONS_1" VALUES (3, 'wangwu');
02 Flink 环境配置将 FlinkCDC-Oracle 的依赖和 Doris-Flink-Connector 包放到 Flink 的 lib 目录下,同时启动 Flink 集群。
# 下载相关依赖wget   -O flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar# 启动Flink集群bin/start-cluster.sh
03 一键提交整库同步作业

本次同步以 PERSON 开头的所有的表。

<FLINK_HOME>/bin/flink run \     -Dexecution.checkpointing.interval=10s \     -Dparallelism.default=1 \     -c org.apache.doris.flink.tools.cdc.CdcTools \     ./lib/flink-doris-connector-1.16-1.5.0-SNAPSHOT.jar \     oracle-sync-database \     --database test_db \     --oracle-conf hostname=127.0.0.1 \     --oracle-conf port=1521 \     --oracle-conf username=admin \     --oracle-conf password=admin123 \     --oracle-conf database-name=HELOWIN \     --oracle-conf schema-name=ADMIN \     --including-tables "PERSONS.*" \     --sink-conf fenodes=127.0.0.1:8030 \     --sink-conf username=root \     --sink-conf password=\     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \     --sink-conf sink.label-prefix=label \     --table-conf replication_num=1

详细参数可参考:

提交成功后,可以在 FlinkWeb 上看到该同步任务的状态。

进入 Doris 可以查看自动创建的表以及同步成功的全量数据。

mysql> use test_db;                                                                                                                                        Reading table information for completion of table and column names                                                                                         You can turn off this feature to get a quicker startup with -A                                                                                                                                                                                                                                                        Database changed                                                                                                                                           mysql> show tables;                                                                                                                                        +-------------------+                                                                                                                                      | Tables_in_test_db |                                                                                                                                      +-------------------+                                                                                                                                      | PERSONS           |                                                                                                                                      | PERSONS_1         |                                                                                                                                      +-------------------+                                                                                                                                      2 rows in set (0.00 sec)                                                                                                                                                                                                                                                                                              mysql> select * from PERSONS;                                                                                                                              +------+----------+                                                                                                                                        | ID   | NAME     |                                                                                                                                        +------+----------+                                                                                                                                        |    2 | lisi     |                                                                                                                                        |    3 | wangwu   |                                                                                                                                        |    1 | zhangsan |                                                                                                                                        +------+----------+                                                                                                                                        3 rows in set (0.01 sec)                                                                                                                                                                                                                                                                                              mysql> select * from PERSONS_1;                                                                                                                            +------+----------+                                                                                                                                        | ID   | NAME     |                                                                                                                                        +------+----------+                                                                                                                                        |    2 | lisi     |                                                                                                                                        |    3 | wangwu   |                                                                                                                                        |    1 | zhangsan |                                                                                                                                        +------+----------+                                                                                                                                        3 rows in set (0.01 sec)

在 Oracle 中模拟实时增删改数据

INSERT INTO PERSONS VALUES(4,'doris');UPDATE PERSONS SET name = 'zhangsan-update' WHERE ID =1;DELETE PERSONS WHERE ID =2; 

在 Doris 中进行验证,可以确认增量数据已经成功同步。

mysql> select * from PERSONS;                                                                                                                              +------+-----------------+                                                                                                                                 | ID   | NAME            |                                                                                                                                 +------+-----------------+                                                                                                                                 |    1 | zhangsan-update |                                                                                                                                 |    4 | doris           |                                                                                                                                 |    3 | wangwu          |                                                                                                                                 +------+-----------------+                                                                                                                                 3 rows in set (0.01 sec)  

通过以上操作,成功实现将 Oracle 中数据整库同步到 Doris 中,同时也实现了上游全量与增量数据的自动接入。

标签: #oracle查看数据链路