龙空技术网

Kafka的生产者案例和消费者原理解析—你了解多少?

图灵学院诸葛老师 198

前言:

如今看官们对“kafka一个生产者多个消费者”大体比较看重,咱们都需要了解一些“kafka一个生产者多个消费者”的相关知识。那么小编同时在网摘上搜集了一些对于“kafka一个生产者多个消费者””的相关资讯,希望朋友们能喜欢,朋友们快快来学习一下吧!

引言

一、Kafka的Producer小案例

假设我们现在有一个电商系统,凡是能登录系统的用户都是会员,会员的价值体现在,消费了多少钱,就会累计相应的积分。积分可以兑换礼品包,优惠券···等等。

又到了我们的画图时间。首先我们得先来一个订单系统,那这个订单系统中肯定就会有数据日志产生,它现在就是把这些日志写到Kafka里面,日志我们使用json的方式记录。图中的statement表示订单状态,此时是已支付。

此时担任我们消费者的肯定就是会员系统了,它要对这个id为1的会员进行积分累计。当然必须要考虑到的情况是,这个会员有可能也会进行退款操作,那相应的积分也会减少。statement此时为cancel取消

我们上一讲中的设置参数中,提到我们可以给每一个消息设置一个key,也可以不指定,这个key跟我们要把这个消息发送到哪个主题的哪个分区是有关系的。比如我们现在有一个主题叫 tellYourDream,主题下面有两个分区,两个分区分别存在两个副本(此时我们不关注follower,因为它的数据是同步leader的)

Topic:tellYourDream p0:leader partition <- follower partition p1:leader partition <- follower partition复制代码

如果是不指定key的时候,发送的一条消息会以轮询的方式发送到分区里面。也就是比如说,我第一条消息是one,那这个one就发送到了p0里面,第二条是two,就发送到了p1里面,之后的three就是p0,four就是p1···依次类推。

如果指定key,比如我的key为message1,Kafka就会取得这个key的hash值,取到的数字再对我们的分区数取模,然后根据取模的值来决定哪个分区(例如我们现在是p0,p1两个分区,取模的值就只会是0,1),取模为0,就发送到p0,取模为1,就发送到p1,这样的做法可以保证key相同的消息一定会被发送到同一个分区(也可以使用这个特性来规定某些消息一定会发送到指定的分区)。这个做法和MapReduce的shuffle是不是又类似了,所以这些大数据框架,真的互通点很多。

对于我们刚刚提到的会员系统,如果此时用户下单时的消息发送到了p0,而退款的消息发送到了p1,难免有时会发生消费者先消费到p1中的消息的情况,此时用户的积分还没有增加,就已经扣除1000了,显示就会出现问题。所以为了保证同一个用户的消息发送到同一个分区中,我们需要将其指定key。

代码部分

因为在 Kafka的生产者原理及重要参数说明 中我们已经把下面的prop.put的所有配置都已经解释过了,所以这次就直接ctrl+c,ctrl+v上来。其实就是把那时候的创建生产者的代码抽取出来成为一个createProducer()方法而已。

public class OrderProducer {	public static KafkaProducer<String, String> createProducer() {		Properties props = new Properties();		props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092"); 		props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");		props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");		props.put("buffer.memory", 33554432);		props.put("compression.type", "lz4");		props.put("batch.size", 32768);		props.put("linger.ms", 100);		props.put("retries", 10);//5 10		props.put("retry.backoff.ms", 300);		props.put("request.required.acks", "1");		KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);		return producer;	}复制代码

这里就是一段生产JSON格式的消息代码而已,也抽取成一个方法。

	public static JSONObject createRecord() {		JSONObject order=new JSONObject();		order.put("userId", 12344);		order.put("amount", 100.0);		order.put("statement", "pay");		return order;	}复制代码

这里就是直接创建生产者和消息,此时key使用userId或者订单id都行,问题不大。

