龙空技术网

.NET Core下Kafka部署安装及简单使用

IT技术资源爱好者 133

前言:

此刻看官们对“kafka部署安装”都比较讲究,同学们都想要剖析一些“kafka部署安装”的相关资讯。那么小编在网上搜集了一些有关“kafka部署安装””的相关知识,希望兄弟们能喜欢,各位老铁们一起来学习一下吧!

一、环境准备

1、jdk 8+

2、zookeeper

3、kafka

说明:在kafka较新版本中已经集成了zookeeper,所以不用单独安装zookeeper,只需要在kafka文件目录中启动zookeeper即可

二、下载地址

三、部署1、启动zookeeper

-- 启动./bin/zookeeper-server-start.sh -daemon config/zookeeper.properties-- 查看是否启动成功ps -ef | grep zoo
2、进入解压的kafka目录,修改/config/kafka-server的配置文件
vi config/server.properties
-- 重点配置节点说明listeners=PLAINTEXT://localhost:9092   -- 将localhost修改为主机iplog.dirs=/bigdata/kafka/logs-1  -- 默认不修改也可
3、使用kafka-server-start.sh,启动kafka服务
./bin/kafka-server-start.sh config/server.properties
四、使用客户端kafka tools连接kafka

客户端下载地址:

关于客户端如何使用可查看:

五、kafka实战简单使用

NuGet:Confluent.Kafka

1、新建.Net Core控制台项目,代码如下:

static void Main(string[] args)        {            // 发送消息            var producerConfig = new ProducerConfig            {                BootstrapServers = "192.168.140.131:9092",                MessageTimeoutMs = 50000            };            var builder = new ProducerBuilder<string, string>(producerConfig);            using (var producer = builder.Build())            {                var data = new { key = "1", value = "001" };                var json = JsonConvert.SerializeObject(data);                var dr = producer.ProduceAsync("order", new Message<string, string> { Key = "order", Value = json }).GetAwaiter().GetResult();                Console.WriteLine($"发送事件{dr.Value}到{dr.TopicPartitionOffset}成功");            }            // 消费消息            var consumerConfig = new ConsumerConfig {                BootstrapServers = "192.168.140.131:9092",                AutoOffsetReset=AutoOffsetReset.Earliest,                GroupId="1111", // 自定义                EnableAutoCommit=true            };            var consumerBuilder = new ConsumerBuilder<string, string>(consumerConfig);            using (var consumer = consumerBuilder.Build())            {                // 1、订阅                consumer.Subscribe("order");                while (true)                {                    try                    {                        // 2、消费(自动确认)                        var result = consumer.Consume();                        // 3、业务逻辑                        string key = result.Key;                        string value = result.Value;                        Console.WriteLine($"创建商品:Key:{key}");                        Console.WriteLine($"创建商品:Order:{value}");                        consumer.Commit(result);                    }                    catch (Exception e)                    {                        Console.WriteLine($"异常:Order:{e}");                    }                }            }        }
2、使用kafka客户端查看消息投递

标签: #kafka部署安装