龙空技术网

Kafka:消息如何在服务端存储与读取?必须知道集群内部工作原理

大数据架构师 391

前言:

此刻大家对“kafka的消息是存储在哪里”大致比较关注,咱们都想要学习一些“kafka的消息是存储在哪里”的相关文章。那么小编也在网摘上收集了一些对于“kafka的消息是存储在哪里””的相关内容,希望朋友们能喜欢,小伙伴们快快来学习一下吧!

Kafka系列:消息是如何在服务端存储与读取的,你真的知道吗?

(本文来自公众号:z小赵)

前言

经过前 5 篇文章的介绍,估么着小伙伴们已经对消息生产和消费的流程应该有一个比较清晰的认识了。当然小伙伴们肯定也比较好奇,Kafka 能够处理千万级消息,那它的消息是如何在 Partition 上存储的呢?今天这篇文章就来为大家揭秘消息是如何存储的。本文主要从消息的逻辑存储和物理存储两个角度来介绍其实现原理。

文章概览Partition、Replica、Log 和 LogSegment 的关系。写入消息流程分析。消费消息及副本同步流程分析。Partition、Replica、Log 和 LogSegment 的关系

假设有一个 Kafka 集群,Broker 个数为 3,Topic 个数为 1,Partition 个数为 3,Replica 个数为 2。Partition 的物理分布如下图所示。

Partition分布图

从上图可以看出,该 Topic 由三个 Partition 构成,并且每个 Partition 由主从两个副本构成。每个 Partition 的主从副本分布在不同的 Broker 上,通过这点也可以看出,当某个 Broker 宕机时,可以将分布在其他 Broker 上的从副本设置为主副本,因为只有主副本对外提供读写请求,当然在最新的 2.x 版本中从副本也可以对外读请求了。将主从副本分布在不同的 Broker 上从而提高系统的可用性。

Partition 的实际物理存储是以 Log 文件的形式展示的,而每个 Log 文件又以多个 LogSegment 组成。Kafka 为什么要这么设计呢?其实原因比较简单,随着消息的不断写入,Log 文件肯定是越来越大,Kafka 为了方便管理,将一个大文件切割成一个一个的 LogSegment 来进行管理;每个 LogSegment 由数据文件和索引文件构成,数据文件是用来存储实际的消息内容,而索引文件是为了加快消息内容的读取。

可能又有朋友会问,Kafka 本身消费是以 Partition 维度顺序消费消息的,磁盘在顺序读的时候效率很高完全没有必要使用索引啊。其实 Kafka 为了满足一些特殊业务需求,比如要随机消费 Partition 中的消息,此时可以先通过索引文件快速定位到消息的实际存储位置,然后进行处理。

总结一下 Partition、Replica、Log 和 LogSegment 之间的关系。消息是以 Partition 维度进行管理的,为了提高系统的可用性,每个 Partition 都可以设置相应的 Replica 副本数,一般在创建 Topic 的时候同时指定 Replica 的个数;Partition 和 Replica 的实际物理存储形式是通过 Log 文件展现的,为了防止消息不断写入,导致 Log 文件大小持续增长,所以将 Log 切割成一个一个的 LogSegment 文件。

注意: 在同一时刻,每个主 Partition 中有且只有一个 LogSegment 被标识为可写入状态,当一个 LogSegment 文件大小超过一定大小后(比如当文件大小超过 1G,这个就类似于 HDFS 存储的数据文件,HDFS 中数据文件达到 128M 的时候就会被分出一个新的文件来存储数据),就会新创建一个 LogSegment 来继续接收新写入的消息。

写入消息流程分析

消息写入及落盘流程

流程解析

在第 3 篇文章讲过,生产者客户端对于每个 Partition 一次会发送一批消息到服务端,服务端收到一批消息后写入相应的 Partition 上。上图流程主要分为如下几步:

客户端消息收集器收集属于同一个分区的消息,并对每条消息设置一个偏移量,且每一批消息总是从 0 开始单调递增。比如第一次发送 3 条消息,则对三条消息依次编号 [0,1,2],第二次发送 4 条消息,则消息依次编号为 [0,1,2,3]。注意此处设置的消息偏移量是相对偏移量。客户端将消息发送给服务端,服务端拿到下一条消息的绝对偏移量,将传到服务端的这批消息的相对偏移量修改成绝对偏移量。将修改后的消息以追加的方式追加到当前活跃的 LogSegment 后面,然后更新绝对偏移量。将消息集写入到文件通道。文件通道将消息集 flush 到磁盘,完成消息的写入操作。

