龙空技术网

FlinkSQL之Kafka流关联维表并输出到MySQL

SapphireCoder 165

前言:

现在兄弟们对“mysql列转行函数转多个字段”大概比较重视,各位老铁们都想要学习一些“mysql列转行函数转多个字段”的相关知识。那么小编同时在网摘上收集了一些关于“mysql列转行函数转多个字段””的相关内容,希望朋友们能喜欢,各位老铁们快快来了解一下吧!

导读:本文主要讨论如何编写 FlinkSQL 任务实现从 KafkaSource 获取数据并关联维表 DimTable 后写入 MySQLSink。之后再讨论笔者在将输出从 MySQL 改为写入阿里云 AnalyticDB MySQL版(ADB)后所遇到的问题以及解决的思路,希望对各位有所帮助。

基础环境java 8Flink 1.12.2Flink SQL 编写

1、对接收到的 Kafka 数据进行列转行处理

通过 filebeat 采集 Nginx 日志后输出 Kafka 中,Flink KafkaSource 从 Kafka 中取出 json 格式的数据进行处理。由于目标数据是以 json 字符串形式存在 message 字段中,因此我们需要在 Flink 中进行数据处理,将 message 这一列的值进行解析转行。

{    "@timestamp":"2021-06-05T11:55:44.608Z",    "@metadata":{    },    "message":"{"内嵌JSON字符串"}}

解决思路:UDTF + Table Function Join

关于编写列转行 UDTF 在 Flink 官方文档[1] 中已详细说明,这里就不再赘述。主要就是继承 TableFunction<Row> , 编写 eval() 创建 Row 并通过 collect(row) 输出行。如果有特殊需求可重写 getTypeInference() 自定义类型推断。

KafkaSource 建表语句

CREATE TABLE KafkaTable (    message STRING,    procTime AS PROCTIME()  // 后面进行 join 时要用到) WITH (    'connector' = 'kafka',    'topic' = 'filebeat_json_login_log_test',    'properties.bootstrap.servers' = '*********',    'properties.group.id' = 'dwdRootGameUidReg',    'scan.startup.mode' = 'group-offsets',    'format' = 'json');

列转行 UDTF(Demo)

public class BaseColumnToRow extends TableFunction<Row> {  	 // 转行逻辑     public void eval(String data, String columnWithType) {          // 解析 JSON 字符串       		Map<String, Object> paramMap = (Map) JSON.parse(data);       		//.. 省略部分代码       		Row row = new Row(size);       		collect(row);     }		    // 自定义类型推断	@Override    public TypeInference getTypeInference(DataTypeFactory typeFactory) {    }}

Table Function Join

将表与表函数的结果连接。左表(外部表)的每一行都与对应的table函数调用产生的所有行连接,这里我们将 Kafka 的行数据与经过 UDTF 产生的行结果进行连接,大致语法如下:

// 可参考官方文档 [2]KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,'media_site_id', 省略其他字段......))

2、维表创建

维表 join 有以下几种方式,每一种 Join 都有其特点以及主要解决的场景,官方文档[3]也对每种 join 做出了详细的解析:

Regular Joins 最通用的连接类型,其中任何新记录或对连接任意一侧的更改都是可见的,并影响整个连接结果。例如,如果左边有一个新记录,它将与右边所有以前和将来的记录合并,这里表会无限增长存在资源问题,所以一般用做有界数据流的 JoinInterval Joins 利用窗口的给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉,这要求两个输入流都必须有时间下界Temporal Joins 分为 Processing Time Temporal Join 和 Event Time Temporal Join 。其将左侧输入称为 Probe 右侧输入称为 Build,Build Table 一般是维度表的 changelog

这里我根据实际情况决定采用了 Processing Time Temporal Join 方式。

同时开启了 Lookup Cache 官方文档[4] 用于提升性能,其大致原理是每一个TaskMananger都维护一个 cache,Flink 将首先查找缓存,只在缓存丢失时向外部数据库发送请求,并使用返回的行更新缓存。通过配置lookup.cache.max-rows 、 lookup.cache.ttl 这两项启动, 在设置值时需要考虑吞吐量和正确性之间的平衡。

// 创建维表语句CREATE  TABLE DimTable (    game_id BIGINT,    root_game_id BIGINT,    main_game_id BIGINT,    platform VARCHAR,    PRIMARY KEY (game_id, platform) NOT ENFORCED) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://***',    'table-name' = 'v2_dim_game_id',    'driver' = 'com.mysql.cj.jdbc.Driver',    'username' = '*****',    'password' = '*****,',    'lookup.cache.max-rows'='5000',    'lookup.cache.ttl' = '60s',    'lookup.max-retries'='3');

3、结果表的创建,以及KafkaSource关联维表并输出到结果表

结果表创建

CREATE TABLE sinktable (   // 省略字段) WITH (    'connector' = 'jdbc',    'url' = 'jdbc:mysql://*******',    'table-name' = 'v2_dwd_root_game_uid_reg_log',    'driver' = 'com.mysql.cj.jdbc.Driver',    'username' = '******',    'password' = '******',    'sink.buffer-flush.interval'='5s',    'sink.buffer-flush.max-rows' = '10');

关联与输出

这里有个注意点,笔者踩的一个坑就是在起别名时要注意是把 KafkaTable 以及它所连接的 Table Function 输出结果看做是一个整体 k 表,之后再与维表进行 join 和输出。

// 错误写法,导致 can not find k.procTimeINSERT INTO sinktable select    // 省略处理字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,'media_site_id, ... 省略其他字段')) as k LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.procTime as d ON k.game_id = d.game_id and k.platform = d.platform where k.platform <> '' and k.platform IS NOT NULL and k.game_id IS NOT NULL and IsPositiveNumber(k.uid) and IsMaxForwardTime(k.login_time);// 正确写法INSERT INTO sinktable select    // 省略处理字段 from (select * from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,'media_site_id, ... 省略其他字段'))) as k LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.procTime as d ON k.game_id = d.game_id and k.platform = d.platform where k.platform <> '' and k.platform IS NOT NULL and k.game_id IS NOT NULL and IsPositiveNumber(k.uid) and IsMaxForwardTime(k.login_time);

自此 Kafka 流关联维表并输出到 MySQL 的任务算是编写结束。

拓展

错误的产生

接下来笔者将输出改为 ADB, 执行上面的任务时出现下面的异常报错。

cannot update pk column UID to expr

经过分析大概原因是因为 join 了维表之后在输出时使用了 upsert 语义(官方在出于幂等性考虑上也倾向于 upsert 语义), 在 flink-connect-jdbc 的 MySQLDialect 中对于 upsert 语义的实现是采用 INSERT INTO ON DUPLICATE KEY UPDATE 语法来实现, update 时拼入了全部字段(包含了主键), 由于 ADB 在使用该语法时是不支持对 pk 字段进行更新,所以导致出现了该错误。

解决的方案

这里笔者基于 flink-connect-jdbc 重新编写了一个 adb 的 connect,在 AdbDynamicTableFactory.getJdbcOptions 方法里指定了使用 AdbDialect 方言。

AdbDialect 方言继承 MySQLDialect,并重写 getUpsertStatement,这里笔者根据业务场景决定将 upsert 流修改为 Replace 语法(暂未性能测试),或可改为在构建语句时排除掉主键字段,各位可自行尝试。

关于连接器的编写可参考 FlinkSQL自定义 redis connector

最后

参考文章

[1]

[2]

[3]

[4]

[5]

[6]

[7]

感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。

标签: #mysql列转行函数转多个字段