龙空技术网

kettle-循环抽取,插入kafka

opser 212

前言:

此刻小伙伴们对“centos7xulrunner”大致比较关注,各位老铁们都需要分析一些“centos7xulrunner”的相关资讯。那么小编同时在网络上搜集了一些有关“centos7xulrunner””的相关知识,希望你们能喜欢,看官们快快来学习一下吧!

最近,在docker环境下,使用kettle 8循环查询数据库数据后,插入kafka集群中。特此记录。

docker配置kettle准备安装介质 ,需要自行下载。pdi-ce-8.2.0.0-342.zipjdk-8u161-linux-x64.tar.gz自定义dockerfile

由于kettle需要用图形化界面进行配置,所以需要对centos镜像进行初步配置。dockerfile文件内容为:

FROM centos:7RUN rm -rf /etc/localtime && ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime \&& yum -y install webkitgtk4 redhat-lsb bzip2 wget kde-l10n-Chinese glibc-common && yum groupinstall "fonts" -y \&&localedef -c -f UTF-8 -i zh_CN zh_CN.utf8 && yum clean all \&& wget  \&& tar -jxf xulrunner-1.9.2.28pre.en-US.linux-x86_64.tar.bz2 && rm -f  xulrunner-1.9.2.28pre.en-US.linux-x86_64.tar.bz2 \&& cd xulrunner/ && ./xulrunner -register-globalENV LC_ALL zh_CN.UTF-8ENV JAVA_HOME /opt/jdk1.8.0_161ENV PATH $JAVA_HOME/bin:$PATH
编写docker-comose.yml文件

将安装介质解压后,将安装介质解压后目录映射到docker容器中。

映射jdk到容器: ./resource/jdk1.8.0_161:/opt/jdk1.8.0_161

映射kettle到容器:./resource/data-integration:/opt/kettle

映射kettle配置后生成的文件:- ./resource/kettle_files:/opt/kettle_files

version: '3.7'networks:  net_docker:    external: trueservices:  kettle8.2:    image: kettle8:0.1    container_name: kettle8.2    networks:      - net_docker # 安装介质目录映射    volumes:      - ./resource/jdk1.8.0_161:/opt/jdk1.8.0_161      - ./resource/data-integration:/opt/kettle      - ./resource/kettle_files:/opt/kettle_files    command: [/bin/bash]    tty: true    stdin_open: true
启动kettle首先运行构建dockerfile(注意目录路径)
[root@jk-s-monitor dockerFile]# pwd/opt/business/kettle8.2/dockerFile[root@jk-s-monitor dockerFile]# docker build . -t kettle8:0.1
将安装介质拷贝到指定目录后解压
[root@jk-s-monitor resource]# pwd/opt/business/kettle8.2/resource[root@jk-s-monitor resource]# lsdata-integration  jdk1.8.0_161  jdk-8u161-linux-x64.tar.gz  kettle_files  pdi-ce-8.2.0.0-342.zip
启动docker容器
[root@jk-s-monitor kettle8.2]# lsdocker-compose.yml  dockerFile  resource[root@jk-s-monitor kettle8.2]# docker-compose up
进入容器后启动kettle
[root@jk-s-monitor kettle8.2]# docker exec -it kettle8.2 /bin/bash[root@62e156d9cf1c /]# export DISPLAY=192.168.157.126:0.0[root@62e156d9cf1c /]# /opt/kettle/spoon.sh....

在终端打开xmanager-passive

x-passive

打开kettle

kettle8

循环实现需求描述

需要采集2个数据库中数据。2个数据库的表结构相同。

分析:

有2个数据库配置,使用变量灵活代替配置。

需要执行2次数据采集,使用循环解决重复劳动。

需求实现创建配置表

因为数据还有其他的计算需求,这里,笔者使用数据库表的方式,进行数据库配置信息存储,(简单一点,也可以使用配置文件,如json 、yml等)。

在配置数据库(oracle)中创建配置信息表

create table kettle_db_info(  db_name VARCHAR2(10),  ip      VARCHAR2(16),  username VARCHAR2(44),  passwd  VARCHAR2(530),  port    VARCHAR2(5));INSERT INTO kettle_db_info VALUES('orcl','10.0.0.0','username0','passwd0','1521');INSERT INTO kettle_db_info VALUES('orcl','10.0.0.1','username1','passwd1','1521');COMMIT;
kettl中创建输入转换

在左边菜单中选择 输入》表输入,配置表输入转换如下图:

在左边菜单中选择 作业》复制记录到结果,配置完成后如下图:

配置数据插入kafka 转换

选择表输入、需要新建数据库连接,配置的时候需要用变量代替。

source_db_config

根据业务逻辑编写sql,这里笔者将数据进行json格式包装:

分别选择 输出》JSON output 和 Streaming》Kafka producer,选择后如下图:

配置 输出》JSON output

配置字段

配置 Streaming》Kafka producer创建循环执行的job

新建job如下图:

其中 转换(in_data) 的配置为:

其中 转换(in_kafka)配置为:

选项 执行每一个输入行 必须要选择才能循环执行。

需要将变量名称进行映射,才能使用变量。

执行job 并 验证kafka数据

执行job:

在kafka中验证数据

# docker exec -it 8829606db8f8 /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic kettle_kafka --from-beginning{"tb_name":"XXARTMENTDX","tbs_name":"xxx"}{"tb_name":"XXMPANYDEFIX","tbs_name":"xxx"}

标签: #centos7xulrunner