了解以上过程后,我们再来看看消息的具体构成情况。

消息构成细节图

一条消息由如下三部分构成:

OffSet:偏移量,消息在客户端发送前将相对偏移量存储到该位置,当消息存储到 LogSegment 前,先将其修改为绝对偏移量再写入磁盘。Size:本条 Message 的内容大小Message:消息的具体内容,其具体又由 7 部分组成,crc 用于校验消息,Attribute 代表了属性,key-length 和 value-length 分别代表 key 和 value 的长度,key 和 value 分别代表了其对应的内容。消息偏移量的计算过程

通过以上流程可以看出,每条消息在被实际存储到磁盘时都会被分配一个绝对偏移量后才能被写入磁盘。在同一个分区内,消息的绝对偏移量都是从 0 开始,且单调递增;在不同分区内,消息的绝对偏移量是没有任何关系的。接下来讨论下消息的绝对偏移量的计算规则。

确定消息偏移量有两种方式,一种是顺序读取每一条消息来确定,此种方式代价比较大,实际上我们并不想知道消息的内容,而只是想知道消息的偏移量;第二种是读取每条消息的 Size 属性,然后计算出下一条消息的起始偏移量。比如第一条消息内容为 “abc”,写入磁盘后的偏移量为:8(OffSet)+ 4(Message 大小)+ 3(Message 内容的长度)= 15。第二条写入的消息内容为“defg”,其起始偏移量为 15,下一条消息的起始偏移量应该是:15+8+4+4=31,以此类推。

消费消息及副本同步流程分析

和写入消息流程不同,读取消息流程分为两种情况,分别是消费端消费消息和从副本(备份副本)同步主副本的消息。在开始分析读取流程之前,需要先明白几个用到的变量,不然流程分析可能会看的比较糊涂。

BaseOffSet:基准偏移量,每个 Partition 由 N 个 LogSegment 组成,每个 LogSegment 都有基准偏移量,大概由如下构成,数组中每个数代表一个 LogSegment 的基准偏移量:[0,200,400,600, ...]。StartOffSet:起始偏移量,由消费端发起读取消息请求时,指定从哪个位置开始消费消息。MaxLength:拉取大小,由消费端发起读取消息请求时,指定本次最大拉取消息内容的数据大小。该参数可以通过max.partition.fetch.bytes来指定,默认大小为 1M。MaxOffSet:最大偏移量,消费端拉取消息时,最高可拉取消息的位置,即俗称的“高水位”。该参数由服务端指定,其作用是为了防止生产端还未写入的消息就被消费端进行消费。此参数对于从副本同步主副本不会用到。MaxPosition:LogSegment 的最大位置,确定了起始偏移量在某个 LogSegment 上开始,读取 MaxLength 后,不能超过 MaxPosition。MaxPosition 是一个实际的物理位置,而非偏移量。

假设消费端从 000000621 位置开始消费消息,关于几个变量的关系如下图所示。

位置关系图

消费端和从副本拉取流程如下:客户端确定拉取的位置,即 StartOffSet 的值,找到主副本对应的 LogSegment。LogSegment 由索引文件和数据文件构成,由于索引文件是从小到大排列的,首先从索引文件确定一个小于等于 StartOffSet 最近的索引位置。根据索引位置找到对应的数据文件位置,由于数据文件也是从小到大排列的,从找到的数据文件位置顺序向后遍历,直到找到和 StartOffSet 相等的位置,即为消费或拉取消息的位置。从 StartOffSet 开始向后拉取 MaxLength 大小的数据,返回给消费端或者从副本进行消费或备份操作。

假设拉取消息起始位置为 00000313,消息拉取流程图如下:

消息拉取流程图

总结

本文从逻辑存储和物理存储的角度,分析了消息的写入与消费流程。其中逻辑存储是以 Partition 来管理一批一批的消息,Partition 映射 Log 对象,Log 对象管理了多个 LogSegment,多个 Partition 构成了一个完整的 Topic。消息的实际物理存储是由一个一个的 LogSegment 构成,每个 LogSegment 又由索引文件和数据文件构成。下篇文章我们来分析一些实际生产环境中的常用操作及数据接入方案,敬请期待。

