龙空技术网

【SpringBoot系列教程六】SpringBoot集成Kafka

猿学堂 400

前言:

如今朋友们对“pythonfornetbeans”大概比较注意,兄弟们都需要知道一些“pythonfornetbeans”的相关内容。那么小编在网上收集了一些对于“pythonfornetbeans””的相关文章,希望兄弟们能喜欢,咱们一起来了解一下吧!

Kafka是一个高吞吐量、分布式的发布—订阅消息系统,它最初由LinkedIn公司开发,后来成为Apache项目的一部分。Kafka核心模块使用Scala语言开发,支持多语言(如Java、C/C++、Python、Go、Erlang、Node.js等)客户端,它以可水平扩展和具有高吞吐量等特性而被广泛使用。

据Kafka官方网站介绍,Kafka定位就是一个分布式流处理平台。在官方看来,作为一个流式处理平台,必须具备以下3个关键特性。

能够允许发布和订阅流数据。从这个角度来讲,平台更像一个消息队列或者企业级的消息系统。存储流数据时提供相应的容错机制。当流数据到达时能够被及时处理。

Kafka能够很好满足以上3个特性,通过Kafka能够很好地建立实时流式数据通道,由该通道可靠地获取系统或应用程序的数据,也可以通过Kafka方便地构建实时流数据应用来转换或是对流式数据进行响应处理。

6.1 Kafka简介

一个典型的 Kafka 体系架构包括若干 Producer、若干 Broker、若干Consumer,以及一个ZooKeeper集群,如图所示。其中ZooKeeper是Kafka用来负责集群元数据的管理、控制器的选举等操作的。Producer将消息发送到Broker,Broker负责将收到的消息存储到磁盘中(持久化),而Consumer负责从Broker订阅并消费消息。

整个Kafka体系结构中引入了以下3个术语:

Producer:生产者,也就是发送消息的一方。生产者负责创建消息,然后将其投递到Kafka中。Consumer:消费者,也就是接收消息的一方。消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理。Broker:服务代理节点。对于Kafka而言,Broker可以简单地看作一个独立的Kafka服务节点或Kafka服务实例。大多数情况下也可以将Broker看作一台Kafka服务器,前提是这台服务器上只部署了一个Kafka实例。一个或多个Broker组成了一个Kafka集群。一般而言,我们更习惯使用首字母小写的broker来表示服务代理节点。

