前言:
此刻看官们对“kafka部署安装”都比较讲究,同学们都想要剖析一些“kafka部署安装”的相关资讯。那么小编在网上搜集了一些有关“kafka部署安装””的相关知识,希望兄弟们能喜欢,各位老铁们一起来学习一下吧!一、环境准备
1、jdk 8+
2、zookeeper
3、kafka
说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可
二、下载地址
三、部署1、启动zookeeper
-- 启动./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties-- 查看是否启动成功ps -ef | grep zoo2、进入解压的kafka目录,修改/config/kafka-server的配置文件
vi config/server.properties
-- 重点配置节点说明listeners=PLAINTEXT://localhost:9092 -- 将localhost修改为主机iplog.dirs=/bigdata/kafka/logs-1 -- 默认不修改也可3、使用kafka-server-start.sh,启动kafka服务
./bin/kafka-server-start.sh config/server.properties四、使用客户端kafka tools连接kafka
客户端下载地址:
关于客户端如何使用可查看:
五、kafka实战简单使用
NuGet:Confluent.Kafka
1、新建.Net Core控制台项目,代码如下:
static void Main(string[] args) { // 发送消息 var producerConfig = new ProducerConfig { BootstrapServers = "192.168.140.131:9092", MessageTimeoutMs = 50000 }; var builder = new ProducerBuilder<string, string>(producerConfig); using (var producer = builder.Build()) { var data = new { key = "1", value = "001" }; var json = JsonConvert.SerializeObject(data); var dr = producer.ProduceAsync("order", new Message<string, string> { Key = "order", Value = json }).GetAwaiter().GetResult(); Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功"); } // 消费消息 var consumerConfig = new ConsumerConfig { BootstrapServers = "192.168.140.131:9092", AutoOffsetReset=AutoOffsetReset.Earliest, GroupId="1111", // 自定义 EnableAutoCommit=true }; var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig); using (var consumer = consumerBuilder.Build()) { // 1、订阅 consumer.Subscribe("order"); while (true) { try { // 2、消费(自动确认) var result = consumer.Consume(); // 3、业务逻辑 string key = result.Key; string value = result.Value; Console.WriteLine($"创建商品:Key:{key}"); Console.WriteLine($"创建商品:Order:{value}"); consumer.Commit(result); } catch (Exception e) { Console.WriteLine($"异常:Order:{e}"); } } } }2、使用kafka客户端查看消息投递
版权声明:
本站文章均来自互联网搜集,如有侵犯您的权益,请联系我们删除,谢谢。
标签: #kafka部署安装