前言

上篇文章讲到了消息在 Partition 上的存储形式,本来准备接着来聊聊生产中的一些使用方式,想了想还有些很重要的工作组件原理没有讲清楚,比如一个 Topic 由 N 个 Partition 组成,那么这些 Partition 是如何均匀的分布在不同的 Broker 上?再比如当一个 Broker 宕机后,其上负责读写请求的主 Partition 无法正常访问,如何让从 Partition 转变成主 Partition 来继续提供正常的读写服务?想要解决这些问题,就必须先要了解一下 Kafka 集群内部的管理机制,其中一个非常重要的控制器就是 KafkaController。本文我们就来讲讲 KafkaController 是如何来解决上面提到的那些问题的。

文章概览KafkaController 是什么及其选举策略。KafkaController 监控 ZK 的目录分布。Partition 分布算法。Partition 的状态转移。Kafka 集群的负载均衡处理流程解析。KafkaController 是什么及其选举策略

Kafka 集群由多台 Broker 构成,每台 Broker 都有一个 KafkaController 用于管理当前的 Broker。试想一下,如果一个集群没有一个“领导者”,那么谁去和“外界”(比如 ZK)沟通呢?谁去协调 Partition 应该如何分布在集群中的不同 Broker 上呢?谁去处理 Broker 宕机后,在其 Broker 上的主 Partition 无法正常提供读写服务后,将对应的从 Partition 转变成主 Partition 继续正常对外提供服务呢?那么由哪个 Broker 的 KafkaController 来担当“领导者”这个角色呢?

Kafka 的设计者很聪明,Zookeeper 既然是分布式应用协调服务,那么干脆就让它来帮 Kafka 集群选举一个“领导者”出来,这个“领导者”对应的 KafkaController 称为 Leader,其他的 KafkaController 被称为 Follower,在同一时刻,一个 Kafka 集群只能有一个 Leader 和 N 个 Follower。

Zookeeper 是怎么实现 KafkaController 的选主工作呢?

稍微熟悉 Zookeeper 的小伙伴应该都比较清楚,Zookeeper 是通过监控目录(zNode)的变化,从而做出一些相应的动作。

Zookeeper 的目录分为四种,第一种是永久的,被称作为 Persistent;

第二种是顺序且永久的,被称作为 Persistent_Sequential;

第三种是临时的,被称为 Ephemeral;

第四种是顺序且临时的,被称作为 Ephemeral_Sequential。

KafkaController 正是利用了临时的这一特性来完成选主的,在 Broker 启动时,每个 Broker 的 KafkaController 都会向 ZK 的 /controller目录写入 BrokerId,谁先写入谁就是 Leader,剩余的 KafkaController 是 Follower,当 Leader 在周期内没有向 ZK 发送报告的话,则认为 Leader 挂了,此时 ZK 删除临时的 /controller 目录,Kafka 集群中的其他 KafkaController 开始新的一轮争主操作,流程和上面一样。下面是选 Leader 的流程图。

Leader选举流程图

从上图可以看出,第一次,Broker1 成功抢先在 Zookeeper 的 /controller 目录上写入信息,所以 Broker1 的 KafkaController 为 Leader,其他两个为 Follower。第二次,Broker1 宕机或者下线,此时 Broker2 和 Broker3 检测到,于是开始新一轮的争抢将信息写入 Zookeeper,从图中可以看出,Broker2 争到了 Leader,所以 Broker3 是 Follower 状态。

正常情况下,上面这个流程没有问题,但是如果在 Broker1 离线的情况下,Zookeeper 准备删除 /controller 的临时 node 时,系统 hang 住没办法删除,改怎么办呢?这里留个小疑问供大家思考。后面会用一篇文章专门来解答 Kafka 相关的问题(包括面试题哦,敬请期待)。

KafkaController 监控的 ZK 目录分布

KafkaController 在初始化的时候,会针对不同的 zNode 注册各种各样的监听器,以便处理不同的用户请求或者系统内部变化请求。监控 ZK 的目录大概可以分为两部分,分别是 /admin 目录和 /brokers 目录。各目录及其对应的功能如下表所示,需要的朋友自提。