 public static void main(String[] args) throws Exception { KafkaProducer<String, String> producer = createProducer(); JSONObject order=createRecord(); ProducerRecord<String, String> record = new ProducerRecord<>( "tellYourDream",order.getString("userId") ,order.toString()); producer.send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null) { System.out.println("消息发送成功");  } else { //进行处理 } } }); Thread.sleep(10000);  producer.close(); } }复制代码

此时如果进行过重试机制后,消息还存在异常的话,公司比较严谨的项目都会有备用链路,比如把数据存到MySQL,Redis···等来保证消息不会丢失。

补充:自定义分区(可自行了解)

因为其实Kafka自身提供的机制已经基本满足生产环境中的使用了,所以这块就不展开详细的说明了。此外还有自定义序列化,自定义拦截器,这些在工作当中使用得频率不高,如果用到大概可以进行百度自行学习。

例如,通话记录中,给客服打电话的记录要存到一个分区中,其余的记录均分的分布到剩余的分区中。我们就这个案例来进行演示,要自定义的情况就要实现Partition接口,然后实现3个方法,说是实现3个,其实主要也就实现partition()这个方法而已。

package com.bonc.rdpe.kafka110.partitioner;import java.util.List;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;import org.apache.kafka.common.Cluster;import org.apache.kafka.common.PartitionInfo;/*** @Title PhonenumPartitioner.java * @Description 自定义分区器* @Date 2018-06-25 14:58:14*/public class PhonenumPartitioner implements Partitioner{ @Override public void configure(Map<String, ?> configs) { // TODO nothing } @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 得到 topic 的 partitions 信息 List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); // 模拟某客服 if(key.toString().equals("10000") || key.toString().equals("11111")) { // 放到最后一个分区中 return numPartitions - 1; } String phoneNum = key.toString(); return phoneNum.substring(0, 3).hashCode() % (numPartitions - 1); } @Override public void close() { // TODO nothing }}复制代码

使用自定义分区器

package com.bonc.rdpe.kafka110.producer;import java.util.Properties;import java.util.Random;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.ProducerRecord;import org.apache.kafka.clients.producer.RecordMetadata;/** * @Title PartitionerProducer.java  * @Description 测试自定义分区器 * @Date 2018-06-25 15:10:04 */public class PartitionerProducer { private static final String[] PHONE_NUMS = new String[]{ "10000", "10000", "11111", "13700000003", "13700000004", "10000", "15500000006", "11111", "15500000008",  "17600000009", "10000", "17600000011"  }; public static void main(String[] args) throws Exception {  Properties props = new Properties(); props.put("bootstrap.servers", "192.168.42.89:9092,192.168.42.89:9093,192.168.42.89:9094"); // 设置分区器 props.put("partitioner.class", "com.bonc.rdpe.kafka110.partitioner.PhonenumPartitioner"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  Producer<String, String> producer = new KafkaProducer<>(props);  int count = 0; int length = PHONE_NUMS.length;  while(count < 10) { Random rand = new Random(); String phoneNum = PHONE_NUMS[rand.nextInt(length)]; ProducerRecord<String, String> record = new ProducerRecord<>("dev3-yangyunhe-topic001", phoneNum, phoneNum); RecordMetadata metadata = producer.send(record).get(); String result = "phonenum [" + record.value() + "] has been sent to partition " + metadata.partition(); System.out.println(result); Thread.sleep(500); count++; } producer.close(); }}复制代码

自定义分区结果:

二、Kafka消费者原理解析

1.offset 偏移量

此时再次请出我们的kafka集群,有多个消费者同时去消费集群中的信息

如果程序一直在稳定执行,那我们的整个流程是不会出现啥问题的,可是现在如果程序停止执行了呢?有可能是程序出现了bug,也有可能是因为我们进行修改手动停止了程序。那下一次恢复的时候,消费者又该从哪个地方开始消费?

Topic:tellYourDream ConsumerA tellYourDream:p0(10000) tellYourDream:p1(10001)复制代码

offset就类似于数组下标的那种理解类似,比如数组的下标为什么要从0开始,基于数组的内存模型。就是所处数组位置离首地址的距离而定。array[0]就是偏移为0的位置,也就是首地址。array[k]也就是偏移为k的位置。kafka中的offset也是同样的理解,这个偏移量其实就是记录一个位置而使用的。用来标识消费者这次消费到了这个位置。

在kafka里面,kafka是不帮忙维护这个offset偏移量的,这个offset需要consumer自行维护。kafka提供了两个关于offset的参数,一个是enable_auto_commit,当这个参数设置为true的时候,每次重启kafka都会把所有的数据重新消费一遍。再一个是auto_commit_interval_ms,这个是每次提交offset的一个时间间隔。

这个offset的存储位置在0.8版本(再次划重点,0.8之前的kafka尽量不要使用)之前,是存放在zookeeper里面的。这个设计明显是存在问题的,整个kafka集群有众多的topic,而系统中又有成千上万的消费者去消费它们,如果offset存放在zookeeper上,消费者每次都要提交给zookeeper这个值,这样zookeeper能顶得住吗?如果这时候觉得没啥问题的同学,那你就是没认真去读 插曲:Kafka的集群部署实践及运维相关 中的 3.4---消费信息 啦,赶快去复习一下。

在0.8版本之后,kafka就把这个offset存在了内部的一个主题里面,这个主题的名字叫做 consumer_offset。这个内部主题默认有50个分区,我们知道,消费者组是有它们的一个group.id的。提交过去的时候,key是group.id+topic+分区号(这是为了保证Kakfa集群中同分区的数据偏移量都提交到consumer_offset的同一个分区下)。这句话有点绕口,不过请务必读懂。

value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact。也就是每个group.id+topic+分区号就保留最新的那条数据即可。而且因为这个 consumer_offsets可能会接收高并发的请求,所以默认分区50个,这样如果你的kafka部署了一个大的集群,比如有50台机器,就可以用50台机器来抗offset提交的请求压力,就好很多。

2.Coordinator

每个consumer group都会选择一个broker作为自己的coordinator,负责监控这个消费组里的各个消费者的心跳,以及判断是否宕机,然后开启rebalance, 根据内部的一个选择机制,会挑选一个对应的Broker,Kafka会把各个消费组均匀分配给各个Broker作为coordinator来进行管理,consumer group中的每个consumer刚刚启动就会跟选举出来的这个consumer group对应的coordinator所在的broker进行通信,然后由coordinator分配分区给这个consumer来进行消费。coordinator会尽可能均匀的分配分区给各个consumer来消费。

2.1 如何选择哪台是coordinator?

首先对消费组的groupId进行hash,接着对consumer_offsets的分区数量取模,默认是50,可以通过offsets.topic.num.partitions来设置,找到你的这个consumer group的offset要提交到consumer_offsets的哪个分区。比如说:groupId,“membership-consumer-group” -> hash值(数字)-> 对50取模(结果只能是0~49,又是以往的那个套路) -> 就知道这个consumer group下的所有的消费者提交offset的时候是往哪个分区去提交offset,找到consumer_offsets的一个分区(这里consumer_offset的分区的副本数量默认来说1,只有一个leader),然后对这个分区找到对应的leader所在的broker,这个broker就是这个consumer group的coordinator了,consumer接着就会维护一个Socket连接跟这个Broker进行通信。

其实简单点解释,就是找到consumer_offsets中编号和它对应的一个分区而已。取模后是2,那就找consumer_offsets那50个分区中的第二个分区,也就是p1。取模后是10,那就找consumer_offsets那50个分区中的第十个分区,也就是p9.

2.2 coordinator完成了什么工作

然后这个coordinator会选出一个leader consumer(谁先注册上来,谁就是leader),这时候coordinator也会把整个Topic的情况汇报给leader consumer,,由leader consumer来制定消费方案。之后会发送一个SyncGroup请求把消费方案返回给coordinator。

用一小段话再总结一遍吧:

首先有一个消费者组,这个消费者组会有一个它们的group.id号,根据这个可以计算出哪一个broker作为它们的coodinator,确定了coordinator之后,所有的consumer都会发送一个join group请求注册。之后coordinator就会默认把第一个注册上来的consumer选择成为leader consumer,把整个Topic的情况汇报给leader consumer。之后leader consumer就会根据负载均衡的思路制定消费方案,返回给coordinator,coordinator拿到方案之后再下发给所有的consumer,完成流程。

consumer都会向coordinator发送心跳,可以认为consumer是从节点,coordinator是主节点。当有consumer长时间不再和coordinator保持联系,就会重新把分配给这个consumer的任务重新执行一遍。如果断掉的是leader consumer,就会重新选举新的leader,再执行刚刚提到的步骤。

2.3 分区方案的负载均衡

如果临时有consumer加入或退出,leader consumer就需要重新制定消费方案。

比如我们消费的一个主题有12个分区: p0,p1,p2,p3,p4,p5,p6,p7,p8,p9,p10,p11

假设我们的消费者组里面有三个消费者

2.3.1 range策略

range策略就是按照partiton的序号范围

p0~3 consumer1p4~7 consumer2p8~11 consumer3复制代码

2.3.2.round-robin策略

consumer1:0,3,6,9consumer2:1,4,7,10consumer3:2,5,8,11复制代码

但是前面的这两个方案有个问题: 假设consuemr1挂了:p0-5分配给consumer2,p6-11分配给consumer3 这样的话,原本在consumer2上的的p6,p7分区就被分配到了 consumer3上。

2.3.3.sticky策略

最新的一个sticky策略,就是说尽可能保证在rebalance的时候,让原本属于这个consumer 的分区还是属于他们, 然后把多余的分区再均匀分配过去,这样尽可能维持原来的分区分配的策略

consumer1:0-3consumer2: 4-7consumer3: 8-11 假设consumer3挂了consumer1:0-3,+8,9consumer2: 4-7,+10,11复制代码

2.3.4 Rebalance分代机制

在rebalance的时候,可能你本来消费了partition3的数据,结果有些数据消费了还没提交offset,结果此时rebalance,把partition3分配给了另外一个consumer了,此时你如果提交partition3的数据的offset,能行吗?必然不行,所以每次rebalance会触发一次consumer group generation,分代,每次分代会加1,然后你提交上一个分代的offset是不行的,那个partiton可能已经不属于你了,大家全部按照新的partiton分配方案重新消费数据。

以上就是比较重要的事情了,之后到了轻松愉快的代码时间。

三、消费者代码部分

其实和生产者不能说它们一模一样可是结构完全就是一样的,所以会比生产者的时候更加简短点。因为已经知道有这些东西了,很多东西通过搜索引擎就不难解决了。

public class ConsumerDemo {	private static ExecutorService threadPool = Executors.newFixedThreadPool(20);		public static void main(String[] args) throws Exception {		KafkaConsumer<String, String> consumer = createConsumer();				//指定消费的主题		consumer.subscribe(Arrays.asList("order-topic")); 		try {			while(true) { 			 			 //这里设置的是一个超时时间				ConsumerRecords<String, String> records = consumer.poll(Integer.MAX_VALUE); 								//对消费到的数据进行业务处理				for(ConsumerRecord<String, String> record : records) {					JSONObject order = JSONObject.parseObject(record.value()); 					threadPool.submit(new CreditManageTask(order));				}			}		} catch(Exception e) {			e.printStackTrace();			consumer.close();		}	}		private static KafkaConsumer<String, String> createConsumer() {		 //设置参数的环节		Properties props = new Properties();		props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");		props.put("group.id", "test-group");		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");		props.put("heartbeat.interval.ms", 1000); // 这个尽量时间可以短一点		props.put("session.timeout.ms", 10 * 1000); // 如果说kafka broker在10秒内感知不到一个consumer心跳		props.put("max.poll.interval.ms", 30 * 1000); // 如果30秒才去执行下一次poll		// 就会认为那个consumer挂了,此时会触发rebalance		// 如果说某个consumer挂了,kafka broker感知到了,会触发一个rebalance的操作,就是分配他的分区		// 给其他的cosumer来消费,其他的consumer如果要感知到rebalance重新分配分区,就需要通过心跳来感知		// 心跳的间隔一般不要太长,1000,500		props.put("fetch.max.bytes", 10485760);		props.put("max.poll.records", 500); // 如果说你的消费的吞吐量特别大,此时可以适当提高一些		props.put("connection.max.idle.ms", -1); // 不要去回收那个socket连接		// 开启自动提交,他只会每隔一段时间去提交一次offset		// 如果你每次要重启一下consumer的话,他一定会把一些数据重新消费一遍		props.put("enable.auto.commit", "true");		// 每次自动提交offset的一个时间间隔		props.put("auto.commit.ineterval.ms", "1000");		// 每次重启都是从最早的offset开始读取,不是接着上一次		props.put("auto.offset.reset", "earliest"); 		KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);		return consumer;	}		static class CreditManageTask implements Runnable {		private JSONObject order;		public CreditManageTask(JSONObject order) {			this.order = order;		}		@Override		public void run() {			System.out.println("对订单进行积分的维护......" + order.toJSONString()); 			// 就可以做一系列的数据库的增删改查的事务操作		}	}}复制代码

3.1 消费者的核心参数

3.1.1 【heartbeat.interval.ms】

consumer心跳时间,必须得保持心跳才能知道consumer是否故障了,然后如果故障之后,就会通过心跳下发rebalance的指令给其他的consumer通知他们进行rebalance的操作

3.1.2 【session.timeout.ms】

kafka多长时间感知不到一个consumer就认为他故障了,默认是10秒

3.1.3 【max.poll.interval.ms】

如果在两次poll操作之间,超过了这个时间,那么就会认为这个consume处理能力太弱了,会被踢出消费组,分区分配给别人去消费,一遍来说结合你自己的业务处理的性能来设置就可以了

3.1.4【fetch.max.bytes】

获取一条消息最大的字节数,一般建议设置大一些

3.1.5 【max.poll.records】

一次poll返回消息的最大条数,默认是500条

3.1.6 【connection.max.idle.ms】

consumer跟broker的socket连接如果空闲超过了一定的时间,此时就会自动回收连接,但是下次消费就要重新建立socket连接,这个建议设置为-1,不要去回收

3.1.7 【auto.offset.reset】

earliest:	当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费	topicA -> partition0:1000 		 partitino1:2000 			 latest:	当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从当前位置开始消费none:	topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常复制代码

注:我们生产里面一般设置的是latest

3.1.8 【enable.auto.commit】

这个就是开启自动提交唯一

3.1.9 【auto.commit.ineterval.ms】

这个指的是多久条件一次偏移量

四、加餐时间:补充第一篇没提到的内容

日志二分查找

其实这也可以被称作稀松索引。也是一个类似跳表的结构。打开某主题下的分区,我们能看到这样的一些文件

00000000000000000000.index(偏移量的索引)00000000000000000000.log(日志文件)00000000000000000000.timeindex(时间的索引)复制代码

日志段文件,.log文件会对应一个.index和.timeindex两个索引文件。kafka在写入日志文件的时候,同时会写索引文件,就是.index和.timeindex,一个是位移索引,一个是时间戳索引。

默认情况下,有个参数log.index.interval.bytes限定了在日志文件写入多少数据,就要在索引文件写一条索引,默认是4KB,写4kb的数据然后在索引里写一条索引,所以索引本身是稀疏格式的索引,不是每条数据对应一条索引的。而且索引文件里的数据是按照位移和时间戳升序排序的,所以kafka在查找索引的时候,会用二分查找,时间复杂度是O(logN),找到索引,就可以在.log文件里定位到数据了。

上面的0,2039···这些代表的是物理位置。为什么稀松索引会比直接一条条读取速度快,它不是每一条数据都会记录,是相隔几条数据的记录方式,但是就比如现在要消费偏移量为7的数据,就直接先看这个稀松索引上的记录,找到一个6时,7比6大,然后直接看后面的数据,找到8,8比7大,再看回来,确定7就是在6~8之间,而6的物理位置在9807,8的物理位置在12345,直接从它们中间去找。就提升了查找物理位置的速度。就类似于普通情况下的二分查找。

ISR机制

光是依靠多副本机制能保证Kafka的高可用性,但是能保证数据不丢失吗?不行,因为如果leader宕机,但是leader的数据还没同步到follower上去,此时即使选举了follower作为新的leader,当时刚才的数据已经丢失了。

ISR是:in-sync replica,就是跟leader partition保持同步的follower partition的数量,只有处于ISR列表中的follower才可以在leader宕机之后被选举为新的leader,因为在这个ISR列表里代表他的数据跟leader是同步的。

如果要保证写入kafka的数据不丢失,首先需要保证ISR中至少有一个follower,其次就是在一条数据写入了leader partition之后,要求必须复制给ISR中所有的follower partition,才能说代表这条数据已提交,绝对不会丢失,这是Kafka给出的承诺

那什么情况下副本会被踢出出ISR呢,如果一个副本超过10s没有去和leader同步数据的话,那么它就会被踢出ISR列表。但是这个问题如果解决了(网络抖动或者full gc···等),follower再次和leader同步了,leader会有一个判断,如果数据差异小就会让follower重新加入,那么怎么才算差异大,怎么才算小呢,咱们留到源码时说明。这次的篇幅非常非常长,而且需要理解的地方也不少,后面其实本来在kafka的内核里还有个HW&LEO原理的,可自己都懒得继续写了hhh。

小结

欢迎关注头条号:JAVA大飞哥

点击关注评论转发一波~~

私信小编发送“架构”(免费获取JAVA相关的面试架构资料哟)可以获取上方资料哦

最后,每一位读到这里的Java程序猿朋友们,感谢你们能耐心地看完。希望在成为一名更优秀的Java程序猿的道路上,我们可以一起学习、一起进步!都能赢取白富美,走向架构师的人生巅峰!

标签: #kafka一个生产者多个消费者