龙空技术网

Flink connector Oracle CDC 实时同步数据到MySQL

Java互联网技术栈 266

前言:

此时各位老铁们对“oracle联网时间同步”大约比较关怀,咱们都想要剖析一些“oracle联网时间同步”的相关文章。那么小编在网络上收集了一些关于“oracle联网时间同步””的相关内容,希望小伙伴们能喜欢,大家一起来了解一下吧!

准备工作

在这一步需要配置Oracle。主要包含。

开启Archive log开启数据库和数据表的supplemental log创建CDC用户并赋予权限

注意:不要使用Oracle的SYS和SYSTEM用户做为CDC用户。因为这两个用户能够捕获到大量Oracle数据库内部的变更信息,对于业务数据来说是不必要的。Debezium会过滤掉这两个用户捕获到的变更内容。

下面开始配置步骤。在安装Oracle的机器上执行:

su - oraclesqlplus / as sysdba

进入Sqlplus。然后开启Archive log。

alter system set db_recovery_file_dest_size = 10G;alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;shutdown immediate;startup mount;alter database archivelog;alter database open;# 检查Archive log是否成功开启archive log list;

注意: '/opt/oracle/oradata/recovery_area' 这个路径如果不存在的话需要自己手动去创建(Oracle用户下创建)

/opt/oracle/oradata/recovery_areaalter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
show parameter spfile;# 如果输出value为空,说明没有创建spfile,执行下面SQL创建create spfile from pfile;# 关闭并重启shutdown immediate;startup;# 检查spfile是否成功创建show parameter spfile;

接下来进行相关重要的配置:

-- 检查日志归档是否开启archive log list;-- 为捕获的数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态,下面说明了如何在数据库级别进行配置。ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;- 创建表空间CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;-- 创建用户family绑定表空间LOGMINER_TBSCREATE USER C##family IDENTIFIED BY zyhcdc DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;-- 授予family用户dba的权限 grant connect,resource,dba to C##family;- 并授予权限GRANT CREATE SESSION TO C##family;GRANT SELECT ON V_$DATABASE to C##family;GRANT FLASHBACK ANY TABLE TO C##family;GRANT SELECT ANY TABLE TO C##family;GRANT SELECT_CATALOG_ROLE TO C##family;GRANT EXECUTE_CATALOG_ROLE TO C##family;GRANT SELECT ANY TRANSACTION TO C##family;GRANT EXECUTE ON SYS.DBMS_LOGMNR TO C##family;GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;GRANT CREATE TABLE TO C##family;GRANT LOCK ANY TABLE TO C##family;GRANT ALTER ANY TABLE TO C##family;GRANT CREATE SEQUENCE TO C##family;GRANT EXECUTE ON DBMS_LOGMNR TO C##family;GRANT EXECUTE ON DBMS_LOGMNR_D TO C##family;GRANT SELECT ON V_$LOG TO C##family;GRANT SELECT ON V_$LOG_HISTORY TO C##family;GRANT SELECT ON V_$LOGMNR_LOGS TO C##family;GRANT SELECT ON V_$LOGMNR_CONTENTS TO C##family;GRANT SELECT ON V_$LOGMNR_PARAMETERS TO C##family;GRANT SELECT ON V_$LOGFILE TO C##family;GRANT SELECT ON V_$ARCHIVED_LOG TO C##family;GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO C##family;

本地用Navicate连接Oracle:

-- 创建 STUDENT_INFO 表create table student_info (  sid         number(10) constraint pk_sid primary key,  sname       varchar2(10),  sex         varchar2(2));-- 修改STUDENT_INFO表让其支持增量日志,先在Oracle里创建STUDENT_INFO表再执行下面的语句ALTER TABLE C##FAMILY.STUDENT_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

至此,Oracle配置完成;

Flink CDC 配置:

这里用Flink SQL CLI 做演示:

使用以下命令切换到 Flink 目录

cd flink-1.14.0

使用以下命令启动 Flink SQL CLI:

./bin/sql-client.sh

我们应该看到 CLI 客户端的欢迎屏幕。

在 Flink SQL CLI 中使用 Flink DDL 创建表

创建从相应数据库表中捕获更改数据的表。

Oracle CDC 表可以定义如下:

-- register an Oracle table 'student_info' in Flink SQLFlink SQL> CREATE TABLE student_info (     SID INT NOT NULL,     SNAME STRING,     SEX STRING,PRIMARY KEY(SID) NOT ENFORCED     ) WITH (     'connector' = 'oracle-cdc',     'hostname' = 'localhost',     'port' = '1521',     'username' = 'C##family',     'password' = 'zyhcdc',     'database-name' = 'ORCLCDB',     'schema-name' = 'C##FAMILY',     'table-name' = 'STUDENT_INFO');  -- read snapshot and binlogs from products tableFlink SQL> SELECT * FROM student_info;

在定义一个数据流出到MySQL的表:

-- register an Oracle table 'mysql_user' in Flink SQLFlink SQL> CREATE TABLE mysql_user (     SID INT ,     SNAME STRING,     SEX STRING,PRIMARY KEY(SID) NOT ENFORCED    ) WITH (     'connector' = 'jdbc',     'url' = 'jdbc:mysql://localhost:3306/test_cdc',     'username' = 'root',     'password' = 'root123',     'table-name' = 'user'    );Flink SQL> insert into mysql_user select SID,SNAME,SEX from student_info;[INFO] Submitting SQL update statement to the cluster...[INFO] SQL update statement has been successfully submitted to the cluster:Job ID: 1966fdd63bb36c14908fe8e31408db58

完成以上配置;Flink connector Oracle CDC实时数据同步到MySQL的操作就完成了;后续可以自行测试一下。

如果有主键的话会自动进行merge。

标签: #oracle联网时间同步 #oracle增量同步千万数据 #oracle10g表空间同步 #oracle connector java