Partition 分布算法

Partition分布算法图

图解:假设集群有 3 个 Broker,Partition 因子为 2。

随机选取 Broker 集群中的一个 Broker 节点,然后以轮询的方式将主 Partition 均匀的分布到不同的 Broker 上。主 Partition 分布完成后,将从 Partition 按照 AR 组内顺序以轮询的方式将 Partition 均匀的分布到不同的 Broker 上。Partition 的状态转移

用户针对特定的 Topic 创建了相应的 Partition ,但是这些 Partition 不一定时刻都能够正常工作,所有 Partition 在同一时刻会对应 4 个状态中的某一个;其整个生命周期会经历如下状态的转移,分别是 NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition,其对应的状态转移情况如下图所示。

Partition状态转移图

从上图可以看出,Partition 的状态会由前置状态才能够转移到目标状态的,而不是胡乱转移状态的。

NonExistentPartition:代表没有创建 Partition 时的状态,也有可能是离线的 Partition 被删除后的状态。

NewPartition:当 Partition 被创建时,此时只有 AR(Assigned Replica),还没有 ISR(In-Synic Replica),此时还不能接受数据的写入和读取。

OnlinePartition:由 NewPartition 状态转移为 OnlinePartition 状态,此时 Partition 的 Leader 已经被选举出来了,并且也有对应的 ISR 列表等。此时已经可以对外提供读写请求了。

OfflinePartition:当 Partition 对应的 Broker 宕机或者网络异常等问题,由 OnlinePartition 转移到 OfflinePartition,此时的 Partition 已经不能在对外提供读写服务。当 Partition 被彻底删除后状态就转移成 NonExistentPartition,当网络恢复或者 Broker 恢复后,其状态又可以转移到 OnlinePartition,从而继续对外提供读写服务。

Kafka 集群的负载均衡处理流程解析

前面的文章讲到过,Partition 有 Leader Replica 和 Preferred Replica 两种角色,Leader Replica 负责对外提供读写服务 Preferred Replica 负责同步 Leader Replica 上的数据。现在集群中假设有 3 个 Broker,3 个 Partition,每个 Partition 有 3 个 Replica,当集群运行一段时候后,集群中某些 Broker 宕机,Leader Replica 进行转移,其过程如下图所示。

Partition的Leader转移图

从上图可以看出,集群运行一段时间后,Broker1 挂掉了,在其上运行的 Partition0 对应的 Leader Replica 转移到了 Broker2 上。假设一段时间后 Broker3 也挂了,则 Broker3 上的 Partition3 对应的 Leader Replica 也转移到了 Broker2 上,集群中只有 Broker2 上的 Partition 在对外提供读写服务,从而造成 Broker2 上的服务压力比较大,之后 Broker1 和 Broker3 恢复后,其上只有 Preferred Replica 做备份操作。

针对以上这种随着时间的推移,集群不再像刚开始时那样平衡,需要通过后台线程将 Leader Replica 重新分配到不同 Broker 上,从而使得读写服务尽量均匀的分布在不同的节点上。

重平衡操作是由 partition-rebalance-thread 后台线程操作的,由于其优先级很低,所以只会在集群空闲的时候才会执行。集群的不平衡的评判标准是由leader.imbalance.per.broker.percentage配置决定的,当集群的不平衡度达到 10%(默认)时,会触发后台线程启动重平衡操作,其具体执行步骤如下:

对 Partition 的 AR 列表根据 Preferred Replica 进行分组操作。遍历 Broker,对其上的 Partition 进行处理。统计 Broker 上的 Leader Replica 和 Preferred Replica 不相等的 Partition 个数。统计 Broker 上的 Partition 个数。Partition 个数 / 不相等的 Partition 个数,如果大于 10%,则触发重平衡操作;反之,则不做任何处理。总结

本文主要介绍了 Kafka 集群服务内部的一些工作机制,相信小伙伴们掌握了这部分内容后,对 Broker 服务端的工作流程有了进一步的理解,从而更好的把控整体集群服务。下篇文章我们来正式介绍一下Kafka 常用的命令行操作,敬请期待。

感谢大家支持,多多转发关注不迷路哈~~~~

标签: #kafka的消息是存储在哪里