Background
Key features of Druid
1. Sub-second queries: Druid provides fast aggregation and sub-second OLAP query latency, and its multi-tenant design makes it well suited to user-facing analytics applications.
2. Real-time ingestion: Druid supports streaming data ingestion with event-driven delivery, keeping events timely and consistent across real-time and offline environments.
3. Scalable PB-level storage: a Druid cluster scales easily to petabytes of data and millions of ingested records per second, and stays responsive even as data volume grows.
4. Flexible deployment: Druid runs on commodity hardware or in the cloud, and can ingest data from many systems, including Hadoop, Spark, Kafka, Storm, and Samza.
5. Active community: Druid has a rich community with plenty of resources to learn from.
Druid use cases
1. Real-time ingestion of already-cleansed records that never need update operations.
2. Wide tables instead of joins (in other words, a single flat table).
3. Basic statistical metrics can be derived, each expressible as a single field.
4. Strong requirements on time zones and time dimensions (year, month, week, day, hour, even down to the minute).
5. Real-time freshness matters.
6. Sensitivity to data quality is relatively low.
7. Used for effectiveness analysis and as input to strategic decisions.
Before we start, let's walk through the Druid configuration (the settings below are for a single-node deployment).
This tutorial assumes you already have Druid up and running; if not, please follow one of the many setup guides available online.
The notes below are distilled from, and verified by, hands-on practice. Druid really is a good tool.
Druid's default time zone is UTC (Z), so the -Duser.timezone value needs to be changed to UTC+0800 in each service's jvm.config (see the one-liner after the per-service configs below).
Broker configuration: cat jvm.config
-server
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=768m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
Coordinator configuration: cat jvm.config
-server
-Xms256m
-Xmx256m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Dderby.stream.error.file=var/druid/derby.log
Historical configuration: cat jvm.config
-server
-Xms512m
-Xmx512m
-XX:MaxDirectMemorySize=1280m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
MiddleManager configuration: cat jvm.config
-server
-Xms64m
-Xmx64m
-XX:+ExitOnOutOfMemoryError
-XX:+UseG1GC
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
Router configuration: cat jvm.config
-server
-Xms128m
-Xmx128m
-XX:+UseG1GC
-XX:MaxDirectMemorySize=128m
-XX:+ExitOnOutOfMemoryError
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
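To apply the time-zone change to every service in one pass, something like the following can be used. This is only a sketch: the conf/druid path and the original -Duser.timezone=UTC value are assumptions, so adjust them to your own install before running it.

# Rewrite the user.timezone flag in every jvm.config found under conf/druid (path is an assumption)
find conf/druid -name jvm.config -exec sed -i 's/-Duser.timezone=UTC$/-Duser.timezone=UTC+0800/' {} \;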
Druid uses JSON for its ingestion configuration by default; the overall structure looks roughly like this:
{ "type": "kafka", "dataSchema": { "dataSource": "weiboshop-realtime-kafka-to-druid", "parser": { "type": "string", "parseSpec": { "format": "json", "timestampSpec": { "column": "ts", "format": "auto" }, "dimensionsSpec": { "dimensions": [ "times", "type", "weizhi" ] } } }, "metricsSpec": [ {"type":"count", "name":"renshu", "fieldName":"renshu"} ], "granularitySpec": { "type": "uniform", "segmentGranularity": "HOUR", "queryGranularity": "MINUTE", "rollup": true } }, "tuningConfig": { "type": "kafka", "reportParseExceptions": false }, "ioConfig": { "type" : "realtime", "topic": "weiboshop", "replicas": 2, "taskDuration": "PT10M", "completionTimeout": "PT20M", "consumerProperties": { "bootstrap.servers": "localhost:9092" } }}druid supervisors任务配置文件详解
{ "type": "kafka", "dataSchema": { "dimensionsSpec": {... ...}, "transformSpec":{... ...}, "metricsSpec":{... ...} }, "tuningConfig": {... ...}, "ioConfig": {... ...}}数据源 dataSchema
The dataSchema configuration has five parts: table information, the parser, data transforms, metrics, and segment & query granularity.
The dataSchema structure looks roughly like this:
"dataSchema": {
  "dataSource": "druid_table_name",
  "parser": {},
  "transformSpec": {},
  "metricsSpec": {}
}
Table information
"dataSource": "druid_table_name",解析器
"parser": { "type": "string", "parseSpec": { "format": "json", "timestampSpec": { "column": "time", "format": "auto" }, "dimensionsSpec": { "dimensions": [ "appName", "nodeName" ], "dimensionExclusions": [] } }数据转换 数据转换主要使用:时间转换、表达式、过滤等;启动,表达式可满足一些根据不同区间范围的指标统计类需求
"transformSpec": { "transforms": [ { "type": "expression", "name": "time", "expression": "timestamp_format(time,'yyyy-MM-dd HH:mm:ss.SSS')" }, { "type": "expression", "name": "status", "expression": "if(status, 1, 0)" }, { "type": "expression", "name": "processRange1", "expression": "if(processTime<=100, 1, 0)" }, { "type": "expression", "name": "processRange2", "expression": "if(processTime>100 && processTime<= 500, 1, 0)" }, { "type": "expression", "name": "processRange3", "expression": "if(processTime>500, 1, 0)" } ] },指标度量 指标度量主要使用:Sum、Max、Min、hyperUnique
"metricsSpec": [{ "name": "TotalTransCount", "fieldName": "count", "type": "longSum" }, { "name": "MaxProcessTime", "fieldName": "processTime", "type": "longMax" }, { "name": "MinProcessTime", "fieldName": "processTime", "type": "longMin" }, { "name": "TotalProcessRange1", "fieldName": "processRange1", "type": "longSum" }, { "name": "TotalProcessRange1", "fieldName": "processRange2", "type": "longSum" }, { "name": "TotalProcessRange1", "fieldName": "processRange3", "type": "longSum" }, { "name": "NodeName", "fieldName": "nodeName", "type": "hyperUnique" }]聚合&查询粒度 all、none、second、minute、fifteen_minute、thirty_minute、hour、day、week、month、quarter、year
"granularitySpec": { "type": "uniform", "segmentGranularity": "DAY", "queryGranularity": "HOUR"}任务及Segments配置
The task and segment settings mainly control the size of the generated segments and the number of worker threads for merge tasks.
"tuningConfig": { "type": "kafka", "maxRowsPerSegment": 500000, "workerThreads": 2, "reportParseExceptions": false},数据接入信息
The ingestion settings mainly cover the Kafka consumer configuration and the task run interval.
"ioConfig": { "topic": "kafka_topic_name", "consumerProperties": { "bootstrap.servers": "hostname:9092" }, "useEarliestOffset": false, "taskCount": 3, "replicas": 1, "taskDuration": "PT1H"}后聚合配置
This is mainly used at query time: depending on the business scenario, calculations are configured on top of the stored metrics to derive secondary indicators.
"aggregations": [{ "type": "count", "name": "count"}]Kafka数据摄入Druid
1. Create the e-commerce topic:
./kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic weiboshop
Created topic weiboshop.
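To double-check that the topic was created, it can be described with the same CLI (a quick sanity check; the ZooKeeper address is the one used above):

./kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic weiboshop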
Prepare the sample data
{"ts":"2020-03-18 20:19:56","type":"huodong","weizhi":"活动banner","renshu":2}{"ts":"2020-03-18 21:29:56","type":"huodong","weizhi":"活动banner","renshu":3}{"ts":"2020-03-18 22:39:56","type":"huodong","weizhi":"活动banner","renshu":4}{"ts":"2020-03-18 22:11:56","type":"huodong","weizhi":"头条list","renshu":14}{"ts":"2020-03-18 22:11:57","type":"toutiao","weizhi":"头条list","renshu":15}{"ts":"2020-03-18 22:11:57","type":"toutiao","weizhi":"头条list","renshu":16}{"ts":"2020-03-18 22:12:00","type":"toutiao","weizhi":"头条list","renshu":17}{"ts":"2020-03-18 22:13:00","type":"toutiao","weizhi":"头条list","renshu":18}{"ts":"2020-03-18 22:13:00","type":"toutiao","weizhi":"头条list","renshu":19}{"ts":"2020-03-18 22:14:00","type":"toutiao","weizhi":"头条list","renshu":20}
Visual configuration in the web console
The screenshots that originally accompanied this part (steps 1 through 11, not reproduced here) walked through the data-loading wizard in the Druid web console step by step. At step 6 you can see that the generated JSON is essentially the same as the JSON shown at the beginning of this article; Druid really does look after beginners like us.
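If you prefer the command line to the web console, the same supervisor spec can be submitted straight to the Druid API and the result checked with a SQL query. This is only a sketch: it assumes the spec from the beginning of the article has been saved as weiboshop-supervisor.json and that the single-node Router listens on localhost:8888; adjust host, port, and file name to your setup.

# Submit (or update) the Kafka supervisor via the Router/Overlord API
curl -X POST -H 'Content-Type: application/json' \
  -d @weiboshop-supervisor.json \
  http://localhost:8888/druid/indexer/v1/supervisor

# After a few minutes, verify that rows are queryable via Druid SQL
curl -X POST -H 'Content-Type: application/json' \
  -d '{"query":"SELECT * FROM \"weiboshop-realtime-kafka-to-druid\" LIMIT 10"}' \
  http://localhost:8888/druid/v2/sql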
Closing remarks
If you have read this far: putting these notes together and verifying them in practice was not easy, so a like would be greatly appreciated. Many thanks.