
Monitoring a file with Flume and writing it to Kafka




Previously we looked at having Log4j2 write logs directly into Kafka, but that approach makes the application depend directly on a running Kafka environment.

The more common practice is for the application to write its logs locally and have a log-collection tool ship them to a remote server; Flume is one of the most widely used collection tools for this job.

Configuration

# Source name
agent.sources = execSource
# Channel name
agent.channels = memoryChannel
# Sink name
agent.sinks = kafkaSink

# For each one of the sources, the type is defined
agent.sources.execSource.type = exec
agent.sources.execSource.command = echo "测试一下"

# The channel can be defined as follows.
# Bind the source to the channel
agent.sources.execSource.channels = memoryChannel

# Each sink's type must be defined
# Kafka sink configuration
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.bootstrap.servers = localhost:9092
agent.sinks.kafkaSink.kafka.topic = flume-kafka
agent.sinks.kafkaSink.kafka.serializer.class = kafka.serializer.StringEncoder
agent.sinks.kafkaSink.kafka.producer.acks = 1

# Specify the channel the sink should use
# Bind the sink to the channel
agent.sinks.kafkaSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel (sink or source)
# can be defined as well. In this case, it specifies the capacity of
# the memory channel: the maximum number of events it can hold.
agent.channels.memoryChannel.capacity = 100
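Note that the exec source above runs echo exactly once, so it is only useful as a smoke test: the agent emits a single event and the command exits. To actually monitor a file as the title suggests, a common exec-source variant is to tail it (a minimal sketch; /path/to/app.log is a placeholder for your log file):

# Follow the file, surviving log rotation; each appended line becomes a Flume event
agent.sources.execSource.command = tail -F /path/to/app.log

Keep in mind that the exec source offers no delivery guarantees if the agent dies; for reliable file collection, Flume's Taildir or Spooling Directory sources are the usual choices.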
Running the agent
flume-ng agent -c /usr/local/Cellar/flume/1.9.0_1/libexec/conf -f conf/flume-kafka-conf.properties --name agent
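If the broker has automatic topic creation disabled, create the flume-kafka topic before starting the agent. A sketch, assuming a local single-broker setup and the kafka-topics script (Kafka 2.2+) on the PATH:

kafka-topics --create --bootstrap-server localhost:9092 --topic flume-kafka --partitions 1 --replication-factor 1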
Checking the result in Kafka
>kafka-run-class kafka.tools.DumpLogSegments --files /usr/local/var/lib/kafka-logs/flume-kafka-0/00000000000000000000.log --print-data-log
Dumping /usr/local/var/lib/kafka-logs/flume-kafka-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1603274630544 size: 82 magic: 2 compresscodec: NONE crc: 2637763778 isvalid: true
| offset: 0 CreateTime: 1603274630544 keysize: -1 valuesize: 14 sequence: -1 headerKeys: [] payload: "测试一下"
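DumpLogSegments reads the topic's segment file straight from disk, which is why it needs the log path. A simpler check is to consume the topic over the wire (assuming kafka-console-consumer is on the PATH):

kafka-console-consumer --bootstrap-server localhost:9092 --topic flume-kafka --from-beginning

This should print 测试一下 once the agent has delivered the event.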