在Kafka中还有两个特别重要的概念—主题(Topic)与分区(Partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费。

主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区(Topic-Partition)。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志(Log)文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量(offset)。offset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。

每一条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设定得合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题只对应一个文件,那么这个文件所在的机器 I/O 将会成为这个主题的性能瓶颈,而分区解决了这个问题。在创建主题的时候可以通过指定的参数来设置分区的个数,当然也可以在主题创建完成之后去修改分区的数量,通过增加分区的数量可以实现水平扩展。

Kafka 为分区引入了多副本(Replica)机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。

如图所示,Kafka集群中有4个broker,某个主题中有3个分区,且副本因子(即副本个数)也为3,如此每个分区便有1个leader副本和2个follower副本。生产者和消费者只与leader副本进行交互,而follower副本只负责消息的同步,很多时候follower副本中的消息相对leader副本而言会有一定的滞后。

Kafka 消费端也具备一定的容灾能力。Consumer 使用拉(Pull)模式从服务端拉取消息,并且保存消费的具体位置,当消费者宕机后恢复上线时可以根据之前保存的消费位置重新拉取需要的消息进行消费,这样就不会造成消息丢失。

6.2 Kafka安装与测试6.2.1 Kafka安装

Kafka运行环境还需要涉及ZooKeeper,Kafka和ZooKeeper都是运行在JVM之上的服务,所以还需要安装JDK。由于Kafka内置了Zookeeper服务,因此不需要额外安装ZooKeeper。

首先下载Kafka,下载地址:

下载结束后解压至任意目录,并在目录中新建data和kafka-logs两个目录,结构如下:

进入config目录中。

寻找server.properties配置文件,在文件中加入如下配置,并保存

log.dirs=kafka目录\\kafka-logs

寻找zookeeper.properties,并在文件中加入如下配置:

dataDir=kafka目录\\data

audit.enable=true

上述配置结束后,即可启动Kafka,在启动Kafka前需要先启动Zookeeper,在Kafka安装目录的根目录下运行如下命令。

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

当出现如下界面时,即表示Zookeeper启动成功。

重写打开命令窗口,运行如下命令启动kafka:

bin\windows\kafka-server-start.bat config\server.properties

当出现如下界面是,即表示kafka启动成功。

6.2.2 Kafka脚本

在Kafka安装目录中存在一个bin目录,该目录存放这使用测试kafka的一些脚本文件,因为此处使用的是windows操作系统,windows操作系统的命令,则放置在bin目录下windows目录中。

下面,我们使用脚本文件来操作kafka

6.2.2.1 创建主题

在Kafka中每条消息都需要设置一个主题,这里的主题可以理解为消息的分类,例如订单类的消息,注册类的消息等等。下面的命令即在Kafka中创建了一个名称为byte的主题。

bin\windows\kafka-topics.bat --bootstrap-server localhost:9092 --create --topic byte --partitions 2 --replication-factor 1

上述命令创建了一个名为byte的主题,并且该主题有2个分区,和1个副本

6.2.2.2 查看主题

创建完主题后,可以使用以下命令来查看创建的主题

bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092
6.2.2.3 删除主题

如果要删除主题可以使用以下命令

bin\windows\kafka-topics.bat --delete --topic test --bootstrap-server localhost:9092
6.2.2.4订阅主题

当创建完主题后消费者需要订阅主题才可以接收到该主题的消息,以下命令即订阅了该主题的消息。

bin\windows\kafka-console-consumer.bat  --bootstrap-server localhost:9092 --topic byte
6.2.2.5发送消息

如果需要发送消息则需要启动一个生成者,如下命令即是启动了一个生产者,并开始在byte主题下发送消息

bin\windows\kafka-console-producer.bat  --broker-list localhost:9092 --topic byte
6.3 Kafka生产者

在上一小节中成功安装了Kafka并启动测试了相关脚本,但是这些脚本一般只用来做一些测试工作,在实际使用中不会只是简单的使用脚本来处理复杂的业务逻辑,相应的工作需要通过编程来实现。

6.3.1 生产者开发

一个正常的生产逻辑需要具备以下几个步骤:

配置生产者客户端参数及创建相应的生产者实例。构建待发送的消息。发送消息关闭生产者实例

下面就对生产者的代码进行简单的演示:

首先导入依赖:

<dependency>    <groupId>org.apache.kafka</groupId>    <artifactId>kafka_2.13</artifactId>    <version>3.2.3</version></dependency>

接下来,通过程序向Kafka发送消息

package cn.bytecollege.kafka.producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class KafkaProducerDemo {    public static final String brokerServer = "localhost:9092";    public static final String topic = "byte";    /**     * 设置初始化参数     * @return     */    public static Properties initConfig(){        Properties properties = new Properties();        properties.put("bootstrap.servers",brokerServer);        properties.put("key.serializer", StringSerializer.class.getName());        properties.put("value.serializer",StringSerializer.class.getName());        properties.put("client.id","clientId-0");        return properties;    }    public static void main(String[] args) {        Properties properties = initConfig();        //创建生产者对象        KafkaProducer<String,String> producer = new KafkaProducer<>(properties);        //<1>构建所需要的消息        ProducerRecord<String,String> record = new ProducerRecord<>(topic,"Hello Kafka");        //发送数据        producer.send(record).get();        //关闭生产者        producer.close();    }}

在上面的代码中定义了initConfig()方法用于配置生产者的参数,正如上例中代码所展示的,在Kafka生产者客户端KafkaProducer中有3个参数是必填的。

bootstrap.servers:该参数用来指定生产者客户端连接Kafka集群所需的broker地址清单,具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为“”。注意这里并非需要所有的broker地址,因为生产者会从给定的broker里查找到其他broker的信息。不过建议至少要设置两个以上的broker 地址信息,当其中任意一个宕机时,生产者仍然可以连接到 Kafka集群上。key.serializer 和 value.serializer:broker 端接收的消息必须以字节数组(byte[])的形式存在。上例中生产者使用的KafkaProducer<String,String>和ProducerRecord<String,String>中的泛型<String,String>对应的就是消息中key和value的类型,生产者客户端使用这种方式可以让代码具有良好的可读性,不过在发往broker之前需要将消息中对应的key和value做相应的序列化操作来转换成字节数组。key.serializer和value.serializer这两个参数分别用来指定key和value序列化操作的序列化器,这两个参数无默认值。注意这里必须填写序列化器的全限定名,如代码中的StringSerializer.class.getName()或者org.apache.kafka.common.serialization.StringSerializer,单单指定StringSerializer是错误的。client.id:这个参数用来设定KafkaProducer对应的客户端id,默认值为“”。如果客户端不设置,则KafkaProducer会自动生成一个非空字符串,内容形式如“producer-1”“producer-2”,即字符串"producer-"与数字的拼接。

KafkaProducer中的参数众多,远非示例initConfig()方法中的那样只有4个,具体可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,无需所有的参数名称,只能有个大致的印象。在实际使用过程中,诸如“key.serializer”“max.request.size”“interceptor.classes”之类的字符串经常由于人为因素而书写错误。为此,我们可以直接使用客户端中的 org.apache.kafka.clients.producer.ProducerConfig类来做一定程度上的预防措施,每个参数在 ProducerConfig 类中都有对应的名称。引入ProducerConfig后的修改结果如下:

public static Properties initConfig(){    Properties properties = new Properties();    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokerServer);    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());    properties.put(ProducerConfig.CLIENT_ID_CONFIG,"clientId-0");    return properties;}

在代码注释1处,创建了构建的消息对象 ProducerRecord,它并不是单纯意义上的消息,它包含了多个属性,原本需要发送的与业务相关的消息体只是其中的一个value 属性,比如“Hello,Kafka!”只是ProducerRecord对象中的一个属性。截取ProducerRecord源码中实例变量:

public class ProducerRecord<K, V> {    //消息主题    private final String topic;    //分区号    private final Integer partition;    //消息头部    private final Headers headers;    private final K key;    private final V value;    //消息时间戳    private final Long timestamp;}

其中topic和partition字段分别代表消息要发往的主题和分区号。headers字段是消息的头部,Kafka 0.11.x版本才引入这个属性,它大多用来设定一些与应用相关的信息,如无需要也可以不用设置。key是用来指定消息的键,它不仅是消息的附加信息,还可以用来计算分区号进而可以让消息发往特定的分区。前面提及消息以主题为单位进行归类,而这个key可以让消息再进行二次归类,同一个key的消息会被划分到同一个分区中。value是指消息体,一般不为空,如果为空则表示特定的消息—墓碑消息(xxxxx),timestamp是指消息的时间戳,它有CreateTime和LogAppendTime两种类型,前者表示消息创建的时间,后者表示消息追加到日志文件的时间。

创建生产者实例和构建消息之后,就可以开始发送消息了。发送消息主要有三种模式:发后即忘(fire-and-forget)、同步(sync)及异步(async)。

上例中的这种发送方式就是发后即忘,它只管往Kafka中发送消息而并不关心消息是否正确到达。在大多数情况下,这种发送方式没有什么问题,不过在某些时候(比如发生不可重试异常时)会造成消息的丢失。这种发送方式的性能最高,可靠性也最差。

KafkaProducer 的 send()方法并非是 void 类型,而是 Future<RecordMetadata>类型,send()方法有2个重载方法,具体定义如下:

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {    return send(record, null);}public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {    // intercept the record, which can be potentially modified; this method does not throw exceptions    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);    return doSend(interceptedRecord, callback);}

要实现同步的发送方式,可以利用返回的Future对象实现,发送消息后调用get()方法获取结果。

实际上send()方法本身就是异步的,send()方法返回的Future对象可以使调用方稍后获得发送的结果。示例中在执行send()方法之后直接链式调用了get()方法来阻塞等待Kafka的响应,直到消息发送成功,或者发生异常。如果发生异常,那么就需要捕获异常并交由外层逻辑处理。

KafkaProducer中一般会发生两种类型的异常:可重试的异常和不可重试的异常。常见的可重试异常有:NetworkException、LeaderNotAvailableException、UnknownTopicOrPartitionException、NotEnoughReplicasException、NotCoordinatorException 等。比如NetworkException 表示网络异常,这个有可能是由于网络瞬时故障而导致的异常,可以通过重试解决;又比如LeaderNotAvailableException表示分区的leader副本不可用,这个异常通常发生在leader副本下线而新的 leader 副本选举完成之前,重试之后可以重新恢复。不可重试的异常,还有RecordTooLargeException异常,表示了所发送的消息太大,KafkaProducer对此不会进行任何重试,直接抛出异常。

对于可重试的异常,如果配置了 retries 参数,那么只要在规定的重试次数内自行恢复了,就不会抛出异常。retries参数的默认值为0,配置方式参考如下:

properties.put(ProducerConfig.RETRIES_CONFIG,10);

示例中配置了10次重试。如果重试了10次之后还没有恢复,那么仍会抛出异常,进而发送的外层逻辑就要处理这些异常了。

同步发送的方式可靠性高,要么消息被发送成功,要么发生异常。如果发生异常,则可以捕获并进行相应的处理,而不会像“发后即忘”的方式直接造成消息的丢失。不过同步发送的方式的性能会差很多,需要阻塞等待一条消息发送完之后才能发送下一条。

6.3.2 生产者重要参数

在KafkaProducer中,除了上一小节提及的3个默认的客户端参数,大部分的参数都有合理的默认值,一般不需要修改它们。不过了解这些参数可以让我们更合理地使用生产者客户端,其中还有一些重要的参数涉及程序的可用性和性能,如果能够熟练掌握它们,也可以让我们在编写相关的程序时能够更好地进行性能调优与故障排查。下面挑选一些重要的参数进行讲解。

6.3.2.1 acks

这个参数用来指定分区中必须要有多少个副本收到这条消息,之后生产者才会认为这条消息是成功写入的。acks 是生产者客户端中一个非常重要的参数,它涉及消息的可靠性和吞吐量之间的权衡。acks参数有3种类型的值(都是字符串类型)。

acks=1。默认值即为1。生产者发送消息之后,只要分区的leader副本成功写入消息,那么它就会收到来自服务端的成功响应。如果消息无法写入leader副本,比如在leader 副本崩溃、重新选举新的 leader 副本的过程中,那么生产者就会收到一个错误的响应,为了避免消息丢失,生产者可以选择重发消息。如果消息写入leader副本并返回成功响应给生产者,且在被其他follower副本拉取之前leader副本崩溃,那么此时消息还是会丢失,因为新选举的leader副本中并没有这条对应的消息。acks设置为1,是消息可靠性和吞吐量之间的折中方案。acks=0。生产者发送消息之后不需要等待任何服务端的响应。如果在消息从发送到写入Kafka的过程中出现某些异常,导致Kafka并没有收到这条消息,那么生产者也无从得知,消息也就丢失了。在其他配置环境相同的情况下,acks设置为 0 可以达到最大的吞吐量。acks=-1或acks=all。生产者在消息发送之后,需要等待ISR中的所有副本都成功写入消息之后才能够收到来自服务端的成功响应。在其他配置环境相同的情况下,acks 设置为-1(all)可以达到最强的可靠性。但这并不意味着消息就一定可靠,因为ISR中可能只有leader副本,这样就退化成了acks=1的情况。

注意acks参数配置的值是一个字符串类型,而不是整数类型。

6.3.2.1 max.request.size

这个参数用来限制生产者客户端能发送的消息的最大值,默认值为 1048576B,即1MB。一般情况下,这个默认值就可以满足大多数的应用场景了。并且不建议盲目地增大这个参数的配置值,尤其是在对Kafka整体脉络没有足够把控的时候。因为这个参数还涉及一些其他参数的联动,比如broker端的message.max.bytes参数,如果配置错误可能会引起一些不必要的异常。比如将broker端的message.max.bytes参数配置为10,而max.request.size参数配置为20,那么当我们发送一条大小为15B的消息时,生产者客户端就会抛出异常。

6.3.2.3 retries和retry.backoff.ms

retries参数用来配置生产者重试的次数,默认值为0,即在发生异常的时候不进行任何重试动作。消息在从生产者发出到成功写入服务器之前可能发生一些临时性的异常,比如网络抖动、leader副本的选举等,这种异常往往是可以自行恢复的,生产者可以通过配置retries大于0的值,以此通过内部重试来恢复而不是一味地将异常抛给生产者的应用程序。如果重试达到设定的次数,那么生产者就会放弃重试并返回异常。不过并不是所有的异常都是可以通过重试来解决的,比如消息太大,超过max.request.size参数配置的值时,这种方式就不可行了。

重试还和另一个参数retry.backoff.ms有关,这个参数的默认值为100,它用来设定两次重试之间的时间间隔,避免无效的频繁重试。在配置 retries 和retry.backoff.ms之前,最好先估算一下可能的异常恢复时间,这样可以设定总的重试时间大于这个异常恢复时间,以此来避免生产者过早地放弃重试。

Kafka 可以保证同一个分区中的消息是有序的。如果生产者按照一定的顺序发送消息,那么这些消息也会顺序地写入分区,进而消费者也可以按照同样的顺序消费它们。

6.3.2.4 compression.type

这个参数用来指定消息的压缩方式,默认值为“none”,即默认情况下,消息不会被压缩。该参数还可以配置为“gzip”“snappy”和“lz4”。对消息进行压缩可以极大地减少网络传输量、降低网络I/O,从而提高整体的性能。消息压缩是一种使用时间换空间的优化方式,如果对时延有一定的要求,则不推荐对消息进行压缩。

6.3.2.5 connections.max.idle.ms

这个参数用来指定在多久之后关闭限制的连接,默认值是540000(ms),即9分钟。

6.3.2.6 linger.ms

这个参数用来指定生产者发送 ProducerBatch 之前等待更多消息(ProducerRecord)加入ProducerBatch 的时间,默认值为 0。生产者客户端会在 ProducerBatch 被填满或等待时间超过linger.ms 值时发送出去。增大这个参数的值会增加消息的延迟,但是同时能提升一定的吞吐量。

6.3.2.7 receive.buffer.bytes

这个参数用来设置Socket接收消息缓冲区(SO_RECBUF)的大小,默认值为32768(B),即32KB。如果设置为-1,则使用操作系统的默认值。如果Producer与Kafka处于不同的机房,则可以适地调大这个参数值。

6.3.2.8 send.buffer.bytes

这个参数用来设置Socket发送消息缓冲区(SO_SNDBUF)的大小,默认值为131072(B),即128KB。与receive.buffer.bytes参数一样,如果设置为-1,则使用操作系统的默认值。

6.3.2.9 request.timeout.ms

这个参数用来配置Producer等待请求响应的最长时间,默认值为30000(ms)。请求超时之后可以选择进行重试。注意这个参数需要比broker端参数replica.lag.time.max.ms的值要大,这样可以减少因客户端重试而引起的消息重复的概率。

6.4 Kafka消费者

消费者(Consumer)负责订阅Kafka中的主题(Topic),并且从订阅的主题上拉取消息。与其他一些消息中间件不同的是:在Kafka的消费理念中还有一层消费组(Consumer Group)的概念,每个消费者都有一个对应的消费组。当消息发布到主题后,只会被投递给订阅它的每个消费组中的一个消费者。

对于消息中间件而言,一般有两种消息投递模式:

点对点(P2P,Point-to-Point)模式,点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息。发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者从主题中订阅消息。主题使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递。发布/订阅(Pub/Sub)模式:发布/订阅模式在消息的一对多广播时采用。

如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每一个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。

如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

消费组是一个逻辑上的概念,它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。

消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同的机器上。

6.4.1 生产者开发

一个正常的消费逻辑需要具备以下几个步骤

配置消费者客户端参数及创建相应的消费者实例。订阅主题。拉取消息并消费。提交消费位移。关闭消费者实例。

package cn.bytecollege.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.clients.consumer.ConsumerRecords;import org.apache.kafka.clients.consumer.KafkaConsumer;import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;import java.util.Arrays;import java.util.Properties;public class KafkaConsumerDemo {    public static final String brokeList = "localhost:9092";    public static final String topic = "byte";    public static final String groupId = "groupId1";    public static Properties initConfig() {        Properties properties = new Properties();        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokeList);        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer.client.id.1");        return properties;    }    public static void main(String[] args) {        Properties properties = initConfig();        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);        consumer.subscribe(Arrays.asList(topic));        while (true){            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));            records.forEach((r) -> {                System.out.println("topic=" + r.topic() + ",partition=" + r.partition()+",offset="+r.offset());                System.out.println("key="+r.key()+",value="+r.value());            });        }    }}

在创建真正的消费者实例之前需要做相应的参数配置,例如生产者代码中设置消费者所属的消费组的名称、连接地址等。参照生产者代码initConfig()方法,在Kafka消费者客户端KafkaConsumer中有4个参数是必填的。

bootstrap.servers:该参数的释义和生产者客户端 KafkaProducer 中的相同,用来 指 定 连 接 Kafka 集 群 所 需 的 broker 地 址 清 单,具 体 内 容 形 式 为host1:port1,host2:post,可以设置一个或多个地址,中间用逗号隔开,此参数的默认值为“”。注意这里并非需要设置集群中全部的broker地址,消费者会从现有的配置中查找到全部的Kafka集群成员。这里设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上group.id:消费者隶属的消费组的名称,默认值为“”。如果设置为空,则会报出异常:Exception in thread "main"org.apache.kafka.common.errors.InvalidGroupIdException:The configured groupId is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称key.deserializer 和 value.deserializer:与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key和value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如示例中的org.apache.kafka.common.serialization.StringDeserializer,单单指定StringDeserializer是错误的。

initConfig()方法里还设置了一个参数client.id,这个参数用来设定KafkaConsumer对应的客户端id,默认值也为“”。如果客户端不设置,则KafkaConsumer会自动生成一个非空字符串,内容形式如“consumer-1”“consumer-2”,即字符串“consumer-”与数字的拼接。

KafkaConsumer中的参数众多,远非示例initConfig()方法中的那样只有5个,开发人员可以根据业务应用的实际需求来修改这些参数的默认值,以达到灵活调配的目的。一般情况下,普通开发人员无法全部记住所有的参数名称,只能有个大致的印象,在实际使用过程中,诸如“key.deserializer”“auto.offset.reset”之类的字符串经常由于人为因素而书写错误。为此,我们可以直接使用客户端中的org.apache.kafka.clients.consumer.ConsumerConfig 类来做一定程度上的预防,每个参数在ConsumerConfig类中都有对应的名称。

6.4.2 订阅与分区

在创建好消费者之后,我们就需要为该消费者订阅相关的主题了。一个消费者可以订阅一个或多个主题,在消费者代码中我们使用subscribe()方法订阅了一个主题,对于这个方法而言,既可以以集合的形式订阅多个主题,也可以以正则表达式的形式订阅特定模式的主题。subscribe的几个重载方法如下:

public void subscribe(Collection<String> topics) {}public void subscribe(Pattern pattern) {}

对于消费者使用集合的方式(subscribe(Collection))来订阅主题而言,比较容易理解,订阅了什么主题就消费什么主题中的消息。如果前后两次订阅了不同的主题,那么消费者以最后一次的为准。

consumer.subscribe(Arrays.asList(topic1));consumer.subscribe(Arrays.asList(topic2));

上面的示例中,最终消费者订阅的是topic2,而不是topic1,也不是topic1和topic2的并集。

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅,在之后的过程中,如果有人又创建了新的主题,并且主题的名字与正则表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。在Kafka 和其他系统之间进行数据复制时,这种正则表达式的方式就显得很常见。正则表达式的方式订阅的示例如下:

consumer.subscribe(Pattern.compile("topic-.*"));

消费者不仅可以通过KafkaConsumer.subscribe()方法订阅主题,还可以直接订阅某些主题的特定分区,在KafkaConsumer中还提供了一个assign()方法来实现这些功能,此方法的具体定义如下:

public void assign(Collection<TopicPartition> partitions) {}

这个方法只接受一个参数partitions,用来指定需要订阅的分区集合。这里补充说明一下TopicPartition类,在Kafka的客户端中,它用来表示分区。

TopicPartition类只有2个属性:topic和partition,分别代表分区所属的主题和自身的分区编号,这个类可以和我们通常所说的主题—分区的概念映射起来。

我们将代码中的subscribe()方法修改为assign()方法,这里只订阅topic-demo主题中分区编号为0的分区,相关代码如下:

consumer.assign(new TopicPartition("byte",0));

如果我们事先并不知道主题中有多少个分区怎么办?KafkaConsumer 中的partitionsFor()方法可以用来查询指定主题的元数据信息。

List<PartitionInfo> partitionsFor(String var1);

其中PartitionInfo类型即为主题的分区元数据信息。

public class PartitionInfo {    private final String topic;    private final int partition;    private final Node leader;    private final Node[] replicas;    private final Node[] inSyncReplicas;    private final Node[] offlineReplicas;}

PartitionInfo类中的属性topic表示主题名称,partition代表分区编号,leader代表分区的leader副本所在的位置,replicas代表分区的AR集合,inSyncReplicas代表分区的ISR集合,offlineReplicas代表分区的OSR集合。

既然有订阅,那么就有取消订阅,可以使用 KafkaConsumer 中的 unsubscribe()方法来取消主题的订阅。这个方法既可以取消通过 subscribe(Collection)方式实现的订阅,也可以取消通过subscribe(Pattern)方式实现的订阅,还可以取消通过 assign(Collection)方式实现的订阅。

consumer.unsunscribe();

如果将subscribe(Collection)或assign(Collection)中的集合参数设置为空集合,那么作用等同于unsubscribe()方法,如果没有订阅任何主题或分区,那么再继续执行消费程序的时候会报出IllegalStateException异常。

集合订阅的方式subscribe(Collection)、正则表达式订阅的方式subscribe(Pattern)和指定分区的订阅方式 assign(Collection)分表代表了三种不同的订阅状态:AUTO_TOPICS、AUTO_PATTERN和USER_ASSIGNED(如果没有订阅,那么订阅状态为NONE)。然而这三种状态是互斥的,在一个消费者中只能使用其中的一种,否则会报出IllegalStateException异常。

过 subscribe()方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组内的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。而通过assign()方法订阅分区时,是不具备消费者自动均衡的功能的。

6.4.3 消费消息

Kafka中的消费是基于拉模式的。消息的消费一般有两种模式:推模式和拉模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

从消费者代码中可以看出,Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll()方法,而poll()方法返回的是所订阅的主题(分区)上的一组消息。

对于poll()方法而言,如果某些分区中没有可供消费的消息,那么此分区对应的消息拉取的结果就为空;如果订阅的所有分区中都没有可供消费的消息,那么poll()方法返回为空的消息集合。

注意到poll()方法里还有一个超时时间参数timeout,用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。注意这里 timeout 的类型是 Duration,它是JDK8中新增的一个与时间有关的类型。

poll(long)方法中timeout的时间单位固定为毫秒,而poll(Duration)方法可以根据Duration中的 ofMillis()、ofSeconds()、ofMinutes()、ofHours()等多种不同的方法指定不同的时间单位,灵活性更强。并且 poll(long)方法也已经被标注为@Deprecated,虽然目前还可以使用,如果条件允许的话,还是推荐使用poll(Duration)的方式。

timeout的设置取决于应用程序对响应速度的要求,比如需要在多长时间内将控制权移交给执行轮询的应用线程。可以直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是从Kafka中拉取并消费消息,则可以将这个参数设置为最大值Long.MAX_VALUE。

消费者消费到的每条消息的类型为ConsumerRecord(注意与ConsumerRecords的区别),这个和生产者发送的消息类型ProducerRecord相对应,不过ConsumerRecord中的内容更加丰富

public class ConsumerRecord<K, V> {    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;    public static final int NULL_SIZE = -1;    /*** @deprecated checksums are no longer exposed by this class, this constant will be removed in Apache Kafka 4.0*             (deprecated since 3.0).*/    @Deprecated    public static final int NULL_CHECKSUM = -1;    private final String topic;    private final int partition;    private final long offset;    private final long timestamp;    private final TimestampType timestampType;    private final int serializedKeySize;    private final int serializedValueSize;    private final Headers headers;    private final K key;    private final V value;    private final Optional<Integer> leaderEpoch;}

topic 和 partition 这两个字段分别代表消息所属主题的名称和所在分区的编号。offset 表示消息在所属分区的偏移量。timestamp 表示时间戳,与此对应的timestampType 表示时间戳的类型。timestampType 有两种类型:CreateTime 和LogAppendTime,分别代表消息创建的时间戳和消息追加到日志的时间戳。headers表示消息的头部内容。key 和 value 分别表示消息的键和消息的值,一般业务应用要读取的就是value,serializedKeySize和serializedValueSize分别表示key和value经过序列化之后的大小,如果key为空,则serializedKeySize值为-1。同样,如果value为空,则serializedValueSize的值也会为-1。

poll()方法的返回值类型是 ConsumerRecords,它用来表示一次拉取操作所获得的消息集,内部包含了若干ConsumerRecord,它提供了一个iterator()方法来循环遍历消息集内部的消息。

在消费者代码中,我们使用这种方法来获取消息集中的每一个ConsumerRecord。除此之外,我们还可以按照分区维度来进行消费,这一点很有用,在手动提交位移时尤为明显,ConsumerRecords类提供了一个records(TopicPartition)方法来获取消息集中指定分区的消息。

6.5 SpringBoot集成Kafka新建SpringBoot项目后添加依赖:

<dependency>    <groupId>org.springframework.kafka</groupId>    <artifactId>spring-kafka</artifactId></dependency>
在application.yml中配置Kafka
spring:  application:    name: kafka-demo  kafka:    producer:      key-serializer: org.apache.kafka.common.serialization.StringSerializer      value-serializer: org.apache.kafka.common.serialization.StringSerializer    consumer:      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer    	group-id: groupId1    bootstrap-servers: localhost:9092  	
新建Controller用于接收请求,并向Kafka发送消息
package cn.bytecollege.kafka.controller;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.kafka.core.KafkaTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class KafkaController {    @Autowired    private KafkaTemplate<String,String> kafkaTemplate;    @GetMapping("/send")    public String send(){        kafkaTemplate.send("byte","SpringBoot Kafka");        return "ok";    }}
新建Handler用于消费消息
package cn.bytecollege.kafka.listener;import lombok.extern.java.Log;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.springframework.kafka.annotation.KafkaListener;import org.springframework.stereotype.Service;@Service    @Log    public class MessageListener {        @KafkaListener(topics = {"byte"})        public void msgHandler(ConsumerRecord<String,String> consumerRecord){            log.info("接收到消息:"+consumerRecord.value()+",偏移量"+consumerRecord.offset());        }    }

标签: #pythonfornetbeans