前言:
现在兄弟们对“mysql列转行函数转多个字段”大概比较重视,各位老铁们都想要学习一些“mysql列转行函数转多个字段”的相关知识。那么小编同时在网摘上收集了一些关于“mysql列转行函数转多个字段””的相关内容,希望朋友们能喜欢,各位老铁们快快来了解一下吧!导读:本文主要讨论如何编写 FlinkSQL 任务实现从 KafkaSource 获取数据并关联维表 DimTable 后写入 MySQLSink。之后再讨论笔者在将输出从 MySQL 改为写入阿里云 AnalyticDB MySQL版(ADB)后所遇到的问题以及解决的思路,希望对各位有所帮助。
基础环境java 8Flink 1.12.2Flink SQL 编写
1、对接收到的 Kafka 数据进行列转行处理
通过 filebeat 采集 Nginx 日志后输出 Kafka 中,Flink KafkaSource 从 Kafka 中取出 json 格式的数据进行处理。由于目标数据是以 json 字符串形式存在 message 字段中,因此我们需要在 Flink 中进行数据处理,将 message 这一列的值进行解析转行。
{ "@timestamp":"2021-06-05T11:55:44.608Z", "@metadata":{ }, "message":"{"内嵌JSON字符串"}}
解决思路:UDTF + Table Function Join
关于编写列转行 UDTF 在 Flink 官方文档[1] 中已详细说明,这里就不再赘述。主要就是继承 TableFunction<Row> , 编写 eval() 创建 Row 并通过 collect(row) 输出行。如果有特殊需求可重写 getTypeInference() 自定义类型推断。
KafkaSource 建表语句
CREATE TABLE KafkaTable ( message STRING, procTime AS PROCTIME() // 后面进行 join 时要用到) WITH ( 'connector' = 'kafka', 'topic' = 'filebeat_json_login_log_test', 'properties.bootstrap.servers' = '*********', 'properties.group.id' = 'dwdRootGameUidReg', 'scan.startup.mode' = 'group-offsets', 'format' = 'json');
列转行 UDTF(Demo)
public class BaseColumnToRow extends TableFunction<Row> { // 转行逻辑 public void eval(String data, String columnWithType) { // 解析 JSON 字符串 Map<String, Object> paramMap = (Map) JSON.parse(data); //.. 省略部分代码 Row row = new Row(size); collect(row); } // 自定义类型推断 @Override public TypeInference getTypeInference(DataTypeFactory typeFactory) { }}
Table Function Join
将表与表函数的结果连接。左表(外部表)的每一行都与对应的table函数调用产生的所有行连接,这里我们将 Kafka 的行数据与经过 UDTF 产生的行结果进行连接,大致语法如下:
// 可参考官方文档 [2]KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,'media_site_id', 省略其他字段......))
2、维表创建
维表 join 有以下几种方式,每一种 Join 都有其特点以及主要解决的场景,官方文档[3]也对每种 join 做出了详细的解析:
Regular Joins 最通用的连接类型,其中任何新记录或对连接任意一侧的更改都是可见的,并影响整个连接结果。例如,如果左边有一个新记录,它将与右边所有以前和将来的记录合并,这里表会无限增长存在资源问题,所以一般用做有界数据流的 JoinInterval Joins 利用窗口的给两个输入表设定一个 Join 的时间界限,超出时间范围的数据则对 JOIN 不可见并可以被清理掉,这要求两个输入流都必须有时间下界Temporal Joins 分为 Processing Time Temporal Join 和 Event Time Temporal Join 。其将左侧输入称为 Probe 右侧输入称为 Build,Build Table 一般是维度表的 changelog
这里我根据实际情况决定采用了 Processing Time Temporal Join 方式。
同时开启了 Lookup Cache 官方文档[4] 用于提升性能,其大致原理是每一个TaskMananger都维护一个 cache,Flink 将首先查找缓存,只在缓存丢失时向外部数据库发送请求,并使用返回的行更新缓存。通过配置lookup.cache.max-rows 、 lookup.cache.ttl 这两项启动, 在设置值时需要考虑吞吐量和正确性之间的平衡。
// 创建维表语句CREATE TABLE DimTable ( game_id BIGINT, root_game_id BIGINT, main_game_id BIGINT, platform VARCHAR, PRIMARY KEY (game_id, platform) NOT ENFORCED) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://***', 'table-name' = 'v2_dim_game_id', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = '*****', 'password' = '*****,', 'lookup.cache.max-rows'='5000', 'lookup.cache.ttl' = '60s', 'lookup.max-retries'='3');
3、结果表的创建,以及KafkaSource关联维表并输出到结果表
结果表创建
CREATE TABLE sinktable ( // 省略字段) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://*******', 'table-name' = 'v2_dwd_root_game_uid_reg_log', 'driver' = 'com.mysql.cj.jdbc.Driver', 'username' = '******', 'password' = '******', 'sink.buffer-flush.interval'='5s', 'sink.buffer-flush.max-rows' = '10');
关联与输出
这里有个注意点,笔者踩的一个坑就是在起别名时要注意是把 KafkaTable 以及它所连接的 Table Function 输出结果看做是一个整体 k 表,之后再与维表进行 join 和输出。
// 错误写法,导致 can not find k.procTimeINSERT INTO sinktable select // 省略处理字段 from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,'media_site_id, ... 省略其他字段')) as k LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.procTime as d ON k.game_id = d.game_id and k.platform = d.platform where k.platform <> '' and k.platform IS NOT NULL and k.game_id IS NOT NULL and IsPositiveNumber(k.uid) and IsMaxForwardTime(k.login_time);// 正确写法INSERT INTO sinktable select // 省略处理字段 from (select * from KafkaTable,LATERAL TABLE(RequestBodyColumnToRow(message,'media_site_id, ... 省略其他字段'))) as k LEFT JOIN DimTable FOR SYSTEM_TIME AS OF k.procTime as d ON k.game_id = d.game_id and k.platform = d.platform where k.platform <> '' and k.platform IS NOT NULL and k.game_id IS NOT NULL and IsPositiveNumber(k.uid) and IsMaxForwardTime(k.login_time);
自此 Kafka 流关联维表并输出到 MySQL 的任务算是编写结束。
拓展
错误的产生
接下来笔者将输出改为 ADB, 执行上面的任务时出现下面的异常报错。
cannot update pk column UID to expr
经过分析大概原因是因为 join 了维表之后在输出时使用了 upsert 语义(官方在出于幂等性考虑上也倾向于 upsert 语义), 在 flink-connect-jdbc 的 MySQLDialect 中对于 upsert 语义的实现是采用 INSERT INTO ON DUPLICATE KEY UPDATE 语法来实现, update 时拼入了全部字段(包含了主键), 由于 ADB 在使用该语法时是不支持对 pk 字段进行更新,所以导致出现了该错误。
解决的方案
这里笔者基于 flink-connect-jdbc 重新编写了一个 adb 的 connect,在 AdbDynamicTableFactory.getJdbcOptions 方法里指定了使用 AdbDialect 方言。
AdbDialect 方言继承 MySQLDialect,并重写 getUpsertStatement,这里笔者根据业务场景决定将 upsert 流修改为 Replace 语法(暂未性能测试),或可改为在构建语句时排除掉主键字段,各位可自行尝试。
关于连接器的编写可参考 FlinkSQL自定义 redis connector
最后
参考文章
[1]
[2]
[3]
[4]
[5]
[6]
[7]
感谢您的阅读,如果喜欢本文欢迎关注和转发,转载需注明出处,本头条号将坚持持续分享IT技术知识。对于文章内容有其他想法或意见建议等,欢迎提出共同讨论共同进步。
标签: #mysql列转行函数转多个字段