龙空技术网

kafka消费者详解

马龙飞鸽 447

前言:

现时大家对“kafka中的一个消费者可以属于几个消费者组”都比较关注,小伙伴们都想要知道一些“kafka中的一个消费者可以属于几个消费者组”的相关资讯。那么小编同时在网络上搜集了一些对于“kafka中的一个消费者可以属于几个消费者组””的相关知识,希望姐妹们能喜欢,各位老铁们一起来了解一下吧!

kafka消费方式

消息的消费一般有两种模式:推模式和拉模式。

推模式(push)是服务端主动将消息推送给消费者,很难适应消费的处理能力,有可能造成消费者处理不过来,导致拒绝服务或者消息拥塞。拉模式(pull)是消费者主动向服务端发起请求来拉取消息,消费者根据处理能力进行消费。但如果没有消息的话可能造成空循环,可以设置轮询时间解决。

消费者和消费者群组

在Kafka 的消费概念中有个消费组(Consumer Group)的概念, 每个消费者都有一个对应的消费组。当消息发布到主题后, 只会被投递给订阅它的每个消费组中的一个消费者。

消费者群组是一个逻辑上的概念, 它将旗下的消费者归为一类,每一个消费者只隶属于一个消费组。每一个消费组都会有一个固定的名称,消费者在进行消费时需要指定其所属消费组的名称,这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。消费者并非逻辑上的概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上, 也可以部署在不同的机器上。

简单的说,对于同一topic,不同的消费者group,可以消费相同的分区,同一group的消费者只能消费不同的分区。这样可以通过增加消费者线程数,来提高并发处理能力。

消费者线程数等于分区数,性能最优消费者线程数大于分区数,会有消费者线程空闲。消费者线程数小于分区数,一个线程要处理多个分区数据。

消费者线程

kafka消费者是非线程安全的,也是说在同一个群组中,无法让一个线程运行多个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。最好把消费者的逻辑封装在自己的对象中,然后使用java的ExecutorService启动多个线程,使每个消费者运行在自己的线程上

分区分配

一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。

一个partition只能被同一个消费者组里的一个消费者消费(一个消费者可以同时消费多个partition),因此,如果设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。所以,推荐partition的数量一定要大于等于同时运行的consumer的数量。建议partition的数量大于等于集群broker的数量,这样leader partition就可以均匀的分布在各个broker中,最终使得集群负载均衡。需要注意的是,kafka需要为每个partition分配一些内存来缓存消息数据,如果partition数量越大,就要为kafka分配更大的heap space。

Kafka 有两种分区分配策略:

RoundRobin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数最大差别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。Range,默认为Range

当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。

再均衡

再均衡是指分区的所属权从一个消费者转移到另一消费者的过程。

满足以下条件,会产生再均衡:

消费者组中新添加消费者读取到原本是其他消费者读取的消息。消费者关闭或崩溃之后离开群组,原本由他读取的partition将由群组里其他消费者读取。当向一个Topic添加新的partition,会发生partition在消费者中的重新分配。

优点:

给消费者组带来了高可用性和伸缩性, 可以既方便又安全地删除消费组内的消费者或往消费组内添加消费者。

缺点:

再均衡期间消费者无法读取消息,整个消费群组有一小段时间不可用partition被重新分配给一个消费者时,消费者当前的读取状态会丢失,有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。消费者消费完某个分区中的一部分消息时还没有来得及提交消费位移就发生了再均衡操作, 之后这个分区又被分配给了消费组内的另一个消费者,原来被消费完的那部分消息又被重新消费一遍, 也就是发生了重复消费.

因此,需要进行安全的再均衡和避免不必要的再均衡。

offset提交

对于Kafka 中的分区而言,它的每条消息都有唯一的offset ,用来表示消息在分区中对应的位置。对于消费者而言, 它也有一个offset 的概念,消费者使用offset 来表示消费到分区中某个消息所在的位置。

在每次调用poll ()方法时,它返回的是还没有被消费过的消息集, 要做到这一点,就需要记录上一次消费时的消费位移。并且这个消费位移必须做持久化保存,而不是单单保存在内存中,否则消费者重启之后就无法知晓之前的消费位移。消费offset存储在Kafka 内部的主题__consumer_offsets 中。这里把将消费位移存储起来(持久化)的动作称为commit, 消费者在消费完消息之后需要执行消费位移的提交。

consumer group 名称+topic + partition 唯一确定一个offset

1、提交偏移量可能带来的问题:

如果提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。如果提交的偏移量大于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。

2、提交偏移量的方式:

自动提交 Automatic Commit

enable.auto.commit设置成true(默认为true),那么每过5s,消费者自动把从poll()方法接收到的最大的偏移量提交。提交的时间间隔由auto.commit.interval.ms控制,默认是5s。自动提交的优点是方便,但是可能会重复处理消息

提交当前偏移量 Commit Current Offset

将enable.auto.commit设置成false,让应用程序决定何时提交偏移量。commitSync()提交由poll()方法返回的最新偏移量,所以在处理完所有消息后要确保调用commitSync,否则会有消息丢失的风险。commitSync在提交成功或碰到无法恢复的错误之前,会一直重试。如果发生了再均衡,从最近一批消息到发生再均衡之间的所有消息都会被重复处理。

异步提交

