龙空技术网

kafka入门

一点鑫得 146

前言:

目前兄弟们对“apache解压版启动错误1”大约比较看重,各位老铁们都想要分析一些“apache解压版启动错误1”的相关知识。那么小编也在网上收集了一些关于“apache解压版启动错误1””的相关知识,希望大家能喜欢,大家一起来了解一下吧!

一、Kafka概述

Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 Kafka支持低延迟消息传递,并在出现机器故障时提供对容错的保证。 它具有处理大量不同消费者的能力。 Kafka非常快,执行2百万写/秒。 Kafka将所有数据保存到磁盘,这实质上意味着所有写入都会进入操作系统(RAM)的页面缓存。 这使得将数据从页面缓存传输到网络套接字非常有效。

二、Kafka示例环境搭建

本示例搭建的Kafka环境为示例代码演示环境,生产环境一般为集群配置。

官网下载kafka并解压:Apache Kafka

启动zookeeper:Kafka依赖zookeeper服务,cd到Kafka解压目录,执行如下命令启动zookeeper。

bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka:配置config/server.properties,增加listeners配置如下,其中IP地址为本机IP,保存配置。

listeners=PLAINTEXT://192.168.2.14:9092

执行bin/kafka-server-start.sh config/server.properties启动Kafka。

安装Kafka可视化工具:Offset Explorer (kafkatool.com)。

如果正常工作环境为Windows系统,可以下载安装Kafka可视化工具,方便查看Kafka Topic消息及消费组信息等。

安装完毕后,打开Offset Explorer,参照如下配置新建连接,可以查看Brokers、Topic以及Consumers信息,初次打开的话内容应该为空,没有新建的Topic、也没有Consumer。

三、多个消费组独立消费同一个消息队列示例

Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。 换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。接下来通过java编写代码来演示该能力。

创建java maven工程,pom.xml添加Kafka依赖。

<dependencies>      <dependency>          <groupId>org.apache.kafka</groupId>          <artifactId>kafka-clients</artifactId>          <version>2.8.1</version>      </dependency>  </dependencies>

创建ProducerDemo类,代码及注释如下:

package com.demo.kafka;import java.util.Properties;import java.util.Scanner;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerConfig;import org.apache.kafka.clients.producer.ProducerRecord;public class ProducerDemo {    // Kafka消息主题    public final static String TOPIC = "DEMO";    // Kafka Broker Server    public final static String BROKER_LIST = "192.168.2.14:9092";    public static void main(String[] args) {        Properties props = new Properties();        // 设置key序列化器        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // 设置value序列化器        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");        // 设置重试次数        props.put(ProducerConfig.RETRIES_CONFIG, 10);        // 设置集群服务地址        props.put("bootstrap.servers", BROKER_LIST);        // 根据上述属性配置创建生产者        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);        while (true) {            // 控制台接收用户输入消息,回车发送            Scanner input = new Scanner(System.in);            if (input.hasNextLine()) {                String message = input.nextLine();                ProducerRecord<String, String> record = new ProducerRecord<String, String>(TOPIC, message);                try {                    producer.send(record);                } catch (Exception e) {                    e.printStackTrace();                    producer.close();                }            }        }    }}

创建ConsumerDemo类,代码及注释如下:

package com.demo.kafka;import java.time.Duration;import java.util.Arrays;import java.util.Collections;import java.util.Properties;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;public class ConsumerDemo extends Thread {    // Kafka消息主题    public final static String TOPIC = "DEMO";    // Kafka Broker Server    public final static String BROKER_LIST = "192.168.2.14:9092";    public static void main(String[] args) {        Properties props = new Properties();        // 设置key反序列化器        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        // 设置value反序列化器        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");        // 设置集群服务地址        props.put("bootstrap.servers", BROKER_LIST);        // 创建消费组A        props.put("group.id", "group.A");        KafkaConsumer<String, String> consumerA = new KafkaConsumer<>(props);        // 创建消费组B        props.put("group.id", "group.B");        KafkaConsumer<String, String> consumerB = new KafkaConsumer<>(props);        // 创建消费组C        props.put("group.id", "group.C");        KafkaConsumer<String, String> consumerC = new KafkaConsumer<>(props);        // 消费组A、B、C订阅同一个消息队列        consumerA.subscribe(Collections.singletonList(TOPIC));        consumerB.subscribe(Collections.singletonList(TOPIC));        consumerC.subscribe(Collections.singletonList(TOPIC));        while (true) {            // 消费组A消费TOPIC中的消息,拉取间隔为1s            ConsumerRecords<String, String> records = consumerA.poll(Duration.ofMillis(1000));            for (ConsumerRecord<String, String> record : records) {                System.out.println("consumerA消费消息:");                System.out.printf("offset = %d, value = %s", record.offset(), record.value());                System.out.println();            }            // 消费组B消费TOPIC中的消息,拉取间隔为1s            records = consumerB.poll(Duration.ofMillis(1000));            for (ConsumerRecord<String, String> record : records) {                System.out.println("consumerB消费消息:");                System.out.printf("offset = %d, value = %s", record.offset(), record.value());                System.out.println();            }            // 消费组C消费TOPIC中的消息,拉取间隔为1s            records = consumerC.poll(Duration.ofMillis(1000));            for (ConsumerRecord<String, String> record : records) {                System.out.println("consumerC消费消息:");                System.out.printf("offset = %d, value = %s", record.offset(), record.value());                System.out.println();            }        }    }}

运行ConsumerDemo.main(),该类共创建了三个消费组consumerA、consumerB、consumerC,均订阅了同一个消息主题DEMO,并持续在控制台打印消费的消息。

- 运行ProducerDemo.main(),该类创建了一个生产者并持续接收控制台用户的输入作为消息内容,并向消息主题DEMO发送消息。

现在,试着在控制台输入一条消息并回车。

然后查看ConsumerDemo页签查看各消费组消费的消息信息,可以发现三个消费组都消费了同一个主题信息。

- 通过Offset Explorer可视化工具也可以查看到对应TOPIC下的消息,以及创建的消费组。


标签: #apache解压版启动错误1