龙空技术网

Flink 1.9—SQL 创建 Kafka 数据源

Lake说科技 497

前言:

今天小伙伴们对“如何查看sql中的数据源”大概比较关切,你们都需要分析一些“如何查看sql中的数据源”的相关内容。那么小编在网上搜集了一些对于“如何查看sql中的数据源””的相关资讯,希望各位老铁们能喜欢,各位老铁们一起来了解一下吧!

前言

目前 Flink 1.9 SQL 支持用户直接使用 SQL 语句创建 Kafka 数据源,这极大的方便了用户开发 Flink 实时任务,你可以像 Hive 一样,使用 Create Table 语句来创建 Kafka Source,同时在也可以使用 Select 语句,从这个表中读取数据,进行窗口、ETL等操作。本文主要讲解 Flink 1.9 SQL 创建 Kafka 的 SQL 语法使用,当然,使用这个功能的前提,是你选择使用 Blink Planner。


Flink SQL Kafka Source DDL 语句

首先,一般你的 Kafka 数据源里面的消息格式为 Json ,这样在 Flink SQL 创建 Kafka 数据源的时候,指定消息格式为 Json,表中的定义的确保字段的名称和 Json 中的字段保持一致,下面是 Flink SQL 代码实例:

create table kafka_topic_src(id varchar,name varchar,age varchar,) with ('connector.type' = 'kafka', 'connector.version' = '0.10','connector.topic' = 'your_topic','connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'your_consumer_id','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_bootstrap_servers','connector.property-version' = '1','connector.startup-mode' = 'latest-offset','format.type' = 'json','format.property-version' = '1','format.derive-schema' = 'true','update-mode' = 'append');

上面的 Flink SQL 语句中,定义了三个字段,id、name、age。所以你的 Json 数据格式要包含这三个字段,如果没有包含某个字段,Flink 默认会使用 null 进行填充。

当然,你也可以使用 Json 中部分字段进行使用,比如你只需要 Json 中的 id、name,你也可以这样定义:

create table kafka_topic_src(id varchar,name varchar) with ('connector.type' = 'kafka', 'connector.version' = '0.10','connector.topic' = 'your_topic','connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'your_consumer_id','connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_bootstrap_servers','connector.property-version' = '1','connector.startup-mode' = 'latest-offset','format.type' = 'json','format.property-version' = '1','format.derive-schema' = 'true','update-mode' = 'append');

注意,如果你的 kafka 消息不是 Json的话,Flink 任务会一直报错,目前 Kafka 的 upadte-mode 只支持 append 模式。


Flink SQL Kafka Source DDL 属性值connector.topic ,kafka Topicconnector.startup-mode ,Flink kafka 消费者启动模式format.type ,kafka 消息内容格式Flink SQL Kafka Source DDL 注意点

Flink SQL 设置 kafka 消费者 group id

'connector.properties.0.key' = 'group.id','connector.properties.0.value' = 'track.log.teamtype.join' 

这两个参数一起来进行设置,在 with 后面的语句中。

设置 kafka bootstrap.servers

'connector.properties.1.key' = 'bootstrap.servers','connector.properties.1.value' = 'your_kafka_boots_servers'

这两个参数要一起设置,具体的 bootstrap.servers 就是你所使用 Topic 所在集群的链接信息。

结语

我是Lake,专注大数据技术原理、人工智能、数据库技术、程序员经验分享,如果我的问答对你有帮助的话,希望你能点赞关注我,感谢。

我会持续大数据、数据库方面的内容,如果你有任何问题,也欢迎关注私信我,我会认真解答每一个问题,期待您的关注

标签: #如何查看sql中的数据源