Kafka Real-Time Stream Ingestion - A Painstaking Walkthrough with Hands-On Practice

晓亮星域天地

Background: Druid's Key Features

1. Sub-second queries: Druid offers fast aggregation and sub-second OLAP query latency, and its multi-tenant design makes it a good fit for user-facing analytics applications.

2. Real-time data ingestion: Druid supports streaming ingestion with event-driven delivery, keeping events timely and consistent across real-time and offline pipelines.

3. Scalable PB-level storage: a Druid cluster can easily scale out to petabytes of data and millions of events ingested per second, and it keeps latency low even as data volume grows.

4. Flexible deployment: Druid runs on commodity hardware as well as in the cloud, and can ingest data from many systems, including Hadoop, Spark, Kafka, Storm, and Samza.

5. Active community: Druid has a rich community with plenty of resources to learn from.

Druid Use Cases

1. Suited to ingesting pre-cleaned records in real time, with no need for update operations.

2. Works with wide, denormalized tables rather than joins (in other words, a single flat table).

3. The basic statistical metrics you need can each be expressed as a single column.

4. Strong requirements on time zones and time dimensions (year, month, week, day, hour, even down to the minute).

5. Real-time latency matters.

6. You are not highly sensitive to small data-quality issues (approximate results are acceptable).

7. Typical uses are targeting/effectiveness analysis and supporting strategy decisions.

Before we start, let's go over the Druid configuration (the settings below are for a single-node deployment).

This tutorial assumes you already have Druid up and running; if not, please follow a setup guide first.

Everything below is distilled from hands-on practice and has been verified to work. Druid really is a great tool.

By default Druid uses the UTC (Z) time zone, so you need to change the -Duser.timezone value to UTC+0800.

Broker configuration (cat jvm.config):

-server
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=768m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

Coordinator configuration (cat jvm.config):

-server
-Xms256m
-Xmx256m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Dderby.stream.error.file=var/druid/derby.log

Historical configuration (cat jvm.config):

-server
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=1280m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

MiddleManager configuration (cat jvm.config):

-server
-Xms64m
-Xmx64m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

Router configuration (cat jvm.config):

-server
-Xms128m
-Xmx128m
-XX:+UseG1GC
-XX:MaxDirectMemorySize=128m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

Druid supervisor specs are written in JSON; the overall structure looks roughly like this:

{
    "type": "kafka",
    "dataSchema": {
        "dataSource": "weiboshop-realtime-kafka-to-druid",
        "parser": {
            "type": "string",
            "parseSpec": {
                "format": "json",
                "timestampSpec": {
                    "column": "ts",
                    "format": "auto"
                },
                "dimensionsSpec": {
                    "dimensions": [
                        "times",
                        "type",
                        "weizhi"
                    ]
                }
            }
        },
        "metricsSpec": [
            {"type": "count", "name": "renshu", "fieldName": "renshu"}
        ],
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",
            "queryGranularity": "MINUTE",
            "rollup": true
        }
    },
    "tuningConfig": {
        "type": "kafka",
        "reportParseExceptions": false
    },
    "ioConfig": {
        "type": "kafka",
        "topic": "weiboshop",
        "replicas": 2,
        "taskDuration": "PT10M",
        "completionTimeout": "PT20M",
        "consumerProperties": {
            "bootstrap.servers": "localhost:9092"
        }
    }
}
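To run this spec, save it to a file and POST it to Druid's supervisor endpoint. A minimal sketch: the file name supervisor-spec.json is my own choice, and localhost:8081 assumes the Overlord (or a Router proxy) of a single-node install; adjust the host and port for your deployment.

# Submit the Kafka supervisor spec to Druid (host/port are assumptions for a local single-node setup)
curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8081/druid/indexer/v1/supervisor

# List running supervisors to confirm it was accepted
curl http://localhost:8081/druid/indexer/v1/supervisor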
A closer look at the Druid supervisor task spec
{
    "type": "kafka",
    "dataSchema": {
        "dimensionsSpec": {... ...},
        "transformSpec": {... ...},
        "metricsSpec": {... ...}
    },
    "tuningConfig": {... ...},
    "ioConfig": {... ...}
}
Data schema (dataSchema)

The dataSchema has five parts: the data source (table) name, the parser, transforms, metrics, and the segment/query granularity.

The rough structure of the dataSchema is as follows:

"dataSchema": {
    "dataSource": "druid_table_name",
    "parser": {},
    "transformSpec": {},
    "metricsSpec": {},
    "granularitySpec": {}
}
Data source (table) name
"dataSource": "druid_table_name",
Parser
"parser": {	"type": "string",	"parseSpec": {		"format": "json",		"timestampSpec": {			"column": "time",			"format": "auto"		},		"dimensionsSpec": {			"dimensions": [				"appName",				"nodeName"			],			"dimensionExclusions": []		}	}
Transforms. The transformSpec mainly covers timestamp conversion, expressions, and filtering; expressions in particular can satisfy metric requirements that bucket values into different ranges, as in the spec and worked example below.
    "transformSpec": {            "transforms": [                {                    "type": "expression",                    "name": "time",                    "expression": "timestamp_format(time,'yyyy-MM-dd HH:mm:ss.SSS')"                },                {                    "type": "expression",                    "name": "status",                    "expression": "if(status, 1, 0)"                },                {                    "type": "expression",                    "name": "processRange1",                    "expression": "if(processTime<=100, 1, 0)"                },                {                    "type": "expression",                    "name": "processRange2",                    "expression": "if(processTime>100  && processTime<= 500, 1, 0)"                },                {                    "type": "expression",                    "name": "processRange3",                    "expression": "if(processTime>500, 1, 0)"                }            ]        },
Metrics. The metric aggregators used here are mainly Sum, Max, Min, and hyperUnique (approximate distinct counts).
"metricsSpec": [{		"name": "TotalTransCount",		"fieldName": "count",		"type": "longSum"	},	{		"name": "MaxProcessTime",		"fieldName": "processTime",		"type": "longMax"	},	{		"name": "MinProcessTime",		"fieldName": "processTime",		"type": "longMin"	},	{		"name": "TotalProcessRange1",		"fieldName": "processRange1",		"type": "longSum"	},	{		"name": "TotalProcessRange1",		"fieldName": "processRange2",		"type": "longSum"	},	{		"name": "TotalProcessRange1",		"fieldName": "processRange3",		"type": "longSum"	},	{		"name": "NodeName",		"fieldName": "nodeName",		"type": "hyperUnique"	}]
Segment & query granularity. Supported granularities: all, none, second, minute, fifteen_minute, thirty_minute, hour, day, week, month, quarter, year.
"granularitySpec": {	"type": "uniform",	"segmentGranularity": "DAY",	"queryGranularity": "HOUR"}
Task and segment configuration (tuningConfig)

The tuningConfig mainly controls how large the generated segments are and how many worker threads handle the indexing tasks.

"tuningConfig": {	"type": "kafka",	"maxRowsPerSegment": 500000,	"workerThreads": 2,	"reportParseExceptions": false},
Ingestion settings (ioConfig)

The ioConfig mainly covers the Kafka consumer settings and how long each ingestion task runs.

"ioConfig": {	"topic": "kafka_topic_name",	"consumerProperties": {		"bootstrap.servers": "hostname:9092"	},	"useEarliestOffset": false,	"taskCount": 3,	"replicas": 1,	"taskDuration": "PT1H"}
Post-aggregation configuration

This is mainly used at query time: depending on business requirements, you can configure computations on top of the stored metrics to derive secondary indicators.

"aggregations": [{	"type": "count",	"name": "count"}]
Ingesting Kafka data into Druid

1. Create the e-commerce topic:

./kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic weiboshop

Created topic weiboshop.

Prepare some sample data:

{"ts":"2020-03-18 20:19:56","type":"huodong","weizhi":"活动banner","renshu":2}{"ts":"2020-03-18 21:29:56","type":"huodong","weizhi":"活动banner","renshu":3}{"ts":"2020-03-18 22:39:56","type":"huodong","weizhi":"活动banner","renshu":4}{"ts":"2020-03-18 22:11:56","type":"huodong","weizhi":"头条list","renshu":14}{"ts":"2020-03-18 22:11:57","type":"toutiao","weizhi":"头条list","renshu":15}{"ts":"2020-03-18 22:11:57","type":"toutiao","weizhi":"头条list","renshu":16}{"ts":"2020-03-18 22:12:00","type":"toutiao","weizhi":"头条list","renshu":17}{"ts":"2020-03-18 22:13:00","type":"toutiao","weizhi":"头条list","renshu":18}{"ts":"2020-03-18 22:13:00","type":"toutiao","weizhi":"头条list","renshu":19}{"ts":"2020-03-18 22:14:00","type":"toutiao","weizhi":"头条list","renshu":20}

The screenshots below show the same setup done through Druid's visual configuration.

(Screenshots of visual configuration steps 1 through 5 are not reproduced here.)

6. At this point you will notice that the generated JSON is essentially the same as the JSON I showed at the beginning. Druid really does look out for beginners like us.

(Screenshots of the remaining visual configuration steps, 7 through 11, are not reproduced here.)
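Once the supervisor is running and data has arrived, a quick sanity check is to query the datasource over HTTP. A minimal sketch using Druid's SQL endpoint; the address localhost:8888 assumes the Router of a single-node install, so adjust the host and port for your deployment.

# Count the ingested rows via Druid SQL (endpoint path is standard; host/port assumed)
curl -X POST -H 'Content-Type: application/json' \
  -d '{"query": "SELECT COUNT(*) AS row_count FROM \"weiboshop-realtime-kafka-to-druid\""}' \
  http://localhost:8888/druid/v2/sql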

Closing Remarks

If you have read this far: putting this together and testing everything was not easy, so if it helped, please consider giving it a like. Many thanks!