异步提交的commitAsync,只管发送提交请求,无需等待broker响应。commitAsync提交之后不进行重试,假设要提交偏移量2000,这时候发生短暂的通信问题,服务器接收不到提交请求,因此也就不会作出响应。与此同时,我们处理了另外一批消息,并成功提交了偏移量3000,。如果commitAsync重新尝试提交2000,那么它有可能在3000之后提交成功,这个时候如果发生再均衡,就会出现重复消息。

同步和异步组合提交

一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交(finally 块中用同步提交),就要确保提交成功。

提交特定的偏移量

消费者API允许调用commitSync()和commitAsync()方法时传入希望提交的partition和offset的map,即提交特定的偏移量。

指定offset消费

在Kafka 中每当消费者查找不到所记录的消费位移时, 就会根据消费者客户端参数auto.offset.reset 的配置来决定从何处开始进行消费

auto.offset.reset=latest 表示从分区末尾开始消费消息。默认值为latestauto.offset.reset=earliest 表示消费者会从起始处,也就是0 开始消费auto.offset.reset=none 配置为此值就意味着出现查到不到消费位移的时候,既不从最新的消息位置处开始消费,也不从最早的消息位置处开始消费,此时会报出NoOffsetForParti tionException 异常

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而KafkaConsumer 中的seek()方法正好提供了这个功能,让我们得以追前消费或回溯消费。seek()方法的具体定义如下:

public void seek(TopicPartition partition , long offset)

seek()方法中的参数partition 表示分区,而offset 参数用来指定从分区的哪个位置开始消费。seek()方法只能重置消费者分配到的分区的消费位置,而分区的分配是在poll()方法的调用过程中实现的。也就是说,在执行seek()方法之前需要先执行一次poll()方法, 等到分配到分区之后才可以重置消费位置

消费者线程实现

kafka消费者非线程安全,因此,一个消费者线程,只能消费同一个topic的 一个或者多个分区数据。

消费者实现有2种方式:非并消费和并发消费

非并发消费

Properties properties = new Properties();properties.put("bootstrap.servers", "127.0.0.1:9092");properties.put("group.id", "group-1");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");String topic="sound";KafkaConsumer<String ,String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList(topic));int count=0;try {   while (true){   //拉取数据   ConsumerRecords<String, String> records = consumer.poll(1000);   for (ConsumerRecord<String,String> record:records){   System.out.println(record.topic()+" "+record.key() +" "+ record.value() );   } //方案1,同步提交 consumer.commitSync(); //方案2, 异步提交 //consumer.commitASync(); }}catch (Exception ex){ //和异步提交配合 //consumer.commitSync();}
并发消费
public class ConcurrentConsumer { Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, imeUnit.MICROSECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());Properties properties = new Properties();properties.put("bootstrap.servers", "127.0.0.1:9092");properties.put("group.id", "group-1");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("auto.offset.reset", "earliest");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String ,String> consumer = new KafkaConsumer<String, String>(properties);consumer.subscribe(Arrays.asList(topic));try { while (true){ ConsumerRecords<String, String> records = consumer.poll(1000); if(!records.isEmpty()){ executorService.submit(new RecordHandler(records)); } //提交offset synchronized (offsets){//提交offset if(!offsets.isEmpty()){ consumer.commitSync(offsets); offsets.clear(); }} }}catch (Exception ex){ if(consumer!=null){ consumer.close(); }}//业务处理线程class RecordHandler extends Thread{ private ConsumerRecords<String, String> records; RecordHandler( ConsumerRecords<String, String> records ){ this.records = records; } @Override public void run(){TopicPartition topicPartition=null; for (ConsumerRecord<String,String> record:records){ System.out.println(record.topic()+" "+record.key() +" "+ record.value() ); //记录offset synchronized (offsets){ if(topicPartition==null){ topicPartition = new TopicPartition(record.topic(),record.partition()); } long offset = record.offset(); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset+1); offsets.put(topicPartition,offsetAndMetadata); } }}}}

并发消费适用于,对消息顺序较高的业务需求。

非并发消费适用于,不关心消息顺序的业务。

消费者参数配置

1、fetch.min.bytes:指定消费者从broker获取消息的最小字节数,即等到有足够的数据时才把它返回给消费者。

2、fetch.max.wait.ms:等待broker返回数据的最大时间,默认是500ms。fetch.min.bytes和fetch.max.wait.ms哪个条件先得到满足,就按照哪种方式返回数据。

3、max.partition.fetch.bytes:指定broker从每个partition中返回给消费者的最大字节数,默认1MB。

4、session.timeout.ms:指定消费者被认定死亡之前可以与服务器断开连接的时间,默认是3s。

5、auto.offset.reset:消费者在读取一个没有偏移量或者偏移量无效的情况下(因为消费者长时间失效,包含偏移量的记录已经过时并被删除)该作何处理。默认是latest(消费者从最新的记录开始读取数据)。另一个值是earliest(消费者从起始位置读取partition的记录)

6、enable.auto.commit:指定消费者是否自动提交偏移量,默认为true。

7、partition.assignment.strategy:指定partition如何分配给消费者,默认是Range。Range:把Topic的若干个连续的partition分配给消费者。RoundRobin:把Topic的所有partition逐个分配给消费者。

8、max.poll.records:单次调用poll方法能够返回的消息数量。

标签: #kafka中的一个消费者可以属于几个消费者组